Wait for pending memtable writes on file ingestion or compact range (#6113)

**Summary:**
This PR fixes two unordered_write-related issues:
- the ingestion job may skip a necessary memtable flush (https://github.com/facebook/rocksdb/issues/6026)
- `CompactRange` may flush the memtable before pending unordered writes have finished, losing those writes (sketched in the toy model below):
    1. `CompactRange` triggers a memtable flush but doesn't wait for pending writes
    2. the memtable is flushed while some writes are still pending
    3. the WAL backing that memtable is removed (note that the pending writes were recorded in that WAL)
    4. the pending writes land in the newly created memtable
    5. the database restarts
    6. the pending writes are lost: the WAL that recorded them is gone, but they were never written to an SST
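To make the lost-write sequence concrete, here is a toy, standalone model of the race (invented names and structures, not RocksDB code): under `unordered_write` the WAL append and the memtable insert are two separate steps, so a flush that runs between them retires the WAL before the insert lands.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy model: the WAL append (step 1) and the memtable insert (step 2) of an
// unordered write are separate, so a flush can run between them.
struct ToyDB {
  std::vector<std::string> wal;       // unflushed log records
  std::vector<std::string> memtable;  // in-memory writes
  std::vector<std::string> sst;       // flushed (durable) writes

  void WriteWAL(const std::string& rec) { wal.push_back(rec); }           // step 1
  void WriteMemtable(const std::string& rec) { memtable.push_back(rec); } // step 2

  // Flush persists the memtable and retires the WAL -- including records
  // whose memtable insert hasn't happened yet.
  void Flush() {
    sst.insert(sst.end(), memtable.begin(), memtable.end());
    memtable.clear();
    wal.clear();  // the pending record is dropped here
  }

  // A restart rebuilds the memtable by replaying the (now empty) WAL.
  void Restart() { memtable.assign(wal.begin(), wal.end()); }

  bool Has(const std::string& rec) const {
    return std::count(memtable.begin(), memtable.end(), rec) ||
           std::count(sst.begin(), sst.end(), rec);
  }
};

int main() {
  ToyDB db;
  db.WriteWAL("foo=v2");       // writer finished only the WAL half
  db.Flush();                  // CompactRange flushes without waiting
  db.WriteMemtable("foo=v2");  // the insert lands in the *new* memtable
  db.Restart();                // restart before the next flush
  assert(!db.Has("foo=v2"));   // the write is gone: not in WAL, not in SST
}
```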

**How to solve:**
- Wait for pending memtable writes before the ingestion job checks the memtable key range
- Wait for pending memtable writes before flushing the memtable

**Note that `CompactRange` also calls `RangesOverlapWithMemtables` without waiting for pending writes, but I'm not sure whether this affects correctness.**
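As a rough illustration of the synchronization involved, here is a generic sketch with invented names (`PendingWriteTracker`, `BeginMemtableWrite`, `EndMemtableWrite`); RocksDB's actual `WaitForPendingWrites` piggybacks on its write-thread machinery and `mutex_` rather than a standalone class like this:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Generic sketch of the pattern: writers register before their memtable
// insert and deregister after; flush/ingestion blocks until no writer is
// in flight.
class PendingWriteTracker {
 public:
  void BeginMemtableWrite() { pending_.fetch_add(1); }

  void EndMemtableWrite() {
    if (pending_.fetch_sub(1) == 1) {
      // Last in-flight writer: wake anyone blocked in WaitForPendingWrites.
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }

  // Must be called only after new writers are prevented from entering
  // (FlushMemTable does this by holding the write threads via
  // EnterUnbatched); otherwise the wait could starve.
  void WaitForPendingWrites() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return pending_.load() == 0; });
  }

 private:
  std::atomic<int> pending_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};
```

In this sketch a writer brackets its memtable insert with `BeginMemtableWrite()`/`EndMemtableWrite()`, while flush and ingestion call `WaitForPendingWrites()` before switching the memtable or checking its key range, which is where the calls are added in the hunks below.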

**Test Plan:**
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6113

Differential Revision: D18895674

Pulled By: maysamyabandeh

fbshipit-source-id: da22b4476fc7e06c176020e7cc171eb78189ecaf
Connor 2019-12-12 14:05:48 -08:00 committed by Facebook Github Bot
parent 814d4e7ce0
commit a844591201
7 changed files with 78 additions and 4 deletions

View File

@ -1503,6 +1503,37 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  }
}

TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  Put("foo", "v1");
  ASSERT_OK(Flush());
  Put("bar", "v1");

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  port::Thread writer([&]() { Put("foo", "v2"); });
  TEST_SYNC_POINT(
      "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  writer.join();
  ASSERT_EQ(Get("foo"), "v2");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  Reopen(options);
  ASSERT_EQ(Get("foo"), "v2");
}

TEST_F(DBCompactionTest, DeleteFileRange) {
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;

View File

@ -3958,6 +3958,13 @@ Status DBImpl::IngestExternalFiles(
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }

  // When unordered_write is enabled, keys are written to the memtable in an
  // unordered way. If the ingestion job checks the memtable key range before
  // a pending key lands in the memtable, the ingestion job may skip the
  // necessary memtable flush.
  // So wait here to ensure there is no pending write to the memtable.
  WaitForPendingWrites();

  num_running_ingest_file_ += static_cast<int>(num_cfs);
  TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");

View File

@ -1416,6 +1416,7 @@ class DBImpl : public DB {
  inline void WaitForPendingWrites() {
    mutex_.AssertHeld();
    TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock");
    // If pipelined write is enabled, wait for all pending memtable writers.
    if (immutable_db_options_.enable_pipelined_write) {

View File

@ -1540,6 +1540,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();

    if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
      s = SwitchMemtable(cfd, &context);
@ -1625,11 +1626,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
      }
    }
  }
-  TEST_SYNC_POINT("FlushMemTableFinished");
+  TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
  return s;
}

-// Flush all elments in 'column_family_datas'
+// Flush all elements in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& column_family_datas,
@ -1665,6 +1666,7 @@ Status DBImpl::AtomicFlushMemTables(
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
    }
    WaitForPendingWrites();

    for (auto cfd : column_family_datas) {
      if (cfd->IsDropped()) {

View File

@ -131,6 +131,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
                                  log_used, log_ref, &seq, sub_batch_cnt,
                                  pre_release_callback, kDoAssignOrder,
                                  kDoPublishLastSeq, disable_memtable);
    TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
    if (!status.ok()) {
      return status;
    }
@ -138,6 +139,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
      *seq_used = seq;
    }
    if (!disable_memtable) {
      TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
      status = UnorderedWriteMemtable(write_options, my_batch, callback,
                                      log_ref, seq, sub_batch_cnt);
    }

View File

@ -183,7 +183,8 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) {
  );
  listener->EnableAutoRecovery(false);
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
-      {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}});
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void *) {
        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
@ -219,7 +220,8 @@ TEST_F(DBErrorHandlingTest, CorruptionError) {
  ASSERT_EQ(s, Status::OK());
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
-      {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}});
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void *) {
        fault_env->SetFilesystemActive(false, Status::Corruption("Corruption"));

View File

@ -1700,6 +1700,35 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) {
  ASSERT_OK(DeprecatedAddFile({file_path}));
}

TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) {
        ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush));
      });

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  Put("foo", "v1");
  SyncPoint::GetInstance()->EnableProcessing();
  port::Thread writer([&]() { Put("bar", "v2"); });
  TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL");
  ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1,
                                       true /* allow_global_seqno */));
  ASSERT_EQ(Get("bar"), "v3");
  writer.join();

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
  Options options = CurrentOptions();
  options.IncreaseParallelism(20);