rocksdb/db/write_batch.cc
Reid Horuff 6e56a114be Modification of WriteBatch to support two phase commit
Summary: Adds three new WriteBatch data types: Prepare(xid), Commit(xid), Rollback(xid). Prepare(xid) should precede the (single) operation to which it applies. There can obviously be multiple Prepare(xid) markers. There should only be one Rollback(xid) or Commit(xid) marker, but not both. None of this logic is currently enforced; it will most likely be implemented further up, such as in the memtableinserter. All three markers are similar to PutLogData in that they are writebatch meta-data, i.e. stored but not counted. All three markers differ from PutLogData in that they will actually be written to disk. As for WriteBatchWithIndex, Prepare, Commit, Rollback are all implemented just as PutLogData and none are tested just as PutLogData.

Test Plan: single unit test in write_batch_test.

Reviewers: hermanlee4, sdong, anthony

Subscribers: andrewkr, vasilep, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54093
2016-04-29 11:50:30 -07:00

1003 lines
34 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring
// kTypeDeletion varstring
// kTypeSingleDeletion varstring
// kTypeMerge varstring varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring
// kTypeColumnFamilySingleDeletion varint32 varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeBeginPrepareXID
// kTypeEndPrepareXID varstring
// kTypeCommitXID varstring
// kTypeRollbackXID varstring
// kTypeNoop
// varstring :=
// len: varint32
// data: uint8[len]
#include "rocksdb/write_batch.h"
#include <stack>
#include <stdexcept>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/snapshot_impl.h"
#include "db/write_batch_internal.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/perf_context_imp.h"
#include "util/statistics.h"
namespace rocksdb {
// anon namespace for file-local types
namespace {
// Bit mask describing which kinds of records a WriteBatch holds.
// DEFERRED means the mask has not been computed yet; it is resolved lazily
// by WriteBatch::ComputeContentFlags().
enum ContentFlags : uint32_t {
  DEFERRED = 0x001,
  HAS_PUT = 0x002,
  HAS_DELETE = 0x004,
  HAS_SINGLE_DELETE = 0x008,
  HAS_MERGE = 0x010,
  HAS_BEGIN_PREPARE = 0x020,
  HAS_END_PREPARE = 0x040,
  HAS_COMMIT = 0x080,
  HAS_ROLLBACK = 0x100,
};
struct BatchContentClassifier : public WriteBatch::Handler {
uint32_t content_flags = 0;
Status PutCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_PUT;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_SINGLE_DELETE;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
}
Status MarkBeginPrepare() override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
content_flags |= ContentFlags::HAS_END_PREPARE;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
content_flags |= ContentFlags::HAS_COMMIT;
return Status::OK();
}
Status MarkRollback(const Slice&) override {
content_flags |= ContentFlags::HAS_ROLLBACK;
return Status::OK();
}
};
} // anon namespace
// State of a WriteBatch captured by SetSavePoint() so that
// RollbackToSavePoint() can restore it. Field order matters: the struct is
// brace-initialized positionally.
struct SavePoint {
  // Size of rep_ in bytes when the save point was taken.
  size_t size;
  // Record count stored in rep_ when the save point was taken.
  int count;
  // Value of content_flags_ when the save point was taken.
  uint32_t content_flags;
};
// Holder for the save points of a batch; the most recent one is on top.
struct SavePoints {
  std::stack<SavePoint> stack;
};
// Builds an empty batch: a zero-filled header followed by a single kTypeNoop
// byte. At least WriteBatchInternal::kHeader bytes are reserved, or
// `reserved_bytes` when that is larger.
WriteBatch::WriteBatch(size_t reserved_bytes)
    : save_points_(nullptr), content_flags_(0), rep_() {
  size_t capacity = WriteBatchInternal::kHeader;
  if (reserved_bytes > capacity) {
    capacity = reserved_bytes;
  }
  rep_.reserve(capacity);
  rep_.resize(WriteBatchInternal::kHeader);
  const char noop_tag = static_cast<char>(kTypeNoop);
  rep_.push_back(noop_tag);
}
// Wraps an already serialized batch. The contents are not inspected here, so
// the content flags are marked DEFERRED and computed on first use.
WriteBatch::WriteBatch(const std::string& rep)
    : save_points_(nullptr), content_flags_(ContentFlags::DEFERRED), rep_(rep) {
}
// Copy constructor. The save point stack is deep-copied: save_points_ is an
// owning raw pointer released in ~WriteBatch(), so sharing the pointer
// between the two batches would lead to a double delete.
WriteBatch::WriteBatch(const WriteBatch& src)
    : save_points_(src.save_points_ != nullptr
                       ? new SavePoints(*src.save_points_)
                       : nullptr),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      rep_(src.rep_) {}
