rocksdb/db/blob/db_blob_compaction_test.cc

731 lines
24 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
class DBBlobCompactionTest : public DBTestBase {
public:
explicit DBBlobCompactionTest()
: DBTestBase("db_blob_compaction_test", /*env_do_fsync=*/false) {}
#ifndef ROCKSDB_LITE
const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
return internal_stats->TEST_GetCompactionStats();
}
#endif // ROCKSDB_LITE
};
namespace {
class FilterByKeyLength : public CompactionFilter {
public:
explicit FilterByKeyLength(size_t len) : length_threshold_(len) {}
const char* Name() const override {
return "rocksdb.compaction.filter.by.key.length";
}
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (key.size() < length_threshold_) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}
private:
size_t length_threshold_;
};
class BadBlobCompactionFilter : public CompactionFilter {
public:
explicit BadBlobCompactionFilter(std::string prefix,
CompactionFilter::Decision filter_by_key,
CompactionFilter::Decision filter_v2)
: prefix_(std::move(prefix)),
filter_blob_by_key_(filter_by_key),
filter_v2_(filter_v2) {}
const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (key.size() >= prefix_.size() &&
0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
return CompactionFilter::Decision::kUndetermined;
}
return filter_blob_by_key_;
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
return filter_v2_;
}
private:
const std::string prefix_;
const CompactionFilter::Decision filter_blob_by_key_;
const CompactionFilter::Decision filter_v2_;
};
class ValueBlindWriteFilter : public CompactionFilter {
public:
explicit ValueBlindWriteFilter(std::string new_val)
: new_value_(std::move(new_val)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.blind.write";
}
CompactionFilter::Decision FilterBlobByKey(
int level, const Slice& key, std::string* new_value,
std::string* skip_until) const override;
private:
const std::string new_value_;
};
CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
int /*level*/, const Slice& /*key*/, std::string* new_value,
std::string* /*skip_until*/) const {
assert(new_value);
new_value->assign(new_value_);
return CompactionFilter::Decision::kChangeValue;
}
class ValueMutationFilter : public CompactionFilter {
public:
explicit ValueMutationFilter(std::string padding)
: padding_(std::move(padding)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.value.mutation";
}
CompactionFilter::Decision FilterV2(int level, const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override;
private:
const std::string padding_;
};
CompactionFilter::Decision ValueMutationFilter::FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
assert(CompactionFilter::ValueType::kBlobIndex != value_type);
if (CompactionFilter::ValueType::kValue != value_type) {
return CompactionFilter::Decision::kKeep;
}
assert(new_value);
new_value->assign(existing_value.data(), existing_value.size());
new_value->append(padding_);
return CompactionFilter::Decision::kChangeValue;
}
class AlwaysKeepFilter : public CompactionFilter {
public:
explicit AlwaysKeepFilter() = default;
const char* Name() const override {
return "rocksdb.compaction.filter.always.keep";
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
return CompactionFilter::Decision::kKeep;
}
};
class SkipUntilFilter : public CompactionFilter {
public:
explicit SkipUntilFilter(std::string skip_until)
: skip_until_(std::move(skip_until)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.skip.until";
}
CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */,
ValueType /* value_type */,
const Slice& /* existing_value */,
std::string* /* new_value */,
std::string* skip_until) const override {
assert(skip_until);
*skip_until = skip_until_;
return CompactionFilter::Decision::kRemoveAndSkipUntil;
}
private:
std::string skip_until_;
};
} // anonymous namespace
class DBBlobBadCompactionFilterTest
: public DBBlobCompactionTest,
public testing::WithParamInterface<
std::tuple<std::string, CompactionFilter::Decision,
CompactionFilter::Decision>> {
public:
explicit DBBlobBadCompactionFilterTest()
: compaction_filter_guard_(new BadBlobCompactionFilter(
std::get<0>(GetParam()), std::get<1>(GetParam()),
std::get<2>(GetParam()))) {}
protected:
std::unique_ptr<CompactionFilter> compaction_filter_guard_;
};
INSTANTIATE_TEST_CASE_P(
BadCompactionFilter, DBBlobBadCompactionFilterTest,
testing::Combine(
testing::Values("a"),
testing::Values(CompactionFilter::Decision::kChangeBlobIndex,
CompactionFilter::Decision::kIOError),
testing::Values(CompactionFilter::Decision::kUndetermined,
CompactionFilter::Decision::kChangeBlobIndex,
CompactionFilter::Decision::kIOError)));
TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
constexpr size_t kKeyLength = 2;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new FilterByKeyLength(kKeyLength));
options.compaction_filter = compaction_filter_guard.get();
constexpr char short_key[] = "a";
constexpr char long_key[] = "abc";
constexpr char blob_value[] = "value";
DestroyAndReopen(options);
ASSERT_OK(Put(short_key, blob_value));
ASSERT_OK(Put(long_key, blob_value));
ASSERT_OK(Flush());
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound());
value.clear();
ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
ASSERT_EQ("value", value);
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter decides between kKeep and kRemove solely based on key;
// this involves neither reading nor writing blobs
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
constexpr char new_blob_value[] = "new_blob_value";
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueBlindWriteFilter(new_blob_value));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
const std::vector<std::string> keys = {"a", "b", "c"};
const std::vector<std::string> values = {"a_value", "b_value", "c_value"};
assert(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
ASSERT_OK(Put(keys[i], values[i]));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
for (const auto& key : keys) {
ASSERT_EQ(new_blob_value, Get(key));
}
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter unconditionally changes value in FilterBlobByKey;
// this involves writing but not reading blobs
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
TEST_F(DBBlobCompactionTest, SkipUntilFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new SkipUntilFilter("z"));
options.compaction_filter = compaction_filter_guard.get();
Reopen(options);
const std::vector<std::string> keys{"a", "b", "c"};
const std::vector<std::string> values{"a_value", "b_value", "c_value"};
assert(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
ASSERT_OK(Put(keys[i], values[i]));
}
ASSERT_OK(Flush());
int process_in_flow_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
[&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; });
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
/* end */ nullptr));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
for (const auto& key : keys) {
ASSERT_EQ(Get(key), "NOT_FOUND");
}
// Make sure SkipUntil was performed using iteration rather than Seek
ASSERT_EQ(process_in_flow_called, keys.size());
Close();
}
TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
options.compaction_filter = compaction_filter_guard_.get();
DestroyAndReopen(options);
ASSERT_OK(Put("b", "value"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsNotSupported());
Close();
DestroyAndReopen(options);
std::string key(std::get<0>(GetParam()));
ASSERT_OK(Put(key, "value"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsNotSupported());
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(""));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
constexpr char key[] = "key";
constexpr char blob[] = "blob";
// Fake an inlined TTL blob index.
std::string blob_index;
constexpr uint64_t expiration = 1234567890;
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsCorruption());
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilter) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
constexpr char padding[] = "_delta";
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(padding));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
const std::vector<std::pair<std::string, std::string>> kvs = {
{"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
for (const auto& kv : kvs) {
ASSERT_OK(Put(kv.first, kv.second));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
for (const auto& kv : kvs) {
ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
}
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter changes the value using the previous value in FilterV2;
// this involves reading and writing blobs
ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(""));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
constexpr char key[] = "key";
constexpr char blob[] = "blob";
ASSERT_OK(Put(key, blob));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
[](void* arg) {
Slice* const blob_index = static_cast<Slice*>(arg);
assert(blob_index);
assert(!blob_index->empty());
blob_index->remove_prefix(1);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new AlwaysKeepFilter());
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Flush());
std::vector<uint64_t> blob_files = GetBlobFileNumbers();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
ASSERT_EQ(blob_files, GetBlobFileNumbers());
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter decides to keep the existing value in FilterV2;
// this involves reading but not writing blobs
ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
TEST_F(DBBlobCompactionTest, TrackGarbage) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
Reopen(options);
// First table+blob file pair: 4 blobs with different keys
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
constexpr char fourth_key[] = "fourth_key";
constexpr char fourth_value[] = "fourth_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Put(fourth_key, fourth_value));
ASSERT_OK(Flush());
// Second table+blob file pair: overwrite 2 existing keys
constexpr char new_first_value[] = "new_first_value";
constexpr char new_second_value[] = "new_second_value";
ASSERT_OK(Put(first_key, new_first_value));
ASSERT_OK(Put(second_key, new_second_value));
ASSERT_OK(Flush());
// Compact them together. The first blob file should have 2 garbage blobs
// corresponding to the 2 overwritten keys.
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 2);
{
Use a sorted vector instead of a map to store blob file metadata (#9526) Summary: The patch replaces `std::map` with a sorted `std::vector` for `VersionStorageInfo::blob_files_` and preallocates the space for the `vector` before saving the `BlobFileMetaData` into the new `VersionStorageInfo` in `VersionBuilder::Rep::SaveBlobFilesTo`. These changes reduce the time the DB mutex is held while saving new `Version`s, and using a sorted `vector` also makes lookups faster thanks to better memory locality. In addition, the patch introduces helper methods `VersionStorageInfo::GetBlobFileMetaData` and `VersionStorageInfo::GetBlobFileMetaDataLB` that can be used by clients to perform lookups in the `vector`, and does some general cleanup in the parts of code where blob file metadata are used. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9526 Test Plan: Ran `make check` and the crash test script for a while. Performance was tested using a load-optimized benchmark (`fillseq` with vector memtable, no WAL) and small file sizes so that a significant number of files are produced: ``` numactl --interleave=all ./db_bench --benchmarks=fillseq --allow_concurrent_memtable_write=false --level0_file_num_compaction_trigger=4 --level0_slowdown_writes_trigger=20 --level0_stop_writes_trigger=30 --max_background_jobs=8 --max_write_buffer_number=8 --db=/data/ltamasi-dbbench --wal_dir=/data/ltamasi-dbbench --num=800000000 --num_levels=8 --key_size=20 --value_size=400 --block_size=8192 --cache_size=51539607552 --cache_numshardbits=6 --compression_max_dict_bytes=0 --compression_ratio=0.5 --compression_type=lz4 --bytes_per_sync=8388608 --cache_index_and_filter_blocks=1 --cache_high_pri_pool_ratio=0.5 --benchmark_write_rate_limit=0 --write_buffer_size=16777216 --target_file_size_base=16777216 --max_bytes_for_level_base=67108864 --verify_checksum=1 --delete_obsolete_files_period_micros=62914560 --max_bytes_for_level_multiplier=8 --statistics=0 --stats_per_interval=1 --stats_interval_seconds=20 --histogram=1 --memtablerep=skip_list --bloom_bits=10 --open_files=-1 --subcompactions=1 --compaction_style=0 --min_level_to_compress=3 --level_compaction_dynamic_level_bytes=true --pin_l0_filter_and_index_blocks_in_cache=1 --soft_pending_compaction_bytes_limit=167503724544 --hard_pending_compaction_bytes_limit=335007449088 --min_level_to_compress=0 --use_existing_db=0 --sync=0 --threads=1 --memtablerep=vector --allow_concurrent_memtable_write=false --disable_wal=1 --enable_blob_files=1 --blob_file_size=16777216 --min_blob_size=0 --blob_compression_type=lz4 --enable_blob_garbage_collection=1 --seed=<some value> ``` Final statistics before the patch: ``` Cumulative writes: 0 writes, 700M keys, 0 commit groups, 0.0 writes per commit group, ingest: 284.62 GB, 121.27 MB/s Interval writes: 0 writes, 334K keys, 0 commit groups, 0.0 writes per commit group, ingest: 139.28 MB, 72.46 MB/s ``` With the patch: ``` Cumulative writes: 0 writes, 760M keys, 0 commit groups, 0.0 writes per commit group, ingest: 308.66 GB, 131.52 MB/s Interval writes: 0 writes, 445K keys, 0 commit groups, 0.0 writes per commit group, ingest: 185.35 MB, 93.15 MB/s ``` Total time to complete the benchmark is 2611 seconds with the patch, down from 2986 secs. Reviewed By: riversand963 Differential Revision: D34082728 Pulled By: ltamasi fbshipit-source-id: fc598abf676dce436734d06bb9d2d99a26a004fc
2022-02-09 12:35:39 -08:00
const auto& meta = blob_files.front();
assert(meta);
constexpr uint64_t first_expected_bytes =
sizeof(first_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
1);
constexpr uint64_t second_expected_bytes =
sizeof(second_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
1);
constexpr uint64_t third_expected_bytes =
sizeof(third_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) -
1);
constexpr uint64_t fourth_expected_bytes =
sizeof(fourth_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) -
1);
ASSERT_EQ(meta->GetTotalBlobCount(), 4);
ASSERT_EQ(meta->GetTotalBlobBytes(),
first_expected_bytes + second_expected_bytes +
third_expected_bytes + fourth_expected_bytes);
ASSERT_EQ(meta->GetGarbageBlobCount(), 2);
ASSERT_EQ(meta->GetGarbageBlobBytes(),
first_expected_bytes + second_expected_bytes);
}
{
Use a sorted vector instead of a map to store blob file metadata (#9526) Summary: The patch replaces `std::map` with a sorted `std::vector` for `VersionStorageInfo::blob_files_` and preallocates the space for the `vector` before saving the `BlobFileMetaData` into the new `VersionStorageInfo` in `VersionBuilder::Rep::SaveBlobFilesTo`. These changes reduce the time the DB mutex is held while saving new `Version`s, and using a sorted `vector` also makes lookups faster thanks to better memory locality. In addition, the patch introduces helper methods `VersionStorageInfo::GetBlobFileMetaData` and `VersionStorageInfo::GetBlobFileMetaDataLB` that can be used by clients to perform lookups in the `vector`, and does some general cleanup in the parts of code where blob file metadata are used. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9526 Test Plan: Ran `make check` and the crash test script for a while. Performance was tested using a load-optimized benchmark (`fillseq` with vector memtable, no WAL) and small file sizes so that a significant number of files are produced: ``` numactl --interleave=all ./db_bench --benchmarks=fillseq --allow_concurrent_memtable_write=false --level0_file_num_compaction_trigger=4 --level0_slowdown_writes_trigger=20 --level0_stop_writes_trigger=30 --max_background_jobs=8 --max_write_buffer_number=8 --db=/data/ltamasi-dbbench --wal_dir=/data/ltamasi-dbbench --num=800000000 --num_levels=8 --key_size=20 --value_size=400 --block_size=8192 --cache_size=51539607552 --cache_numshardbits=6 --compression_max_dict_bytes=0 --compression_ratio=0.5 --compression_type=lz4 --bytes_per_sync=8388608 --cache_index_and_filter_blocks=1 --cache_high_pri_pool_ratio=0.5 --benchmark_write_rate_limit=0 --write_buffer_size=16777216 --target_file_size_base=16777216 --max_bytes_for_level_base=67108864 --verify_checksum=1 --delete_obsolete_files_period_micros=62914560 --max_bytes_for_level_multiplier=8 --statistics=0 --stats_per_interval=1 --stats_interval_seconds=20 --histogram=1 --memtablerep=skip_list --bloom_bits=10 --open_files=-1 --subcompactions=1 --compaction_style=0 --min_level_to_compress=3 --level_compaction_dynamic_level_bytes=true --pin_l0_filter_and_index_blocks_in_cache=1 --soft_pending_compaction_bytes_limit=167503724544 --hard_pending_compaction_bytes_limit=335007449088 --min_level_to_compress=0 --use_existing_db=0 --sync=0 --threads=1 --memtablerep=vector --allow_concurrent_memtable_write=false --disable_wal=1 --enable_blob_files=1 --blob_file_size=16777216 --min_blob_size=0 --blob_compression_type=lz4 --enable_blob_garbage_collection=1 --seed=<some value> ``` Final statistics before the patch: ``` Cumulative writes: 0 writes, 700M keys, 0 commit groups, 0.0 writes per commit group, ingest: 284.62 GB, 121.27 MB/s Interval writes: 0 writes, 334K keys, 0 commit groups, 0.0 writes per commit group, ingest: 139.28 MB, 72.46 MB/s ``` With the patch: ``` Cumulative writes: 0 writes, 760M keys, 0 commit groups, 0.0 writes per commit group, ingest: 308.66 GB, 131.52 MB/s Interval writes: 0 writes, 445K keys, 0 commit groups, 0.0 writes per commit group, ingest: 185.35 MB, 93.15 MB/s ``` Total time to complete the benchmark is 2611 seconds with the patch, down from 2986 secs. Reviewed By: riversand963 Differential Revision: D34082728 Pulled By: ltamasi fbshipit-source-id: fc598abf676dce436734d06bb9d2d99a26a004fc
2022-02-09 12:35:39 -08:00
const auto& meta = blob_files.back();
assert(meta);
constexpr uint64_t new_first_expected_bytes =
sizeof(new_first_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
1);
constexpr uint64_t new_second_expected_bytes =
sizeof(new_second_value) - 1 +
BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
1);
ASSERT_EQ(meta->GetTotalBlobCount(), 2);
ASSERT_EQ(meta->GetTotalBlobBytes(),
new_first_expected_bytes + new_second_expected_bytes);
ASSERT_EQ(meta->GetGarbageBlobCount(), 0);
ASSERT_EQ(meta->GetGarbageBlobBytes(), 0);
}
}
TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_OK(Put("Key1", "v1_1"));
ASSERT_OK(Put("Key2", "v2_1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v1_2"));
ASSERT_OK(Merge("Key2", "v2_2"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v1_3"));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
Close();
}
TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 1.0;
options.blob_compaction_readahead_size = 1 << 10;
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_OK(Put("key", "lime"));
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("key", "pie"));
ASSERT_OK(Put("foo", "baz"));
ASSERT_OK(Flush());
size_t num_non_prefetch_reads = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::GetBlob:ReadFromFile",
[&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(Get("key"), "pie");
ASSERT_EQ(Get("foo"), "baz");
ASSERT_EQ(num_non_prefetch_reads, 0);
Close();
}
TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) {
Options options = GetDefaultOptions();
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter("pie"));
options.compaction_filter = compaction_filter_guard.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.blob_compaction_readahead_size = 1 << 10;
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_OK(Put("key", "lime"));
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
size_t num_non_prefetch_reads = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::GetBlob:ReadFromFile",
[&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(Get("key"), "limepie");
ASSERT_EQ(Get("foo"), "barpie");
ASSERT_EQ(num_non_prefetch_reads, 0);
Close();
}
TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.blob_compaction_readahead_size = 1 << 10;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_OK(Put("key", "lime"));
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("key", "pie"));
ASSERT_OK(Merge("foo", "baz"));
ASSERT_OK(Flush());
size_t num_non_prefetch_reads = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::GetBlob:ReadFromFile",
[&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(Get("key"), "lime,pie");
ASSERT_EQ(Get("foo"), "bar,baz");
ASSERT_EQ(num_non_prefetch_reads, 0);
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}