Blob DB: option to enable garbage collection

Summary:
Add an option to enable/disable auto garbage collection, where we keep counting how many keys have been evicted by either deletion or compaction and decide whether to garbage collect a blob file.

Default disable auto garbage collection for now since the whole logic is not fully tested and we plan to make major change to it.
Closes https://github.com/facebook/rocksdb/pull/3117

Differential Revision: D6224756

Pulled By: yiwu-arbug

fbshipit-source-id: cdf53bdccec96a4580a2b3a342110ad9e8864dfe
This commit is contained in:
Yi Wu 2017-11-02 15:47:30 -07:00
parent 11bacd5787
commit 632f36dcd3
3 changed files with 65 additions and 33 deletions

View File

@ -57,12 +57,16 @@ Status BlobDB::OpenAndLoad(const Options& options,
{ {
MutexLock l(&listener_mutex); MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener); all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener); if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter); all_wal_filters.push_back(rw_filter);
} }
changed_options->listeners.emplace_back(fblistener); changed_options->listeners.emplace_back(fblistener);
changed_options->listeners.emplace_back(ce_listener); if (bdb_options.enable_garbage_collection) {
changed_options->listeners.emplace_back(ce_listener);
}
changed_options->wal_filter = rw_filter.get(); changed_options->wal_filter = rw_filter.get();
DBOptions db_options(*changed_options); DBOptions db_options(*changed_options);
@ -71,7 +75,9 @@ Status BlobDB::OpenAndLoad(const Options& options,
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb); fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb); if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb); rw_filter->SetImplPtr(bdb);
Status s = bdb->OpenPhase1(); Status s = bdb->OpenPhase1();
@ -124,20 +130,26 @@ Status BlobDB::Open(const DBOptions& db_options_input,
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>(); ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
db_options.listeners.emplace_back(fblistener); db_options.listeners.emplace_back(fblistener);
db_options.listeners.emplace_back(ce_listener); if (bdb_options.enable_garbage_collection) {
db_options.listeners.emplace_back(ce_listener);
}
db_options.wal_filter = rw_filter.get(); db_options.wal_filter = rw_filter.get();
{ {
MutexLock l(&listener_mutex); MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener); all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener); if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter); all_wal_filters.push_back(rw_filter);
} }
// we need to open blob db first so that recovery can happen // we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb); fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb); if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb); rw_filter->SetImplPtr(bdb);
s = bdb->OpenPhase1(); s = bdb->OpenPhase1();
@ -172,25 +184,27 @@ Status BlobDB::Open(const DBOptions& db_options_input,
BlobDB::BlobDB(DB* db) : StackableDB(db) {} BlobDB::BlobDB(DB* db) : StackableDB(db) {}
void BlobDBOptions::Dump(Logger* log) const { void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",
blob_dir.c_str()); blob_dir.c_str());
ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d",
path_relative); path_relative);
ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d",
is_fifo); is_fifo);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64,
blob_dir_size); blob_dir_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32,
ttl_range_secs); ttl_range_secs);
ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64, ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64,
bytes_per_sync); bytes_per_sync);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
blob_file_size); blob_file_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
ttl_extractor.get()); ttl_extractor.get());
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
static_cast<int>(compression)); static_cast<int>(compression));
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d",
enable_garbage_collection);
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d",
disable_background_tasks); disable_background_tasks);
} }

View File

