Refactor blob file creation logic (#6066)

Summary:
The patch refactors and cleans up the logic around creating new blob files
by moving the common code of `SelectBlobFile` and `SelectBlobFileTTL`
to a new helper method `CreateBlobFileAndWriter`, bringing the implementation
of `SelectBlobFile` and `SelectBlobFileTTL` into sync, and increasing encapsulation
by adding new constructors for `BlobFile` and `BlobLogHeader`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6066

Test Plan:
Ran `make check` and used the BlobDB mode of `db_bench` to sanity test both
the TTL and the non-TTL code paths.

Differential Revision: D18646921

Pulled By: ltamasi

fbshipit-source-id: e5705a84807932e31dccab4f49b3e64369cea26d
This commit is contained in:
Levi Tamasi 2019-11-26 13:16:39 -08:00 committed by Facebook Github Bot
parent 771e1723c7
commit 72daa92d3a
5 changed files with 152 additions and 177 deletions

View File

@ -84,7 +84,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
statistics_(db_options_.statistics.get()),
next_file_number_(1),
flush_sequence_(0),
epoch_of_(0),
closed_(true),
open_file_count_(0),
total_blob_size_(0),
@ -584,14 +583,24 @@ Status BlobDBImpl::GetBlobFileReader(
return s;
}
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
bool has_ttl, const ExpirationRange& expiration_range,
const std::string& reason) {
assert(has_ttl == (expiration_range.first || expiration_range.second));
uint64_t file_num = next_file_number_++;
auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
db_options_.info_log.get());
const uint32_t column_family_id =
static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
auto blob_file = std::make_shared<BlobFile>(
this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
bdb_options_.compression, has_ttl, expiration_range);
ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
bfile->PathName().c_str(), reason.c_str());
blob_file->PathName().c_str(), reason.c_str());
LogFlush(db_options_.info_log);
return bfile;
return blob_file;
}
Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
@ -687,142 +696,123 @@ Status BlobDBImpl::CheckOrCreateWriterLocked(
return s;
}
Status BlobDBImpl::CreateBlobFileAndWriter(
bool has_ttl, const ExpirationRange& expiration_range,
const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
std::shared_ptr<Writer>* writer) {
assert(has_ttl == (expiration_range.first || expiration_range.second));
assert(blob_file);
assert(writer);
*blob_file = NewBlobFile(has_ttl, expiration_range, reason);
assert(*blob_file);
// file not visible, hence no lock
Status s = CheckOrCreateWriterLocked(*blob_file, writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to get writer for blob file: %s, error: %s",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
assert(*writer);
s = (*writer)->WriteHeader((*blob_file)->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to write header to new blob file: %s"
" status: '%s'",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
(*blob_file)->SetFileSize(BlobLogHeader::kSize);
total_blob_size_ += BlobLogHeader::kSize;
return s;
}
Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
assert(blob_file != nullptr);
assert(blob_file);
{
ReadLock rl(&mutex_);
if (open_non_ttl_file_ != nullptr) {
if (open_non_ttl_file_) {
assert(!open_non_ttl_file_->Immutable());
*blob_file = open_non_ttl_file_;
return Status::OK();
}
}
// CHECK again
// Check again
WriteLock wl(&mutex_);
if (open_non_ttl_file_ != nullptr) {
if (open_non_ttl_file_) {
assert(!open_non_ttl_file_->Immutable());
*blob_file = open_non_ttl_file_;
return Status::OK();
}
*blob_file = NewBlobFile("SelectBlobFile");
assert(*blob_file != nullptr);
// file not visible, hence no lock
std::shared_ptr<Writer> writer;
Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
const Status s = CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(),
/* reason */ "SelectBlobFile", blob_file, &writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to get writer from blob file: %s, error: %s",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
(*blob_file)->file_size_ = BlobLogHeader::kSize;
(*blob_file)->header_.compression = bdb_options_.compression;
(*blob_file)->header_.has_ttl = false;
(*blob_file)->header_.column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
(*blob_file)->header_valid_ = true;
(*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
(*blob_file)->SetHasTTL(false);
(*blob_file)->SetCompression(bdb_options_.compression);
s = writer->WriteHeader((*blob_file)->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to write header to new blob file: %s"
" status: '%s'",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
blob_files_.insert(
std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
(*blob_file)->BlobFileNumber(), *blob_file));
open_non_ttl_file_ = *blob_file;
total_blob_size_ += BlobLogHeader::kSize;
return s;
}
Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
std::shared_ptr<BlobFile>* blob_file) {
assert(blob_file != nullptr);
assert(blob_file);
assert(expiration != kNoExpiration);
uint64_t epoch_read = 0;
{
ReadLock rl(&mutex_);
*blob_file = FindBlobFileLocked(expiration);
epoch_read = epoch_of_.load();
if (*blob_file != nullptr) {
assert(!(*blob_file)->Immutable());
return Status::OK();
}
}
// Check again
WriteLock wl(&mutex_);
*blob_file = FindBlobFileLocked(expiration);
if (*blob_file != nullptr) {
assert(!(*blob_file)->Immutable());
return Status::OK();
}
uint64_t exp_low =
const uint64_t exp_low =
(expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
const ExpirationRange expiration_range(exp_low, exp_high);
*blob_file = NewBlobFile("SelectBlobFileTTL");
assert(*blob_file != nullptr);
std::ostringstream oss;
oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
ROCKS_LOG_INFO(db_options_.info_log,
"New blob file TTL range: %s %" PRIu64 " %" PRIu64,
(*blob_file)->PathName().c_str(), exp_low, exp_high);
LogFlush(db_options_.info_log);
// we don't need to take lock as no other thread is seeing bfile yet
std::shared_ptr<Writer> writer;
Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
const Status s =
CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
/* reason */ oss.str(), blob_file, &writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to get writer from blob file with TTL: %s, error: %s",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
(*blob_file)->header_.expiration_range = expiration_range;
(*blob_file)->header_.compression = bdb_options_.compression;
(*blob_file)->header_.has_ttl = true;
(*blob_file)->header_.column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
(*blob_file)->header_valid_ = true;
(*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
(*blob_file)->SetHasTTL(true);
(*blob_file)->SetCompression(bdb_options_.compression);
(*blob_file)->file_size_ = BlobLogHeader::kSize;
// set the first value of the range, since that is
// concrete at this time. also necessary to add to open_ttl_files_
(*blob_file)->expiration_range_ = expiration_range;
WriteLock wl(&mutex_);
// in case the epoch has shifted in the interim, then check
// check condition again - should be rare.
if (epoch_of_.load() != epoch_read) {
std::shared_ptr<BlobFile> blob_file2 = FindBlobFileLocked(expiration);
if (blob_file2 != nullptr) {
*blob_file = std::move(blob_file2);
return Status::OK();
}
}
s = writer->WriteHeader((*blob_file)->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to write header to new blob file: %s"
" status: '%s'",
(*blob_file)->PathName().c_str(), s.ToString().c_str());
return s;
}
blob_files_.insert(
std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
(*blob_file)->BlobFileNumber(), *blob_file));
open_ttl_files_.insert(*blob_file);
total_blob_size_ += BlobLogHeader::kSize;
epoch_of_++;
return s;
}
@ -1954,7 +1944,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
// new file
std::string reason("GC of ");
reason += bfptr->PathName();
newfile = NewBlobFile(reason);
newfile = NewBlobFile(bfptr->HasTTL(), bfptr->expiration_range_, reason);
s = CheckOrCreateWriterLocked(newfile, &new_writer);
if (!s.ok()) {
@ -1963,14 +1953,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
newfile->PathName().c_str(), s.ToString().c_str());
break;
}
// Can't use header beyond this point
newfile->header_ = std::move(header);
newfile->header_valid_ = true;
newfile->file_size_ = BlobLogHeader::kSize;
newfile->SetColumnFamilyId(bfptr->column_family_id());
newfile->SetHasTTL(bfptr->HasTTL());
newfile->SetCompression(bfptr->compression());
newfile->expiration_range_ = bfptr->expiration_range_;
s = new_writer->WriteHeader(newfile->header_);
if (!s.ok()) {

View File

@ -264,14 +264,22 @@ class BlobDBImpl : public BlobDB {
const Slice& value, uint64_t expiration,
std::string* index_entry);
// find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr
// Create a new blob file and associated writer.
Status CreateBlobFileAndWriter(bool has_ttl,
const ExpirationRange& expiration_range,
const std::string& reason,
std::shared_ptr<BlobFile>* blob_file,
std::shared_ptr<Writer>* writer);
// Get the open non-TTL blob log file, or create a new one if no such file
// exists.
Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
// Get the open TTL blob log file for a certain expiration, or create a new
// one if no such file exists.
Status SelectBlobFileTTL(uint64_t expiration,
std::shared_ptr<BlobFile>* blob_file);
// find an existing blob log file to append the value to
Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
// periodic sanity check. Bunch of checks
@ -300,7 +308,9 @@ class BlobDBImpl : public BlobDB {
void StartBackgroundTasks();
// add a new Blob File
std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
const ExpirationRange& expiration_range,
const std::string& reason);
// collect all the blob log files from the blob directory
Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
@ -434,9 +444,6 @@ class BlobDBImpl : public BlobDB {
// The largest sequence number that has been flushed.
SequenceNumber flush_sequence_;
// epoch or version of the open files.
std::atomic<uint64_t> epoch_of_;
// opened non-TTL blob file.
std::shared_ptr<BlobFile> open_non_ttl_file_;

View File

@ -10,7 +10,6 @@
#include <cinttypes>
#include <algorithm>
#include <limits>
#include <memory>
#include "db/column_family.h"
@ -25,45 +24,24 @@ namespace rocksdb {
namespace blob_db {
BlobFile::BlobFile()
: parent_(nullptr),
file_number_(0),
info_log_(nullptr),
column_family_id_(std::numeric_limits<uint32_t>::max()),
compression_(kNoCompression),
has_ttl_(false),
blob_count_(0),
file_size_(0),
closed_(false),
immutable_sequence_(0),
obsolete_(false),
obsolete_sequence_(0),
expiration_range_({0, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false),
footer_valid_(false) {}
BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
Logger* info_log)
: parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
Logger* info_log, uint32_t column_family_id,
CompressionType compression, bool has_ttl,
const ExpirationRange& expiration_range)
: parent_(p),
path_to_dir_(bdir),
file_number_(fn),
info_log_(info_log),
column_family_id_(std::numeric_limits<uint32_t>::max()),
compression_(kNoCompression),
has_ttl_(false),
blob_count_(0),
file_size_(0),
closed_(false),
immutable_sequence_(0),
obsolete_(false),
obsolete_sequence_(0),
expiration_range_({0, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false),
footer_valid_(false) {}
column_family_id_(column_family_id),
compression_(compression),
has_ttl_(has_ttl),
expiration_range_(expiration_range),
header_(column_family_id, compression, has_ttl, expiration_range),
header_valid_(true) {}
BlobFile::~BlobFile() {
if (obsolete_) {

View File

@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include <atomic>
#include <limits>
#include <memory>
#include <unordered_set>
@ -29,7 +30,7 @@ class BlobFile {
private:
// access to parent
const BlobDBImpl* parent_;
const BlobDBImpl* parent_{nullptr};
// path to blob directory
std::string path_to_dir_;
@ -37,49 +38,50 @@ class BlobFile {
// the id of the file.
// the above 2 are created during file creation and never changed
// after that
uint64_t file_number_;
uint64_t file_number_{0};
// The file numbers of the SST files whose oldest blob file reference
// points to this blob file.
std::unordered_set<uint64_t> linked_sst_files_;
// Info log.
Logger* info_log_;
Logger* info_log_{nullptr};
// Column family id.
uint32_t column_family_id_;
uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()};
// Compression type of blobs in the file
CompressionType compression_;
CompressionType compression_{kNoCompression};
// If true, the keys in this file all has TTL. Otherwise all keys don't
// have TTL.
bool has_ttl_;
bool has_ttl_{false};
// TTL range of blobs in the file.
ExpirationRange expiration_range_;
// number of blobs in the file
std::atomic<uint64_t> blob_count_;
std::atomic<uint64_t> blob_count_{0};
// size of the file
std::atomic<uint64_t> file_size_;
std::atomic<uint64_t> file_size_{0};
BlobLogHeader header_;
// closed_ = true implies the file is no more mutable
// no more blobs will be appended and the footer has been written out
std::atomic<bool> closed_;
std::atomic<bool> closed_{false};
// The latest sequence number when the file was closed/made immutable.
SequenceNumber immutable_sequence_;
SequenceNumber immutable_sequence_{0};
// has a pass of garbage collection successfully finished on this file
// obsolete_ still needs to do iterator/snapshot checks
std::atomic<bool> obsolete_;
std::atomic<bool> obsolete_{false};
// The last sequence number by the time the file marked as obsolete.
// Data in this file is visible to a snapshot taken before the sequence.
SequenceNumber obsolete_sequence_;
ExpirationRange expiration_range_;
SequenceNumber obsolete_sequence_{0};
// Sequential/Append writer for blobs
std::shared_ptr<Writer> log_writer_;
@ -92,29 +94,30 @@ class BlobFile {
mutable port::RWMutex mutex_;
// time when the random access reader was last created.
std::atomic<std::int64_t> last_access_;
std::atomic<std::int64_t> last_access_{-1};
// last time file was fsync'd/fdatasyncd
std::atomic<uint64_t> last_fsync_;
std::atomic<uint64_t> last_fsync_{0};
bool header_valid_;
bool header_valid_{false};
bool footer_valid_;
bool footer_valid_{false};
public:
BlobFile();
BlobFile() = default;
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
Logger* info_log);
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
Logger* info_log, uint32_t column_family_id,
CompressionType compression, bool has_ttl,
const ExpirationRange& expiration_range);
~BlobFile();
uint32_t column_family_id() const;
void SetColumnFamilyId(uint32_t cf_id) {
column_family_id_ = cf_id;
}
// Returns log file's absolute pathname.
std::string PathName() const;
@ -203,10 +206,6 @@ class BlobFile {
CompressionType compression() const { return compression_; }
void SetCompression(CompressionType c) {
compression_ = c;
}
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
// Read blob file header and footer. Return corruption if file header is

View File

@ -43,11 +43,19 @@ using ExpirationRange = std::pair<uint64_t, uint64_t>;
struct BlobLogHeader {
static constexpr size_t kSize = 30;
BlobLogHeader() = default;
BlobLogHeader(uint32_t _column_family_id, CompressionType _compression,
bool _has_ttl, const ExpirationRange& _expiration_range)
: column_family_id(_column_family_id),
compression(_compression),
has_ttl(_has_ttl),
expiration_range(_expiration_range) {}
uint32_t version = kVersion1;
uint32_t column_family_id = 0;
CompressionType compression = kNoCompression;
bool has_ttl = false;
ExpirationRange expiration_range = std::make_pair(0, 0);
ExpirationRange expiration_range;
void EncodeTo(std::string* dst);