Blob storage helper methods
Summary: Split out interfaces needed for blob storage from #1560, including * CompactionEventListener and OnFlushBegin listener interfaces. * Blob filename support. Closes https://github.com/facebook/rocksdb/pull/2169 Differential Revision: D4905463 Pulled By: yiwu-arbug fbshipit-source-id: 564e73448f1b7a367e5e46216a521e57ea9011b5
This commit is contained in:
parent
1265ed7abb
commit
48fc484950
@ -9,6 +9,7 @@
|
|||||||
* PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it.
|
* PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it.
|
||||||
* The old API that accepts std::string, although discouraged, is still supported.
|
* The old API that accepts std::string, although discouraged, is still supported.
|
||||||
* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details.
|
* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details.
|
||||||
|
* Added CompactionEventListener and EventListener::OnFlushBegin interfaces.
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.
|
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.
|
||||||
|
@ -10,12 +10,34 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
|
// Maps an internal key ValueType onto the value-type enum that is handed to
// CompactionEventListener::OnCompaction().
//
// Only deletion, value, merge, single-deletion and range-deletion entries have
// a listener-facing counterpart. Any other type trips the assertion in debug
// builds and yields kInvalid when assertions are compiled out.
CompactionEventListener::CompactionListenerValueType fromInternalValueType(
    ValueType vt) {
  using ListenerType = CompactionEventListener::CompactionListenerValueType;

  ListenerType translated = ListenerType::kInvalid;
  switch (vt) {
    case kTypeDeletion:
      translated = ListenerType::kDelete;
      break;
    case kTypeValue:
      translated = ListenerType::kValue;
      break;
    case kTypeMerge:
      translated = ListenerType::kMergeOperand;
      break;
    case kTypeSingleDeletion:
      translated = ListenerType::kSingleDelete;
      break;
    case kTypeRangeDeletion:
      translated = ListenerType::kRangeDelete;
      break;
    default:
      // Unexpected internal type: loud in debug, kInvalid in release.
      assert(false);
      break;
  }
  return translated;
}
|
||||||
|
|
||||||
CompactionIterator::CompactionIterator(
|
CompactionIterator::CompactionIterator(
|
||||||
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
||||||
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
|
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
|
||||||
SequenceNumber earliest_write_conflict_snapshot, Env* env,
|
SequenceNumber earliest_write_conflict_snapshot, Env* env,
|
||||||
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
|
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
|
||||||
const Compaction* compaction, const CompactionFilter* compaction_filter,
|
const Compaction* compaction, const CompactionFilter* compaction_filter,
|
||||||
|
CompactionEventListener* compaction_listener,
|
||||||
const std::atomic<bool>* shutting_down)
|
const std::atomic<bool>* shutting_down)
|
||||||
: CompactionIterator(
|
: CompactionIterator(
|
||||||
input, cmp, merge_helper, last_sequence, snapshots,
|
input, cmp, merge_helper, last_sequence, snapshots,
|
||||||
@ -23,7 +45,7 @@ CompactionIterator::CompactionIterator(
|
|||||||
range_del_agg,
|
range_del_agg,
|
||||||
std::unique_ptr<CompactionProxy>(
|
std::unique_ptr<CompactionProxy>(
|
||||||
compaction ? new CompactionProxy(compaction) : nullptr),
|
compaction ? new CompactionProxy(compaction) : nullptr),
|
||||||
compaction_filter, shutting_down) {}
|
compaction_filter, compaction_listener, shutting_down) {}
|
||||||
|
|
||||||
CompactionIterator::CompactionIterator(
|
CompactionIterator::CompactionIterator(
|
||||||
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
|
||||||
@ -32,6 +54,7 @@ CompactionIterator::CompactionIterator(
|
|||||||
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
|
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
|
||||||
std::unique_ptr<CompactionProxy> compaction,
|
std::unique_ptr<CompactionProxy> compaction,
|
||||||
const CompactionFilter* compaction_filter,
|
const CompactionFilter* compaction_filter,
|
||||||
|
CompactionEventListener* compaction_listener,
|
||||||
const std::atomic<bool>* shutting_down)
|
const std::atomic<bool>* shutting_down)
|
||||||
: input_(input),
|
: input_(input),
|
||||||
cmp_(cmp),
|
cmp_(cmp),
|
||||||
@ -43,7 +66,9 @@ CompactionIterator::CompactionIterator(
|
|||||||
range_del_agg_(range_del_agg),
|
range_del_agg_(range_del_agg),
|
||||||
compaction_(std::move(compaction)),
|
compaction_(std::move(compaction)),
|
||||||
compaction_filter_(compaction_filter),
|
compaction_filter_(compaction_filter),
|
||||||
|
compaction_listener_(compaction_listener),
|
||||||
shutting_down_(shutting_down),
|
shutting_down_(shutting_down),
|
||||||
|
ignore_snapshots_(false),
|
||||||
merge_out_iter_(merge_helper_) {
|
merge_out_iter_(merge_helper_) {
|
||||||
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
|
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
|
||||||
bottommost_level_ =
|
bottommost_level_ =
|
||||||
@ -62,8 +87,8 @@ CompactionIterator::CompactionIterator(
|
|||||||
earliest_snapshot_ = snapshots_->at(0);
|
earliest_snapshot_ = snapshots_->at(0);
|
||||||
latest_snapshot_ = snapshots_->back();
|
latest_snapshot_ = snapshots_->back();
|
||||||
}
|
}
|
||||||
if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) {
|
if (compaction_filter_ != nullptr) {
|
||||||
ignore_snapshots_ = true;
|
if (compaction_filter_->IgnoreSnapshots()) ignore_snapshots_ = true;
|
||||||
} else {
|
} else {
|
||||||
ignore_snapshots_ = false;
|
ignore_snapshots_ = false;
|
||||||
}
|
}
|
||||||
@ -188,6 +213,12 @@ void CompactionIterator::NextFromInput() {
|
|||||||
current_user_key_sequence_ = kMaxSequenceNumber;
|
current_user_key_sequence_ = kMaxSequenceNumber;
|
||||||
current_user_key_snapshot_ = 0;
|
current_user_key_snapshot_ = 0;
|
||||||
|
|
||||||
|
if (compaction_listener_) {
|
||||||
|
compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
|
||||||
|
fromInternalValueType(ikey_.type),
|
||||||
|
value_, ikey_.sequence, true);
|
||||||
|
}
|
||||||
|
|
||||||
// apply the compaction filter to the first occurrence of the user key
|
// apply the compaction filter to the first occurrence of the user key
|
||||||
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
|
||||||
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
|
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
|
||||||
@ -235,6 +266,12 @@ void CompactionIterator::NextFromInput() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (compaction_listener_) {
|
||||||
|
compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
|
||||||
|
fromInternalValueType(ikey_.type),
|
||||||
|
value_, ikey_.sequence, false);
|
||||||
|
}
|
||||||
|
|
||||||
// Update the current key to reflect the new sequence number/type without
|
// Update the current key to reflect the new sequence number/type without
|
||||||
// copying the user key.
|
// copying the user key.
|
||||||
// TODO(rven): Compaction filter does not process keys in this path
|
// TODO(rven): Compaction filter does not process keys in this path
|
||||||
@ -394,7 +431,7 @@ void CompactionIterator::NextFromInput() {
|
|||||||
// is the same as the visibility of a previous instance of the
|
// is the same as the visibility of a previous instance of the
|
||||||
// same key, then this kv is not visible in any snapshot.
|
// same key, then this kv is not visible in any snapshot.
|
||||||
// Hidden by an newer entry for same user key
|
// Hidden by an newer entry for same user key
|
||||||
// TODO: why not > ?
|
// TODO(noetzli): why not > ?
|
||||||
//
|
//
|
||||||
// Note: Dropping this key will not affect TransactionDB write-conflict
|
// Note: Dropping this key will not affect TransactionDB write-conflict
|
||||||
// checking since there has already been a record returned for this key
|
// checking since there has already been a record returned for this key
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
|
class CompactionEventListener;
|
||||||
|
|
||||||
class CompactionIterator {
|
class CompactionIterator {
|
||||||
public:
|
public:
|
||||||
// A wrapper around Compaction. Has a much smaller interface, only what
|
// A wrapper around Compaction. Has a much smaller interface, only what
|
||||||
@ -60,6 +62,7 @@ class CompactionIterator {
|
|||||||
RangeDelAggregator* range_del_agg,
|
RangeDelAggregator* range_del_agg,
|
||||||
const Compaction* compaction = nullptr,
|
const Compaction* compaction = nullptr,
|
||||||
const CompactionFilter* compaction_filter = nullptr,
|
const CompactionFilter* compaction_filter = nullptr,
|
||||||
|
CompactionEventListener* compaction_listener = nullptr,
|
||||||
const std::atomic<bool>* shutting_down = nullptr);
|
const std::atomic<bool>* shutting_down = nullptr);
|
||||||
|
|
||||||
// Constructor with custom CompactionProxy, used for tests.
|
// Constructor with custom CompactionProxy, used for tests.
|
||||||
@ -71,6 +74,7 @@ class CompactionIterator {
|
|||||||
RangeDelAggregator* range_del_agg,
|
RangeDelAggregator* range_del_agg,
|
||||||
std::unique_ptr<CompactionProxy> compaction,
|
std::unique_ptr<CompactionProxy> compaction,
|
||||||
const CompactionFilter* compaction_filter = nullptr,
|
const CompactionFilter* compaction_filter = nullptr,
|
||||||
|
CompactionEventListener* compaction_listener = nullptr,
|
||||||
const std::atomic<bool>* shutting_down = nullptr);
|
const std::atomic<bool>* shutting_down = nullptr);
|
||||||
|
|
||||||
~CompactionIterator();
|
~CompactionIterator();
|
||||||
@ -124,6 +128,7 @@ class CompactionIterator {
|
|||||||
RangeDelAggregator* range_del_agg_;
|
RangeDelAggregator* range_del_agg_;
|
||||||
std::unique_ptr<CompactionProxy> compaction_;
|
std::unique_ptr<CompactionProxy> compaction_;
|
||||||
const CompactionFilter* compaction_filter_;
|
const CompactionFilter* compaction_filter_;
|
||||||
|
CompactionEventListener* compaction_listener_;
|
||||||
const std::atomic<bool>* shutting_down_;
|
const std::atomic<bool>* shutting_down_;
|
||||||
bool bottommost_level_;
|
bool bottommost_level_;
|
||||||
bool valid_ = false;
|
bool valid_ = false;
|
||||||
|
@ -189,7 +189,7 @@ class CompactionIteratorTest : public testing::Test {
|
|||||||
c_iter_.reset(new CompactionIterator(
|
c_iter_.reset(new CompactionIterator(
|
||||||
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
|
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
|
||||||
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
|
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
|
||||||
std::move(compaction), filter, &shutting_down_));
|
std::move(compaction), filter, nullptr, &shutting_down_));
|
||||||
}
|
}
|
||||||
|
|
||||||
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
|
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
|
||||||
|
@ -739,12 +739,21 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
input->SeekToFirst();
|
input->SeekToFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we allow only 1 compaction event listener. Used by blob storage
|
||||||
|
CompactionEventListener* comp_event_listener = nullptr;
|
||||||
|
for (auto& celitr : cfd->ioptions()->listeners) {
|
||||||
|
comp_event_listener = celitr->GetCompactionEventListener();
|
||||||
|
if (comp_event_listener != nullptr) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Status status;
|
Status status;
|
||||||
sub_compact->c_iter.reset(new CompactionIterator(
|
sub_compact->c_iter.reset(new CompactionIterator(
|
||||||
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
||||||
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
|
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
|
||||||
range_del_agg.get(), sub_compact->compaction, compaction_filter,
|
range_del_agg.get(), sub_compact->compaction, compaction_filter,
|
||||||
shutting_down_));
|
comp_event_listener, shutting_down_));
|
||||||
auto c_iter = sub_compact->c_iter.get();
|
auto c_iter = sub_compact->c_iter.get();
|
||||||
c_iter->SeekToFirst();
|
c_iter->SeekToFirst();
|
||||||
if (c_iter->Valid() &&
|
if (c_iter->Valid() &&
|
||||||
|
11
db/db_impl.h
11
db/db_impl.h
@ -557,6 +557,10 @@ class DBImpl : public DB {
|
|||||||
Status RenameTempFileToOptionsFile(const std::string& file_name);
|
Status RenameTempFileToOptionsFile(const std::string& file_name);
|
||||||
Status DeleteObsoleteOptionsFiles();
|
Status DeleteObsoleteOptionsFiles();
|
||||||
|
|
||||||
|
void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
|
||||||
|
const MutableCFOptions& mutable_cf_options,
|
||||||
|
int job_id, TableProperties prop);
|
||||||
|
|
||||||
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
|
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
int job_id, TableProperties prop);
|
int job_id, TableProperties prop);
|
||||||
@ -1091,13 +1095,6 @@ class DBImpl : public DB {
|
|||||||
DBImpl(const DBImpl&);
|
DBImpl(const DBImpl&);
|
||||||
void operator=(const DBImpl&);
|
void operator=(const DBImpl&);
|
||||||
|
|
||||||
// Return the earliest snapshot where seqno is visible.
|
|
||||||
// Store the snapshot right before that, if any, in prev_snapshot
|
|
||||||
inline SequenceNumber findEarliestVisibleSnapshot(
|
|
||||||
SequenceNumber in,
|
|
||||||
std::vector<SequenceNumber>& snapshots,
|
|
||||||
SequenceNumber* prev_snapshot);
|
|
||||||
|
|
||||||
// Background threads call this function, which is just a wrapper around
|
// Background threads call this function, which is just a wrapper around
|
||||||
// the InstallSuperVersion() function. Background threads carry
|
// the InstallSuperVersion() function. Background threads carry
|
||||||
// job_context which can have new_superversion already
|
// job_context which can have new_superversion already
|
||||||
|
@ -90,6 +90,12 @@ Status DBImpl::FlushMemTableToOutputFile(
|
|||||||
|
|
||||||
flush_job.PickMemTable();
|
flush_job.PickMemTable();
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
// may temporarily unlock and lock the mutex.
|
||||||
|
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
|
||||||
|
flush_job.GetTableProperties());
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
if (logfile_number_ > 0 &&
|
if (logfile_number_ > 0 &&
|
||||||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
|
||||||
@ -156,6 +162,49 @@ Status DBImpl::FlushMemTableToOutputFile(
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calls EventListener::OnFlushBegin() on every listener registered in
// immutable_db_options_.listeners, passing a FlushJobInfo describing the flush
// that is about to run.
//
// Parameters:
//   cfd                - column family being flushed; supplies the name and
//                        the current L0 file count.
//   file_meta          - metadata of the flush output; its file number and
//                        smallest/largest sequence numbers are copied into
//                        the FlushJobInfo.
//   mutable_cf_options - supplies the L0 slowdown/stop triggers used to fill
//                        triggered_writes_slowdown / triggered_writes_stop.
//   job_id             - id reported to the listeners.
//   prop               - table properties reported to the listeners.
//
// REQUIRES: mutex_ held. The mutex is released while the listeners run and is
// re-acquired before returning. Does nothing when no listener is registered,
// when the DB is shutting down, or when built with ROCKSDB_LITE.
//
// NOTE(review): this runs before the flush itself, so *file_meta may not yet
// carry the final file number / sequence numbers -- confirm against the
// caller what is populated at this point.
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Read the L0 file count once, while the mutex is still held, and derive
  // both write-stall flags from the same value.
  const auto num_l0_files = cfd->current()->storage_info()->NumLevelFiles(0);
  const bool triggered_writes_slowdown =
      (num_l0_files >= mutable_cf_options.level0_slowdown_writes_trigger);
  const bool triggered_writes_stop =
      (num_l0_files >= mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info;
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
                                       file_meta->fd.GetNumber());
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->smallest_seqno;
    info.largest_seqno = file_meta->largest_seqno;
    info.table_properties = prop;
    // Iterate by reference: there is no need to copy each listener handle
    // just to invoke a callback on it.
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#endif  // ROCKSDB_LITE
}
|
||||||
|
|
||||||
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
|
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
|
||||||
FileMetaData* file_meta,
|
FileMetaData* file_meta,
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
|
@ -444,6 +444,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
|
|||||||
case kIdentityFile:
|
case kIdentityFile:
|
||||||
case kMetaDatabase:
|
case kMetaDatabase:
|
||||||
case kOptionsFile:
|
case kOptionsFile:
|
||||||
|
case kBlobFile:
|
||||||
keep = true;
|
keep = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -34,8 +34,8 @@ static const int kMaxRecordType = kRecyclableLastType;
|
|||||||
|
|
||||||
static const unsigned int kBlockSize = 32768;
|
static const unsigned int kBlockSize = 32768;
|
||||||
|
|
||||||
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
|
// Header is checksum (4 bytes), length (2 bytes), type (1 byte)
|
||||||
static const int kHeaderSize = 4 + 1 + 2;
|
static const int kHeaderSize = 4 + 2 + 1;
|
||||||
|
|
||||||
// Recyclable header is checksum (4 bytes), type (1 byte), log number
|
// Recyclable header is checksum (4 bytes), type (1 byte), log number
|
||||||
// (4 bytes), length (2 bytes).
|
// (4 bytes), length (2 bytes).
|
||||||
|
@ -724,9 +724,9 @@ void MemTable::Update(SequenceNumber seq,
|
|||||||
uint32_t new_size = static_cast<uint32_t>(value.size());
|
uint32_t new_size = static_cast<uint32_t>(value.size());
|
||||||
|
|
||||||
// Update value, if new value size <= previous value size
|
// Update value, if new value size <= previous value size
|
||||||
if (new_size <= prev_size ) {
|
if (new_size <= prev_size) {
|
||||||
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
|
char* p =
|
||||||
new_size);
|
EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
|
||||||
WriteLock wl(GetLock(lkey.user_key()));
|
WriteLock wl(GetLock(lkey.user_key()));
|
||||||
memcpy(p, value.data(), value.size());
|
memcpy(p, value.data(), value.size());
|
||||||
assert((unsigned)((p + value.size()) - entry) ==
|
assert((unsigned)((p + value.size()) - entry) ==
|
||||||
|
@ -163,6 +163,20 @@ class Env {
|
|||||||
unique_ptr<WritableFile>* result,
|
unique_ptr<WritableFile>* result,
|
||||||
const EnvOptions& options) = 0;
|
const EnvOptions& options) = 0;
|
||||||
|
|
||||||
|
// Create an object that writes to a new file with the specified
|
||||||
|
// name. Deletes any existing file with the same name and creates a
|
||||||
|
// new file. On success, stores a pointer to the new file in
|
||||||
|
// *result and returns OK. On failure stores nullptr in *result and
|
||||||
|
// returns non-OK.
|
||||||
|
//
|
||||||
|
// The returned file will only be accessed by one thread at a time.
|
||||||
|
virtual Status ReopenWritableFile(const std::string& fname,
|
||||||
|
unique_ptr<WritableFile>* result,
|
||||||
|
const EnvOptions& options) {
|
||||||
|
Status s;
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
// Reuse an existing file by renaming it and opening it as writable.
|
// Reuse an existing file by renaming it and opening it as writable.
|
||||||
virtual Status ReuseWritableFile(const std::string& fname,
|
virtual Status ReuseWritableFile(const std::string& fname,
|
||||||
const std::string& old_fname,
|
const std::string& old_fname,
|
||||||
@ -454,6 +468,8 @@ class SequentialFile {
|
|||||||
// aligned buffer for Direct I/O
|
// aligned buffer for Direct I/O
|
||||||
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
|
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
|
||||||
|
|
||||||
|
virtual void Rewind() {}
|
||||||
|
|
||||||
// Remove any kind of caching of data from the offset to offset+length
|
// Remove any kind of caching of data from the offset to offset+length
|
||||||
// of this file. If the length is 0, then it refers to the end of file.
|
// of this file. If the length is 0, then it refers to the end of file.
|
||||||
// If the system is not caching the file contents, then this is a noop.
|
// If the system is not caching the file contents, then this is a noop.
|
||||||
@ -918,6 +934,11 @@ class EnvWrapper : public Env {
|
|||||||
const EnvOptions& options) override {
|
const EnvOptions& options) override {
|
||||||
return target_->NewWritableFile(f, r, options);
|
return target_->NewWritableFile(f, r, options);
|
||||||
}
|
}
|
||||||
|
Status ReopenWritableFile(const std::string& fname,
|
||||||
|
unique_ptr<WritableFile>* result,
|
||||||
|
const EnvOptions& options) override {
|
||||||
|
return target_->ReopenWritableFile(fname, result, options);
|
||||||
|
}
|
||||||
Status ReuseWritableFile(const std::string& fname,
|
Status ReuseWritableFile(const std::string& fname,
|
||||||
const std::string& old_fname,
|
const std::string& old_fname,
|
||||||
unique_ptr<WritableFile>* r,
|
unique_ptr<WritableFile>* r,
|
||||||
|
@ -183,6 +183,28 @@ struct ExternalFileIngestionInfo {
|
|||||||
TableProperties table_properties;
|
TableProperties table_properties;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// A call-back function to RocksDB which will be called when the compaction
|
||||||
|
// iterator is compacting values. It is meant to be returned from
|
||||||
|
// EventListener::GetCompactionEventListener() at the beginning of compaction
|
||||||
|
// job.
|
||||||
|
class CompactionEventListener {
|
||||||
|
public:
|
||||||
|
enum CompactionListenerValueType {
|
||||||
|
kValue,
|
||||||
|
kMergeOperand,
|
||||||
|
kDelete,
|
||||||
|
kSingleDelete,
|
||||||
|
kRangeDelete,
|
||||||
|
kInvalid,
|
||||||
|
};
|
||||||
|
|
||||||
|
virtual void OnCompaction(int level, const Slice& key,
|
||||||
|
CompactionListenerValueType value_type,
|
||||||
|
const Slice& existing_value,
|
||||||
|
const SequenceNumber& sn, bool is_new) = 0;
|
||||||
|
|
||||||
|
virtual ~CompactionEventListener() = default;
|
||||||
|
};
|
||||||
|
|
||||||
// EventListener class contains a set of call-back functions that will
|
// EventListener class contains a set of call-back functions that will
|
||||||
// be called when specific RocksDB event happens such as flush. It can
|
// be called when specific RocksDB event happens such as flush. It can
|
||||||
@ -225,6 +247,16 @@ class EventListener {
|
|||||||
virtual void OnFlushCompleted(DB* /*db*/,
|
virtual void OnFlushCompleted(DB* /*db*/,
|
||||||
const FlushJobInfo& /*flush_job_info*/) {}
|
const FlushJobInfo& /*flush_job_info*/) {}
|
||||||
|
|
||||||
|
// A call-back function to RocksDB which will be called before
|
||||||
|
// RocksDB starts to flush memtables. The default implementation is
|
||||||
|
// no-op.
|
||||||
|
//
|
||||||
|
// Note that this function must be implemented in a way such that
|
||||||
|
// it should not run for an extended period of time before the function
|
||||||
|
// returns. Otherwise, RocksDB may be blocked.
|
||||||
|
virtual void OnFlushBegin(DB* /*db*/,
|
||||||
|
const FlushJobInfo& /*flush_job_info*/) {}
|
||||||
|
|
||||||
// A call-back function for RocksDB which will be called whenever
|
// A call-back function for RocksDB which will be called whenever
|
||||||
// a SST file is deleted. Different from OnCompactionCompleted and
|
// a SST file is deleted. Different from OnCompactionCompleted and
|
||||||
// OnFlushCompleted, this call-back is designed for external logging
|
// OnFlushCompleted, this call-back is designed for external logging
|
||||||
@ -314,6 +346,12 @@ class EventListener {
|
|||||||
virtual void OnExternalFileIngested(
|
virtual void OnExternalFileIngested(
|
||||||
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
|
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
|
||||||
|
|
||||||
|
// Factory method to return CompactionEventListener. If multiple listeners
|
||||||
|
// provide a CompactionEventListener, only the first one will be used.
|
||||||
|
virtual CompactionEventListener* GetCompactionEventListener() {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
virtual ~EventListener() {}
|
virtual ~EventListener() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -48,6 +48,7 @@ extern void PutLengthPrefixedSliceParts(std::string* dst,
|
|||||||
// Standard Get... routines parse a value from the beginning of a Slice
|
// Standard Get... routines parse a value from the beginning of a Slice
|
||||||
// and advance the slice past the parsed value.
|
// and advance the slice past the parsed value.
|
||||||
extern bool GetFixed64(Slice* input, uint64_t* value);
|
extern bool GetFixed64(Slice* input, uint64_t* value);
|
||||||
|
extern bool GetFixed32(Slice* input, uint32_t* value);
|
||||||
extern bool GetVarint32(Slice* input, uint32_t* value);
|
extern bool GetVarint32(Slice* input, uint32_t* value);
|
||||||
extern bool GetVarint64(Slice* input, uint64_t* value);
|
extern bool GetVarint64(Slice* input, uint64_t* value);
|
||||||
extern bool GetLengthPrefixedSlice(Slice* input, Slice* result);
|
extern bool GetLengthPrefixedSlice(Slice* input, Slice* result);
|
||||||
@ -271,6 +272,15 @@ inline bool GetFixed64(Slice* input, uint64_t* value) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parses a fixed-width 32-bit value from the beginning of *input using
// DecodeFixed32() and advances *input past the bytes consumed.
//
// Returns false, leaving both *input and *value untouched, when *input holds
// fewer than sizeof(uint32_t) bytes; returns true otherwise.
inline bool GetFixed32(Slice* input, uint32_t* value) {
  const bool has_enough_bytes = input->size() >= sizeof(uint32_t);
  if (has_enough_bytes) {
    *value = DecodeFixed32(input->data());
    input->remove_prefix(sizeof(uint32_t));
  }
  return has_enough_bytes;
}
|
||||||
|
|
||||||
inline bool GetVarint32(Slice* input, uint32_t* value) {
|
inline bool GetVarint32(Slice* input, uint32_t* value) {
|
||||||
const char* p = input->data();
|
const char* p = input->data();
|
||||||
const char* limit = p + input->size();
|
const char* limit = p + input->size();
|
||||||
|
@ -27,6 +27,7 @@ namespace rocksdb {
|
|||||||
|
|
||||||
static const std::string kRocksDbTFileExt = "sst";
|
static const std::string kRocksDbTFileExt = "sst";
|
||||||
static const std::string kLevelDbTFileExt = "ldb";
|
static const std::string kLevelDbTFileExt = "ldb";
|
||||||
|
static const std::string kRocksDBBlobFileExt = "blob";
|
||||||
|
|
||||||
// Given a path, flatten the path name by replacing all chars not in
|
// Given a path, flatten the path name by replacing all chars not in
|
||||||
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
|
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
|
||||||
@ -74,6 +75,11 @@ std::string LogFileName(const std::string& name, uint64_t number) {
|
|||||||
return MakeFileName(name, number, "log");
|
return MakeFileName(name, number, "log");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string BlobFileName(const std::string& blobdirname, uint64_t number) {
|
||||||
|
assert(number > 0);
|
||||||
|
return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
std::string ArchivalDirectory(const std::string& dir) {
|
std::string ArchivalDirectory(const std::string& dir) {
|
||||||
return dir + "/" + ARCHIVAL_DIR;
|
return dir + "/" + ARCHIVAL_DIR;
|
||||||
}
|
}
|
||||||
@ -220,7 +226,7 @@ std::string IdentityFileName(const std::string& dbname) {
|
|||||||
// dbname/<info_log_name_prefix>
|
// dbname/<info_log_name_prefix>
|
||||||
// dbname/<info_log_name_prefix>.old.[0-9]+
|
// dbname/<info_log_name_prefix>.old.[0-9]+
|
||||||
// dbname/MANIFEST-[0-9]+
|
// dbname/MANIFEST-[0-9]+
|
||||||
// dbname/[0-9]+.(log|sst)
|
// dbname/[0-9]+.(log|sst|blob)
|
||||||
// dbname/METADB-[0-9]+
|
// dbname/METADB-[0-9]+
|
||||||
// dbname/OPTIONS-[0-9]+
|
// dbname/OPTIONS-[0-9]+
|
||||||
// dbname/OPTIONS-[0-9]+.dbtmp
|
// dbname/OPTIONS-[0-9]+.dbtmp
|
||||||
@ -335,6 +341,8 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
|
|||||||
} else if (suffix == Slice(kRocksDbTFileExt) ||
|
} else if (suffix == Slice(kRocksDbTFileExt) ||
|
||||||
suffix == Slice(kLevelDbTFileExt)) {
|
suffix == Slice(kLevelDbTFileExt)) {
|
||||||
*type = kTableFile;
|
*type = kTableFile;
|
||||||
|
} else if (suffix == Slice(kRocksDBBlobFileExt)) {
|
||||||
|
*type = kBlobFile;
|
||||||
} else if (suffix == Slice(kTempFileNameSuffix)) {
|
} else if (suffix == Slice(kTempFileNameSuffix)) {
|
||||||
*type = kTempFile;
|
*type = kTempFile;
|
||||||
} else {
|
} else {
|
||||||
|
@ -38,7 +38,8 @@ enum FileType {
|
|||||||
kInfoLogFile, // Either the current one, or an old one
|
kInfoLogFile, // Either the current one, or an old one
|
||||||
kMetaDatabase,
|
kMetaDatabase,
|
||||||
kIdentityFile,
|
kIdentityFile,
|
||||||
kOptionsFile
|
kOptionsFile,
|
||||||
|
kBlobFile
|
||||||
};
|
};
|
||||||
|
|
||||||
// Return the name of the log file with the specified number
|
// Return the name of the log file with the specified number
|
||||||
@ -46,6 +47,8 @@ enum FileType {
|
|||||||
// "dbname".
|
// "dbname".
|
||||||
extern std::string LogFileName(const std::string& dbname, uint64_t number);
|
extern std::string LogFileName(const std::string& dbname, uint64_t number);
|
||||||
|
|
||||||
|
extern std::string BlobFileName(const std::string& bdirname, uint64_t number);
|
||||||
|
|
||||||
static const std::string ARCHIVAL_DIR = "archive";
|
static const std::string ARCHIVAL_DIR = "archive";
|
||||||
|
|
||||||
extern std::string ArchivalDirectory(const std::string& dbname);
|
extern std::string ArchivalDirectory(const std::string& dbname);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user