MemTableListVersion

Summary:
MemTableListVersion is to MemTableList what Version is to VersionSet. I used almost the same ideas to develop MemTableListVersion. The goal is to have the std::list copy done in the background, while flushing, rather than in the foreground (MultiGet() and NewIterator()) under a mutex! Also, whenever we copied MemTableList, we also copied some MemTableList metadata (flush_requested_, commit_in_progress_, etc.), which was wasteful.

This diff avoids the std::list copy under a mutex in both MultiGet() and NewIterator(). I created a small database with some number of immutable memtables, and the time to create 100,000 iterators in a single thread (!) decreased from {188739, 215703, 198028} to {154352, 164035, 159817}. A lot of the savings come from code under a mutex, so we should see much higher savings with multiple threads. Creating new iterators quickly is very important to the LogDevice team.

I also think this diff will make SuperVersion obsolete for performance reasons; I will try that in the next diff. SuperVersion gave us huge savings on the Get() code path, but I think most of those savings came from avoiding the MemTableList copy under a mutex. With MemTableListVersion, we never need to copy the entire object (as we still do in NewIterator() and MultiGet()).

Test Plan: `make check` works. I will also do `make valgrind_check` before commit

Reviewers: dhruba, haobo, kailiu, sdong, emayanke, tnovak

Reviewed By: kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15255
This commit is contained in:
Igor Canadi 2014-01-24 14:52:08 -08:00
parent f131d4c280
commit c583157d49
4 changed files with 187 additions and 132 deletions

View File

