diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 816c9182f..da8d40237 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -238,6 +238,7 @@
 DECLARE_bool(sync_fault_injection);
 DECLARE_bool(best_efforts_recovery);
 DECLARE_bool(skip_verifydb);
+DECLARE_bool(enable_compaction_filter);
 
 const long KB = 1024;
 const int kRandomValueMaxFactor = 3;
@@ -439,19 +440,10 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
 
   assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t));
 
-  // Pad with zeros to make it a multiple of 8. This function may be called
-  // with a prefix, in which case we return the first index that falls
-  // inside or outside that prefix, dependeing on whether the prefix is
-  // the start of upper bound of a scan
-  unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
-  if (pad < sizeof(uint64_t)) {
-    big_endian_key.append(pad, '\0');
-    size_key += pad;
-  }
-
   std::string little_endian_key;
   little_endian_key.resize(size_key);
-  for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) {
+  for (size_t start = 0; start + sizeof(uint64_t) <= size_key;
+       start += sizeof(uint64_t)) {
     size_t end = start + sizeof(uint64_t);
     for (size_t i = 0; i < sizeof(uint64_t); ++i) {
       little_endian_key[start + i] = big_endian_key[end - 1 - i];
@@ -470,17 +462,41 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
     uint64_t pfx = prefixes[i];
     key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window +
            pfx % key_gen_ctx.weights[i];
+    if (i < prefixes.size() - 1) {
+      // The encoding writes a `key_gen_ctx.weights[i] - 1` that counts for
+      // `key_gen_ctx.weights[i]` when there are more prefixes to come. So we
+      // need to add back the one here as we're at a non-last prefix.
+      ++key;
+    }
   }
   *key_p = key;
   return true;
 }
 
+// Given a string prefix, map it to the first corresponding index in the
+// expected values buffer.
+inline bool GetFirstIntValInPrefix(std::string big_endian_prefix,
+                                   uint64_t* key_p) {
+  size_t size_key = big_endian_prefix.size();
+  // Pad with zeros to make it a multiple of 8. This function may be called
+  // with a prefix, in which case we return the first index that falls
+  // inside or outside that prefix, depending on whether the prefix is
+  // the start or upper bound of a scan
+  unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
+  if (pad < sizeof(uint64_t)) {
+    big_endian_prefix.append(pad, '\0');
+    size_key += pad;
+  }
+  return GetIntVal(std::move(big_endian_prefix), key_p);
+}
+
 extern inline uint64_t GetPrefixKeyCount(const std::string& prefix,
                                          const std::string& ub) {
   uint64_t start = 0;
   uint64_t end = 0;
 
-  if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) {
+  if (!GetFirstIntValInPrefix(prefix, &start) ||
+      !GetFirstIntValInPrefix(ub, &end)) {
     return 0;
   }
 
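Note on the hunks above: the zero-padding that used to live in GetIntVal() only makes sense for prefixes, so it moves into the new GetFirstIntValInPrefix() helper, and GetIntVal() now simply stops before any trailing partial 8-byte word instead of padding it. The sketch below is a minimal standalone illustration of the prefix-to-first-index idea; it is not the db_stress code (it assumes plain 8-byte big-endian keys, ignores key_gen_ctx weights and windows entirely, and all names are illustrative).

```cpp
// Standalone illustration only -- not db_stress code. It assumes plain 8-byte
// big-endian keys and ignores key_gen_ctx weights/window entirely.
#include <cstdint>
#include <iostream>
#include <string>

// Decode exactly eight big-endian bytes into an integer key index.
bool DecodeFullKey(const std::string& big_endian_key, uint64_t* key_p) {
  if (big_endian_key.size() != sizeof(uint64_t)) {
    return false;
  }
  uint64_t key = 0;
  for (unsigned char c : big_endian_key) {
    key = (key << 8) | c;
  }
  *key_p = key;
  return true;
}

// Zero-pad a shorter prefix to the full key width before decoding. The padded
// key is the smallest key sharing the prefix, so the result is the first index
// covered by that prefix -- the same idea as GetFirstIntValInPrefix above.
bool DecodeFirstKeyInPrefix(std::string big_endian_prefix, uint64_t* key_p) {
  size_t pad =
      sizeof(uint64_t) - (big_endian_prefix.size() % sizeof(uint64_t));
  if (pad < sizeof(uint64_t)) {
    big_endian_prefix.append(pad, '\0');
  }
  return DecodeFullKey(big_endian_prefix, key_p);
}

int main() {
  uint64_t first = 0;
  // The two-byte prefix 0x0001 covers keys 0x0001000000000000..0x0001ffffffffffff.
  if (DecodeFirstKeyInPrefix(std::string("\x00\x01", 2), &first)) {
    std::cout << std::hex << "first index in prefix: 0x" << first << "\n";
  }
  return 0;
}
```

With the helper in place, GetPrefixKeyCount() can treat both of its arguments as prefixes and take the difference of the two mapped indices.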
diff --git a/db_stress_tool/db_stress_compaction_filter.h b/db_stress_tool/db_stress_compaction_filter.h
new file mode 100644
index 000000000..95e4fc7d9
--- /dev/null
+++ b/db_stress_tool/db_stress_compaction_filter.h
@@ -0,0 +1,78 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include "rocksdb/compaction_filter.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// DbStressCompactionFilter is safe to use with db_stress as it does not perform
+// any mutation. It only makes `kRemove` decisions for keys that are already
+// non-existent according to the `SharedState`.
+class DbStressCompactionFilter : public CompactionFilter {
+ public:
+  DbStressCompactionFilter(SharedState* state, int cf_id)
+      : state_(state), cf_id_(cf_id) {}
+
+  Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
+                    const Slice& /*existing_value*/, std::string* /*new_value*/,
+                    std::string* /*skip_until*/) const override {
+    if (state_ == nullptr) {
+      return Decision::kKeep;
+    }
+    if (key.empty() || ('0' <= key[0] && key[0] <= '9')) {
+      // It is likely leftover from a test_batches_snapshots run. Below this
+      // conditional, the test_batches_snapshots key format is not handled
+      // properly. Just keep it to be safe.
+      return Decision::kKeep;
+    }
+    uint64_t key_num = 0;
+    bool ok = GetIntVal(key.ToString(), &key_num);
+    assert(ok);
+    MutexLock key_lock(state_->GetMutexForKey(cf_id_, key_num));
+    if (!state_->Exists(cf_id_, key_num)) {
+      return Decision::kRemove;
+    }
+    return Decision::kKeep;
+  }
+
+  const char* Name() const override { return "DbStressCompactionFilter"; }
+
+ private:
+  SharedState* const state_;
+  const int cf_id_;
+};
+
+class DbStressCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+  DbStressCompactionFilterFactory() : state_(nullptr) {}
+
+  void SetSharedState(SharedState* state) {
+    MutexLock state_mutex_guard(&state_mutex_);
+    state_ = state;
+  }
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& context) override {
+    MutexLock state_mutex_guard(&state_mutex_);
+    return std::unique_ptr<CompactionFilter>(
+        new DbStressCompactionFilter(state_, context.column_family_id));
+  }
+
+  const char* Name() const override {
+    return "DbStressCompactionFilterFactory";
+  }
+
+ private:
+  port::Mutex state_mutex_;
+  SharedState* state_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
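Note on the new filter: instances are created with a possibly-null SharedState and return kKeep until DbStressCompactionFilterFactory::SetSharedState() is called, so any compaction that runs before the test finishes initializing is a no-op. The sketch below shows the same drop-what-the-test-already-deleted pattern against the public RocksDB API; it is hedged and simplified (a bare options.compaction_filter pointer instead of the factory, a toy ExpectedKeys set instead of SharedState, and an illustrative /tmp path).

```cpp
// Toy demonstration only -- not the db_stress classes. ExpectedKeys stands in
// for SharedState, and the DB path below is illustrative.
#include <cassert>
#include <mutex>
#include <set>
#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Thread-safe stand-in for SharedState: the set of keys the test expects.
class ExpectedKeys {
 public:
  void Insert(const std::string& k) {
    std::lock_guard<std::mutex> guard(mu_);
    keys_.insert(k);
  }
  bool Exists(const std::string& k) const {
    std::lock_guard<std::mutex> guard(mu_);
    return keys_.count(k) != 0;
  }

 private:
  mutable std::mutex mu_;
  std::set<std::string> keys_;
};

class ToyCompactionFilter : public rocksdb::CompactionFilter {
 public:
  explicit ToyCompactionFilter(const ExpectedKeys* expected)
      : expected_(expected) {}

  // Returning true asks the compaction to drop the entry, which has the same
  // effect as the kRemove decision above.
  bool Filter(int /*level*/, const rocksdb::Slice& key,
              const rocksdb::Slice& /*existing_value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return !expected_->Exists(key.ToString());
  }

  const char* Name() const override { return "ToyCompactionFilter"; }

 private:
  const ExpectedKeys* expected_;
};

int main() {
  ExpectedKeys expected;
  ToyCompactionFilter filter(&expected);

  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_filter = &filter;  // single filter; db_stress uses a factory

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/toy_filter_db", &db);
  assert(s.ok());

  expected.Insert("a");
  s = db->Put(rocksdb::WriteOptions(), "a", "1");
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "b", "2");  // never added to `expected`
  assert(s.ok());

  // Push everything through a compaction; "b" should be filtered out.
  s = db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "a", &value);
  assert(s.ok());
  s = db->Get(rocksdb::ReadOptions(), "b", &value);
  assert(s.IsNotFound());

  delete db;
  return 0;
}
```

In db_stress itself the per-key mutex taken in FilterV2 is what keeps the existence check consistent with concurrent writers updating the expected-values state.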
diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc
index a1e2b9411..0ae848049 100644
--- a/db_stress_tool/db_stress_driver.cc
+++ b/db_stress_tool/db_stress_driver.cc
@@ -57,11 +57,8 @@ void ThreadBody(void* v) {
 
 bool RunStressTest(StressTest* stress) {
   stress->InitDb();
-
   SharedState shared(db_stress_env, stress);
-  if (FLAGS_read_only) {
-    stress->InitReadonlyDb(&shared);
-  }
+  stress->FinishInitDb(&shared);
 
 #ifndef NDEBUG
   if (FLAGS_sync_fault_injection) {
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index f3960387c..fa20ef453 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -695,4 +695,9 @@ DEFINE_bool(sync_fault_injection, false,
 DEFINE_bool(best_efforts_recovery, false,
             "If true, use best efforts recovery.");
 DEFINE_bool(skip_verifydb, false, "If true, skip VerifyDb() calls.");
+
+DEFINE_bool(enable_compaction_filter, false,
+            "If true, configures a compaction filter that returns a kRemove "
+            "decision for deleted keys.");
+
 #endif  // GFLAGS
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 95cf9b755..e7a75c82f 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -10,6 +10,7 @@
 
 #ifdef GFLAGS
 #include "db_stress_tool/db_stress_common.h"
+#include "db_stress_tool/db_stress_compaction_filter.h"
 #include "db_stress_tool/db_stress_driver.h"
 #include "rocksdb/convenience.h"
 #include "rocksdb/sst_file_manager.h"
@@ -195,11 +196,18 @@ void StressTest::InitDb() {
   BuildOptionsTable();
 }
 
-void StressTest::InitReadonlyDb(SharedState* shared) {
-  uint64_t now = db_stress_env->NowMicros();
-  fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
-          db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
-  PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
+void StressTest::FinishInitDb(SharedState* shared) {
+  if (FLAGS_read_only) {
+    uint64_t now = db_stress_env->NowMicros();
+    fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
+            db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
+    PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
+  }
+  if (FLAGS_enable_compaction_filter) {
+    reinterpret_cast<DbStressCompactionFilterFactory*>(
+        options_.compaction_filter_factory.get())
+        ->SetSharedState(shared);
+  }
 }
 
 bool StressTest::VerifySecondaries() {
@@ -1914,6 +1922,10 @@ void StressTest::Open() {
     } else {
       options_.merge_operator = MergeOperators::CreatePutOperator();
     }
+    if (FLAGS_enable_compaction_filter) {
+      options_.compaction_filter_factory =
+          std::make_shared<DbStressCompactionFilterFactory>();
+    }
 
     options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
 
diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h
index ab80fc578..a5d1ab4fe 100644
--- a/db_stress_tool/db_stress_test_base.h
+++ b/db_stress_tool/db_stress_test_base.h
@@ -27,7 +27,9 @@ class StressTest {
   bool BuildOptionsTable();
 
   void InitDb();
-  void InitReadonlyDb(SharedState*);
+  // The initialization work is split into two parts to avoid a circular
+  // dependency with `SharedState`.
+  void FinishInitDb(SharedState*);
 
   // Return false if verification fails.
   bool VerifySecondaries();
diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc
index e28779132..2393a835b 100644
--- a/db_stress_tool/db_stress_tool.cc
+++ b/db_stress_tool/db_stress_tool.cc
@@ -236,6 +236,15 @@ int db_stress_tool(int argc, char** argv) {
       exit(1);
     }
   }
+  if (FLAGS_enable_compaction_filter &&
+      (FLAGS_acquire_snapshot_one_in > 0 || FLAGS_compact_range_one_in > 0 ||
+       FLAGS_iterpercent > 0 || FLAGS_test_batches_snapshots > 0)) {
+    fprintf(
+        stderr,
+        "Error: acquire_snapshot_one_in, compact_range_one_in, iterpercent, "
+        "test_batches_snapshots must all be 0 when using compaction filter\n");
+    exit(1);
+  }
 
   rocksdb_kill_odds = FLAGS_kill_random_test;
   rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
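The wiring above is intentionally two-phase: Open() installs DbStressCompactionFilterFactory whenever --enable_compaction_filter is set, and FinishInitDb() later injects the freshly constructed SharedState into that same factory, which is what makes the reinterpret_cast well-defined (the downcast only runs under the flag that installed the concrete type). A minimal sketch of that pattern follows, with purely illustrative names rather than the db_stress classes.

```cpp
// Illustrative two-phase wiring only; none of these names come from the diff.
#include <cassert>
#include <memory>
#include <mutex>
#include <string>

struct StubSharedState {
  bool Exists(const std::string& /*key*/) const { return true; }
};

// Stands in for DbStressCompactionFilterFactory: usable immediately, but it
// only starts making "drop" decisions once the shared state is injected.
class DeferredStateConsumer {
 public:
  void SetSharedState(StubSharedState* state) {
    std::lock_guard<std::mutex> guard(mu_);
    state_ = state;
  }
  bool ShouldDrop(const std::string& key) {
    std::lock_guard<std::mutex> guard(mu_);
    return state_ != nullptr && !state_->Exists(key);  // "kKeep" until armed
  }

 private:
  std::mutex mu_;
  StubSharedState* state_ = nullptr;
};

// Base/derived pair mirroring CompactionFilterFactory vs. the concrete stress
// factory, to show why the later downcast is well-defined.
struct FactoryBase {
  virtual ~FactoryBase() = default;
};
struct StressFactory : public FactoryBase {
  DeferredStateConsumer consumer;
};

int main() {
  // Phase 1 ("Open"): install the concrete factory behind a base pointer.
  std::shared_ptr<FactoryBase> factory = std::make_shared<StressFactory>();

  // Phase 2 ("FinishInitDb"): the dependency finally exists; downcast the same
  // pointer that was installed and inject it.
  StubSharedState shared;
  auto* concrete = static_cast<StressFactory*>(factory.get());
  concrete->consumer.SetSharedState(&shared);
  assert(!concrete->consumer.ShouldDrop("some-key"));
  return 0;
}
```

The db_crashtest.py change that follows applies the same compatibility rule as the new db_stress_tool.cc check, zeroing out snapshot-dependent parameters whenever it picks enable_compaction_filter=1.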
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index d645dc711..6ed1edd1a 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -55,6 +55,7 @@ default_params = {
     "delrangepercent": 1,
     "destroy_db_initially": 0,
     "enable_pipelined_write": lambda: random.randint(0, 1),
+    "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
     "expected_values_path": expected_values_file.name,
     "flush_one_in": 1000000,
     "get_live_files_one_in": 1000000,
@@ -64,6 +65,7 @@
     "get_current_wal_file_one_in": 0,
     # Temporarily disable hash index
     "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
+    "iterpercent": 10,
     "max_background_compactions": 20,
     "max_bytes_for_level_base": 10485760,
     "max_key": 100000000,
@@ -275,6 +277,16 @@ def finalize_and_sanitize(src_params):
     if dest_params.get("atomic_flush", 0) == 1:
         # disable pipelined write when atomic flush is used.
         dest_params["enable_pipelined_write"] = 0
+    if dest_params.get("enable_compaction_filter", 0) == 1:
+        # Compaction filter is incompatible with snapshots. Need to avoid taking
+        # snapshots, as well as avoid operations that use snapshots for
+        # verification.
+        dest_params["acquire_snapshot_one_in"] = 0
+        dest_params["compact_range_one_in"] = 0
+        # Give the iterator ops away to reads.
+        dest_params["readpercent"] += dest_params.get("iterpercent", 10)
+        dest_params["iterpercent"] = 0
+        dest_params["test_batches_snapshots"] = 0
     return dest_params
 
 def gen_cmd_params(args):