diff --git a/HISTORY.md b/HISTORY.md index d9ec5d444..baf4961da 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete. +* Fix DB::Flush() keep waiting after flush finish under certain condition. ## 5.10.0 (12/11/2017) ### Public API Change diff --git a/db/column_family.cc b/db/column_family.cc index 81b63262e..c32e97f06 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -385,7 +385,8 @@ ColumnFamilyData::ColumnFamilyData( pending_flush_(false), pending_compaction_(false), prev_compaction_needed_bytes_(0), - allow_2pc_(db_options.allow_2pc) { + allow_2pc_(db_options.allow_2pc), + last_memtable_id_(0) { Ref(); // Convert user defined table properties collector factories to internal ones. diff --git a/db/column_family.h b/db/column_family.h index 9a125aa1c..ce1fd4738 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -239,7 +239,11 @@ class ColumnFamilyData { void SetCurrent(Version* _current); uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held - void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } + void SetMemtable(MemTable* new_mem) { + uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1; + new_mem->SetID(memtable_id); + mem_ = new_mem; + } // calculate the oldest log needed for the durability of this column family uint64_t OldestLogToKeep(); @@ -419,6 +423,9 @@ class ColumnFamilyData { // if the database was opened with 2pc enabled bool allow_2pc_; + + // Memtable id to track flush. + std::atomic last_memtable_id_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 107e82467..83895ea6c 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -126,6 +126,41 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { ASSERT_EQ(1, num_compactions); } +TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + Reopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush", + "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"}, + {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2", + "DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "value1")); + + port::Thread t([&]() { + // The call wait for flush to finish, i.e. with flush_options.wait = true. + ASSERT_OK(Flush()); + }); + + // Wait for flush start. + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"); + // Insert a second memtable before the manual flush finish. + // At the end of the manual flush job, it will check if further flush + // is needed, but it will not trigger flush of the second memtable because + // min_write_buffer_number_to_merge is not reached. + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2"); + + // Manual flush should return, without waiting for flush indefinitely. + t.join(); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl.h b/db/db_impl.h index 5249d72eb..6c8dbaa8a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -813,8 +813,12 @@ class DBImpl : public DB { Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, bool writes_stopped = false); - // Wait for memtable flushed - Status WaitForFlushMemTable(ColumnFamilyData* cfd); + // Wait for memtable flushed. + // If flush_memtable_id is non-null, wait until the memtable with the ID + // gets flush. Otherwise, wait until the column family don't have any + // memtable pending flush. + Status WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id = nullptr); // REQUIRES: mutex locked Status SwitchWAL(WriteContext* write_context); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 37facf5df..df097f478 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -134,6 +134,7 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"); InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context, mutable_cf_options); if (made_progress) { @@ -810,7 +811,13 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - return FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", + cfh->GetName().c_str()); + Status s = FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] Manual flush finished, status: %s\n", + cfh->GetName().c_str(), s.ToString().c_str()); + return s; } Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, @@ -945,6 +952,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, bool writes_stopped) { Status s; + uint64_t flush_memtable_id = 0; { WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -962,6 +970,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // SwitchMemtable() will release and reacquire mutex during execution s = SwitchMemtable(cfd, &context); + flush_memtable_id = cfd->imm()->GetLatestMemTableID(); if (!writes_stopped) { write_thread_.ExitUnbatched(&w); @@ -976,16 +985,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok() && flush_options.wait) { // Wait until the compaction completes - s = WaitForFlushMemTable(cfd); + s = WaitForFlushMemTable(cfd, &flush_memtable_id); } return s; } -Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { +Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id) { Status s; // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) { + while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() && + (flush_memtable_id == nullptr || + cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } diff --git a/db/memtable.h b/db/memtable.h index 76e3cf1bf..bfafbeacc 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -368,6 +368,11 @@ class MemTable { return oldest_key_time_.load(std::memory_order_relaxed); } + // REQUIRES: db_mutex held. + void SetID(uint64_t id) { id_ = id; } + + uint64_t GetID() const { return id_; } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -437,6 +442,9 @@ class MemTable { // Timestamp of oldest key std::atomic oldest_key_time_; + // Memtable id to track flush. + uint64_t id_ = 0; + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/db/memtable_list.h b/db/memtable_list.h index 1e46642a9..c2ac65a2f 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -5,11 +5,12 @@ // #pragma once -#include -#include -#include -#include #include +#include +#include +#include +#include +#include #include "db/dbformat.h" #include "db/memtable.h" @@ -244,6 +245,22 @@ class MemTableList { uint64_t GetMinLogContainingPrepSection(); + uint64_t GetEarliestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return std::numeric_limits::max(); + } + return memlist.back()->GetID(); + } + + uint64_t GetLatestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return 0; + } + return memlist.front()->GetID(); + } + private: // DB mutex held void InstallNewVersion();