diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 686ee4969..5d51829ad 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -232,6 +232,7 @@ DECLARE_int32(get_property_one_in);
 DECLARE_string(file_checksum_impl);
 
 #ifndef ROCKSDB_LITE
+// Options for StackableDB-based BlobDB
 DECLARE_bool(use_blob_db);
 DECLARE_uint64(blob_db_min_blob_size);
 DECLARE_uint64(blob_db_bytes_per_sync);
@@ -239,6 +240,16 @@ DECLARE_uint64(blob_db_file_size);
 DECLARE_bool(blob_db_enable_gc);
 DECLARE_double(blob_db_gc_cutoff);
 #endif  // !ROCKSDB_LITE
+
+// Options for integrated BlobDB
+DECLARE_bool(allow_setting_blob_options_dynamically);
+DECLARE_bool(enable_blob_files);
+DECLARE_uint64(min_blob_size);
+DECLARE_uint64(blob_file_size);
+DECLARE_string(blob_compression_type);
+DECLARE_bool(enable_blob_garbage_collection);
+DECLARE_double(blob_garbage_collection_age_cutoff);
+
 DECLARE_int32(approximate_size_one_in);
 
 DECLARE_bool(sync_fault_injection);
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index becc4820f..d46fea322 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -325,33 +325,68 @@ DEFINE_bool(enable_write_thread_adaptive_yield, true,
             "Use a yielding spin loop for brief writer thread waits.");
 
 #ifndef ROCKSDB_LITE
-// BlobDB Options
-DEFINE_bool(use_blob_db, false, "Use BlobDB.");
+// Options for StackableDB-based BlobDB
+DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Use BlobDB.");
 
-DEFINE_uint64(blob_db_min_blob_size,
-              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
-              "Smallest blob to store in a file. Blobs smaller than this "
-              "will be inlined with the key in the LSM tree.");
+DEFINE_uint64(
+    blob_db_min_blob_size,
+    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
+    "[Stacked BlobDB] Smallest blob to store in a file. Blobs "
+    "smaller than this will be inlined with the key in the LSM tree.");
 
-DEFINE_uint64(blob_db_bytes_per_sync,
-              ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
-              "Sync blob files once per every N bytes written.");
+DEFINE_uint64(
+    blob_db_bytes_per_sync,
+    ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
+    "[Stacked BlobDB] Sync blob files once per every N bytes written.");
 
 DEFINE_uint64(blob_db_file_size,
               ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size,
-              "Target size of each blob file.");
+              "[Stacked BlobDB] Target size of each blob file.");
 
 DEFINE_bool(
     blob_db_enable_gc,
     ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection,
-    "Enable BlobDB garbage collection.");
+    "[Stacked BlobDB] Enable BlobDB garbage collection.");
 
 DEFINE_double(
     blob_db_gc_cutoff,
     ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff,
-    "Cutoff ratio for BlobDB garbage collection.");
+    "[Stacked BlobDB] Cutoff ratio for BlobDB garbage collection.");
 #endif  // !ROCKSDB_LITE
 
+// Options for integrated BlobDB
+DEFINE_bool(allow_setting_blob_options_dynamically, false,
+            "[Integrated BlobDB] Allow setting blob options dynamically.");
+
+DEFINE_bool(
+    enable_blob_files,
+    ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().enable_blob_files,
+    "[Integrated BlobDB] Enable writing large values to separate blob files.");
+
+DEFINE_uint64(min_blob_size,
+              ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().min_blob_size,
+              "[Integrated BlobDB] The size of the smallest value to be stored "
+              "separately in a blob file.");
+
+DEFINE_uint64(blob_file_size,
+              ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().blob_file_size,
+              "[Integrated BlobDB] The size limit for blob files.");
+
+DEFINE_string(blob_compression_type, "none",
+              "[Integrated BlobDB] The compression algorithm to use for large "
+              "values stored in blob files.");
+
+DEFINE_bool(enable_blob_garbage_collection,
+            ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
+                .enable_blob_garbage_collection,
+            "[Integrated BlobDB] Enable blob garbage collection.");
+
+DEFINE_double(blob_garbage_collection_age_cutoff,
+              ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
+                  .blob_garbage_collection_age_cutoff,
+              "[Integrated BlobDB] The cutoff in terms of blob file age for "
+              "garbage collection.");
+
 static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
     RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 52e879e24..e31da3a4e 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -108,6 +108,22 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
   }
 }
 
