rocksdb/db/memtable_list.cc
sdong df9069d23f In DB::NewIterator(), try to allocate the whole iterator tree in an arena
Summary:
In this patch, we try to allocate the whole iterator tree, starting from the DBIter, from an arena (a sketch of the pattern follows the limitations below):
1. ArenaWrappedDBIter is created to serve as the entry point of an iterator tree, and it owns the arena.
2. Add an option to create iterators from an arena for the following iterators: DBIter, MergingIterator, MemtableIterator, all memtable iterators, all table reader iterators, and the two-level iterator.
3. MergeIteratorBuilder is created to incrementally build the tree of internal iterators. It is passed to the memtable list and the version set, which add their iterators to it.

Limitations:
(1) Only DB::NewIterator() without tailing uses the arena. Other cases, including read-only DBs and compactions, still allocate from malloc.
(2) The two-level iterator itself is allocated in the arena, but the iterators inside it are not.
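
A sketch of the allocation pattern (illustrative only: MakeIterator and the
default constructors below are simplified stand-ins, not the exact signatures
added in this patch). When an arena is supplied, the iterator is constructed
with placement new in arena-owned memory instead of on the heap:

  #include <new>           // placement new
  #include "util/arena.h"  // rocksdb::Arena::AllocateAligned()

  // Hypothetical factory standing in for the iterator constructors that
  // gain an arena option in this patch.
  static Iterator* MakeIterator(Arena* arena) {
    if (arena == nullptr) {
      return new MergingIterator();  // old behavior: one malloc per iterator
    }
    void* mem = arena->AllocateAligned(sizeof(MergingIterator));
    return new (mem) MergingIterator();  // memory stays owned by the arena
  }

  // Note: arena-allocated iterators must not be freed with delete; their
  // destructors are invoked explicitly, and the arena releases all memory at
  // once when the entry-point ArenaWrappedDBIter is destroyed.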

Test Plan: make all check

Reviewers: ljin, haobo

Reviewed By: haobo

Subscribers: leveldb, dhruba, yhchiang, igor

Differential Revision: https://reviews.facebook.net/D18513
2014-06-02 17:44:57 -07:00

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "db/memtable_list.h"
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
namespace rocksdb {
class InternalKeyComparator;
class Mutex;
class VersionSet;
MemTableListVersion::MemTableListVersion(MemTableListVersion* old) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    size_ = old->size_;
    for (auto& m : memlist_) {
      m->Ref();
    }
  }
}

void MemTableListVersion::Ref() { ++refs_; }

void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // if to_delete is equal to nullptr it means we're confident
    // that refs_ will not be zero
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      MemTable* x = m->Unref();
      if (x != nullptr) {
        to_delete->push_back(x);
      }
    }
    delete this;
  }
}

int MemTableListVersion::size() const { return size_; }

// Returns the total number of memtables in the list
int MemTableList::size() const {
  assert(num_flush_not_started_ <= current_->size_);
  return current_->size_;
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext& merge_context,
                              const Options& options) {
  for (auto& memtable : memlist_) {
    if (memtable->Get(key, value, s, merge_context, options)) {
      return true;
    }
  }
  return false;
}

void MemTableListVersion::AddIterators(const ReadOptions& options,
                                       std::vector<Iterator*>* iterator_list) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options));
  }
}

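// Overload used by DB::NewIterator(): instead of returning raw iterators,
// adds every memtable's iterator to the MergeIteratorBuilder. When the
// builder carries an arena, NewIterator() allocates each iterator from that
// arena, so the whole iterator tree shares one allocation region.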
void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}

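// For context, the caller (DBImpl::NewInternalIterator in this patch) drives
// the overload above roughly as follows (illustrative sketch, not code from
// this file; the exact signatures are assumptions based on how
// MergeIteratorBuilder is used here):
//
//   MergeIteratorBuilder builder(&internal_comparator, arena);
//   builder.AddIterator(mutable_mem->NewIterator(options, builder.GetArena()));
//   imm->current()->AddIterators(options, &builder);
//   version->AddIterators(options, &builder);
//   Iterator* merged_iter = builder.Finish();
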
uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->GetNumEntries();
  }
  return total_num;
}

// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.push_front(m);
  ++size_;
}

// caller is responsible for unreferencing m
void MemTableListVersion::Remove(MemTable* m) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);
  --size_;
}

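// imm_flush_needed is an atomic pointer used as a boolean flag: a non-null
// value means at least one immutable memtable is waiting for its flush to
// be scheduled. PickMemtablesToFlush() clears it once every memtable has
// been picked.
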
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.NoBarrier_Load() != nullptr);
    return true;
  }
  return false;
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
  const auto& memlist = current_->memlist_;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.Release_Store(nullptr);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  flush_requested_ = false;  // start-flush request is complete
}

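// All memtables picked by PickMemtablesToFlush() are flushed together as one
// batch and share a single output file number; if the flush fails, the whole
// batch is rolled back here.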
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t file_number,
                                         std::set<uint64_t>* pending_outputs) {
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Maybe a succeeding attempt to flush will be successful.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  pending_outputs->erase(file_number);
  imm_flush_needed.Release_Store(reinterpret_cast<void*>(1));
}

// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
    ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
    port::Mutex* mu, Logger* info_log, uint64_t file_number,
    std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
    Directory* db_directory, LogBuffer* log_buffer) {
  mu->AssertHeld();

  // flush was successful
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // scan all memtables from the earliest, and commit those
  // (in that order) that have finished flushing. Memtables
  // are always committed in the order that they were created.
  while (!current_->memlist_.empty() && s.ok()) {
    MemTable* m = current_->memlist_.back();  // get the last element
    if (!m->flush_completed_) {
      break;
    }

    LogToBuffer(log_buffer, "[%s] Level-0 commit table #%lu started",
                cfd->GetName().c_str(), (unsigned long)m->file_number_);

    // this can release and reacquire the mutex.
    s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);

    // we will be changing the version in the next code path,
    // so we better create a new one, since versions are immutable
    InstallNewVersion();

    // All the later memtables that have the same filenum
    // are part of the same batch. They can be committed now.
    uint64_t mem_id = 1;  // how many memtables have been flushed.
    do {
      if (s.ok()) {  // commit new state
        LogToBuffer(log_buffer,
                    "[%s] Level-0 commit table #%lu: memtable #%lu done",
                    cfd->GetName().c_str(), (unsigned long)m->file_number_,
                    (unsigned long)mem_id);
        current_->Remove(m);
        assert(m->file_number_ > 0);

        // pending_outputs can be cleared only after the newly created file
        // has been written to a committed version so that other concurrently
        // executing compaction threads do not mistakenly assume that this
        // file is not live.
        pending_outputs.erase(m->file_number_);
        if (m->Unref() != nullptr) {
          to_delete->push_back(m);
        }
      } else {
        // commit failed. Set up state so that we can flush again.
        Log(info_log, "Level-0 commit table #%lu: memtable #%lu failed",
            (unsigned long)m->file_number_, (unsigned long)mem_id);
        m->flush_completed_ = false;
        m->flush_in_progress_ = false;
        m->edit_.Clear();
        num_flush_not_started_++;
        pending_outputs.erase(m->file_number_);
        m->file_number_ = 0;
        imm_flush_needed.Release_Store((void*)1);
      }
      ++mem_id;
    } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
             m->file_number_ == file_number);
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m) {
  assert(current_->size_ >= num_flush_not_started_);
  InstallNewVersion();
  // this method is used to move a mutable memtable into the immutable list.
  // since the mutable memtable is already refcounted by the DBImpl,
  // and when moving to the immutable list we don't unref it,
  // we don't have to ref the memtable here. we just take over the
  // reference from the DBImpl.
  current_->Add(m);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.Release_Store((void*)1);
  }
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateMemoryUsage() {
  size_t size = 0;
  for (auto& memtable : current_->memlist_) {
    size += memtable->ApproximateMemoryUsage();
  }
  return size;
}

void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version; we need to create a new one,
    // since versions are immutable
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(current_);
    current_->Ref();
    version->Unref();
  }
}

} // namespace rocksdb