Blob storage helper methods

Summary:
Split out interfaces needed for blob storage from #1560, including
* CompactionEventListener and OnFlushBegin listener interfaces.
* Blob filename support.
Closes https://github.com/facebook/rocksdb/pull/2169

Differential Revision: D4905463

Pulled By: yiwu-arbug

fbshipit-source-id: 564e73448f1b7a367e5e46216a521e57ea9011b5
This commit is contained in:
Yi Wu 2017-04-18 12:00:36 -07:00 committed by Facebook Github Bot
parent a6439d797e
commit 0fcdccc33e
15 changed files with 199 additions and 20 deletions

View File

@ -9,6 +9,7 @@
* PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it.
* The old API that accepts std::string, although discouraged, is still supported.
* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details.
* Added CompactionEventListener and EventListener::OnFlushBegin interfaces.
### New Features
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.

View File

@ -10,12 +10,34 @@
namespace rocksdb {
CompactionEventListener::CompactionListenerValueType fromInternalValueType(
ValueType vt) {
switch (vt) {
case kTypeDeletion:
return CompactionEventListener::CompactionListenerValueType::kDelete;
case kTypeValue:
return CompactionEventListener::CompactionListenerValueType::kValue;
case kTypeMerge:
return CompactionEventListener::CompactionListenerValueType::
kMergeOperand;
case kTypeSingleDeletion:
return CompactionEventListener::CompactionListenerValueType::
kSingleDelete;
case kTypeRangeDeletion:
return CompactionEventListener::CompactionListenerValueType::kRangeDelete;
default:
assert(false);
return CompactionEventListener::CompactionListenerValueType::kInvalid;
}
}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
@ -23,7 +45,7 @@ CompactionIterator::CompactionIterator(
range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, shutting_down) {}
compaction_filter, compaction_listener, shutting_down) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@ -32,6 +54,7 @@ CompactionIterator::CompactionIterator(
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
: input_(input),
cmp_(cmp),
@ -43,7 +66,9 @@ CompactionIterator::CompactionIterator(
range_del_agg_(range_del_agg),
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
compaction_listener_(compaction_listener),
shutting_down_(shutting_down),
ignore_snapshots_(false),
merge_out_iter_(merge_helper_) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
bottommost_level_ =
@ -62,8 +87,8 @@ CompactionIterator::CompactionIterator(
earliest_snapshot_ = snapshots_->at(0);
latest_snapshot_ = snapshots_->back();
}
if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) {
ignore_snapshots_ = true;
if (compaction_filter_ != nullptr) {
if (compaction_filter_->IgnoreSnapshots()) ignore_snapshots_ = true;
} else {
ignore_snapshots_ = false;
}
@ -188,6 +213,12 @@ void CompactionIterator::NextFromInput() {
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
if (compaction_listener_) {
compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
fromInternalValueType(ikey_.type),
value_, ikey_.sequence, true);
}
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
@ -235,6 +266,12 @@ void CompactionIterator::NextFromInput() {
}
}
} else {
if (compaction_listener_) {
compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
fromInternalValueType(ikey_.type),
value_, ikey_.sequence, false);
}
// Update the current key to reflect the new sequence number/type without
// copying the user key.
// TODO(rven): Compaction filter does not process keys in this path
@ -394,7 +431,7 @@ void CompactionIterator::NextFromInput() {
// is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// TODO: why not > ?
// TODO(noetzli): why not > ?
//
// Note: Dropping this key will not affect TransactionDB write-conflict
// checking since there has already been a record returned for this key

View File

@ -20,6 +20,8 @@
namespace rocksdb {
class CompactionEventListener;
class CompactionIterator {
public:
// A wrapper around Compaction. Has a much smaller interface, only what
@ -60,6 +62,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
CompactionEventListener* compaction_listener = nullptr,
const std::atomic<bool>* shutting_down = nullptr);
// Constructor with custom CompactionProxy, used for tests.
@ -71,6 +74,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
CompactionEventListener* compaction_listener = nullptr,
const std::atomic<bool>* shutting_down = nullptr);
~CompactionIterator();
@ -124,6 +128,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
CompactionEventListener* compaction_listener_;
const std::atomic<bool>* shutting_down_;
bool bottommost_level_;
bool valid_ = false;

View File

@ -189,7 +189,7 @@ class CompactionIteratorTest : public testing::Test {
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
std::move(compaction), filter, &shutting_down_));
std::move(compaction), filter, nullptr, &shutting_down_));
}
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }

View File

@ -739,12 +739,21 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
input->SeekToFirst();
}
// we allow only 1 compaction event listener. Used by blob storage
CompactionEventListener* comp_event_listener = nullptr;
for (auto& celitr : cfd->ioptions()->listeners) {
comp_event_listener = celitr->GetCompactionEventListener();
if (comp_event_listener != nullptr) {
break;
}
}
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
range_del_agg.get(), sub_compact->compaction, compaction_filter,
shutting_down_));
comp_event_listener, shutting_down_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() &&

View File

@ -557,6 +557,10 @@ class DBImpl : public DB {
Status RenameTempFileToOptionsFile(const std::string& file_name);
Status DeleteObsoleteOptionsFiles();
void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
@ -1095,13 +1099,6 @@ class DBImpl : public DB {
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
// Return the earliest snapshot where seqno is visible.
// Store the snapshot right before that, if any, in prev_snapshot
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in,
std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function. Background threads carry
// job_context which can have new_superversion already

View File

@ -90,6 +90,12 @@ Status DBImpl::FlushMemTableToOutputFile(
flush_job.PickMemTable();
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
flush_job.GetTableProperties());
#endif // ROCKSDB_LITE
Status s;
if (logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
@ -156,6 +162,49 @@ Status DBImpl::FlushMemTableToOutputFile(
return s;
}
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
mutex_.AssertHeld();
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
bool triggered_writes_slowdown =
(cfd->current()->storage_info()->NumLevelFiles(0) >=
mutable_cf_options.level0_slowdown_writes_trigger);
bool triggered_writes_stop =
(cfd->current()->storage_info()->NumLevelFiles(0) >=
mutable_cf_options.level0_stop_writes_trigger);
// release lock while notifying events
mutex_.Unlock();
{
FlushJobInfo info;
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.table_properties = prop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
}
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#endif // ROCKSDB_LITE
}
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,

View File

@ -444,6 +444,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
case kIdentityFile:
case kMetaDatabase:
case kOptionsFile:
case kBlobFile:
keep = true;
break;
}

View File

@ -34,8 +34,8 @@ static const int kMaxRecordType = kRecyclableLastType;
static const unsigned int kBlockSize = 32768;
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
static const int kHeaderSize = 4 + 1 + 2;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte)
static const int kHeaderSize = 4 + 2 + 1;
// Recyclable header is checksum (4 bytes), type (1 byte), log number
// (4 bytes), length (2 bytes).

View File

@ -724,9 +724,9 @@ void MemTable::Update(SequenceNumber seq,
uint32_t new_size = static_cast<uint32_t>(value.size());
// Update value, if new value size <= previous value size
if (new_size <= prev_size ) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
new_size);
if (new_size <= prev_size) {
char* p =
EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), value.size());
assert((unsigned)((p + value.size()) - entry) ==

View File

@ -163,6 +163,20 @@ class Env {
unique_ptr<WritableFile>* result,
const EnvOptions& options) = 0;
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
virtual Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status s;
return s;
}
// Reuse an existing file by renaming it and opening it as writable.
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
@ -454,6 +468,8 @@ class SequentialFile {
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
virtual void Rewind() {}
// Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.
@ -918,6 +934,11 @@ class EnvWrapper : public Env {
const EnvOptions& options) override {
return target_->NewWritableFile(f, r, options);
}
Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return target_->ReopenWritableFile(fname, result, options);
}
Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* r,

View File

