Pass CF ID to MemTableRepFactory
Summary: Some users want to monitor column family activity in their custom memtable implementations. Previously there was no way to figure out with which column family a memtable is associated. This diff: - adds an overload to MemTableRepFactory::CreateMemTableRep() that provides the CF ID. For compatibility, its default implementation calls the old overload. - updates MemTable to create MemTableRep's using the new overload. Closes https://github.com/facebook/rocksdb/pull/2346 Differential Revision: D5108061 Pulled By: ajkr fbshipit-source-id: 3a1921214a348dd8ea0f54e1cab3b71c3d46d616
This commit is contained in:
parent
f68d88be51
commit
a4d9c02511
@ -785,7 +785,7 @@ uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
|
|||||||
MemTable* ColumnFamilyData::ConstructNewMemtable(
|
MemTable* ColumnFamilyData::ConstructNewMemtable(
|
||||||
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
|
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
|
||||||
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
|
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
|
||||||
write_buffer_manager_, earliest_seq);
|
write_buffer_manager_, earliest_seq, id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ColumnFamilyData::CreateNewMemtable(
|
void ColumnFamilyData::CreateNewMemtable(
|
||||||
|
@ -84,14 +84,27 @@ class MockMemTableRepFactory : public MemTableRepFactory {
|
|||||||
return mock_rep_;
|
return mock_rep_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
|
||||||
|
MemTableAllocator* allocator,
|
||||||
|
const SliceTransform* transform,
|
||||||
|
Logger* logger,
|
||||||
|
uint32_t column_family_id) override {
|
||||||
|
last_column_family_id_ = column_family_id;
|
||||||
|
return CreateMemTableRep(cmp, allocator, transform, logger);
|
||||||
|
}
|
||||||
|
|
||||||
virtual const char* Name() const override { return "MockMemTableRepFactory"; }
|
virtual const char* Name() const override { return "MockMemTableRepFactory"; }
|
||||||
|
|
||||||
MockMemTableRep* rep() { return mock_rep_; }
|
MockMemTableRep* rep() { return mock_rep_; }
|
||||||
|
|
||||||
bool IsInsertConcurrentlySupported() const override { return false; }
|
bool IsInsertConcurrentlySupported() const override { return false; }
|
||||||
|
|
||||||
|
uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MockMemTableRep* mock_rep_;
|
MockMemTableRep* mock_rep_;
|
||||||
|
// workaround since there's no port::kMaxUint32 yet.
|
||||||
|
uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
|
||||||
};
|
};
|
||||||
|
|
||||||
class TestPrefixExtractor : public SliceTransform {
|
class TestPrefixExtractor : public SliceTransform {
|
||||||
@ -157,6 +170,24 @@ TEST_F(DBMemTableTest, InsertWithHint) {
|
|||||||
ASSERT_EQ("vvv", Get("whitelisted"));
|
ASSERT_EQ("vvv", Get("whitelisted"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBMemTableTest, ColumnFamilyId) {
|
||||||
|
// Verifies MemTableRepFactory is told the right column family id.
|
||||||
|
Options options;
|
||||||
|
options.allow_concurrent_memtable_write = false;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.memtable_factory.reset(new MockMemTableRepFactory());
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
CreateAndReopenWithCF({"pikachu"}, options);
|
||||||
|
|
||||||
|
for (int cf = 0; cf < 2; ++cf) {
|
||||||
|
ASSERT_OK(Put(cf, "key", "val"));
|
||||||
|
ASSERT_OK(Flush(cf));
|
||||||
|
ASSERT_EQ(
|
||||||
|
cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
|
||||||
|
->GetLastColumnFamilyId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -184,6 +184,7 @@ class SpecialSkipListFactory : public MemTableRepFactory {
|
|||||||
explicit SpecialSkipListFactory(int num_entries_flush)
|
explicit SpecialSkipListFactory(int num_entries_flush)
|
||||||
: num_entries_flush_(num_entries_flush) {}
|
: num_entries_flush_(num_entries_flush) {}
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(
|
virtual MemTableRep* CreateMemTableRep(
|
||||||
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
||||||
const SliceTransform* transform, Logger* logger) override {
|
const SliceTransform* transform, Logger* logger) override {
|
||||||
|
@ -63,7 +63,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
|
|||||||
const ImmutableCFOptions& ioptions,
|
const ImmutableCFOptions& ioptions,
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
WriteBufferManager* write_buffer_manager,
|
WriteBufferManager* write_buffer_manager,
|
||||||
SequenceNumber latest_seq)
|
SequenceNumber latest_seq, uint32_t column_family_id)
|
||||||
: comparator_(cmp),
|
: comparator_(cmp),
|
||||||
moptions_(ioptions, mutable_cf_options),
|
moptions_(ioptions, mutable_cf_options),
|
||||||
refs_(0),
|
refs_(0),
|
||||||
@ -73,10 +73,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
|
|||||||
allocator_(&arena_, write_buffer_manager),
|
allocator_(&arena_, write_buffer_manager),
|
||||||
table_(ioptions.memtable_factory->CreateMemTableRep(
|
table_(ioptions.memtable_factory->CreateMemTableRep(
|
||||||
comparator_, &allocator_, ioptions.prefix_extractor,
|
comparator_, &allocator_, ioptions.prefix_extractor,
|
||||||
ioptions.info_log)),
|
ioptions.info_log, column_family_id)),
|
||||||
range_del_table_(SkipListFactory().CreateMemTableRep(
|
range_del_table_(SkipListFactory().CreateMemTableRep(
|
||||||
comparator_, &allocator_, nullptr /* transform */,
|
comparator_, &allocator_, nullptr /* transform */, ioptions.info_log,
|
||||||
ioptions.info_log)),
|
column_family_id)),
|
||||||
is_range_del_table_empty_(true),
|
is_range_del_table_empty_(true),
|
||||||
data_size_(0),
|
data_size_(0),
|
||||||
num_entries_(0),
|
num_entries_(0),
|
||||||
|
@ -103,7 +103,7 @@ class MemTable {
|
|||||||
const ImmutableCFOptions& ioptions,
|
const ImmutableCFOptions& ioptions,
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
WriteBufferManager* write_buffer_manager,
|
WriteBufferManager* write_buffer_manager,
|
||||||
SequenceNumber earliest_seq);
|
SequenceNumber earliest_seq, uint32_t column_family_id);
|
||||||
|
|
||||||
// Do not delete this MemTable unless Unref() indicates it not in use.
|
// Do not delete this MemTable unless Unref() indicates it not in use.
|
||||||
~MemTable();
|
~MemTable();
|
||||||
|
@ -136,7 +136,7 @@ TEST_F(MemTableListTest, GetTest) {
|
|||||||
|
|
||||||
WriteBufferManager wb(options.db_write_buffer_size);
|
WriteBufferManager wb(options.db_write_buffer_size);
|
||||||
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem->Ref();
|
mem->Ref();
|
||||||
|
|
||||||
// Write some keys to this memtable.
|
// Write some keys to this memtable.
|
||||||
@ -175,7 +175,7 @@ TEST_F(MemTableListTest, GetTest) {
|
|||||||
// Create another memtable and write some keys to it
|
// Create another memtable and write some keys to it
|
||||||
WriteBufferManager wb2(options.db_write_buffer_size);
|
WriteBufferManager wb2(options.db_write_buffer_size);
|
||||||
MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
|
MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem2->Ref();
|
mem2->Ref();
|
||||||
|
|
||||||
mem2->Add(++seq, kTypeDeletion, "key1", "");
|
mem2->Add(++seq, kTypeDeletion, "key1", "");
|
||||||
@ -243,7 +243,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
|
|||||||
|
|
||||||
WriteBufferManager wb(options.db_write_buffer_size);
|
WriteBufferManager wb(options.db_write_buffer_size);
|
||||||
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem->Ref();
|
mem->Ref();
|
||||||
|
|
||||||
// Write some keys to this memtable.
|
// Write some keys to this memtable.
|
||||||
@ -321,7 +321,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
|
|||||||
// Create another memtable and write some keys to it
|
// Create another memtable and write some keys to it
|
||||||
WriteBufferManager wb2(options.db_write_buffer_size);
|
WriteBufferManager wb2(options.db_write_buffer_size);
|
||||||
MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
|
MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem2->Ref();
|
mem2->Ref();
|
||||||
|
|
||||||
mem2->Add(++seq, kTypeDeletion, "key1", "");
|
mem2->Add(++seq, kTypeDeletion, "key1", "");
|
||||||
@ -346,7 +346,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
|
|||||||
// Add a third memtable to push the first memtable out of the history
|
// Add a third memtable to push the first memtable out of the history
|
||||||
WriteBufferManager wb3(options.db_write_buffer_size);
|
WriteBufferManager wb3(options.db_write_buffer_size);
|
||||||
MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
|
MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem3->Ref();
|
mem3->Ref();
|
||||||
list.Add(mem3, &to_delete);
|
list.Add(mem3, &to_delete);
|
||||||
ASSERT_EQ(1, list.NumNotFlushed());
|
ASSERT_EQ(1, list.NumNotFlushed());
|
||||||
@ -420,7 +420,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
|||||||
MutableCFOptions mutable_cf_options(options);
|
MutableCFOptions mutable_cf_options(options);
|
||||||
for (int i = 0; i < num_tables; i++) {
|
for (int i = 0; i < num_tables; i++) {
|
||||||
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
|
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem->Ref();
|
mem->Ref();
|
||||||
|
|
||||||
std::string value;
|
std::string value;
|
||||||
|
@ -34,7 +34,7 @@ static std::string PrintContents(WriteBatch* b) {
|
|||||||
ImmutableCFOptions ioptions(options);
|
ImmutableCFOptions ioptions(options);
|
||||||
WriteBufferManager wb(options.db_write_buffer_size);
|
WriteBufferManager wb(options.db_write_buffer_size);
|
||||||
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
mem->Ref();
|
mem->Ref();
|
||||||
std::string state;
|
std::string state;
|
||||||
ColumnFamilyMemTablesDefault cf_mems_default(mem);
|
ColumnFamilyMemTablesDefault cf_mems_default(mem);
|
||||||
|
@ -216,10 +216,18 @@ class MemTableRep {
|
|||||||
class MemTableRepFactory {
|
class MemTableRepFactory {
|
||||||
public:
|
public:
|
||||||
virtual ~MemTableRepFactory() {}
|
virtual ~MemTableRepFactory() {}
|
||||||
|
|
||||||
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
||||||
MemTableAllocator*,
|
MemTableAllocator*,
|
||||||
const SliceTransform*,
|
const SliceTransform*,
|
||||||
Logger* logger) = 0;
|
Logger* logger) = 0;
|
||||||
|
virtual MemTableRep* CreateMemTableRep(
|
||||||
|
const MemTableRep::KeyComparator& key_cmp, MemTableAllocator* allocator,
|
||||||
|
const SliceTransform* slice_transform, Logger* logger,
|
||||||
|
uint32_t /* column_family_id */) {
|
||||||
|
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
|
||||||
|
}
|
||||||
|
|
||||||
virtual const char* Name() const = 0;
|
virtual const char* Name() const = 0;
|
||||||
|
|
||||||
// Return true if the current MemTableRep supports concurrent inserts
|
// Return true if the current MemTableRep supports concurrent inserts
|
||||||
@ -238,6 +246,7 @@ class SkipListFactory : public MemTableRepFactory {
|
|||||||
public:
|
public:
|
||||||
explicit SkipListFactory(size_t lookahead = 0) : lookahead_(lookahead) {}
|
explicit SkipListFactory(size_t lookahead = 0) : lookahead_(lookahead) {}
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
||||||
MemTableAllocator*,
|
MemTableAllocator*,
|
||||||
const SliceTransform*,
|
const SliceTransform*,
|
||||||
@ -264,10 +273,13 @@ class VectorRepFactory : public MemTableRepFactory {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
explicit VectorRepFactory(size_t count = 0) : count_(count) { }
|
explicit VectorRepFactory(size_t count = 0) : count_(count) { }
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
||||||
MemTableAllocator*,
|
MemTableAllocator*,
|
||||||
const SliceTransform*,
|
const SliceTransform*,
|
||||||
Logger* logger) override;
|
Logger* logger) override;
|
||||||
|
|
||||||
virtual const char* Name() const override {
|
virtual const char* Name() const override {
|
||||||
return "VectorRepFactory";
|
return "VectorRepFactory";
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ class HashCuckooRepFactory : public MemTableRepFactory {
|
|||||||
|
|
||||||
virtual ~HashCuckooRepFactory() {}
|
virtual ~HashCuckooRepFactory() {}
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(
|
virtual MemTableRep* CreateMemTableRep(
|
||||||
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
||||||
const SliceTransform* transform, Logger* logger) override;
|
const SliceTransform* transform, Logger* logger) override;
|
||||||
|
@ -28,6 +28,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
|
|||||||
|
|
||||||
virtual ~HashLinkListRepFactory() {}
|
virtual ~HashLinkListRepFactory() {}
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(
|
virtual MemTableRep* CreateMemTableRep(
|
||||||
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
||||||
const SliceTransform* transform, Logger* logger) override;
|
const SliceTransform* transform, Logger* logger) override;
|
||||||
|
@ -25,6 +25,7 @@ class HashSkipListRepFactory : public MemTableRepFactory {
|
|||||||
|
|
||||||
virtual ~HashSkipListRepFactory() {}
|
virtual ~HashSkipListRepFactory() {}
|
||||||
|
|
||||||
|
using MemTableRepFactory::CreateMemTableRep;
|
||||||
virtual MemTableRep* CreateMemTableRep(
|
virtual MemTableRep* CreateMemTableRep(
|
||||||
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
|
||||||
const SliceTransform* transform, Logger* logger) override;
|
const SliceTransform* transform, Logger* logger) override;
|
||||||
|
@ -429,7 +429,7 @@ class MemTableConstructor: public Constructor {
|
|||||||
ImmutableCFOptions ioptions(options_);
|
ImmutableCFOptions ioptions(options_);
|
||||||
memtable_ =
|
memtable_ =
|
||||||
new MemTable(internal_comparator_, ioptions, MutableCFOptions(options_),
|
new MemTable(internal_comparator_, ioptions, MutableCFOptions(options_),
|
||||||
wb, kMaxSequenceNumber);
|
wb, kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
memtable_->Ref();
|
memtable_->Ref();
|
||||||
}
|
}
|
||||||
~MemTableConstructor() {
|
~MemTableConstructor() {
|
||||||
@ -443,7 +443,7 @@ class MemTableConstructor: public Constructor {
|
|||||||
ImmutableCFOptions mem_ioptions(ioptions);
|
ImmutableCFOptions mem_ioptions(ioptions);
|
||||||
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
|
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
|
||||||
MutableCFOptions(options_), write_buffer_manager_,
|
MutableCFOptions(options_), write_buffer_manager_,
|
||||||
kMaxSequenceNumber);
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
memtable_->Ref();
|
memtable_->Ref();
|
||||||
int seq = 1;
|
int seq = 1;
|
||||||
for (const auto kv : kv_map) {
|
for (const auto kv : kv_map) {
|
||||||
@ -2594,8 +2594,9 @@ TEST_F(MemTableTest, Simple) {
|
|||||||
options.memtable_factory = table_factory;
|
options.memtable_factory = table_factory;
|
||||||
ImmutableCFOptions ioptions(options);
|
ImmutableCFOptions ioptions(options);
|
||||||
WriteBufferManager wb(options.db_write_buffer_size);
|
WriteBufferManager wb(options.db_write_buffer_size);
|
||||||
MemTable* memtable = new MemTable(cmp, ioptions, MutableCFOptions(options),
|
MemTable* memtable =
|
||||||
&wb, kMaxSequenceNumber);
|
new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
||||||
|
kMaxSequenceNumber, 0 /* column_family_id */);
|
||||||
memtable->Ref();
|
memtable->Ref();
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
WriteBatchInternal::SetSequence(&batch, 100);
|
WriteBatchInternal::SetSequence(&batch, 100);
|
||||||
|
Loading…
Reference in New Issue
Block a user