+std::vector<std::string> StressTest::GetBlobCompressionTags() {
+  std::vector<std::string> compression_tags{"kNoCompression"};
+
+  if (Snappy_Supported()) {
+    compression_tags.emplace_back("kSnappyCompression");
+  }
+  if (LZ4_Supported()) {
+    compression_tags.emplace_back("kLZ4Compression");
+  }
+  if (ZSTD_Supported()) {
+    compression_tags.emplace_back("kZSTD");
+  }
+
+  return compression_tags;
+}
+
 bool StressTest::BuildOptionsTable() {
   if (FLAGS_set_options_one_in <= 0) {
     return true;
@@ -186,6 +202,21 @@
       {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
   };
 
+  if (FLAGS_allow_setting_blob_options_dynamically) {
+    options_tbl.emplace("enable_blob_files",
+                        std::vector<std::string>{"false", "true"});
+    options_tbl.emplace("min_blob_size",
+                        std::vector<std::string>{"0", "16", "256"});
+    options_tbl.emplace("blob_file_size",
+                        std::vector<std::string>{"1M", "16M", "256M", "1G"});
+    options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
+    options_tbl.emplace("enable_blob_garbage_collection",
+                        std::vector<std::string>{"false", "true"});
+    options_tbl.emplace(
+        "blob_garbage_collection_age_cutoff",
+        std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
+  }
+
   options_table_ = std::move(options_tbl);
 
   for (const auto& iter : options_table_) {
@@ -1869,7 +1900,7 @@ void StressTest::PrintEnv() const {
   fprintf(stdout, "TransactionDB         : %s\n",
           FLAGS_use_txn ? "true" : "false");
 #ifndef ROCKSDB_LITE
-  fprintf(stdout, "BlobDB                : %s\n",
+  fprintf(stdout, "Stacked BlobDB        : %s\n",
           FLAGS_use_blob_db ? "true" : "false");
 #endif  // !ROCKSDB_LITE
   fprintf(stdout, "Read only mode        : %s\n",
@@ -2083,6 +2114,17 @@ void StressTest::Open() {
     options_.file_checksum_gen_factory =
         GetFileChecksumImpl(FLAGS_file_checksum_impl);
     options_.track_and_verify_wals_in_manifest = true;
+
+    // Integrated BlobDB
+    options_.enable_blob_files = FLAGS_enable_blob_files;
+    options_.min_blob_size = FLAGS_min_blob_size;
+    options_.blob_file_size = FLAGS_blob_file_size;
+    options_.blob_compression_type =
+        StringToCompressionType(FLAGS_blob_compression_type.c_str());
+    options_.enable_blob_garbage_collection =
+        FLAGS_enable_blob_garbage_collection;
+    options_.blob_garbage_collection_age_cutoff =
+        FLAGS_blob_garbage_collection_age_cutoff;
   } else {
 #ifdef ROCKSDB_LITE
     fprintf(stderr, "--options_file not supported in lite mode\n");
@@ -2172,6 +2214,31 @@ void StressTest::Open() {
   options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
   options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
 
+  if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
+       FLAGS_allow_setting_blob_options_dynamically) &&
+      (FLAGS_use_merge || FLAGS_enable_compaction_filter ||
+       FLAGS_checkpoint_one_in > 0 || FLAGS_backup_one_in > 0 ||
+       FLAGS_best_efforts_recovery)) {
+    fprintf(
+        stderr,
+        "Integrated BlobDB is currently incompatible with Merge, compaction "
+        "filters, checkpoints, backup/restore, and best-effort recovery\n");
+    exit(1);
+  }
+
+  if (options_.enable_blob_files) {
+    fprintf(stdout,
+            "Integrated BlobDB: blob files enabled, min blob size %" PRIu64
+            ", blob file size %" PRIu64 ", blob compression type %s\n",
+            options_.min_blob_size, options_.blob_file_size,
+            CompressionTypeToString(options_.blob_compression_type).c_str());
+  }
+
+  if (options_.enable_blob_garbage_collection) {
+    fprintf(stdout, "Integrated BlobDB: blob GC enabled, cutoff %f\n",
+            options_.blob_garbage_collection_age_cutoff);
+  }
+
   fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
 
   Status s;
@@ -2229,6 +2296,7 @@
     options_.create_missing_column_families = true;
     if (!FLAGS_use_txn) {
 #ifndef ROCKSDB_LITE
+      // StackableDB-based BlobDB
       if (FLAGS_use_blob_db) {
         blob_db::BlobDBOptions blob_db_options;
         blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h
index 426af3bd0..8fc15def1 100644
--- a/db_stress_tool/db_stress_test_base.h
+++ b/db_stress_tool/db_stress_test_base.h
@@ -24,6 +24,8 @@ class StressTest {
 
   std::shared_ptr<Cache> NewCache(size_t capacity);
 
+  static std::vector<std::string> GetBlobCompressionTags();
+
   bool BuildOptionsTable();
 
   void InitDb();
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index f59524f9f..32a6abe1b 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -265,6 +265,23 @@ best_efforts_recovery_params = {
     "continuous_verification_interval": 0,
 }
 
+blob_params = {
+    "allow_setting_blob_options_dynamically": 1,
+    # Enable blob files and GC with a 75% chance initially; note that they might still be
+    # enabled/disabled during the test via SetOptions
+    "enable_blob_files": lambda: random.choice([0] + [1] * 3),
+    "min_blob_size": lambda: random.choice([0, 16, 256]),
+    "blob_file_size": lambda: random.choice([1048576, 16777216, 268435456, 1073741824]),
+    "blob_compression_type": lambda: random.choice(["none", "snappy", "lz4", "zstd"]),
+    "enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3),
+    "blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]),
+    # The following are currently incompatible with the integrated BlobDB
+    "use_merge": 0,
+    "enable_compaction_filter": 0,
+    "backup_one_in": 0,
+    "checkpoint_one_in": 0,
+}
+
 
 def finalize_and_sanitize(src_params):
     dest_params = dict([(k, v() if callable(v) else v)
                         for (k, v) in src_params.items()])
@@ -356,6 +373,12 @@ def gen_cmd_params(args):
     if args.test_best_efforts_recovery:
         params.update(best_efforts_recovery_params)
 
+    # Best-effort recovery and BlobDB are currently incompatible. Test BE recovery
+    # if specified on the command line; otherwise, apply BlobDB related overrides
+    # with a 10% chance.
+    if not args.test_best_efforts_recovery and random.choice([0] * 9 + [1]) == 1:
+        params.update(blob_params)
+
     for k, v in vars(args).items():
         if v is not None:
             params[k] = v
@@ -628,7 +651,8 @@ def main():
                       + list(whitebox_default_params.items())
                       + list(simple_default_params.items())
                       + list(blackbox_simple_default_params.items())
-                      + list(whitebox_simple_default_params.items()))
+                      + list(whitebox_simple_default_params.items())
+                      + list(blob_params.items()))
     for k, v in all_params.items():
         parser.add_argument("--" + k, type=type(v() if callable(v) else v))