Add De/Serialization for CompactionInput/Result (#8247)
Summary: The functions will be used for remote compaction parameter input and result. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8247 Test Plan: `make check` Reviewed By: ajkr Differential Revision: D28104680 Pulled By: jay-zhuang fbshipit-source-id: c0a5178e6277125118384278efea2acbf90aa6cb
This commit is contained in:
parent
e9a0bc14dd
commit
a79b46c503
@ -46,6 +46,7 @@
|
||||
#include "monitoring/iostats_context_imp.h"
|
||||
#include "monitoring/perf_context_imp.h"
|
||||
#include "monitoring/thread_status_util.h"
|
||||
#include "options/configurable_helper.h"
|
||||
#include "options/options_helper.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -54,6 +55,7 @@
|
||||
#include "rocksdb/statistics.h"
|
||||
#include "rocksdb/status.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/utilities/options_type.h"
|
||||
#include "table/block_based/block.h"
|
||||
#include "table/block_based/block_based_table_factory.h"
|
||||
#include "table/merging_iterator.h"
|
||||
@ -1939,6 +1941,7 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) {
|
||||
file_number, compact_->compaction->output_path_id());
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
std::string CompactionServiceCompactionJob::GetTableFileName(
|
||||
uint64_t file_number) {
|
||||
return MakeTableFileName(output_path_, file_number);
|
||||
@ -1984,9 +1987,12 @@ Status CompactionServiceCompactionJob::Run() {
|
||||
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
|
||||
bottommost_level_ = c->bottommost_level();
|
||||
|
||||
compact_->sub_compact_states.emplace_back(c, compaction_input_.begin,
|
||||
compaction_input_.end,
|
||||
compaction_input_.approx_size);
|
||||
Slice begin = compaction_input_.begin;
|
||||
Slice end = compaction_input_.end;
|
||||
compact_->sub_compact_states.emplace_back(
|
||||
c, compaction_input_.has_begin ? &begin : nullptr,
|
||||
compaction_input_.has_end ? &end : nullptr,
|
||||
compaction_input_.approx_size);
|
||||
|
||||
log_buffer_->FlushBufferToLog();
|
||||
LogCompaction();
|
||||
@ -2062,4 +2068,412 @@ void CompactionServiceCompactionJob::CleanupCompaction() {
|
||||
CompactionJob::CleanupCompaction();
|
||||
}
|
||||
|
||||
// Internal binary format for the input and result data
|
||||
enum BinaryFormatVersion : uint32_t {
|
||||
kOptionsString = 1, // Use string format similar to Option string format
|
||||
};
|
||||
|
||||
// offset_of is used to get the offset of a class data member
|
||||
// ex: offset_of(&ColumnFamilyDescriptor::options)
|
||||
// This call will return the offset of options in ColumnFamilyDescriptor class
|
||||
//
|
||||
// This is the same as offsetof() but allow us to work with non standard-layout
|
||||
// classes and structures
|
||||
// refs:
|
||||
// http://en.cppreference.com/w/cpp/concept/StandardLayoutType
|
||||
// https://gist.github.com/graphitemaster/494f21190bb2c63c5516
|
||||
static ColumnFamilyDescriptor dummy_cfd("", ColumnFamilyOptions());
|
||||
template <typename T1>
|
||||
int offset_of(T1 ColumnFamilyDescriptor::*member) {
|
||||
return int(size_t(&(dummy_cfd.*member)) - size_t(&dummy_cfd));
|
||||
}
|
||||
|
||||
static CompactionServiceInput dummy_cs_input;
|
||||
template <typename T1>
|
||||
int offset_of(T1 CompactionServiceInput::*member) {
|
||||
return int(size_t(&(dummy_cs_input.*member)) - size_t(&dummy_cs_input));
|
||||
}
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
|
||||
{"name",
|
||||
{offset_of(&ColumnFamilyDescriptor::name), OptionType::kEncodedString,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"options",
|
||||
{offset_of(&ColumnFamilyDescriptor::options), OptionType::kConfigurable,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
|
||||
[](const ConfigOptions& opts, const std::string& /*name*/,
|
||||
const std::string& value, char* addr) {
|
||||
auto cf_options = reinterpret_cast<ColumnFamilyOptions*>(addr);
|
||||
return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
|
||||
value, cf_options);
|
||||
},
|
||||
[](const ConfigOptions& opts, const std::string& /*name*/,
|
||||
const char* addr, std::string* value) {
|
||||
const auto cf_options =
|
||||
reinterpret_cast<const ColumnFamilyOptions*>(addr);
|
||||
std::string result;
|
||||
auto status =
|
||||
GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
|
||||
*value = "{" + result + "}";
|
||||
return status;
|
||||
},
|
||||
[](const ConfigOptions& opts, const std::string& name, const char* addr1,
|
||||
const char* addr2, std::string* mismatch) {
|
||||
const auto this_one =
|
||||
reinterpret_cast<const ColumnFamilyOptions*>(addr1);
|
||||
const auto that_one =
|
||||
reinterpret_cast<const ColumnFamilyOptions*>(addr2);
|
||||
auto this_conf = CFOptionsAsConfigurable(*this_one);
|
||||
auto that_conf = CFOptionsAsConfigurable(*that_one);
|
||||
std::string mismatch_opt;
|
||||
bool result =
|
||||
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
|
||||
if (!result) {
|
||||
*mismatch = name + "." + mismatch_opt;
|
||||
}
|
||||
return result;
|
||||
}}},
|
||||
};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
|
||||
{"column_family",
|
||||
OptionTypeInfo::Struct("column_family", &cfd_type_info,
|
||||
offset_of(&CompactionServiceInput::column_family),
|
||||
OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone)},
|
||||
{"db_options",
|
||||
{offset_of(&CompactionServiceInput::db_options), OptionType::kConfigurable,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
|
||||
[](const ConfigOptions& opts, const std::string& /*name*/,
|
||||
const std::string& value, char* addr) {
|
||||
auto options = reinterpret_cast<DBOptions*>(addr);
|
||||
return GetDBOptionsFromString(opts, DBOptions(), value, options);
|
||||
},
|
||||
[](const ConfigOptions& opts, const std::string& /*name*/,
|
||||
const char* addr, std::string* value) {
|
||||
const auto options = reinterpret_cast<const DBOptions*>(addr);
|
||||
std::string result;
|
||||
auto status = GetStringFromDBOptions(opts, *options, &result);
|
||||
*value = "{" + result + "}";
|
||||
return status;
|
||||
},
|
||||
[](const ConfigOptions& opts, const std::string& name, const char* addr1,
|
||||
const char* addr2, std::string* mismatch) {
|
||||
const auto this_one = reinterpret_cast<const DBOptions*>(addr1);
|
||||
const auto that_one = reinterpret_cast<const DBOptions*>(addr2);
|
||||
auto this_conf = DBOptionsAsConfigurable(*this_one);
|
||||
auto that_conf = DBOptionsAsConfigurable(*that_one);
|
||||
std::string mismatch_opt;
|
||||
bool result =
|
||||
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
|
||||
if (!result) {
|
||||
*mismatch = name + "." + mismatch_opt;
|
||||
}
|
||||
return result;
|
||||
}}},
|
||||
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
|
||||
offset_of(&CompactionServiceInput::snapshots),
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
|
||||
{0, OptionType::kUInt64T})},
|
||||
{"input_files", OptionTypeInfo::Vector<std::string>(
|
||||
offset_of(&CompactionServiceInput::input_files),
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
|
||||
{0, OptionType::kEncodedString})},
|
||||
{"output_level",
|
||||
{offset_of(&CompactionServiceInput::output_level), OptionType::kInt,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"has_begin",
|
||||
{offset_of(&CompactionServiceInput::has_begin), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"begin",
|
||||
{offset_of(&CompactionServiceInput::begin), OptionType::kEncodedString,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"has_end",
|
||||
{offset_of(&CompactionServiceInput::has_end), OptionType::kBoolean,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"end",
|
||||
{offset_of(&CompactionServiceInput::end), OptionType::kEncodedString,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"approx_size",
|
||||
{offset_of(&CompactionServiceInput::approx_size), OptionType::kUInt64T,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo>
|
||||
cs_output_file_type_info = {
|
||||
{"file_name",
|
||||
{offsetof(struct CompactionServiceOutputFile, file_name),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"smallest_seqno",
|
||||
{offsetof(struct CompactionServiceOutputFile, smallest_seqno),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"largest_seqno",
|
||||
{offsetof(struct CompactionServiceOutputFile, largest_seqno),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"smallest_internal_key",
|
||||
{offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"largest_internal_key",
|
||||
{offsetof(struct CompactionServiceOutputFile, largest_internal_key),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"oldest_ancester_time",
|
||||
{offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"file_creation_time",
|
||||
{offsetof(struct CompactionServiceOutputFile, file_creation_time),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"paranoid_hash",
|
||||
{offsetof(struct CompactionServiceOutputFile, paranoid_hash),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"marked_for_compaction",
|
||||
{offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo>
|
||||
compaction_job_stats_type_info = {
|
||||
{"elapsed_micros",
|
||||
{offsetof(struct CompactionJobStats, elapsed_micros),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"cpu_micros",
|
||||
{offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"num_input_records",
|
||||
{offsetof(struct CompactionJobStats, num_input_records),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_blobs_read",
|
||||
{offsetof(struct CompactionJobStats, num_blobs_read),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_input_files",
|
||||
{offsetof(struct CompactionJobStats, num_input_files),
|
||||
OptionType::kSizeT, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_input_files_at_output_level",
|
||||
{offsetof(struct CompactionJobStats, num_input_files_at_output_level),
|
||||
OptionType::kSizeT, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_output_records",
|
||||
{offsetof(struct CompactionJobStats, num_output_records),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_output_files",
|
||||
{offsetof(struct CompactionJobStats, num_output_files),
|
||||
OptionType::kSizeT, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_output_files_blob",
|
||||
{offsetof(struct CompactionJobStats, num_output_files_blob),
|
||||
OptionType::kSizeT, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"is_full_compaction",
|
||||
{offsetof(struct CompactionJobStats, is_full_compaction),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"is_manual_compaction",
|
||||
{offsetof(struct CompactionJobStats, is_manual_compaction),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_input_bytes",
|
||||
{offsetof(struct CompactionJobStats, total_input_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_blob_bytes_read",
|
||||
{offsetof(struct CompactionJobStats, total_blob_bytes_read),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_output_bytes",
|
||||
{offsetof(struct CompactionJobStats, total_output_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_output_bytes_blob",
|
||||
{offsetof(struct CompactionJobStats, total_output_bytes_blob),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_records_replaced",
|
||||
{offsetof(struct CompactionJobStats, num_records_replaced),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_input_raw_key_bytes",
|
||||
{offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_input_raw_value_bytes",
|
||||
{offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_input_deletion_records",
|
||||
{offsetof(struct CompactionJobStats, num_input_deletion_records),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_expired_deletion_records",
|
||||
{offsetof(struct CompactionJobStats, num_expired_deletion_records),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_corrupt_keys",
|
||||
{offsetof(struct CompactionJobStats, num_corrupt_keys),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"file_write_nanos",
|
||||
{offsetof(struct CompactionJobStats, file_write_nanos),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"file_range_sync_nanos",
|
||||
{offsetof(struct CompactionJobStats, file_range_sync_nanos),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"file_fsync_nanos",
|
||||
{offsetof(struct CompactionJobStats, file_fsync_nanos),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"file_prepare_write_nanos",
|
||||
{offsetof(struct CompactionJobStats, file_prepare_write_nanos),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"smallest_output_key_prefix",
|
||||
{offsetof(struct CompactionJobStats, smallest_output_key_prefix),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"largest_output_key_prefix",
|
||||
{offsetof(struct CompactionJobStats, largest_output_key_prefix),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_single_del_fallthru",
|
||||
{offsetof(struct CompactionJobStats, num_single_del_fallthru),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_single_del_mismatch",
|
||||
{offsetof(struct CompactionJobStats, num_single_del_mismatch),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
|
||||
{"output_files",
|
||||
OptionTypeInfo::Vector<CompactionServiceOutputFile>(
|
||||
offsetof(struct CompactionServiceResult, output_files),
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
|
||||
OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
|
||||
OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone))},
|
||||
{"output_level",
|
||||
{offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
|
||||
{"output_path",
|
||||
{offsetof(struct CompactionServiceResult, output_path),
|
||||
OptionType::kEncodedString, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"num_output_records",
|
||||
{offsetof(struct CompactionServiceResult, num_output_records),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"total_bytes",
|
||||
{offsetof(struct CompactionServiceResult, total_bytes),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"bytes_read",
|
||||
{offsetof(struct CompactionServiceResult, bytes_read),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"bytes_written",
|
||||
{offsetof(struct CompactionServiceResult, bytes_written),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kNone}},
|
||||
{"stats", OptionTypeInfo::Struct(
|
||||
"stats", &compaction_job_stats_type_info,
|
||||
offsetof(struct CompactionServiceResult, stats),
|
||||
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
|
||||
};
|
||||
|
||||
Status CompactionServiceInput::Read(const std::string& data_str,
|
||||
CompactionServiceInput* obj) {
|
||||
auto format_version = DecodeFixed32(data_str.data());
|
||||
if (format_version == kOptionsString) {
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
cf.ignore_unknown_options = true;
|
||||
return OptionTypeInfo::ParseType(
|
||||
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
|
||||
obj);
|
||||
} else {
|
||||
return Status::NotSupported(
|
||||
"Compaction Service Input data version not supported: " +
|
||||
ToString(format_version));
|
||||
}
|
||||
}
|
||||
|
||||
Status CompactionServiceInput::Write(std::string* output) {
|
||||
char buf[sizeof(BinaryFormatVersion)];
|
||||
EncodeFixed32(buf, kOptionsString);
|
||||
output->append(buf, sizeof(BinaryFormatVersion));
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
|
||||
}
|
||||
|
||||
Status CompactionServiceResult::Read(const std::string& data_str,
|
||||
CompactionServiceResult* obj) {
|
||||
auto format_version = DecodeFixed32(data_str.data());
|
||||
if (format_version == kOptionsString) {
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
cf.ignore_unknown_options = true;
|
||||
return OptionTypeInfo::ParseType(
|
||||
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
|
||||
obj);
|
||||
} else {
|
||||
return Status::NotSupported(
|
||||
"Compaction Service Result data version not supported: " +
|
||||
ToString(format_version));
|
||||
}
|
||||
}
|
||||
|
||||
Status CompactionServiceResult::Write(std::string* output) {
|
||||
char buf[sizeof(BinaryFormatVersion)];
|
||||
EncodeFixed32(buf, kOptionsString);
|
||||
output->append(buf, sizeof(BinaryFormatVersion));
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
|
||||
std::string mismatch;
|
||||
return TEST_Equals(other, &mismatch);
|
||||
}
|
||||
|
||||
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
|
||||
std::string* mismatch) {
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
|
||||
mismatch);
|
||||
}
|
||||
|
||||
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
|
||||
std::string mismatch;
|
||||
return TEST_Equals(other, &mismatch);
|
||||
}
|
||||
|
||||
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
|
||||
std::string* mismatch) {
|
||||
ConfigOptions cf;
|
||||
cf.invoke_prepare_options = false;
|
||||
return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
|
||||
mismatch);
|
||||
}
|
||||
#endif // NDEBUG
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -235,9 +235,23 @@ struct CompactionServiceInput {
|
||||
int output_level;
|
||||
|
||||
// information for subcompaction
|
||||
Slice* begin = nullptr;
|
||||
Slice* end = nullptr;
|
||||
bool has_begin = false;
|
||||
std::string begin;
|
||||
bool has_end = false;
|
||||
std::string end;
|
||||
uint64_t approx_size = 0;
|
||||
|
||||
// serialization interface to read and write the object
|
||||
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
|
||||
Status Write(std::string* output);
|
||||
|
||||
// Initialize a dummy ColumnFamilyDescriptor
|
||||
CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool TEST_Equals(CompactionServiceInput* other);
|
||||
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
|
||||
#endif // NDEBUG
|
||||
};
|
||||
|
||||
// CompactionServiceOutputFile is the metadata for the output SST file
|
||||
@ -285,6 +299,15 @@ struct CompactionServiceResult {
|
||||
uint64_t bytes_read;
|
||||
uint64_t bytes_written;
|
||||
CompactionJobStats stats;
|
||||
|
||||
// serialization interface to read and write the object
|
||||
static Status Read(const std::string& data_str, CompactionServiceResult* obj);
|
||||
Status Write(std::string* output);
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool TEST_Equals(CompactionServiceResult* other);
|
||||
bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
|
||||
#endif // NDEBUG
|
||||
};
|
||||
|
||||
// CompactionServiceCompactionJob is an read-only compaction job, it takes
|
||||
|
@ -1096,6 +1096,174 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
|
||||
/* expected_oldest_blob_file_number */ 19);
|
||||
}
|
||||
|
||||
TEST_F(CompactionJobTest, InputSerialization) {
|
||||
// Setup a random CompactionServiceInput
|
||||
CompactionServiceInput input;
|
||||
const int kStrMaxLen = 1000;
|
||||
Random rnd(static_cast<uint32_t>(time(nullptr)));
|
||||
Random64 rnd64(time(nullptr));
|
||||
input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
|
||||
input.column_family.options.comparator = ReverseBytewiseComparator();
|
||||
input.column_family.options.max_bytes_for_level_base =
|
||||
rnd64.Uniform(UINT64_MAX);
|
||||
input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
|
||||
input.column_family.options.compression = kZSTD;
|
||||
input.column_family.options.compression_opts.level = 4;
|
||||
input.db_options.max_background_flushes = 10;
|
||||
input.db_options.paranoid_checks = rnd.OneIn(2);
|
||||
input.db_options.statistics = CreateDBStatistics();
|
||||
input.db_options.env = env_;
|
||||
while (!rnd.OneIn(10)) {
|
||||
input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
|
||||
}
|
||||
while (!rnd.OneIn(10)) {
|
||||
input.input_files.emplace_back(rnd.RandomString(rnd.Uniform(kStrMaxLen)));
|
||||
}
|
||||
input.output_level = 4;
|
||||
input.has_begin = rnd.OneIn(2);
|
||||
if (input.has_begin) {
|
||||
input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
|
||||
}
|
||||
input.has_end = rnd.OneIn(2);
|
||||
if (input.has_end) {
|
||||
input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
|
||||
}
|
||||
input.approx_size = rnd64.Uniform(UINT64_MAX);
|
||||
|
||||
std::string output;
|
||||
ASSERT_OK(input.Write(&output));
|
||||
|
||||
// Test deserialization
|
||||
CompactionServiceInput deserialized1;
|
||||
ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1));
|
||||
ASSERT_TRUE(deserialized1.TEST_Equals(&input));
|
||||
|
||||
// Test mismatch
|
||||
deserialized1.db_options.max_background_flushes += 10;
|
||||
std::string mismatch;
|
||||
ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
|
||||
ASSERT_EQ(mismatch, "db_options.max_background_flushes");
|
||||
|
||||
// Test unknown field
|
||||
CompactionServiceInput deserialized2;
|
||||
output.clear();
|
||||
ASSERT_OK(input.Write(&output));
|
||||
output.append("new_field=123;");
|
||||
|
||||
ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2));
|
||||
ASSERT_TRUE(deserialized2.TEST_Equals(&input));
|
||||
|
||||
// Test missing field
|
||||
CompactionServiceInput deserialized3;
|
||||
deserialized3.output_level = 0;
|
||||
std::string to_remove = "output_level=4;";
|
||||
size_t pos = output.find(to_remove);
|
||||
ASSERT_TRUE(pos != std::string::npos);
|
||||
output.erase(pos, to_remove.length());
|
||||
ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3));
|
||||
mismatch.clear();
|
||||
ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch));
|
||||
ASSERT_EQ(mismatch, "output_level");
|
||||
|
||||
// manually set the value back, should match the original structure
|
||||
deserialized3.output_level = 4;
|
||||
ASSERT_TRUE(deserialized3.TEST_Equals(&input));
|
||||
|
||||
// Test invalid version
|
||||
output.clear();
|
||||
ASSERT_OK(input.Write(&output));
|
||||
|
||||
uint32_t data_version = DecodeFixed32(output.data());
|
||||
const size_t kDataVersionSize = sizeof(data_version);
|
||||
ASSERT_EQ(data_version,
|
||||
1U); // Update once the default data version is changed
|
||||
char buf[kDataVersionSize];
|
||||
EncodeFixed32(buf, data_version + 10); // make sure it's not valid
|
||||
output.replace(0, kDataVersionSize, buf, kDataVersionSize);
|
||||
Status s = CompactionServiceInput::Read(output, &deserialized3);
|
||||
ASSERT_TRUE(s.IsNotSupported());
|
||||
}
|
||||
|
||||
TEST_F(CompactionJobTest, ResultSerialization) {
|
||||
// Setup a random CompactionServiceResult
|
||||
CompactionServiceResult result;
|
||||
const int kStrMaxLen = 1000;
|
||||
Random rnd(static_cast<uint32_t>(time(nullptr)));
|
||||
Random64 rnd64(time(nullptr));
|
||||
while (!rnd.OneIn(10)) {
|
||||
result.output_files.emplace_back(
|
||||
rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
|
||||
rnd64.Uniform(UINT64_MAX),
|
||||
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
|
||||
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
|
||||
rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
|
||||
rnd64.Uniform(UINT64_MAX), rnd.OneIn(2));
|
||||
}
|
||||
result.output_level = rnd.Uniform(10);
|
||||
result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
|
||||
result.num_output_records = rnd64.Uniform(UINT64_MAX);
|
||||
result.total_bytes = rnd64.Uniform(UINT64_MAX);
|
||||
result.bytes_read = 123;
|
||||
result.bytes_written = rnd64.Uniform(UINT64_MAX);
|
||||
result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX);
|
||||
result.stats.num_output_files = rnd.Uniform(1000);
|
||||
result.stats.is_full_compaction = rnd.OneIn(2);
|
||||
result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX);
|
||||
result.stats.num_input_files = 9;
|
||||
|
||||
std::string output;
|
||||
ASSERT_OK(result.Write(&output));
|
||||
|
||||
// Test deserialization
|
||||
CompactionServiceResult deserialized1;
|
||||
ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1));
|
||||
ASSERT_TRUE(deserialized1.TEST_Equals(&result));
|
||||
|
||||
// Test mismatch
|
||||
deserialized1.stats.num_input_files += 10;
|
||||
std::string mismatch;
|
||||
ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
|
||||
ASSERT_EQ(mismatch, "stats.num_input_files");
|
||||
|
||||
// Test unknown field
|
||||
CompactionServiceResult deserialized2;
|
||||
output.clear();
|
||||
ASSERT_OK(result.Write(&output));
|
||||
output.append("new_field=123;");
|
||||
|
||||
ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2));
|
||||
ASSERT_TRUE(deserialized2.TEST_Equals(&result));
|
||||
|
||||
// Test missing field
|
||||
CompactionServiceResult deserialized3;
|
||||
deserialized3.bytes_read = 0;
|
||||
std::string to_remove = "bytes_read=123;";
|
||||
size_t pos = output.find(to_remove);
|
||||
ASSERT_TRUE(pos != std::string::npos);
|
||||
output.erase(pos, to_remove.length());
|
||||
ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3));
|
||||
mismatch.clear();
|
||||
ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch));
|
||||
ASSERT_EQ(mismatch, "bytes_read");
|
||||
|
||||
deserialized3.bytes_read = 123;
|
||||
ASSERT_TRUE(deserialized3.TEST_Equals(&result));
|
||||
|
||||
// Test invalid version
|
||||
output.clear();
|
||||
ASSERT_OK(result.Write(&output));
|
||||
|
||||
uint32_t data_version = DecodeFixed32(output.data());
|
||||
const size_t kDataVersionSize = sizeof(data_version);
|
||||
ASSERT_EQ(data_version,
|
||||
1U); // Update once the default data version is changed
|
||||
char buf[kDataVersionSize];
|
||||
EncodeFixed32(buf, data_version + 10); // make sure it's not valid
|
||||
output.replace(0, kDataVersionSize, buf, kDataVersionSize);
|
||||
Status s = CompactionServiceResult::Read(output, &deserialized3);
|
||||
ASSERT_TRUE(s.IsNotSupported());
|
||||
}
|
||||
|
||||
class CompactionJobTimestampTest : public CompactionJobTestBase {
|
||||
public:
|
||||
CompactionJobTimestampTest()
|
||||
|
@ -50,6 +50,7 @@ enum class OptionType {
|
||||
kVector,
|
||||
kConfigurable,
|
||||
kCustomizable,
|
||||
kEncodedString,
|
||||
kUnknown,
|
||||
};
|
||||
|
||||
|
@ -478,6 +478,11 @@ static bool ParseOptionHelper(char* opt_address, const OptionType& opt_type,
|
||||
return ParseEnum<CompactionStopStyle>(
|
||||
compaction_stop_style_string_map, value,
|
||||
reinterpret_cast<CompactionStopStyle*>(opt_address));
|
||||
case OptionType::kEncodedString: {
|
||||
std::string* output_addr = reinterpret_cast<std::string*>(opt_address);
|
||||
(Slice(value)).DecodeHex(output_addr);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
@ -622,6 +627,11 @@ bool SerializeSingleOptionHelper(const char* opt_address,
|
||||
return SerializeEnum<CompactionStopStyle>(
|
||||
compaction_stop_style_string_map,
|
||||
*reinterpret_cast<const CompactionStopStyle*>(opt_address), value);
|
||||
case OptionType::kEncodedString: {
|
||||
const auto* ptr = reinterpret_cast<const std::string*>(opt_address);
|
||||
*value = (Slice(*ptr)).ToString(true);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
@ -1251,6 +1261,8 @@ static bool AreOptionsEqual(OptionType type, const char* this_offset,
|
||||
return IsOptionEqual<ChecksumType>(this_offset, that_offset);
|
||||
case OptionType::kEncodingType:
|
||||
return IsOptionEqual<EncodingType>(this_offset, that_offset);
|
||||
case OptionType::kEncodedString:
|
||||
return IsOptionEqual<std::string>(this_offset, that_offset);
|
||||
default:
|
||||
return false;
|
||||
} // End switch
|
||||
|
@ -53,4 +53,13 @@ std::string Random::RandomString(int len) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string Random::RandomBinaryString(int len) {
|
||||
std::string ret;
|
||||
ret.resize(len);
|
||||
for (int i = 0; i < len; i++) {
|
||||
ret[i] = static_cast<char>(Uniform(CHAR_MAX));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -92,6 +92,9 @@ class Random {
|
||||
// Generates a random string of len bytes using human-readable characters
|
||||
std::string HumanReadableString(int len);
|
||||
|
||||
// Generates a random binary data
|
||||
std::string RandomBinaryString(int len);
|
||||
|
||||
// Returns a Random instance for use by the current thread without
|
||||
// additional locking
|
||||
static Random* GetTLSInstance();
|
||||
|
Loading…
Reference in New Issue
Block a user