Blob DB: Add compaction filter to remove expired blob index entries

Summary:
After adding expiration to blob index in #3066, we are now able to add a compaction filter to cleanup expired blob index entries.
Closes https://github.com/facebook/rocksdb/pull/3090

Differential Revision: D6183812

Pulled By: yiwu-arbug

fbshipit-source-id: 9cb03267a9702975290e758c9c176a2c03530b83
This commit is contained in:
Yi Wu 2017-11-02 17:26:46 -07:00
parent 7c27f3ddc6
commit 49e764a468
6 changed files with 212 additions and 6 deletions

View File

@ -182,7 +182,8 @@ void CompactionIterator::Next() {
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) { Slice* skip_until) {
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && if (compaction_filter_ != nullptr &&
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ || (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
ignore_snapshots_)) { ignore_snapshots_)) {
// If the user has specified a compaction filter and the sequence // If the user has specified a compaction filter and the sequence
@ -192,11 +193,13 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
CompactionFilter::Decision filter; CompactionFilter::Decision filter;
compaction_filter_value_.clear(); compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear(); compaction_filter_skip_until_.Clear();
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: CompactionFilter::ValueType::kBlobIndex;
{ {
StopWatchNano timer(env_, true); StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2( filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key, compaction_->level(), ikey_.user_key, value_type, value_,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep()); &compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time += iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0; env_ != nullptr ? timer.ElapsedNanos() : 0;

View File

@ -36,6 +36,7 @@ class CompactionFilter {
enum ValueType { enum ValueType {
kValue, kValue,
kMergeOperand, kMergeOperand,
kBlobIndex, // used internally by BlobDB.
}; };
enum class Decision { enum class Decision {
@ -171,6 +172,8 @@ class CompactionFilter {
bool rv = FilterMergeOperand(level, key, existing_value); bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep; return rv ? Decision::kRemove : Decision::kKeep;
} }
case ValueType::kBlobIndex:
return Decision::kKeep;
} }
assert(false); assert(false);
return Decision::kKeep; return Decision::kKeep;

View File

@ -0,0 +1,78 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "utilities/blob_db/blob_index.h"
namespace rocksdb {
namespace blob_db {
// CompactionFilter to delete expired blob index from base DB.
class BlobIndexCompactionFilter : public CompactionFilter {
public:
explicit BlobIndexCompactionFilter(uint64_t current_time)
: current_time_(current_time) {}
virtual const char* Name() const override {
return "BlobIndexCompactionFilter";
}
// Filter expired blob indexes regardless of snapshots.
virtual bool IgnoreSnapshots() const override { return true; }
virtual Decision FilterV2(int /*level*/, const Slice& /*key*/,
ValueType value_type, const Slice& value,
std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (value_type != kBlobIndex) {
return Decision::kKeep;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
return Decision::kRemove;
}
return Decision::kKeep;
}
private:
const uint64_t current_time_;
};
class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit BlobIndexCompactionFilterFactory(Env* env) : env_(env) {}
virtual const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
int64_t current_time = 0;
Status s = env_->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
assert(current_time >= 0);
return std::unique_ptr<CompactionFilter>(
new BlobIndexCompactionFilter(static_cast<uint64_t>(current_time)));
}
private:
Env* env_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -26,6 +26,7 @@
#include "table/block_builder.h" #include "table/block_builder.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/filename.h" #include "util/filename.h"
#include "utilities/blob_db/blob_compaction_filter.h"
#include "utilities/blob_db/blob_db_impl.h" #include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb { namespace rocksdb {
@ -45,6 +46,11 @@ Status BlobDB::OpenAndLoad(const Options& options,
const BlobDBOptions& bdb_options, const BlobDBOptions& bdb_options,
const std::string& dbname, BlobDB** blob_db, const std::string& dbname, BlobDB** blob_db,
Options* changed_options) { Options* changed_options) {
if (options.compaction_filter != nullptr ||
options.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
*changed_options = options; *changed_options = options;
*blob_db = nullptr; *blob_db = nullptr;
@ -63,6 +69,8 @@ Status BlobDB::OpenAndLoad(const Options& options,
all_wal_filters.push_back(rw_filter); all_wal_filters.push_back(rw_filter);
} }
changed_options->compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(options.env));
changed_options->listeners.emplace_back(fblistener); changed_options->listeners.emplace_back(fblistener);
if (bdb_options.enable_garbage_collection) { if (bdb_options.enable_garbage_collection) {
changed_options->listeners.emplace_back(ce_listener); changed_options->listeners.emplace_back(ce_listener);
@ -112,6 +120,11 @@ Status BlobDB::Open(const DBOptions& db_options_input,
const std::vector<ColumnFamilyDescriptor>& column_families, const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db, std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db,
bool no_base_db) { bool no_base_db) {
if (column_families.size() != 1 ||
column_families[0].name != kDefaultColumnFamilyName) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
*blob_db = nullptr; *blob_db = nullptr;
Status s; Status s;
@ -144,6 +157,15 @@ Status BlobDB::Open(const DBOptions& db_options_input,
all_wal_filters.push_back(rw_filter); all_wal_filters.push_back(rw_filter);
} }
ColumnFamilyOptions cf_options(column_families[0].options);
if (cf_options.compaction_filter != nullptr ||
cf_options.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
cf_options.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(db_options.env));
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options);
// we need to open blob db first so that recovery can happen // we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb); fblistener->SetImplPtr(bdb);
@ -164,7 +186,7 @@ Status BlobDB::Open(const DBOptions& db_options_input,
} }
DB* db = nullptr; DB* db = nullptr;
s = DB::Open(db_options, dbname, column_families, handles, &db); s = DB::Open(db_options, dbname, {cf_descriptor}, handles, &db);
if (!s.ok()) { if (!s.ok()) {
delete bdb; delete bdb;
return s; return s;

View File

@ -1727,6 +1727,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq); GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
// If key has expired, remove it from base DB. // If key has expired, remove it from base DB.
// TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
// We can just drop the blob record.
if (no_relocation_ttl || (has_ttl && now >= record.expiration)) { if (no_relocation_ttl || (has_ttl && now >= record.expiration)) {
gc_stats->num_deletes++; gc_stats->num_deletes++;
gc_stats->deleted_size += record.value_size; gc_stats->deleted_size += record.value_size;

View File

@ -47,10 +47,15 @@ class BlobDBTest : public testing::Test {
~BlobDBTest() { Destroy(); } ~BlobDBTest() { Destroy(); }
void Open(BlobDBOptions bdb_options = BlobDBOptions(), Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
Options options = Options()) { Options options = Options()) {
options.create_if_missing = true; options.create_if_missing = true;
ASSERT_OK(BlobDB::Open(options, bdb_options, dbname_, &blob_db_)); return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
}
void Open(BlobDBOptions bdb_options = BlobDBOptions(),
Options options = Options()) {
ASSERT_OK(TryOpen(bdb_options, options));
} }
void Destroy() { void Destroy() {
@ -79,6 +84,10 @@ class BlobDBTest : public testing::Test {
} }
} }
Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
}
void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd, void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
std::map<std::string, std::string> *data = nullptr) { std::map<std::string, std::string> *data = nullptr) {
int len = rnd->Next() % kMaxBlobSize + 1; int len = rnd->Next() % kMaxBlobSize + 1;
@ -1122,6 +1131,95 @@ TEST_F(BlobDBTest, InlineSmallValues) {
ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second); ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second);
} }
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
class TestCompactionFilter : public CompactionFilter {
virtual const char *Name() const { return "TestCompactionFilter"; }
};
class TestCompactionFilterFactory : public CompactionFilterFactory {
virtual const char *Name() const { return "TestCompactionFilterFactory"; }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context & /*context*/) {
return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
}
};
for (int i = 0; i < 2; i++) {
Options options;
if (i == 0) {
options.compaction_filter = new TestCompactionFilter();
} else {
options.compaction_filter_factory.reset(
new TestCompactionFilterFactory());
}
ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
delete options.compaction_filter;
}
}
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
constexpr size_t kNumKeys = 100;
constexpr size_t kNumPuts = 1000;
constexpr uint64_t kMaxExpiration = 1000;
constexpr uint64_t kCompactTime = 500;
constexpr uint64_t kMinBlobSize = 100;
Random rnd(301);
mock_env_->set_current_time(0);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = kMinBlobSize;
bdb_options.disable_background_tasks = true;
Options options;
options.env = mock_env_.get();
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, std::string> data_after_compact;
for (size_t i = 0; i < kNumPuts; i++) {
bool is_small_value = rnd.Next() % 2;
bool has_ttl = rnd.Next() % 2;
uint64_t expiration = rnd.Next() % kMaxExpiration;
int len = is_small_value ? 10 : 200;
std::string key = "key" + ToString(rnd.Next() % kNumKeys);
std::string value = test::RandomHumanReadableString(&rnd, len);
if (!has_ttl) {
if (is_small_value) {
std::string blob_entry;
BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
// Fake blob index with TTL. See what it will do.
ASSERT_GT(kMinBlobSize, blob_entry.size());
value = blob_entry;
}
ASSERT_OK(Put(key, value));
data_after_compact[key] = value;
} else {
ASSERT_OK(PutUntil(key, value, expiration));
if (expiration <= kCompactTime) {
data_after_compact.erase(key);
} else {
data_after_compact[key] = value;
}
}
data[key] = value;
}
VerifyDB(data);
mock_env_->set_current_time(kCompactTime);
// Take a snapshot before compaction. Make sure expired blob indexes is
// filtered regardless of snapshot.
const Snapshot *snapshot = blob_db_->GetSnapshot();
// Issue manual compaction to trigger compaction filter.
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
blob_db_->DefaultColumnFamily(), nullptr,
nullptr));
blob_db_->ReleaseSnapshot(snapshot);
// Verify expired blob index are filtered.
std::vector<KeyVersion> versions;
GetAllKeyVersions(blob_db_, "", "", &versions);
ASSERT_EQ(data_after_compact.size(), versions.size());
for (auto &version : versions) {
ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
}
VerifyDB(data_after_compact);
}
} // namespace blob_db } // namespace blob_db
} // namespace rocksdb } // namespace rocksdb