rocksdb/db/memtable_list.cc
Yi Wu d1cab2b64e Add ValueType::kTypeBlobIndex
Summary:
Add kTypeBlobIndex value type, which will be used by blob db only, to insert a (key, blob_offset) KV pair. The purpose is to
1. Make it possible to open existing rocksdb instance as blob db. Existing value will be of kTypeIndex type, while value inserted by blob db will be of kTypeBlobIndex.
2. Make rocksdb able to detect if the db contains value written by blob db, if so return error.
3. Make it possible to have blob db optionally store value in SST file (with kTypeValue type) or as a blob value (with kTypeBlobIndex type).

The root db (DBImpl) basically pretended kTypeBlobIndex are normal value on write. On Get if is_blob is provided, return whether the value read is of kTypeBlobIndex type, or return Status::NotSupported() status if is_blob is not provided. On scan allow_blob flag is pass and if the flag is true, return wether the value is of kTypeBlobIndex type via iter->IsBlob().

Changes on blob db side will be in a separate patch.
Closes https://github.com/facebook/rocksdb/pull/2886

Differential Revision: D5838431

Pulled By: yiwu-arbug

fbshipit-source-id: 3c5306c62bc13bb11abc03422ec5cbcea1203cca
2017-10-03 09:11:23 -07:00

