Fix a bug in FlushJob picking more memtables beyond synced WALs (#9142)

Summary:
After RocksDB 6.19 and before this PR, RocksDB FlushJob may pick more memtables to flush beyond synced WALs.
This can be problematic if there are multiple column families, since it can prematurely advance the flushed column
family's log_number. Should subsequent attempts fail to sync the latest WALs and the database goes
through a recovery, it may detect corrupted WAL number below the flushed column family's log number
and complain about column family inconsistency.
To fix, we record the maximum memtable ID of the column family being flushed before calling SyncClosedLogs(),
so that all WALs closed at the time the memtable ID is recorded are synced, and no memtables created after that point are picked.
I also disabled a unit test temporarily due to reasons described in https://github.com/facebook/rocksdb/issues/9151

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9142

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D32299956

Pulled By: riversand963

fbshipit-source-id: 0da75888177d91905cf8c9d00605b73afb5970a7
This commit is contained in:
Yanqin Jin 2021-11-19 09:55:10 -08:00 committed by Facebook GitHub Bot
parent 8cf4294e25
commit 1e8322c0f5
3 changed files with 78 additions and 14 deletions

View File

@ -21,6 +21,7 @@
* In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before. * In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before.
* Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.) * Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.)
* Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`. * Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`.
* Fixed a bug of background flush thread picking more memtables to flush and prematurely advancing column family's log_number.
### Behavior Changes ### Behavior Changes
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.

View File

@ -1199,7 +1199,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
ASSERT_EQ(Get(KEY5), p_v5); ASSERT_EQ(Get(KEY5), p_v5);
} }
TEST_F(DBFlushTest, MemPurgeWALSupport) { TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
@ -1858,6 +1858,50 @@ TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
Destroy(options); Destroy(options);
} }
// Regression test for https://github.com/facebook/rocksdb/pull/9142.
// Verifies that a flush job only picks memtables whose IDs are at most the
// maximum memtable ID recorded before SyncClosedLogs() releases the DB
// mutex. A memtable switched in while the mutex is released is backed by an
// unsynced WAL and must NOT be included in the flush.
TEST_F(DBFlushTest, PickRightMemtables) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  options.create_if_missing = true;
  const std::string test_cf_name = "test_cf";
  // Large write buffer count so memtable switches never stall the test.
  options.max_write_buffer_number = 128;
  CreateColumnFamilies({test_cf_name}, options);
  Close();
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);
  // One write per column family; with >1 CF, flushing test_cf forces
  // SyncClosedLogs() to run (and release the mutex) before picking memtables.
  ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // While the flush thread has the DB mutex released inside
  // SyncClosedLogs(), sneak in a write and switch test_cf's memtable. The
  // new memtable's data is only in the (unsynced) current WAL.
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
        ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
        auto* cfhi =
            static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
        assert(cfhi);
        ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
      });
  // After PickMemTable(), the job must contain only the original memtable
  // (ID 1) — not the one switched in during the callback above.
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
        auto* job = reinterpret_cast<FlushJob*>(arg);
        assert(job);
        const auto& mems = job->GetMemTables();
        assert(mems.size() == 1);
        assert(mems[0]);
        ASSERT_EQ(1, mems[0]->GetID());
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
class DBFlushTestBlobError : public DBFlushTest, class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> { public testing::WithParamInterface<std::string> {
public: public:

View File

@ -103,6 +103,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
if (!logs_to_sync.empty()) { if (!logs_to_sync.empty()) {
mutex_.Unlock(); mutex_.Unlock();
assert(job_context);
for (log::Writer* log : logs_to_sync) { for (log::Writer* log : logs_to_sync) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
@ -125,6 +127,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
} }
TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
/*arg=*/nullptr);
mutex_.Lock(); mutex_.Lock();
// "number <= current_log_number - 1" is equivalent to // "number <= current_log_number - 1" is equivalent to
@ -153,14 +157,34 @@ Status DBImpl::FlushMemTableToOutputFile(
Env::Priority thread_pri) { Env::Priority thread_pri) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(cfd); assert(cfd);
assert(cfd->imm());
assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending()); assert(cfd->imm()->IsFlushPending());
assert(versions_);
assert(versions_->GetColumnFamilySet());
// If there are more than one column families, we need to make sure that
// all the log files except the most recent one are synced. Otherwise if
// the host crashes after flushing and before WAL is persistent, the
// flushed SST may contain data from write batches whose updates to
// other (unflushed) column families are missing.
const bool needs_to_sync_closed_wals =
logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;
// If needs_to_sync_closed_wals is true, we need to record the current
// maximum memtable ID of this column family so that a later PickMemtables()
// call will not pick memtables whose IDs are higher. This is due to the fact
// that SyncClosedLogs() may release the db mutex, and memtable switch can
// happen for this column family in the meantime. The newly created memtables
// have their data backed by unsynced WALs, thus they cannot be included in
// this flush job.
uint64_t max_memtable_id = needs_to_sync_closed_wals
? cfd->imm()->GetLatestMemTableID()
: port::kMaxUint64;
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
port::kMaxUint64 /* memtable_id */, file_options_for_compaction_, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
earliest_write_conflict_snapshot, snapshot_checker, job_context, job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats, &event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri, true /* sync_output_directory */, true /* write_manifest */, thread_pri,
@ -176,13 +200,7 @@ Status DBImpl::FlushMemTableToOutputFile(
Status s; Status s;
bool need_cancel = false; bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK(); IOStatus log_io_s = IOStatus::OK();
if (logfile_number_ > 0 && if (needs_to_sync_closed_wals) {
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
// If there are more than one column families, we need to make sure that
// all the log files except the most recent one are synced. Otherwise if
// the host crashes after flushing and before WAL is persistent, the
// flushed SST may contain data from write batches whose updates to
// other column families are missing.
// SyncClosedLogs() may unlock and re-lock the db_mutex. // SyncClosedLogs() may unlock and re-lock the db_mutex.
log_io_s = SyncClosedLogs(job_context); log_io_s = SyncClosedLogs(job_context);
if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
@ -201,7 +219,8 @@ Status DBImpl::FlushMemTableToOutputFile(
flush_job.PickMemTable(); flush_job.PickMemTable();
need_cancel = true; need_cancel = true;
} }
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables"); TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);
bool switched_to_mempurge = false; bool switched_to_mempurge = false;
// Within flush_job.Run, rocksdb may call event listener to notify // Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion. // file creation and deletion.