@ -71,7 +71,12 @@ struct BlobDBOptions {
// what compression to use for Blob's // what compression to use for Blob's
CompressionType compression = kNoCompression; CompressionType compression = kNoCompression;
// Disable all background job. // If enabled, blob DB periodically cleanup stale data by rewriting remaining
// live data in blob files to new files. If garbage collection is not enabled,
// blob files will be cleanup based on TTL.
bool enable_garbage_collection = false;
// Disable all background job. Used for test only.
bool disable_background_tasks = false; bool disable_background_tasks = false;
void Dump(Logger* log) const; void Dump(Logger* log) const;

View File

@ -69,6 +69,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
int level, const Slice& key, int level, const Slice& key,
CompactionEventListener::CompactionListenerValueType value_type, CompactionEventListener::CompactionListenerValueType value_type,
const Slice& existing_value, const SequenceNumber& sn, bool is_new) { const Slice& existing_value, const SequenceNumber& sn, bool is_new) {
assert(impl_->bdb_options_.enable_garbage_collection);
if (!is_new && if (!is_new &&
value_type == value_type ==
CompactionEventListener::CompactionListenerValueType::kValue) { CompactionEventListener::CompactionListenerValueType::kValue) {
@ -213,12 +214,14 @@ void BlobDBImpl::StartBackgroundTasks() {
std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1)); std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
tqueue_.add(kGCCheckPeriodMillisecs, tqueue_.add(kGCCheckPeriodMillisecs,
std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1)); std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1));
tqueue_.add( if (bdb_options_.enable_garbage_collection) {
kDeleteCheckPeriodMillisecs, tqueue_.add(
std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1)); kDeleteCheckPeriodMillisecs,
tqueue_.add( std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1));
kDeleteCheckPeriodMillisecs, tqueue_.add(
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1)); kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
}
tqueue_.add( tqueue_.add(
kDeleteObsoleteFilesPeriodMillisecs, kDeleteObsoleteFilesPeriodMillisecs,
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
@ -659,8 +662,10 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
Status s = db_->Delete(options, key); Status s = db_->Delete(options, key);
// add deleted key to list of keys that have been deleted for book-keeping if (bdb_options_.enable_garbage_collection) {
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); // add deleted key to list of keys that have been deleted for book-keeping
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn});
}
return s; return s;
} }
@ -780,11 +785,13 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
SequenceNumber sequence_; SequenceNumber sequence_;
}; };
// add deleted key to list of keys that have been deleted for book-keeping if (bdb_options_.enable_garbage_collection) {
DeleteBookkeeper delete_bookkeeper(this, current_seq); // add deleted key to list of keys that have been deleted for book-keeping
updates->Iterate(&delete_bookkeeper); DeleteBookkeeper delete_bookkeeper(this, current_seq);
s = updates->Iterate(&delete_bookkeeper);
}
return Status::OK(); return s;
} }
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret, Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
@ -1318,6 +1325,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
uint64_t blob_offset, uint64_t blob_offset,
uint64_t blob_size) { uint64_t blob_size) {
assert(bdb_options_.enable_garbage_collection);
(void)blob_offset; (void)blob_offset;
std::shared_ptr<BlobFile> bfile; std::shared_ptr<BlobFile> bfile;
{ {
@ -1340,6 +1348,7 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
} }
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
assert(bdb_options_.enable_garbage_collection);
BlobIndex blob_index; BlobIndex blob_index;
Status s = blob_index.DecodeFrom(index_entry); Status s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) { if (!s.ok()) {
@ -1354,6 +1363,7 @@ bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
} }
std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) { std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) {
assert(bdb_options_.enable_garbage_collection);
if (aborted) return std::make_pair(false, -1); if (aborted) return std::make_pair(false, -1);
override_packet_t packet; override_packet_t packet;
@ -1377,6 +1387,7 @@ std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) {
} }
std::pair<bool, int64_t> BlobDBImpl::EvictDeletions(bool aborted) { std::pair<bool, int64_t> BlobDBImpl::EvictDeletions(bool aborted) {
assert(bdb_options_.enable_garbage_collection);
if (aborted) return std::make_pair(false, -1); if (aborted) return std::make_pair(false, -1);
ColumnFamilyHandle* last_cfh = nullptr; ColumnFamilyHandle* last_cfh = nullptr;
@ -1882,10 +1893,12 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
ReadLock lockbfile_r(&bfile->mutex_); ReadLock lockbfile_r(&bfile->mutex_);
if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) > if (bdb_options_.enable_garbage_collection) {
kPartialExpirationPercentage) { if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) >
*reason = "deleted simple blobs beyond threshold"; kPartialExpirationPercentage) {
return true; *reason = "deleted simple blobs beyond threshold";
return true;
}
} }
// if we haven't reached limits of disk space, don't DELETE // if we haven't reached limits of disk space, don't DELETE