Make options.bottommost_compression, compression_opts and bottommost_compression_opts dynamically changeable. (#6615)

Summary:
These three options should be made dynamically changeable. Simply add them to MutableCFOptions and made the change.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6615

Test Plan: Add a unit test to make sure that SetOptions() can change the options.

Reviewed By: riversand963

Differential Revision: D20755951

fbshipit-source-id: 8165f4fd7a7a665cc7fb049698935022a5d2e7ff
This commit is contained in:
sdong 2020-03-31 12:08:41 -07:00 committed by Zhichao Cao
parent b490f3d753
commit a8c6000d86
14 changed files with 211 additions and 129 deletions

View File

@ -24,6 +24,7 @@
* Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered.
* When file lock failure when the lock is held by the current process, return acquiring time and thread ID in the error message.
* Added a new option, best_efforts_recovery (default: false), to allow database to open in a db dir with missing table files. During best efforts recovery, missing table files are ignored, and database recovers to the most recent state without missing table file. Cross-column-family consistency is not guaranteed even if WAL is enabled.
* options.bottommost_compression, options.compression_opts and options.bottommost_compression_opts are now dynamically changeable.
## 6.8.0 (02/24/2020)
### Java API Changes

View File

@ -314,11 +314,11 @@ class CompactionJobTest : public testing::Test {
num_input_files += level_files.size();
}
Compaction compaction(cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(),
compaction_input_files, output_level, 1024 * 1024,
10 * 1024 * 1024, 0, kNoCompression,
cfd->ioptions()->compression_opts, 0, {}, true);
Compaction compaction(
cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), compaction_input_files, output_level,
1024 * 1024, 10 * 1024 * 1024, 0, kNoCompression,
cfd->GetLatestMutableCFOptions()->compression_opts, 0, {}, true);
compaction.SetInputVersion(cfd->current());
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());

View File

@ -110,9 +110,9 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use it.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
if (mutable_cf_options.bottommost_compression != kDisableCompressionOption &&
level >= (vstorage->num_non_empty_levels() - 1)) {
return ioptions.bottommost_compression;
return mutable_cf_options.bottommost_compression;
}
// If the user has specified a different compression level for each level,
// then pick the compression for that level.
@ -132,22 +132,22 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
}
}
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
CompressionOptions GetCompressionOptions(const MutableCFOptions& cf_options,
const VersionStorageInfo* vstorage,
int level,
const bool enable_compression) {
if (!enable_compression) {
return ioptions.compression_opts;
return cf_options.compression_opts;
}
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use the specified compression options
// for the bottmomost_compression.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
if (cf_options.bottommost_compression != kDisableCompressionOption &&
level >= (vstorage->num_non_empty_levels() - 1) &&
ioptions.bottommost_compression_opts.enabled) {
return ioptions.bottommost_compression_opts;
cf_options.bottommost_compression_opts.enabled) {
return cf_options.bottommost_compression_opts;
}
return ioptions.compression_opts;
return cf_options.compression_opts;
}
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
@ -359,7 +359,7 @@ Compaction* CompactionPicker::CompactFiles(
vstorage, ioptions_, mutable_cf_options, input_files, output_level,
compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_options.max_subcompactions,
/* grandparents */ {}, true);
RegisterCompaction(c);
@ -634,7 +634,7 @@ Compaction* CompactionPicker::CompactRange(
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_range_options.max_subcompactions, /* grandparents */ {},
/* is manual */ true);
RegisterCompaction(c);
@ -787,7 +787,7 @@ Compaction* CompactionPicker::CompactRange(
compact_range_options.target_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
GetCompressionOptions(ioptions_, vstorage, output_level),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_range_options.max_subcompactions, std::move(grandparents),
/* is manual compaction */ true);

View File

@ -305,9 +305,9 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level,
const bool enable_compression = true);
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level,
const bool enable_compression = true);
CompressionOptions GetCompressionOptions(
const MutableCFOptions& mutable_cf_options,
const VersionStorageInfo* vstorage, int level,
const bool enable_compression = true);
} // namespace ROCKSDB_NAMESPACE

View File

@ -107,8 +107,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
kNoCompression, mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
@ -148,7 +149,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
ioptions_.compression_opts, 0 /* max_subcompactions */, {},
mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
@ -196,8 +197,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
kNoCompression, mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}

View File

