Add Bloom/Ribbon hybrid API support

Summary: This is essentially resurrection and fixing of level support
in #8198 and reverted in #8212. TODO: finish summary

Test Plan: new + updated tests
This commit is contained in:
Peter Dillinger 2021-08-19 17:23:00 -07:00
parent 9eb002fcf0
commit c4d22d517f
10 changed files with 306 additions and 52 deletions

View File

@ -20,6 +20,7 @@
* Add property `LiveSstFilesSizeAtTemperature` to retrieve sst file size at different temperature.
* Added a stat rocksdb.secondary.cache.hits
* The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`.
* Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewRibbonFilterPolicy. This also changes the default behavior of NewRibbonFilterPolicy to use Bloom for flushes under Leveled and Universal compaction and Ribbon otherwise.
## Public API change
* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h, trace_record_result.h and utilities/replayer.h files to access the decoded Trace records, replay them, and query the actual operation results.

View File

@ -146,7 +146,7 @@ DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
DECLARE_bool(use_block_based_filter);
DECLARE_bool(use_ribbon_filter);
DECLARE_int32(ribbon_starting_level);
DECLARE_bool(partition_filters);
DECLARE_bool(optimize_filters_for_memory);
DECLARE_int32(index_type);

View File

@ -419,8 +419,11 @@ DEFINE_bool(use_block_based_filter, false,
"use block based filter"
"instead of full filter for block based table");
DEFINE_bool(use_ribbon_filter, false,
"Use Ribbon filter instead of Bloom filter");
DEFINE_int32(
ribbon_starting_level, 999,
"Use Bloom filter on levels below specified and Ribbon beginning on level "
"specified. Flush is considered level -1. 999 or more -> always Bloom. 0 "
"-> Ribbon except Bloom for flush. -1 -> always Ribbon.");
DEFINE_bool(partition_filters, false,
"use partitioned filters "

View File

@ -31,19 +31,21 @@ std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
return BlockBasedTableOptions().filter_policy;
}
const FilterPolicy* new_policy;
if (FLAGS_use_ribbon_filter) {
// Old and new API should be same
if (std::random_device()() & 1) {
new_policy = NewExperimentalRibbonFilterPolicy(FLAGS_bloom_bits);
if (FLAGS_use_block_based_filter) {
if (FLAGS_ribbon_starting_level < 999) {
fprintf(
stderr,
"Cannot combine use_block_based_filter and ribbon_starting_level\n");
exit(1);
} else {
new_policy = NewRibbonFilterPolicy(FLAGS_bloom_bits);
}
} else {
if (FLAGS_use_block_based_filter) {
new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, true);
} else {
new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
}
} else if (FLAGS_ribbon_starting_level >= 999) {
// Use Bloom API
new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
} else {
new_policy = NewRibbonFilterPolicy(
FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level);
}
return std::shared_ptr<const FilterPolicy>(new_policy);
}

View File

@ -250,6 +250,20 @@ extern const FilterPolicy* NewBloomFilterPolicy(
// you pass in 10 for bloom_equivalent_bits_per_key, you'll get the same
// 0.95% FP rate as Bloom filter but only using about 7 bits per key.
//
// The space savings of Ribbon filters makes sense for lower (higher
// numbered; larger; longer-lived) levels of LSM, whereas the speed of
// Bloom filters make sense for highest levels of LSM. Setting
// bloom_before_level allows for this design with Level and Universal
// compaction styles. For example, bloom_before_level=1 means that Bloom
// filters will be used in level 0, including flushes, and Ribbon
// filters elsewhere, including FIFO compaction and external SST files.
// For this option, memtable flushes are considered level -1 (so that
// flushes can be distinguished from intra-L0 compaction).
// bloom_before_level=0 (default) -> Generate Bloom filters only for
// flushes under Level and Universal compaction styles.
// bloom_before_level=-1 -> Always generate Ribbon filters (except in
// some extreme or exceptional cases).
//
// Ribbon filters are compatible with RocksDB >= 6.15.0. Earlier
// versions reading the data will behave as if no filter was used
// (degraded performance until compaction rebuilds filters). All
@ -266,12 +280,12 @@ extern const FilterPolicy* NewBloomFilterPolicy(
// Also consider using optimize_filters_for_memory to save filter
// memory.
extern const FilterPolicy* NewRibbonFilterPolicy(
double bloom_equivalent_bits_per_key);
double bloom_equivalent_bits_per_key, int bloom_before_level = 0);
// Old name
// Old name and old default behavior
inline const FilterPolicy* NewExperimentalRibbonFilterPolicy(
double bloom_equivalent_bits_per_key) {
return NewRibbonFilterPolicy(bloom_equivalent_bits_per_key);
return NewRibbonFilterPolicy(bloom_equivalent_bits_per_key, -1);
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -932,14 +932,49 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(table_opt.filter_policy, new_opt.filter_policy);
// Ribbon filter policy
// Ribbon filter policy (no Bloom hybrid)
ASSERT_OK(GetBlockBasedTableOptionsFromString(
config_options, table_opt, "filter_policy=ribbonfilter:5.678;",
config_options, table_opt, "filter_policy=ribbonfilter:5.678:-1;",
&new_opt));
ASSERT_TRUE(new_opt.filter_policy != nullptr);
bfp = dynamic_cast<const BloomFilterPolicy*>(new_opt.filter_policy.get());
EXPECT_EQ(bfp->GetMillibitsPerKey(), 5678);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kStandard128Ribbon);
// Ribbon filter policy (default Bloom hybrid)
ASSERT_OK(GetBlockBasedTableOptionsFromString(
config_options, table_opt, "filter_policy=ribbonfilter:6.789;",
&new_opt));
ASSERT_TRUE(new_opt.filter_policy != nullptr);
auto ltfp = dynamic_cast<const LevelThresholdFilterPolicy*>(
new_opt.filter_policy.get());
EXPECT_EQ(ltfp->TEST_GetStartingLevelForB(), 0);
bfp = dynamic_cast<const BloomFilterPolicy*>(ltfp->TEST_GetPolicyA());
EXPECT_EQ(bfp->GetMillibitsPerKey(), 6789);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kFastLocalBloom);
bfp = dynamic_cast<const BloomFilterPolicy*>(ltfp->TEST_GetPolicyB());
EXPECT_EQ(bfp->GetMillibitsPerKey(), 6789);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kStandard128Ribbon);
// Ribbon filter policy (custom Bloom hybrid)
ASSERT_OK(GetBlockBasedTableOptionsFromString(
config_options, table_opt, "filter_policy=ribbonfilter:6.789:5;",
&new_opt));
ASSERT_TRUE(new_opt.filter_policy != nullptr);
ltfp = dynamic_cast<const LevelThresholdFilterPolicy*>(
new_opt.filter_policy.get());
EXPECT_EQ(ltfp->TEST_GetStartingLevelForB(), 5);
bfp = dynamic_cast<const BloomFilterPolicy*>(ltfp->TEST_GetPolicyA());
EXPECT_EQ(bfp->GetMillibitsPerKey(), 6789);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kFastLocalBloom);
bfp = dynamic_cast<const BloomFilterPolicy*>(ltfp->TEST_GetPolicyB());
EXPECT_EQ(bfp->GetMillibitsPerKey(), 6789);
EXPECT_EQ(bfp->GetMode(), BloomFilterPolicy::kStandard128Ribbon);
// Old name
ASSERT_OK(GetBlockBasedTableOptionsFromString(
config_options, table_opt, "filter_policy=experimental_ribbon:6.789;",

View File

@ -1062,7 +1062,7 @@ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
BloomFilterPolicy::~BloomFilterPolicy() {}
const char* BloomFilterPolicy::Name() const {
const char* BuiltinFilterPolicy::Name() const {
return "rocksdb.BuiltinBloomFilter";
}
@ -1095,8 +1095,8 @@ void BloomFilterPolicy::CreateFilter(const Slice* keys, int n,
}
}
bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
const Slice& bloom_filter) const {
bool BuiltinFilterPolicy::KeyMayMatch(const Slice& key,
const Slice& bloom_filter) const {
const size_t len = bloom_filter.size();
if (len < 2 || len > 0xffffffffU) {
return false;
@ -1118,7 +1118,7 @@ bool BloomFilterPolicy::KeyMayMatch(const Slice& key,
array);
}
FilterBitsBuilder* BloomFilterPolicy::GetFilterBitsBuilder() const {
FilterBitsBuilder* BuiltinFilterPolicy::GetFilterBitsBuilder() const {
// This code path should no longer be used, for the built-in
// BloomFilterPolicy. Internal to RocksDB and outside
// BloomFilterPolicy, only get a FilterBitsBuilder with
@ -1192,7 +1192,7 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one.
FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
FilterBitsReader* BuiltinFilterPolicy::GetFilterBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
if (len_with_meta <= kMetadataLen) {
@ -1273,7 +1273,7 @@ FilterBitsReader* BloomFilterPolicy::GetFilterBitsReader(
log2_cache_line_size);
}
FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
FilterBitsReader* BuiltinFilterPolicy::GetRibbonBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
uint32_t len = len_with_meta - kMetadataLen;
@ -1297,7 +1297,7 @@ FilterBitsReader* BloomFilterPolicy::GetRibbonBitsReader(
}
// For newer Bloom filter implementations
FilterBitsReader* BloomFilterPolicy::GetBloomBitsReader(
FilterBitsReader* BuiltinFilterPolicy::GetBloomBitsReader(
const Slice& contents) const {
uint32_t len_with_meta = static_cast<uint32_t>(contents.size());
uint32_t len = len_with_meta - kMetadataLen;
@ -1370,10 +1370,69 @@ const FilterPolicy* NewBloomFilterPolicy(double bits_per_key,
return new BloomFilterPolicy(bits_per_key, m);
}
extern const FilterPolicy* NewRibbonFilterPolicy(
double bloom_equivalent_bits_per_key) {
return new BloomFilterPolicy(bloom_equivalent_bits_per_key,
BloomFilterPolicy::kStandard128Ribbon);
// Chooses between two filter policies based on LSM level, but
// only for Level and Universal compaction styles. Flush is treated
// as level -1. Policy b is considered fallback / primary policy.
LevelThresholdFilterPolicy::LevelThresholdFilterPolicy(
std::unique_ptr<const FilterPolicy>&& a,
std::unique_ptr<const FilterPolicy>&& b, int starting_level_for_b)
: policy_a_(std::move(a)),
policy_b_(std::move(b)),
starting_level_for_b_(starting_level_for_b) {
// Don't use this wrapper class if you were going to set to -1
assert(starting_level_for_b_ >= 0);
}
// Deprecated block-based filter only
void LevelThresholdFilterPolicy::CreateFilter(const Slice* keys, int n,
std::string* dst) const {
policy_b_->CreateFilter(keys, n, dst);
}
FilterBitsBuilder* LevelThresholdFilterPolicy::GetBuilderWithContext(
const FilterBuildingContext& context) const {
switch (context.compaction_style) {
case kCompactionStyleLevel:
case kCompactionStyleUniversal: {
int levelish;
if (context.reason == TableFileCreationReason::kFlush) {
// Treat flush as level -1
assert(context.level_at_creation == 0);
levelish = -1;
} else if (context.level_at_creation == -1) {
// Unknown level
// Policy b considered fallback / primary
return policy_b_->GetBuilderWithContext(context);
} else {
levelish = context.level_at_creation;
}
if (levelish >= starting_level_for_b_) {
return policy_b_->GetBuilderWithContext(context);
} else {
return policy_a_->GetBuilderWithContext(context);
}
}
case kCompactionStyleFIFO:
case kCompactionStyleNone:
break;
}
// Policy b considered fallback / primary
return policy_b_->GetBuilderWithContext(context);
}
const FilterPolicy* NewRibbonFilterPolicy(double bloom_equivalent_bits_per_key,
int bloom_before_level) {
std::unique_ptr<const FilterPolicy> ribbon_only{new BloomFilterPolicy(
bloom_equivalent_bits_per_key, BloomFilterPolicy::kStandard128Ribbon)};
if (bloom_before_level > -1) {
// Could also use Bloom policy
std::unique_ptr<const FilterPolicy> bloom_only{new BloomFilterPolicy(
bloom_equivalent_bits_per_key, BloomFilterPolicy::kFastLocalBloom)};
return new LevelThresholdFilterPolicy(
std::move(bloom_only), std::move(ribbon_only), bloom_before_level);
} else {
return ribbon_only.release();
}
}
FilterBuildingContext::FilterBuildingContext(
@ -1410,9 +1469,18 @@ Status FilterPolicy::CreateFromString(
policy->reset(
NewExperimentalRibbonFilterPolicy(bloom_equivalent_bits_per_key));
} else if (value.compare(0, kRibbonName.size(), kRibbonName) == 0) {
size_t pos = value.find(':', kRibbonName.size());
int bloom_before_level;
if (pos == std::string::npos) {
pos = value.size();
bloom_before_level = 0;
} else {
bloom_before_level = ParseInt(trim(value.substr(pos + 1)));
}
double bloom_equivalent_bits_per_key =
ParseDouble(trim(value.substr(kRibbonName.size())));
policy->reset(NewRibbonFilterPolicy(bloom_equivalent_bits_per_key));
ParseDouble(trim(value.substr(kRibbonName.size(), pos)));
policy->reset(NewRibbonFilterPolicy(bloom_equivalent_bits_per_key,
bloom_before_level));
} else {
return Status::NotFound("Invalid filter policy name ", value);
#else

View File

@ -38,10 +38,39 @@ class BuiltinFilterBitsBuilder : public FilterBitsBuilder {
virtual double EstimatedFpRate(size_t num_entries, size_t bytes) = 0;
};
// RocksDB built-in filter policy for Bloom or Bloom-like filters.
// Abstract base class for RocksDB built-in filter policies.
// This class is considered internal API and subject to change.
// See NewBloomFilterPolicy.
class BloomFilterPolicy : public FilterPolicy {
class BuiltinFilterPolicy : public FilterPolicy {
public:
// Shared name because any built-in policy can read filters from
// any other
const char* Name() const override;
// Deprecated block-based filter only
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;
// Old API
FilterBitsBuilder* GetFilterBitsBuilder() const override;
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one. This must successfully process any filter data
// generated by a built-in FilterBitsBuilder, regardless of the impl
// chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
private:
// For Bloom filter implementation(s) (except deprecated block-based filter)
FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;
// For Ribbon filter implementation(s)
FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
};
// RocksDB built-in filter policy for Bloom or Bloom-like filters including
// Ribbon filters.
// This class is considered internal API and subject to change.
// See NewBloomFilterPolicy and NewRibbonFilterPolicy.
class BloomFilterPolicy : public BuiltinFilterPolicy {
public:
// An internal marker for operating modes of BloomFilterPolicy, in terms
// of selecting an implementation. This makes it easier for tests to track
@ -88,16 +117,9 @@ class BloomFilterPolicy : public FilterPolicy {
~BloomFilterPolicy() override;
const char* Name() const override;
// Deprecated block-based filter only
void CreateFilter(const Slice* keys, int n, std::string* dst) const override;
// Deprecated block-based filter only
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override;
FilterBitsBuilder* GetFilterBitsBuilder() const override;
// To use this function, call GetBuilderFromContext().
//
// Neither the context nor any objects therein should be saved beyond
@ -110,12 +132,6 @@ class BloomFilterPolicy : public FilterPolicy {
// (An internal convenience function to save boilerplate.)
static FilterBitsBuilder* GetBuilderFromContext(const FilterBuildingContext&);
// Read metadata to determine what kind of FilterBitsReader is needed
// and return a new one. This must successfully process any filter data
// generated by a built-in FilterBitsBuilder, regardless of the impl
// chosen for this BloomFilterPolicy. Not compatible with CreateFilter.
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
// Essentially for testing only: configured millibits/key
int GetMillibitsPerKey() const { return millibits_per_key_; }
// Essentially for testing only: legacy whole bits/key
@ -157,12 +173,33 @@ class BloomFilterPolicy : public FilterPolicy {
// Sum over all generated filters f:
// (predicted_fp_rate(f) - predicted_fp_rate(f|o_f_f_m=false)) * 2^32
mutable std::atomic<int64_t> aggregate_rounding_balance_;
};
// For newer Bloom filter implementation(s)
FilterBitsReader* GetBloomBitsReader(const Slice& contents) const;
// Chooses between two filter policies based on LSM level, but
// only for Level and Universal compaction styles. Flush is treated
// as level -1. Policy b is considered fallback / primary policy.
class LevelThresholdFilterPolicy : public BuiltinFilterPolicy {
public:
LevelThresholdFilterPolicy(std::unique_ptr<const FilterPolicy>&& a,
std::unique_ptr<const FilterPolicy>&& b,
int starting_level_for_b);
// For Ribbon filter implementation(s)
FilterBitsReader* GetRibbonBitsReader(const Slice& contents) const;
// Deprecated block-based filter only
void CreateFilter(const Slice* keys, int n, std::string* dst) const override;
FilterBitsBuilder* GetBuilderWithContext(
const FilterBuildingContext& context) const override;
inline int TEST_GetStartingLevelForB() const { return starting_level_for_b_; }
inline const FilterPolicy* TEST_GetPolicyA() const { return policy_a_.get(); }
inline const FilterPolicy* TEST_GetPolicyB() const { return policy_b_.get(); }
private:
const std::unique_ptr<const FilterPolicy> policy_a_;
const std::unique_ptr<const FilterPolicy> policy_b_;
int starting_level_for_b_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -104,7 +104,9 @@ default_params = {
"use_clock_cache": 0, # currently broken
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
"use_ribbon_filter": lambda: random.randint(0, 1),
# 999 -> use Bloom API
"ribbon_starting_level": lambda: random.choice([random.randint(-1, 10), 999]),
"use_block_based_filter": lambda: random.randint(0, 1),
"verify_checksum": 1,
"write_buffer_size": 4 * 1024 * 1024,
"writepercent": 35,
@ -359,6 +361,8 @@ def finalize_and_sanitize(src_params):
dest_params["partition_filters"] = 0
else:
dest_params["use_block_based_filter"] = 0
if dest_params["ribbon_starting_level"] < 999:
dest_params["use_block_based_filter"] = 0
if dest_params.get("atomic_flush", 0) == 1:
# disable pipelined write when atomic flush is used.
dest_params["enable_pipelined_write"] = 0

View File

@ -1195,6 +1195,96 @@ INSTANTIATE_TEST_CASE_P(Full, FullBloomTest,
BloomFilterPolicy::kFastLocalBloom,
BloomFilterPolicy::kStandard128Ribbon));
static double GetEffectiveBitsPerKey(FilterBitsBuilder* builder) {
union {
uint64_t key_value;
char key_bytes[8];
};
const unsigned kNumKeys = 1000;
Slice key_slice{key_bytes, 8};
for (key_value = 0; key_value < kNumKeys; ++key_value) {
builder->AddKey(key_slice);
}
std::unique_ptr<const char[]> buf;
auto filter = builder->Finish(&buf);
return filter.size() * /*bits per byte*/ 8 / (1.0 * kNumKeys);
}
static void SetTestingLevel(int levelish, FilterBuildingContext* ctx) {
if (levelish == -1) {
// Flush is treated as level -1 for this option but actually level 0
ctx->level_at_creation = 0;
ctx->reason = TableFileCreationReason::kFlush;
} else {
ctx->level_at_creation = levelish;
ctx->reason = TableFileCreationReason::kCompaction;
}
}
TEST(RibbonTest, RibbonTestLevelThreshold) {
BlockBasedTableOptions opts;
FilterBuildingContext ctx(opts);
// A few settings
for (CompactionStyle cs : {kCompactionStyleLevel, kCompactionStyleUniversal,
kCompactionStyleFIFO, kCompactionStyleNone}) {
ctx.compaction_style = cs;
for (int bloom_before_level : {-1, 0, 1, 10}) {
std::vector<std::unique_ptr<const FilterPolicy> > policies;
policies.emplace_back(NewRibbonFilterPolicy(10, bloom_before_level));
if (bloom_before_level == -1) {
// Also test old API
policies.emplace_back(NewExperimentalRibbonFilterPolicy(10));
}
if (bloom_before_level == 0) {
// Also test old API and new API default
policies.emplace_back(NewRibbonFilterPolicy(10));
}
for (std::unique_ptr<const FilterPolicy>& policy : policies) {
// Claim to be generating filter for this level
SetTestingLevel(bloom_before_level, &ctx);
std::unique_ptr<FilterBitsBuilder> builder{
policy->GetBuilderWithContext(ctx)};
// Must be Ribbon (more space efficient than 10 bits per key)
ASSERT_LT(GetEffectiveBitsPerKey(builder.get()), 8);
if (bloom_before_level >= 0) {
// Claim to be generating filter for previous level
SetTestingLevel(bloom_before_level - 1, &ctx);
builder.reset(policy->GetBuilderWithContext(ctx));
if (cs == kCompactionStyleLevel || cs == kCompactionStyleUniversal) {
// Level is considered.
// Must be Bloom (~ 10 bits per key)
ASSERT_GT(GetEffectiveBitsPerKey(builder.get()), 9);
} else {
// Level is ignored under non-traditional compaction styles.
// Must be Ribbon (more space efficient than 10 bits per key)
ASSERT_LT(GetEffectiveBitsPerKey(builder.get()), 8);
}
}
// Like SST file writer
ctx.level_at_creation = -1;
ctx.reason = TableFileCreationReason::kMisc;
builder.reset(policy->GetBuilderWithContext(ctx));
// Must be Ribbon (more space efficient than 10 bits per key)
ASSERT_LT(GetEffectiveBitsPerKey(builder.get()), 8);
}
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {