Blob DB: Add compaction filter to remove expired blob index entries
Summary: After adding expiration to blob index in #3066, we are now able to add a compaction filter to cleanup expired blob index entries. Closes https://github.com/facebook/rocksdb/pull/3090 Differential Revision: D6183812 Pulled By: yiwu-arbug fbshipit-source-id: 9cb03267a9702975290e758c9c176a2c03530b83
This commit is contained in:
parent
f90ced92f5
commit
6fb56c582c
@ -230,7 +230,8 @@ void CompactionIterator::NextFromInput() {
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// apply the compaction filter to the first occurrence of the user key
|
||||
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
||||
if (compaction_filter_ != nullptr &&
|
||||
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
|
||||
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
|
||||
ignore_snapshots_)) {
|
||||
// If the user has specified a compaction filter and the sequence
|
||||
@ -240,11 +241,13 @@ void CompactionIterator::NextFromInput() {
|
||||
CompactionFilter::Decision filter;
|
||||
compaction_filter_value_.clear();
|
||||
compaction_filter_skip_until_.Clear();
|
||||
CompactionFilter::ValueType value_type =
|
||||
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
|
||||
: CompactionFilter::ValueType::kBlobIndex;
|
||||
{
|
||||
StopWatchNano timer(env_, true);
|
||||
filter = compaction_filter_->FilterV2(
|
||||
compaction_->level(), ikey_.user_key,
|
||||
CompactionFilter::ValueType::kValue, value_,
|
||||
compaction_->level(), ikey_.user_key, value_type, value_,
|
||||
&compaction_filter_value_, compaction_filter_skip_until_.rep());
|
||||
iter_stats_.total_filter_time +=
|
||||
env_ != nullptr ? timer.ElapsedNanos() : 0;
|
||||
|
@ -36,6 +36,7 @@ class CompactionFilter {
|
||||
enum ValueType {
|
||||
kValue,
|
||||
kMergeOperand,
|
||||
kBlobIndex, // used internally by BlobDB.
|
||||
};
|
||||
|
||||
enum class Decision {
|
||||
@ -171,6 +172,8 @@ class CompactionFilter {
|
||||
bool rv = FilterMergeOperand(level, key, existing_value);
|
||||
return rv ? Decision::kRemove : Decision::kKeep;
|
||||
}
|
||||
case ValueType::kBlobIndex:
|
||||
return Decision::kKeep;
|
||||
}
|
||||
assert(false);
|
||||
return Decision::kKeep;
|
||||
|
78
utilities/blob_db/blob_compaction_filter.h
Normal file
78
utilities/blob_db/blob_compaction_filter.h
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
#pragma once
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "rocksdb/compaction_filter.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "utilities/blob_db/blob_index.h"
|
||||
|
||||
namespace rocksdb {
|
||||
namespace blob_db {
|
||||
|
||||
// CompactionFilter to delete expired blob index from base DB.
|
||||
class BlobIndexCompactionFilter : public CompactionFilter {
|
||||
public:
|
||||
explicit BlobIndexCompactionFilter(uint64_t current_time)
|
||||
: current_time_(current_time) {}
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "BlobIndexCompactionFilter";
|
||||
}
|
||||
|
||||
// Filter expired blob indexes regardless of snapshots.
|
||||
virtual bool IgnoreSnapshots() const override { return true; }
|
||||
|
||||
virtual Decision FilterV2(int /*level*/, const Slice& /*key*/,
|
||||
ValueType value_type, const Slice& value,
|
||||
std::string* /*new_value*/,
|
||||
std::string* /*skip_until*/) const override {
|
||||
if (value_type != kBlobIndex) {
|
||||
return Decision::kKeep;
|
||||
}
|
||||
BlobIndex blob_index;
|
||||
Status s = blob_index.DecodeFrom(value);
|
||||
if (!s.ok()) {
|
||||
// Unable to decode blob index. Keeping the value.
|
||||
return Decision::kKeep;
|
||||
}
|
||||
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
|
||||
// Expired
|
||||
return Decision::kRemove;
|
||||
}
|
||||
return Decision::kKeep;
|
||||
}
|
||||
|
||||
private:
|
||||
const uint64_t current_time_;
|
||||
};
|
||||
|
||||
class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
|
||||
public:
|
||||
explicit BlobIndexCompactionFilterFactory(Env* env) : env_(env) {}
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "BlobIndexCompactionFilterFactory";
|
||||
}
|
||||
|
||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||
const CompactionFilter::Context& /*context*/) override {
|
||||
int64_t current_time = 0;
|
||||
Status s = env_->GetCurrentTime(¤t_time);
|
||||
if (!s.ok()) {
|
||||
return nullptr;
|
||||
}
|
||||
assert(current_time >= 0);
|
||||
return std::unique_ptr<CompactionFilter>(
|
||||
new BlobIndexCompactionFilter(static_cast<uint64_t>(current_time)));
|
||||
}
|
||||
|
||||
private:
|
||||
Env* env_;
|
||||
};
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
#endif // ROCKSDB_LITE
|
@ -26,6 +26,7 @@
|
||||
#include "table/block_builder.h"
|
||||
#include "util/file_reader_writer.h"
|
||||
#include "util/filename.h"
|
||||
#include "utilities/blob_db/blob_compaction_filter.h"
|
||||
#include "utilities/blob_db/blob_db_impl.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -45,6 +46,11 @@ Status BlobDB::OpenAndLoad(const Options& options,
|
||||
const BlobDBOptions& bdb_options,
|
||||
const std::string& dbname, BlobDB** blob_db,
|
||||
Options* changed_options) {
|
||||
if (options.compaction_filter != nullptr ||
|
||||
options.compaction_filter_factory != nullptr) {
|
||||
return Status::NotSupported("Blob DB doesn't support compaction filter.");
|
||||
}
|
||||
|
||||
*changed_options = options;
|
||||
*blob_db = nullptr;
|
||||
|
||||
@ -63,6 +69,8 @@ Status BlobDB::OpenAndLoad(const Options& options,
|
||||
all_wal_filters.push_back(rw_filter);
|
||||
}
|
||||
|
||||
changed_options->compaction_filter_factory.reset(
|
||||
new BlobIndexCompactionFilterFactory(options.env));
|
||||
changed_options->listeners.emplace_back(fblistener);
|
||||
if (bdb_options.enable_garbage_collection) {
|
||||
changed_options->listeners.emplace_back(ce_listener);
|
||||
@ -112,6 +120,11 @@ Status BlobDB::Open(const DBOptions& db_options_input,
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db,
|
||||
bool no_base_db) {
|
||||
if (column_families.size() != 1 ||
|
||||
column_families[0].name != kDefaultColumnFamilyName) {
|
||||
return Status::NotSupported(
|
||||
"Blob DB doesn't support non-default column family.");
|
||||
}
|
||||
*blob_db = nullptr;
|
||||
Status s;
|
||||
|
||||
@ -144,6 +157,15 @@ Status BlobDB::Open(const DBOptions& db_options_input,
|
||||
all_wal_filters.push_back(rw_filter);
|
||||
}
|
||||
|
||||
ColumnFamilyOptions cf_options(column_families[0].options);
|
||||
if (cf_options.compaction_filter != nullptr ||
|
||||
cf_options.compaction_filter_factory != nullptr) {
|
||||
return Status::NotSupported("Blob DB doesn't support compaction filter.");
|
||||
}
|
||||
cf_options.compaction_filter_factory.reset(
|
||||
new BlobIndexCompactionFilterFactory(db_options.env));
|
||||
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options);
|
||||
|
||||
// we need to open blob db first so that recovery can happen
|
||||
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
|
||||
fblistener->SetImplPtr(bdb);
|
||||
@ -164,7 +186,7 @@ Status BlobDB::Open(const DBOptions& db_options_input,
|
||||
}
|
||||
|
||||
DB* db = nullptr;
|
||||
s = DB::Open(db_options, dbname, column_families, handles, &db);
|
||||
s = DB::Open(db_options, dbname, {cf_descriptor}, handles, &db);
|
||||
if (!s.ok()) {
|
||||
delete bdb;
|
||||
return s;
|
||||
|
@ -1727,6 +1727,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
||||
GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
|
||||
|
||||
// If key has expired, remove it from base DB.
|
||||
// TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
|
||||
// We can just drop the blob record.
|
||||
if (no_relocation_ttl || (has_ttl && now >= record.expiration)) {
|
||||
gc_stats->num_deletes++;
|
||||
gc_stats->deleted_size += record.value_size;
|
||||
|
@ -47,10 +47,15 @@ class BlobDBTest : public testing::Test {
|
||||
|
||||
~BlobDBTest() { Destroy(); }
|
||||
|
||||
void Open(BlobDBOptions bdb_options = BlobDBOptions(),
|
||||
Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
|
||||
Options options = Options()) {
|
||||
options.create_if_missing = true;
|
||||
ASSERT_OK(BlobDB::Open(options, bdb_options, dbname_, &blob_db_));
|
||||
return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
|
||||
}
|
||||
|
||||
void Open(BlobDBOptions bdb_options = BlobDBOptions(),
|
||||
Options options = Options()) {
|
||||
ASSERT_OK(TryOpen(bdb_options, options));
|
||||
}
|
||||
|
||||
void Destroy() {
|
||||
@ -79,6 +84,10 @@ class BlobDBTest : public testing::Test {
|
||||
}
|
||||
}
|
||||
|
||||
Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
|
||||
return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
|
||||
}
|
||||
|
||||
void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
|
||||
std::map<std::string, std::string> *data = nullptr) {
|
||||
int len = rnd->Next() % kMaxBlobSize + 1;
|
||||
@ -1122,6 +1131,95 @@ TEST_F(BlobDBTest, InlineSmallValues) {
|
||||
ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second);
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
|
||||
class TestCompactionFilter : public CompactionFilter {
|
||||
virtual const char *Name() const { return "TestCompactionFilter"; }
|
||||
};
|
||||
class TestCompactionFilterFactory : public CompactionFilterFactory {
|
||||
virtual const char *Name() const { return "TestCompactionFilterFactory"; }
|
||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||
const CompactionFilter::Context & /*context*/) {
|
||||
return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
|
||||
}
|
||||
};
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Options options;
|
||||
if (i == 0) {
|
||||
options.compaction_filter = new TestCompactionFilter();
|
||||
} else {
|
||||
options.compaction_filter_factory.reset(
|
||||
new TestCompactionFilterFactory());
|
||||
}
|
||||
ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
|
||||
delete options.compaction_filter;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
|
||||
constexpr size_t kNumKeys = 100;
|
||||
constexpr size_t kNumPuts = 1000;
|
||||
constexpr uint64_t kMaxExpiration = 1000;
|
||||
constexpr uint64_t kCompactTime = 500;
|
||||
constexpr uint64_t kMinBlobSize = 100;
|
||||
Random rnd(301);
|
||||
mock_env_->set_current_time(0);
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.min_blob_size = kMinBlobSize;
|
||||
bdb_options.disable_background_tasks = true;
|
||||
Options options;
|
||||
options.env = mock_env_.get();
|
||||
Open(bdb_options, options);
|
||||
|
||||
std::map<std::string, std::string> data;
|
||||
std::map<std::string, std::string> data_after_compact;
|
||||
for (size_t i = 0; i < kNumPuts; i++) {
|
||||
bool is_small_value = rnd.Next() % 2;
|
||||
bool has_ttl = rnd.Next() % 2;
|
||||
uint64_t expiration = rnd.Next() % kMaxExpiration;
|
||||
int len = is_small_value ? 10 : 200;
|
||||
std::string key = "key" + ToString(rnd.Next() % kNumKeys);
|
||||
std::string value = test::RandomHumanReadableString(&rnd, len);
|
||||
if (!has_ttl) {
|
||||
if (is_small_value) {
|
||||
std::string blob_entry;
|
||||
BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
|
||||
// Fake blob index with TTL. See what it will do.
|
||||
ASSERT_GT(kMinBlobSize, blob_entry.size());
|
||||
value = blob_entry;
|
||||
}
|
||||
ASSERT_OK(Put(key, value));
|
||||
data_after_compact[key] = value;
|
||||
} else {
|
||||
ASSERT_OK(PutUntil(key, value, expiration));
|
||||
if (expiration <= kCompactTime) {
|
||||
data_after_compact.erase(key);
|
||||
} else {
|
||||
data_after_compact[key] = value;
|
||||
}
|
||||
}
|
||||
data[key] = value;
|
||||
}
|
||||
VerifyDB(data);
|
||||
|
||||
mock_env_->set_current_time(kCompactTime);
|
||||
// Take a snapshot before compaction. Make sure expired blob indexes is
|
||||
// filtered regardless of snapshot.
|
||||
const Snapshot *snapshot = blob_db_->GetSnapshot();
|
||||
// Issue manual compaction to trigger compaction filter.
|
||||
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
|
||||
blob_db_->DefaultColumnFamily(), nullptr,
|
||||
nullptr));
|
||||
blob_db_->ReleaseSnapshot(snapshot);
|
||||
// Verify expired blob index are filtered.
|
||||
std::vector<KeyVersion> versions;
|
||||
GetAllKeyVersions(blob_db_, "", "", &versions);
|
||||
ASSERT_EQ(data_after_compact.size(), versions.size());
|
||||
for (auto &version : versions) {
|
||||
ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
|
||||
}
|
||||
VerifyDB(data_after_compact);
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user