// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #include "db/db_test_util.h" #include "port/stack_trace.h" #include "util/testutil.h" #include "utilities/merge_operators.h" namespace rocksdb { class DBRangeDelTest : public DBTestBase { public: DBRangeDelTest() : DBTestBase("/db_range_del_test") {} std::string GetNumericStr(int key) { uint64_t uint64_key = static_cast(key); std::string str; str.resize(8); memcpy(&str[0], static_cast(&uint64_key), 8); return str; } }; // PlainTableFactory and NumTableFilesAtLevel() are not supported in // ROCKSDB_LITE #ifndef ROCKSDB_LITE TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) { Options opts = CurrentOptions(); opts.table_factory.reset(new PlainTableFactory()); opts.prefix_extractor.reset(NewNoopTransform()); opts.allow_mmap_reads = true; opts.max_sequential_skip_in_iterations = 999999; Reopen(opts); ASSERT_TRUE( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1") .IsNotSupported()); } TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) { ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr2")); ASSERT_OK(db_->Flush(FlushOptions())); ASSERT_EQ(1, NumTableFilesAtLevel(0)); } TEST_F(DBRangeDelTest, CompactionOutputHasOnlyRangeTombstone) { Options opts = CurrentOptions(); opts.disable_auto_compactions = true; opts.statistics = CreateDBStatistics(); Reopen(opts); // snapshot protects range tombstone from dropping due to becoming obsolete. const Snapshot* snapshot = db_->GetSnapshot(); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"); db_->Flush(FlushOptions()); ASSERT_EQ(1, NumTableFilesAtLevel(0)); ASSERT_EQ(0, NumTableFilesAtLevel(1)); dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true /* disallow_trivial_move */); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(1, NumTableFilesAtLevel(1)); ASSERT_EQ(0, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE)); db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) { // regression test for exactly filled compaction output files. Previously // another file would be generated containing all range deletions, which // could invalidate the non-overlapping file boundary invariant. const int kNumPerFile = 4, kNumFiles = 2, kFileBytes = 9 << 10; Options options = CurrentOptions(); options.disable_auto_compactions = true; options.level0_file_num_compaction_trigger = kNumFiles; options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); options.num_levels = 2; options.target_file_size_base = kFileBytes; BlockBasedTableOptions table_options; table_options.block_size_deviation = 50; // each block holds two keys options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Reopen(options); // snapshot protects range tombstone from dropping due to becoming obsolete. const Snapshot* snapshot = db_->GetSnapshot(); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), Key(1)); Random rnd(301); for (int i = 0; i < kNumFiles; ++i) { std::vector values; // Write 12K (4 values, each 3K) for (int j = 0; j < kNumPerFile; j++) { values.push_back(RandomString(&rnd, 3 << 10)); ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); if (j == 0 && i > 0) { dbfull()->TEST_WaitForFlushMemTable(); } } } // put extra key to trigger final flush ASSERT_OK(Put("", "")); dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(0)); ASSERT_EQ(0, NumTableFilesAtLevel(1)); dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true /* disallow_trivial_move */); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(2, NumTableFilesAtLevel(1)); db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) { // Ensures range deletion spanning multiple compaction output files that are // cut by max_compaction_bytes will have non-overlapping key-ranges. // https://github.com/facebook/rocksdb/issues/1778 const int kNumFiles = 2, kNumPerFile = 1 << 8, kBytesPerVal = 1 << 12; Options opts = CurrentOptions(); opts.comparator = test::Uint64Comparator(); opts.disable_auto_compactions = true; opts.level0_file_num_compaction_trigger = kNumFiles; opts.max_compaction_bytes = kNumPerFile * kBytesPerVal; opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); // Want max_compaction_bytes to trigger the end of compaction output file, not // target_file_size_base, so make the latter much bigger opts.target_file_size_base = 100 * opts.max_compaction_bytes; Reopen(opts); // snapshot protects range tombstone from dropping due to becoming obsolete. const Snapshot* snapshot = db_->GetSnapshot(); // It spans the whole key-range, thus will be included in all output files ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr(0), GetNumericStr(kNumFiles * kNumPerFile - 1))); Random rnd(301); for (int i = 0; i < kNumFiles; ++i) { std::vector values; // Write 1MB (256 values, each 4K) for (int j = 0; j < kNumPerFile; j++) { values.push_back(RandomString(&rnd, kBytesPerVal)); ASSERT_OK(Put(GetNumericStr(kNumPerFile * i + j), values[j])); } // extra entry to trigger SpecialSkipListFactory's flush ASSERT_OK(Put(GetNumericStr(kNumPerFile), "")); dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); } dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true /* disallow_trivial_move */); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_GE(NumTableFilesAtLevel(1), 2); std::vector> files; dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files); for (size_t i = 0; i < files[1].size() - 1; ++i) { ASSERT_TRUE(InternalKeyComparator(opts.comparator) .Compare(files[1][i].largest, files[1][i + 1].smallest) < 0); } db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) { db_->Put(WriteOptions(), "b1", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c")); db_->Put(WriteOptions(), "b2", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b")); // first iteration verifies query correctness in memtable, second verifies // query correctness for a single SST file for (int i = 0; i < 2; ++i) { if (i > 0) { ASSERT_OK(db_->Flush(FlushOptions())); ASSERT_EQ(1, NumTableFilesAtLevel(0)); } std::string value; ASSERT_TRUE(db_->Get(ReadOptions(), "b1", &value).IsNotFound()); ASSERT_OK(db_->Get(ReadOptions(), "b2", &value)); } } TEST_F(DBRangeDelTest, CompactRangeDelsSameStartKey) { db_->Put(WriteOptions(), "unused", "val"); // prevents empty after compaction db_->Put(WriteOptions(), "b1", "val"); ASSERT_OK(db_->Flush(FlushOptions())); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c")); ASSERT_OK(db_->Flush(FlushOptions())); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b")); ASSERT_OK(db_->Flush(FlushOptions())); ASSERT_EQ(3, NumTableFilesAtLevel(0)); for (int i = 0; i < 2; ++i) { if (i > 0) { dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true /* disallow_trivial_move */); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(1, NumTableFilesAtLevel(1)); } std::string value; ASSERT_TRUE(db_->Get(ReadOptions(), "b1", &value).IsNotFound()); } } #endif // ROCKSDB_LITE TEST_F(DBRangeDelTest, FlushRemovesCoveredKeys) { const int kNum = 300, kRangeBegin = 50, kRangeEnd = 250; Options opts = CurrentOptions(); opts.comparator = test::Uint64Comparator(); Reopen(opts); // Write a third before snapshot, a third between snapshot and tombstone, and // a third after the tombstone. Keys older than snapshot or newer than the // tombstone should be preserved. const Snapshot* snapshot = nullptr; for (int i = 0; i < kNum; ++i) { if (i == kNum / 3) { snapshot = db_->GetSnapshot(); } else if (i == 2 * kNum / 3) { db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd)); } db_->Put(WriteOptions(), GetNumericStr(i), "val"); } db_->Flush(FlushOptions()); for (int i = 0; i < kNum; ++i) { ReadOptions read_opts; read_opts.ignore_range_deletions = true; std::string value; if (i < kRangeBegin || i > kRangeEnd || i < kNum / 3 || i >= 2 * kNum / 3) { ASSERT_OK(db_->Get(read_opts, GetNumericStr(i), &value)); } else { ASSERT_TRUE(db_->Get(read_opts, GetNumericStr(i), &value).IsNotFound()); } } db_->ReleaseSnapshot(snapshot); } // NumTableFilesAtLevel() is not supported in ROCKSDB_LITE #ifndef ROCKSDB_LITE TEST_F(DBRangeDelTest, CompactionRemovesCoveredKeys) { const int kNumPerFile = 100, kNumFiles = 4; Options opts = CurrentOptions(); opts.comparator = test::Uint64Comparator(); opts.disable_auto_compactions = true; opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); opts.num_levels = 2; opts.statistics = CreateDBStatistics(); Reopen(opts); for (int i = 0; i < kNumFiles; ++i) { if (i > 0) { // range tombstone covers first half of the previous file db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr((i - 1) * kNumPerFile), GetNumericStr((i - 1) * kNumPerFile + kNumPerFile / 2)); } // Make sure a given key appears in each file so compaction won't be able to // use trivial move, which would happen if the ranges were non-overlapping. // Also, we need an extra element since flush is only triggered when the // number of keys is one greater than SpecialSkipListFactory's limit. // We choose a key outside the key-range used by the test to avoid conflict. db_->Put(WriteOptions(), GetNumericStr(kNumPerFile * kNumFiles), "val"); for (int j = 0; j < kNumPerFile; ++j) { db_->Put(WriteOptions(), GetNumericStr(i * kNumPerFile + j), "val"); } dbfull()->TEST_WaitForFlushMemTable(); ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); } db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_GT(NumTableFilesAtLevel(1), 0); ASSERT_EQ((kNumFiles - 1) * kNumPerFile / 2, TestGetTickerCount(opts, COMPACTION_KEY_DROP_RANGE_DEL)); for (int i = 0; i < kNumFiles; ++i) { for (int j = 0; j < kNumPerFile; ++j) { ReadOptions read_opts; read_opts.ignore_range_deletions = true; std::string value; if (i == kNumFiles - 1 || j >= kNumPerFile / 2) { ASSERT_OK( db_->Get(read_opts, GetNumericStr(i * kNumPerFile + j), &value)); } else { ASSERT_TRUE( db_->Get(read_opts, GetNumericStr(i * kNumPerFile + j), &value) .IsNotFound()); } } } } TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) { const int kNumPerFile = 100, kNumFiles = 4, kFileBytes = 100 << 10; Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = kNumFiles; options.max_bytes_for_level_base = 2 * kFileBytes; options.max_subcompactions = 4; options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); options.num_levels = 3; options.target_file_size_base = kFileBytes; options.target_file_size_multiplier = 1; Reopen(options); Random rnd(301); for (int i = 0; i < 2; ++i) { for (int j = 0; j < kNumFiles; ++j) { if (i > 0) { // delete [95,105) in two files, [295,305) in next two int mid = (j + (1 - j % 2)) * kNumPerFile; db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(mid - 5), Key(mid + 5)); } std::vector values; // Write 100KB (100 values, each 1K) for (int k = 0; k < kNumPerFile; k++) { values.push_back(RandomString(&rnd, 990)); ASSERT_OK(Put(Key(j * kNumPerFile + k), values[k])); } // put extra key to trigger flush ASSERT_OK(Put("", "")); dbfull()->TEST_WaitForFlushMemTable(); if (j < kNumFiles - 1) { // background compaction may happen early for kNumFiles'th file ASSERT_EQ(NumTableFilesAtLevel(0), j + 1); } if (j == options.level0_file_num_compaction_trigger - 1) { // When i == 1, compaction will output some files to L1, at which point // L1 is not bottommost so range deletions cannot be compacted away. The // new L1 files must be generated with non-overlapping key ranges even // though multiple subcompactions see the same ranges deleted, else an // assertion will fail. dbfull()->TEST_WaitForCompact(); ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_GT(NumTableFilesAtLevel(1), 0); ASSERT_GT(NumTableFilesAtLevel(2), 0); } } } } TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) { const int kNumPerFile = 100, kFilesPerLevel = 4, kNumLevels = 4; Options options = CurrentOptions(); options.compaction_options_universal.min_merge_width = kFilesPerLevel; options.compaction_options_universal.max_merge_width = kFilesPerLevel; options.compaction_options_universal.size_ratio = 10; options.compaction_style = kCompactionStyleUniversal; options.level0_file_num_compaction_trigger = kFilesPerLevel; options.max_subcompactions = 4; options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); options.num_levels = kNumLevels; options.target_file_size_base = kNumPerFile << 10; options.target_file_size_multiplier = 1; Reopen(options); Random rnd(301); for (int i = 0; i < kNumLevels - 1; ++i) { for (int j = 0; j < kFilesPerLevel; ++j) { if (i == kNumLevels - 2) { // insert range deletions [95,105) in two files, [295,305) in next two // to prepare L1 for later manual compaction. int mid = (j + (1 - j % 2)) * kNumPerFile; db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(mid - 5), Key(mid + 5)); } std::vector values; // Write 100KB (100 values, each 1K) for (int k = 0; k < kNumPerFile; k++) { values.push_back(RandomString(&rnd, 990)); ASSERT_OK(Put(Key(j * kNumPerFile + k), values[k])); } // put extra key to trigger flush ASSERT_OK(Put("", "")); dbfull()->TEST_WaitForFlushMemTable(); if (j < kFilesPerLevel - 1) { // background compaction may happen early for kFilesPerLevel'th file ASSERT_EQ(NumTableFilesAtLevel(0), j + 1); } } dbfull()->TEST_WaitForCompact(); ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_GT(NumTableFilesAtLevel(kNumLevels - 1 - i), kFilesPerLevel - 1); } // Now L1-L3 are full, when we compact L1->L2 we should see (1) subcompactions // happen since input level > 0; (2) range deletions are not dropped since // output level is not bottommost. If no file boundary assertion fails, that // probably means universal compaction + subcompaction + range deletion are // compatible. ASSERT_OK(dbfull()->RunManualCompaction( reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(), 1 /* input_level */, 2 /* output_level */, 0 /* output_path_id */, nullptr /* begin */, nullptr /* end */, true /* exclusive */, true /* disallow_trivial_move */)); } #endif // ROCKSDB_LITE TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) { const int kNumPerFile = 3, kNumFiles = 3; Options opts = CurrentOptions(); opts.disable_auto_compactions = true; opts.memtable_factory.reset(new SpecialSkipListFactory(2 * kNumPerFile)); opts.merge_operator = MergeOperators::CreateUInt64AddOperator(); opts.num_levels = 2; Reopen(opts); // Iterates kNumFiles * kNumPerFile + 1 times since flushing the last file // requires an extra entry. for (int i = 0; i <= kNumFiles * kNumPerFile; ++i) { if (i % kNumPerFile == 0 && i / kNumPerFile == kNumFiles - 1) { // Delete merge operands from all but the last file db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key", "key_"); } std::string val; PutFixed64(&val, i); db_->Merge(WriteOptions(), "key", val); // we need to prevent trivial move using Puts so compaction will actually // process the merge operands. db_->Put(WriteOptions(), "prevent_trivial_move", ""); if (i > 0 && i % kNumPerFile == 0) { dbfull()->TEST_WaitForFlushMemTable(); } } ReadOptions read_opts; read_opts.ignore_range_deletions = true; std::string expected, actual; ASSERT_OK(db_->Get(read_opts, "key", &actual)); PutFixed64(&expected, 45); // 1+2+...+9 ASSERT_EQ(expected, actual); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); expected.clear(); ASSERT_OK(db_->Get(read_opts, "key", &actual)); uint64_t tmp; Slice tmp2(actual); GetFixed64(&tmp2, &tmp); PutFixed64(&expected, 30); // 6+7+8+9 (earlier operands covered by tombstone) ASSERT_EQ(expected, actual); } // NumTableFilesAtLevel() is not supported in ROCKSDB_LITE #ifndef ROCKSDB_LITE TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) { // During compaction to bottommost level, verify range tombstones older than // the oldest snapshot are removed, while others are preserved. Options opts = CurrentOptions(); opts.disable_auto_compactions = true; opts.num_levels = 2; opts.statistics = CreateDBStatistics(); Reopen(opts); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1"); // obsolete after compaction db_->Put(WriteOptions(), "key", "val"); db_->Flush(FlushOptions()); const Snapshot* snapshot = db_->GetSnapshot(); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2", "dr2"); // protected by snapshot db_->Put(WriteOptions(), "key", "val"); db_->Flush(FlushOptions()); ASSERT_EQ(2, NumTableFilesAtLevel(0)); ASSERT_EQ(0, NumTableFilesAtLevel(1)); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(1, NumTableFilesAtLevel(1)); ASSERT_EQ(1, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE)); db_->ReleaseSnapshot(snapshot); } #endif // ROCKSDB_LITE TEST_F(DBRangeDelTest, GetCoveredKeyFromMutableMemtable) { db_->Put(WriteOptions(), "key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ReadOptions read_opts; std::string value; ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound()); } TEST_F(DBRangeDelTest, GetCoveredKeyFromImmutableMemtable) { Options opts = CurrentOptions(); opts.max_write_buffer_number = 3; opts.min_write_buffer_number_to_merge = 2; // SpecialSkipListFactory lets us specify maximum number of elements the // memtable can hold. It switches the active memtable to immutable (flush is // prevented by the above options) upon inserting an element that would // overflow the memtable. opts.memtable_factory.reset(new SpecialSkipListFactory(1)); Reopen(opts); db_->Put(WriteOptions(), "key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); db_->Put(WriteOptions(), "blah", "val"); ReadOptions read_opts; std::string value; ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound()); } TEST_F(DBRangeDelTest, GetCoveredKeyFromSst) { db_->Put(WriteOptions(), "key", "val"); // snapshot prevents key from being deleted during flush const Snapshot* snapshot = db_->GetSnapshot(); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ASSERT_OK(db_->Flush(FlushOptions())); ReadOptions read_opts; std::string value; ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound()); db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, GetCoveredMergeOperandFromMemtable) { const int kNumMergeOps = 10; Options opts = CurrentOptions(); opts.merge_operator = MergeOperators::CreateUInt64AddOperator(); Reopen(opts); for (int i = 0; i < kNumMergeOps; ++i) { std::string val; PutFixed64(&val, i); db_->Merge(WriteOptions(), "key", val); if (i == kNumMergeOps / 2) { // deletes [0, 5] db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key", "key_"); } } ReadOptions read_opts; std::string expected, actual; ASSERT_OK(db_->Get(read_opts, "key", &actual)); PutFixed64(&expected, 30); // 6+7+8+9 ASSERT_EQ(expected, actual); expected.clear(); read_opts.ignore_range_deletions = true; ASSERT_OK(db_->Get(read_opts, "key", &actual)); PutFixed64(&expected, 45); // 0+1+2+...+9 ASSERT_EQ(expected, actual); } TEST_F(DBRangeDelTest, GetIgnoresRangeDeletions) { Options opts = CurrentOptions(); opts.max_write_buffer_number = 4; opts.min_write_buffer_number_to_merge = 3; opts.memtable_factory.reset(new SpecialSkipListFactory(1)); Reopen(opts); db_->Put(WriteOptions(), "sst_key", "val"); // snapshot prevents key from being deleted during flush const Snapshot* snapshot = db_->GetSnapshot(); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ASSERT_OK(db_->Flush(FlushOptions())); db_->Put(WriteOptions(), "imm_key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); db_->Put(WriteOptions(), "mem_key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ReadOptions read_opts; read_opts.ignore_range_deletions = true; for (std::string key : {"sst_key", "imm_key", "mem_key"}) { std::string value; ASSERT_OK(db_->Get(read_opts, key, &value)); } db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, IteratorRemovesCoveredKeys) { const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25; Options opts = CurrentOptions(); opts.comparator = test::Uint64Comparator(); opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); Reopen(opts); // Write half of the keys before the tombstone and half after the tombstone. // Only covered keys (i.e., within the range and older than the tombstone) // should be deleted. for (int i = 0; i < kNum; ++i) { if (i == kNum / 2) { db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd)); } db_->Put(WriteOptions(), GetNumericStr(i), "val"); } ReadOptions read_opts; auto* iter = db_->NewIterator(read_opts); int expected = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(GetNumericStr(expected), iter->key()); if (expected == kRangeBegin - 1) { expected = kNum / 2; } else { ++expected; } } ASSERT_EQ(kNum, expected); delete iter; } TEST_F(DBRangeDelTest, IteratorOverUserSnapshot) { const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25; Options opts = CurrentOptions(); opts.comparator = test::Uint64Comparator(); opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile)); Reopen(opts); const Snapshot* snapshot = nullptr; // Put a snapshot before the range tombstone, verify an iterator using that // snapshot sees all inserted keys. for (int i = 0; i < kNum; ++i) { if (i == kNum / 2) { snapshot = db_->GetSnapshot(); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr(kRangeBegin), GetNumericStr(kRangeEnd)); } db_->Put(WriteOptions(), GetNumericStr(i), "val"); } ReadOptions read_opts; read_opts.snapshot = snapshot; auto* iter = db_->NewIterator(read_opts); int expected = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(GetNumericStr(expected), iter->key()); ++expected; } ASSERT_EQ(kNum / 2, expected); delete iter; db_->ReleaseSnapshot(snapshot); } TEST_F(DBRangeDelTest, IteratorIgnoresRangeDeletions) { Options opts = CurrentOptions(); opts.max_write_buffer_number = 4; opts.min_write_buffer_number_to_merge = 3; opts.memtable_factory.reset(new SpecialSkipListFactory(1)); Reopen(opts); db_->Put(WriteOptions(), "sst_key", "val"); // snapshot prevents key from being deleted during flush const Snapshot* snapshot = db_->GetSnapshot(); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ASSERT_OK(db_->Flush(FlushOptions())); db_->Put(WriteOptions(), "imm_key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); db_->Put(WriteOptions(), "mem_key", "val"); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z")); ReadOptions read_opts; read_opts.ignore_range_deletions = true; auto* iter = db_->NewIterator(read_opts); int i = 0; std::string expected[] = {"imm_key", "mem_key", "sst_key"}; for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++i) { std::string key; ASSERT_EQ(expected[i], iter->key()); } ASSERT_EQ(3, i); delete iter; db_->ReleaseSnapshot(snapshot); } } // namespace rocksdb int main(int argc, char** argv) { rocksdb::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }