diff --git a/db/db_impl.cc b/db/db_impl.cc index a6a884d12..1183ef128 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -26,6 +26,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "db/transaction_log_iterator_impl.h" +#include "leveldb/compaction_filter.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/merge_operator.h" @@ -1701,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { key = merge.key(); ParseInternalKey(key, &ikey); value = merge.value(); - } else if (options_.CompactionFilter != nullptr && + } else if (options_.compaction_filter && ikey.type != kTypeDeletion && visible_at_tip) { // If the user has specified a compaction filter and there are no @@ -1712,11 +1713,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { assert(!drop); bool value_changed = false; compaction_filter_value.clear(); - drop = options_.CompactionFilter(options_.compaction_filter_args, - compact->compaction->level(), - ikey.user_key, value, - &compaction_filter_value, - &value_changed); + drop = options_.compaction_filter->Filter(compact->compaction->level(), + ikey.user_key, value, + &compaction_filter_value, + &value_changed); // Another example of statistics update without holding the lock // TODO: clean it up if (drop) { diff --git a/db/db_test.cc b/db/db_test.cc index 6cb42b8d4..e5d90f19b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12,6 +12,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "leveldb/cache.h" +#include "leveldb/compaction_filter.h" #include "leveldb/env.h" #include "table/table.h" #include "util/hash.h" @@ -1369,35 +1370,64 @@ TEST(DBTest, RepeatedWritesToSameKey) { // kvs during the compaction process. static int cfilter_count; static std::string NEW_VALUE = "NewValue"; -static bool keep_filter(void* arg, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(arg == nullptr); - cfilter_count++; - return false; -} -static bool delete_filter(void*argv, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(argv == nullptr); - cfilter_count++; - return true; -} -static bool change_filter(void*argv, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(argv == (void*)100); - assert(new_value != nullptr); - *new_value = NEW_VALUE; - *value_changed = true; - return false; -} + +class KeepFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + cfilter_count++; + return false; + } + + virtual const char* Name() const override { + return "KeepFilter"; + } + +}; + +class DeleteFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + cfilter_count++; + return true; + } + + virtual const char* Name() const override { + return "DeleteFilter"; + } +}; + +class ChangeFilter : public CompactionFilter { + public: + ChangeFilter(int argv) : argv_(argv) {} + + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + assert(argv_ == 100); + assert(new_value != nullptr); + *new_value = NEW_VALUE; + *value_changed = true; + return false; + } + + virtual const char* Name() const override { + return "ChangeFilter"; + } + + private: + const int argv_; +}; TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - options.CompactionFilter = keep_filter; + auto keep_filter = std::make_shared(); + options.compaction_filter = keep_filter.get(); Reopen(&options); // Write 100K keys, these are written to a few files in L0. @@ -1472,7 +1502,8 @@ TEST(DBTest, CompactionFilter) { // create a new database with the compaction // filter in such a way that it deletes all keys - options.CompactionFilter = delete_filter; + auto delete_filter = std::make_shared(); + options.compaction_filter = delete_filter.get(); options.create_if_missing = true; DestroyAndReopen(&options); @@ -1535,8 +1566,8 @@ TEST(DBTest, CompactionFilterWithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - options.compaction_filter_args = (void *)100; - options.CompactionFilter = change_filter; + auto change_filter = std::make_shared(100); + options.compaction_filter = change_filter.get(); Reopen(&options); // Write 100K+1 keys, these are written to a few files diff --git a/include/leveldb/compaction_filter.h b/include/leveldb/compaction_filter.h new file mode 100644 index 000000000..73614742e --- /dev/null +++ b/include/leveldb/compaction_filter.h @@ -0,0 +1,46 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ +#define STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ + +#include + +namespace leveldb { + +class Slice; + +// CompactionFilter allows an application to modify/delete a key-value at +// the time of compaction. + +class CompactionFilter { + public: + virtual ~CompactionFilter() {} + + + // The compaction process invokes this + // method for kv that is being compacted. A return value + // of false indicates that the kv should be preserved in the + // output of this compaction run and a return value of true + // indicates that this key-value should be removed from the + // output of the compaction. The application can inspect + // the existing value of the key and make decision based on it. + // + // When the value is to be preserved, the application has the option + // to modify the existing_value and pass it back through new_value. + // value_changed needs to be set to true in this case. + virtual bool Filter(int level, + const Slice& key, + const Slice& existing_value, + std::string* new_value, + bool* value_changed) const = 0; + + // Returns a name that identifies this compaction filter. + // The name will be printed to LOG file on start up for diagnosis. + virtual const char* Name() const = 0; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 557303138..e435e5865 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -22,6 +22,7 @@ class FilterPolicy; class Logger; class MergeOperator; class Snapshot; +class CompactionFilter; using std::shared_ptr; @@ -76,6 +77,11 @@ struct Options { // Default: nullptr const MergeOperator* merge_operator; + // Allows an application to modify/delete a key-value during background + // compaction. + // Default: nullptr + const CompactionFilter* compaction_filter; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; @@ -367,28 +373,6 @@ struct Options { // from the database, because otherwise the read can be very slow. Options* PrepareForBulkLoad(); - // This method allows an application to modify/delete a key-value at - // the time of compaction. The compaction process invokes this - // method for kv that is being compacted. A return value - // of false indicates that the kv should be preserved in the - // output of this compaction run and a return value of true - // indicates that this key-value should be removed from the - // output of the compaction. The application can inspect - // the existing value of the key and make decision based on it. - - // When the value is to be preserved, the application has the option - // to modify the existing_value and pass it back through new_value. - // value_changed needs to be set to true in this case. - - // The compaction_filter_args, if specified here, are passed - // back to the invocation of the CompactionFilter. - void* compaction_filter_args; - bool (*CompactionFilter)(void* compaction_filter_args, - int level, const Slice& key, - const Slice& existing_value, - std::string* new_value, - bool* value_changed); - // Disable automatic compactions. Manual compactions can still // be issued on this database. bool disable_auto_compactions; diff --git a/util/options.cc b/util/options.cc index 0fa80a06b..689aa3df2 100644 --- a/util/options.cc +++ b/util/options.cc @@ -7,6 +7,7 @@ #include #include "leveldb/cache.h" +#include "leveldb/compaction_filter.h" #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" @@ -17,6 +18,7 @@ namespace leveldb { Options::Options() : comparator(BytewiseComparator()), merge_operator(nullptr), + compaction_filter(nullptr), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -56,8 +58,6 @@ Options::Options() max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), - compaction_filter_args(nullptr), - CompactionFilter(nullptr), disable_auto_compactions(false), WAL_ttl_seconds(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -76,6 +76,8 @@ Options::Dump(Logger* log) const Log(log," Options.comparator: %s", comparator->Name()); Log(log," Options.merge_operator: %s", merge_operator? merge_operator->Name() : "None"); + Log(log," Options.compaction_filter: %s", + compaction_filter? compaction_filter->Name() : "None"); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); @@ -162,10 +164,6 @@ Options::Dump(Logger* log) const rate_limit); Log(log," Options.rate_limit_delay_milliseconds: %d", rate_limit_delay_milliseconds); - Log(log," Options.compaction_filter_args: %p", - compaction_filter_args); - Log(log," Options.CompactionFilter: %p", - CompactionFilter); Log(log," Options.disable_auto_compactions: %d", disable_auto_compactions); Log(log," Options.WAL_ttl_seconds: %ld", diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 949140c58..9e628b8ad 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -75,10 +75,9 @@ DBWithTTL::DBWithTTL(const int32_t ttl, Status& st, bool read_only) : ttl_(ttl) { - assert(options.CompactionFilter == nullptr); + assert(options.compaction_filter == nullptr); Options options_to_open = options; - options_to_open.compaction_filter_args = &ttl_; - options_to_open.CompactionFilter = DeleteByTS; + options_to_open.compaction_filter = this; if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); } else { @@ -105,16 +104,20 @@ Status UtilityDB::OpenTtlDB( } // returns true(i.e. key-value to be deleted) if its TS has expired based on ttl -bool DBWithTTL::DeleteByTS( - void* args, +bool DBWithTTL::Filter( int level, const Slice& key, const Slice& old_val, std::string* new_val, - bool* value_changed) { - return IsStale(old_val, *(int32_t*)args); + bool* value_changed) const { + return IsStale(old_val, ttl_); } +const char* DBWithTTL::Name() const { + return "Delete By TTL"; +} + + // Gives back the current time Status DBWithTTL::GetCurrentTime(int32_t& curtime) { return Env::Default()->GetCurrentTime((int64_t*)&curtime); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index b36573b87..ecae9edc1 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -6,11 +6,12 @@ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_ #include "include/leveldb/db.h" +#include "include/leveldb/compaction_filter.h" #include "db/db_impl.h" namespace leveldb { -class DBWithTTL : public DB { +class DBWithTTL : public DB, CompactionFilter { public: DBWithTTL(const int32_t ttl, const Options& options, @@ -71,12 +72,14 @@ class DBWithTTL : public DB { // Simulate a db crash, no elegant closing of database. void TEST_Destroy_DBWithTtl(); - static bool DeleteByTS(void* args, - int level, - const Slice& key, - const Slice& old_val, - std::string* new_val, - bool* value_changed); + // The following two methods are for CompactionFilter + virtual bool Filter(int level, + const Slice& key, + const Slice& old_val, + std::string* new_val, + bool* value_changed) const override; + + virtual const char* Name() const override; static bool IsStale(const Slice& value, int32_t ttl);