rocksdb/db/memtable_list.cc
agiardullo c815351038 Support saving history in memtable_list
Summary:
For transactions, we are using the memtables to validate that there are no write conflicts.  But after flushing, we don't have any memtables, and transactions could fail to commit.  So we want to someone keep around some extra history to use for conflict checking.  In addition, we want to provide a way to increase the size of this history if too many transactions fail to commit.

After chatting with people, it seems like everyone prefers just using Memtables to store this history (instead of a separate history structure).  It seems like the best place for this is abstracted inside the memtable_list.  I decide to create a separate list in MemtableListVersion as using the same list complicated the flush/installalflushresults logic too much.

This diff adds a new parameter to control how much memtable history to keep around after flushing.  However, it sounds like people aren't too fond of adding new parameters.  So I am making the default size of flushed+not-flushed memtables be set to max_write_buffers.  This should not change the maximum amount of memory used, but make it more likely we're using closer the the limit.  (We are now postponing deleting flushed memtables until the max_write_buffer limit is reached).  So while we might use more memory on average, we are still obeying the limit set (and you could argue it's better to go ahead and use up memory now instead of waiting for a write stall to happen to test this limit).

However, if people are opposed to this default behavior, we can easily set it to 0 and require this parameter be set in order to use transactions.

Test Plan: Added a xfunc test to play around with setting different values of this parameter in all tests.  Added testing in memtablelist_test and planning on adding more testing here.

Reviewers: sdong, rven, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D37443
2015-05-28 16:34:24 -07:00

347 lines
11 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "db/memtable_list.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/thread_status_util.h"
namespace rocksdb {
class InternalKeyComparator;
class Mutex;
class VersionSet;
MemTableListVersion::MemTableListVersion(MemTableListVersion* old)
: max_write_buffer_number_to_maintain_(
old->max_write_buffer_number_to_maintain_) {
if (old != nullptr) {
memlist_ = old->memlist_;
for (auto& m : memlist_) {
m->Ref();
}
memlist_history_ = old->memlist_history_;
for (auto& m : memlist_history_) {
m->Ref();
}
}
}
MemTableListVersion::MemTableListVersion(
int max_write_buffer_number_to_maintain)
: max_write_buffer_number_to_maintain_(
max_write_buffer_number_to_maintain) {}
void MemTableListVersion::Ref() { ++refs_; }
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
// if to_delete is equal to nullptr it means we're confident
// that refs_ will not be zero
assert(to_delete != nullptr);
for (const auto& m : memlist_) {
MemTable* x = m->Unref();
if (x != nullptr) {
to_delete->push_back(x);
}
}
for (const auto& m : memlist_history_) {
MemTable* x = m->Unref();
if (x != nullptr) {
to_delete->push_back(x);
}
}
delete this;
}
}
int MemTableList::NumNotFlushed() const {
int size = static_cast<int>(current_->memlist_.size());
assert(num_flush_not_started_ <= size);
return size;
}
int MemTableList::NumFlushed() const {
return static_cast<int>(current_->memlist_history_.size());
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context) {
for (auto& memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context)) {
return true;
}
}
return false;
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
std::string* value, Status* s,
MergeContext* merge_context) {
for (auto& memtable : memlist_history_) {
if (memtable->Get(key, value, s, merge_context)) {
return true;
}
}
return false;
}
void MemTableListVersion::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list,
Arena* arena) {
for (auto& m : memlist_) {
iterator_list->push_back(m->NewIterator(options, arena));
}
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(
m->NewIterator(options, merge_iter_builder->GetArena()));
}
}
uint64_t MemTableListVersion::GetTotalNumEntries() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_entries();
}
return total_num;
}
uint64_t MemTableListVersion::GetTotalNumDeletes() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_deletes();
}
return total_num;
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.push_front(m);
TrimHistory(to_delete);
}
// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.remove(m);
if (max_write_buffer_number_to_maintain_ > 0) {
memlist_history_.push_front(m);
TrimHistory(to_delete);
} else {
if (m->Unref()) {
to_delete->push_back(m);
}
}
}
// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
while (memlist_.size() + memlist_history_.size() >
static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
!memlist_history_.empty()) {
MemTable* x = memlist_history_.back();
memlist_history_.pop_back();
if (x->Unref()) {
to_delete->push_back(x);
}
}
}
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.load(std::memory_order_relaxed));
return true;
}
return false;
}
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_in_progress_) {
assert(!m->flush_completed_);
num_flush_not_started_--;
if (num_flush_not_started_ == 0) {
imm_flush_needed.store(false, std::memory_order_release);
}
m->flush_in_progress_ = true; // flushing will start very soon
ret->push_back(m);
}
}
flush_requested_ = false; // start-flush request is complete
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
assert(!mems.empty());
// If the flush was not successful, then just reset state.
// Maybe a succeeding attempt to flush will be successful.
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
}
imm_flush_needed.store(true, std::memory_order_release);
}
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
// flush was successful
for (size_t i = 0; i < mems.size(); ++i) {
// All the edits are associated with the first memtable of this batch.
assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
mems[i]->flush_completed_ = true;
mems[i]->file_number_ = file_number;
}
// if some other thread is already committing, then return
Status s;
if (commit_in_progress_) {
return s;
}
// Only a single thread can be executing this piece of code
commit_in_progress_ = true;
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
while (!current_->memlist_.empty() && s.ok()) {
MemTable* m = current_->memlist_.back(); // get the last element
if (!m->flush_completed_) {
break;
}
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables has been flushed.
do {
if (s.ok()) { // commit new state
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
} else {
//commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
}
++mem_id;
} while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
m->file_number_ == file_number);
}
commit_in_progress_ = false;
return s;
}
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
InstallNewVersion();
// this method is used to move mutable memtable into an immutable list.
// since mutable memtable is already refcounted by the DBImpl,
// and when moving to the imutable list we don't unref it,
// we don't have to ref the memtable here. we just take over the
// reference from the DBImpl.
current_->Add(m, to_delete);
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
imm_flush_needed.store(true, std::memory_order_release);
}
}
// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateMemoryUsage() {
size_t total_size = 0;
for (auto& memtable : current_->memlist_) {
total_size += memtable->ApproximateMemoryUsage();
}
return total_size;
}
void MemTableList::InstallNewVersion() {
if (current_->refs_ == 1) {
// we're the only one using the version, just keep using it
} else {
// somebody else holds the current version, we need to create new one
MemTableListVersion* version = current_;
current_ = new MemTableListVersion(current_);
current_->Ref();
version->Unref();
}
}
} // namespace rocksdb