485 lines
16 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <string>
#include "db/memtable.h"
#include "db/version_set.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"
namespace rocksdb {
class InternalKeyComparator;
class Mutex;
class VersionSet;
void MemTableListVersion::AddMemTable(MemTable* m) {
memlist_.push_front(m);
*parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}
void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
MemTable* m) {
if (m->Unref()) {
to_delete->push_back(m);
assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
*parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
} else {
}
}
MemTableListVersion::MemTableListVersion(
size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
: max_write_buffer_number_to_maintain_(
old->max_write_buffer_number_to_maintain_),
parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
if (old != nullptr) {
memlist_ = old->memlist_;
for (auto& m : memlist_) {
m->Ref();
}
memlist_history_ = old->memlist_history_;
for (auto& m : memlist_history_) {
m->Ref();
}
}
}
MemTableListVersion::MemTableListVersion(
size_t* parent_memtable_list_memory_usage,
int max_write_buffer_number_to_maintain)
: max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
void MemTableListVersion::Ref() { ++refs_; }
// called by superversion::clean()
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
// if to_delete is equal to nullptr it means we're confident
// that refs_ will not be zero
assert(to_delete != nullptr);
for (const auto& m : memlist_) {
UnrefMemTable(to_delete, m);
}
for (const auto& m : memlist_history_) {
UnrefMemTable(to_delete, m);
}
delete this;
}
}
int MemTableList::NumNotFlushed() const {
int size = static_cast<int>(current_->memlist_.size());
assert(num_flush_not_started_ <= size);
return size;
}
int MemTableList::NumFlushed() const {
return static_cast<int>(current_->memlist_history_.size());
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback, bool* is_blob_index) {
return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
seq, read_opts, callback, is_blob_index);
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
return GetFromList(&memlist_history_, key, value, s, merge_context,
range_del_agg, seq, read_opts);
}
bool MemTableListVersion::GetFromList(
std::list<MemTable*>* list, const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
&current_seq, read_opts, callback, is_blob_index);
if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to
// return the first operation found when searching memtables in
// reverse-chronological order.
// current_seq would be equal to kMaxSequenceNumber if the value was to be
// skipped. This allows seq to be assigned again when the next value is
// read.
*seq = current_seq;
}
if (done) {
assert(*seq != kMaxSequenceNumber);
return true;
}
if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
return false;
}
}
return false;
}
Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts, Arena* arena,
RangeDelAggregator* range_del_agg) {
assert(range_del_agg != nullptr);
for (auto& m : memlist_) {
std::unique_ptr<InternalIterator> range_del_iter(
m->NewRangeTombstoneIterator(read_opts));
Status s = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts,
std::vector<InternalIterator*>* range_del_iters) {
for (auto& m : memlist_) {
auto* range_del_iter = m->NewRangeTombstoneIterator(read_opts);
if (range_del_iter != nullptr) {
range_del_iters->push_back(range_del_iter);
}
}
return Status::OK();
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
Arena* arena) {
for (auto& m : memlist_) {
iterator_list->push_back(m->NewIterator(options, arena));
}
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(
m->NewIterator(options, merge_iter_builder->GetArena()));
}
}
uint64_t MemTableListVersion::GetTotalNumEntries() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_entries();
}
return total_num;
}
MemTable::MemTableStats MemTableListVersion::ApproximateStats(
const Slice& start_ikey, const Slice& end_ikey) {
MemTable::MemTableStats total_stats = {0, 0};
for (auto& m : memlist_) {
auto mStats = m->ApproximateStats(start_ikey, end_ikey);
total_stats.size += mStats.size;
total_stats.count += mStats.count;
}
return total_stats;
}
uint64_t MemTableListVersion::GetTotalNumDeletes() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_deletes();
}
return total_num;
}
SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
bool include_history) const {
if (include_history && !memlist_history_.empty()) {
return memlist_history_.back()->GetEarliestSequenceNumber();
} else if (!memlist_.empty()) {
return memlist_.back()->GetEarliestSequenceNumber();
} else {
return kMaxSequenceNumber;
}
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
AddMemTable(m);
TrimHistory(to_delete);
}
// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.remove(m);
if (max_write_buffer_number_to_maintain_ > 0) {
memlist_history_.push_front(m);
TrimHistory(to_delete);
} else {
UnrefMemTable(to_delete, m);
}
}
// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
while (memlist_.size() + memlist_history_.size() >
static_cast<size_t>(max_write_buffer_number_to_maintain_) &&
!memlist_history_.empty()) {
MemTable* x = memlist_history_.back();
memlist_history_.pop_back();
UnrefMemTable(to_delete, x);
}
}
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.load(std::memory_order_relaxed));
return true;
}
return false;
}
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_in_progress_) {
assert(!m->flush_completed_);
num_flush_not_started_--;
if (num_flush_not_started_ == 0) {
imm_flush_needed.store(false, std::memory_order_release);
}
m->flush_in_progress_ = true; // flushing will start very soon
ret->push_back(m);
}
}
flush_requested_ = false; // start-flush request is complete
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
assert(!mems.empty());
// If the flush was not successful, then just reset state.
// Maybe a succeeding attempt to flush will be successful.
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
}
imm_flush_needed.store(true, std::memory_order_release);
}
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
// flush was successful
for (size_t i = 0; i < mems.size(); ++i) {
// All the edits are associated with the first memtable of this batch.
assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
mems[i]->flush_completed_ = true;
mems[i]->file_number_ = file_number;
}
// if some other thread is already committing, then return
Status s;
if (commit_in_progress_) {
TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
return s;
}
// Only a single thread can be executing this piece of code
commit_in_progress_ = true;
// Retry until all completed flushes are committed. New flushes can finish
// while the current thread is writing manifest where mutex is released.
while (s.ok()) {
auto& memlist = current_->memlist_;
if (memlist.empty() || !memlist.back()->flush_completed_) {
break;
}
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) {
break;
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
}
batch_count++;
}
if (batch_count > 0) {
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
}
}
commit_in_progress_ = false;
return s;
}
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
InstallNewVersion();
// this method is used to move mutable memtable into an immutable list.
// since mutable memtable is already refcounted by the DBImpl,
// and when moving to the imutable list we don't unref it,
// we don't have to ref the memtable here. we just take over the
// reference from the DBImpl.
current_->Add(m, to_delete);
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
imm_flush_needed.store(true, std::memory_order_release);
}
}
// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
size_t total_size = 0;
for (auto& memtable : current_->memlist_) {
total_size += memtable->ApproximateMemoryUsage();
}
return total_size;
}
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
void MemTableList::InstallNewVersion() {
if (current_->refs_ == 1) {
// we're the only one using the version, just keep using it
} else {
// somebody else holds the current version, we need to create new one
MemTableListVersion* version = current_;
current_ = new MemTableListVersion(&current_memory_usage_, current_);
current_->Ref();
version->Unref();
}
}
uint64_t MemTableList::GetMinLogContainingPrepSection() {
uint64_t min_log = 0;
for (auto& m : current_->memlist_) {
// this mem has been flushed it no longer
// needs to hold on the its prep section
if (m->flush_completed_) {
continue;
}
auto log = m->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
} // namespace rocksdb