Avoid overwriting options loaded from OPTIONS (#9943)

Summary:
This is similar to https://github.com/facebook/rocksdb/issues/9862, including the following fixes/refactoring:

1. If OPTIONS file is specified via `-options_file`, majority of options will be loaded from the file. We should not
overwrite options that have been loaded from the file. Instead, we configure only fields of options which are
shared objects and not set by the OPTIONS file. We also configure a few fields, e.g. `create_if_missing` necessary
for stress test to run.

2. Refactor options initialization into three functions, `InitializeOptionsFromFile()`, `InitializeOptionsFromFlags()`
and `InitializeOptionsGeneral()` similar to db_bench. I hope they can be shared in the future. The high-level logic is
as follows:
```cpp
if (!InitializeOptionsFromFile(...)) {
  InitializeOptionsFromFlags(...);
}
InitializeOptionsGeneral(...);
```

3. Currently, the setting for `block_cache_compressed` does not seem correct because it by default specifies a
size of `numeric_limits<size_t>::max()` ((size_t)-1). According to code comments, `-1` indicates default value,
which should be referring to `num_shard_bits` argument.

4. Clarify `fail_if_options_file_error`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9943

Test Plan:
1. make check
2. Run stress tests, and manually check generated OPTIONS file and compare them with input OPTIONS files

Reviewed By: jay-zhuang

Differential Revision: D36133769

Pulled By: riversand963

fbshipit-source-id: 35dacdc090a0a72c922907170cd132b9ecaa073e
This commit is contained in:
Yanqin Jin 2022-05-18 12:43:50 -07:00 committed by Facebook GitHub Bot
parent a74f14b550
commit e3a3dbf2be
5 changed files with 340 additions and 239 deletions

View File

@ -108,6 +108,7 @@ DECLARE_double(memtable_prefix_bloom_size_ratio);
DECLARE_bool(memtable_whole_key_filtering);
DECLARE_int32(open_files);
DECLARE_int64(compressed_cache_size);
DECLARE_int32(compressed_cache_numshardbits);
DECLARE_int32(compaction_style);
DECLARE_int32(num_levels);
DECLARE_int32(level0_file_num_compaction_trigger);

View File

@ -187,9 +187,15 @@ DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
DEFINE_int64(compressed_cache_size, -1,
DEFINE_int64(compressed_cache_size, 0,
"Number of bytes to use as a cache of compressed data."
" Negative means use default settings.");
" 0 means use default settings.");
DEFINE_int32(
compressed_cache_numshardbits, -1,
"Number of shards for the compressed block cache is 2 ** "
"compressed_cache_numshardbits. Negative value means default settings. "
"This is applied only if compressed_cache_size is greater than 0.");
DEFINE_int32(compaction_style, ROCKSDB_NAMESPACE::Options().compaction_style,
"");

View File

@ -57,7 +57,8 @@ std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
StressTest::StressTest()
: cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size,
FLAGS_compressed_cache_numshardbits)),
filter_policy_(CreateFilterPolicy()),
db_(nullptr),
#ifndef ROCKSDB_LITE
@ -2325,198 +2326,11 @@ void StressTest::Open(SharedState* shared) {
#else
(void)shared;
#endif
if (FLAGS_options_file.empty()) {
BlockBasedTableOptions block_based_options;
block_based_options.block_cache = cache_;
block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks;
block_based_options.metadata_cache_options.top_level_index_pinning =
static_cast<PinningTier>(FLAGS_top_level_index_pinning);
block_based_options.metadata_cache_options.partition_pinning =
static_cast<PinningTier>(FLAGS_partition_pinning);
block_based_options.metadata_cache_options.unpartitioned_pinning =
static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.checksum = checksum_type_e;
block_based_options.block_size = FLAGS_block_size;
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kCompressionDictionaryBuildingBuffer,
{/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kFilterConstruction,
{/*.charged = */ FLAGS_charge_filter_construction
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ FLAGS_charge_table_reader
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval =
static_cast<int32_t>(FLAGS_index_block_restart_interval);
block_based_options.filter_policy = filter_policy_;
block_based_options.partition_filters = FLAGS_partition_filters;
block_based_options.optimize_filters_for_memory =
FLAGS_optimize_filters_for_memory;
block_based_options.detect_filter_construct_corruption =
FLAGS_detect_filter_construct_corruption;
block_based_options.index_type =
static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
block_based_options.prepopulate_block_cache =
static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
FLAGS_prepopulate_block_cache);
options_.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
options_.write_buffer_size = FLAGS_write_buffer_size;
options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
options_.min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge;
options_.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options_.max_write_buffer_size_to_maintain =
FLAGS_max_write_buffer_size_to_maintain;
options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
options_.disable_auto_compactions = FLAGS_disable_auto_compactions;
options_.max_background_compactions = FLAGS_max_background_compactions;
options_.max_background_flushes = FLAGS_max_background_flushes;
options_.compaction_style =
static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
if (FLAGS_prefix_size >= 0) {
options_.prefix_extractor.reset(
NewFixedPrefixTransform(FLAGS_prefix_size));
}
options_.max_open_files = FLAGS_open_files;
options_.statistics = dbstats;
options_.env = db_stress_env;
options_.use_fsync = FLAGS_use_fsync;
options_.bytes_per_sync = FLAGS_bytes_per_sync;
options_.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
options_.allow_mmap_reads = FLAGS_mmap_read;
options_.allow_mmap_writes = FLAGS_mmap_write;
options_.use_direct_reads = FLAGS_use_direct_reads;
options_.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
options_.recycle_log_file_num =
static_cast<size_t>(FLAGS_recycle_log_file_num);
options_.target_file_size_base = FLAGS_target_file_size_base;
options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
options_.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier;
options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
options_.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options_.level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger;
options_.compression = compression_type_e;
options_.bottommost_compression = bottommost_compression_type_e;
options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
options_.compression_opts.zstd_max_train_bytes =
FLAGS_compression_zstd_max_train_bytes;
options_.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options_.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
options_.create_if_missing = true;
options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
options_.inplace_update_support = FLAGS_in_place_update;
options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options_.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options_.experimental_mempurge_threshold =
FLAGS_experimental_mempurge_threshold;
options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
options_.ttl = FLAGS_compaction_ttl;
options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
options_.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options_.compaction_options_universal.size_ratio =
FLAGS_universal_size_ratio;
options_.compaction_options_universal.min_merge_width =
FLAGS_universal_min_merge_width;
options_.compaction_options_universal.max_merge_width =
FLAGS_universal_max_merge_width;
options_.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
options_.atomic_flush = FLAGS_atomic_flush;
options_.avoid_unnecessary_blocking_io =
FLAGS_avoid_unnecessary_blocking_io;
options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
options_.max_write_batch_group_size_bytes =
FLAGS_max_write_batch_group_size_bytes;
options_.level_compaction_dynamic_level_bytes =
FLAGS_level_compaction_dynamic_level_bytes;
options_.file_checksum_gen_factory =
GetFileChecksumImpl(FLAGS_file_checksum_impl);
options_.track_and_verify_wals_in_manifest = true;
// Integrated BlobDB
options_.enable_blob_files = FLAGS_enable_blob_files;
options_.min_blob_size = FLAGS_min_blob_size;
options_.blob_file_size = FLAGS_blob_file_size;
options_.blob_compression_type =
StringToCompressionType(FLAGS_blob_compression_type.c_str());
options_.enable_blob_garbage_collection =
FLAGS_enable_blob_garbage_collection;
options_.blob_garbage_collection_age_cutoff =
FLAGS_blob_garbage_collection_age_cutoff;
options_.blob_garbage_collection_force_threshold =
FLAGS_blob_garbage_collection_force_threshold;
options_.blob_compaction_readahead_size =
FLAGS_blob_compaction_readahead_size;
options_.wal_compression =
StringToCompressionType(FLAGS_wal_compression.c_str());
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
exit(1);
#else
DBOptions db_options;
std::vector<ColumnFamilyDescriptor> cf_descriptors;
Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
&db_options, &cf_descriptors);
db_options.env = new DbStressEnvWrapper(db_stress_env);
if (!s.ok()) {
fprintf(stderr, "Unable to load options file %s --- %s\n",
FLAGS_options_file.c_str(), s.ToString().c_str());
exit(1);
}
options_ = Options(db_options, cf_descriptors[0].options);
#endif // ROCKSDB_LITE
}
if (FLAGS_rate_limiter_bytes_per_sec > 0) {
options_.rate_limiter.reset(NewGenericRateLimiter(
FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly));
}
if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
FLAGS_sst_file_manager_bytes_per_truncate > 0) {
Status status;
options_.sst_file_manager.reset(NewSstFileManager(
db_stress_env, options_.info_log, "" /* trash_dir */,
static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
true /* delete_existing_trash */, &status,
0.25 /* max_trash_db_ratio */,
FLAGS_sst_file_manager_bytes_per_truncate));
if (!status.ok()) {
fprintf(stderr, "SstFileManager creation failed: %s\n",
status.ToString().c_str());
exit(1);
}
if (!InitializeOptionsFromFile(options_)) {
InitializeOptionsFromFlags(cache_, compressed_cache_, filter_policy_,
options_);
}
InitializeOptionsGeneral(cache_, compressed_cache_, filter_policy_, options_);
if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
fprintf(stderr,
@ -2528,40 +2342,6 @@ void StressTest::Open(SharedState* shared) {
"WARNING: prefix_size is non-zero but "
"memtablerep != prefix_hash\n");
}
switch (FLAGS_rep_factory) {
case kSkipList:
// no need to do anything
break;
#ifndef ROCKSDB_LITE
case kHashSkipList:
options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
break;
case kVectorRep:
options_.memtable_factory.reset(new VectorRepFactory());
break;
#else
default:
fprintf(stderr,
"RocksdbLite only supports skip list mem table. Skip "
"--rep_factory\n");
#endif // ROCKSDB_LITE
}
if (FLAGS_use_full_merge_v1) {
options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
} else {
options_.merge_operator = MergeOperators::CreatePutOperator();
}
if (FLAGS_enable_compaction_filter) {
options_.compaction_filter_factory =
std::make_shared<DbStressCompactionFilterFactory>();
}
options_.table_properties_collector_factories.emplace_back(
std::make_shared<DbStressTablePropertiesCollectorFactory>());
options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
FLAGS_allow_setting_blob_options_dynamically) &&
@ -2589,10 +2369,6 @@ void StressTest::Open(SharedState* shared) {
Status s;
if (FLAGS_user_timestamp_size > 0) {
CheckAndSetOptionsForUserTimestamp();
}
if (FLAGS_ttl == -1) {
std::vector<std::string> existing_column_families;
s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
@ -2641,13 +2417,14 @@ void StressTest::Open(SharedState* shared) {
cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
column_family_names_.push_back(name);
}
options_.listeners.clear();
#ifndef ROCKSDB_LITE
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
#endif // !ROCKSDB_LITE
RegisterAdditionalListeners();
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
// Determine whether we need to ingest file metadata write failures
// during DB reopen. If it does, enable it.
@ -2953,7 +2730,7 @@ void StressTest::Reopen(ThreadState* thread) {
}
}
void StressTest::CheckAndSetOptionsForUserTimestamp() {
void CheckAndSetOptionsForUserTimestamp(Options& options) {
assert(FLAGS_user_timestamp_size > 0);
const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper();
assert(cmp);
@ -3011,7 +2788,283 @@ void StressTest::CheckAndSetOptionsForUserTimestamp() {
fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
exit(1);
}
options_.comparator = cmp;
options.comparator = cmp;
}
bool InitializeOptionsFromFile(Options& options) {
#ifndef ROCKSDB_LITE
DBOptions db_options;
std::vector<ColumnFamilyDescriptor> cf_descriptors;
if (!FLAGS_options_file.empty()) {
Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
&db_options, &cf_descriptors);
if (!s.ok()) {
fprintf(stderr, "Unable to load options file %s --- %s\n",
FLAGS_options_file.c_str(), s.ToString().c_str());
exit(1);
}
db_options.env = new DbStressEnvWrapper(db_stress_env);
options = Options(db_options, cf_descriptors[0].options);
return true;
}
#else
(void)options;
fprintf(stderr, "--options_file not supported in lite mode\n");
exit(1);
#endif //! ROCKSDB_LITE
return false;
}
void InitializeOptionsFromFlags(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy,
Options& options) {
BlockBasedTableOptions block_based_options;
block_based_options.block_cache = cache;
block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks;
block_based_options.metadata_cache_options.top_level_index_pinning =
static_cast<PinningTier>(FLAGS_top_level_index_pinning);
block_based_options.metadata_cache_options.partition_pinning =
static_cast<PinningTier>(FLAGS_partition_pinning);
block_based_options.metadata_cache_options.unpartitioned_pinning =
static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
block_based_options.block_cache_compressed = block_cache_compressed;
block_based_options.checksum = checksum_type_e;
block_based_options.block_size = FLAGS_block_size;
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kCompressionDictionaryBuildingBuffer,
{/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kFilterConstruction,
{/*.charged = */ FLAGS_charge_filter_construction
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ FLAGS_charge_table_reader
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval =
static_cast<int32_t>(FLAGS_index_block_restart_interval);
block_based_options.filter_policy = filter_policy;
block_based_options.partition_filters = FLAGS_partition_filters;
block_based_options.optimize_filters_for_memory =
FLAGS_optimize_filters_for_memory;
block_based_options.detect_filter_construct_corruption =
FLAGS_detect_filter_construct_corruption;
block_based_options.index_type =
static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
block_based_options.prepopulate_block_cache =
static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
FLAGS_prepopulate_block_cache);
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
options.db_write_buffer_size = FLAGS_db_write_buffer_size;
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge;
options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options.max_write_buffer_size_to_maintain =
FLAGS_max_write_buffer_size_to_maintain;
options.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
options.disable_auto_compactions = FLAGS_disable_auto_compactions;
options.max_background_compactions = FLAGS_max_background_compactions;
options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style =
static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
if (FLAGS_prefix_size >= 0) {
options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
}
options.max_open_files = FLAGS_open_files;
options.statistics = dbstats;
options.env = db_stress_env;
options.use_fsync = FLAGS_use_fsync;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.allow_mmap_reads = FLAGS_mmap_read;
options.allow_mmap_writes = FLAGS_mmap_write;
options.use_direct_reads = FLAGS_use_direct_reads;
options.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
options.recycle_log_file_num =
static_cast<size_t>(FLAGS_recycle_log_file_num);
options.target_file_size_base = FLAGS_target_file_size_base;
options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier;
options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger;
options.level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger;
options.compression = compression_type_e;
options.bottommost_compression = bottommost_compression_type_e;
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
options.compression_opts.zstd_max_train_bytes =
FLAGS_compression_zstd_max_train_bytes;
options.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
options.max_manifest_file_size = FLAGS_max_manifest_file_size;
options.inplace_update_support = FLAGS_in_place_update;
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options.experimental_mempurge_threshold =
FLAGS_experimental_mempurge_threshold;
options.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
options.ttl = FLAGS_compaction_ttl;
options.enable_pipelined_write = FLAGS_enable_pipelined_write;
options.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio;
options.compaction_options_universal.min_merge_width =
FLAGS_universal_min_merge_width;
options.compaction_options_universal.max_merge_width =
FLAGS_universal_max_merge_width;
options.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
options.atomic_flush = FLAGS_atomic_flush;
options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io;
options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
options.max_write_batch_group_size_bytes =
FLAGS_max_write_batch_group_size_bytes;
options.level_compaction_dynamic_level_bytes =
FLAGS_level_compaction_dynamic_level_bytes;
options.track_and_verify_wals_in_manifest = true;
// Integrated BlobDB
options.enable_blob_files = FLAGS_enable_blob_files;
options.min_blob_size = FLAGS_min_blob_size;
options.blob_file_size = FLAGS_blob_file_size;
options.blob_compression_type =
StringToCompressionType(FLAGS_blob_compression_type.c_str());
options.enable_blob_garbage_collection = FLAGS_enable_blob_garbage_collection;
options.blob_garbage_collection_age_cutoff =
FLAGS_blob_garbage_collection_age_cutoff;
options.blob_garbage_collection_force_threshold =
FLAGS_blob_garbage_collection_force_threshold;
options.blob_compaction_readahead_size = FLAGS_blob_compaction_readahead_size;
options.wal_compression =
StringToCompressionType(FLAGS_wal_compression.c_str());
switch (FLAGS_rep_factory) {
case kSkipList:
// no need to do anything
break;
#ifndef ROCKSDB_LITE
case kHashSkipList:
options.memtable_factory.reset(NewHashSkipListRepFactory(10000));
break;
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory());
break;
#else
default:
fprintf(stderr,
"RocksdbLite only supports skip list mem table. Skip "
"--rep_factory\n");
#endif // ROCKSDB_LITE
}
if (FLAGS_use_full_merge_v1) {
options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
} else {
options.merge_operator = MergeOperators::CreatePutOperator();
}
if (FLAGS_enable_compaction_filter) {
options.compaction_filter_factory =
std::make_shared<DbStressCompactionFilterFactory>();
}
options.best_efforts_recovery = FLAGS_best_efforts_recovery;
options.paranoid_file_checks = FLAGS_paranoid_file_checks;
options.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
if (FLAGS_user_timestamp_size > 0) {
CheckAndSetOptionsForUserTimestamp(options);
}
}
void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy,
Options& options) {
options.create_missing_column_families = true;
options.create_if_missing = true;
if (!options.statistics) {
options.statistics = dbstats;
}
if (options.env == Options().env) {
options.env = db_stress_env;
}
assert(options.table_factory);
auto table_options =
options.table_factory->GetOptions<BlockBasedTableOptions>();
if (table_options) {
if (FLAGS_cache_size > 0) {
table_options->block_cache = cache;
}
if (!table_options->block_cache_compressed &&
FLAGS_compressed_cache_size > 0) {
table_options->block_cache_compressed = block_cache_compressed;
}
if (!table_options->filter_policy) {
table_options->filter_policy = filter_policy;
}
}
// TODO: row_cache, thread-pool IO priority, CPU priority.
if (!options.rate_limiter) {
if (FLAGS_rate_limiter_bytes_per_sec > 0) {
options.rate_limiter.reset(NewGenericRateLimiter(
FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly));
}
}
if (!options.file_checksum_gen_factory) {
options.file_checksum_gen_factory =
GetFileChecksumImpl(FLAGS_file_checksum_impl);
}
if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
FLAGS_sst_file_manager_bytes_per_truncate > 0) {
Status status;
options.sst_file_manager.reset(NewSstFileManager(
db_stress_env, options.info_log, "" /* trash_dir */,
static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
true /* delete_existing_trash */, &status,
0.25 /* max_trash_db_ratio */,
FLAGS_sst_file_manager_bytes_per_truncate));
if (!status.ok()) {
fprintf(stderr, "SstFileManager creation failed: %s\n",
status.ToString().c_str());
exit(1);
}
}
options.table_properties_collector_factories.emplace_back(
std::make_shared<DbStressTablePropertiesCollectorFactory>());
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS

View File

@ -223,8 +223,6 @@ class StressTest {
void Reopen(ThreadState* thread);
void CheckAndSetOptionsForUserTimestamp();
virtual void RegisterAdditionalListeners() {}
#ifndef ROCKSDB_LITE
@ -259,5 +257,49 @@ class StressTest {
bool is_db_stopped_;
};
// Load options from OPTIONS file and populate `options`.
extern bool InitializeOptionsFromFile(Options& options);
// Initialize `options` using command line arguments.
// When this function is called, `cache`, `block_cache_compressed`,
// `filter_policy` have all been initialized. Therefore, we just pass them as
// input arguments.
extern void InitializeOptionsFromFlags(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
// Initialize `options` on which `InitializeOptionsFromFile()` and
// `InitializeOptionsFromFlags()` have both been called already.
// There are two cases.
// Case 1: OPTIONS file is not specified. Command line arguments have been used
// to initialize `options`. InitializeOptionsGeneral() will use
// `cache`, `block_cache_compressed` and `filter_policy` to initialize
// corresponding fields of `options`. InitializeOptionsGeneral() will
// also set up other fields of `options` so that stress test can run.
// Examples include `create_if_missing` and
// `create_missing_column_families`, etc.
// Case 2: OPTIONS file is specified. It is possible that, after loading from
// the given OPTIONS files, some shared object fields are still not
// initialized because they are not set in the OPTIONS file. In this
// case, if command line arguments indicate that the user wants to set
// up such shared objects, e.g. block cache, compressed block cache,
// row cache, filter policy, then InitializeOptionsGeneral() will honor
// the user's choice, thus passing `cache`, `block_cache_compressed`,
// `filter_policy` as input arguments.
//
// InitializeOptionsGeneral() must not overwrite fields of `options` loaded
// from OPTIONS file.
extern void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
// If no OPTIONS file is specified, set up `options` so that we can test
// user-defined timestamp which requires `-user_timestamp_size=8`.
// This function also checks for known (currently) incompatible features with
// user-defined timestamp.
extern void CheckAndSetOptionsForUserTimestamp(Options& options);
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS

View File

@ -1150,8 +1150,7 @@ struct DBOptions {
#endif // ROCKSDB_LITE
// If true, then DB::Open / CreateColumnFamily / DropColumnFamily
// / SetOptions will fail if options file is not detected or properly
// persisted.
// SetOptions will fail if options file is not properly persisted.
//
// DEFAULT: false
bool fail_if_options_file_error = false;