// Move constructor. Ownership of the save point stack is transferred to the
// new batch and the source pointer is reset; std::move on a raw pointer only
// copies it, which would otherwise leave both batches deleting the same
// SavePoints object.
WriteBatch::WriteBatch(WriteBatch&& src)
    : save_points_(src.save_points_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      rep_(std::move(src.rep_)) {
  src.save_points_ = nullptr;
}
// Copy assignment: destroys the current object and copy-constructs a new one
// in the same storage. Self-assignment is a no-op.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (this == &src) {
    return *this;
  }
  this->~WriteBatch();
  new (this) WriteBatch(src);
  return *this;
}
// Move assignment: destroys the current object and move-constructs a new one
// in the same storage. Self-assignment is a no-op.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (this == &src) {
    return *this;
  }
  this->~WriteBatch();
  new (this) WriteBatch(std::move(src));
  return *this;
}
// Releases the save point stack, if one was ever allocated.
WriteBatch::~WriteBatch() {
  delete save_points_;
}
// Out-of-line destructor; there is nothing to release.
WriteBatch::Handler::~Handler() {}
// Default blob callback: blobs are ignored unless the user overrides this to
// do something with them.
void WriteBatch::Handler::LogData(const Slice& /*blob*/) {}
// Default answer to "should iteration go on?": always yes.
// WriteBatch::Iterate() consults this before reading each record.
bool WriteBatch::Handler::Continue() {
  return true;
}
// Resets the batch to the state produced by the default constructor: a
// zero-filled header plus a kTypeNoop byte, no content flags, and no save
// points.
void WriteBatch::Clear() {
  rep_.assign(WriteBatchInternal::kHeader, '\0');
  rep_.push_back(static_cast<char>(kTypeNoop));

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ == nullptr) {
    return;
  }
  auto& pending = save_points_->stack;
  while (!pending.empty()) {
    pending.pop();
  }
}
// Returns the record count stored in the batch header.
int WriteBatch::Count() const {
  const int stored_count = WriteBatchInternal::Count(this);
  return stored_count;
}
// Returns the content flags of the batch, computing and caching them first
// if they are still marked DEFERRED.
uint32_t WriteBatch::ComputeContentFlags() const {
  const auto cached = content_flags_.load(std::memory_order_relaxed);
  if ((cached & ContentFlags::DEFERRED) == 0) {
    return cached;
  }

  // Walk the whole batch once to find out what it contains.
  BatchContentClassifier classifier;
  Iterate(&classifier);

  // This method is conceptually const: the lazy computation does not alter
  // the abstract state of the batch. content_flags_ is mutable precisely so
  // that the result can be cached here.
  content_flags_.store(classifier.content_flags, std::memory_order_relaxed);
  return classifier.content_flags;
}
// True when the batch contains at least one Put record.
bool WriteBatch::HasPut() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_PUT) != 0;
}
// True when the batch contains at least one Delete record.
bool WriteBatch::HasDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_DELETE) != 0;
}
// True when the batch contains at least one SingleDelete record.
bool WriteBatch::HasSingleDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_SINGLE_DELETE) != 0;
}
// True when the batch contains at least one Merge record.
bool WriteBatch::HasMerge() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_MERGE) != 0;
}
// Extracts the key of the batch entry at the front of `input`.
//
// input:     positioned at the entry's tag byte; advanced past everything
//            that is consumed.
// key:       receives the length-prefixed key.
// cf_record: true when the entry carries a varint32 column family id
//            between the tag and the key.
//
// Returns false when the entry is empty or cannot be decoded.
bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // An empty entry has no tag byte to skip; treat it as malformed instead of
  // removing a prefix that is not there.
  if (input->empty()) {
    return false;
  }
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}
// True when the batch contains a begin-prepare marker.
bool WriteBatch::HasBeginPrepare() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}
// True when the batch contains an end-prepare marker.
bool WriteBatch::HasEndPrepare() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_END_PREPARE) != 0;
}
// True when the batch contains a commit marker.
bool WriteBatch::HasCommit() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_COMMIT) != 0;
}
// True when the batch contains a rollback marker.
bool WriteBatch::HasRollback() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_ROLLBACK) != 0;
}
// Decodes the record at the front of `input` and advances `input` past it.
//
// input:         remaining serialized records; must start at a tag byte.
// tag:           receives the record's tag byte.
// column_family: receives the column family id, or 0 for records that do not
//                carry one.
// key, value:    receive the key (and value, for Put/Merge records).
// blob:          receives the payload of a kTypeLogData record.
// xid:           receives the payload of an end-prepare, commit or rollback
//                marker.
//
// Returns Status::Corruption when `input` is empty, when a field cannot be
// decoded, or when the tag is not recognized; Status::OK() otherwise.
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  // Reading the tag from an empty slice would index past its end.
  if (input->empty()) {
    return Status::Corruption("malformed WriteBatch (missing record tag)");
  }
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
    // intentional fallthrough
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
    // intentional fallthrough
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
    // intentional fallthrough
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // Neither record carries a payload.
      break;
    case kTypeEndPrepareXID:
      assert(xid != nullptr);
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      assert(xid != nullptr);
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      assert(xid != nullptr);
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
// Decodes the batch record by record and forwards each one to `handler`.
//
// Iteration stops when the data is exhausted, when a Put/Delete/SingleDelete/
// Merge callback returns a non-OK status, or when handler->Continue() returns
// false. Only Put, Delete, SingleDelete and Merge records are counted; the
// total is compared with the count stored in the header.
//
// Returns Status::Corruption for a batch shorter than the header, for an
// undecodable record, or for a count mismatch; otherwise the first non-OK
// status from a counted callback, or Status::OK().
//
// NOTE(review): the statuses returned by MarkBeginPrepare/MarkEndPrepare/
// MarkCommit/MarkRollback are discarded, and stopping early via Continue()
// ends up reported as "WriteBatch has wrong count" -- confirm both are
// intended.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice remaining(rep_);
  if (remaining.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  remaining.remove_prefix(WriteBatchInternal::kHeader);

  Slice key, value, blob, xid;
  int counted = 0;
  Status s;
  while (s.ok() && !remaining.empty() && handler->Continue()) {
    char tag = 0;
    uint32_t column_family = 0;  // default
    s = ReadRecordFromWriteBatch(&remaining, &tag, &column_family, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      return s;
    }

    // Each assert checks that the cached content flags either are still
    // deferred or already mention the record kind being dispatched.
    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        ++counted;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        ++counted;
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        ++counted;
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        ++counted;
        break;
      case kTypeLogData:
        handler->LogData(blob);
        break;
      case kTypeBeginPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        break;
      case kTypeEndPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        handler->MarkEndPrepare(xid);
        break;
      case kTypeCommitXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        handler->MarkCommit(xid);
        break;
      case kTypeRollbackXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        handler->MarkRollback(xid);
        break;
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

  if (!s.ok()) {
    return s;
  }
  if (counted != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}
// Reads the fixed32 record count stored at byte offset 8 of the header
// (right after the fixed64 sequence number).
int WriteBatchInternal::Count(const WriteBatch* b) {
  const char* count_field = b->rep_.data() + 8;
  return DecodeFixed32(count_field);
}
// Overwrites the fixed32 record count stored at byte offset 8 of the header.
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  char* count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}
// Reads the fixed64 sequence number stored at the start of the header.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* sequence_field = b->rep_.data();
  return SequenceNumber(DecodeFixed64(sequence_field));
}
// Overwrites the fixed64 sequence number stored at the start of the header.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* sequence_field = &b->rep_[0];
  EncodeFixed64(sequence_field, seq);
}
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
return WriteBatchInternal::kHeader;
}
// Appends a Put record for `key`/`value` to `b`, bumps the record count and
// sets HAS_PUT. Column family 0 uses the short record form without an id.
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const Slice& key, const Slice& value) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeValue : kTypeColumnFamilyValue));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSlice(&rep, key);
  PutLengthPrefixedSlice(&rep, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);
}
// Public Put: resolves the handle to a column family id and delegates to
// WriteBatchInternal::Put.
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Put(this, cf_id, key, value);
}
// SliceParts variant of Put: appends a Put record whose key and value are
// each assembled from several parts, bumps the record count and sets HAS_PUT.
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const SliceParts& key, const SliceParts& value) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeValue : kTypeColumnFamilyValue));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSliceParts(&rep, key);
  PutLengthPrefixedSliceParts(&rep, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);
}
// Public SliceParts Put: resolves the handle to a column family id and
// delegates to WriteBatchInternal::Put.
void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Put(this, cf_id, key, value);
}
// Turns the batch into a prepare section for transaction `xid`: the byte at
// rep_[12] is rewritten from kTypeNoop to a begin-prepare marker, and an
// end-prepare marker carrying `xid` is appended. All save points are dropped.
void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
  auto& rep = b->rep_;

  // a manually constructed batch can only contain one prepare section
  assert(rep[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    auto& pending = b->save_points_->stack;
    while (!pending.empty()) {
      pending.pop();
    }
  }

  // rewrite noop as begin marker
  rep[12] = static_cast<char>(kTypeBeginPrepareXID);
  rep.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&rep, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
}
// Appends a commit marker carrying `xid` and sets HAS_COMMIT. The record
// count is not changed.
void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  auto& rep = b->rep_;
  rep.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&rep, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
}
// Appends a rollback marker carrying `xid` and sets HAS_ROLLBACK. The record
// count is not changed.
void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  auto& rep = b->rep_;
  rep.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&rep, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
}
// Appends a Delete record for `key` to `b`, bumps the record count and sets
// HAS_DELETE. Column family 0 uses the short record form without an id.
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                const Slice& key) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeDeletion : kTypeColumnFamilyDeletion));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSlice(&rep, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
}
// Public Delete: resolves the handle to a column family id and delegates to
// WriteBatchInternal::Delete.
void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Delete(this, cf_id, key);
}
// SliceParts variant of Delete: appends a Delete record whose key is
// assembled from several parts, bumps the record count and sets HAS_DELETE.
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                const SliceParts& key) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeDeletion : kTypeColumnFamilyDeletion));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSliceParts(&rep, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
}
// Public SliceParts Delete: resolves the handle to a column family id and
// delegates to WriteBatchInternal::Delete.
void WriteBatch::Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Delete(this, cf_id, key);
}
// Appends a SingleDelete record for `key` to `b`, bumps the record count and
// sets HAS_SINGLE_DELETE. Column family 0 uses the short record form.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const Slice& key) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(static_cast<char>(
      default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSlice(&rep, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
}
// Public SingleDelete: resolves the handle to a column family id and
// delegates to WriteBatchInternal::SingleDelete.
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::SingleDelete(this, cf_id, key);
}
// SliceParts variant of SingleDelete: appends a SingleDelete record whose key
// is assembled from several parts, bumps the record count and sets
// HAS_SINGLE_DELETE.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const SliceParts& key) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(static_cast<char>(
      default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSliceParts(&rep, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
}
// Public SliceParts SingleDelete: resolves the handle to a column family id
// and delegates to WriteBatchInternal::SingleDelete.
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::SingleDelete(this, cf_id, key);
}
// Appends a Merge record for `key`/`value` to `b`, bumps the record count and
// sets HAS_MERGE. Column family 0 uses the short record form without an id.
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeMerge : kTypeColumnFamilyMerge));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSlice(&rep, key);
  PutLengthPrefixedSlice(&rep, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
}
// Public Merge: resolves the handle to a column family id and delegates to
// WriteBatchInternal::Merge.
void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Merge(this, cf_id, key, value);
}
// SliceParts variant of Merge: appends a Merge record whose key and value are
// each assembled from several parts, bumps the record count and sets
// HAS_MERGE.
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key,
                               const SliceParts& value) {
  const int new_count = WriteBatchInternal::Count(b) + 1;
  WriteBatchInternal::SetCount(b, new_count);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(
      static_cast<char>(default_cf ? kTypeMerge : kTypeColumnFamilyMerge));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSliceParts(&rep, key);
  PutLengthPrefixedSliceParts(&rep, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
}
// Public SliceParts Merge: resolves the handle to a column family id and
// delegates to WriteBatchInternal::Merge.
void WriteBatch::Merge(ColumnFamilyHandle* column_family,
                       const SliceParts& key,
                       const SliceParts& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Merge(this, cf_id, key, value);
}
// Appends a kTypeLogData record holding `blob`. Neither the record count nor
// the content flags are touched.
void WriteBatch::PutLogData(const Slice& blob) {
  const char tag = static_cast<char>(kTypeLogData);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, blob);
}
void WriteBatch::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_ = new SavePoints();
}
// Record length and count of current batch of writes.
save_points_->stack.push(SavePoint{
GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)});
}
// Pops the most recent save point and restores the batch to it.
// Returns Status::NotFound() when there is no save point, Status::OK()
// otherwise.
Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  const SavePoint target = save_points_->stack.top();
  save_points_->stack.pop();

  assert(target.size <= rep_.size());
  assert(target.count <= Count());

  if (target.size == rep_.size()) {
    // No changes to rollback
    return Status::OK();
  }
  if (target.size == 0) {
    // Rollback everything
    Clear();
    return Status::OK();
  }

  // Drop the bytes written after the save point and restore its metadata.
  rep_.resize(target.size);
  WriteBatchInternal::SetCount(this, target.count);
  content_flags_.store(target.content_flags, std::memory_order_relaxed);
  return Status::OK();
}
namespace {
// Handler that applies the records of a WriteBatch to memtables.
// Each Put/Delete/SingleDelete/Merge callback looks up the memtable of the
// record's column family through cf_mems_, inserts the record at sequence_,
// and then advances sequence_ by one. The Mark* callbacks of the Handler
// interface are not overridden here.
class MemTableInserter : public WriteBatch::Handler {
public:
// Sequence number given to the next record that is applied.
SequenceNumber sequence_;
// Used to locate the memtable of a column family; asserted non-null.
ColumnFamilyMemTables* const cf_mems_;
// When non-null, notified about memtables that should be flushed.
FlushScheduler* const flush_scheduler_;
// When true, records for unknown column families are skipped with an OK
// status instead of failing with InvalidArgument.
const bool ignore_missing_column_families_;
// Non-zero only during recovery (see SeekToColumnFamily).
const uint64_t log_number_;
// The DB, cast to its implementation type; used for reads in the in-place
// update, delete filtering and successive-merge paths.
DBImpl* db_;
// When true, the filter_deletes memtable option is ignored.
const bool dont_filter_deletes_;
// Forwarded to MemTable::Add; several code paths assert that it is false.
const bool concurrent_memtable_writes_;
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number,
DB* db, const bool dont_filter_deletes,
bool concurrent_memtable_writes)
: sequence_(sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
log_number_(log_number),
db_(reinterpret_cast<DBImpl*>(db)),
dont_filter_deletes_(dont_filter_deletes),
concurrent_memtable_writes_(concurrent_memtable_writes) {
assert(cf_mems_);
// A DB is only required when deletes may be filtered.
if (!dont_filter_deletes_) {
assert(db_);
}
}
// Positions cf_mems_ on `column_family_id`.
// Returns true when the record should be applied. Returns false when it
// should be skipped, in which case *s tells the caller what to report:
// OK for an ignored missing column family or for a record the column family
// already contains (recovery), InvalidArgument for a missing column family
// that is not ignored.
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// If we are in a concurrent mode, it is the caller's responsibility
// to clone the original ColumnFamilyMemTables so that each thread
// has its own instance. Otherwise, it must be guaranteed that there
// is no concurrent access
bool found = cf_mems_->Seek(column_family_id);
if (!found) {
if (ignore_missing_column_families_) {
*s = Status::OK();
} else {
*s = Status::InvalidArgument(
"Invalid column family specified in write batch");
}
return false;
}
if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
// This is true only in recovery environment (log_number_ is always 0 in
// non-recovery, regular write code-path)
// * If log_number_ < cf_mems_->GetLogNumber(), this means that column
// family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the
// update
*s = Status::OK();
return false;
}
return true;
}
// Applies a Put. Depending on the memtable options the value is added as a
// new entry, updated in place, or combined with the previous value through
// the user's inplace_callback. sequence_ is advanced in every case,
// including when the record is skipped.
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_);
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
} else {
assert(!concurrent_memtable_writes_);
if (mem->UpdateCallback(sequence_, key, value)) {
} else {
// key not found in memtable. Do sst get, update, add
// NOTE(review): db_ is dereferenced below without a null check, while the
// constructor only asserts db_ when deletes may be filtered -- confirm
// callers always pass a DB when inplace_callback is set.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string prev_value;
std::string merged_value;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
Status s = db_->Get(ropts, cf_handle, key, &prev_value);
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
// The callback receives null previous-value pointers when the read failed.
auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
s.ok() ? &prev_size : nullptr,
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
}
// Since all Puts are logged in transaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
CheckMemtableFull();
return Status::OK();
}
// Shared implementation of DeleteCF and SingleDeleteCF: adds a tombstone of
// type `delete_type` for `key`, unless delete filtering decides that the key
// cannot exist.
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
ValueType delete_type) {
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!dont_filter_deletes_ && moptions->filter_deletes) {
assert(!concurrent_memtable_writes_);
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string value;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
// NOTE(review): this early return does not advance sequence_, unlike the
// skipped-column-family path above -- confirm this is intended.
RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
return Status::OK();
}
}
mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_);
sequence_++;
CheckMemtableFull();
return Status::OK();
}
// Applies a Delete record.
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
return DeleteImpl(column_family_id, key, kTypeDeletion);
}
// Applies a SingleDelete record.
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
}
// Applies a Merge record. Normally the operand is stored as-is; once the
// memtable already holds max_successive_merges operands for the key (and a
// DB is available), the operand is merged with the current value and the
// result is stored as a plain value instead.
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
assert(!concurrent_memtable_writes_);
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false;
if (moptions->max_successive_merges > 0 && db_ != nullptr) {
LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head
// of the key in the memtable
size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
if (num_merges >= moptions->max_successive_merges) {
perform_merge = true;
}
}
if (perform_merge) {
// 1) Get the existing value
std::string get_value;
// Pass in the sequence number so that we also include previous merge
// operations in the same batch.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
// NOTE(review): the Status returned by Get is discarded, so a failed read
// is merged as whatever get_value holds afterwards -- confirm intended.
db_->Get(read_options, cf_handle, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = moptions->merge_operator;
assert(merge_operator);
std::deque<std::string> operands;
operands.push_front(value.ToString());
std::string new_value;
bool merge_success = false;
{
StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator->FullMerge(
key, &get_value_slice, operands, &new_value, moptions->info_log);
RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
if (!merge_success) {
// Failed to merge!
RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
// Store the delta in memtable
perform_merge = false;
} else {
// 3) Add value to memtable
mem->Add(sequence_, kTypeValue, key, new_value);
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
CheckMemtableFull();
return Status::OK();
}
// Schedules a flush of the current column family when its memtable asks for
// one. Does nothing when no flush scheduler was supplied.
void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
assert(cfd != nullptr);
if (cfd->mem()->ShouldScheduleFlush() &&
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
flush_scheduler_->ScheduleFlush(cfd);
}
}
}
};
} // namespace
// Applies the batches of all `writers` to the memtables with one shared
// inserter that starts at `sequence`. Writers whose callback failed are
// skipped. Each writer's status is set to the result of its batch; the first
// failure stops processing and is returned.
//
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(
    const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    const bool dont_filter_deletes, bool concurrent_memtable_writes) {
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            dont_filter_deletes, concurrent_memtable_writes);

  for (size_t idx = 0; idx < writers.size(); ++idx) {
    auto* writer = writers[idx];
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->status = writer->batch->Iterate(&inserter);
    if (!writer->status.ok()) {
      return writer->status;
    }
  }
  return Status::OK();
}
// Applies a single batch to the memtables, starting at the sequence number
// stored in the batch header. Returns the result of iterating the batch.
Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
                                      ColumnFamilyMemTables* memtables,
                                      FlushScheduler* flush_scheduler,
                                      bool ignore_missing_column_families,
                                      uint64_t log_number, DB* db,
                                      const bool dont_filter_deletes,
                                      bool concurrent_memtable_writes) {
  const SequenceNumber first_sequence = WriteBatchInternal::Sequence(batch);
  MemTableInserter inserter(first_sequence, memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            dont_filter_deletes, concurrent_memtable_writes);
  return batch->Iterate(&inserter);
}
// Replaces the serialized form of `b` with a copy of `contents`, which must
// be at least a header long. The content flags become DEFERRED because the
// new data has not been inspected.
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  const size_t length = contents.size();
  assert(length >= WriteBatchInternal::kHeader);
  b->rep_.assign(contents.data(), length);
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
}
// Appends the records of `src` (everything after its header) to `dst`. The
// record counts are added together and the content flags are OR-ed.
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  const int combined_count = Count(dst) + Count(src);
  SetCount(dst, combined_count);

  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  const size_t header_size = WriteBatchInternal::kHeader;
  const char* payload = src->rep_.data() + header_size;
  const size_t payload_size = src->rep_.size() - header_size;
  dst->rep_.append(payload, payload_size);

  const auto dst_flags = dst->content_flags_.load(std::memory_order_relaxed);
  const auto src_flags = src->content_flags_.load(std::memory_order_relaxed);
  dst->content_flags_.store(dst_flags | src_flags, std::memory_order_relaxed);
}
// Size in bytes of the batch obtained by appending a batch of
// `rightByteSize` bytes to one of `leftByteSize` bytes. When both sizes are
// non-zero one header is subtracted, since Append() copies only the data
// after the source's header. If either size is zero the plain sum is
// returned.
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  const size_t combined = leftByteSize + rightByteSize;
  const bool either_empty = (leftByteSize == 0 || rightByteSize == 0);
  if (either_empty) {
    return combined;
  }
  return combined - WriteBatchInternal::kHeader;
}
} // namespace rocksdb