From 775dc623ad12dc7f9905c215153cdc849fe1f5aa Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 18 Jun 2020 09:51:14 -0700 Subject: [PATCH] add `CompactionFilter` to stress/crash tests (#6988) Summary: Added a `CompactionFilter` that is aware of the stress test's expected state. It only drops key versions that are already covered according to the expected state. It is incompatible with snapshots (same as all `CompactionFilter`s), so disables all snapshot-related features when used in the crash test. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6988 Test Plan: running a minified blackbox crash test ``` $ TEST_TMPDIR=/dev/shm python tools/db_crashtest.py blackbox --max_key=1000000 -write_buffer_size=1048576 -max_bytes_for_level_base=4194304 -target_file_size_base=1048576 -value_size_mult=33 --interval=10 --duration=3600 ``` Reviewed By: anand1976 Differential Revision: D22072888 Pulled By: ajkr fbshipit-source-id: 727b9d7a90d5eab18be0ec6cd5a810712ac13320 --- db_stress_tool/db_stress_common.h | 40 +++++++--- db_stress_tool/db_stress_compaction_filter.h | 78 ++++++++++++++++++++ db_stress_tool/db_stress_driver.cc | 5 +- db_stress_tool/db_stress_gflags.cc | 5 ++ db_stress_tool/db_stress_test_base.cc | 22 ++++-- db_stress_tool/db_stress_test_base.h | 4 +- db_stress_tool/db_stress_tool.cc | 9 +++ tools/db_crashtest.py | 12 +++ 8 files changed, 153 insertions(+), 22 deletions(-) create mode 100644 db_stress_tool/db_stress_compaction_filter.h diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 816c9182f..da8d40237 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -238,6 +238,7 @@ DECLARE_bool(sync_fault_injection); DECLARE_bool(best_efforts_recovery); DECLARE_bool(skip_verifydb); +DECLARE_bool(enable_compaction_filter); const long KB = 1024; const int kRandomValueMaxFactor = 3; @@ -439,19 +440,10 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) 
{ assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t)); - // Pad with zeros to make it a multiple of 8. This function may be called - // with a prefix, in which case we return the first index that falls - // inside or outside that prefix, dependeing on whether the prefix is - // the start of upper bound of a scan - unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t)); - if (pad < sizeof(uint64_t)) { - big_endian_key.append(pad, '\0'); - size_key += pad; - } - std::string little_endian_key; little_endian_key.resize(size_key); - for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) { + for (size_t start = 0; start + sizeof(uint64_t) <= size_key; + start += sizeof(uint64_t)) { size_t end = start + sizeof(uint64_t); for (size_t i = 0; i < sizeof(uint64_t); ++i) { little_endian_key[start + i] = big_endian_key[end - 1 - i]; @@ -470,17 +462,41 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) { uint64_t pfx = prefixes[i]; key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window + pfx % key_gen_ctx.weights[i]; + if (i < prefixes.size() - 1) { + // The encoding writes a `key_gen_ctx.weights[i] - 1` that counts for + // `key_gen_ctx.weights[i]` when there are more prefixes to come. So we + // need to add back the one here as we're at a non-last prefix. + ++key; + } } *key_p = key; return true; } +// Given a string prefix, map it to the first corresponding index in the +// expected values buffer. +inline bool GetFirstIntValInPrefix(std::string big_endian_prefix, + uint64_t* key_p) { + size_t size_key = big_endian_prefix.size(); + // Pad with zeros to make it a multiple of 8. 
This function may be called + // with a prefix, in which case we return the first index that falls + // inside or outside that prefix, depending on whether the prefix is + // the start or upper bound of a scan + unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t)); + if (pad < sizeof(uint64_t)) { + big_endian_prefix.append(pad, '\0'); + size_key += pad; + } + return GetIntVal(std::move(big_endian_prefix), key_p); +} + extern inline uint64_t GetPrefixKeyCount(const std::string& prefix, const std::string& ub) { uint64_t start = 0; uint64_t end = 0; - if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) { + if (!GetFirstIntValInPrefix(prefix, &start) || + !GetFirstIntValInPrefix(ub, &end)) { return 0; } diff --git a/db_stress_tool/db_stress_compaction_filter.h b/db_stress_tool/db_stress_compaction_filter.h new file mode 100644 index 000000000..95e4fc7d9 --- /dev/null +++ b/db_stress_tool/db_stress_compaction_filter.h @@ -0,0 +1,78 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/compaction_filter.h" + +namespace ROCKSDB_NAMESPACE { + +// DbStressCompactionFilter is safe to use with db_stress as it does not perform +// any mutation. It only makes `kRemove` decisions for keys that are already +// non-existent according to the `SharedState`.
+class DbStressCompactionFilter : public CompactionFilter { + public: + DbStressCompactionFilter(SharedState* state, int cf_id) + : state_(state), cf_id_(cf_id) {} + + Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (state_ == nullptr) { + return Decision::kKeep; + } + if (key.empty() || ('0' <= key[0] && key[0] <= '9')) { + // It is likely leftover from a test_batches_snapshots run. Below this + // conditional, the test_batches_snapshots key format is not handled + // properly. Just keep it to be safe. + return Decision::kKeep; + } + uint64_t key_num = 0; + bool ok = GetIntVal(key.ToString(), &key_num); + assert(ok); + MutexLock key_lock(state_->GetMutexForKey(cf_id_, key_num)); + if (!state_->Exists(cf_id_, key_num)) { + return Decision::kRemove; + } + return Decision::kKeep; + } + + const char* Name() const override { return "DbStressCompactionFilter"; } + + private: + SharedState* const state_; + const int cf_id_; +}; + +class DbStressCompactionFilterFactory : public CompactionFilterFactory { + public: + DbStressCompactionFilterFactory() : state_(nullptr) {} + + void SetSharedState(SharedState* state) { + MutexLock state_mutex_guard(&state_mutex_); + state_ = state; + } + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& context) override { + MutexLock state_mutex_guard(&state_mutex_); + return std::unique_ptr<CompactionFilter>( + new DbStressCompactionFilter(state_, context.column_family_id)); + } + + const char* Name() const override { + return "DbStressCompactionFilterFactory"; + } + + private: + port::Mutex state_mutex_; + SharedState* state_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index a1e2b9411..0ae848049 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -57,11 +57,8 @@ void
ThreadBody(void* v) { bool RunStressTest(StressTest* stress) { stress->InitDb(); - SharedState shared(db_stress_env, stress); - if (FLAGS_read_only) { - stress->InitReadonlyDb(&shared); - } + stress->FinishInitDb(&shared); #ifndef NDEBUG if (FLAGS_sync_fault_injection) { diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index f3960387c..fa20ef453 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -695,4 +695,9 @@ DEFINE_bool(sync_fault_injection, false, DEFINE_bool(best_efforts_recovery, false, "If true, use best efforts recovery."); DEFINE_bool(skip_verifydb, false, "If true, skip VerifyDb() calls."); + +DEFINE_bool(enable_compaction_filter, false, + "If true, configures a compaction filter that returns a kRemove " + "decision for deleted keys."); + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 95cf9b755..e7a75c82f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -10,6 +10,7 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" +#include "db_stress_tool/db_stress_compaction_filter.h" #include "db_stress_tool/db_stress_driver.h" #include "rocksdb/convenience.h" #include "rocksdb/sst_file_manager.h" @@ -195,11 +196,18 @@ void StressTest::InitDb() { BuildOptionsTable(); } -void StressTest::InitReadonlyDb(SharedState* shared) { - uint64_t now = db_stress_env->NowMicros(); - fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n", - db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key); - PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared); +void StressTest::FinishInitDb(SharedState* shared) { + if (FLAGS_read_only) { + uint64_t now = db_stress_env->NowMicros(); + fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n", + db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key); + PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared); + } + if 
(FLAGS_enable_compaction_filter) { + reinterpret_cast<DbStressCompactionFilterFactory*>( + options_.compaction_filter_factory.get()) + ->SetSharedState(shared); + } } bool StressTest::VerifySecondaries() { @@ -1914,6 +1922,10 @@ void StressTest::Open() { } else { options_.merge_operator = MergeOperators::CreatePutOperator(); } + if (FLAGS_enable_compaction_filter) { + options_.compaction_filter_factory = + std::make_shared<DbStressCompactionFilterFactory>(); + } options_.best_efforts_recovery = FLAGS_best_efforts_recovery; diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index ab80fc578..a5d1ab4fe 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -27,7 +27,9 @@ class StressTest { bool BuildOptionsTable(); void InitDb(); - void InitReadonlyDb(SharedState*); + // The initialization work is split into two parts to avoid a circular + // dependency with `SharedState`. + void FinishInitDb(SharedState*); // Return false if verification fails. bool VerifySecondaries(); diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index e28779132..2393a835b 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -236,6 +236,15 @@ int db_stress_tool(int argc, char** argv) { exit(1); } } + if (FLAGS_enable_compaction_filter && + (FLAGS_acquire_snapshot_one_in > 0 || FLAGS_compact_range_one_in > 0 || + FLAGS_iterpercent > 0 || FLAGS_test_batches_snapshots > 0)) { + fprintf( + stderr, + "Error: acquire_snapshot_one_in, compact_range_one_in, iterpercent, " + "test_batches_snapshots must all be 0 when using compaction filter\n"); + exit(1); + } rocksdb_kill_odds = FLAGS_kill_random_test; rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index d645dc711..6ed1edd1a 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -55,6 +55,7 @@ default_params = { "delrangepercent": 1, "destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1), + "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]), "expected_values_path": expected_values_file.name, "flush_one_in": 1000000, "get_live_files_one_in": 1000000, @@ -64,6 +65,7 @@ default_params = { "get_current_wal_file_one_in": 0, # Temporarily disable hash index "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]), + "iterpercent": 10, "max_background_compactions": 20, "max_bytes_for_level_base": 10485760, "max_key": 100000000, @@ -275,6 +277,16 @@ def finalize_and_sanitize(src_params): if dest_params.get("atomic_flush", 0) == 1: # disable pipelined write when atomic flush is used. dest_params["enable_pipelined_write"] = 0 + if dest_params.get("enable_compaction_filter", 0) == 1: + # Compaction filter is incompatible with snapshots. Need to avoid taking + # snapshots, as well as avoid operations that use snapshots for + # verification. + dest_params["acquire_snapshot_one_in"] = 0 + dest_params["compact_range_one_in"] = 0 + # Give the iterator ops away to reads. + dest_params["readpercent"] += dest_params.get("iterpercent", 10) + dest_params["iterpercent"] = 0 + dest_params["test_batches_snapshots"] = 0 return dest_params def gen_cmd_params(args):