@ -183,6 +183,28 @@ struct ExternalFileIngestionInfo {
TableProperties table_properties;
};
// A call-back function to RocksDB which will be called when the compaction
// iterator is compacting values. It is mean to be returned from
// EventListner::GetCompactionEventListner() at the beginning of compaction
// job.
class CompactionEventListener {
public:
enum CompactionListenerValueType {
kValue,
kMergeOperand,
kDelete,
kSingleDelete,
kRangeDelete,
kInvalid,
};
virtual void OnCompaction(int level, const Slice& key,
CompactionListenerValueType value_type,
const Slice& existing_value,
const SequenceNumber& sn, bool is_new) = 0;
virtual ~CompactionEventListener() = default;
};
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
@ -225,6 +247,16 @@ class EventListener {
virtual void OnFlushCompleted(DB* /*db*/,
const FlushJobInfo& /*flush_job_info*/) {}
// A call-back function to RocksDB which will be called before a
// RocksDB starts to flush memtables. The default implementation is
// no-op.
//
// Note that the this function must be implemented in a way such that
// it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked.
virtual void OnFlushBegin(DB* /*db*/,
const FlushJobInfo& /*flush_job_info*/) {}
// A call-back function for RocksDB which will be called whenever
// a SST file is deleted. Different from OnCompactionCompleted and
// OnFlushCompleted, this call-back is designed for external logging
@ -314,6 +346,12 @@ class EventListener {
virtual void OnExternalFileIngested(
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
// Factory method to return CompactionEventListener. If multiple listeners
// provides CompactionEventListner, only the first one will be used.
virtual CompactionEventListener* GetCompactionEventListener() {
return nullptr;
}
virtual ~EventListener() {}
};

View File

@ -48,6 +48,7 @@ extern void PutLengthPrefixedSliceParts(std::string* dst,
// Standard Get... routines parse a value from the beginning of a Slice
// and advance the slice past the parsed value.
extern bool GetFixed64(Slice* input, uint64_t* value);
extern bool GetFixed32(Slice* input, uint32_t* value);
extern bool GetVarint32(Slice* input, uint32_t* value);
extern bool GetVarint64(Slice* input, uint64_t* value);
extern bool GetLengthPrefixedSlice(Slice* input, Slice* result);
@ -271,6 +272,15 @@ inline bool GetFixed64(Slice* input, uint64_t* value) {
return true;
}
inline bool GetFixed32(Slice* input, uint32_t* value) {
if (input->size() < sizeof(uint32_t)) {
return false;
}
*value = DecodeFixed32(input->data());
input->remove_prefix(sizeof(uint32_t));
return true;
}
inline bool GetVarint32(Slice* input, uint32_t* value) {
const char* p = input->data();
const char* limit = p + input->size();

View File

@ -27,6 +27,7 @@ namespace rocksdb {
static const std::string kRocksDbTFileExt = "sst";
static const std::string kLevelDbTFileExt = "ldb";
static const std::string kRocksDBBlobFileExt = "blob";
// Given a path, flatten the path name by replacing all chars not in
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
@ -74,6 +75,11 @@ std::string LogFileName(const std::string& name, uint64_t number) {
return MakeFileName(name, number, "log");
}
std::string BlobFileName(const std::string& blobdirname, uint64_t number) {
assert(number > 0);
return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str());
}
std::string ArchivalDirectory(const std::string& dir) {
return dir + "/" + ARCHIVAL_DIR;
}
@ -220,7 +226,7 @@ std::string IdentityFileName(const std::string& dbname) {
// dbname/<info_log_name_prefix>
// dbname/<info_log_name_prefix>.old.[0-9]+
// dbname/MANIFEST-[0-9]+
// dbname/[0-9]+.(log|sst)
// dbname/[0-9]+.(log|sst|blob)
// dbname/METADB-[0-9]+
// dbname/OPTIONS-[0-9]+
// dbname/OPTIONS-[0-9]+.dbtmp
@ -335,6 +341,8 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
} else if (suffix == Slice(kRocksDbTFileExt) ||
suffix == Slice(kLevelDbTFileExt)) {
*type = kTableFile;
} else if (suffix == Slice(kRocksDBBlobFileExt)) {
*type = kBlobFile;
} else if (suffix == Slice(kTempFileNameSuffix)) {
*type = kTempFile;
} else {

View File

@ -38,7 +38,8 @@ enum FileType {
kInfoLogFile, // Either the current one, or an old one
kMetaDatabase,
kIdentityFile,
kOptionsFile
kOptionsFile,
kBlobFile
};
// Return the name of the log file with the specified number
@ -46,6 +47,8 @@ enum FileType {
// "dbname".
extern std::string LogFileName(const std::string& dbname, uint64_t number);
extern std::string BlobFileName(const std::string& bdirname, uint64_t number);
static const std::string ARCHIVAL_DIR = "archive";
extern std::string ArchivalDirectory(const std::string& dbname);