diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index a57f558d7..a50572276 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -16,14 +16,14 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, - LogBuffer* log_buffer) + const std::atomic* shutting_down) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, env, expect_valid_internal_key, range_del_agg, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), - compaction_filter, log_buffer) {} + compaction_filter, shutting_down) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -31,7 +31,8 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, std::unique_ptr compaction, - const CompactionFilter* compaction_filter, LogBuffer* log_buffer) + const CompactionFilter* compaction_filter, + const std::atomic* shutting_down) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -42,7 +43,7 @@ CompactionIterator::CompactionIterator( range_del_agg_(range_del_agg), compaction_(std::move(compaction)), compaction_filter_(compaction_filter), - log_buffer_(log_buffer), + shutting_down_(shutting_down), merge_out_iter_(merge_helper_) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); bottommost_level_ = @@ -136,7 +137,7 @@ void CompactionIterator::NextFromInput() { at_next_ = false; valid_ = false; - while (!valid_ && input_->Valid()) { + while (!valid_ && input_->Valid() && !IsShuttingDown()) { key_ = input_->key(); value_ = input_->value(); iter_stats_.num_input_records++; @@ -217,7 +218,8 @@ void CompactionIterator::NextFromInput() { } if (filter == CompactionFilter::Decision::kRemove) { - // convert the current key to a delete + // convert the current key to a delete; key_ is pointing into + // current_key_ at this point, so updating current_key_ updates key() ikey_.type = kTypeDeletion; current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); // no value associated with delete @@ -422,7 +424,6 @@ void CompactionIterator::NextFromInput() { input_->Next(); } else if (ikey_.type == kTypeMerge) { if (!merge_helper_->HasOperator()) { - LogToBuffer(log_buffer_, "Options::merge_operator is null."); status_ = Status::InvalidArgument( "merge_operator is not properly initialized."); return; @@ -433,11 +434,14 @@ void CompactionIterator::NextFromInput() { // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. - merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot, - bottommost_level_); + Status s = merge_helper_->MergeUntil(input_, range_del_agg_, + prev_snapshot, bottommost_level_); merge_out_iter_.SeekToFirst(); - if (merge_out_iter_.Valid()) { + if (!s.ok() && !s.IsMergeInProgress()) { + status_ = s; + return; + } else if (merge_out_iter_.Valid()) { // NOTE: key, value, and ikey_ refer to old entries. // These will be correctly set below. key_ = merge_out_iter_.key(); @@ -481,6 +485,10 @@ void CompactionIterator::NextFromInput() { input_->Seek(skip_until); } } + + if (!valid_ && IsShuttingDown()) { + status_ = Status::ShutdownInProgress(); + } } void CompactionIterator::PrepareOutput() { diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 8677719f9..16cb2fb10 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -17,7 +17,6 @@ #include "db/pinned_iterators_manager.h" #include "db/range_del_aggregator.h" #include "rocksdb/compaction_filter.h" -#include "util/log_buffer.h" namespace rocksdb { @@ -61,7 +60,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, - LogBuffer* log_buffer = nullptr); + const std::atomic* shutting_down = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -72,7 +71,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, - LogBuffer* log_buffer = nullptr); + const std::atomic* shutting_down = nullptr); ~CompactionIterator(); @@ -125,7 +124,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; - LogBuffer* log_buffer_; + const std::atomic* shutting_down_; bool bottommost_level_; bool valid_ = false; bool visible_at_tip_; @@ -180,5 +179,10 @@ class CompactionIterator { // is in or beyond the last file checked during the previous call std::vector level_ptrs_; CompactionIterationStats iter_stats_; + + bool IsShuttingDown() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. + return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); + } }; } // namespace rocksdb diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index dd398696d..fede1cbd4 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -13,6 +13,64 @@ namespace rocksdb { +// Expects no merging attempts. +class NoMergingMergeOp : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + ADD_FAILURE(); + return false; + } + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override { + ADD_FAILURE(); + return false; + } + const char* Name() const override { + return "CompactionIteratorTest NoMergingMergeOp"; + } +}; + +// Compaction filter that gets stuck when it sees a particular key, +// then gets unstuck when told to. +// Always returns Decition::kRemove. +class StallingFilter : public CompactionFilter { + public: + virtual Decision FilterV2(int level, const Slice& key, ValueType t, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const override { + int k = std::atoi(key.ToString().c_str()); + last_seen.store(k); + while (k >= stall_at.load()) { + std::this_thread::yield(); + } + return Decision::kRemove; + } + + const char* Name() const override { + return "CompactionIteratorTest StallingFilter"; + } + + // Wait until the filter sees a key >= k and stalls at that key. + // If `exact`, asserts that the seen key is equal to k. + void WaitForStall(int k, bool exact = true) { + stall_at.store(k); + while (last_seen.load() < k) { + std::this_thread::yield(); + } + if (exact) { + EXPECT_EQ(k, last_seen.load()); + } + } + + // Filter will stall on key >= stall_at. Advance stall_at to unstall. + mutable std::atomic stall_at{0}; + // Last key the filter was called with. + mutable std::atomic last_seen{0}; +}; + class LoggingForwardVectorIterator : public InternalIterator { public: struct Action { @@ -88,13 +146,15 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { virtual int level(size_t compaction_input_level) const { return 0; } virtual bool KeyNotExistsBeyondOutputLevel( const Slice& user_key, std::vector* level_ptrs) const { - return false; + return key_not_exists_beyond_output_level; } virtual bool bottommost_level() const { return false; } virtual int number_levels() const { return 1; } virtual Slice GetLargestUserKey() const { return "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; } + + bool key_not_exists_beyond_output_level = false; }; class CompactionIteratorTest : public testing::Test { @@ -116,17 +176,19 @@ class CompactionIteratorTest : public testing::Test { std::unique_ptr compaction; if (filter) { - compaction.reset(new FakeCompaction()); + compaction_proxy_ = new FakeCompaction(); + compaction.reset(compaction_proxy_); } merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter, - nullptr, 0U, false, 0)); + nullptr, 0U, false, 0, 0, nullptr, + &shutting_down_)); iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(), - std::move(compaction), filter)); + std::move(compaction), filter, &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); } @@ -138,6 +200,8 @@ class CompactionIteratorTest : public testing::Test { std::unique_ptr iter_; std::unique_ptr c_iter_; std::unique_ptr range_del_agg_; + std::atomic shutting_down_{false}; + FakeCompaction* compaction_proxy_; }; // It is possible that the output of the compaction iterator is empty even if @@ -209,26 +273,6 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) { } TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { - // Expect no merging attempts. - class MergeOp : public MergeOperator { - public: - bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - ADD_FAILURE(); - return false; - } - bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - ADD_FAILURE(); - return false; - } - const char* Name() const override { - return "CompactionIteratorTest.CompactionFilterSkipUntil::MergeOp"; - } - }; - class Filter : public CompactionFilter { virtual Decision FilterV2(int level, const Slice& key, ValueType t, const Slice& existing_value, @@ -286,7 +330,7 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { } }; - MergeOp merge_op; + NoMergingMergeOp merge_op; Filter filter; InitIterators( {test::KeyStr("a", 50, kTypeValue), // keep @@ -338,6 +382,77 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { ASSERT_EQ(expected_actions, iter_->log); } +TEST_F(CompactionIteratorTest, ShuttingDownInFilter) { + NoMergingMergeOp merge_op; + StallingFilter filter; + InitIterators( + {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue), + test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)}, + {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter); + // Don't leave tombstones (kTypeDeletion) for filtered keys. + compaction_proxy_->key_not_exists_beyond_output_level = true; + + std::atomic seek_done{false}; + std::thread compaction_thread([&] { + c_iter_->SeekToFirst(); + EXPECT_FALSE(c_iter_->Valid()); + EXPECT_TRUE(c_iter_->status().IsShutdownInProgress()); + seek_done.store(true); + }); + + // Let key 1 through. + filter.WaitForStall(1); + + // Shutdown during compaction filter call for key 2. + filter.WaitForStall(2); + shutting_down_.store(true); + EXPECT_FALSE(seek_done.load()); + + // Unstall filter and wait for SeekToFirst() to return. + filter.stall_at.store(3); + compaction_thread.join(); + assert(seek_done.load()); + + // Check that filter was never called again. + EXPECT_EQ(2, filter.last_seen.load()); +} + +// Same as ShuttingDownInFilter, but shutdown happens during filter call for +// a merge operand, not for a value. +TEST_F(CompactionIteratorTest, ShuttingDownInMerge) { + NoMergingMergeOp merge_op; + StallingFilter filter; + InitIterators( + {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge), + test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)}, + {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter); + compaction_proxy_->key_not_exists_beyond_output_level = true; + + std::atomic seek_done{false}; + std::thread compaction_thread([&] { + c_iter_->SeekToFirst(); + ASSERT_FALSE(c_iter_->Valid()); + ASSERT_TRUE(c_iter_->status().IsShutdownInProgress()); + seek_done.store(true); + }); + + // Let key 1 through. + filter.WaitForStall(1); + + // Shutdown during compaction filter call for key 2. + filter.WaitForStall(2); + shutting_down_.store(true); + EXPECT_FALSE(seek_done.load()); + + // Unstall filter and wait for SeekToFirst() to return. + filter.stall_at.store(3); + compaction_thread.join(); + assert(seek_done.load()); + + // Check that filter was never called again. + EXPECT_EQ(2, filter.last_seen.load()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 42f32936c..a730be191 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -264,7 +264,7 @@ void CompactionJob::AggregateStatistics() { CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions& env_options, VersionSet* versions, - std::atomic* shutting_down, LogBuffer* log_buffer, + const std::atomic* shutting_down, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, std::vector existing_snapshots, @@ -724,7 +724,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { mutable_cf_options->min_partial_merge_operands, false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), - compact_->compaction->level(), db_options_.statistics.get()); + compact_->compaction->level(), db_options_.statistics.get(), + shutting_down_); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); @@ -742,7 +743,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false, - range_del_agg.get(), sub_compact->compaction, compaction_filter)); + range_del_agg.get(), sub_compact->compaction, compaction_filter, + shutting_down_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); const auto& c_iter_stats = c_iter->iter_stats(); @@ -753,10 +755,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::string compression_dict; compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes); - // TODO(noetzli): check whether we could check !shutting_down_->... only - // only occasionally (see diff D42687) - while (status.ok() && !shutting_down_->load(std::memory_order_acquire) && - !cfd->IsDropped() && c_iter->Valid()) { + while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. const Slice& key = c_iter->key(); @@ -903,27 +902,36 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordCompactionIOStats(); - if (status.ok() && - (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { + if (status.ok() && (shutting_down_->load(std::memory_order_relaxed) || + cfd->IsDropped())) { status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } + if (status.ok()) { + status = input->status(); + } + if (status.ok()) { + status = c_iter->status(); + } + if (status.ok() && sub_compact->builder == nullptr && sub_compact->outputs.size() == 0 && range_del_agg->ShouldAddTombstones(bottommost_level_)) { // handle subcompaction containing only range deletions status = OpenCompactionOutputFile(sub_compact); } - if (status.ok() && sub_compact->builder != nullptr) { + + // Call FinishCompactionOutputFile() even if status is not ok: it needs to + // close the output file. + if (sub_compact->builder != nullptr) { CompactionIterationStats range_del_out_stats; - status = - FinishCompactionOutputFile(input->status(), sub_compact, - range_del_agg.get(), &range_del_out_stats); + Status s = FinishCompactionOutputFile( + status, sub_compact, range_del_agg.get(), &range_del_out_stats); + if (status.ok()) { + status = s; + } RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); } - if (status.ok()) { - status = input->status(); - } if (measure_io_stats_) { sub_compact->compaction_job_stats.file_write_nanos += diff --git a/db/compaction_job.h b/db/compaction_job.h index 25199a6ff..be94520ea 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -57,7 +57,7 @@ class CompactionJob { CompactionJob(int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions& env_options, VersionSet* versions, - std::atomic* shutting_down, LogBuffer* log_buffer, + const std::atomic* shutting_down, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, @@ -131,7 +131,7 @@ class CompactionJob { Env* env_; VersionSet* versions_; - std::atomic* shutting_down_; + const std::atomic* shutting_down_; LogBuffer* log_buffer_; Directory* db_directory_; Directory* output_directory_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 7b737f66f..a084d29d0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -379,6 +379,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) void DBImpl::CancelAllBackgroundWork(bool wait) { InstrumentedMutexLock l(&mutex_); + Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, + "Shutdown: canceling all background work"); + if (!shutting_down_.load(std::memory_order_acquire) && has_unpersisted_data_ && !mutable_db_options_.avoid_flush_during_shutdown) { @@ -503,6 +506,8 @@ DBImpl::~DBImpl() { env_->UnlockFile(db_lock_); } + Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, + "Shutdown complete"); LogFlush(immutable_db_options_.info_log); } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 04e7bcb80..9e3d7d1d7 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -104,6 +104,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, Status s; bool hit_the_next_user_key = false; for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { + if (IsShuttingDown()) { + return Status::ShutdownInProgress(); + } + ParsedInternalKey ikey; assert(keys_.size() == merge_context_.GetNumOperands()); @@ -278,10 +282,6 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // We haven't seen the beginning of the key nor a Put/Delete. // Attempt to use the user's associative merge function to // merge the stacked merge operands into a single operand. - // - // TODO(noetzli) The docblock of MergeUntil suggests that a successful - // partial merge returns Status::OK(). Should we change the status code - // after a successful partial merge? s = Status::MergeInProgress(); if (merge_context_.GetNumOperands() >= 2 && merge_context_.GetNumOperands() >= min_partial_merge_operands_) { diff --git a/db/merge_helper.h b/db/merge_helper.h index 00a8cf1cf..455b5fb07 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -34,11 +34,13 @@ class MergeHelper { const CompactionFilter* compaction_filter, Logger* logger, unsigned min_partial_merge_operands, bool assert_valid_internal_key, SequenceNumber latest_snapshot, - int level = 0, Statistics* stats = nullptr) + int level = 0, Statistics* stats = nullptr, + const std::atomic* shutting_down = nullptr) : env_(env), user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), compaction_filter_(compaction_filter), + shutting_down_(shutting_down), logger_(logger), min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), @@ -81,10 +83,12 @@ class MergeHelper { // // Returns one of the following statuses: // - OK: Entries were successfully merged. - // - MergeInProgress: Put/Delete not encountered and unable to merge operands. + // - MergeInProgress: Put/Delete not encountered, and didn't reach the start + // of key's history. Output consists of merge operands only. // - Corruption: Merge operator reported unsuccessful merge or a corrupted // key has been encountered and not expected (applies only when compiling // with asserts removed). + // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(InternalIterator* iter, @@ -150,6 +154,7 @@ class MergeHelper { const Comparator* user_comparator_; const MergeOperator* user_merge_operator_; const CompactionFilter* compaction_filter_; + const std::atomic* shutting_down_; Logger* logger_; unsigned min_partial_merge_operands_; bool assert_valid_internal_key_; // enforce no internal key corruption? @@ -171,6 +176,11 @@ class MergeHelper { bool has_compaction_filter_skip_until_ = false; std::string compaction_filter_value_; InternalKey compaction_filter_skip_until_; + + bool IsShuttingDown() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. + return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); + } }; // MergeOutputIterator can be used to iterate over the result of a merge. diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index ff6d6e2db..8a4e84d90 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -140,6 +140,12 @@ class CompactionFilter { // by kRemoveAndSkipUntil can disappear from a snapshot - beware // if you're using TransactionDB or DB::GetSnapshot(). // + // Another warning: if value for a key was overwritten or merged into + // (multiple Put()s or Merge()s), and compaction filter skips this key + // with kRemoveAndSkipUntil, it's possible that it will remove only + // the new value, exposing the old value that was supposed to be + // overwritten. + // // If you use kRemoveAndSkipUntil, consider also reducing // compaction_readahead_size option. //