Blob DB: cleanup unused options
Summary: * Clean up num_concurrent_simple_blobs. We don't do concurrent writes (writers take write_mutex_), so it doesn't make sense to have multiple non-TTL files open. We can revisit this later when we want to improve writes. * Clean up the eviction callback; we have no plans to use it for now. * Rename s/open_simple_files_/open_non_ttl_file_/ and s/open_blob_files_/open_ttl_files_/ to avoid confusion. Closes https://github.com/facebook/rocksdb/pull/3088 Differential Revision: D6182598 Pulled By: yiwu-arbug fbshipit-source-id: 99e6f5e01fa66d31309cdb06ce48502464bac6ad
This commit is contained in:
parent
ffc3c62ca2
commit
c1e99eddc8
@ -186,8 +186,6 @@ void BlobDBOptions::Dump(Logger* log) const {
|
||||
bytes_per_sync);
|
||||
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
|
||||
blob_file_size);
|
||||
ROCKS_LOG_HEADER(log, "blob_db_options.num_concurrent_simple_blobs: %" PRIu32,
|
||||
num_concurrent_simple_blobs);
|
||||
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
|
||||
ttl_extractor.get());
|
||||
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
|
||||
|
@ -63,20 +63,11 @@ struct BlobDBOptions {
|
||||
// after it exceeds that size
|
||||
uint64_t blob_file_size = 256 * 1024 * 1024;
|
||||
|
||||
// how many files to use for simple blobs at one time
|
||||
uint32_t num_concurrent_simple_blobs = 1;
|
||||
|
||||
// Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
|
||||
// applications can set a TTLExtractor which can extract TTL from key-value
|
||||
// pairs.
|
||||
std::shared_ptr<TTLExtractor> ttl_extractor = nullptr;
|
||||
|
||||
// eviction callback.
|
||||
// this function will be called for every blob that is getting
|
||||
// evicted.
|
||||
std::function<void(const ColumnFamilyHandle*, const Slice&, const Slice&)>
|
||||
gc_evict_cb_fn;
|
||||
|
||||
// what compression to use for Blob's
|
||||
CompressionType compression = kNoCompression;
|
||||
|
||||
|
@ -413,7 +413,7 @@ Status BlobDBImpl::OpenAllFiles() {
|
||||
expiration_range.second);
|
||||
}
|
||||
} else {
|
||||
open_blob_files_.insert(bfptr);
|
||||
open_ttl_files_.insert(bfptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -493,23 +493,23 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
|
||||
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
|
||||
uint64_t expiration) const {
|
||||
if (open_blob_files_.empty()) return nullptr;
|
||||
if (open_ttl_files_.empty()) return nullptr;
|
||||
|
||||
std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
|
||||
tmp->expiration_range_ = std::make_pair(expiration, 0);
|
||||
|
||||
auto citr = open_blob_files_.equal_range(tmp);
|
||||
if (citr.first == open_blob_files_.end()) {
|
||||
assert(citr.second == open_blob_files_.end());
|
||||
auto citr = open_ttl_files_.equal_range(tmp);
|
||||
if (citr.first == open_ttl_files_.end()) {
|
||||
assert(citr.second == open_ttl_files_.end());
|
||||
|
||||
std::shared_ptr<BlobFile> check = *(open_blob_files_.rbegin());
|
||||
std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
|
||||
return (check->expiration_range_.second < expiration) ? nullptr : check;
|
||||
}
|
||||
|
||||
if (citr.first != citr.second) return *(citr.first);
|
||||
|
||||
auto finditr = citr.second;
|
||||
if (finditr != open_blob_files_.begin()) --finditr;
|
||||
if (finditr != open_ttl_files_.begin()) --finditr;
|
||||
|
||||
bool b2 = (*finditr)->expiration_range_.second < expiration;
|
||||
bool b1 = (*finditr)->expiration_range_.first > expiration;
|
||||
@ -530,11 +530,17 @@ std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
|
||||
}
|
||||
|
||||
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
||||
uint32_t val = blob_rgen.Next();
|
||||
{
|
||||
ReadLock rl(&mutex_);
|
||||
if (open_simple_files_.size() == bdb_options_.num_concurrent_simple_blobs)
|
||||
return open_simple_files_[val % bdb_options_.num_concurrent_simple_blobs];
|
||||
if (open_non_ttl_file_ != nullptr) {
|
||||
return open_non_ttl_file_;
|
||||
}
|
||||
}
|
||||
|
||||
// CHECK again
|
||||
WriteLock wl(&mutex_);
|
||||
if (open_non_ttl_file_ != nullptr) {
|
||||
return open_non_ttl_file_;
|
||||
}
|
||||
|
||||
std::shared_ptr<BlobFile> bfile = NewBlobFile("SelectBlobFile");
|
||||
@ -557,12 +563,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
||||
bfile->header_valid_ = true;
|
||||
bfile->SetHasTTL(false);
|
||||
|
||||
// CHECK again
|
||||
WriteLock wl(&mutex_);
|
||||
if (open_simple_files_.size() == bdb_options_.num_concurrent_simple_blobs) {
|
||||
return open_simple_files_[val % bdb_options_.num_concurrent_simple_blobs];
|
||||
}
|
||||
|
||||
Status s = writer->WriteHeader(bfile->header_);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||
@ -574,7 +574,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
||||
|
||||
dir_change_.store(true);
|
||||
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
|
||||
open_simple_files_.push_back(bfile);
|
||||
open_non_ttl_file_ = bfile;
|
||||
return bfile;
|
||||
}
|
||||
|
||||
@ -625,7 +625,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
||||
bfile->file_size_ = BlobLogHeader::kSize;
|
||||
|
||||
// set the first value of the range, since that is
|
||||
// concrete at this time. also necessary to add to open_blob_files_
|
||||
// concrete at this time. also necessary to add to open_ttl_files_
|
||||
bfile->expiration_range_ = expiration_range;
|
||||
|
||||
WriteLock wl(&mutex_);
|
||||
@ -647,7 +647,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
||||
|
||||
dir_change_.store(true);
|
||||
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
|
||||
open_blob_files_.insert(bfile);
|
||||
open_ttl_files_.insert(bfile);
|
||||
epoch_of_++;
|
||||
|
||||
return bfile;
|
||||
@ -1192,9 +1192,9 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
|
||||
blob_files_.size());
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" PRIu64,
|
||||
open_blob_files_.size());
|
||||
open_ttl_files_.size());
|
||||
|
||||
for (auto bfile : open_blob_files_) {
|
||||
for (auto bfile : open_ttl_files_) {
|
||||
assert(!bfile->Immutable());
|
||||
}
|
||||
|
||||
@ -1215,6 +1215,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
|
||||
}
|
||||
|
||||
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
|
||||
assert(bfile != nullptr);
|
||||
Status s;
|
||||
ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64,
|
||||
bfile->BlobFileNumber());
|
||||
@ -1222,13 +1223,12 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
|
||||
WriteLock wl(&mutex_);
|
||||
|
||||
if (bfile->HasTTL()) {
|
||||
size_t erased __attribute__((__unused__)) = open_blob_files_.erase(bfile);
|
||||
size_t erased __attribute__((__unused__));
|
||||
erased = open_ttl_files_.erase(bfile);
|
||||
assert(erased == 1);
|
||||
} else {
|
||||
auto iter = std::find(open_simple_files_.begin(),
|
||||
open_simple_files_.end(), bfile);
|
||||
assert(iter != open_simple_files_.end());
|
||||
open_simple_files_.erase(iter);
|
||||
assert(bfile == open_non_ttl_file_);
|
||||
open_non_ttl_file_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1411,7 +1411,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
|
||||
uint64_t epoch_now = EpochNow();
|
||||
|
||||
ReadLock rl(&mutex_);
|
||||
for (auto bfile : open_blob_files_) {
|
||||
for (auto bfile : open_ttl_files_) {
|
||||
{
|
||||
ReadLock lockbfile_r(&bfile->mutex_);
|
||||
|
||||
@ -1436,14 +1436,14 @@ std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
|
||||
std::vector<std::shared_ptr<BlobFile>> process_files;
|
||||
{
|
||||
ReadLock rl(&mutex_);
|
||||
for (auto fitr : open_blob_files_) {
|
||||
for (auto fitr : open_ttl_files_) {
|
||||
if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync))
|
||||
process_files.push_back(fitr);
|
||||
}
|
||||
|
||||
for (auto fitr : open_simple_files_) {
|
||||
if (fitr->NeedsFsync(true, bdb_options_.bytes_per_sync))
|
||||
process_files.push_back(fitr);
|
||||
if (open_non_ttl_file_ != nullptr &&
|
||||
open_non_ttl_file_->NeedsFsync(true, bdb_options_.bytes_per_sync)) {
|
||||
process_files.push_back(open_non_ttl_file_);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1799,7 +1799,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
||||
// but under the assumption that this is only called when a
|
||||
// file is Immutable, we can reduce the critical section
|
||||
bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
||||
bool is_oldest_simple_blob_file,
|
||||
bool is_oldest_non_ttl_file,
|
||||
std::string* reason) {
|
||||
if (bfile->HasTTL()) {
|
||||
ExpirationRange expiration_range = bfile->GetExpirationRange();
|
||||
@ -1857,7 +1857,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
||||
return false;
|
||||
}
|
||||
|
||||
if (is_oldest_simple_blob_file) {
|
||||
if (is_oldest_non_ttl_file) {
|
||||
*reason = "out of space and is the oldest simple blob file";
|
||||
return true;
|
||||
}
|
||||
@ -1923,72 +1923,6 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
||||
return std::make_pair(!aborted, -1);
|
||||
}
|
||||
|
||||
bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
|
||||
std::shared_ptr<Reader> reader =
|
||||
bfile->OpenSequentialReader(env_, db_options_, env_options_);
|
||||
if (!reader) {
|
||||
ROCKS_LOG_ERROR(
|
||||
db_options_.info_log,
|
||||
"File sequential reader could not be opened for evict callback: %s",
|
||||
bfile->PathName().c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
ReadLock lockbfile_r(&bfile->mutex_);
|
||||
|
||||
BlobLogHeader header;
|
||||
Status s = reader->ReadHeader(&header);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(
|
||||
db_options_.info_log,
|
||||
"Failure to read header for blob-file during evict callback %s",
|
||||
bfile->PathName().c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
ColumnFamilyHandle* cfh =
|
||||
db_impl_->GetColumnFamilyHandleUnlocked(bfile->column_family_id());
|
||||
BlobLogRecord record;
|
||||
Reader::ReadLevel full = Reader::kReadHeaderKeyBlob;
|
||||
while (reader->ReadRecord(&record, full).ok()) {
|
||||
bdb_options_.gc_evict_cb_fn(cfh, record.key, record.value);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::pair<bool, int64_t> BlobDBImpl::RemoveTimerQ(TimerQueue* tq,
|
||||
bool aborted) {
|
||||
WriteLock wl(&mutex_);
|
||||
for (auto itr = cb_threads_.begin(); itr != cb_threads_.end(); ++itr) {
|
||||
if ((*itr).get() != tq) continue;
|
||||
|
||||
cb_threads_.erase(itr);
|
||||
break;
|
||||
}
|
||||
return std::make_pair(false, -1);
|
||||
}
|
||||
|
||||
std::pair<bool, int64_t> BlobDBImpl::CallbackEvicts(
|
||||
TimerQueue* tq, std::shared_ptr<BlobFile> bfile, bool aborted) {
|
||||
if (aborted) return std::make_pair(false, -1);
|
||||
bool succ = CallbackEvictsImpl(bfile);
|
||||
if (succ) {
|
||||
ROCKS_LOG_DEBUG(db_options_.info_log, "Eviction callbacks completed %s",
|
||||
bfile->PathName().c_str());
|
||||
}
|
||||
|
||||
WriteLock wl(&mutex_);
|
||||
bfile->SetCanBeDeleted();
|
||||
obsolete_files_.push_front(bfile);
|
||||
if (tq) {
|
||||
// all of the callbacks have been processed
|
||||
tqueue_.add(0, std::bind(&BlobDBImpl::RemoveTimerQ, this, tq,
|
||||
std::placeholders::_1));
|
||||
}
|
||||
return std::make_pair(false, -1);
|
||||
}
|
||||
|
||||
void BlobDBImpl::CopyBlobFiles(
|
||||
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
|
||||
ReadLock rl(&mutex_);
|
||||
@ -2010,7 +1944,7 @@ void BlobDBImpl::FilterSubsetOfFiles(
|
||||
uint64_t now = EpochNow();
|
||||
|
||||
size_t files_processed = 0;
|
||||
bool simple_blob_file_found = false;
|
||||
bool non_ttl_file_found = false;
|
||||
for (auto bfile : blob_files) {
|
||||
if (files_processed >= files_to_collect) break;
|
||||
// if this is the first time processing the file
|
||||
@ -2030,15 +1964,14 @@ void BlobDBImpl::FilterSubsetOfFiles(
|
||||
// then it should not be GC'd
|
||||
if (bfile->Obsolete() || !bfile->Immutable()) continue;
|
||||
|
||||
bool is_oldest_simple_blob_file = false;
|
||||
if (!simple_blob_file_found && !bfile->HasTTL()) {
|
||||
is_oldest_simple_blob_file = true;
|
||||
simple_blob_file_found = true;
|
||||
bool is_oldest_non_ttl_file = false;
|
||||
if (!non_ttl_file_found && !bfile->HasTTL()) {
|
||||
is_oldest_non_ttl_file = true;
|
||||
non_ttl_file_found = true;
|
||||
}
|
||||
|
||||
std::string reason;
|
||||
bool shouldgc =
|
||||
ShouldGCFile(bfile, now, is_oldest_simple_blob_file, &reason);
|
||||
bool shouldgc = ShouldGCFile(bfile, now, is_oldest_non_ttl_file, &reason);
|
||||
if (!shouldgc) {
|
||||
ROCKS_LOG_DEBUG(db_options_.info_log,
|
||||
"File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64
|
||||
@ -2096,25 +2029,11 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
|
||||
}
|
||||
|
||||
if (!obsoletes.empty()) {
|
||||
bool evict_cb = (!!bdb_options_.gc_evict_cb_fn);
|
||||
std::shared_ptr<TimerQueue> tq;
|
||||
if (evict_cb) tq = std::make_shared<TimerQueue>();
|
||||
|
||||
// if evict callback is present, first schedule the callback thread
|
||||
WriteLock wl(&mutex_);
|
||||
for (auto bfile : obsoletes) {
|
||||
bool last_file = (bfile == obsoletes.back());
|
||||
|
||||
if (!evict_cb) {
|
||||
bfile->SetCanBeDeleted();
|
||||
obsolete_files_.push_front(bfile);
|
||||
} else {
|
||||
tq->add(0, std::bind(&BlobDBImpl::CallbackEvicts, this,
|
||||
(last_file) ? tq.get() : nullptr, bfile,
|
||||
std::placeholders::_1));
|
||||
}
|
||||
bfile->SetCanBeDeleted();
|
||||
obsolete_files_.push_front(bfile);
|
||||
}
|
||||
if (evict_cb) cb_threads_.emplace_back(tq);
|
||||
}
|
||||
|
||||
// reschedule
|
||||
|
@ -305,7 +305,7 @@ class BlobDBImpl : public BlobDB {
|
||||
// now - current time
|
||||
// is_oldest_non_ttl_file - whether bfile is the oldest non-TTL file
|
||||
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
||||
bool is_oldest_simple_blob_file, std::string* reason);
|
||||
bool is_oldest_non_ttl_file, std::string* reason);
|
||||
|
||||
// collect all the blob log files from the blob directory
|
||||
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
|
||||
@ -370,14 +370,8 @@ class BlobDBImpl : public BlobDB {
|
||||
|
||||
std::pair<bool, int64_t> EvictCompacted(bool aborted);
|
||||
|
||||
bool CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile);
|
||||
|
||||
std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
|
||||
|
||||
std::pair<bool, int64_t> CallbackEvicts(TimerQueue* tq,
|
||||
std::shared_ptr<BlobFile> bfile,
|
||||
bool aborted);
|
||||
|
||||
// Adds the background tasks to the timer queue
|
||||
void StartBackgroundTasks();
|
||||
|
||||
@ -467,12 +461,12 @@ class BlobDBImpl : public BlobDB {
|
||||
// epoch or version of the open files.
|
||||
std::atomic<uint64_t> epoch_of_;
|
||||
|
||||
// All opened non-TTL blob files.
|
||||
std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
|
||||
// opened non-TTL blob file.
|
||||
std::shared_ptr<BlobFile> open_non_ttl_file_;
|
||||
|
||||
// all the blob files which are currently being appended to based
|
||||
// on variety of incoming TTL's
|
||||
std::multiset<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_blob_files_;
|
||||
std::multiset<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_ttl_files_;
|
||||
|
||||
// packet of information to put in lockless delete(s) queue
|
||||
struct delete_packet_t {
|
||||
@ -505,9 +499,6 @@ class BlobDBImpl : public BlobDB {
|
||||
// timer based queue to execute tasks
|
||||
TimerQueue tqueue_;
|
||||
|
||||
// timer queues to call eviction callbacks.
|
||||
std::vector<std::shared_ptr<TimerQueue>> cb_threads_;
|
||||
|
||||
// only accessed in GC thread, hence not atomic. The epoch of the
|
||||
// GC task. Each execution is one epoch. Helps us in allocating
|
||||
// files to one execution
|
||||
|
@ -268,7 +268,6 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
|
||||
bdb_options.ttl_range_secs = 1000;
|
||||
bdb_options.min_blob_size = 0;
|
||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||
bdb_options.num_concurrent_simple_blobs = 1;
|
||||
bdb_options.ttl_extractor = ttl_extractor_;
|
||||
bdb_options.disable_background_tasks = true;
|
||||
Open(bdb_options, options);
|
||||
|
Loading…
x
Reference in New Issue
Block a user