diff --git a/db/column_family.cc b/db/column_family.cc index 7efc5214d..02cb5035f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -719,6 +719,13 @@ Compaction* ColumnFamilyData::PickCompaction( return result; } +bool ColumnFamilyData::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + return compaction_picker_->RangeOverlapWithCompaction( + smallest_user_key, largest_user_key, level); +} + const int ColumnFamilyData::kCompactAllLevels = -1; const int ColumnFamilyData::kCompactToBaseLevel = -2; diff --git a/db/column_family.h b/db/column_family.h index 3b827a25e..57cb907d0 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -252,6 +252,13 @@ class ColumnFamilyData { // REQUIRES: DB mutex held Compaction* PickCompaction(const MutableCFOptions& mutable_options, LogBuffer* log_buffer); + + // Check if the passed range overlaps with any running compactions. + // REQUIRES: DB mutex held + bool RangeOverlapWithCompaction(const Slice& smallest_user_key, + const Slice& largest_user_key, + int level) const; + // A flag to tell a manual compaction is to compact all levels together // instad of for specific level. static const int kCompactAllLevels; diff --git a/db/compaction.h b/db/compaction.h index 058e4dab0..f0edee841 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -104,6 +104,8 @@ class Compaction { return &inputs_[compaction_input_level].files; } + const std::vector<CompactionInputFiles>* inputs() { return &inputs_; } + // Returns the LevelFilesBrief of the specified compaction input level. 
const LevelFilesBrief* input_levels(size_t compaction_input_level) const { return &input_levels_[compaction_input_level]; diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index d91ff8c7a..0333bf3a2 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -36,11 +36,11 @@ CompactionIterator::CompactionIterator( if (snapshots_->size() == 0) { // optimize for fast path if there are no snapshots - visible_at_tip_ = last_sequence; - earliest_snapshot_ = visible_at_tip_; + visible_at_tip_ = true; + earliest_snapshot_ = last_sequence; latest_snapshot_ = 0; } else { - visible_at_tip_ = 0; + visible_at_tip_ = false; earliest_snapshot_ = snapshots_->at(0); latest_snapshot_ = snapshots_->back(); } @@ -210,8 +210,9 @@ void CompactionIterator::NextFromInput() { SequenceNumber last_snapshot = current_user_key_snapshot_; SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot current_user_key_snapshot_ = - visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot( - ikey_.sequence, &prev_snapshot); + visible_at_tip_ + ? earliest_snapshot_ + : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot); if (clear_and_output_next_key_) { // In the previous iteration we encountered a single delete that we could diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 306dcfd57..04a24fdfe 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -104,7 +104,7 @@ class CompactionIterator { LogBuffer* log_buffer_; bool bottommost_level_; bool valid_ = false; - SequenceNumber visible_at_tip_; + bool visible_at_tip_; SequenceNumber earliest_snapshot_; SequenceNumber latest_snapshot_; bool ignore_snapshots_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 67e4ec9de..7630453d7 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -151,17 +151,15 @@ CompactionPicker::~CompactionPicker() {} // Delete this compaction from the list of running compactions. 
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { - if (c->start_level() == 0 || - ioptions_.compaction_style == kCompactionStyleUniversal) { - level0_compactions_in_progress_.erase(c); - } + UnregisterCompaction(c); if (!status.ok()) { c->ResetNextCompactionIndex(); } } void CompactionPicker::GetRange(const CompactionInputFiles& inputs, - InternalKey* smallest, InternalKey* largest) { + InternalKey* smallest, + InternalKey* largest) const { const int level = inputs.level; assert(!inputs.empty()); smallest->Clear(); @@ -190,7 +188,8 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs, void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, const CompactionInputFiles& inputs2, - InternalKey* smallest, InternalKey* largest) { + InternalKey* smallest, + InternalKey* largest) const { assert(!inputs1.empty() || !inputs2.empty()); if (inputs1.empty()) { GetRange(inputs2, smallest, largest); @@ -206,6 +205,33 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, } } +void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs, + InternalKey* smallest, + InternalKey* largest) const { + InternalKey current_smallest; + InternalKey current_largest; + bool initialized = false; + for (const auto& in : inputs) { + if (in.empty()) { + continue; + } + GetRange(in, &current_smallest, &current_largest); + if (!initialized) { + *smallest = current_smallest; + *largest = current_largest; + initialized = true; + } else { + if (icmp_->Compare(current_smallest, *smallest) < 0) { + *smallest = current_smallest; + } + if (icmp_->Compare(current_largest, *largest) > 0) { + *largest = current_largest; + } + } + } + assert(initialized); +} + bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, VersionStorageInfo* vstorage, CompactionInputFiles* inputs) { @@ -250,6 +276,42 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, return true; } +bool 
CompactionPicker::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + const Comparator* ucmp = icmp_->user_comparator(); + for (Compaction* c : compactions_in_progress_) { + if (c->output_level() == level && + ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 && + ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) { + // Overlap + return true; + } + } + // Did not overlap with any running compaction in level `level` + return false; +} + +bool CompactionPicker::FilesRangeOverlapWithCompaction( + const std::vector<CompactionInputFiles>& inputs, int level) const { + bool is_empty = true; + for (auto& in : inputs) { + if (!in.empty()) { + is_empty = false; + break; + } + } + if (is_empty) { + // No files in inputs + return false; + } + + InternalKey smallest, largest; + GetRange(inputs, &smallest, &largest); + return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(), + level); +} + // Returns true if any one of specified files are being compacted bool CompactionPicker::FilesInCompaction( const std::vector<FileMetaData*>& files) { @@ -274,6 +336,10 @@ Compaction* CompactionPicker::FormCompaction( if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) { return nullptr; } + // This compaction output could overlap with a running compaction + if (FilesRangeOverlapWithCompaction(input_files, output_level)) { + return nullptr; + } auto c = new Compaction(vstorage, ioptions_, mutable_cf_options, input_files, output_level, compact_options.output_file_size_limit, @@ -282,9 +348,7 @@ Compaction* CompactionPicker::FormCompaction( // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel - if ((c != nullptr) && (input_files[0].level == 0)) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); return c; } @@ -506,6 +570,16 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } } + + // 2 non-exclusive manual 
compactions could run at the same time producing + // overlapping outputs in the same level. + if (FilesRangeOverlapWithCompaction(inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + *manual_conflict = true; + return nullptr; + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), @@ -513,9 +587,7 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), /* grandparents */ {}, /* is manual */ true); - if (start_level == 0) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); return c; } @@ -604,6 +676,15 @@ Compaction* CompactionPicker::CompactRange( } } + // 2 non-exclusive manual compactions could run at the same time producing + // overlapping outputs in the same level. + if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. 
+ *manual_conflict = true; + return nullptr; + } + std::vector<FileMetaData*> grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); Compaction* compaction = new Compaction( @@ -615,9 +696,7 @@ Compaction* CompactionPicker::CompactRange( std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); - if (input_level == 0) { - level0_compactions_in_progress_.insert(compaction); - } + RegisterCompaction(compaction); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -853,6 +932,30 @@ Status CompactionPicker::SanitizeCompactionInputFiles( } #endif // !ROCKSDB_LITE +void CompactionPicker::RegisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + assert(ioptions_.compaction_style != kCompactionStyleLevel || + !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level())); + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.insert(c); + } + compactions_in_progress_.insert(c); +} + +void CompactionPicker::UnregisterCompaction(Compaction* c) { + if (c == nullptr) { + return; + } + if (c->start_level() == 0 || + ioptions_.compaction_style == kCompactionStyleUniversal) { + level0_compactions_in_progress_.erase(c); + } + compactions_in_progress_.erase(c); +} + bool LevelCompactionPicker::NeedsCompaction( const VersionStorageInfo* vstorage) const { if (!vstorage->FilesMarkedForCompaction().empty()) { @@ -936,7 +1039,8 @@ Compaction* LevelCompactionPicker::PickCompaction( output_level = (level == 0) ? 
vstorage->base_level() : level + 1; if (PickCompactionBySize(vstorage, level, output_level, &inputs, &parent_index, &base_index) && - ExpandWhileOverlapping(cf_name, vstorage, &inputs)) { + ExpandWhileOverlapping(cf_name, vstorage, &inputs) && + !FilesRangeOverlapWithCompaction({inputs}, output_level)) { // found the compaction! if (level == 0) { // L0 score = `num L0 files` / `level0_file_num_compaction_trigger` @@ -1009,6 +1113,18 @@ Compaction* LevelCompactionPicker::PickCompaction( compaction_inputs.push_back(output_level_inputs); } + // In some edge cases we could pick a compaction that will be compacting + // a key range that overlaps with another running compaction, and both + // of them have the same output level. This could happen if + // (1) we are running a non-exclusive manual compaction + // (2) AddFile ingests a new file into the LSM tree + // We need to disallow this from happening. + if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. 
+ return nullptr; + } + std::vector<FileMetaData*> grandparents; GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); auto c = new Compaction( @@ -1023,9 +1139,7 @@ Compaction* LevelCompactionPicker::PickCompaction( // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel - if (level == 0) { - level0_compactions_in_progress_.insert(c); - } + RegisterCompaction(c); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1421,7 +1535,7 @@ Compaction* UniversalCompactionPicker::PickCompaction( MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, c->inputs(0)->size()); - level0_compactions_in_progress_.insert(c); + RegisterCompaction(c); TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", c); @@ -1835,7 +1949,7 @@ Compaction* FIFOCompactionPicker::PickCompaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); - level0_compactions_in_progress_.insert(c); + RegisterCompaction(c); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 82f792c1d..7f3918fe4 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -113,6 +113,12 @@ class CompactionPicker { return !level0_compactions_in_progress_.empty(); } + // Return true if the passed key range overlap with a compaction output + // that is currently running. + bool RangeOverlapWithCompaction(const Slice& smallest_user_key, + const Slice& largest_user_key, + int level) const; + protected: int NumberLevels() const { return ioptions_.num_levels; } @@ -120,14 +126,20 @@ class CompactionPicker { // *smallest, *largest. 
// REQUIRES: inputs is not empty void GetRange(const CompactionInputFiles& inputs, InternalKey* smallest, - InternalKey* largest); + InternalKey* largest) const; // Stores the minimal range that covers all entries in inputs1 and inputs2 // in *smallest, *largest. // REQUIRES: inputs is not empty void GetRange(const CompactionInputFiles& inputs1, const CompactionInputFiles& inputs2, InternalKey* smallest, - InternalKey* largest); + InternalKey* largest) const; + + // Stores the minimal range that covers all entries in inputs + // in *smallest, *largest. + // REQUIRES: inputs is not empty (at least one entry has one file) + void GetRange(const std::vector<CompactionInputFiles>& inputs, + InternalKey* smallest, InternalKey* largest) const; // Add more files to the inputs on "level" to make sure that // no newer version of a key is compacted to "level+1" while leaving an older @@ -148,6 +160,11 @@ class CompactionPicker { const InternalKey* smallest, const InternalKey* largest, int level, int* index); + // Returns true if the key range that `inputs` files cover overlap with the + // key range of a currently running compaction. + bool FilesRangeOverlapWithCompaction( + const std::vector<CompactionInputFiles>& inputs, int level) const; + bool SetupOtherInputs(const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, @@ -170,10 +187,20 @@ class CompactionPicker { const ColumnFamilyMetaData& cf_meta, const int output_level) const; #endif // ROCKSDB_LITE + // Register this compaction in the set of running compactions + void RegisterCompaction(Compaction* c); + + // Remove this compaction from the set of running compactions + void UnregisterCompaction(Compaction* c); + // Keeps track of all compactions that are running on Level0. - // It is protected by DB mutex + // Protected by DB mutex std::set<Compaction*> level0_compactions_in_progress_; + // Keeps track of all compactions that are running. 
+ // Protected by DB mutex + std::unordered_set<Compaction*> compactions_in_progress_; + const InternalKeyComparator* const icmp_; }; diff --git a/db/db_impl.cc b/db/db_impl.cc index 44e53316f..0e6d08521 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2238,7 +2238,6 @@ Status DBImpl::CompactFilesImpl( c->SetInputVersion(version); // deletion compaction currently not allowed in CompactFiles. assert(!c->deletion_compaction()); - running_compactions_.insert(c.get()); SequenceNumber earliest_write_conflict_snapshot; std::vector<SequenceNumber> snapshot_seqs = @@ -2295,8 +2294,6 @@ Status DBImpl::CompactFilesImpl( ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - running_compactions_.erase(c.get()); - if (status.ok()) { // Done } else if (status.IsShutdownInProgress()) { @@ -2798,10 +2795,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ca->m = &manual; manual.incomplete = false; bg_compaction_scheduled_++; - // manual.compaction will be added to running_compactions_ and erased - // inside BackgroundCompaction() but we need to put it now since we - // will unlock the mutex. 
- running_compactions_.insert(manual.compaction); env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, &DBImpl::UnscheduleCallback); scheduled = true; @@ -3431,10 +3424,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } - if (c != nullptr) { - running_compactions_.insert(c.get()); - } - if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); @@ -3561,7 +3550,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); - running_compactions_.erase(c.get()); } // this will unref its input_version and column_family_data c.reset(); diff --git a/db/db_impl.h b/db/db_impl.h index 4eff025a5..e018fdb5a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -980,10 +980,6 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; - // A set of compactions that are running right now - // REQUIRES: mutex held - std::unordered_set<Compaction*> running_compactions_; - // Number of running AddFile() calls. 
// REQUIRES: mutex held int num_running_addfile_; diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index 4277e93c0..20962cd6f 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -264,7 +264,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, for (size_t i = 0; i < num_files; i++) { StopWatch sw(env_, nullptr, 0, &micro_list[i], false); InternalKey range_start(file_info_list[i].smallest_key, - kMaxSequenceNumber, kTypeValue); + kMaxSequenceNumber, kValueTypeForSeek); iter->Seek(range_start.Encode()); status = iter->status(); @@ -378,45 +378,22 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, int target_level = 0; auto* vstorage = cfd->current()->storage_info(); - auto* ucmp = vstorage->InternalComparator()->user_comparator(); - Slice file_smallest_user_key(file_info.smallest_key); Slice file_largest_user_key(file_info.largest_key); for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level(); lvl--) { + // Make sure that the file fits in Level `lvl` and does not overlap with + // the output of any compaction running right now. 
if (vstorage->OverlapInLevel(lvl, &file_smallest_user_key, - &file_largest_user_key) == false) { - // Make sure that the file dont overlap with the output of any - // compaction running right now - Slice compaction_smallest_user_key; - Slice compaction_largest_user_key; - bool overlap_with_compaction_output = false; - for (Compaction* c : running_compactions_) { - if (c->column_family_data()->GetID() != cfd->GetID() || - c->output_level() != lvl) { - continue; - } - - compaction_smallest_user_key = c->GetSmallestUserKey(); - compaction_largest_user_key = c->GetLargestUserKey(); - - if (ucmp->Compare(file_smallest_user_key, - compaction_largest_user_key) <= 0 && - ucmp->Compare(file_largest_user_key, - compaction_smallest_user_key) >= 0) { - overlap_with_compaction_output = true; - break; - } - } - - if (overlap_with_compaction_output == false) { - // Level lvl is the lowest level that dont have any files with key - // range overlapping with our file key range and no compactions - // planning to add overlapping files in it. - target_level = lvl; - break; - } + &file_largest_user_key) == false && + cfd->RangeOverlapWithCompaction(file_smallest_user_key, + file_largest_user_key, lvl) == false) { + // Level lvl is the lowest level that dont have any files with key + // range overlapping with our file key range and no compactions + // planning to add overlapping files in it. 
+ target_level = lvl; + break; } } diff --git a/db/db_test2.cc b/db/db_test2.cc index 20ccf0a77..c72837eff 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2064,6 +2064,126 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) { ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2, total_loaded_bytes_iter1 + total_loaded_bytes_iter2); } + +#ifndef ROCKSDB_LITE +TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.IncreaseParallelism(20); + DestroyAndReopen(options); + + ASSERT_OK(Put(Key(0), "a")); + ASSERT_OK(Put(Key(5), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(10), "a")); + ASSERT_OK(Put(Key(15), "a")); + ASSERT_OK(Flush()); + + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // Trivial move 2 files to L2 + ASSERT_EQ("0,0,2", FilesPerLevel()); + + // While the compaction is running, we will create 2 new files that + // can fit in L2, these 2 files will be moved to L2 and overlap with + // the running compaction and break the LSM consistency. 
+ rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + ASSERT_OK( + dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}, + {"max_bytes_for_level_base", "1"}})); + ASSERT_OK(Put(Key(6), "a")); + ASSERT_OK(Put(Key(7), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(8), "a")); + ASSERT_OK(Put(Key(9), "a")); + ASSERT_OK(Flush()); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Run a manual compaction that will compact the 2 files in L2 + // into 1 file in L2 + cro.exclusive_manual_compaction = false; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { + Options options = CurrentOptions(); + options.num_levels = 2; + options.IncreaseParallelism(20); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + ASSERT_OK(Put(Key(0), "a")); + ASSERT_OK(Put(Key(5), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(10), "a")); + ASSERT_OK(Put(Key(15), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Trivial move 2 files to L1 + ASSERT_EQ("0,2", FilesPerLevel()); + + std::function<void()> bg_manual_compact = [&]() { + std::string k1 = Key(6); + std::string k2 = Key(9); + Slice k1s(k1); + Slice k2s(k2); + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s)); + }; + std::thread bg_thread; + + // While the compaction is running, we will create 2 new files that + // can fit in L1, these 2 files will be moved to L1 and overlap with + // the running compaction and break the LSM consistency. 
+ std::atomic<bool> flag(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + if (flag.exchange(true)) { + // We want to make sure to call this callback only once + return; + } + ASSERT_OK(Put(Key(6), "a")); + ASSERT_OK(Put(Key(7), "a")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(8), "a")); + ASSERT_OK(Put(Key(9), "a")); + ASSERT_OK(Flush()); + + // Start a non-exclusive manual compaction in a bg thread + bg_thread = std::thread(bg_manual_compact); + // This manual compaction conflicts with the other manual compaction + // so it should wait until the first compaction finishes + env_->SleepForMicroseconds(1000000); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Run a manual compaction that will compact the 2 files in L1 + // into 1 file in L1 + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + bg_thread.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} +#endif // ROCKSDB_LITE + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index b79f1d957..aaaa8b147 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1299,6 +1299,52 @@ TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { } } +TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.IncreaseParallelism(20); + DestroyAndReopen(options); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4}, 1)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 3}, 2)); // L2 + + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 14}, 3)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {12, 13}, 4)); // L2 + + ASSERT_OK(GenerateAndAddExternalFile(options, {20, 24}, 5)); // L3 + 
ASSERT_OK(GenerateAndAddExternalFile(options, {22, 23}, 6)); // L2 + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Start", [&](void* arg) { + // fit in L3 but will overlap with compaction so will be added + // to L2 but a compaction will trivially move it to L3 + // and break LSM consistency + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + dbfull()->TEST_WaitForCompact(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(ExternalSSTFileTest, CompactAddedFiles) { + Options options = CurrentOptions(); + options.num_levels = 3; + DestroyAndReopen(options); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, 1)); // L3 + ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, 2)); // L2 + ASSERT_OK(GenerateAndAddExternalFile(options, {3, 8}, 3)); // L1 + ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, 4)); // L0 + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); +} #endif // ROCKSDB_LITE } // namespace rocksdb