From f1cb83fcf408b2870e3c4372fb13e58255a2cadf Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 18 Jan 2018 17:32:50 -0800 Subject: [PATCH] Fix Flush() keep waiting after flush finish Summary: Flush() call could be waiting indefinitely if min_write_buffer_number_to_merge is used. Consider the sequence: 1. User call Flush() with flush_options.wait = true 2. The manual flush started in the background 3. New memtable become immutable because of writes. The new memtable will not trigger flush if min_write_buffer_number_to_merge is not reached. 4. The manual flush finish. Because of the new memtable created at step 3 not being flush, previous logic of WaitForFlushMemTable() keep waiting, despite the memtables it intent to flush has been flushed. Here instead of checking if there are any more memtables to flush, WaitForFlushMemTable() also check the id of the earliest memtable. If the id is larger than that of latest memtable at the time flush was initiated, it means all the memtable at the time of flush start has all been flush. Closes https://github.com/facebook/rocksdb/pull/3378 Differential Revision: D6746789 Pulled By: yiwu-arbug fbshipit-source-id: 35e698f71c7f90b06337a93e6825f4ea3b619bfa --- HISTORY.md | 1 + db/column_family.cc | 3 ++- db/column_family.h | 9 ++++++++- db/db_flush_test.cc | 35 ++++++++++++++++++++++++++++++++++ db/db_impl.h | 8 ++++++-- db/db_impl_compaction_flush.cc | 20 +++++++++++++++---- db/memtable.h | 8 ++++++++ db/memtable_list.h | 25 ++++++++++++++++++++---- 8 files changed, 97 insertions(+), 12 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d9ec5d444..baf4961da 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete. +* Fix DB::Flush() keep waiting after flush finish under certain condition. ## 5.10.0 (12/11/2017) ### Public API Change diff --git a/db/column_family.cc b/db/column_family.cc index 81b63262e..c32e97f06 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -385,7 +385,8 @@ ColumnFamilyData::ColumnFamilyData( pending_flush_(false), pending_compaction_(false), prev_compaction_needed_bytes_(0), - allow_2pc_(db_options.allow_2pc) { + allow_2pc_(db_options.allow_2pc), + last_memtable_id_(0) { Ref(); // Convert user defined table properties collector factories to internal ones. diff --git a/db/column_family.h b/db/column_family.h index 9a125aa1c..ce1fd4738 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -239,7 +239,11 @@ class ColumnFamilyData { void SetCurrent(Version* _current); uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held - void SetMemtable(MemTable* new_mem) { mem_ = new_mem; } + void SetMemtable(MemTable* new_mem) { + uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1; + new_mem->SetID(memtable_id); + mem_ = new_mem; + } // calculate the oldest log needed for the durability of this column family uint64_t OldestLogToKeep(); @@ -419,6 +423,9 @@ class ColumnFamilyData { // if the database was opened with 2pc enabled bool allow_2pc_; + + // Memtable id to track flush. + std::atomic last_memtable_id_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 107e82467..83895ea6c 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -126,6 +126,41 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { ASSERT_EQ(1, num_compactions); } +TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + Reopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush", + "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"}, + {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2", + "DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "value1")); + + port::Thread t([&]() { + // The call wait for flush to finish, i.e. with flush_options.wait = true. + ASSERT_OK(Flush()); + }); + + // Wait for flush start. + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"); + // Insert a second memtable before the manual flush finish. + // At the end of the manual flush job, it will check if further flush + // is needed, but it will not trigger flush of the second memtable because + // min_write_buffer_number_to_merge is not reached. + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2"); + + // Manual flush should return, without waiting for flush indefinitely. + t.join(); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl.h b/db/db_impl.h index 5249d72eb..6c8dbaa8a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -813,8 +813,12 @@ class DBImpl : public DB { Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, bool writes_stopped = false); - // Wait for memtable flushed - Status WaitForFlushMemTable(ColumnFamilyData* cfd); + // Wait for memtable flushed. + // If flush_memtable_id is non-null, wait until the memtable with the ID + // gets flush. Otherwise, wait until the column family don't have any + // memtable pending flush. + Status WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id = nullptr); // REQUIRES: mutex locked Status SwitchWAL(WriteContext* write_context); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 37facf5df..df097f478 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -134,6 +134,7 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"); InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context, mutable_cf_options); if (made_progress) { @@ -810,7 +811,13 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - return FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", + cfh->GetName().c_str()); + Status s = FlushMemTable(cfh->cfd(), flush_options); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] Manual flush finished, status: %s\n", + cfh->GetName().c_str(), s.ToString().c_str()); + return s; } Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, @@ -945,6 +952,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, bool writes_stopped) { Status s; + uint64_t flush_memtable_id = 0; { WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -962,6 +970,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // SwitchMemtable() will release and reacquire mutex during execution s = SwitchMemtable(cfd, &context); + flush_memtable_id = cfd->imm()->GetLatestMemTableID(); if (!writes_stopped) { write_thread_.ExitUnbatched(&w); @@ -976,16 +985,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok() && flush_options.wait) { // Wait until the compaction completes - s = WaitForFlushMemTable(cfd); + s = WaitForFlushMemTable(cfd, &flush_memtable_id); } return s; } -Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { +Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, + const uint64_t* flush_memtable_id) { Status s; // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) { + while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() && + (flush_memtable_id == nullptr || + cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } diff --git a/db/memtable.h b/db/memtable.h index 76e3cf1bf..bfafbeacc 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -368,6 +368,11 @@ class MemTable { return oldest_key_time_.load(std::memory_order_relaxed); } + // REQUIRES: db_mutex held. + void SetID(uint64_t id) { id_ = id; } + + uint64_t GetID() const { return id_; } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -437,6 +442,9 @@ class MemTable { // Timestamp of oldest key std::atomic oldest_key_time_; + // Memtable id to track flush. + uint64_t id_ = 0; + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/db/memtable_list.h b/db/memtable_list.h index 1e46642a9..c2ac65a2f 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -5,11 +5,12 @@ // #pragma once -#include -#include -#include -#include #include +#include +#include +#include +#include +#include #include "db/dbformat.h" #include "db/memtable.h" @@ -244,6 +245,22 @@ class MemTableList { uint64_t GetMinLogContainingPrepSection(); + uint64_t GetEarliestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return std::numeric_limits::max(); + } + return memlist.back()->GetID(); + } + + uint64_t GetLatestMemTableID() const { + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return 0; + } + return memlist.front()->GetID(); + } + private: // DB mutex held void InstallNewVersion();