From 1aaa145877772bff5652d06a3914d9d66364f41e Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 6 Jan 2020 20:08:24 -0800 Subject: [PATCH] Fix a data race for cfd->log_number_ (#6249) Summary: A thread calling LogAndApply may release db mutex when calling WriteCurrentStateToManifest() which reads cfd->log_number_. Another thread can call SwitchMemtable() and writes to cfd->log_number_. Solution is to cache the cfd->log_number_ before releasing mutex in LogAndApply. Test Plan (on devserver): ``` $COMPILE_WITH_TSAN=1 make db_stress $./db_stress --acquire_snapshot_one_in=10000 --avoid_unnecessary_blocking_io=1 --block_size=16384 --bloom_bits=16 --bottommost_compression_type=zstd --cache_index_and_filter_blocks=1 --cache_size=1048576 --checkpoint_one_in=1000000 --checksum_type=kxxHash --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_ttl=0 --compression_max_dict_bytes=16384 --compression_type=zstd --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_blackbox --db_write_buffer_size=1048576 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --enable_pipelined_write=0 --flush_one_in=1000000 --format_version=5 --get_live_files_and_wal_files_one_in=1000000 --index_block_restart_interval=5 --index_type=0 --log2_keys_per_lock=22 --long_running_snapshots=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000000 --max_manifest_file_size=16384 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=3 --memtablerep=skip_list --mmap_read=0 --nooverwritepercent=1 --open_files=500000 --ops_per_thread=100000000 --partition_filters=0 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefixpercent=5 --progress_reports=0 --readpercent=45 --recycle_log_file_num=0 --reopen=20 --set_options_one_in=10000 --snapshot_hold_ops=100000 --subcompactions=2 --sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_merge=0 --use_multiget=1 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=4194304 --write_dbid_to_manifest=1 --writepercent=35 ``` Then repeat the following multiple times, e.g. 100 after compiling with tsan. ``` $./db_test2 --gtest_filter=DBTest2.SwitchMemtableRaceWithNewManifest ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6249 Differential Revision: D19235077 Pulled By: riversand963 fbshipit-source-id: 79467b52f48739ce7c27e440caa2447a40653173 --- HISTORY.md | 1 + db/db_test2.cc | 12 ++++++++++++ db/version_set.cc | 19 ++++++++++++++++--- db/version_set.h | 8 +++++++- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f5d77802e..5affc7960 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * Fix a bug in WriteBatchWithIndex::MultiGetFromBatchAndDB, which is called by Transaction::MultiGet, that causes due to stale pointer access when the number of keys is > 32 * BlobDB no longer updates the SST to blob file mapping upon failed compactions. * Fixed a bug where BlobDB was comparing the `ColumnFamilyHandle` pointers themselves instead of only the column family IDs when checking whether an API call uses the default column family or not. +* Fix a race condition for cfd->log_number_ between manifest switch and memtable switch (PR 6249) when number of column families is greater than 1. ### New Features * It is now possible to enable periodic compactions for the base DB when using BlobDB. diff --git a/db/db_test2.cc b/db/db_test2.cc index 464aa23a9..0b7fdb44b 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -4248,6 +4248,18 @@ TEST_F(DBTest2, BackgroundPurgeTest) { value = options.write_buffer_manager->memory_usage(); ASSERT_EQ(base_value, value); } + +TEST_F(DBTest2, SwitchMemtableRaceWithNewManifest) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + options.max_manifest_file_size = 10; + options.create_if_missing = true; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put("foo", "value")); + port::Thread thread([&]() { ASSERT_OK(Flush()); }); + ASSERT_OK(dbfull()->SetOptions({{"prefix_extractor", "fixed:5"}})); + thread.join(); +} } // namespace rocksdb #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/version_set.cc b/db/version_set.cc index 06d4abd7f..a473c7129 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3753,6 +3753,10 @@ Status VersionSet::ProcessManifestWrites( pending_manifest_file_number_ = manifest_file_number_; } + // Local cached copy of state variable(s). WriteCurrentStateToManifest() + // reads its content after releasing db mutex to avoid race with + // SwitchMemtable(). + std::unordered_map curr_state; if (new_descriptor_log) { // if we are writing out new snapshot make sure to persist max column // family. @@ -3760,6 +3764,10 @@ Status VersionSet::ProcessManifestWrites( first_writer.edit_list.front()->SetMaxColumnFamily( column_family_set_->GetMaxColumnFamily()); } + for (const auto* cfd : *column_family_set_) { + assert(curr_state.find(cfd->GetID()) == curr_state.end()); + curr_state[cfd->GetID()] = {cfd->GetLogNumber()}; + } } { @@ -3802,7 +3810,7 @@ Status VersionSet::ProcessManifestWrites( nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); - s = WriteCurrentStateToManifest(descriptor_log_.get()); + s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get()); } } @@ -4918,7 +4926,9 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { } } -Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) { +Status VersionSet::WriteCurrentStateToManifest( + const std::unordered_map& curr_state, + log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? // WARNING: This method doesn't hold a mutex!! @@ -4984,7 +4994,10 @@ Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) { f->oldest_ancester_time, f->file_creation_time); } } - edit.SetLogNumber(cfd->GetLogNumber()); + const auto iter = curr_state.find(cfd->GetID()); + assert(iter != curr_state.end()); + uint64_t log_number = iter->second.log_number; + edit.SetLogNumber(log_number); std::string record; if (!edit.EncodeTo(&record)) { return Status::Corruption( diff --git a/db/version_set.h b/db/version_set.h index aa18cabb5..c44e9f536 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1075,8 +1075,14 @@ class VersionSet { const Slice& start, const Slice& end, TableReaderCaller caller); + struct MutableCFState { + uint64_t log_number; + }; + // Save current contents to *log - Status WriteCurrentStateToManifest(log::Writer* log); + Status WriteCurrentStateToManifest( + const std::unordered_map& curr_state, + log::Writer* log); void AppendVersion(ColumnFamilyData* column_family_data, Version* v);