Fix Flush() keep waiting after flush finish

Summary:
Flush() call could be waiting indefinitely if min_write_buffer_number_to_merge is used. Consider the sequence:
1. User call Flush() with flush_options.wait = true
2. The manual flush started in the background
3. New memtable become immutable because of writes. The new memtable will not trigger flush if min_write_buffer_number_to_merge is not reached.
4. The manual flush finish.

Because of the new memtable created at step 3 not being flush, previous logic of WaitForFlushMemTable() keep waiting, despite the memtables it intent to flush has been flushed.

Here instead of checking if there are any more memtables to flush, WaitForFlushMemTable() also check the id of the earliest memtable. If the id is larger than that of latest memtable at the time flush was initiated, it means all the memtable at the time of flush start has all been flush.
Closes https://github.com/facebook/rocksdb/pull/3378

Differential Revision: D6746789

Pulled By: yiwu-arbug

fbshipit-source-id: 35e698f71c7f90b06337a93e6825f4ea3b619bfa
This commit is contained in:
Yi Wu 2018-01-18 17:32:50 -08:00 committed by Facebook Github Bot
parent b9873162f0
commit f1cb83fcf4
8 changed files with 97 additions and 12 deletions

View File

@ -2,6 +2,7 @@
## Unreleased
### Bug Fixes
* Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete.
* Fix DB::Flush() keep waiting after flush finish under certain condition.
## 5.10.0 (12/11/2017)
### Public API Change

View File

@ -385,7 +385,8 @@ ColumnFamilyData::ColumnFamilyData(
pending_flush_(false),
pending_compaction_(false),
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc) {
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0) {
Ref();
// Convert user defined table properties collector factories to internal ones.

View File

@ -239,7 +239,11 @@ class ColumnFamilyData {
void SetCurrent(Version* _current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetMemtable(MemTable* new_mem) {
uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1;
new_mem->SetID(memtable_id);
mem_ = new_mem;
}
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();
@ -419,6 +423,9 @@ class ColumnFamilyData {
// if the database was opened with 2pc enabled
bool allow_2pc_;
// Memtable id to track flush.
std::atomic<uint64_t> last_memtable_id_;
};
// ColumnFamilySet has interesting thread-safety requirements

View File

@ -126,6 +126,41 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
ASSERT_EQ(1, num_compactions);
}
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
Options options = CurrentOptions();
options.write_buffer_size = 100;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
Reopen(options);
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush",
"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
{"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
"DBImpl::FlushMemTableToOutputFile:BeforeInstallSV"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "value1"));
port::Thread t([&]() {
// The call wait for flush to finish, i.e. with flush_options.wait = true.
ASSERT_OK(Flush());
});
// Wait for flush start.
TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
// Insert a second memtable before the manual flush finish.
// At the end of the manual flush job, it will check if further flush
// is needed, but it will not trigger flush of the second memtable because
// min_write_buffer_number_to_merge is not reached.
ASSERT_OK(Put("key2", "value2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
// Manual flush should return, without waiting for flush indefinitely.
t.join();
}
TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options;
options.create_if_missing = true;

View File

@ -813,8 +813,12 @@ class DBImpl : public DB {
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
bool writes_stopped = false);
// Wait for memtable flushed
Status WaitForFlushMemTable(ColumnFamilyData* cfd);
// Wait for memtable flushed.
// If flush_memtable_id is non-null, wait until the memtable with the ID
// gets flush. Otherwise, wait until the column family don't have any
// memtable pending flush.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr);
// REQUIRES: mutex locked
Status SwitchWAL(WriteContext* write_context);

View File

@ -134,6 +134,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforeInstallSV");
InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context,
mutable_cf_options);
if (made_progress) {
@ -810,7 +811,13 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
Status DBImpl::Flush(const FlushOptions& flush_options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return FlushMemTable(cfh->cfd(), flush_options);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
cfh->GetName().c_str());
Status s = FlushMemTable(cfh->cfd(), flush_options);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Manual flush finished, status: %s\n",
cfh->GetName().c_str(), s.ToString().c_str());
return s;
}
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
@ -945,6 +952,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
bool writes_stopped) {
Status s;
uint64_t flush_memtable_id = 0;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
@ -962,6 +970,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
// SwitchMemtable() will release and reacquire mutex during execution
s = SwitchMemtable(cfd, &context);
flush_memtable_id = cfd->imm()->GetLatestMemTableID();
if (!writes_stopped) {
write_thread_.ExitUnbatched(&w);
@ -976,16 +985,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (s.ok() && flush_options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd);
s = WaitForFlushMemTable(cfd, &flush_memtable_id);
}
return s;
}
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id) {
Status s;
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() &&
(flush_memtable_id == nullptr ||
cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}

View File

@ -368,6 +368,11 @@ class MemTable {
return oldest_key_time_.load(std::memory_order_relaxed);
}
// REQUIRES: db_mutex held.
void SetID(uint64_t id) { id_ = id; }
uint64_t GetID() const { return id_; }
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
@ -437,6 +442,9 @@ class MemTable {
// Timestamp of oldest key
std::atomic<uint64_t> oldest_key_time_;
// Memtable id to track flush.
uint64_t id_ = 0;
// Returns a heuristic flush decision
bool ShouldFlushNow() const;

View File

@ -5,11 +5,12 @@
//
#pragma once
#include <string>
#include <list>
#include <vector>
#include <set>
#include <deque>
#include <limits>
#include <list>
#include <set>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/memtable.h"
@ -244,6 +245,22 @@ class MemTableList {
uint64_t GetMinLogContainingPrepSection();
uint64_t GetEarliestMemTableID() const {
auto& memlist = current_->memlist_;
if (memlist.empty()) {
return std::numeric_limits<uint64_t>::max();
}
return memlist.back()->GetID();
}
uint64_t GetLatestMemTableID() const {
auto& memlist = current_->memlist_;
if (memlist.empty()) {
return 0;
}
return memlist.front()->GetID();
}
private:
// DB mutex held
void InstallNewVersion();