Fix a data race for cfd->log_number_ (#6249)
Summary: A thread calling LogAndApply may release the db mutex when calling WriteCurrentStateToManifest(), which reads cfd->log_number_. Another thread can call SwitchMemtable() and write to cfd->log_number_. The solution is to cache cfd->log_number_ before releasing the mutex in LogAndApply. Test Plan (on devserver): ``` $COMPILE_WITH_TSAN=1 make db_stress $./db_stress --acquire_snapshot_one_in=10000 --avoid_unnecessary_blocking_io=1 --block_size=16384 --bloom_bits=16 --bottommost_compression_type=zstd --cache_index_and_filter_blocks=1 --cache_size=1048576 --checkpoint_one_in=1000000 --checksum_type=kxxHash --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_ttl=0 --compression_max_dict_bytes=16384 --compression_type=zstd --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_blackbox --db_write_buffer_size=1048576 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --enable_pipelined_write=0 --flush_one_in=1000000 --format_version=5 --get_live_files_and_wal_files_one_in=1000000 --index_block_restart_interval=5 --index_type=0 --log2_keys_per_lock=22 --long_running_snapshots=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000000 --max_manifest_file_size=16384 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=3 --memtablerep=skip_list --mmap_read=0 --nooverwritepercent=1 --open_files=500000 --ops_per_thread=100000000 --partition_filters=0 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefixpercent=5 --progress_reports=0 --readpercent=45 --recycle_log_file_num=0 --reopen=20 --set_options_one_in=10000 --snapshot_hold_ops=100000 --subcompactions=2 --sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_merge=0 --use_multiget=1 --verify_checksum=1 
--verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=4194304 --write_dbid_to_manifest=1 --writepercent=35 ``` Then repeat the following multiple times, e.g. 100 times, after compiling with TSAN. ``` $./db_test2 --gtest_filter=DBTest2.SwitchMemtableRaceWithNewManifest ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6249 Differential Revision: D19235077 Pulled By: riversand963 fbshipit-source-id: 79467b52f48739ce7c27e440caa2447a40653173
This commit is contained in:
parent
946c43a026
commit
1aaa145877
@ -13,6 +13,7 @@
|
||||
* Fix a bug in WriteBatchWithIndex::MultiGetFromBatchAndDB, which is called by Transaction::MultiGet, that causes stale pointer access when the number of keys is > 32
|
||||
* BlobDB no longer updates the SST to blob file mapping upon failed compactions.
|
||||
* Fixed a bug where BlobDB was comparing the `ColumnFamilyHandle` pointers themselves instead of only the column family IDs when checking whether an API call uses the default column family or not.
|
||||
* Fix a race condition for cfd->log_number_ between manifest switch and memtable switch (PR 6249) when the number of column families is greater than 1.
|
||||
|
||||
### New Features
|
||||
* It is now possible to enable periodic compactions for the base DB when using BlobDB.
|
||||
|
@ -4248,6 +4248,18 @@ TEST_F(DBTest2, BackgroundPurgeTest) {
|
||||
value = options.write_buffer_manager->memory_usage();
|
||||
ASSERT_EQ(base_value, value);
|
||||
}
|
||||
|
||||
// Regression test for the cfd->log_number_ data race (PR 6249): Flush() runs
// on a helper thread while SetOptions() runs on the calling thread, against a
// DB that has a second column family and a very small manifest size limit.
// The test makes no assertion about the race itself; it only checks that the
// calls succeed.
// NOTE(review): the race is presumably only detected when built with TSAN --
// confirm against the test plan.
TEST_F(DBTest2, SwitchMemtableRaceWithNewManifest) {
|
||||
Options options = CurrentOptions();
|
||||
DestroyAndReopen(options);
|
||||
// Tiny manifest size limit.
// NOTE(review): presumably forces the next manifest write to start a new
// MANIFEST file -- confirm.
options.max_manifest_file_size = 10;
|
||||
options.create_if_missing = true;
|
||||
// Adds a second column family ("pikachu") alongside the default one.
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
ASSERT_OK(Put("foo", "value"));
|
||||
// Flush on a separate thread so it can overlap with SetOptions() below.
port::Thread thread([&]() { ASSERT_OK(Flush()); });
|
||||
ASSERT_OK(dbfull()->SetOptions({{"prefix_extractor", "fixed:5"}}));
|
||||
// Wait for the flush thread before the test (and its fixtures) goes away.
thread.join();
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
@ -3753,6 +3753,10 @@ Status VersionSet::ProcessManifestWrites(
|
||||
pending_manifest_file_number_ = manifest_file_number_;
|
||||
}
|
||||
|
||||
// Local cached copy of state variable(s). WriteCurrentStateToManifest()
|
||||
// reads its content after releasing db mutex to avoid race with
|
||||
// SwitchMemtable().
|
||||
std::unordered_map<uint32_t, MutableCFState> curr_state;
|
||||
if (new_descriptor_log) {
|
||||
// if we are writing out new snapshot make sure to persist max column
|
||||
// family.
|
||||
@ -3760,6 +3764,10 @@ Status VersionSet::ProcessManifestWrites(
|
||||
first_writer.edit_list.front()->SetMaxColumnFamily(
|
||||
column_family_set_->GetMaxColumnFamily());
|
||||
}
|
||||
for (const auto* cfd : *column_family_set_) {
|
||||
assert(curr_state.find(cfd->GetID()) == curr_state.end());
|
||||
curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
@ -3802,7 +3810,7 @@ Status VersionSet::ProcessManifestWrites(
|
||||
nullptr, db_options_->listeners));
|
||||
descriptor_log_.reset(
|
||||
new log::Writer(std::move(file_writer), 0, false));
|
||||
s = WriteCurrentStateToManifest(descriptor_log_.get());
|
||||
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
|
||||
}
|
||||
}
|
||||
|
||||
@ -4918,7 +4926,9 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
|
||||
}
|
||||
}
|
||||
|
||||
Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) {
|
||||
Status VersionSet::WriteCurrentStateToManifest(
|
||||
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
|
||||
log::Writer* log) {
|
||||
// TODO: Break up into multiple records to reduce memory usage on recovery?
|
||||
|
||||
// WARNING: This method doesn't hold a mutex!!
|
||||
@ -4984,7 +4994,10 @@ Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) {
|
||||
f->oldest_ancester_time, f->file_creation_time);
|
||||
}
|
||||
}
|
||||
edit.SetLogNumber(cfd->GetLogNumber());
|
||||
const auto iter = curr_state.find(cfd->GetID());
|
||||
assert(iter != curr_state.end());
|
||||
uint64_t log_number = iter->second.log_number;
|
||||
edit.SetLogNumber(log_number);
|
||||
std::string record;
|
||||
if (!edit.EncodeTo(&record)) {
|
||||
return Status::Corruption(
|
||||
|
@ -1075,8 +1075,14 @@ class VersionSet {
|
||||
const Slice& start, const Slice& end,
|
||||
TableReaderCaller caller);
|
||||
|
||||
// Per-column-family state copied into a local map, keyed by column family ID,
// and passed to WriteCurrentStateToManifest(), which uses the copy instead of
// reading the live ColumnFamilyData.
struct MutableCFState {
|
||||
// Value of cfd->GetLogNumber() at the time the copy was taken.
uint64_t log_number;
|
||||
};
|
||||
|
||||
// Save current contents to *log
|
||||
Status WriteCurrentStateToManifest(log::Writer* log);
|
||||
Status WriteCurrentStateToManifest(
|
||||
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
|
||||
log::Writer* log);
|
||||
|
||||
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user