diff --git a/db/db_test.cc b/db/db_test.cc index d126bb530..d240e41e5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2383,6 +2383,97 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } +// This is a static filter used for filtering +// kvs during the compaction process. +static int cfilter_count; +static std::string NEW_VALUE = "NewValue"; + +class KeepFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, bool* value_changed) const + override { + cfilter_count++; + return false; + } + + virtual const char* Name() const override { return "KeepFilter"; } +}; + +class DeleteFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, bool* value_changed) const + override { + cfilter_count++; + return true; + } + + virtual const char* Name() const override { return "DeleteFilter"; } +}; + +class ChangeFilter : public CompactionFilter { + public: + explicit ChangeFilter() {} + + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, bool* value_changed) const + override { + assert(new_value != nullptr); + *new_value = NEW_VALUE; + *value_changed = true; + return false; + } + + virtual const char* Name() const override { return "ChangeFilter"; } +}; + +class KeepFilterFactory : public CompactionFilterFactory { + public: + explicit KeepFilterFactory(bool check_context = false) + : check_context_(check_context) {} + + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (check_context_) { + ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction); + ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); + } + return std::unique_ptr(new KeepFilter()); + } + + virtual const char* Name() const override { return "KeepFilterFactory"; } + bool check_context_; + std::atomic_bool expect_full_compaction_; + std::atomic_bool expect_manual_compaction_; +}; + +class DeleteFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr(new DeleteFilter()); + } else { + return std::unique_ptr(nullptr); + } + } + + virtual const char* Name() const override { return "DeleteFilterFactory"; } +}; + +class ChangeFilterFactory : public CompactionFilterFactory { + public: + explicit ChangeFilterFactory() {} + + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + return std::unique_ptr(new ChangeFilter()); + } + + virtual const char* Name() const override { return "ChangeFilterFactory"; } +}; + // TODO(kailiu) The tests on UniversalCompaction has some issues: // 1. A lot of magic numbers ("11" or "12"). // 2. Made assumption on the memtable flush conidtions, which may change from @@ -2393,11 +2484,16 @@ TEST(DBTest, UniversalCompactionTrigger) { options.write_buffer_size = 100<<10; //100KB // trigger compaction if there are >= 4 files options.level0_file_num_compaction_trigger = 4; + KeepFilterFactory* filter = new KeepFilterFactory(true); + filter->expect_manual_compaction_.store(false); + options.compaction_filter_factory.reset(filter); + Reopen(&options); Random rnd(301); int key_idx = 0; + filter->expect_full_compaction_.store(true); // Stage 1: // Generate a set of files at level 0, but don't trigger level-0 // compaction. @@ -2415,6 +2511,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Generate one more file at level-0, which should trigger level-0 // compaction. + filter->expect_full_compaction_.store(false); for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; @@ -2508,6 +2605,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Stage 5: // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate // a new file of size 1. + filter->expect_full_compaction_.store(true); for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; @@ -3230,100 +3328,6 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) { } while (ChangeCompactOptions()); } -// This is a static filter used for filtering -// kvs during the compaction process. -static int cfilter_count; -static std::string NEW_VALUE = "NewValue"; - -class KeepFilter : public CompactionFilter { - public: - virtual bool Filter(int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) const override { - cfilter_count++; - return false; - } - - virtual const char* Name() const override { - return "KeepFilter"; - } - -}; - -class DeleteFilter : public CompactionFilter { - public: - virtual bool Filter(int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) const override { - cfilter_count++; - return true; - } - - virtual const char* Name() const override { - return "DeleteFilter"; - } -}; - -class ChangeFilter : public CompactionFilter { - public: - explicit ChangeFilter() {} - - virtual bool Filter(int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) const override { - assert(new_value != nullptr); - *new_value = NEW_VALUE; - *value_changed = true; - return false; - } - - virtual const char* Name() const override { - return "ChangeFilter"; - } -}; - -class KeepFilterFactory : public CompactionFilterFactory { - public: - virtual std::unique_ptr - CreateCompactionFilter(const CompactionFilter::Context& context) override { - return std::unique_ptr(new KeepFilter()); - } - - virtual const char* Name() const override { - return "KeepFilterFactory"; - } -}; - -class DeleteFilterFactory : public CompactionFilterFactory { - public: - virtual std::unique_ptr - CreateCompactionFilter(const CompactionFilter::Context& context) override { - if (context.is_manual_compaction) { - return std::unique_ptr(new DeleteFilter()); - } else { - return std::unique_ptr(nullptr); - } - } - - virtual const char* Name() const override { - return "DeleteFilterFactory"; - } -}; - -class ChangeFilterFactory : public CompactionFilterFactory { - public: - explicit ChangeFilterFactory() {} - - virtual std::unique_ptr - CreateCompactionFilter(const CompactionFilter::Context& context) override { - return std::unique_ptr(new ChangeFilter()); - } - - virtual const char* Name() const override { - return "ChangeFilterFactory"; - } -}; - TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); options.num_levels = 3; @@ -3512,6 +3516,60 @@ TEST(DBTest, CompactionFilterWithValueChange) { } while (ChangeCompactOptions()); } +TEST(DBTest, CompactionFilterContextManual) { + KeepFilterFactory* filter = new KeepFilterFactory(); + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.compaction_filter_factory.reset(filter); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 8; + Reopen(&options); + int num_keys_per_file = 400; + for (int j = 0; j < 3; j++) { + // Write several keys. + const std::string value(10, 'x'); + for (int i = 0; i < num_keys_per_file; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%02d", i, j); + Put(key, value); + } + dbfull()->TEST_FlushMemTable(); + // Make sure next file is much smaller so automatic compaction will not + // be triggered. + num_keys_per_file /= 2; + } + + // Force a manual compaction + cfilter_count = 0; + filter->expect_manual_compaction_.store(true); + filter->expect_full_compaction_.store(false); // Manual compaction always + // set this flag. + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(cfilter_count, 700); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + + // Verify total number of keys is correct after manual compaction. + int count = 0; + int total = 0; + Iterator* iter = dbfull()->TEST_NewInternalIterator(); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); + } + ASSERT_EQ(total, 700); + ASSERT_EQ(count, 1); + delete iter; +} + TEST(DBTest, SparseMerge) { do { Options options = CurrentOptions();