From 5691a1d8a480f8e16903cbdd5200e72c7f4297b1 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Thu, 13 Oct 2016 10:49:06 -0700 Subject: [PATCH] Fix compaction conflict with running compaction Summary: Issue scenario: (1) We have 3 files in L1 and we issue a compaction that will compact them into 1 file in L2 (2) While compaction (1) is running, we flush a file into L0 and trigger another compaction that decide to move this file to L1 and then move it again to L2 (this file don't overlap with any other files) (3) compaction (1) finishes and install the file it generated in L2, but this file overlap with the file we generated in (2) so we break the LSM consistency Looks like this issue can be triggered by using non-exclusive manual compaction or AddFile() Test Plan: unit tests Reviewers: sdong Reviewed By: sdong Subscribers: hermanlee4, jkedgar, andrewkr, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D64947 --- db/column_family.cc | 7 ++ db/column_family.h | 7 ++ db/compaction.h | 2 + db/compaction_iterator.cc | 11 +-- db/compaction_iterator.h | 2 +- db/compaction_picker.cc | 156 ++++++++++++++++++++++++++++++----- db/compaction_picker.h | 33 +++++++- db/db_impl.cc | 12 --- db/db_impl.h | 4 - db/db_impl_add_file.cc | 45 +++------- db/db_test2.cc | 120 +++++++++++++++++++++++++++ db/external_sst_file_test.cc | 46 +++++++++++ 12 files changed, 365 insertions(+), 80 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 7efc5214d..02cb5035f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -719,6 +719,13 @@ Compaction* ColumnFamilyData::PickCompaction( return result; } +bool ColumnFamilyData::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + return compaction_picker_->RangeOverlapWithCompaction( + smallest_user_key, largest_user_key, level); +} + const int ColumnFamilyData::kCompactAllLevels = -1; const int ColumnFamilyData::kCompactToBaseLevel = -2; diff --git a/db/column_family.h b/db/column_family.h index 3b827a25e..57cb907d0 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -252,6 +252,13 @@ class ColumnFamilyData { // REQUIRES: DB mutex held Compaction* PickCompaction(const MutableCFOptions& mutable_options, LogBuffer* log_buffer); + + // Check if the passed range overlap with any running compactions. + // REQUIRES: DB mutex held + bool RangeOverlapWithCompaction(const Slice& smallest_user_key, + const Slice& largest_user_key, + int level) const; + // A flag to tell a manual compaction is to compact all levels together // instad of for specific level. static const int kCompactAllLevels; diff --git a/db/compaction.h b/db/compaction.h index 058e4dab0..f0edee841 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -104,6 +104,8 @@ class Compaction { return &inputs_[compaction_input_level].files; } + const std::vector* inputs() { return &inputs_; } + // Returns the LevelFilesBrief of the specified compaction input level. const LevelFilesBrief* input_levels(size_t compaction_input_level) const { return &input_levels_[compaction_input_level]; diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index d91ff8c7a..0333bf3a2 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -36,11 +36,11 @@ CompactionIterator::CompactionIterator( if (snapshots_->size() == 0) { // optimize for fast path if there are no snapshots - visible_at_tip_ = last_sequence; - earliest_snapshot_ = visible_at_tip_; + visible_at_tip_ = true; + earliest_snapshot_ = last_sequence; latest_snapshot_ = 0; } else { - visible_at_tip_ = 0; + visible_at_tip_ = false; earliest_snapshot_ = snapshots_->at(0); latest_snapshot_ = snapshots_->back(); } @@ -210,8 +210,9 @@ void CompactionIterator::NextFromInput() { SequenceNumber last_snapshot = current_user_key_snapshot_; SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot current_user_key_snapshot_ = - visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot( - ikey_.sequence, &prev_snapshot); + visible_at_tip_ + ? earliest_snapshot_ + : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot); if (clear_and_output_next_key_) { // In the previous iteration we encountered a single delete that we could diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 306dcfd57..04a24fdfe 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -104,7 +104,7 @@ class CompactionIterator { LogBuffer* log_buffer_; bool bottommost_level_; bool valid_ = false; - SequenceNumber visible_at_tip_; + bool visible_at_tip_; SequenceNumber earliest_snapshot_; SequenceNumber latest_snapshot_; bool ignore_snapshots_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 67e4ec9de..7630453d7 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -151,17 +151,15 @@ CompactionPicker::~CompactionPicker() {} // Delete this compaction from the list of running compactions. void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { - if (c->start_level() == 0 || - ioptions_.compaction_style == kCompactionStyleUniversal) { - level0_compactions_in_progress_.erase(c); - } + UnregisterCompaction(c); if (!status.ok()) { c->ResetNextCompactionIndex(); } } void CompactionPicker::GetRange(const CompactionInputFiles& inputs, - InternalKey* smallest, InternalKey* largest) { + InternalKey* smallest, + InternalKey* largest) const { const int level = inputs.level; assert(!inputs.empty()); smallest->Clear(); @@ -190,7 +188,8 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs, void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, const CompactionInputFiles& inputs2, - InternalKey* smallest, InternalKey* largest) { + InternalKey* smallest, + InternalKey* largest) const { assert(!inputs1.empty() || !inputs2.empty()); if (inputs1.empty()) { GetRange(inputs2, smallest, largest); @@ -206,6 +205,33 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, } } +void CompactionPicker::GetRange(const std::vector& inputs, + InternalKey* smallest, + InternalKey* largest) const { + InternalKey current_smallest; + InternalKey current_largest; + bool initialized = false; + for (const auto& in : inputs) { + if (in.empty()) { + continue; + } + GetRange(in, ¤t_smallest, ¤t_largest); + if (!initialized) { + *smallest = current_smallest; + *largest = current_largest; + initialized = true; + } else { + if (icmp_->Compare(current_smallest, *smallest) < 0) { + *smallest = current_smallest; + } + if (icmp_->Compare(current_largest, *largest) > 0) { + *largest = current_largest; + } + } + } + assert(initialized); +} + bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, VersionStorageInfo* vstorage, CompactionInputFiles* inputs) { @@ -250,6 +276,42 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, return true; } +bool CompactionPicker::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + const Comparator* ucmp = icmp_->user_comparator(); + for (Compaction* c : compactions_in_progress_) { + if (c->output_level() == level && + ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 && + ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) { + // Overlap + return true; + } + } + // Did not overlap with any running compaction in level `level` + return false; +} + +bool CompactionPicker::FilesRangeOverlapWithCompaction( + const std::vector& inputs, int level) const { + bool is_empty = true; + for (auto& in : inputs) { + if (!in.empty()) { + is_empty = false; + break; + } + } + if (is_empty) { + // No files in inputs + return false; + } + + InternalKey smallest, largest; + GetRange(inputs, &smallest, &largest); + return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(), + level); +} + // Returns true if any one of specified files are being compacted bool CompactionPicker::FilesInCompaction( const std::vector& files) { @@ -274,6 +336,10 @@ Compaction* CompactionPicker::FormCompaction( if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) { return nullptr; } + // This compaction output could overlap with a running compaction + if (FilesRangeOverlapWithCompaction(input_files, output_level)) { + return nullptr; + } auto c = new Compaction(vstorage, ioptions_, mutable_cf_options, input_files, output_level, compact_options.output_file_size_limit, @@ -282,9 +348,7 @@ Compaction* CompactionPicker::FormCompaction( // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel - if ((c != nullptr) && (input_files[0].level == 0)) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); return c; } @@ -506,6 +570,16 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } } + + // 2 non-exclusive manual compactions could run at the same time producing + // overlaping outputs in the same level. + if (FilesRangeOverlapWithCompaction(inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + *manual_conflict = true; + return nullptr; + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), @@ -513,9 +587,7 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), /* grandparents */ {}, /* is manual */ true); - if (start_level == 0) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); return c; } @@ -604,6 +676,15 @@ Compaction* CompactionPicker::CompactRange( } } + // 2 non-exclusive manual compactions could run at the same time producing + // overlaping outputs in the same level. + if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + *manual_conflict = true; + return nullptr; + } + std::vector grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); Compaction* compaction = new Compaction( @@ -615,9 +696,7 @@ Compaction* CompactionPicker::CompactRange( std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); - if (input_level == 0) { - level0_compactions_in_progress_.insert(compaction); - } + RegisterCompaction(compaction); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -853,6 +932,30 @@ Status CompactionPicker::SanitizeCompactionInputFiles( } #endif // !ROCKSDB_LITE +void CompactionPicker::RegisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + assert(ioptions_.compaction_style != kCompactionStyleLevel || + !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level())); + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.insert(c); + } + compactions_in_progress_.insert(c); +} + +void CompactionPicker::UnregisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.erase(c); + } + compactions_in_progress_.erase(c); +} + bool LevelCompactionPicker::NeedsCompaction( const VersionStorageInfo* vstorage) const { if (!vstorage->FilesMarkedForCompaction().empty()) { @@ -936,7 +1039,8 @@ Compaction* LevelCompactionPicker::PickCompaction( output_level = (level == 0) ? vstorage->base_level() : level + 1; if (PickCompactionBySize(vstorage, level, output_level, &inputs, &parent_index, &base_index) && - ExpandWhileOverlapping(cf_name, vstorage, &inputs)) { + ExpandWhileOverlapping(cf_name, vstorage, &inputs) && + !FilesRangeOverlapWithCompaction({inputs}, output_level)) { // found the compaction! if (level == 0) { // L0 score = `num L0 files` / `level0_file_num_compaction_trigger` @@ -1009,6 +1113,18 @@ Compaction* LevelCompactionPicker::PickCompaction( compaction_inputs.push_back(output_level_inputs); } + // In some edge cases we could pick a compaction that will be compacting + // a key range that overlap with another running compaction, and both + // of them have the same output leve. This could happen if + // (1) we are running a non-exclusive manual compaction + // (2) AddFile ingest a new file into the LSM tree + // We need to disallow this from happening. + if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + return nullptr; + } + std::vector grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); auto c = new Compaction( @@ -1023,9 +1139,7 @@ Compaction* LevelCompactionPicker::PickCompaction( // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel - if (level == 0) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1421,7 +1535,7 @@ Compaction* UniversalCompactionPicker::PickCompaction( MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, c->inputs(0)->size()); - level0_compactions_in_progress_.insert(c); + RegisterCompaction(c); TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", c); @@ -1835,7 +1949,7 @@ Compaction* FIFOCompactionPicker::PickCompaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); - level0_compactions_in_progress_.insert(c); + RegisterCompaction(c); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 82f792c1d..7f3918fe4 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -113,6 +113,12 @@ class CompactionPicker { return !level0_compactions_in_progress_.empty(); } + // Return true if the passed key range overlap with a compaction output + // that is currently running. + bool RangeOverlapWithCompaction(const Slice& smallest_user_key, + const Slice& largest_user_key, + int level) const; + protected: int NumberLevels() const { return ioptions_.num_levels; } @@ -120,14 +126,20 @@ class CompactionPicker { // *smallest, *largest. // REQUIRES: inputs is not empty void GetRange(const CompactionInputFiles& inputs, InternalKey* smallest, - InternalKey* largest); + InternalKey* largest) const; // Stores the minimal range that covers all entries in inputs1 and inputs2 // in *smallest, *largest. // REQUIRES: inputs is not empty void GetRange(const CompactionInputFiles& inputs1, const CompactionInputFiles& inputs2, InternalKey* smallest, - InternalKey* largest); + InternalKey* largest) const; + + // Stores the minimal range that covers all entries in inputs + // in *smallest, *largest. + // REQUIRES: inputs is not empty (at least on entry have one file) + void GetRange(const std::vector& inputs, + InternalKey* smallest, InternalKey* largest) const; // Add more files to the inputs on "level" to make sure that // no newer version of a key is compacted to "level+1" while leaving an older @@ -148,6 +160,11 @@ class CompactionPicker { const InternalKey* smallest, const InternalKey* largest, int level, int* index); + // Returns true if the key range that `inputs` files cover overlap with the + // key range of a currently running compaction. + bool FilesRangeOverlapWithCompaction( + const std::vector& inputs, int level) const; + bool SetupOtherInputs(const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, @@ -170,10 +187,20 @@ class CompactionPicker { const ColumnFamilyMetaData& cf_meta, const int output_level) const; #endif // ROCKSDB_LITE + // Register this compaction in the set of running compactions + void RegisterCompaction(Compaction* c); + + // Remove this compaction from the set of running compactions + void UnregisterCompaction(Compaction* c); + // Keeps track of all compactions that are running on Level0. - // It is protected by DB mutex + // Protected by DB mutex std::set level0_compactions_in_progress_; + // Keeps track of all compactions that are running. + // Protected by DB mutex + std::unordered_set compactions_in_progress_; + const InternalKeyComparator* const icmp_; }; diff --git a/db/db_impl.cc b/db/db_impl.cc index 44e53316f..0e6d08521 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2238,7 +2238,6 @@ Status DBImpl::CompactFilesImpl( c->SetInputVersion(version); // deletion compaction currently not allowed in CompactFiles. assert(!c->deletion_compaction()); - running_compactions_.insert(c.get()); SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = @@ -2295,8 +2294,6 @@ Status DBImpl::CompactFilesImpl( ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - running_compactions_.erase(c.get()); - if (status.ok()) { // Done } else if (status.IsShutdownInProgress()) { @@ -2798,10 +2795,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ca->m = &manual; manual.incomplete = false; bg_compaction_scheduled_++; - // manual.compaction will be added to running_compactions_ and erased - // inside BackgroundCompaction() but we need to put it now since we - // will unlock the mutex. - running_compactions_.insert(manual.compaction); env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, &DBImpl::UnscheduleCallback); scheduled = true; @@ -3431,10 +3424,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } - if (c != nullptr) { - running_compactions_.insert(c.get()); - } - if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); @@ -3561,7 +3550,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); - running_compactions_.erase(c.get()); } // this will unref its input_version and column_family_data c.reset(); diff --git a/db/db_impl.h b/db/db_impl.h index 4eff025a5..e018fdb5a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -980,10 +980,6 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; - // A set of compactions that are running right now - // REQUIRES: mutex held - std::unordered_set running_compactions_; - // Number of running AddFile() calls. // REQUIRES: mutex held int num_running_addfile_; diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index 4277e93c0..20962cd6f 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -264,7 +264,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, for (size_t i = 0; i < num_files; i++) { StopWatch sw(env_, nullptr, 0, µ_list[i], false); InternalKey range_start(file_info_list[i].smallest_key, - kMaxSequenceNumber, kTypeValue); + kMaxSequenceNumber, kValueTypeForSeek); iter->Seek(range_start.Encode()); status = iter->status(); @@ -378,45 +378,22 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, int target_level = 0; auto* vstorage = cfd->current()->storage_info(); - auto* ucmp = vstorage->InternalComparator()->user_comparator(); - Slice file_smallest_user_key(file_info.smallest_key); Slice file_largest_user_key(file_info.largest_key); for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level(); lvl--) { + // Make sure that the file fits in Level `lvl` and dont overlap with + // the output of any compaction running right now. if (vstorage->OverlapInLevel(lvl, &file_smallest_user_key, - &file_largest_user_key) == false) { - // Make sure that the file dont overlap with the output of any - // compaction running right now - Slice compaction_smallest_user_key; - Slice compaction_largest_user_key; - bool overlap_with_compaction_output = false; - for (Compaction* c : running_compactions_) { - if (c->column_family_data()->GetID() != cfd->GetID() || - c->output_level() != lvl) { - continue; - } - - compaction_smallest_user_key = c->GetSmallestUserKey(); - compaction_largest_user_key = c->GetLargestUserKey(); - - if (ucmp->Compare(file_smallest_user_key, - compaction_largest_user_key) <= 0 && - ucmp->Compare(file_largest_user_key, - compaction_smallest_user_key) >= 0) { - overlap_with_compaction_output = true; - break; - } - } - - if (overlap_with_compaction_output == false) { - // Level lvl is the lowest level that dont have any files with key - // range overlapping with our file key range and no compactions - // planning to add overlapping files in it. - target_level = lvl; - break; - } + &file_largest_user_key) == false && + cfd->RangeOverlapWithCompaction(file_smallest_user_key, + file_largest_user_key, lvl) == false) { + // Level lvl is the lowest level that dont have any files with key + // range overlapping with our file key range and no compactions + // planning to add overlapping files in it. + target_level = lvl; + break; } } diff --git a/db/db_test2.cc b/db/db_test2.cc index 20ccf0a77..c72837eff 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2064,6 +2064,126 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) { ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2, total_loaded_bytes_iter1 + total_loaded_bytes_iter2); } + +#ifndef ROCKSDB_LITE +TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.IncreaseParallelism(20); + DestroyAndReopen(options); + + ASSERT_OK(Put(Key(0), "a")); + ASSERT_OK(Put(Key(5), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(10), "a")); + ASSERT_OK(Put(Key(15), "a")); + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // Trivial move 2 files to L2 + ASSERT_EQ("0,0,2", FilesPerLevel()); + + // While the compaction is running, we will create 2 new files that + // can fit in L2, these 2 files will be moved to L2 and overlap with + // the running compaction and break the LSM consistency. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + ASSERT_OK( + dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}, + {"max_bytes_for_level_base", "1"}})); + ASSERT_OK(Put(Key(6), "a")); + ASSERT_OK(Put(Key(7), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(8), "a")); + ASSERT_OK(Put(Key(9), "a")); + ASSERT_OK(Flush()); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Run a manual compaction that will compact the 2 files in L2 + // into 1 file in L2 + cro.exclusive_manual_compaction = false; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { + Options options = CurrentOptions(); + options.num_levels = 2; + options.IncreaseParallelism(20); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put(Key(0), "a")); + ASSERT_OK(Put(Key(5), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(10), "a")); + ASSERT_OK(Put(Key(15), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Trivial move 2 files to L1 + ASSERT_EQ("0,2", FilesPerLevel()); + + std::function bg_manual_compact = [&]() { + std::string k1 = Key(6); + std::string k2 = Key(9); + Slice k1s(k1); + Slice k2s(k2); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s)); + }; + std::thread bg_thread; + + // While the compaction is running, we will create 2 new files that + // can fit in L1, these 2 files will be moved to L1 and overlap with + // the running compaction and break the LSM consistency. + std::atomic flag(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + if (flag.exchange(true)) { + // We want to make sure to call this callback only once + return; + } + ASSERT_OK(Put(Key(6), "a")); + ASSERT_OK(Put(Key(7), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(8), "a")); + ASSERT_OK(Put(Key(9), "a")); + ASSERT_OK(Flush()); + + // Start a non-exclusive manual compaction in a bg thread + bg_thread = std::thread(bg_manual_compact); + // This manual compaction conflict with the other manual compaction + // so it should wait until the first compaction finish + env_->SleepForMicroseconds(1000000); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Run a manual compaction that will compact the 2 files in L1 + // into 1 file in L1 + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + bg_thread.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} +#endif // ROCKSDB_LITE + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index b79f1d957..aaaa8b147 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1299,6 +1299,52 @@ TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { } } +TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.IncreaseParallelism(20); + DestroyAndReopen(options); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4}, 1)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 3}, 2)); // L2 + + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 14}, 3)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {12, 13}, 4)); // L2 + + ASSERT_OK(GenerateAndAddExternalFile(options, {20, 24}, 5)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {22, 23}, 6)); // L2 + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + // fit in L3 but will overlap with compaction so will be added + // to L2 but a compaction will trivially move it to L3 + // and break LSM consistency + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + dbfull()->TEST_WaitForCompact(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(ExternalSSTFileTest, CompactAddedFiles) { + Options options = CurrentOptions(); + options.num_levels = 3; + DestroyAndReopen(options); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, 1)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, 2)); // L2 + ASSERT_OK(GenerateAndAddExternalFile(options, {3, 8}, 3)); // L1 + ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, 4)); // L0 + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); +} #endif // ROCKSDB_LITE } // namespace rocksdb