diff --git a/db/column_family.cc b/db/column_family.cc index 7dfd0b37b..797018077 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData( column_family_set_(column_family_set), pending_flush_(false), pending_compaction_(false), - prev_compaction_needed_bytes_(0) { + prev_compaction_needed_bytes_(0), + allow_2pc_(db_options.allow_2pc) { Ref(); // Convert user defined table properties collector factories to internal ones. @@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const { return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); } +uint64_t ColumnFamilyData::OldestLogToKeep() { + auto current_log = GetLogNumber(); + + if (allow_2pc_) { + auto imm_prep_log = imm()->GetMinLogContainingPrepSection(); + auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); + + if (imm_prep_log > 0 && imm_prep_log < current_log) { + current_log = imm_prep_log; + } + + if (mem_prep_log > 0 && mem_prep_log < current_log) { + current_log = mem_prep_log; + } + } + + return current_log; +} + const double kIncSlowdownRatio = 0.8; const double kDecSlowdownRatio = 1 / kIncSlowdownRatio; const double kNearStopSlowdownRatio = 0.6; diff --git a/db/column_family.h b/db/column_family.h index 29d297157..2bf579a4a 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -239,6 +239,9 @@ class ColumnFamilyData { uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } + // calculate the oldest log needed for the durability of this column family + uint64_t OldestLogToKeep(); + // See Memtable constructor for explanation of earliest_seq param. MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq); @@ -404,6 +407,9 @@ class ColumnFamilyData { bool pending_compaction_; uint64_t prev_compaction_needed_bytes_; + + // if the database was opened with 2pc enabled + bool allow_2pc_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_impl.cc b/db/db_impl.cc index 766e35e9d..99e51003f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -338,6 +338,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) last_stats_dump_time_microsec_(0), next_job_id_(1), has_unpersisted_data_(false), + unable_to_flush_oldest_log_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE @@ -654,6 +655,10 @@ void DBImpl::MaybeDumpStats() { } uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { + if (!allow_2pc()) { + return 0; + } + uint64_t min_log = 0; // we must look through the memtables for two phase transactions @@ -698,6 +703,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { } uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { + + if (!allow_2pc()) { + return 0; + } + std::lock_guard lock(prep_heap_mutex_); uint64_t min_log = 0; @@ -2493,7 +2503,7 @@ Status DBImpl::SetDBOptions( mutable_db_options_ = new_options; if (total_log_size_ > GetMaxTotalWalSize()) { - FlushColumnFamilies(); + MaybeFlushColumnFamilies(); } persist_options_status = PersistOptions(); @@ -4686,9 +4696,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); if (UNLIKELY(!single_column_family_mode_ && - !alive_log_files_.begin()->getting_flushed && total_log_size_ > GetMaxTotalWalSize())) { - FlushColumnFamilies(); + MaybeFlushColumnFamilies(); } else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) { // Before a new memtable is added in SwitchMemtable(), // write_buffer_manager_->ShouldFlush() will keep returning true. If another @@ -5006,28 +5015,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return status; } -void DBImpl::FlushColumnFamilies() { +void DBImpl::MaybeFlushColumnFamilies() { mutex_.AssertHeld(); - WriteContext context; - if (alive_log_files_.begin()->getting_flushed) { return; } - uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; - alive_log_files_.begin()->getting_flushed = true; + auto oldest_alive_log = alive_log_files_.begin()->number; + auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep(); + + if (allow_2pc() && + unable_to_flush_oldest_log_ && + oldest_log_with_uncommited_prep > 0 && + oldest_log_with_uncommited_prep <= oldest_alive_log) { + // we already attempted to flush all column families dependent on + // the oldest alive log but the log still contained uncommited transactions. + // the oldest alive log STILL contains uncommited transaction so there + // is still nothing that we can do. + return; + } + + WriteContext context; + Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, "Flushing all column families with data in WAL number %" PRIu64 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, - flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize()); + oldest_alive_log, total_log_size_, GetMaxTotalWalSize()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } - if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { + if (cfd->OldestLogToKeep() <= oldest_alive_log) { auto status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; @@ -5037,6 +5058,26 @@ void DBImpl::FlushColumnFamilies() { } } MaybeScheduleFlushOrCompaction(); + + // we only mark this log as getting flushed if we have successfully + // flushed all data in this log. If this log contains outstanding prepred + // transactions then we cannot flush this log until those transactions are commited. + + unable_to_flush_oldest_log_ = false; + + if (allow_2pc()) { + if (oldest_log_with_uncommited_prep == 0 || + oldest_log_with_uncommited_prep > oldest_alive_log) { + // this log contains no outstanding prepared transactions + alive_log_files_.begin()->getting_flushed = true; + } else { + Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log, + "Unable to release oldest log due to uncommited transaction"); + unable_to_flush_oldest_log_ = true; + } + } else { + alive_log_files_.begin()->getting_flushed = true; + } } uint64_t DBImpl::GetMaxTotalWalSize() const { diff --git a/db/db_impl.h b/db/db_impl.h index c87299b0f..f172717a1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -308,6 +308,16 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family = nullptr, bool disallow_trivial_move = false); + void TEST_MaybeFlushColumnFamilies(); + + bool TEST_UnableToFlushOldestLog() { + return unable_to_flush_oldest_log_; + } + + bool TEST_IsLogGettingFlushed() { + return alive_log_files_.begin()->getting_flushed; + } + // Force current memtable contents to be flushed. Status TEST_FlushMemTable(bool wait = true, ColumnFamilyHandle* cfh = nullptr); @@ -732,7 +742,7 @@ class DBImpl : public DB { // REQUIRES: mutex locked Status PersistOptions(); - void FlushColumnFamilies(); + void MaybeFlushColumnFamilies(); uint64_t GetMaxTotalWalSize() const; @@ -991,6 +1001,15 @@ class DBImpl : public DB { // Used when disableWAL is true. bool has_unpersisted_data_; + + // if an attempt was made to flush all column families that + // the oldest log depends on but uncommited data in the oldest + // log prevents the log from being released. + // We must attempt to free the dependent memtables again + // at a later time after the transaction in the oldest + // log is fully commited. + bool unable_to_flush_oldest_log_; + static const int KEEP_LOG_FILE_NUM = 1000; // MSVC version 1800 still does not have constexpr for ::max() static const uint64_t kNoTimeOut = port::kMaxUint64; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index dbf2391bd..a69d2658d 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() { return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); } +void DBImpl::TEST_MaybeFlushColumnFamilies() { + InstrumentedMutexLock l(&mutex_); + MaybeFlushColumnFamilies(); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/memtable_list.h b/db/memtable_list.h index 67ef95bd3..97438d754 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -221,6 +221,8 @@ class MemTableList { // PickMemtablesToFlush() is called. void FlushRequested() { flush_requested_ = true; } + bool HasFlushRequested() { return flush_requested_; } + // Copying allowed // MemTableList(const MemTableList&); // void operator=(const MemTableList&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 917dbb7d2..0ad1fdda7 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1178,6 +1178,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { delete cfb; } +TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + + Status s; + ColumnFamilyHandle *cfa, *cfb; + + ColumnFamilyOptions cf_options; + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + WriteOptions wopts; + wopts.disableWAL = false; + wopts.sync = true; + + auto cfh_a = reinterpret_cast(cfa); + auto cfh_b = reinterpret_cast(cfb); + + TransactionOptions topts1; + Transaction* txn1 = db->BeginTransaction(wopts, topts1); + s = txn1->SetName("xid1"); + ASSERT_OK(s); + s = txn1->Put(cfa, "boys", "girls1"); + ASSERT_OK(s); + + Transaction* txn2 = db->BeginTransaction(wopts, topts1); + s = txn2->SetName("xid2"); + ASSERT_OK(s); + s = txn2->Put(cfb, "up", "down1"); + ASSERT_OK(s); + + // prepre transaction in LOG A + s = txn1->Prepare(); + ASSERT_OK(s); + + // prepre transaction in LOG A + s = txn2->Prepare(); + ASSERT_OK(s); + + // regular put so that mem table can actually be flushed for log rolling + s = db->Put(wopts, "cats", "dogs1"); + ASSERT_OK(s); + + auto prepare_log_no = txn1->GetLogNumber(); + + // roll to LOG B + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // now we pause background work so that + // imm()s are not flushed before we can check their status + s = db_impl->PauseBackgroundWork(); + ASSERT_OK(s); + + ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no); + ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no); + ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber()); + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + prepare_log_no); + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + + // commit in LOG B + s = txn1->Commit(); + ASSERT_OK(s); + + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no); + + ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + + // request a flush for all column families such that the earliest + // alive log file can be killed + db_impl->TEST_MaybeFlushColumnFamilies(); + // log cannot be flushed because txn2 has not been commited + ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); + ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); + + // assert that cfa has a flush requested + ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); + + // cfb should not be flushed becuse it has no data from LOG A + ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested()); + + // cfb now has data from LOG A + s = txn2->Commit(); + ASSERT_OK(s); + + db_impl->TEST_MaybeFlushColumnFamilies(); + ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + + // we should see that cfb now has a flush requested + ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested()); + + // all data in LOG A resides in a memtable that has been + // requested for a flush + ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed()); + + delete txn1; + delete txn2; + delete cfa; + delete cfb; +} /* * 1) use prepare to keep first log around to determine starting sequence * during recovery.