@ -264,6 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
bg_cv_(&mutex_), bg_cv_(&mutex_),
mem_rep_factory_(options_.memtable_factory.get()), mem_rep_factory_(options_.memtable_factory.get()),
mem_(new MemTable(internal_comparator_, options_)), mem_(new MemTable(internal_comparator_, options_)),
imm_(options_.min_write_buffer_number_to_merge),
logfile_number_(0), logfile_number_(0),
super_version_(nullptr), super_version_(nullptr),
super_version_number_(0), super_version_number_(0),
@ -360,7 +361,7 @@ DBImpl::~DBImpl() {
delete mem_->Unref(); delete mem_->Unref();
} }
imm_.UnrefAll(&to_delete); imm_.current()->Unref(&to_delete);
for (MemTable* m: to_delete) { for (MemTable* m: to_delete) {
delete m; delete m;
} }
@ -508,7 +509,7 @@ bool DBImpl::SuperVersion::Unref() {
void DBImpl::SuperVersion::Cleanup() { void DBImpl::SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0); assert(refs.load(std::memory_order_relaxed) == 0);
imm.UnrefAll(&to_delete); imm->Unref(&to_delete);
MemTable* m = mem->Unref(); MemTable* m = mem->Unref();
if (m != nullptr) { if (m != nullptr) {
to_delete.push_back(m); to_delete.push_back(m);
@ -516,13 +517,13 @@ void DBImpl::SuperVersion::Cleanup() {
current->Unref(); current->Unref();
} }
void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm, void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) { Version* new_current) {
mem = new_mem; mem = new_mem;
imm = new_imm; imm = new_imm;
current = new_current; current = new_current;
mem->Ref(); mem->Ref();
imm.RefAll(); imm->Ref();
current->Ref(); current->Ref();
refs.store(1, std::memory_order_relaxed); refs.store(1, std::memory_order_relaxed);
} }
@ -1221,7 +1222,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(imm_.size() != 0); assert(imm_.size() != 0);
if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { if (!imm_.IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
return s; return s;
@ -1762,8 +1763,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.Acquire_Load()) { } else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions // DB is being deleted; no more background compactions
} else { } else {
bool is_flush_pending = bool is_flush_pending = imm_.IsFlushPending();
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge);
if (is_flush_pending && if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) { (bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed // memtable flush needed
@ -1798,8 +1798,7 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress, Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) { DeletionState& deletion_state) {
Status stat; Status stat;
while (stat.ok() && while (stat.ok() && imm_.IsFlushPending()) {
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log, Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_); options_.max_background_flushes - bg_flush_scheduled_);
@ -1919,7 +1918,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
mutex_.AssertHeld(); mutex_.AssertHeld();
// TODO: remove memtable flush from formal compaction // TODO: remove memtable flush from formal compaction
while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { while (imm_.IsFlushPending()) {
Log(options_.info_log, Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d", "available %d",
@ -2325,7 +2324,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
const uint64_t imm_start = env_->NowMicros(); const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log); LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { if (imm_.IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state); FlushMemTableToOutputFile(nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
} }
@ -2658,8 +2657,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
namespace { namespace {
struct IterState { struct IterState {
port::Mutex* mu; port::Mutex* mu;
Version* version; Version* version = nullptr;
std::vector<MemTable*> mem; // includes both mem_ and imm_ MemTable* mem = nullptr;
MemTableListVersion* imm = nullptr;
DBImpl *db; DBImpl *db;
}; };
@ -2668,15 +2668,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
DBImpl::DeletionState deletion_state(state->db->GetOptions(). DBImpl::DeletionState deletion_state(state->db->GetOptions().
max_write_buffer_number); max_write_buffer_number);
state->mu->Lock(); state->mu->Lock();
for (unsigned int i = 0; i < state->mem.size(); i++) { MemTable* m = state->mem->Unref();
MemTable* m = state->mem[i]->Unref(); if (m != nullptr) {
if (m != nullptr) { deletion_state.memtables_to_free.push_back(m);
deletion_state.memtables_to_free.push_back(m);
}
} }
if (state->version) { // not set for memtable-only iterator if (state->version) { // not set for memtable-only iterator
state->version->Unref(); state->version->Unref();
} }
if (state->imm) { // not set for memtable-only iterator
state->imm->Unref(&deletion_state.memtables_to_free);
}
// fast path FindObsoleteFiles // fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true); state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock(); state->mu->Unlock();
@ -2690,7 +2691,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) { SequenceNumber* latest_snapshot) {
IterState* cleanup = new IterState; IterState* cleanup = new IterState;
MemTable* mutable_mem; MemTable* mutable_mem;
std::vector<MemTable*> immutables; MemTableListVersion* immutable_mems;
Version* version; Version* version;
// Collect together all needed child iterators for mem // Collect together all needed child iterators for mem
@ -2699,27 +2700,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
mem_->Ref(); mem_->Ref();
mutable_mem = mem_; mutable_mem = mem_;
// Collect together all needed child iterators for imm_ // Collect together all needed child iterators for imm_
imm_.GetMemTables(&immutables); immutable_mems = imm_.current();
for (unsigned int i = 0; i < immutables.size(); i++) { immutable_mems->Ref();
immutables[i]->Ref();
}
versions_->current()->Ref(); versions_->current()->Ref();
version = versions_->current(); version = versions_->current();
mutex_.Unlock(); mutex_.Unlock();
std::vector<Iterator*> list; std::vector<Iterator*> iterator_list;
list.push_back(mutable_mem->NewIterator(options)); iterator_list.push_back(mutable_mem->NewIterator(options));
cleanup->mem.push_back(mutable_mem); cleanup->mem = mutable_mem;
cleanup->imm = immutable_mems;
// Collect all needed child iterators for immutable memtables // Collect all needed child iterators for immutable memtables
for (MemTable* m : immutables) { immutable_mems->AddIterators(options, &iterator_list);
list.push_back(m->NewIterator(options));
cleanup->mem.push_back(m);
}
// Collect iterators for files in L0 - Ln // Collect iterators for files in L0 - Ln
version->AddIterators(options, storage_options_, &list); version->AddIterators(options, storage_options_, &iterator_list);
Iterator* internal_iter = Iterator* internal_iter = NewMergingIterator(
NewMergingIterator(&internal_comparator_, &list[0], list.size()); &internal_comparator_, &iterator_list[0], iterator_list.size());
cleanup->version = version; cleanup->version = version;
cleanup->mu = &mutex_; cleanup->mu = &mutex_;
cleanup->db = this; cleanup->db = this;
@ -2738,19 +2734,15 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
uint64_t* superversion_number) { uint64_t* superversion_number) {
MemTable* mutable_mem; MemTable* mutable_mem;
std::vector<MemTable*> immutables; MemTableListVersion* immutable_mems;
Version* version; Version* version;
immutables.reserve(options_.max_write_buffer_number);
// get all child iterators and bump their refcounts under lock // get all child iterators and bump their refcounts under lock
mutex_.Lock(); mutex_.Lock();
mutable_mem = mem_; mutable_mem = mem_;
mutable_mem->Ref(); mutable_mem->Ref();
imm_.GetMemTables(&immutables); immutable_mems = imm_.current();
for (size_t i = 0; i < immutables.size(); ++i) { immutable_mems->Ref();
immutables[i]->Ref();
}
version = versions_->current(); version = versions_->current();
version->Ref(); version->Ref();
if (superversion_number != nullptr) { if (superversion_number != nullptr) {
@ -2760,7 +2752,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
Iterator* mutable_iter = mutable_mem->NewIterator(options); Iterator* mutable_iter = mutable_mem->NewIterator(options);
IterState* mutable_cleanup = new IterState(); IterState* mutable_cleanup = new IterState();
mutable_cleanup->mem.push_back(mutable_mem); mutable_cleanup->mem = mutable_mem;
mutable_cleanup->db = this; mutable_cleanup->db = this;
mutable_cleanup->mu = &mutex_; mutable_cleanup->mu = &mutex_;
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
@ -2772,10 +2764,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
Iterator* immutable_iter; Iterator* immutable_iter;
IterState* immutable_cleanup = new IterState(); IterState* immutable_cleanup = new IterState();
std::vector<Iterator*> list; std::vector<Iterator*> list;
for (MemTable* m : immutables) { immutable_mems->AddIterators(options, &list);
list.push_back(m->NewIterator(options)); immutable_cleanup->imm = immutable_mems;
immutable_cleanup->mem.push_back(m);
}
version->AddIterators(options, storage_options_, &list); version->AddIterators(options, storage_options_, &list);
immutable_cleanup->version = version; immutable_cleanup->version = version;
immutable_cleanup->db = this; immutable_cleanup->db = this;
@ -2832,7 +2822,7 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
DBImpl::SuperVersion* DBImpl::InstallSuperVersion( DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
SuperVersion* new_superversion) { SuperVersion* new_superversion) {
mutex_.AssertHeld(); mutex_.AssertHeld();
new_superversion->Init(mem_, imm_, versions_->current()); new_superversion->Init(mem_, imm_.current(), versions_->current());
SuperVersion* old_superversion = super_version_; SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion; super_version_ = new_superversion;
++super_version_number_; ++super_version_number_;
@ -2875,7 +2865,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) { } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else { } else {
@ -2930,10 +2920,10 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
} }
MemTable* mem = mem_; MemTable* mem = mem_;
MemTableList imm = imm_; MemTableListVersion* imm = imm_.current();
Version* current = versions_->current(); Version* current = versions_->current();
mem->Ref(); mem->Ref();
imm.RefAll(); imm->Ref();
current->Ref(); current->Ref();
// Unlock while reading from files and memtables // Unlock while reading from files and memtables
@ -2965,7 +2955,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
LookupKey lkey(keys[i], snapshot); LookupKey lkey(keys[i], snapshot);
if (mem->Get(lkey, value, &s, merge_context, options_)) { if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
} else if (imm.Get(lkey, value, &s, merge_context, options_)) { } else if (imm->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
} else { } else {
current->Get(options, lkey, value, &s, &merge_context, &stats, options_); current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
@ -2984,7 +2974,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
} }
MemTable* m = mem->Unref(); MemTable* m = mem->Unref();
imm.UnrefAll(&to_delete); imm->Unref(&to_delete);
current->Unref(); current->Unref();
mutex_.Unlock(); mutex_.Unlock();

View File

@ -140,10 +140,10 @@ class DBImpl : public DB {
// holds references to memtable, all immutable memtables and version // holds references to memtable, all immutable memtables and version
struct SuperVersion { struct SuperVersion {
MemTable* mem; MemTable* mem;
MemTableList imm; MemTableListVersion* imm;
Version* current; Version* current;
std::atomic<uint32_t> refs; std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm.UnrefAll() returns // We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then // all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction // delete all those memtables outside of mutex, during destruction
std::vector<MemTable*> to_delete; std::vector<MemTable*> to_delete;
@ -161,7 +161,7 @@ class DBImpl : public DB {
// that needs to be deleted in to_delete vector. Unrefing those // that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex // objects needs to be done in the mutex
void Cleanup(); void Cleanup();
void Init(MemTable* new_mem, const MemTableList& new_imm, void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current); Version* new_current);
}; };

View File

@ -16,41 +16,85 @@ namespace rocksdb {
class InternalKeyComparator; class InternalKeyComparator;
class Mutex; class Mutex;
class MemTableListIterator;
class VersionSet; class VersionSet;
using std::list; MemTableListVersion::MemTableListVersion(MemTableListVersion* old) {
if (old != nullptr) {
// Increase reference count on all underling memtables memlist_ = old->memlist_;
void MemTableList::RefAll() { size_ = old->size_;
for (auto &memtable : memlist_) { for (auto& m : memlist_) {
memtable->Ref(); m->Ref();
}
}
// Drop reference count on all underling memtables. If the
// refcount of an underlying memtable drops to zero, then
// return it in to_delete vector.
void MemTableList::UnrefAll(std::vector<MemTable*>* to_delete) {
for (auto &memtable : memlist_) {
MemTable* m = memtable->Unref();
if (m != nullptr) {
to_delete->push_back(m);
} }
} }
} }
void MemTableListVersion::Ref() { ++refs_; }
void MemTableListVersion::Unref(std::vector<MemTable*>* to_delete) {
--refs_;
if (refs_ == 0) {
// if to_delete is equal to nullptr it means we're confident
// that refs_ will not be zero
assert(to_delete != nullptr);
for (const auto& m : memlist_) {
MemTable* x = m->Unref();
if (x != nullptr) {
to_delete->push_back(x);
}
}
delete this;
}
}
int MemTableListVersion::size() const { return size_; }
// Returns the total number of memtables in the list // Returns the total number of memtables in the list
int MemTableList::size() { int MemTableList::size() const {
assert(num_flush_not_started_ <= size_); assert(num_flush_not_started_ <= current_->size_);
return size_; return current_->size_;
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext& merge_context,
const Options& options) {
for (auto& memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context, options)) {
return true;
}
}
return false;
}
void MemTableListVersion::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list) {
for (auto& m : memlist_) {
iterator_list->push_back(m->NewIterator(options));
}
}
void MemTableListVersion::Add(MemTable* m) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
m->Ref();
memlist_.push_front(m);
++size_;
}
void MemTableListVersion::Remove(MemTable* m) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
MemTable* x __attribute__((unused)) = m->Unref();
assert(x == nullptr); // it still needs to be alive!
memlist_.remove(m);
--size_;
} }
// Returns true if there is at least one memtable on which flush has // Returns true if there is at least one memtable on which flush has
// not yet started. // not yet started.
bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { bool MemTableList::IsFlushPending() {
if ((flush_requested_ && num_flush_not_started_ >= 1) || if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge)) { (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr); assert(imm_flush_needed.NoBarrier_Load() != nullptr);
return true; return true;
} }
@ -59,7 +103,8 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
// Returns the memtables that need to be flushed. // Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) { void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) { const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it; MemTable* m = *it;
if (!m->flush_in_progress_) { if (!m->flush_in_progress_) {
assert(!m->flush_completed_); assert(!m->flush_completed_);
@ -122,8 +167,8 @@ Status MemTableList::InstallMemtableFlushResults(
// scan all memtables from the earliest, and commit those // scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables // (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created. // are always committed in the order that they were created.
while (!memlist_.empty() && s.ok()) { while (!current_->memlist_.empty() && s.ok()) {
MemTable* m = memlist_.back(); // get the last element MemTable* m = current_->memlist_.back(); // get the last element
if (!m->flush_completed_) { if (!m->flush_completed_) {
break; break;
} }
@ -135,6 +180,10 @@ Status MemTableList::InstallMemtableFlushResults(
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu); s = vset->LogAndApply(&m->edit_, mu);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum // All the later memtables that have the same filenum
// are part of the same batch. They can be committed now. // are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables has been flushed. uint64_t mem_id = 1; // how many memtables has been flushed.
@ -144,7 +193,7 @@ Status MemTableList::InstallMemtableFlushResults(
"Level-0 commit table #%lu: memtable #%lu done", "Level-0 commit table #%lu: memtable #%lu done",
(unsigned long)m->file_number_, (unsigned long)m->file_number_,
(unsigned long)mem_id); (unsigned long)mem_id);
memlist_.remove(m); current_->Remove(m);
assert(m->file_number_ > 0); assert(m->file_number_ > 0);
// pending_outputs can be cleared only after the newly created file // pending_outputs can be cleared only after the newly created file
@ -155,7 +204,6 @@ Status MemTableList::InstallMemtableFlushResults(
if (m->Unref() != nullptr) { if (m->Unref() != nullptr) {
to_delete->push_back(m); to_delete->push_back(m);
} }
size_--;
} else { } else {
//commit failed. setup state so that we can flush again. //commit failed. setup state so that we can flush again.
Log(info_log, Log(info_log,
@ -172,7 +220,7 @@ Status MemTableList::InstallMemtableFlushResults(
s = Status::IOError("Unable to commit flushed memtable"); s = Status::IOError("Unable to commit flushed memtable");
} }
++mem_id; ++mem_id;
} while (!memlist_.empty() && (m = memlist_.back()) && } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
m->file_number_ == file_number); m->file_number_ == file_number);
} }
commit_in_progress_ = false; commit_in_progress_ = false;
@ -181,9 +229,9 @@ Status MemTableList::InstallMemtableFlushResults(
// New memtables are inserted at the front of the list. // New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m) { void MemTableList::Add(MemTable* m) {
assert(size_ >= num_flush_not_started_); assert(current_->size_ >= num_flush_not_started_);
size_++; InstallNewVersion();
memlist_.push_front(m); current_->Add(m);
m->MarkImmutable(); m->MarkImmutable();
num_flush_not_started_++; num_flush_not_started_++;
if (num_flush_not_started_ == 1) { if (num_flush_not_started_ == 1) {
@ -194,28 +242,20 @@ void MemTableList::Add(MemTable* m) {
// Returns an estimate of the number of bytes of data in use. // Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateMemoryUsage() { size_t MemTableList::ApproximateMemoryUsage() {
size_t size = 0; size_t size = 0;
for (auto &memtable : memlist_) { for (auto& memtable : current_->memlist_) {
size += memtable->ApproximateMemoryUsage(); size += memtable->ApproximateMemoryUsage();
} }
return size; return size;
} }
// Search all the memtables starting from the most recent one. void MemTableList::InstallNewVersion() {
// Return the most recent value found, if any. if (current_->refs_ == 1) {
// Operands stores the list of merge operations to apply, so far. // we're the only one using the version, just keep using it
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, } else {
MergeContext& merge_context, const Options& options) { // somebody else holds the current version, we need to create new one
for (auto &memtable : memlist_) { MemTableListVersion* version = current_;
if (memtable->Get(key, value, s, merge_context, options)) { current_ = new MemTableListVersion(current_);
return true; version->Unref();
}
}
return false;
}
void MemTableList::GetMemTables(std::vector<MemTable*>* output) {
for (auto &memtable : memlist_) {
output->push_back(memtable);
} }
} }

View File

@ -7,8 +7,10 @@
#pragma once #pragma once
#include <string> #include <string>
#include <list> #include <list>
#include <deque> #include <vector>
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/skiplist.h" #include "db/skiplist.h"
#include "memtable.h" #include "memtable.h"
@ -17,44 +19,71 @@ namespace rocksdb {
class InternalKeyComparator; class InternalKeyComparator;
class Mutex; class Mutex;
class MemTableListIterator;
// // keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths
class MemTableListVersion {
public:
explicit MemTableListVersion(MemTableListVersion* old = nullptr);
void Ref();
void Unref(std::vector<MemTable*>* to_delete = nullptr);
int size() const;
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list);
// REQUIRE: m is mutable memtable
void Add(MemTable* m);
// REQUIRE: m is mutable memtable
void Remove(MemTable* m);
private:
friend class MemTableList;
std::list<MemTable*> memlist_;
int size_ = 0;
int refs_ = 1;
};
// This class stores references to all the immutable memtables. // This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in // The memtables are flushed to L0 as soon as possible and in
// any order. If there are more than one immutable memtable, their // any order. If there are more than one immutable memtable, their
// flushes can occur concurrently. However, they are 'committed' // flushes can occur concurrently. However, they are 'committed'
// to the manifest in FIFO order to maintain correctness and // to the manifest in FIFO order to maintain correctness and
// recoverability from a crash. // recoverability from a crash.
//
class MemTableList { class MemTableList {
public: public:
// A list of memtables. // A list of memtables.
MemTableList() : size_(0), num_flush_not_started_(0), explicit MemTableList(int min_write_buffer_number_to_merge)
commit_in_progress_(false), : min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
flush_requested_(false) { current_(new MemTableListVersion()),
num_flush_not_started_(0),
commit_in_progress_(false),
flush_requested_(false) {
imm_flush_needed.Release_Store(nullptr); imm_flush_needed.Release_Store(nullptr);
current_->Ref();
} }
~MemTableList() {}; ~MemTableList() {}
MemTableListVersion* current() { return current_; }
// so that backgrund threads can detect non-nullptr pointer to // so that backgrund threads can detect non-nullptr pointer to
// determine whether this is anything more to start flushing. // determine whether this is anything more to start flushing.
port::AtomicPointer imm_flush_needed; port::AtomicPointer imm_flush_needed;
// Increase reference count on all underling memtables
void RefAll();
// Drop reference count on all underling memtables. If the refcount
// on an underlying memtable drops to zero, then return it in
// to_delete vector.
void UnrefAll(std::vector<MemTable*>* to_delete);
// Returns the total number of memtables in the list // Returns the total number of memtables in the list
int size(); int size() const;
// Returns true if there is at least one memtable on which flush has // Returns true if there is at least one memtable on which flush has
// not yet started. // not yet started.
bool IsFlushPending(int min_write_buffer_number_to_merge); bool IsFlushPending();
// Returns the earliest memtables that needs to be flushed. The returned // Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time. // memtables are guaranteed to be in the ascending order of created time.
@ -75,14 +104,6 @@ class MemTableList {
// Returns an estimate of the number of bytes of data in use. // Returns an estimate of the number of bytes of data in use.
size_t ApproximateMemoryUsage(); size_t ApproximateMemoryUsage();
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
// Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list);
// Request a flush of all existing memtables to storage // Request a flush of all existing memtables to storage
void FlushRequested() { flush_requested_ = true; } void FlushRequested() { flush_requested_ = true; }
@ -91,8 +112,12 @@ class MemTableList {
// void operator=(const MemTableList&); // void operator=(const MemTableList&);
private: private:
std::list<MemTable*> memlist_; // DB mutex held
int size_; void InstallNewVersion();
int min_write_buffer_number_to_merge_;
MemTableListVersion* current_;
// the number of elements that still need flushing // the number of elements that still need flushing
int num_flush_not_started_; int num_flush_not_started_;