From 014fd55adca7b217d08f579f78303eef39b834f2 Mon Sep 17 00:00:00 2001 From: Andres Noetzli Date: Thu, 17 Sep 2015 11:42:56 -0700 Subject: [PATCH] Support for SingleDelete() Summary: This patch fixes #7460559. It introduces SingleDelete as a new database operation. This operation can be used to delete keys that were never overwritten (no put following another put of the same key). If an overwritten key is single deleted the behavior is undefined. Single deletion of a non-existent key has no effect but multiple consecutive single deletions are not allowed (see limitations). In contrast to the conventional Delete() operation, the deletion entry is removed along with the value when the two are lined up in a compaction. Note: The semantics are similar to @igor's prototype that allowed to have this behavior on the granularity of a column family ( https://reviews.facebook.net/D42093 ). This new patch, however, is more aggressive when it comes to removing tombstones: It removes the SingleDelete together with the value whenever there is no snapshot between them while the older patch only did this when the sequence number of the deletion was older than the earliest snapshot. Most of the complex additions are in the Compaction Iterator, all other changes should be relatively straightforward. The patch also includes basic support for single deletions in db_stress and db_bench. Limitations: - Not compatible with cuckoo hash tables - Single deletions cannot be used in combination with merges and normal deletions on the same key (other keys are not affected by this) - Consecutive single deletions are currently not allowed (and older version of this patch supported this so it could be resurrected if needed) Test Plan: make all check Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor Reviewed By: igor Subscribers: maykov, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D43179 --- HISTORY.md | 7 + Makefile | 4 + db/builder.cc | 29 ++- db/compaction_iterator.cc | 117 ++++++++--- db/compaction_iterator.h | 22 +- db/compaction_iterator_test.cc | 70 +++++++ db/compaction_job_test.cc | 170 +++++++++++++++- db/db_bench.cc | 75 ++++++- db/db_impl.cc | 13 ++ db/db_impl.h | 6 +- db/db_impl_readonly.h | 6 + db/db_iter.cc | 30 +-- db/db_iter_test.cc | 66 +++++- db/db_test.cc | 170 +++++++++++++++- db/dbformat.cc | 4 +- db/dbformat.h | 53 +++-- db/internal_stats.cc | 2 +- db/memtable.cc | 5 +- db/merge_helper.cc | 9 +- db/merge_helper.h | 4 +- db/merge_helper_test.cc | 23 +-- db/table_properties_collector.cc | 8 +- db/table_properties_collector_test.cc | 84 +++++--- db/write_batch.cc | 137 ++++++++++--- db/write_batch_base.cc | 13 ++ db/write_batch_internal.h | 6 + db/write_batch_test.cc | 188 ++++++++++++------ include/rocksdb/db.h | 11 + include/rocksdb/perf_context.h | 2 +- include/rocksdb/table_properties.h | 1 + include/rocksdb/utilities/stackable_db.h | 7 + .../utilities/write_batch_with_index.h | 13 +- include/rocksdb/write_batch.h | 63 ++++-- include/rocksdb/write_batch_base.h | 11 + table/get_context.cc | 3 + tools/db_stress.cc | 169 ++++++++++++---- util/db_test_util.cc | 11 + util/db_test_util.h | 4 + util/testutil.cc | 9 + util/testutil.h | 4 + .../write_batch_with_index.cc | 41 ++-- .../write_batch_with_index_internal.cc | 7 +- .../write_batch_with_index_internal.h | 2 +- 43 files changed, 1364 insertions(+), 315 deletions(-) create mode 100644 db/compaction_iterator_test.cc diff --git a/HISTORY.md b/HISTORY.md index 9457c0d8e..82656e2a0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,12 @@ # Rocksdb Change Log +## Unreleased +### New Features +* Added single delete operation as a more efficient way to delete keys that have not been overwritten. + +### Public API Changes +* Added SingleDelete() to the DB interface. + ## 4.0.0 (9/9/2015) ### New Features * Added support for transactions. See include/rocksdb/utilities/transaction.h for more info. diff --git a/Makefile b/Makefile index 648d5800d..68e5ac59e 100644 --- a/Makefile +++ b/Makefile @@ -295,6 +295,7 @@ TESTS = \ flush_job_test \ wal_manager_test \ listener_test \ + compaction_iterator_test \ compaction_job_test \ thread_list_test \ sst_dump_test \ @@ -773,6 +774,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +compaction_iterator_test: db/compaction_iterator_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/builder.cc b/db/builder.cc index a40a9564a..99ddcb049 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -110,13 +110,15 @@ Status BuildTable( } // Finish and check for builder errors + bool empty = builder->NumEntries() == 0; s = c_iter.status(); - if (s.ok()) { - s = builder->Finish(); - } else { + if (!s.ok() || empty) { builder->Abandon(); + } else { + s = builder->Finish(); } - if (s.ok()) { + + if (s.ok() && !empty) { meta->fd.file_size = builder->FileSize(); meta->marked_for_compaction = builder->NeedCompact(); assert(meta->fd.GetFileSize() > 0); @@ -127,28 +129,27 @@ Status BuildTable( delete builder; // Finish and check for file errors - if (s.ok() && !ioptions.disable_data_sync) { + if (s.ok() && !empty && !ioptions.disable_data_sync) { StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS); file_writer->Sync(ioptions.use_fsync); } - if (s.ok()) { + if (s.ok() && !empty) { s = file_writer->Close(); } - if (s.ok()) { + if (s.ok() && !empty) { // Verify that the table is usable - Iterator* it = table_cache->NewIterator( + std::unique_ptr it(table_cache->NewIterator( ReadOptions(), env_options, internal_comparator, meta->fd, nullptr, (internal_stats == nullptr) ? nullptr : internal_stats->GetFileReadHist(0), - false); + false)); s = it->status(); if (s.ok() && paranoid_file_checks) { - for (it->SeekToFirst(); it->Valid(); it->Next()) {} + for (it->SeekToFirst(); it->Valid(); it->Next()) { + } s = it->status(); } - - delete it; } } @@ -157,9 +158,7 @@ Status BuildTable( s = iter->status(); } - if (s.ok() && meta->fd.GetFileSize() > 0) { - // Keep it - } else { + if (!s.ok() || meta->fd.GetFileSize() == 0) { env->DeleteFile(fname); } return s; diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index d43651667..e6f1bb60a 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -71,6 +71,10 @@ void CompactionIterator::Next() { // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to be valid. assert(valid_key); + // Keep current_key_ in sync. + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + key_ = current_key_.GetKey(); + ikey_.user_key = current_key_.GetUserKey(); valid_ = true; } else { // MergeHelper moves the iterator to the first record after the merged @@ -79,8 +83,11 @@ void CompactionIterator::Next() { NextFromInput(); } } else { - // Only advance the input iterator if there is no merge output. - input_->Next(); + // Only advance the input iterator if there is no merge output and the + // iterator is not already at the next record. + if (!at_next_) { + input_->Next(); + } NextFromInput(); } @@ -88,9 +95,10 @@ void CompactionIterator::Next() { } void CompactionIterator::NextFromInput() { + at_next_ = false; valid_ = false; - while (input_->Valid()) { + while (!valid_ && input_->Valid()) { key_ = input_->key(); value_ = input_->value(); iter_stats_.num_input_records++; @@ -100,10 +108,11 @@ void CompactionIterator::NextFromInput() { // and let the caller decide what to do with it. // TODO(noetzli): We should have a more elegant solution for this. if (expect_valid_internal_key_) { - assert(!"corrupted internal key is not expected"); + assert(!"Corrupted internal key not expected."); + status_ = Status::Corruption("Corrupted internal key not expected."); break; } - current_user_key_.Clear(); + key_ = current_key_.SetKey(key_); has_current_user_key_ = false; current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; @@ -113,16 +122,20 @@ void CompactionIterator::NextFromInput() { } // Update input statistics - if (ikey_.type == kTypeDeletion) { + if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { iter_stats_.num_input_deletion_records++; } iter_stats_.total_input_raw_key_bytes += key_.size(); iter_stats_.total_input_raw_value_bytes += value_.size(); + // Check whether the user key changed. After this if statement current_key_ + // is a copy of the current input key (maybe converted to a delete by the + // compaction filter). ikey_.user_key is pointing to the copy. if (!has_current_user_key_ || - cmp_->Compare(ikey_.user_key, current_user_key_.GetKey()) != 0) { + !cmp_->Equal(ikey_.user_key, current_user_key_)) { // First occurrence of this user key - current_user_key_.SetKey(ikey_.user_key); + key_ = current_key_.SetKey(key_, &ikey_); + current_user_key_ = ikey_.user_key; has_current_user_key_ = true; current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; @@ -145,13 +158,9 @@ void CompactionIterator::NextFromInput() { env_ != nullptr ? timer.ElapsedNanos() : 0; } if (to_delete) { - // make a copy of the original key and convert it to a delete - delete_key_.SetInternalKey(ExtractUserKey(key_), ikey_.sequence, - kTypeDeletion); - // anchor the key again - key_ = delete_key_.GetKey(); - // needed because ikey_ is backed by key - ParseInternalKey(key_, &ikey_); + // convert the current key to a delete + ikey_.type = kTypeDeletion; + current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); // no value associated with delete value_.clear(); iter_stats_.num_record_drop_user++; @@ -159,6 +168,12 @@ void CompactionIterator::NextFromInput() { value_ = compaction_filter_value_; } } + } else { + // Update the current key to reflect the new sequence number/type without + // copying the user key. + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + key_ = current_key_.GetKey(); + ikey_.user_key = current_key_.GetUserKey(); } // If there are no snapshots, then this kv affect visibility at tip. @@ -173,7 +188,58 @@ void CompactionIterator::NextFromInput() { visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot( ikey_.sequence, &prev_snapshot); - if (last_snapshot == current_user_key_snapshot_) { + if (ikey_.type == kTypeSingleDeletion) { + ParsedInternalKey next_ikey; + input_->Next(); + + // Check whether the current key is valid, not corrupt and the same + // as the single delete. + if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && + cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { + // Mixing single deletes and merges is not supported. Consecutive + // single deletes are not valid. + if (next_ikey.type != kTypeValue) { + assert(false); + status_ = + Status::InvalidArgument("Put expected after single delete."); + break; + } + + // Check whether the current key belongs to the same snapshot as the + // single delete. + if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) { + // Found the matching value, we can drop the single delete and the + // value. + ++iter_stats_.num_record_drop_hidden; + ++iter_stats_.num_record_drop_obsolete; + input_->Next(); + } else { + // We hit the next snapshot without hitting a put, so the iterator + // returns the single delete. + valid_ = true; + } + } else { + // We are at the end of the input, could not parse the next key, or hit + // the next key. The iterator returns the single delete if the key + // possibly exists beyond the current output level. We set + // has_current_user_key to false so that if the iterator is at the next + // key, we do not compare it again against the previous key at the next + // iteration. If the next key is corrupt, we return before the + // comparison, so the value of has_current_user_key does not matter. + has_current_user_key_ = false; + if (compaction_ != nullptr && + compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, + &level_ptrs_)) { + ++iter_stats_.num_record_drop_obsolete; + } else { + valid_ = true; + } + } + + if (valid_) { + at_next_ = true; + } + } else if (last_snapshot == current_user_key_snapshot_) { // If the earliest snapshot is which this key is visible in // is the same as the visibility of a previous instance of the // same key, then this kv is not visible in any snapshot. @@ -181,6 +247,7 @@ void CompactionIterator::NextFromInput() { // TODO: why not > ? assert(last_sequence >= current_user_key_sequence_); ++iter_stats_.num_record_drop_hidden; // (A) + input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && ikey_.sequence <= earliest_snapshot_ && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, @@ -197,6 +264,7 @@ void CompactionIterator::NextFromInput() { // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. ++iter_stats_.num_record_drop_obsolete; + input_->Next(); } else if (ikey_.type == kTypeMerge) { if (!merge_helper_->HasOperator()) { LogToBuffer(log_buffer_, "Options::merge_operator is null."); @@ -222,14 +290,14 @@ void CompactionIterator::NextFromInput() { // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to valid. assert(valid_key); + // Keep current_key_ in sync. + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + key_ = current_key_.GetKey(); + ikey_.user_key = current_key_.GetUserKey(); valid_ = true; - break; } else { valid_ = true; - break; } - - input_->Next(); } } @@ -240,12 +308,9 @@ void CompactionIterator::PrepareOutput() { // then we can squash the seqno to zero. if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ && ikey_.type != kTypeMerge) { - assert(ikey_.type != kTypeDeletion); - // make a copy because updating in place would cause problems - // with the priority queue that is managing the input key iterator - updated_key_.assign(key_.data(), key_.size()); - UpdateInternalKey(&updated_key_, (uint64_t)0, ikey_.type); - key_ = Slice(updated_key_); + assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); + ikey_.sequence = 0; + current_key_.UpdateInternalKey(0, ikey_.type); } } diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index a92972256..8229f2f00 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -64,7 +64,7 @@ class CompactionIterator { const Status& status() const { return status_; } const ParsedInternalKey& ikey() const { return ikey_; } bool Valid() const { return valid_; } - Slice user_key() const { return current_user_key_.GetKey(); } + const Slice& user_key() const { return current_user_key_; } const CompactionIteratorStats& iter_stats() const { return iter_stats_; } private: @@ -102,18 +102,32 @@ class CompactionIterator { SequenceNumber latest_snapshot_; // State + // + // Points to a copy of the current compaction iterator output (current_key_) + // if valid_. Slice key_; + // Points to the value in the underlying iterator that corresponds to the + // current output. Slice value_; + // The status is OK unless compaction iterator encounters a merge operand + // while not having a merge operator defined. Status status_; + // Stores the user key, sequence number and type of the current compaction + // iterator output (or current key in the underlying iterator during + // NextFromInput()). ParsedInternalKey ikey_; + // Stores whether ikey_.user_key is valid. If set to false, the user key is + // not compared against the current key in the underlying iterator. bool has_current_user_key_ = false; - IterKey current_user_key_; + bool at_next_ = false; // If false, the iterator + // Holds a copy of the current compaction iterator output (or current key in + // the underlying iterator during NextFromInput()). + IterKey current_key_; + Slice current_user_key_; SequenceNumber current_user_key_sequence_; SequenceNumber current_user_key_snapshot_; MergeOutputIterator merge_out_iter_; - std::string updated_key_; std::string compaction_filter_value_; - IterKey delete_key_; // "level_ptrs" holds indices that remember which file of an associated // level we were last checking during the last call to compaction-> // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc new file mode 100644 index 000000000..8b8a3c9a4 --- /dev/null +++ b/db/compaction_iterator_test.cc @@ -0,0 +1,70 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/compaction_iterator.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class CompactionIteratorTest : public testing::Test { + public: + CompactionIteratorTest() : cmp_(BytewiseComparator()), snapshots_({}) {} + + void InitIterator(const std::vector& ks, + const std::vector& vs, + SequenceNumber last_sequence) { + merge_helper_.reset(new MergeHelper(cmp_, nullptr, nullptr, 0U, false)); + iter_.reset(new test::VectorIterator(ks, vs)); + iter_->SeekToFirst(); + c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(), + last_sequence, &snapshots_, + Env::Default(), false)); + } + + const Comparator* cmp_; + std::vector snapshots_; + std::unique_ptr merge_helper_; + std::unique_ptr iter_; + std::unique_ptr c_iter_; +}; + +// It is possible that the output of the compaction iterator is empty even if +// the input is not. +TEST_F(CompactionIteratorTest, EmptyResult) { + InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion), + test::KeyStr("a", 3, kTypeValue)}, + {"", "val"}, 5); + c_iter_->SeekToFirst(); + ASSERT_FALSE(c_iter_->Valid()); +} + +// If there is a corruption after a single deletion, the corrupted key should +// be preserved. +TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) { + InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion), + test::KeyStr("a", 3, kTypeValue, true), + test::KeyStr("b", 10, kTypeValue)}, + {"", "val", "val2"}, 10); + c_iter_->SeekToFirst(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion), + c_iter_->key().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("a", 3, kTypeValue, true), c_iter_->key().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("b", 10, kTypeValue), c_iter_->key().ToString()); + c_iter_->Next(); + ASSERT_FALSE(c_iter_->Valid()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 2d0568585..795607afe 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -215,7 +215,8 @@ class CompactionJobTest : public testing::Test { } void RunCompaction(const std::vector>& input_files, - const stl_wrappers::KVMap& expected_results) { + const stl_wrappers::KVMap& expected_results, + const std::vector& snapshots = {}) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); size_t num_input_files = 0; @@ -241,9 +242,9 @@ class CompactionJobTest : public testing::Test { EventLogger event_logger(db_options_.info_log.get()); CompactionJob compaction_job(0, &compaction, db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, - nullptr, nullptr, nullptr, {}, table_cache_, - &event_logger, false, false, dbname_, - &compaction_job_stats_); + nullptr, nullptr, nullptr, snapshots, + table_cache_, &event_logger, false, false, + dbname_, &compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); @@ -419,6 +420,167 @@ TEST_F(CompactionJobTest, NonAssocMerge) { RunCompaction({ files }, expected_results); } +TEST_F(CompactionJobTest, SimpleSingleDelete) { + NewDB(); + + auto file1 = mock::MakeMockFile({ + {KeyStr("a", 5U, kTypeDeletion), ""}, + {KeyStr("b", 6U, kTypeSingleDeletion), ""}, + }); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"}, + {KeyStr("b", 4U, kTypeValue), "val"}}); + AddMockFile(file2); + + auto file3 = mock::MakeMockFile({ + {KeyStr("a", 1U, kTypeValue), "val"}, + }); + AddMockFile(file3, 2); + + auto expected_results = + mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}}); + + SetLastSequence(6U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + +TEST_F(CompactionJobTest, SingleDeleteSnapshots) { + NewDB(); + + auto file1 = mock::MakeMockFile({{KeyStr("A", 12U, kTypeSingleDeletion), ""}, + {KeyStr("a", 12U, kTypeSingleDeletion), ""}, + {KeyStr("b", 21U, kTypeSingleDeletion), ""}, + {KeyStr("c", 22U, kTypeSingleDeletion), ""}, + {KeyStr("d", 9U, kTypeSingleDeletion), ""}}); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile({{KeyStr("0", 2U, kTypeSingleDeletion), ""}, + {KeyStr("a", 11U, kTypeValue), "val1"}, + {KeyStr("b", 11U, kTypeValue), "val2"}, + {KeyStr("c", 21U, kTypeValue), "val3"}, + {KeyStr("d", 8U, kTypeValue), "val4"}, + {KeyStr("e", 2U, kTypeSingleDeletion), ""}}); + AddMockFile(file2); + + auto file3 = mock::MakeMockFile({{KeyStr("A", 1U, kTypeValue), "val"}, + {KeyStr("e", 1U, kTypeValue), "val"}}); + AddMockFile(file3, 2); + + auto expected_results = + mock::MakeMockFile({{KeyStr("A", 12U, kTypeSingleDeletion), ""}, + {KeyStr("b", 21U, kTypeSingleDeletion), ""}, + {KeyStr("b", 11U, kTypeValue), "val2"}, + {KeyStr("e", 2U, kTypeSingleDeletion), ""}}); + + SetLastSequence(22U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results, {10U, 20U}); +} + +TEST_F(CompactionJobTest, SingleDeleteZeroSeq) { + NewDB(); + + auto file1 = mock::MakeMockFile({ + {KeyStr("A", 10U, kTypeSingleDeletion), ""}, + {KeyStr("dummy", 5U, kTypeValue), "val2"}, + }); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile({ + {KeyStr("A", 0U, kTypeValue), "val"}, + }); + AddMockFile(file2); + + auto expected_results = mock::MakeMockFile({ + {KeyStr("dummy", 0U, kTypeValue), "val2"}, + }); + + SetLastSequence(22U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results, {}); +} + +TEST_F(CompactionJobTest, MultiSingleDelete) { + // Tests three scenarios involving multiple single delete/put pairs: + // + // A: Put Snapshot SDel Put SDel -> Put Snapshot SDel + // B: Put SDel Put SDel -> (Removed) + // C: SDel Put SDel Snapshot Put -> Snapshot Put + // D: (Put) SDel Snapshot Put SDel -> (Put) SDel Snapshot + NewDB(); + + auto file1 = mock::MakeMockFile({ + {KeyStr("A", 14U, kTypeSingleDeletion), ""}, + {KeyStr("A", 13U, kTypeValue), "val5"}, + {KeyStr("A", 12U, kTypeSingleDeletion), ""}, + {KeyStr("B", 14U, kTypeSingleDeletion), ""}, + {KeyStr("B", 13U, kTypeValue), "val2"}, + {KeyStr("C", 14U, kTypeValue), "val3"}, + {KeyStr("D", 12U, kTypeSingleDeletion), ""}, + {KeyStr("D", 11U, kTypeValue), "val4"}, + }); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile({ + {KeyStr("A", 10U, kTypeValue), "val"}, + {KeyStr("B", 12U, kTypeSingleDeletion), ""}, + {KeyStr("B", 11U, kTypeValue), "val2"}, + {KeyStr("C", 10U, kTypeSingleDeletion), ""}, + {KeyStr("C", 9U, kTypeValue), "val6"}, + {KeyStr("C", 8U, kTypeSingleDeletion), ""}, + {KeyStr("D", 10U, kTypeSingleDeletion), ""}, + }); + AddMockFile(file2); + + auto file3 = mock::MakeMockFile({ + {KeyStr("D", 11U, kTypeValue), "val"}, + }); + AddMockFile(file3, 2); + + auto expected_results = mock::MakeMockFile({ + {KeyStr("A", 12U, kTypeSingleDeletion), ""}, + {KeyStr("A", 10U, kTypeValue), "val"}, + {KeyStr("C", 14U, kTypeValue), "val3"}, + {KeyStr("D", 10U, kTypeSingleDeletion), ""}, + }); + + SetLastSequence(22U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results, {10U}); +} + +// This test documents the behavior where a corrupt key follows a deletion or a +// single deletion and the (single) deletion gets removed while the corrupt key +// gets written out. TODO(noetzli): We probably want a better way to treat +// corrupt keys. +TEST_F(CompactionJobTest, CorruptionAfterDeletion) { + NewDB(); + + auto file1 = + mock::MakeMockFile({{test::KeyStr("A", 6U, kTypeValue), "val3"}, + {test::KeyStr("a", 5U, kTypeDeletion), ""}, + {test::KeyStr("a", 4U, kTypeValue, true), "val"}}); + AddMockFile(file1); + + auto file2 = + mock::MakeMockFile({{test::KeyStr("b", 3U, kTypeSingleDeletion), ""}, + {test::KeyStr("b", 2U, kTypeValue, true), "val"}, + {test::KeyStr("c", 1U, kTypeValue), "val2"}}); + AddMockFile(file2); + + auto expected_results = + mock::MakeMockFile({{test::KeyStr("A", 0U, kTypeValue), "val3"}, + {test::KeyStr("a", 0U, kTypeValue, true), "val"}, + {test::KeyStr("b", 0U, kTypeValue, true), "val"}, + {test::KeyStr("c", 0U, kTypeValue), "val2"}}); + + SetLastSequence(6U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_bench.cc b/db/db_bench.cc index 9f9239734..c02457e22 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -110,7 +110,8 @@ DEFINE_string(benchmarks, "uncompress," "acquireload," "fillseekseq," - "randomtransaction", + "randomtransaction," + "randomreplacekeys", "Comma-separated list of operations to run in the specified" " order. Available benchmarks:\n" @@ -161,6 +162,8 @@ DEFINE_string(benchmarks, "them by seeking to each key\n" "\trandomtransaction -- execute N random transactions and " "verify correctness\n" + "\trandomreplacekeys -- randomly replaces N keys by deleting " + "the old version and putting the new version\n\n" "Meta operations:\n" "\tcompact -- Compact the entire DB\n" "\tstats -- Print DB stats\n" @@ -688,6 +691,13 @@ DEFINE_uint64(wal_bytes_per_sync, rocksdb::Options().wal_bytes_per_sync, DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop" " the delete if key not present"); +DEFINE_bool(use_single_deletes, true, + "Use single deletes (used in RandomReplaceKeys only)."); + +DEFINE_double(stddev, 2000.0, + "Standard deviation of normal distribution used for picking keys" + " (used in RandomReplaceKeys only)."); + DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge" " operations on a key in the memtable"); @@ -1925,6 +1935,9 @@ class Benchmark { } else if (name == "randomtransaction") { method = &Benchmark::RandomTransaction; post_process_method = &Benchmark::RandomTransactionVerify; + } else if (name == "randomreplacekeys") { + fresh_db = true; + method = &Benchmark::RandomReplaceKeys; } else if (name == "stats") { PrintStats("rocksdb.stats"); } else if (name == "levelstats") { @@ -3846,6 +3859,66 @@ class Benchmark { fprintf(stdout, "RandomTransactionVerify Success!\n"); } + // Writes and deletes random keys without overwriting keys. + // + // This benchmark is intended to partially replicate the behavior of MyRocks + // secondary indices: All data is stored in keys and updates happen by + // deleting the old version of the key and inserting the new version. + void RandomReplaceKeys(ThreadState* thread) { + std::unique_ptr key_guard; + Slice key = AllocateKey(&key_guard); + std::vector counters(FLAGS_numdistinct, 0); + size_t max_counter = 50; + RandomGenerator gen; + + Status s; + DB* db = SelectDB(thread); + for (int64_t i = 0; i < FLAGS_numdistinct; i++) { + GenerateKeyFromInt(i * max_counter, FLAGS_num, &key); + s = db->Put(write_options_, key, gen.Generate(value_size_)); + if (!s.ok()) { + fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str()); + exit(1); + } + } + + db->GetSnapshot(); + + std::default_random_engine generator; + std::normal_distribution distribution(FLAGS_numdistinct / 2.0, + FLAGS_stddev); + Duration duration(FLAGS_duration, FLAGS_num); + while (!duration.Done(1)) { + int64_t rnd_id = static_cast(distribution(generator)); + int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id), + static_cast(0)); + GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, + &key); + s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key) + : db->Delete(write_options_, key); + if (s.ok()) { + counters[key_id] = (counters[key_id] + 1) % max_counter; + GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, + &key); + s = db->Put(write_options_, key, Slice()); + } + + if (!s.ok()) { + fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str()); + exit(1); + } + + thread->stats.FinishedOps(nullptr, db, 1); + } + + char msg[200]; + snprintf(msg, sizeof(msg), + "use single deletes: %d, " + "standard deviation: %lf\n", + FLAGS_use_single_deletes, FLAGS_stddev); + thread->stats.AddMessage(msg); + } + void Compact(ThreadState* thread) { DB* db = SelectDB(thread); db->CompactRange(CompactRangeOptions(), nullptr, nullptr); diff --git a/db/db_impl.cc b/db/db_impl.cc index 36d29327b..3e1d3c703 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3432,6 +3432,12 @@ Status DBImpl::Delete(const WriteOptions& write_options, return DB::Delete(write_options, column_family, key); } +Status DBImpl::SingleDelete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& key) { + return DB::SingleDelete(write_options, column_family, key); +} + Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { return WriteImpl(write_options, my_batch, nullptr); } @@ -4315,6 +4321,13 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, return Write(opt, &batch); } +Status DB::SingleDelete(const WriteOptions& opt, + ColumnFamilyHandle* column_family, const Slice& key) { + WriteBatch batch; + batch.SingleDelete(column_family, key); + return Write(opt, &batch); +} + Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch; diff --git a/db/db_impl.h b/db/db_impl.h index 662c613a9..18a2b34a8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -40,7 +39,6 @@ #include "util/autovector.h" #include "util/event_logger.h" #include "util/hash.h" -#include "util/hash.h" #include "util/instrumented_mutex.h" #include "util/scoped_arena_iterator.h" #include "util/stop_watch.h" @@ -76,6 +74,10 @@ class DBImpl : public DB { virtual Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key) override; + using DB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override; using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates) override; diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index fc14c04bc..6ad82bbf5 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -53,6 +53,12 @@ class DBImplReadOnly : public DBImpl { const Slice& key) override { return Status::NotSupported("Not supported operation in read only mode."); } + using DBImpl::SingleDelete; + virtual Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override { + return Status::NotSupported("Not supported operation in read only mode."); + } virtual Status Write(const WriteOptions& options, WriteBatch* updates) override { return Status::NotSupported("Not supported operation in read only mode."); diff --git a/db/db_iter.cc b/db/db_iter.cc index 571e8ee2a..065b8e4fc 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -235,6 +235,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) { } else { switch (ikey.type) { case kTypeDeletion: + case kTypeSingleDeletion: // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. saved_key_.SetKey(ikey.user_key); @@ -308,16 +309,12 @@ void DBIter::MergeValuesNewToOld() { if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { // hit the next user key, stop right here break; - } - - if (kTypeDeletion == ikey.type) { + } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete iter_->Next(); break; - } - - if (kTypeValue == ikey.type) { + } else if (kTypeValue == ikey.type) { // hit a put, merge the put value with operands and store the // final result in saved_value_. We are done! // ignore corruption if there is any. @@ -333,13 +330,13 @@ void DBIter::MergeValuesNewToOld() { // iter_ is positioned after put iter_->Next(); return; - } - - if (kTypeMerge == ikey.type) { + } else if (kTypeMerge == ikey.type) { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. const Slice& val = iter_->value(); operands.push_front(val.ToString()); + } else { + assert(false); } } @@ -433,12 +430,14 @@ void DBIter::PrevInternal() { } // This function checks, if the entry with biggest sequence_number <= sequence_ -// is non kTypeDeletion. If it's not, we save value in saved_value_ +// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in +// saved_value_ bool DBIter::FindValueForCurrentKey() { assert(iter_->Valid()); // Contains operands for merge operator. std::deque operands; - // last entry before merge (could be kTypeDeletion or kTypeValue) + // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or + // kTypeValue) ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; @@ -461,8 +460,9 @@ bool DBIter::FindValueForCurrentKey() { last_not_merge_type = kTypeValue; break; case kTypeDeletion: + case kTypeSingleDeletion: operands.clear(); - last_not_merge_type = kTypeDeletion; + last_not_merge_type = last_key_entry_type; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeMerge: @@ -482,6 +482,7 @@ bool DBIter::FindValueForCurrentKey() { switch (last_key_entry_type) { case kTypeDeletion: + case kTypeSingleDeletion: valid_ = false; return false; case kTypeMerge: @@ -530,7 +531,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { ParsedInternalKey ikey; FindParseableKey(&ikey, kForward); - if (ikey.type == kTypeValue || ikey.type == kTypeDeletion) { + if (ikey.type == kTypeValue || ikey.type == kTypeDeletion || + ikey.type == kTypeSingleDeletion) { if (ikey.type == kTypeValue) { saved_value_ = iter_->value().ToString(); valid_ = true; @@ -553,7 +555,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!iter_->Valid() || !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) || - ikey.type == kTypeDeletion) { + ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 90ec90831..68c5b158d 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -38,16 +38,20 @@ class TestIterator : public Iterator { iter_(0), cmp(comparator) {} - void AddMerge(std::string argkey, std::string argvalue) { - Add(argkey, kTypeMerge, argvalue); + void AddPut(std::string argkey, std::string argvalue) { + Add(argkey, kTypeValue, argvalue); } void AddDeletion(std::string argkey) { Add(argkey, kTypeDeletion, std::string()); } - void AddPut(std::string argkey, std::string argvalue) { - Add(argkey, kTypeValue, argvalue); + void AddSingleDeletion(std::string argkey) { + Add(argkey, kTypeSingleDeletion, std::string()); + } + + void AddMerge(std::string argkey, std::string argvalue) { + Add(argkey, kTypeMerge, argvalue); } void Add(std::string argkey, ValueType type, std::string argvalue) { @@ -891,6 +895,8 @@ TEST_F(DBIteratorTest, DBIterator1) { db_iter->Next(); ASSERT_TRUE(db_iter->Valid()); ASSERT_EQ(db_iter->key().ToString(), "b"); + db_iter->Next(); + ASSERT_FALSE(db_iter->Valid()); } TEST_F(DBIteratorTest, DBIterator2) { @@ -1787,6 +1793,58 @@ TEST_F(DBIteratorTest, SeekToLastOccurrenceSeq0) { ASSERT_FALSE(db_iter->Valid()); } +TEST_F(DBIteratorTest, DBIterator11) { + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "0"); + internal_iter->AddPut("b", "0"); + internal_iter->AddSingleDeletion("b"); + internal_iter->AddMerge("a", "1"); + internal_iter->AddMerge("b", "2"); + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, 1, + options.max_sequential_skip_in_iterations)); + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "0"); + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + db_iter->Next(); + ASSERT_FALSE(db_iter->Valid()); +} + +TEST_F(DBIteratorTest, DBIterator12) { + Options options; + options.merge_operator = nullptr; + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "1"); + internal_iter->AddPut("b", "2"); + internal_iter->AddPut("c", "3"); + internal_iter->AddSingleDeletion("b"); + internal_iter->Finish(); + + std::unique_ptr db_iter( + NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 10, 0)); + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "3"); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "1"); + db_iter->Prev(); + ASSERT_FALSE(db_iter->Valid()); +} + class DBIterWithMergeIterTest : public testing::Test { public: DBIterWithMergeIterTest() diff --git a/db/db_test.cc b/db/db_test.cc index 16b962078..5e874ea6f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -854,6 +854,107 @@ TEST_F(DBTest, PutDeleteGet) { } while (ChangeOptions()); } +TEST_F(DBTest, PutSingleDeleteGet) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_OK(Put(1, "foo2", "v2")); + ASSERT_EQ("v2", Get(1, "foo2")); + ASSERT_OK(SingleDelete(1, "foo")); + ASSERT_EQ("NOT_FOUND", Get(1, "foo")); + // Skip HashCuckooRep as it does not support single delete. FIFO and + // universal compaction do not apply to the test case. Skip MergePut + // because single delete does not get removed when it encounters a merge. + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction | + kSkipUniversalCompaction | kSkipMergePut)); +} + +TEST_F(DBTest, SingleDeleteFlush) { + // Test to check whether flushing preserves a single delete hidden + // behind a put. + do { + Random rnd(301); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + CreateAndReopenWithCF({"pikachu"}, options); + + // Put values on second level (so that they will not be in the same + // compaction as the other operations. + Put(1, "foo", "first"); + Put(1, "bar", "one"); + ASSERT_OK(Flush(1)); + MoveFilesToLevel(2, 1); + + // (Single) delete hidden by a put + SingleDelete(1, "foo"); + Put(1, "foo", "second"); + Delete(1, "bar"); + Put(1, "bar", "two"); + ASSERT_OK(Flush(1)); + + SingleDelete(1, "foo"); + Delete(1, "bar"); + ASSERT_OK(Flush(1)); + + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); + + ASSERT_EQ("NOT_FOUND", Get(1, "bar")); + ASSERT_EQ("NOT_FOUND", Get(1, "foo")); + // Skip HashCuckooRep as it does not support single delete. FIFO and + // universal compaction do not apply to the test case. Skip MergePut + // because merges cannot be combined with single deletions. + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction | + kSkipUniversalCompaction | kSkipMergePut)); +} + +TEST_F(DBTest, SingleDeletePutFlush) { + // Single deletes that encounter the matching put in a flush should get + // removed. + do { + Random rnd(301); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "foo", Slice()); + Put(1, "a", Slice()); + SingleDelete(1, "a"); + ASSERT_OK(Flush(1)); + + ASSERT_EQ("[ ]", AllEntriesFor("a", 1)); + // Skip HashCuckooRep as it does not support single delete. FIFO and + // universal compaction do not apply to the test case. Skip MergePut + // because merges cannot be combined with single deletions. + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction | + kSkipUniversalCompaction | kSkipMergePut)); +} + +TEST_F(DBTest, EmptyFlush) { + // It is possible to produce empty flushes when using single deletes. Tests + // whether empty flushes cause issues. + do { + Random rnd(301); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "a", Slice()); + SingleDelete(1, "a"); + ASSERT_OK(Flush(1)); + + ASSERT_EQ("[ ]", AllEntriesFor("a", 1)); + // Skip HashCuckooRep as it does not support single delete. FIFO and + // universal compaction do not apply to the test case. Skip MergePut + // because merges cannot be combined with single deletions. + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction | + kSkipUniversalCompaction | kSkipMergePut)); +} + TEST_F(DBTest, GetFromImmutableLayer) { do { Options options; @@ -3548,6 +3649,54 @@ TEST_F(DBTest, CompactBetweenSnapshots) { } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction)); } +TEST_F(DBTest, UnremovableSingleDelete) { + // If we compact: + // + // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2) + // + // We do not want to end up with: + // + // Put(A, v1) Snapshot Put(A, v2) + // + // Because a subsequent SingleDelete(A) would delete the Put(A, v2) + // but not Put(A, v1), so Get(A) would return v1. + anon::OptionsOverride options_override; + options_override.skip_policy = kSkipNoSnapshot; + do { + Options options = CurrentOptions(options_override); + options.disable_auto_compactions = true; + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "foo", "first"); + const Snapshot* snapshot = db_->GetSnapshot(); + SingleDelete(1, "foo"); + Put(1, "foo", "second"); + ASSERT_OK(Flush(1)); + + ASSERT_EQ("first", Get(1, "foo", snapshot)); + ASSERT_EQ("second", Get(1, "foo")); + + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); + ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1)); + + SingleDelete(1, "foo"); + + ASSERT_EQ("first", Get(1, "foo", snapshot)); + ASSERT_EQ("NOT_FOUND", Get(1, "foo")); + + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); + + ASSERT_EQ("first", Get(1, "foo", snapshot)); + ASSERT_EQ("NOT_FOUND", Get(1, "foo")); + // Skip HashCuckooRep as it does not support single delete. FIFO and + // universal compaction do not apply to the test case. Skip MergePut + // because single delete does not get removed when it encounters a merge. + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction | + kSkipUniversalCompaction | kSkipMergePut)); +} + TEST_F(DBTest, DeletionMarkers1) { Options options = CurrentOptions(); options.max_background_flushes = 0; @@ -5361,13 +5510,6 @@ class ModelDB: public DB { batch.Put(cf, k, v); return Write(o, &batch); } - using DB::Merge; - virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf, - const Slice& k, const Slice& v) override { - WriteBatch batch; - batch.Merge(cf, k, v); - return Write(o, &batch); - } using DB::Delete; virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& key) override { @@ -5375,6 +5517,20 @@ class ModelDB: public DB { batch.Delete(cf, key); return Write(o, &batch); } + using DB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf, + const Slice& key) override { + WriteBatch batch; + batch.SingleDelete(cf, key); + return Write(o, &batch); + } + using DB::Merge; + virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf, + const Slice& k, const Slice& v) override { + WriteBatch batch; + batch.Merge(cf, k, v); + return Write(o, &batch); + } using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, const Slice& key, std::string* value) override { diff --git a/db/dbformat.cc b/db/dbformat.cc index 8b87604b2..eb19a7b17 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -22,7 +22,7 @@ namespace rocksdb { uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { assert(seq <= kMaxSequenceNumber); - assert(t <= kValueTypeForSeek); + assert(IsValueType(t)); return (seq << 8) | t; } @@ -31,7 +31,7 @@ void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) { *t = static_cast(packed & 0xff); assert(*seq <= kMaxSequenceNumber); - assert(*t <= kValueTypeForSeek); + assert(IsValueType(*t)); } void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { diff --git a/db/dbformat.h b/db/dbformat.h index c3ff2ef2f..52f18e6c5 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -33,13 +33,13 @@ enum ValueType : unsigned char { kTypeDeletion = 0x0, kTypeValue = 0x1, kTypeMerge = 0x2, - // Following types are used only in write ahead logs. They are not used in - // memtables or sst files: - kTypeLogData = 0x3, - kTypeColumnFamilyDeletion = 0x4, - kTypeColumnFamilyValue = 0x5, - kTypeColumnFamilyMerge = 0x6, - kMaxValue = 0x7F + kTypeLogData = 0x3, // WAL only. + kTypeColumnFamilyDeletion = 0x4, // WAL only. + kTypeColumnFamilyValue = 0x5, // WAL only. + kTypeColumnFamilyMerge = 0x6, // WAL only. + kTypeSingleDeletion = 0x7, + kTypeColumnFamilySingleDeletion = 0x8, // WAL only. + kMaxValue = 0x7F // Not used for storing records. }; // kValueTypeForSeek defines the ValueType that should be passed when @@ -48,7 +48,13 @@ enum ValueType : unsigned char { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -static const ValueType kValueTypeForSeek = kTypeMerge; +static const ValueType kValueTypeForSeek = kTypeSingleDeletion; + +// Checks whether a type is a value type (i.e. a type used in memtables and sst +// files). +inline bool IsValueType(ValueType t) { + return t <= kTypeMerge || t == kTypeSingleDeletion; +} // We leave eight bits empty at the bottom so a type and sequence# // can be packed together into 64-bits. @@ -193,13 +199,12 @@ inline bool ParseInternalKey(const Slice& internal_key, result->type = static_cast(c); assert(result->type <= ValueType::kMaxValue); result->user_key = Slice(internal_key.data(), n - 8); - return (c <= static_cast(kValueTypeForSeek)); + return IsValueType(result->type); } // Update the sequence number in the internal key. // Guarantees not to invalidate ikey.data(). -inline void UpdateInternalKey(std::string* ikey, - uint64_t seq, ValueType t) { +inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) { size_t ikey_sz = ikey->size(); assert(ikey_sz >= 8); uint64_t newval = (seq << 8) | t; @@ -272,6 +277,11 @@ class IterKey { Slice GetKey() const { return Slice(key_, key_size_); } + Slice GetUserKey() const { + assert(key_size_ >= 8); + return Slice(key_, key_size_ - 8); + } + size_t Size() { return key_size_; } void Clear() { key_size_ = 0; } @@ -304,11 +314,30 @@ class IterKey { memcpy(key_ + shared_len, non_shared_data, non_shared_len); } - void SetKey(const Slice& key) { + Slice SetKey(const Slice& key) { size_t size = key.size(); EnlargeBufferIfNeeded(size); memcpy(key_, key.data(), size); key_size_ = size; + return Slice(key_, key_size_); + } + + // Copies the content of key, updates the reference to the user key in ikey + // and returns a Slice referencing the new copy. + Slice SetKey(const Slice& key, ParsedInternalKey* ikey) { + size_t key_n = key.size(); + assert(key_n >= 8); + SetKey(key); + ikey->user_key = Slice(key_, key_n - 8); + return Slice(key_, key_n); + } + + // Update the sequence number in the internal key. Guarantees not to + // invalidate slices to the key (and the user key). + void UpdateInternalKey(uint64_t seq, ValueType t) { + assert(key_size_ >= 8); + uint64_t newval = (seq << 8) | t; + EncodeFixed64(&key_[key_size_ - 8], newval); } void SetInternalKey(const Slice& key_prefix, const Slice& user_key, diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 41f065b9e..4e37c1d08 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -57,7 +57,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, NumberToHumanString(stats.num_dropped_records); snprintf(buf, len, - "%4s %6d/%-3d %8.0f %5.1f " /* Level, Files, Size(MB), Score */ + "%4s %6d/%-3d %8.2f %5.1f " /* Level, Files, Size(MB), Score */ "%8.1f " /* Read(GB) */ "%7.1f " /* Rn(GB) */ "%8.1f " /* Rnp1(GB) */ diff --git a/db/memtable.cc b/db/memtable.cc index e712a7b9c..b3692364d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -444,9 +444,10 @@ static bool SaveValue(void* arg, const char* entry) { *(s->found_final_value) = true; return false; } - case kTypeDeletion: { + case kTypeDeletion: + case kTypeSingleDeletion: { if (*(s->merge_in_progress)) { - assert(merge_operator); + assert(merge_operator != nullptr); *(s->status) = Status::OK(); bool merge_success = false; { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index a87584647..325fc33fa 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -88,7 +88,8 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, if (!ParseInternalKey(iter->key(), &ikey)) { // stop at corrupted key if (assert_valid_internal_key_) { - assert(!"corrupted internal key is not expected"); + assert(!"Corrupted internal key not expected."); + return Status::Corruption("Corrupted internal key not expected."); } break; } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) { @@ -102,8 +103,12 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, // At this point we are guaranteed that we need to process this key. - assert(ikey.type <= kValueTypeForSeek); + assert(IsValueType(ikey.type)); if (ikey.type != kTypeMerge) { + // Merges operands can only be used with puts and deletions, single + // deletions are not supported. + assert(ikey.type == kTypeValue || ikey.type == kTypeDeletion); + // hit a put/delete // => merge the put value or a nullptr with operands_ // => store result in operands_.back() (and update keys_.back()) diff --git a/db/merge_helper.h b/db/merge_helper.h index a65639025..39c7126de 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -65,7 +65,9 @@ class MergeHelper { // Returns one of the following statuses: // - OK: Entries were successfully merged. // - MergeInProgress: Put/Delete not encountered and unable to merge operands. - // - Corruption: Merge operator reported unsuccessful merge. + // - Corruption: Merge operator reported unsuccessful merge or a corrupted + // key has been encountered and not expected (applies only when compiling + // with asserts removed). // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 53b13a2cb..209ede01f 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -40,15 +40,10 @@ class MergeHelperTest : public testing::Test { nullptr, Env::Default()); } - std::string Key(const std::string& user_key, const SequenceNumber& seq, - const ValueType& t) { - return InternalKey(user_key, seq, t).Encode().ToString(); - } - void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, const ValueType& t, const std::string& val, bool corrupt = false) { - InternalKey ikey = InternalKey(user_key, seq, t); + InternalKey ikey(user_key, seq, t); if (corrupt) { test::CorruptKeyType(&ikey); } @@ -83,7 +78,7 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) { ASSERT_TRUE(RunUInt64MergeHelper(0, true).ok()); ASSERT_EQ(ks_[2], iter_->key()); - ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); @@ -98,7 +93,7 @@ TEST_F(MergeHelperTest, MergeValue) { ASSERT_TRUE(RunUInt64MergeHelper(0, false).ok()); ASSERT_EQ(ks_[3], iter_->key()); - ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(8U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); @@ -114,7 +109,7 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) { ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); - ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); @@ -129,9 +124,9 @@ TEST_F(MergeHelperTest, NoPartialMerge) { ASSERT_TRUE(RunStringAppendMergeHelper(31, true).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); - ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ("v", merge_helper_->values()[0]); - ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]); + ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]); ASSERT_EQ("v2", merge_helper_->values()[1]); ASSERT_EQ(2U, merge_helper_->keys().size()); ASSERT_EQ(2U, merge_helper_->values().size()); @@ -143,7 +138,7 @@ TEST_F(MergeHelperTest, SingleOperand) { ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); ASSERT_FALSE(iter_->Valid()); - ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(1U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); @@ -156,7 +151,7 @@ TEST_F(MergeHelperTest, MergeDeletion) { ASSERT_TRUE(RunUInt64MergeHelper(15, false).ok()); ASSERT_FALSE(iter_->Valid()); - ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(3U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); @@ -171,7 +166,7 @@ TEST_F(MergeHelperTest, CorruptKey) { ASSERT_TRUE(RunUInt64MergeHelper(15, false).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); - ASSERT_EQ(Key("a", 30, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); diff --git a/db/table_properties_collector.cc b/db/table_properties_collector.cc index 2e0a67972..c14ecec11 100644 --- a/db/table_properties_collector.cc +++ b/db/table_properties_collector.cc @@ -19,7 +19,9 @@ Status InternalKeyPropertiesCollector::InternalAdd(const Slice& key, return Status::InvalidArgument("Invalid internal key"); } - if (ikey.type == ValueType::kTypeDeletion) { + // Note: We count both, deletions and single deletions here. + if (ikey.type == ValueType::kTypeDeletion || + ikey.type == ValueType::kTypeSingleDeletion) { ++deleted_keys_; } @@ -47,18 +49,22 @@ InternalKeyPropertiesCollector::GetReadableProperties() const { } namespace { + EntryType GetEntryType(ValueType value_type) { switch (value_type) { case kTypeValue: return kEntryPut; case kTypeDeletion: return kEntryDelete; + case kTypeSingleDeletion: + return kEntrySingleDelete; case kTypeMerge: return kEntryMerge; default: return kEntryOther; } } + } // namespace Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key, diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index f9ebdcd2b..0eeed8191 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "db/db_impl.h" @@ -58,16 +59,19 @@ class RegularKeysStartWithA: public TablePropertiesCollector { std::string encoded; std::string encoded_num_puts; std::string encoded_num_deletes; + std::string encoded_num_single_deletes; std::string encoded_num_size_changes; PutVarint32(&encoded, count_); PutVarint32(&encoded_num_puts, num_puts_); PutVarint32(&encoded_num_deletes, num_deletes_); + PutVarint32(&encoded_num_single_deletes, num_single_deletes_); PutVarint32(&encoded_num_size_changes, num_size_changes_); *properties = UserCollectedProperties{ {"TablePropertiesTest", message_}, {"Count", encoded}, {"NumPuts", encoded_num_puts}, {"NumDeletes", encoded_num_deletes}, + {"NumSingleDeletes", encoded_num_single_deletes}, {"NumSizeChanges", encoded_num_size_changes}, }; return Status::OK(); @@ -83,6 +87,8 @@ class RegularKeysStartWithA: public TablePropertiesCollector { num_puts_++; } else if (type == kEntryDelete) { num_deletes_++; + } else if (type == kEntrySingleDelete) { + num_single_deletes_++; } if (file_size < file_size_) { message_ = "File size should not decrease."; @@ -102,6 +108,7 @@ class RegularKeysStartWithA: public TablePropertiesCollector { uint32_t count_ = 0; uint32_t num_puts_ = 0; uint32_t num_deletes_ = 0; + uint32_t num_single_deletes_ = 0; uint32_t num_size_changes_ = 0; uint64_t file_size_ = 0; }; @@ -217,18 +224,18 @@ namespace { void TestCustomizedTablePropertiesCollector( bool backward_mode, uint64_t magic_number, bool test_int_tbl_prop_collector, const Options& options, const InternalKeyComparator& internal_comparator) { - const std::string kDeleteFlag = "D"; // make sure the entries will be inserted with order. - std::map kvs = { - {"About ", "val5"}, // starts with 'A' - {"Abstract", "val2"}, // starts with 'A' - {"Around ", "val7"}, // starts with 'A' - {"Beyond ", "val3"}, - {"Builder ", "val1"}, - {"Love ", kDeleteFlag}, - {"Cancel ", "val4"}, - {"Find ", "val6"}, - {"Rocks ", kDeleteFlag}, + std::map, std::string> kvs = { + {{"About ", kTypeValue}, "val5"}, // starts with 'A' + {{"Abstract", kTypeValue}, "val2"}, // starts with 'A' + {{"Around ", kTypeValue}, "val7"}, // starts with 'A' + {{"Beyond ", kTypeValue}, "val3"}, + {{"Builder ", kTypeValue}, "val1"}, + {{"Love ", kTypeDeletion}, ""}, + {{"Cancel ", kTypeValue}, "val4"}, + {{"Find ", kTypeValue}, "val6"}, + {{"Rocks ", kTypeDeletion}, ""}, + {{"Foo ", kTypeSingleDeletion}, ""}, }; // -- Step 1: build table @@ -248,9 +255,7 @@ void TestCustomizedTablePropertiesCollector( SequenceNumber seqNum = 0U; for (const auto& kv : kvs) { - InternalKey ikey(kv.first, seqNum++, (kv.second != kDeleteFlag) - ? ValueType::kTypeValue - : ValueType::kTypeDeletion); + InternalKey ikey(kv.first.first, seqNum++, kv.first.second); builder->Add(ikey.Encode(), kv.second); } ASSERT_OK(builder->Finish()); @@ -270,31 +275,36 @@ void TestCustomizedTablePropertiesCollector( auto user_collected = props->user_collected_properties; - ASSERT_TRUE(user_collected.find("TablePropertiesTest") != - user_collected.end()); + ASSERT_NE(user_collected.find("TablePropertiesTest"), user_collected.end()); ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest")); uint32_t starts_with_A = 0; - ASSERT_TRUE(user_collected.find("Count") != user_collected.end()); + ASSERT_NE(user_collected.find("Count"), user_collected.end()); Slice key(user_collected.at("Count")); ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); ASSERT_EQ(3u, starts_with_A); if (!backward_mode && !test_int_tbl_prop_collector) { - uint32_t num_deletes; - ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end()); - Slice key_deletes(user_collected.at("NumDeletes")); - ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes)); - ASSERT_EQ(2u, num_deletes); - uint32_t num_puts; - ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end()); + ASSERT_NE(user_collected.find("NumPuts"), user_collected.end()); Slice key_puts(user_collected.at("NumPuts")); ASSERT_TRUE(GetVarint32(&key_puts, &num_puts)); ASSERT_EQ(7u, num_puts); + uint32_t num_deletes; + ASSERT_NE(user_collected.find("NumDeletes"), user_collected.end()); + Slice key_deletes(user_collected.at("NumDeletes")); + ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes)); + ASSERT_EQ(2u, num_deletes); + + uint32_t num_single_deletes; + ASSERT_NE(user_collected.find("NumSingleDeletes"), user_collected.end()); + Slice key_single_deletes(user_collected.at("NumSingleDeletes")); + ASSERT_TRUE(GetVarint32(&key_single_deletes, &num_single_deletes)); + ASSERT_EQ(1u, num_single_deletes); + uint32_t num_size_changes; - ASSERT_TRUE(user_collected.find("NumSizeChanges") != user_collected.end()); + ASSERT_NE(user_collected.find("NumSizeChanges"), user_collected.end()); Slice key_size_changes(user_collected.at("NumSizeChanges")); ASSERT_TRUE(GetVarint32(&key_size_changes, &num_size_changes)); ASSERT_GE(num_size_changes, 2u); @@ -350,6 +360,7 @@ void TestInternalKeyPropertiesCollector( InternalKey("X ", 4, ValueType::kTypeDeletion), InternalKey("Y ", 5, ValueType::kTypeDeletion), InternalKey("Z ", 6, ValueType::kTypeDeletion), + InternalKey("a ", 7, ValueType::kTypeSingleDeletion), }; std::unique_ptr builder; @@ -403,27 +414,34 @@ void TestInternalKeyPropertiesCollector( std::unique_ptr props_guard(props); auto user_collected = props->user_collected_properties; uint64_t deleted = GetDeletedKeys(user_collected); - ASSERT_EQ(4u, deleted); + ASSERT_EQ(5u, deleted); // deletes + single-deletes if (sanitized) { uint32_t starts_with_A = 0; - ASSERT_TRUE(user_collected.find("Count") != user_collected.end()); + ASSERT_NE(user_collected.find("Count"), user_collected.end()); Slice key(user_collected.at("Count")); ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); ASSERT_EQ(1u, starts_with_A); if (!backward_mode) { + uint32_t num_puts; + ASSERT_NE(user_collected.find("NumPuts"), user_collected.end()); + Slice key_puts(user_collected.at("NumPuts")); + ASSERT_TRUE(GetVarint32(&key_puts, &num_puts)); + ASSERT_EQ(3u, num_puts); + uint32_t num_deletes; - ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end()); + ASSERT_NE(user_collected.find("NumDeletes"), user_collected.end()); Slice key_deletes(user_collected.at("NumDeletes")); ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes)); ASSERT_EQ(4u, num_deletes); - uint32_t num_puts; - ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end()); - Slice key_puts(user_collected.at("NumPuts")); - ASSERT_TRUE(GetVarint32(&key_puts, &num_puts)); - ASSERT_EQ(3u, num_puts); + uint32_t num_single_deletes; + ASSERT_NE(user_collected.find("NumSingleDeletes"), + user_collected.end()); + Slice key_single_deletes(user_collected.at("NumSingleDeletes")); + ASSERT_TRUE(GetVarint32(&key_single_deletes, &num_single_deletes)); + ASSERT_EQ(1u, num_single_deletes); } } } diff --git a/db/write_batch.cc b/db/write_batch.cc index f79082ac5..53431b92a 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -13,11 +13,13 @@ // data: record[count] // record := // kTypeValue varstring varstring -// kTypeMerge varstring varstring // kTypeDeletion varstring +// kTypeSingleDeletion varstring +// kTypeMerge varstring varstring // kTypeColumnFamilyValue varint32 varstring varstring -// kTypeColumnFamilyMerge varint32 varstring varstring // kTypeColumnFamilyDeletion varint32 varstring varstring +// kTypeColumnFamilySingleDeletion varint32 varstring varstring +// kTypeColumnFamilyMerge varint32 varstring varstring // varstring := // len: varint32 // data: uint8[len] @@ -110,11 +112,13 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, } break; case kTypeColumnFamilyDeletion: + case kTypeColumnFamilySingleDeletion: if (!GetVarint32(input, column_family)) { return Status::Corruption("bad WriteBatch Delete"); } // intentional fallthrough case kTypeDeletion: + case kTypeSingleDeletion: if (!GetLengthPrefixedSlice(input, key)) { return Status::Corruption("bad WriteBatch Delete"); } @@ -173,6 +177,11 @@ Status WriteBatch::Iterate(Handler* handler) const { s = handler->DeleteCF(column_family, key); found++; break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + s = handler->SingleDeleteCF(column_family, key); + found++; + break; case kTypeColumnFamilyMerge: case kTypeMerge: s = handler->MergeCF(column_family, key, value); @@ -282,6 +291,40 @@ void WriteBatch::Delete(ColumnFamilyHandle* column_family, WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); } +void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, + const Slice& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeSingleDeletion)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilySingleDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); +} + +void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); +} + +void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeSingleDeletion)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilySingleDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); +} + +void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); +} + void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); @@ -466,6 +509,66 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; + } + MemTable* mem = cf_mems_->GetMemTable(); + auto* moptions = mem->GetMemTableOptions(); + if (!dont_filter_deletes_ && moptions->filter_deletes) { + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + std::string value; + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { + RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); + return Status::OK(); + } + } + mem->Add(sequence_, kTypeDeletion, key, Slice()); + sequence_++; + cf_mems_->CheckMemtableFull(); + return Status::OK(); + } + + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; + } + MemTable* mem = cf_mems_->GetMemTable(); + auto* moptions = mem->GetMemTableOptions(); + if (!dont_filter_deletes_ && moptions->filter_deletes) { + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + std::string value; + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { + RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); + return Status::OK(); + } + } + mem->Add(sequence_, kTypeSingleDeletion, key, Slice()); + sequence_++; + cf_mems_->CheckMemtableFull(); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { Status seek_status; @@ -545,36 +648,6 @@ class MemTableInserter : public WriteBatch::Handler { cf_mems_->CheckMemtableFull(); return Status::OK(); } - - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; - return seek_status; - } - MemTable* mem = cf_mems_->GetMemTable(); - auto* moptions = mem->GetMemTableOptions(); - if (!dont_filter_deletes_ && moptions->filter_deletes) { - SnapshotImpl read_from_snapshot; - read_from_snapshot.number_ = sequence_; - ReadOptions ropts; - ropts.snapshot = &read_from_snapshot; - std::string value; - auto cf_handle = cf_mems_->GetColumnFamilyHandle(); - if (cf_handle == nullptr) { - cf_handle = db_->DefaultColumnFamily(); - } - if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { - RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); - return Status::OK(); - } - } - mem->Add(sequence_, kTypeDeletion, key, Slice()); - sequence_++; - cf_mems_->CheckMemtableFull(); - return Status::OK(); - } }; } // namespace diff --git a/db/write_batch_base.cc b/db/write_batch_base.cc index 4eca5d259..9f7f00d2c 100644 --- a/db/write_batch_base.cc +++ b/db/write_batch_base.cc @@ -43,6 +43,19 @@ void WriteBatchBase::Delete(const SliceParts& key) { Delete(key_slice); } +void WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + std::string key_buf; + Slice key_slice(key, &key_buf); + SingleDelete(column_family, key_slice); +} + +void WriteBatchBase::SingleDelete(const SliceParts& key) { + std::string key_buf; + Slice key_slice(key, &key_buf); + SingleDelete(key_slice); +} + void WriteBatchBase::Merge(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { std::string key_buf, value_buf; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 0718057e8..04db461a0 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -73,6 +73,12 @@ class WriteBatchInternal { static void Delete(WriteBatch* batch, uint32_t column_family_id, const Slice& key); + static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); + + static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); + static void Merge(WriteBatch* batch, uint32_t column_family_id, const Slice& key, const Slice& value); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 077a54fb2..d8c6f8cb0 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -54,6 +54,18 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeDeletion: + state.append("Delete("); + state.append(ikey.user_key.ToString()); + state.append(")"); + count++; + break; + case kTypeSingleDeletion: + state.append("SingleDelete("); + state.append(ikey.user_key.ToString()); + state.append(")"); + count++; + break; case kTypeMerge: state.append("Merge("); state.append(ikey.user_key.ToString()); @@ -62,12 +74,6 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; - case kTypeDeletion: - state.append("Delete("); - state.append(ikey.user_key.ToString()); - state.append(")"); - count++; - break; default: assert(false); break; @@ -151,6 +157,22 @@ TEST_F(WriteBatchTest, Append) { ASSERT_EQ(4, b1.Count()); } +TEST_F(WriteBatchTest, SingleDeletion) { + WriteBatch batch; + WriteBatchInternal::SetSequence(&batch, 100); + ASSERT_EQ("", PrintContents(&batch)); + ASSERT_EQ(0, batch.Count()); + batch.Put("a", "va"); + ASSERT_EQ("Put(a, va)@100", PrintContents(&batch)); + ASSERT_EQ(1, batch.Count()); + batch.SingleDelete("a"); + ASSERT_EQ( + "SingleDelete(a)@101" + "Put(a, va)@100", + PrintContents(&batch)); + ASSERT_EQ(2, batch.Count()); +} + namespace { struct TestHandler : public WriteBatch::Handler { std::string seen; @@ -164,6 +186,26 @@ namespace { } return Status::OK(); } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + if (column_family_id == 0) { + seen += "Delete(" + key.ToString() + ")"; + } else { + seen += "DeleteCF(" + ToString(column_family_id) + ", " + + key.ToString() + ")"; + } + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + if (column_family_id == 0) { + seen += "SingleDelete(" + key.ToString() + ")"; + } else { + seen += "SingleDeleteCF(" + ToString(column_family_id) + ", " + + key.ToString() + ")"; + } + return Status::OK(); + } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { if (column_family_id == 0) { @@ -177,36 +219,14 @@ namespace { virtual void LogData(const Slice& blob) override { seen += "LogData(" + blob.ToString() + ")"; } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - if (column_family_id == 0) { - seen += "Delete(" + key.ToString() + ")"; - } else { - seen += "DeleteCF(" + ToString(column_family_id) + ", " + - key.ToString() + ")"; - } - return Status::OK(); - } }; } -TEST_F(WriteBatchTest, MergeNotImplemented) { - WriteBatch batch; - batch.Merge(Slice("foo"), Slice("bar")); - ASSERT_EQ(1, batch.Count()); - ASSERT_EQ("Merge(foo, bar)@0", - PrintContents(&batch)); - - WriteBatch::Handler handler; - ASSERT_OK(batch.Iterate(&handler)); -} - TEST_F(WriteBatchTest, PutNotImplemented) { WriteBatch batch; batch.Put(Slice("k1"), Slice("v1")); ASSERT_EQ(1, batch.Count()); - ASSERT_EQ("Put(k1, v1)@0", - PrintContents(&batch)); + ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch)); WriteBatch::Handler handler; ASSERT_OK(batch.Iterate(&handler)); @@ -216,8 +236,27 @@ TEST_F(WriteBatchTest, DeleteNotImplemented) { WriteBatch batch; batch.Delete(Slice("k2")); ASSERT_EQ(1, batch.Count()); - ASSERT_EQ("Delete(k2)@0", - PrintContents(&batch)); + ASSERT_EQ("Delete(k2)@0", PrintContents(&batch)); + + WriteBatch::Handler handler; + ASSERT_OK(batch.Iterate(&handler)); +} + +TEST_F(WriteBatchTest, SingleDeleteNotImplemented) { + WriteBatch batch; + batch.SingleDelete(Slice("k2")); + ASSERT_EQ(1, batch.Count()); + ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch)); + + WriteBatch::Handler handler; + ASSERT_OK(batch.Iterate(&handler)); +} + +TEST_F(WriteBatchTest, MergeNotImplemented) { + WriteBatch batch; + batch.Merge(Slice("foo"), Slice("bar")); + ASSERT_EQ(1, batch.Count()); + ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch)); WriteBatch::Handler handler; ASSERT_OK(batch.Iterate(&handler)); @@ -230,27 +269,31 @@ TEST_F(WriteBatchTest, Blob) { batch.Put(Slice("k3"), Slice("v3")); batch.PutLogData(Slice("blob1")); batch.Delete(Slice("k2")); + batch.SingleDelete(Slice("k3")); batch.PutLogData(Slice("blob2")); batch.Merge(Slice("foo"), Slice("bar")); - ASSERT_EQ(5, batch.Count()); - ASSERT_EQ("Merge(foo, bar)@4" - "Put(k1, v1)@0" - "Delete(k2)@3" - "Put(k2, v2)@1" - "Put(k3, v3)@2", - PrintContents(&batch)); + ASSERT_EQ(6, batch.Count()); + ASSERT_EQ( + "Merge(foo, bar)@5" + "Put(k1, v1)@0" + "Delete(k2)@3" + "Put(k2, v2)@1" + "SingleDelete(k3)@4" + "Put(k3, v3)@2", + PrintContents(&batch)); TestHandler handler; batch.Iterate(&handler); ASSERT_EQ( - "Put(k1, v1)" - "Put(k2, v2)" - "Put(k3, v3)" - "LogData(blob1)" - "Delete(k2)" - "LogData(blob2)" - "Merge(foo, bar)", - handler.seen); + "Put(k1, v1)" + "Put(k2, v2)" + "Put(k3, v3)" + "LogData(blob1)" + "Delete(k2)" + "SingleDelete(k3)" + "LogData(blob2)" + "Merge(foo, bar)", + handler.seen); } TEST_F(WriteBatchTest, Continue) { @@ -263,6 +306,16 @@ TEST_F(WriteBatchTest, Continue) { ++num_seen; return TestHandler::PutCF(column_family_id, key, value); } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + ++num_seen; + return TestHandler::DeleteCF(column_family_id, key); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, + const Slice& key) override { + ++num_seen; + return TestHandler::SingleDeleteCF(column_family_id, key); + } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { ++num_seen; @@ -272,27 +325,24 @@ TEST_F(WriteBatchTest, Continue) { ++num_seen; TestHandler::LogData(blob); } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - ++num_seen; - return TestHandler::DeleteCF(column_family_id, key); - } - virtual bool Continue() override { - return num_seen < 3; - } + virtual bool Continue() override { return num_seen < 5; } } handler; batch.Put(Slice("k1"), Slice("v1")); + batch.Put(Slice("k2"), Slice("v2")); batch.PutLogData(Slice("blob1")); batch.Delete(Slice("k1")); + batch.SingleDelete(Slice("k2")); batch.PutLogData(Slice("blob2")); batch.Merge(Slice("foo"), Slice("bar")); batch.Iterate(&handler); ASSERT_EQ( - "Put(k1, v1)" - "LogData(blob1)" - "Delete(k1)", - handler.seen); + "Put(k1, v1)" + "Put(k2, v2)" + "LogData(blob1)" + "Delete(k1)" + "SingleDelete(k2)", + handler.seen); } TEST_F(WriteBatchTest, PutGatherSlices) { @@ -345,6 +395,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) { batch.Put(&two, Slice("twofoo"), Slice("bar2")); batch.Put(&eight, Slice("eightfoo"), Slice("bar8")); batch.Delete(&eight, Slice("eightfoo")); + batch.SingleDelete(&two, Slice("twofoo")); batch.Merge(&three, Slice("threethree"), Slice("3three")); batch.Put(&zero, Slice("foo"), Slice("bar")); batch.Merge(Slice("omom"), Slice("nom")); @@ -356,6 +407,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) { "PutCF(2, twofoo, bar2)" "PutCF(8, eightfoo, bar8)" "DeleteCF(8, eightfoo)" + "SingleDeleteCF(2, twofoo)" "MergeCF(3, threethree, 3three)" "Put(foo, bar)" "Merge(omom, nom)", @@ -370,6 +422,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { batch.Put(&two, Slice("twofoo"), Slice("bar2")); batch.Put(&eight, Slice("eightfoo"), Slice("bar8")); batch.Delete(&eight, Slice("eightfoo")); + batch.SingleDelete(&two, Slice("twofoo")); batch.Merge(&three, Slice("threethree"), Slice("3three")); batch.Put(&zero, Slice("foo"), Slice("bar")); batch.Merge(Slice("omom"), Slice("nom")); @@ -394,6 +447,24 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); + iter.reset(batch.NewIterator(&two)); + iter->Seek("twofoo"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type); + ASSERT_EQ("twofoo", iter->Entry().key.ToString()); + ASSERT_EQ("bar2", iter->Entry().value.ToString()); + + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type); + ASSERT_EQ("twofoo", iter->Entry().key.ToString()); + + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + iter.reset(batch.NewIterator()); iter->Seek("gggg"); ASSERT_OK(iter->status()); @@ -439,6 +510,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { "PutCF(2, twofoo, bar2)" "PutCF(8, eightfoo, bar8)" "DeleteCF(8, eightfoo)" + "SingleDeleteCF(2, twofoo)" "MergeCF(3, threethree, 3three)" "Put(foo, bar)" "Merge(omom, nom)", diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a5640ebb7..c73720951 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -184,6 +184,17 @@ class DB { return Delete(options, DefaultColumnFamily(), key); } + // Remove the database entry for "key". Requires that the key exists + // and was not overwritten. Returns OK on success, and a non-OK status + // on error. It is not an error if "key" did not exist in the database. + // Note: consider setting options.sync = true. + virtual Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) = 0; + virtual Status SingleDelete(const WriteOptions& options, const Slice& key) { + return SingleDelete(options, DefaultColumnFamily(), key); + } + // Merge the database entry for "key" with "value". Returns OK on success, // and a non-OK status on error. The semantics of this operation is // determined by the user provided merge_operator when opening DB. diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 150393aa6..10cae422d 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -33,7 +33,7 @@ struct PerfContext { // total number of internal keys skipped over during iteration (overwritten or // deleted, to be more specific, hidden by a put or delete of the same key) uint64_t internal_key_skipped_count; - // total number of deletes skipped over during iteration + // total number of deletes and single deletes skipped over during iteration uint64_t internal_delete_skipped_count; uint64_t get_snapshot_time; // total time spent on getting snapshot diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 9b3c8148e..28500749a 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -86,6 +86,7 @@ extern const std::string kPropertiesBlock; enum EntryType { kEntryPut, kEntryDelete, + kEntrySingleDelete, kEntryMerge, kEntryOther, }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 63e4aaa15..2077f8f87 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -78,6 +78,13 @@ class StackableDB : public DB { return db_->Delete(wopts, column_family, key); } + using DB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) override { + return db_->SingleDelete(wopts, column_family, key); + } + using DB::Merge; virtual Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 402e3f3a7..6d75c4b91 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -29,7 +29,13 @@ class DB; struct ReadOptions; struct DBOptions; -enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord }; +enum WriteType { + kPutRecord, + kMergeRecord, + kDeleteRecord, + kSingleDeleteRecord, + kLogDataRecord +}; // an entry for Put, Merge or Delete entry for write batches. Used in // WBWIIterator. @@ -101,6 +107,11 @@ class WriteBatchWithIndex : public WriteBatchBase { void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; void Delete(const Slice& key) override; + using WriteBatchBase::SingleDelete; + void SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) override; + void SingleDelete(const Slice& key) override; + using WriteBatchBase::PutLogData; void PutLogData(const Slice& blob) override; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 7fb4b6e52..e0949b51d 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -60,6 +60,30 @@ class WriteBatch : public WriteBatchBase { Put(nullptr, key, value); } + using WriteBatchBase::Delete; + // If the database contains a mapping for "key", erase it. Else do nothing. + void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + void Delete(const Slice& key) override { Delete(nullptr, key); } + + // variant that takes SliceParts + void Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + void Delete(const SliceParts& key) override { Delete(nullptr, key); } + + using WriteBatchBase::SingleDelete; + // If the database contains a mapping for "key", erase it. Expects that the + // key was not overwritten. Else do nothing. + void SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) override; + void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); } + + // variant that takes SliceParts + void SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + void SingleDelete(const SliceParts& key) override { + SingleDelete(nullptr, key); + } + using WriteBatchBase::Merge; // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" @@ -76,16 +100,6 @@ class WriteBatch : public WriteBatchBase { Merge(nullptr, key, value); } - using WriteBatchBase::Delete; - // If the database contains a mapping for "key", erase it. Else do nothing. - void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - void Delete(const Slice& key) override { Delete(nullptr, key); } - - // variant that takes SliceParts - void Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - void Delete(const SliceParts& key) override { Delete(nullptr, key); } - using WriteBatchBase::PutLogData; // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, @@ -135,6 +149,26 @@ class WriteBatch : public WriteBatchBase { } virtual void Put(const Slice& key, const Slice& value) {} + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + Delete(key); + return Status::OK(); + } + return Status::InvalidArgument( + "non-default column family and DeleteCF not implemented"); + } + virtual void Delete(const Slice& key) {} + + virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + SingleDelete(key); + return Status::OK(); + } + return Status::InvalidArgument( + "non-default column family and SingleDeleteCF not implemented"); + } + virtual void SingleDelete(const Slice& key) {} + // Merge and LogData are not pure virtual. Otherwise, we would break // existing clients of Handler on a source code level. The default // implementation of Merge does nothing. @@ -151,15 +185,6 @@ class WriteBatch : public WriteBatchBase { // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { - if (column_family_id == 0) { - Delete(key); - return Status::OK(); - } - return Status::InvalidArgument( - "non-default column family and DeleteCF not implemented"); - } - virtual void Delete(const Slice& key) {} // Continue is called by WriteBatch::Iterate. If it returns false, // iteration is halted. Otherwise, it continues iterating. The default diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index d64ffc200..c4083754d 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -54,6 +54,17 @@ class WriteBatchBase { virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); virtual void Delete(const SliceParts& key); + // If the database contains a mapping for "key", erase it. Expects that the + // key was not overwritten. Else do nothing. + virtual void SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) = 0; + virtual void SingleDelete(const Slice& key) = 0; + + // variant that takes SliceParts + virtual void SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key); + virtual void SingleDelete(const SliceParts& key); + // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, // it will not be persisted to the SST files. When iterating over this diff --git a/table/get_context.cc b/table/get_context.cc index 77ac8f4f7..bfa53d7a4 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -102,6 +102,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, return false; case kTypeDeletion: + case kTypeSingleDeletion: + // TODO(noetzli): Verify correctness once merge of single-deletes + // is supported assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kDeleted; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b6e46c2fa..5b2e4584e 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -86,6 +86,7 @@ DEFINE_int64(max_key, 1 * KB* KB, DEFINE_int32(column_families, 10, "Number of column families"); +// TODO(noetzli) Add support for single deletes DEFINE_bool(test_batches_snapshots, false, "If set, the test uses MultiGet(), MultiPut() and MultiDelete()" " which read/write/delete multiple keys in a batch. In this mode," @@ -318,6 +319,12 @@ DEFINE_int32(delpercent, 15, static const bool FLAGS_delpercent_dummy __attribute__((unused)) = RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent); +DEFINE_int32(nooverwritepercent, 60, + "Ratio of keys without overwrite to total workload (expressed as " + " a percentage)"); +static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) = + RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent); + DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload" " (expressed as a percentage)"); static const bool FLAGS_iterpercent_dummy __attribute__((unused)) = @@ -453,6 +460,7 @@ class Stats { long prefixes_; long writes_; long deletes_; + size_t single_deletes_; long iterator_size_sums_; long founds_; long iterations_; @@ -473,6 +481,7 @@ class Stats { prefixes_ = 0; writes_ = 0; deletes_ = 0; + single_deletes_ = 0; iterator_size_sums_ = 0; founds_ = 0; iterations_ = 0; @@ -491,6 +500,7 @@ class Stats { prefixes_ += other.prefixes_; writes_ += other.writes_; deletes_ += other.deletes_; + single_deletes_ += other.single_deletes_; iterator_size_sums_ += other.iterator_size_sums_; founds_ += other.founds_; iterations_ += other.iterations_; @@ -555,6 +565,8 @@ class Stats { deletes_ += n; } + void AddSingleDeletes(size_t n) { single_deletes_ += n; } + void AddErrors(int n) { errors_ += n; } @@ -578,6 +590,7 @@ class Stats { "", bytes_mb, rate, (100*writes_)/done_, done_); fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_); fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); + fprintf(stdout, "%-12s: Single deleted %ld times\n", "", single_deletes_); fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "", gets_, founds_); fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_); @@ -613,7 +626,25 @@ class SharedState { should_stop_bg_thread_(false), bg_thread_finished_(false), stress_test_(stress_test), - verification_failure_(false) { + verification_failure_(false), + no_overwrite_ids_(FLAGS_column_families) { + // Pick random keys in each column family that will not experience + // overwrite + + printf("Choosing random keys with no overwrite\n"); + Random rnd(seed_); + size_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100; + for (auto& cf_ids : no_overwrite_ids_) { + for (size_t i = 0; i < num_no_overwrite_keys; i++) { + size_t rand_key; + do { + rand_key = rnd.Next() % max_key_; + } while (cf_ids.find(rand_key) != cf_ids.end()); + cf_ids.insert(rand_key); + } + assert(cf_ids.size() == num_no_overwrite_keys); + } + if (FLAGS_test_batches_snapshots) { fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; @@ -741,6 +772,14 @@ class SharedState { void Delete(int cf, long key) { values_[cf][key] = SENTINEL; } + void SingleDelete(int cf, size_t key) { values_[cf][key] = SENTINEL; } + + bool AllowsOverwrite(int cf, size_t key) { + return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); + } + + bool Exists(int cf, size_t key) { return values_[cf][key] != SENTINEL; } + uint32_t GetSeed() const { return seed_; } void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } @@ -769,6 +808,9 @@ class SharedState { StressTest* stress_test_; std::atomic verification_failure_; + // Keys that should not be overwritten + std::vector > no_overwrite_ids_; + std::vector> values_; // Has to make it owned by a smart ptr as port::Mutex is not copyable // and storing it in the container may require copying depending on the impl. @@ -1445,6 +1487,7 @@ class StressTest { void OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); WriteOptions write_opts; + auto shared = thread->shared; char value[100]; long max_key = thread->shared->GetMaxKey(); std::string from_db; @@ -1529,14 +1572,14 @@ class StressTest { int rand_column_family = thread->rand.Next() % FLAGS_column_families; std::string keystr = Key(rand_key); Slice key = keystr; - int prob_op = thread->rand.Uniform(100); std::unique_ptr l; if (!FLAGS_test_batches_snapshots) { l.reset(new MutexLock( - thread->shared->GetMutexForKey(rand_column_family, rand_key))); + shared->GetMutexForKey(rand_column_family, rand_key))); } auto column_family = column_families_[rand_column_family]; + int prob_op = thread->rand.Uniform(100); if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { // OPERATION read if (!FLAGS_test_batches_snapshots) { @@ -1585,16 +1628,31 @@ class StressTest { size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { + // If the chosen key does not allow overwrite and it already + // exists, choose another key. + while (!shared->AllowsOverwrite(rand_column_family, rand_key) && + shared->Exists(rand_column_family, rand_key)) { + l.reset(); + rand_key = thread->rand.Next() % max_key; + rand_column_family = thread->rand.Next() % FLAGS_column_families; + l.reset(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } + + keystr = Key(rand_key); + key = keystr; + column_family = column_families_[rand_column_family]; + if (FLAGS_verify_before_write) { std::string keystr2 = Key(rand_key); Slice k = keystr2; Status s = db_->Get(read_opts, column_family, k, &from_db); - if (VerifyValue(rand_column_family, rand_key, read_opts, - thread->shared, from_db, s, true) == false) { + if (!VerifyValue(rand_column_family, rand_key, read_opts, + thread->shared, from_db, s, true)) { break; } } - thread->shared->Put(rand_column_family, rand_key, value_base); + shared->Put(rand_column_family, rand_key, value_base); Status s; if (FLAGS_use_merge) { s = db_->Merge(write_opts, column_family, key, v); @@ -1614,12 +1672,40 @@ class StressTest { } else if (writeBound <= prob_op && prob_op < delBound) { // OPERATION delete if (!FLAGS_test_batches_snapshots) { - thread->shared->Delete(rand_column_family, rand_key); - Status s = db_->Delete(write_opts, column_family, key); - thread->stats.AddDeletes(1); - if (!s.ok()) { - fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); - std::terminate(); + // If the chosen key does not allow overwrite and it does not exist, + // choose another key. + while (!shared->AllowsOverwrite(rand_column_family, rand_key) && + !shared->Exists(rand_column_family, rand_key)) { + l.reset(); + rand_key = thread->rand.Next() % max_key; + rand_column_family = thread->rand.Next() % FLAGS_column_families; + l.reset(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } + + keystr = Key(rand_key); + key = keystr; + column_family = column_families_[rand_column_family]; + + // Use delete if the key may be overwritten and a single deletion + // otherwise. + if (shared->AllowsOverwrite(rand_column_family, rand_key)) { + shared->Delete(rand_column_family, rand_key); + Status s = db_->Delete(write_opts, column_family, key); + thread->stats.AddDeletes(1); + if (!s.ok()) { + fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); + std::terminate(); + } + } else { + shared->SingleDelete(rand_column_family, rand_key); + Status s = db_->SingleDelete(write_opts, column_family, key); + thread->stats.AddSingleDeletes(1); + if (!s.ok()) { + fprintf(stderr, "single delete error: %s\n", + s.ToString().c_str()); + std::terminate(); + } } } else { MultiDelete(thread, write_opts, column_family, key); @@ -1778,50 +1864,47 @@ class StressTest { } void PrintEnv() const { - fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, + fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); - fprintf(stdout, "Column families : %d\n", FLAGS_column_families); + fprintf(stdout, "Column families : %d\n", FLAGS_column_families); if (!FLAGS_test_batches_snapshots) { - fprintf(stdout, "Clear CFs one in : %d\n", + fprintf(stdout, "Clear CFs one in : %d\n", FLAGS_clear_column_family_one_in); } - fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); - fprintf(stdout, - "Ops per thread : %lu\n", + fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); + fprintf(stdout, "Ops per thread : %lu\n", (unsigned long)FLAGS_ops_per_thread); std::string ttl_state("unused"); if (FLAGS_ttl > 0) { ttl_state = NumberToString(FLAGS_ttl); } - fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str()); - fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent); - fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent); - fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent); - fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent); - fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); - fprintf(stdout, "DB-write-buffer-size: %" PRIu64 "\n", - FLAGS_db_write_buffer_size); - fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); - fprintf(stdout, - "Iterations : %lu\n", + fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str()); + fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent); + fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent); + fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent); + fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent); + fprintf(stdout, "No overwrite percentage : %d%%\n", + FLAGS_nooverwritepercent); + fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); + fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n", + FLAGS_db_write_buffer_size); + fprintf(stdout, "Write-buffer-size : %d\n", + FLAGS_write_buffer_size); + fprintf(stdout, "Iterations : %lu\n", (unsigned long)FLAGS_num_iterations); - fprintf(stdout, - "Max key : %lu\n", + fprintf(stdout, "Max key : %lu\n", (unsigned long)FLAGS_max_key); - fprintf(stdout, "Ratio #ops/#keys : %f\n", - (1.0 * FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key); - fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen); - fprintf(stdout, "Batches/snapshots : %d\n", + fprintf(stdout, "Ratio #ops/#keys : %f\n", + (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key); + fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen); + fprintf(stdout, "Batches/snapshots : %d\n", FLAGS_test_batches_snapshots); - fprintf(stdout, "Deletes use filter : %d\n", - FLAGS_filter_deletes); - fprintf(stdout, "Do update in place : %d\n", - FLAGS_in_place_update); - fprintf(stdout, "Num keys per lock : %d\n", + fprintf(stdout, "Deletes use filter : %d\n", FLAGS_filter_deletes); + fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update); + fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); - std::string compression = CompressionTypeToString(FLAGS_compression_type_e); - fprintf(stdout, "Compression : %s\n", compression.c_str()); + fprintf(stdout, "Compression : %s\n", compression.c_str()); const char* memtablerep = ""; switch (FLAGS_rep_factory) { @@ -1836,7 +1919,7 @@ class StressTest { break; } - fprintf(stdout, "Memtablerep : %s\n", memtablerep); + fprintf(stdout, "Memtablerep : %s\n", memtablerep); fprintf(stdout, "------------------------------------------------\n"); } diff --git a/util/db_test_util.cc b/util/db_test_util.cc index 186528a2c..b9baa0e86 100644 --- a/util/db_test_util.cc +++ b/util/db_test_util.cc @@ -462,6 +462,14 @@ Status DBTestBase::Delete(int cf, const std::string& k) { return db_->Delete(WriteOptions(), handles_[cf], k); } +Status DBTestBase::SingleDelete(const std::string& k) { + return db_->SingleDelete(WriteOptions(), k); +} + +Status DBTestBase::SingleDelete(int cf, const std::string& k) { + return db_->SingleDelete(WriteOptions(), handles_[cf], k); +} + std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) { ReadOptions options; options.verify_checksums = true; @@ -571,6 +579,9 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) { case kTypeDeletion: result += "DEL"; break; + case kTypeSingleDeletion: + result += "SDEL"; + break; default: assert(false); break; diff --git a/util/db_test_util.h b/util/db_test_util.h index 3407e3907..3afdc289a 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -554,6 +554,10 @@ class DBTestBase : public testing::Test { Status Delete(int cf, const std::string& k); + Status SingleDelete(const std::string& k); + + Status SingleDelete(int cf, const std::string& k); + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr); std::string Get(int cf, const std::string& k, diff --git a/util/testutil.cc b/util/testutil.cc index 72ecdeec4..5f7422172 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -129,5 +129,14 @@ void CorruptKeyType(InternalKey* ikey) { ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); } +std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, + const ValueType& t, bool corrupt) { + InternalKey k(user_key, seq, t); + if (corrupt) { + CorruptKeyType(&k); + } + return k.Encode().ToString(); +} + } // namespace test } // namespace rocksdb diff --git a/util/testutil.h b/util/testutil.h index 9e452fe8a..45c822e82 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -277,6 +277,10 @@ class NullLogger : public Logger { // Corrupts key by changing the type extern void CorruptKeyType(InternalKey* ikey); +extern std::string KeyStr(const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + class SleepingBackgroundTask { public: SleepingBackgroundTask() diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index e3caa7932..189e557ed 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -344,7 +344,7 @@ class WBWIIteratorImpl : public WBWIIterator { &ret.key, &ret.value, &blob); assert(s.ok()); assert(ret.type == kPutRecord || ret.type == kDeleteRecord || - ret.type == kMergeRecord); + ret.type == kSingleDeleteRecord || ret.type == kMergeRecord); return ret; } @@ -580,6 +580,32 @@ void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { rep->AddOrUpdateIndex(key); } +void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { + rep->SetLastEntryOffset(); + rep->write_batch.Delete(column_family, key); + rep->AddOrUpdateIndex(column_family, key); +} + +void WriteBatchWithIndex::Delete(const Slice& key) { + rep->SetLastEntryOffset(); + rep->write_batch.Delete(key); + rep->AddOrUpdateIndex(key); +} + +void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + rep->SetLastEntryOffset(); + rep->write_batch.SingleDelete(column_family, key); + rep->AddOrUpdateIndex(column_family, key); +} + +void WriteBatchWithIndex::SingleDelete(const Slice& key) { + rep->SetLastEntryOffset(); + rep->write_batch.SingleDelete(key); + rep->AddOrUpdateIndex(key); +} + void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); @@ -597,19 +623,6 @@ void WriteBatchWithIndex::PutLogData(const Slice& blob) { rep->write_batch.PutLogData(blob); } -void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - rep->SetLastEntryOffset(); - rep->write_batch.Delete(column_family, key); - rep->AddOrUpdateIndex(column_family, key); -} - -void WriteBatchWithIndex::Delete(const Slice& key) { - rep->SetLastEntryOffset(); - rep->write_batch.Delete(key); - rep->AddOrUpdateIndex(key); -} - void WriteBatchWithIndex::Clear() { rep->Clear(); } Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index f5c141121..eda20150c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -5,6 +5,8 @@ #ifndef ROCKSDB_LITE +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + #include "db/column_family.h" #include "db/merge_context.h" #include "db/merge_helper.h" @@ -13,7 +15,6 @@ #include "rocksdb/utilities/write_batch_with_index.h" #include "util/coding.h" #include "util/string_util.h" -#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace rocksdb { @@ -53,6 +54,10 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, case kTypeDeletion: *type = kDeleteRecord; break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + *type = kSingleDeleteRecord; + break; case kTypeColumnFamilyMerge: case kTypeMerge: *type = kMergeRecord; diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 54bbd81da..3c894ebbb 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -30,7 +30,7 @@ struct WriteBatchIndexEntry { // If this flag appears in the offset, it indicates a key that is smaller // than any other entry for the same column family - static const size_t kFlagMin = UINT_MAX; + static const size_t kFlagMin = std::numeric_limits::max(); size_t offset; // offset of an entry in write batch's string buffer. uint32_t column_family; // column family of the entry