@ -384,7 +384,7 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
GetPathId(ioptions_, mutable_cf_options_, output_level_),
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level_, vstorage_->base_level()),
GetCompressionOptions(ioptions_, vstorage_, output_level_),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);

View File

@ -751,7 +751,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, enable_compression),
GetCompressionOptions(ioptions_, vstorage_, start_level,
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
@ -959,7 +959,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage_, output_level),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
@ -1029,7 +1029,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, true /* enable_compression */),
GetCompressionOptions(ioptions_, vstorage_, start_level,
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
true /* enable_compression */),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);

View File

@ -1248,7 +1248,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.sample_for_compression,
cfd->ioptions()->compression_opts, paranoid_file_checks,
mutable_cf_options.compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);

View File

@ -861,6 +861,62 @@ TEST_F(DBOptionsTest, FIFOTtlBackwardCompatible) {
#endif // ROCKSDB_LITE
TEST_F(DBOptionsTest, ChangeCompression) {
if (!Snappy_Supported() || !LZ4_Supported()) {
return;
}
Options options;
options.write_buffer_size = 10 << 10; // 10KB
options.level0_file_num_compaction_trigger = 2;
options.create_if_missing = true;
options.compression = CompressionType::kLZ4Compression;
options.bottommost_compression = CompressionType::kNoCompression;
options.bottommost_compression_opts.level = 2;
ASSERT_OK(TryReopen(options));
CompressionType compression_used = CompressionType::kLZ4Compression;
CompressionOptions compression_opt_used;
bool compacted = false;
SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* c = reinterpret_cast<Compaction*>(arg);
compression_used = c->output_compression();
compression_opt_used = c->output_compression_opts();
compacted = true;
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kNoCompression, compression_used);
ASSERT_EQ(options.compression_opts.level, compression_opt_used.level);
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compression", "kSnappyCompression"},
{"bottommost_compression_opts", "0:6:0:0:0:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kSnappyCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level);
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -389,7 +389,7 @@ Status FlushJob::WriteLevel0Table() {
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
output_compression_, mutable_cf_options_.sample_for_compression,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, &io_s, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,

View File

@ -57,9 +57,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
cf_options.purge_redundant_kvs_while_flush),
use_fsync(db_options.use_fsync),
compression_per_level(cf_options.compression_per_level),
bottommost_compression(cf_options.bottommost_compression),
bottommost_compression_opts(cf_options.bottommost_compression_opts),
compression_opts(cf_options.compression_opts),
level_compaction_dynamic_level_bytes(
cf_options.level_compaction_dynamic_level_bytes),
access_hint_on_compaction_start(

View File

@ -90,12 +90,6 @@ struct ImmutableCFOptions {
std::vector<CompressionType> compression_per_level;
CompressionType bottommost_compression;
CompressionOptions bottommost_compression_opts;
CompressionOptions compression_opts;
bool level_compaction_dynamic_level_bytes;
Options::AccessHint access_hint_on_compaction_start;
@ -166,6 +160,9 @@ struct MutableCFOptions {
paranoid_file_checks(options.paranoid_file_checks),
report_bg_io_stats(options.report_bg_io_stats),
compression(options.compression),
bottommost_compression(options.bottommost_compression),
compression_opts(options.compression_opts),
bottommost_compression_opts(options.bottommost_compression_opts),
sample_for_compression(options.sample_for_compression) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}
@ -198,6 +195,7 @@ struct MutableCFOptions {
paranoid_file_checks(false),
report_bg_io_stats(false),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
bottommost_compression(kDisableCompressionOption),
sample_for_compression(0) {}
explicit MutableCFOptions(const Options& options);
@ -253,6 +251,10 @@ struct MutableCFOptions {
bool paranoid_file_checks;
bool report_bg_io_stats;
CompressionType compression;
CompressionType bottommost_compression;
CompressionOptions compression_opts;
CompressionOptions bottommost_compression_opts;
uint64_t sample_for_compression;
// Derived options

View File

@ -259,6 +259,8 @@ std::unordered_map<std::string, CompressionType>
const std::string kNameComparator = "comparator";
const std::string kNameEnv = "env";
const std::string kNameMergeOperator = "merge_operator";
const std::string kOptNameBMCompOpts = "bottommost_compression_opts";
const std::string kOptNameCompOpts = "compression_opts";
template <typename T>
Status GetStringFromStruct(
@ -786,6 +788,66 @@ bool SerializeSingleOptionHelper(const char* opt_address,
return true;
}
Status ParseCompressionOptions(const std::string& value,
const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
return Status::OK();
}
Status GetMutableOptionsFromStrings(
const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,
@ -793,30 +855,50 @@ Status GetMutableOptionsFromStrings(
assert(new_options);
*new_options = base_options;
for (const auto& o : options_map) {
auto& option_name = o.first;
auto& option_value = o.second;
try {
auto iter = cf_options_type_info.find(o.first);
if (iter == cf_options_type_info.end()) {
return Status::InvalidArgument("Unrecognized option: " + o.first);
}
const auto& opt_info = iter->second;
if (!opt_info.is_mutable) {
return Status::InvalidArgument("Option not changeable: " + o.first);
}
if (opt_info.verification == OptionVerificationType::kDeprecated) {
// log warning when user tries to set a deprecated option but don't fail
// the call for compatibility.
ROCKS_LOG_WARN(info_log, "%s is a deprecated option and cannot be set",
o.first.c_str());
continue;
}
bool is_ok = ParseOptionHelper(
reinterpret_cast<char*>(new_options) + opt_info.mutable_offset,
opt_info.type, o.second);
if (!is_ok) {
return Status::InvalidArgument("Error parsing " + o.first);
if (option_name == kOptNameBMCompOpts) {
Status s =
ParseCompressionOptions(option_value, option_name,
new_options->bottommost_compression_opts);
if (!s.ok()) {
return s;
}
} else if (option_name == kOptNameCompOpts) {
Status s = ParseCompressionOptions(option_value, option_name,
new_options->compression_opts);
if (!s.ok()) {
return s;
}
} else {
auto iter = cf_options_type_info.find(option_name);
if (iter == cf_options_type_info.end()) {
return Status::InvalidArgument("Unrecognized option: " + option_name);
}
const auto& opt_info = iter->second;
if (!opt_info.is_mutable) {
return Status::InvalidArgument("Option not changeable: " +
option_name);
}
if (opt_info.verification == OptionVerificationType::kDeprecated) {
// log warning when user tries to set a deprecated option but don't
// fail the call for compatibility.
ROCKS_LOG_WARN(info_log,
"%s is a deprecated option and cannot be set",
option_name.c_str());
continue;
}
bool is_ok = ParseOptionHelper(
reinterpret_cast<char*>(new_options) + opt_info.mutable_offset,
opt_info.type, option_value);
if (!is_ok) {
return Status::InvalidArgument("Error parsing " + option_name);
}
}
} catch (std::exception& e) {
return Status::InvalidArgument("Error parsing " + o.first + ":" +
return Status::InvalidArgument("Error parsing " + option_name + ":" +
std::string(e.what()));
}
}
@ -929,65 +1011,6 @@ Status StringToMap(const std::string& opts_str,
return Status::OK();
}
Status ParseCompressionOptions(const std::string& value, const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
return Status::OK();
}
Status ParseColumnFamilyOption(const std::string& name,
const std::string& org_value,
ColumnFamilyOptions* new_options,
@ -1986,8 +2009,8 @@ std::unordered_map<std::string, OptionTypeInfo>
false, 0}},
{"bottommost_compression",
{offset_of(&ColumnFamilyOptions::bottommost_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal, false,
0}},
OptionType::kCompressionType, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, bottommost_compression)}},
{kNameComparator,
{offset_of(&ColumnFamilyOptions::comparator), OptionType::kComparator,
OptionVerificationType::kByName, false, 0}},

View File

@ -190,20 +190,21 @@ Status SstFileWriter::Open(const std::string& file_path) {
CompressionType compression_type;
CompressionOptions compression_opts;
if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
compression_type = r->ioptions.bottommost_compression;
if (r->ioptions.bottommost_compression_opts.enabled) {
compression_opts = r->ioptions.bottommost_compression_opts;
if (r->mutable_cf_options.bottommost_compression !=
kDisableCompressionOption) {
compression_type = r->mutable_cf_options.bottommost_compression;
if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
compression_opts = r->mutable_cf_options.bottommost_compression_opts;
} else {
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
}
} else if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
} else {
compression_type = r->mutable_cf_options.compression;
compression_opts = r->ioptions.compression_opts;
compression_opts = r->mutable_cf_options.compression_opts;
}
uint64_t sample_for_compression =
r->mutable_cf_options.sample_for_compression;