diff --git a/db/db_impl.cc b/db/db_impl.cc index 3958614e4..f1d004122 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -85,11 +85,11 @@ struct DBImpl::Writer { struct DBImpl::CompactionState { Compaction* const compaction; - // Sequence numbers < smallest_snapshot are not significant since we - // will never have to service a snapshot below smallest_snapshot. - // Therefore if we have seen a sequence number S <= smallest_snapshot, - // we can drop all entries for the same key with sequence numbers < S. - SequenceNumber smallest_snapshot; + // If there were two snapshots with seq numbers s1 and + // s2 and s1 < s2, and if we find two instances of a key k1 then lies + // entirely within s1 and s2, then the earlier version of k1 can be safely + // deleted because that version is not visible in any snapshot. + std::vector existing_snapshots; // Files produced by compaction struct Output { @@ -1262,6 +1262,32 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } +// +// Given a sequence number, return the sequence number of the +// earliest snapshot that this sequence number is visible in. +// The snapshots themselves are arranged in ascending order of +// sequence numbers. +// Employ a sequential search because the total number of +// snapshots are typically small. +inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( + SequenceNumber in, std::vector& snapshots) { + SequenceNumber prev; + prev = 0; + for (std::vector::iterator it = snapshots.begin(); + it < snapshots.end(); it++) { + assert (prev <= *it); + if (*it >= in) { + return *it; + } + assert(prev = *it); // assignment + } + Log(options_.info_log, + "Looking for seqid %ld but maxseqid is %ld", in, + snapshots[snapshots.size()-1]); + assert(0); + return 0; +} + Status DBImpl::DoCompactionWork(CompactionState* compact) { int64_t imm_micros = 0; // Micros spent doing imm_ compactions @@ -1279,10 +1305,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == NULL); assert(compact->outfile == NULL); - if (snapshots_.empty()) { - compact->smallest_snapshot = versions_->LastSequence(); + + SequenceNumber visible_at_tip = 0; + SequenceNumber earliest_snapshot; + snapshots_.getAll(compact->existing_snapshots); + if (compact->existing_snapshots.size() == 0) { + // optimize for fast path if there are no snapshots + visible_at_tip = versions_->LastSequence(); + earliest_snapshot = visible_at_tip; } else { - compact->smallest_snapshot = snapshots_.oldest()->number_; + // Add the current seqno as the 'latest' virtual + // snapshot to the end of this list. + compact->existing_snapshots.push_back(versions_->LastSequence()); + earliest_snapshot = compact->existing_snapshots[0]; } // Allocate the output file numbers before we release the lock @@ -1299,6 +1334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + SequenceNumber visible_in_snapshot = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != NULL) { @@ -1330,6 +1366,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; + visible_in_snapshot = kMaxSequenceNumber; } else { if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, @@ -1338,14 +1375,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; + visible_in_snapshot = kMaxSequenceNumber; } - if (last_sequence_for_key <= compact->smallest_snapshot) { + // If there are no snapshots, then this kv affect visibility at tip. + // Otherwise, search though all existing snapshots to find + // the earlist snapshot that is affected by this kv. + SequenceNumber visible = visible_at_tip ? visible_at_tip : + findEarliestVisibleSnapshot(ikey.sequence, + compact->existing_snapshots); + + if (visible_in_snapshot == visible) { + // If the earliest snapshot is which this key is visible in + // is the same as the visibily of a previous instance of the + // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key + assert(last_sequence_for_key >= ikey.sequence); drop = true; // (A) RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY); } else if (ikey.type == kTypeDeletion && - ikey.sequence <= compact->smallest_snapshot && + ikey.sequence <= earliest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // For this user key: // (1) there is no data in higher levels @@ -1358,7 +1407,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE); } else if (options_.CompactionFilter != NULL && ikey.type != kTypeDeletion && - ikey.sequence < compact->smallest_snapshot) { + ikey.sequence < earliest_snapshot) { // If the user has specified a compaction filter, then invoke // it. If this key is not visible via any snapshot and the // return value of the compaction filter is true and then @@ -1378,6 +1427,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } last_sequence_for_key = ikey.sequence; + visible_in_snapshot = visible; } #if 0 Log(options_.info_log, @@ -1762,7 +1812,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else if ( allow_delay && versions_->NumLevelFiles(0) >= - options_.level0_slowdown_writes_trigger) { + options_.level0_slowdown_writes_trigger) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each @@ -1796,7 +1846,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { bg_cv_.Wait(); stall_memtable_compaction_ += env_->NowMicros() - t1; } else if (versions_->NumLevelFiles(0) >= - options_.level0_stop_writes_trigger) { + options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); uint64_t t1 = env_->NowMicros(); diff --git a/db/db_impl.h b/db/db_impl.h index 334c17ed7..0c894acfa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -303,6 +303,10 @@ protected: // dump the delayed_writes_ to the log file and reset counter. void DelayLoggingAndReset(); + + // find the earliest snapshot where seqno is visible + inline SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in, + std::vector& snapshots); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_test.cc b/db/db_test.cc index 2b2b9165d..ed32e940b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1050,9 +1050,8 @@ TEST(DBTest, CompactionTrigger) { Random rnd(301); for (int num = 0; - num < options.level0_file_num_compaction_trigger - 1; - num++) - { + num < options.level0_file_num_compaction_trigger - 1; + num++) { std::vector values; // Write 120KB (12 values, each 10K) for (int i = 0; i < 12; i++) { @@ -1189,7 +1188,7 @@ TEST(DBTest, RepeatedWritesToSameKey) { // We must have at most one file per level except for level-0, // which may have up to kL0_StopWritesTrigger files. const int kMaxFiles = dbfull()->NumberLevels() + - dbfull()->Level0StopWriteTrigger(); + dbfull()->Level0StopWriteTrigger(); Random rnd(301); std::string value = RandomString(&rnd, 2 * options.write_buffer_size); @@ -1594,6 +1593,59 @@ TEST(DBTest, HiddenValuesAreRemoved) { } while (ChangeOptions()); } +TEST(DBTest, CompactBetweenSnapshots) { + do { + Random rnd(301); + FillLevels("a", "z"); + + Put("foo", "first"); + const Snapshot* snapshot1 = db_->GetSnapshot(); + Put("foo", "second"); + Put("foo", "third"); + Put("foo", "fourth"); + const Snapshot* snapshot2 = db_->GetSnapshot(); + Put("foo", "fifth"); + Put("foo", "sixth"); + + // All entries (including duplicates) exist + // before any compaction is triggered. + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ("sixth", Get("foo")); + ASSERT_EQ("fourth", Get("foo", snapshot2)); + ASSERT_EQ("first", Get("foo", snapshot1)); + ASSERT_EQ(AllEntriesFor("foo"), + "[ sixth, fifth, fourth, third, second, first ]"); + + // After a compaction, "second", "third" and "fifth" should + // be removed + FillLevels("a", "z"); + dbfull()->CompactRange(NULL, NULL); + ASSERT_EQ("sixth", Get("foo")); + ASSERT_EQ("fourth", Get("foo", snapshot2)); + ASSERT_EQ("first", Get("foo", snapshot1)); + ASSERT_EQ(AllEntriesFor("foo"), "[ sixth, fourth, first ]"); + + // after we release the snapshot1, only two values left + db_->ReleaseSnapshot(snapshot1); + FillLevels("a", "z"); + dbfull()->CompactRange(NULL, NULL); + + // We have only one valid snapshot snapshot2. Since snapshot1 is + // not valid anymore, "first" should be removed by a compaction. + ASSERT_EQ("sixth", Get("foo")); + ASSERT_EQ("fourth", Get("foo", snapshot2)); + ASSERT_EQ(AllEntriesFor("foo"), "[ sixth, fourth ]"); + + // after we release the snapshot2, only one value should be left + db_->ReleaseSnapshot(snapshot2); + FillLevels("a", "z"); + dbfull()->CompactRange(NULL, NULL); + ASSERT_EQ("sixth", Get("foo")); + ASSERT_EQ(AllEntriesFor("foo"), "[ sixth ]"); + + } while (ChangeOptions()); +} + TEST(DBTest, DeletionMarkers1) { Put("foo", "v1"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); diff --git a/db/snapshot.h b/db/snapshot.h index e7f8fd2c3..dc7db09a2 100644 --- a/db/snapshot.h +++ b/db/snapshot.h @@ -32,6 +32,7 @@ class SnapshotList { SnapshotList() { list_.prev_ = &list_; list_.next_ = &list_; + list_.number_ = 0xFFFFFFFFL; // placeholder marker, for debugging } bool empty() const { return list_.next_ == &list_; } @@ -56,6 +57,20 @@ class SnapshotList { delete s; } + // retrieve all snapshot numbers. They are sorted in ascending order. + void getAll(std::vector& ret) { + SnapshotImpl* s = &list_; + SequenceNumber prev; + prev = 0; + if (empty()) return; + while (s->next_ != &list_) { + assert(prev <= s->next_->number_); + assert(prev = s->next_->number_); // assignment + ret.push_back(s->next_->number_); + s = s ->next_; + } + } + private: // Dummy head of doubly-linked list of snapshots SnapshotImpl list_;