diff --git a/HISTORY.md b/HISTORY.md index e96b6264f..604b09673 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -26,6 +26,7 @@ * Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support. * RocksDB would validate total entries read in flush, and compare with counter inserted into it. If flush_verify_memtable_count = true (default), flush will fail. Otherwise, only log to info logs. * Add `TableProperties::num_filter_entries`, which can be used with `TableProperties::filter_size` to calculate the effective bits per filter entry (unique user key or prefix) for a table file. +* Added a `cancel` field to `CompactRangeOptions`, allowing individual in-process manual range compactions to be cancelled. ### Performance Improvements * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches. diff --git a/db/builder.cc b/db/builder.cc index 6314ea589..b160ccea7 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -191,7 +191,8 @@ Status BuildTable( /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, - db_options.info_log, full_history_ts_low); + /*manual_compaction_canceled=*/nullptr, db_options.info_log, + full_history_ts_low); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 11db69fb4..e48818fd0 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -45,6 +45,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : CompactionIterator( @@ -55,7 +56,8 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - manual_compaction_paused, info_log, full_history_ts_low) {} + manual_compaction_paused, manual_compaction_canceled, info_log, + full_history_ts_low) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -70,6 +72,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : input_( @@ -91,6 +94,7 @@ CompactionIterator::CompactionIterator( compaction_filter_(compaction_filter), shutting_down_(shutting_down), manual_compaction_paused_(manual_compaction_paused), + manual_compaction_canceled_(manual_compaction_canceled), preserve_deletes_seqnum_(preserve_deletes_seqnum), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 616434253..7c459a767 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -150,40 +150,40 @@ class CompactionIterator { const Compaction* compaction_; }; - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - const Compaction* compaction = nullptr, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + const Compaction* compaction = nullptr, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0, + const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); // Constructor with custom CompactionProxy, used for tests. - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - std::unique_ptr compaction, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0, + const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); ~CompactionIterator(); @@ -303,6 +303,7 @@ class CompactionIterator { const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; const std::atomic* manual_compaction_paused_; + const std::atomic* manual_compaction_canceled_; const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; bool valid_ = false; @@ -399,8 +400,10 @@ class CompactionIterator { bool IsPausingManualCompaction() { // This is a best-effort facility, so memory_order_relaxed is sufficient. - return manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0; + return (manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || + (manual_compaction_canceled_ && + manual_compaction_canceled_->load(std::memory_order_relaxed)); } }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index e14e5ec12..fef7b5417 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -282,7 +282,8 @@ class CompactionIteratorTest : public testing::TestWithParam { range_del_agg_.get(), nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, std::move(compaction), filter, &shutting_down_, /*preserve_deletes_seqnum=*/0, - /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr, + /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr, full_history_ts_low)); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index c1819af10..1fb2c63e2 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -313,9 +313,10 @@ CompactionJob::CompactionJob( EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_paused, const std::string& db_id, - const std::string& db_session_id, std::string full_history_ts_low, - BlobFileCompletionCallback* blob_callback) + const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), @@ -339,6 +340,7 @@ CompactionJob::CompactionJob( versions_(versions), shutting_down_(shutting_down), manual_compaction_paused_(manual_compaction_paused), + manual_compaction_canceled_(manual_compaction_canceled), preserve_deletes_seqnum_(preserve_deletes_seqnum), db_directory_(db_directory), blob_output_directory_(blob_output_directory), @@ -1172,8 +1174,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log, - full_history_ts_low)); + preserve_deletes_seqnum_, manual_compaction_paused_, + manual_compaction_canceled_, db_options_.info_log, full_history_ts_low)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -1317,8 +1319,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { status = Status::ShutdownInProgress("Database shutdown"); } if ((status.ok() || status.IsColumnFamilyDropped()) && - (manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) { + ((manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || + (manual_compaction_canceled_ && + manual_compaction_canceled_->load(std::memory_order_relaxed)))) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } if (status.ok()) { @@ -2126,7 +2130,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, - nullptr, db_id, db_session_id, + nullptr, nullptr, db_id, db_session_id, compaction->column_family_data()->GetFullHistoryTsLow()), output_path_(output_path), compaction_input_(compaction_service_input), diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 197f7e93b..0f71fd57b 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -81,6 +81,7 @@ class CompactionJob { const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", BlobFileCompletionCallback* blob_callback = nullptr); @@ -185,6 +186,7 @@ class CompactionJob { VersionSet* versions_; const std::atomic* shutting_down_; const std::atomic* manual_compaction_paused_; + const std::atomic* manual_compaction_canceled_; const SequenceNumber preserve_deletes_seqnum_; FSDirectory* db_directory_; FSDirectory* blob_output_directory_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 062aa7d15..e7f985b38 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -350,7 +350,8 @@ class CompactionJobTestBase : public testing::Test { earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, - /*manual_compaction_paused=*/nullptr, /*db_id=*/"", + /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 00210d6bb..d787f66f4 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1454,15 +1454,16 @@ class DBImpl : public DB { uint32_t output_path_id; Status status; bool done; - bool in_progress; // compaction request being processed? - bool incomplete; // only part of requested range compacted - bool exclusive; // current behavior of only one manual - bool disallow_trivial_move; // Force actual compaction to run - const InternalKey* begin; // nullptr means beginning of key range - const InternalKey* end; // nullptr means end of key range - InternalKey* manual_end; // how far we are compacting - InternalKey tmp_storage; // Used to keep track of compaction progress - InternalKey tmp_storage1; // Used to keep track of compaction progress + bool in_progress; // compaction request being processed? + bool incomplete; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run + const InternalKey* begin; // nullptr means beginning of key range + const InternalKey* end; // nullptr means end of key range + InternalKey* manual_end; // how far we are compacting + InternalKey tmp_storage; // Used to keep track of compaction progress + InternalKey tmp_storage1; // Used to keep track of compaction progress + std::atomic* canceled; // Compaction canceled by the user? }; struct PrepickedCompaction { // background compaction takes ownership of `compaction`. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 6ec2f1c63..ec876d91e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -807,6 +807,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, return Status::Incomplete(Status::SubCode::kManualCompactionPaused); } + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } + const Comparator* const ucmp = column_family->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); @@ -1253,7 +1257,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, - &manual_compaction_paused_, db_id_, db_session_id_, + &manual_compaction_paused_, nullptr, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); // Creating a compaction influences the compaction score because the score @@ -1426,10 +1430,13 @@ void DBImpl::NotifyOnCompactionCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } + // TODO: Should disabling manual compaction squash compaction completed + // notifications that aren't the result of a shutdown? if (c->is_manual_compaction() && manual_compaction_paused_.load(std::memory_order_acquire) > 0) { return; } + Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1654,6 +1661,7 @@ Status DBImpl::RunManualCompaction( manual.incomplete = false; manual.exclusive = exclusive; manual.disallow_trivial_move = disallow_trivial_move; + manual.canceled = compact_range_options.canceled; // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || @@ -2819,6 +2827,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else if (is_manual && manual_compaction_paused_.load(std::memory_order_acquire) > 0) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } else if (is_manual && manual_compaction->canceled && + manual_compaction->canceled->load(std::memory_order_acquire)) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } } else { status = error_handler_.GetBGError(); @@ -3140,7 +3151,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, - is_manual ? &manual_compaction_paused_ : nullptr, db_id_, + is_manual ? &manual_compaction_paused_ : nullptr, + is_manual ? manual_compaction->canceled : nullptr, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); compaction_job.Prepare(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 5f87955e0..f209cdd03 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "db/db_test_util.h" #include "db/read_callback.h" @@ -3201,6 +3202,180 @@ TEST_F(DBTest2, PausingManualCompaction4) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest2, CancelManualCompaction1) { + CompactRangeOptions compact_options; + auto canceledPtr = + std::unique_ptr>(new std::atomic{true}); + compact_options.canceled = canceledPtr.get(); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels - i + 1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels - i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + int run_manual_compactions = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", + [&](void* /*arg*/) { run_manual_compactions++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Setup a callback to disable compactions after a couple of levels are + // compacted + int compactions_run = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", + [&](void* /*arg*/) { ++compactions_run; }); + + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + // Since compactions are disabled, we shouldn't start compacting. + // E.g. we should call the compaction function exactly one time. + ASSERT_EQ(compactions_run, 0); + ASSERT_EQ(run_manual_compactions, 0); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + compactions_run = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) { + ++compactions_run; + // After 3 compactions disable + if (compactions_run == 3) { + compact_options.canceled->store(true, std::memory_order_release); + } + }); + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(compactions_run, 3); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + + // Compactions should work again if we re-enable them.. + compact_options.canceled->store(false, std::memory_order_relaxed); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, CancelManualCompaction2) { + CompactRangeOptions compact_options; + auto canceledPtr = + std::unique_ptr>(new std::atomic{true}); + compact_options.canceled = canceledPtr.get(); + compact_options.max_subcompactions = 1; + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels - i + 1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels - i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + int compactions_run = 0; + std::atomic kv_compactions{0}; + int compactions_stopped_at = 0; + int kv_compactions_stopped_at = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) { + ++compactions_run; + // After 3 compactions disable + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator:ProcessKV", [&](void* /*arg*/) { + int kv_compactions_run = + kv_compactions.fetch_add(1, std::memory_order_release); + if (kv_compactions_run == 5) { + compact_options.canceled->store(true, std::memory_order_release); + kv_compactions_stopped_at = kv_compactions_run; + compactions_stopped_at = compactions_run; + } + }); + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + // NOTE: as we set compact_options.max_subcompacitons = 1, and store true to + // the canceled variable from the single compacting thread (via callback), + // this value is deterministically kv_compactions_stopped_at + 1. + ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1); + ASSERT_EQ(compactions_run, compactions_stopped_at); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionIterator::ProcessKV"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + + // Compactions should work again if we re-enable them.. + compact_options.canceled->store(false, std::memory_order_relaxed); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest2, OptimizeForPointLookup) { Options options = CurrentOptions(); Close(); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1669997bd..f5f3c9363 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1656,6 +1656,9 @@ struct CompactRangeOptions { // Set user-defined timestamp low bound, the data with older timestamp than // low bound maybe GCed by compaction. Default: nullptr Slice* full_history_ts_low = nullptr; + + // Allows cancellation of an in-progress manual compaction. + std::atomic* canceled = nullptr; }; // IngestExternalFileOptions is used by IngestExternalFile()