Add a callback for when memtable is moved to immutable (#1137)
* Create a callback for memtable becoming immutable * Moved notification outside the lock * Move sealed notification to unlocked portion of SwitchMemtable * Fix lite build
This commit is contained in:
parent
8cf0f86d39
commit
3a276b0cbe
@ -4862,6 +4862,22 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// Delivers the "memtable sealed" event to every listener registered in
// db_options_.listeners by calling OnMemTableSealed(mem_table_info) on each.
//
// Returns without notifying anyone when no listeners are registered or when
// shutting_down_ is set.
//
// cfd is accepted for signature parity with the declaration but is not read
// here.
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
                                    const MemTableInfo& mem_table_info) {
  if (db_options_.listeners.empty()) {
    return;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }

  // Iterate by const reference so the listener handle is not copied on
  // every iteration.
  for (const auto& listener : db_options_.listeners) {
    listener->OnMemTableSealed(mem_table_info);
  }
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// REQUIRES: mutex_ is held
|
||||
// REQUIRES: this thread is currently at the front of the writer queue
|
||||
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
@ -4884,6 +4900,17 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
|
||||
SuperVersion* new_superversion = nullptr;
|
||||
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
||||
|
||||
// Set memtable_info for memtable sealed callback
|
||||
#ifndef ROCKSDB_LITE
|
||||
MemTableInfo memtable_info;
|
||||
memtable_info.cf_name = cfd->GetName();
|
||||
memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
|
||||
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
|
||||
memtable_info.num_entries = cfd->mem()->num_entries();
|
||||
memtable_info.num_deletes = cfd->mem()->num_deletes();
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
mutex_.Unlock();
|
||||
Status s;
|
||||
{
|
||||
@ -4920,6 +4947,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
|
||||
new_superversion = new SuperVersion();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// PLEASE NOTE: We assume that there are no failable operations
|
||||
// after lock is acquired below since we are already notifying
|
||||
// client about mem table becoming immutable.
|
||||
NotifyOnMemTableSealed(cfd, memtable_info);
|
||||
#endif //ROCKSDB_LITE
|
||||
}
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"[%s] New memtable created with log file: #%" PRIu64
|
||||
|
@ -57,6 +57,7 @@ class Arena;
|
||||
class WriteCallback;
|
||||
struct JobContext;
|
||||
struct ExternalSstFileInfo;
|
||||
struct MemTableInfo;
|
||||
|
||||
class DBImpl : public DB {
|
||||
public:
|
||||
@ -520,6 +521,8 @@ class DBImpl : public DB {
|
||||
Compaction *c, const Status &st,
|
||||
const CompactionJobStats& job_stats,
|
||||
int job_id);
|
||||
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
|
||||
const MemTableInfo& mem_table_info);
|
||||
|
||||
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
|
||||
|
||||
|
@ -723,7 +723,39 @@ TEST_F(EventListenerTest, TableFileCreationListenersTest) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
class MemTableSealedListener : public EventListener {
|
||||
private:
|
||||
SequenceNumber latest_seq_number_;
|
||||
public:
|
||||
MemTableSealedListener() {}
|
||||
void OnMemTableSealed(const MemTableInfo& info) override {
|
||||
latest_seq_number_ = info.first_seqno;
|
||||
}
|
||||
|
||||
void OnFlushCompleted(DB* /*db*/,
|
||||
const FlushJobInfo& flush_job_info) override {
|
||||
ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
|
||||
}
|
||||
};
|
||||
|
||||
// Registers a MemTableSealedListener and performs ten rounds of two writes
// followed by a flush. The sequence-number check itself lives in the
// listener's OnFlushCompleted().
TEST_F(EventListenerTest, MemTableSealedListenerTest) {
  Options options;
  options.create_if_missing = true;

  auto sealed_listener = std::make_shared<MemTableSealedListener>();
  options.listeners.push_back(sealed_listener);
  DestroyAndReopen(options);

  for (unsigned int round = 0; round < 10; round++) {
    // Distinct keys per round: "foo<round>" and "bar<round>".
    const std::string suffix = std::to_string(round);
    ASSERT_OK(Put("foo" + suffix, "aaa"));
    ASSERT_OK(Put("bar" + suffix, "bbb"));

    ASSERT_OK(Flush());
  }
}
|
||||
} // namespace rocksdb
|
||||
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
|
@ -151,6 +151,24 @@ struct CompactionJobInfo {
|
||||
CompactionJobStats stats;
|
||||
};
|
||||
|
||||
struct MemTableInfo {
|
||||
// the name of the column family to which memtable belongs
|
||||
std::string cf_name;
|
||||
// Sequence number of the first element that was inserted
|
||||
// into the memtable.
|
||||
SequenceNumber first_seqno;
|
||||
// Sequence number that is guaranteed to be smaller than or equal
|
||||
// to the sequence number of any key that could be inserted into this
|
||||
// memtable. It can then be assumed that any write with a larger(or equal)
|
||||
// sequence number will be present in this memtable or a later memtable.
|
||||
SequenceNumber earliest_seqno;
|
||||
// Total number of entries in memtable
|
||||
uint64_t num_entries;
|
||||
// Total number of deletes in memtable
|
||||
uint64_t num_deletes;
|
||||
|
||||
};
|
||||
|
||||
// EventListener class contains a set of call-back functions that will
|
||||
// be called when specific RocksDB event happens such as flush. It can
|
||||
// be used as a building block for developing custom features such as
|
||||
@ -247,6 +265,19 @@ class EventListener {
|
||||
// returned value.
|
||||
virtual void OnTableFileCreationStarted(
|
||||
const TableFileCreationBriefInfo& /*info*/) {}
|
||||
|
||||
// A call-back function for RocksDB which will be called before
|
||||
// a memtable is made immutable.
|
||||
//
|
||||
// Note that this function must be implemented in a way such that
|
||||
// it should not run for an extended period of time before the function
|
||||
// returns. Otherwise, RocksDB may be blocked.
|
||||
//
|
||||
// Note that if applications would like to use the passed reference
|
||||
// outside this function call, they should make copies from these
|
||||
// returned value.
|
||||
virtual void OnMemTableSealed(
|
||||
const MemTableInfo& /*info*/) {}
|
||||
|
||||
virtual ~EventListener() {}
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user