rocksdb/db/write_batch_internal.h
Yanqin Jin 924616526a Update WriteBatch::AssignTimestamp() and Add (#9205)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9205

Update WriteBatch::AssignTimestamp() APIs so that they take an
additional argument, i.e. a function object called `checker` indicating the user-specified logic of performing
checks on timestamp sizes.

WriteBatch is a building block used by multiple other RocksDB components, each of which may track
timestamp information in different data structures. For example, transaction can either write to
`WriteBatchWithIndex` which is a `WriteBatch` with index, or write directly to raw `WriteBatch` if
`Transaction::DisableIndexing()` is called.
`WriteBatchWithIndex` keeps a mapping from column family id to comparator, and a transaction needs
to keep similar information for the `WriteBatch` if the user calls `Transaction::DisableIndexing()` (dynamically),
so that we know the size of each timestamp later. The bookkeeping info maintained by `WriteBatchWithIndex`
and `Transaction` should not overlap.
When we later call `WriteBatch::AssignTimestamp()`, we need to use these data structures to guarantee
that we do not accidentally assign timestamps for keys from column families that disable timestamp.

Reviewed By: ltamasi

Differential Revision: D31735186

fbshipit-source-id: 8b1709ed880ac72f995aa9e012e5873b290840a7
2021-11-30 22:33:00 -08:00

438 lines
15 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <array>
#include <vector>
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/trim_history_scheduler.h"
#include "db/write_thread.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"
#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
class MemTable;
class FlushScheduler;
class ColumnFamilyData;
// Abstract view over the memtables of a set of column families. A caller
// first positions the object on one column family with Seek() and then
// queries that column family through the remaining accessors.
class ColumnFamilyMemTables {
 public:
  virtual ~ColumnFamilyMemTables() = default;

  // Positions this object on the column family with id `column_family_id`.
  // NOTE(review): the bool presumably reports whether the column family was
  // found (ColumnFamilyMemTablesDefault returns true only for id 0) --
  // confirm against the other implementations.
  virtual bool Seek(uint32_t column_family_id) = 0;

  // Returns a log number for the current column family.
  // NOTE(review): the previous comment here described a boolean "should the
  // update be ignored" result, which does not match this signature. The
  // value appears to be what callers compare against `log_number` when
  // deciding whether to skip updates that were already processed before a
  // recovery (see the log_number note on WriteBatchInternal::InsertInto) --
  // confirm.
  virtual uint64_t GetLogNumber() const = 0;

  // Returns the memtable of the current column family.
  virtual MemTable* GetMemTable() const = 0;

  // Returns the handle of the current column family.
  virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;

  // Returns the ColumnFamilyData of the current column family; this base
  // implementation has none to offer and returns nullptr.
  virtual ColumnFamilyData* current() { return nullptr; }
};
// ColumnFamilyMemTables implementation backed by a single memtable that is
// exposed only under column family id 0.
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
 public:
  // `mem` is stored as-is; this class never deletes it.
  explicit ColumnFamilyMemTablesDefault(MemTable* mem)
      : ok_(false), mem_(mem) {}

  // Succeeds only for column family id 0 and remembers the outcome so that
  // GetMemTable() can assert on it.
  bool Seek(uint32_t column_family_id) override {
    const bool is_cf_zero = (0 == column_family_id);
    ok_ = is_cf_zero;
    return is_cf_zero;
  }

  // This implementation always reports log number 0.
  uint64_t GetLogNumber() const override { return 0; }

  // Returns the wrapped memtable; asserts that the last Seek() succeeded.
  MemTable* GetMemTable() const override {
    assert(ok_);
    return mem_;
  }

  // No column family handle is available from this implementation.
  ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }

 private:
  bool ok_;        // result of the most recent Seek()
  MemTable* mem_;  // the single memtable served by this object
};
// Per-key protection information attached to a WriteBatch.
struct WriteBatch::ProtectionInfo {
  // A `WriteBatch` rarely holds a huge number of keys, so a fixed,
  // non-configurable eight bytes of protection per key is expected to be
  // good enough.
  autovector<ProtectionInfoKVOC64> entries_;

  // Number of protection bytes kept for each key (always eight).
  size_t GetBytesPerKey() const {
    constexpr size_t kBytesPerKey = 8;
    return kBytesPerKey;
  }
};
// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don't want in the public WriteBatch interface.
// Every member is static; the class only groups these helpers together.
// NOTE(review): Contents() and ByteSize() below read batch->rep_ directly,
// so this class presumably has friend access to WriteBatch -- confirm in
// write_batch.h.
class WriteBatchInternal {
public:
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
// WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
static Status Put(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
static Status Put(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value);
static Status Delete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key);
static Status Delete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key);
static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key);
static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key,
const SliceParts& end_key);
static Status Merge(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
static Status Merge(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value);
static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
// Marker helpers operating on `batch`.
// NOTE(review): judging by the names, these add prepare / rollback / commit
// markers identified by the transaction id `xid`, and a no-op marker; the
// exact record layout is defined in the implementation file -- confirm there.
static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
const bool write_after_commit = true,
const bool unprepared_batch = false);
static Status MarkRollback(WriteBatch* batch, const Slice& xid);
static Status MarkCommit(WriteBatch* batch, const Slice& xid);
static Status InsertNoop(WriteBatch* batch);
// Return the number of entries in the batch.
static uint32_t Count(const WriteBatch* batch);
// Set the count for the number of entries in the batch.
static void SetCount(WriteBatch* batch, uint32_t n);
// Return the sequence number for the start of this batch.
static SequenceNumber Sequence(const WriteBatch* batch);
// Store the specified number as the sequence number for the start of
// this batch.
static void SetSequence(WriteBatch* batch, SequenceNumber seq);
// Returns the offset of the first entry in the batch.
// This offset is only valid if the batch is not empty.
static size_t GetFirstOffset(WriteBatch* batch);
// Returns a Slice constructed from the batch's underlying representation
// (rep_).
static Slice Contents(const WriteBatch* batch) {
return Slice(batch->rep_);
}
// Returns the size of the batch's underlying representation, i.e.
// rep_.size().
static size_t ByteSize(const WriteBatch* batch) {
return batch->rep_.size();
}
static Status SetContents(WriteBatch* batch, const Slice& contents);
static Status CheckSlicePartsLength(const SliceParts& key,
const SliceParts& value);
// Inserts the batches of `write_group` into memtable.
// NOTE(review): this comment used to say "Inserts batches[i] into memtable,
// for i in 0..num_batches-1 inclusive", but this overload has no `batches`
// or `num_batches` parameter; the wording looks like a leftover from an
// earlier signature.
//
// If ignore_missing_column_families == true, a WriteBatch
// referencing a non-existing column family will be ignored.
// If ignore_missing_column_families == false, processing of the
// batches will be stopped if a reference is found to a non-existing
// column family and InvalidArgument() will be returned. The writes
// in batches may be only partially applied at that point.
//
// If log_number is non-zero, the memtable will be updated only if
// memtables->GetLogNumber() >= log_number.
//
// If flush_scheduler is non-null, it will be invoked if the memtable
// should be flushed.
//
// Under concurrent use, the caller is responsible for making sure that
// the memtables object itself is thread-local.
static Status InsertInto(
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false, uint64_t log_number = 0,
DB* db = nullptr, bool concurrent_memtable_writes = false,
bool seq_per_batch = false, bool batch_per_txn = true);
// Convenience form of InsertInto when you have only one batch.
// next_seq returns the seq after last sequence number used in MemTable insert
static Status InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false, uint64_t log_number = 0,
DB* db = nullptr, bool concurrent_memtable_writes = false,
SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
bool seq_per_batch = false, bool batch_per_txn = true);
// Form of InsertInto that takes a single WriteThread::Writer instead of a
// whole write group.
static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false, size_t batch_cnt = 0,
bool batch_per_txn = true,
bool hint_per_batch = false);
// Appends `src` to `dst`.
// NOTE(review): the effect of `WAL_only` is not visible from this header --
// see the implementation.
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
// Returns the byte size of appending a WriteBatch with ByteSize
// leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
// Iterate over [begin, end) range of a write batch
static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
size_t begin, size_t end);
// This write batch includes the latest state that should be persisted. Such
// state is meant to be used only during recovery.
static void SetAsLatestPersistentState(WriteBatch* b);
static bool IsLatestPersistentState(const WriteBatch* b);
};
// LocalSavePoint works much like a scope guard: on construction it records
// the batch's data size, entry count and content flags, and commit() rolls
// the batch back to that snapshot when the batch has outgrown its
// `max_bytes_` limit. In debug builds the destructor asserts that commit()
// was called.
class LocalSavePoint {
 public:
  explicit LocalSavePoint(WriteBatch* batch)
      : batch_(batch),
        savepoint_(batch->GetDataSize(), batch->Count(),
                   batch->content_flags_.load(std::memory_order_relaxed)) {}

#ifndef NDEBUG
  ~LocalSavePoint() { assert(committed_); }
#endif

  // Returns OK when the batch has no size limit (max_bytes_ == 0) or is
  // still within it. Otherwise restores the batch to the recorded snapshot
  // and returns Status::MemoryLimit().
  Status commit() {
#ifndef NDEBUG
    committed_ = true;
#endif
    // Guard clause: unlimited batch, or limit not exceeded.
    if (!batch_->max_bytes_ || batch_->rep_.size() <= batch_->max_bytes_) {
      return Status::OK();
    }

    // Over the limit: undo everything added since construction.
    batch_->rep_.resize(savepoint_.size);
    WriteBatchInternal::SetCount(batch_, savepoint_.count);
    if (nullptr != batch_->prot_info_) {
      batch_->prot_info_->entries_.resize(savepoint_.count);
    }
    batch_->content_flags_.store(savepoint_.content_flags,
                                 std::memory_order_relaxed);
    return Status::MemoryLimit();
  }

 private:
  WriteBatch* batch_;    // batch being guarded
  SavePoint savepoint_;  // snapshot taken at construction
#ifndef NDEBUG
  bool committed_ = false;  // set by commit(); checked in the destructor
#endif
};
// CRTP base class for WriteBatch handlers that overwrite the timestamp bytes
// stored at the tail of each key while iterating over a batch.
//
// Derived must provide
//   Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx);
// and grant this base access to it if it is not public.
//
// Checker is a callable invoked as `Status checker(uint32_t cf, size_t& ts_sz)`.
// It is stored as a const member, so its call operator must be usable on a
// const object. It receives the size of the timestamp about to be assigned
// and may modify it; a resulting size of 0 means "do not touch this key".
// NOTE(review): presumably the checker sets the size to 0 for column families
// that do not enable timestamps -- confirm with the callers.
template <typename Derived, typename Checker>
class TimestampAssignerBase : public WriteBatch::Handler {
 public:
  // `prot_info` may be nullptr, in which case no protection information is
  // updated. `checker` is moved into this object.
  explicit TimestampAssignerBase(WriteBatch::ProtectionInfo* prot_info,
                                 Checker&& checker)
      : prot_info_(prot_info), checker_(std::move(checker)) {}

  ~TimestampAssignerBase() override {}

  // Every key-carrying record is routed to AssignTimestamp().
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    return AssignTimestamp(cf, key);
  }

  Status DeleteCF(uint32_t cf, const Slice& key) override {
    return AssignTimestamp(cf, key);
  }

  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    return AssignTimestamp(cf, key);
  }

  // Only the begin key is processed; the end key is left untouched.
  Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
                       const Slice&) override {
    return AssignTimestamp(cf, begin_key);
  }

  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    return AssignTimestamp(cf, key);
  }

  Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override {
    return AssignTimestamp(cf, key);
  }

  // These markers are accepted and ignored.
  Status MarkBeginPrepare(bool) override { return Status::OK(); }

  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }

  Status MarkCommit(const Slice&) override { return Status::OK(); }

  Status MarkRollback(const Slice&) override { return Status::OK(); }

 protected:
  // Dispatches to Derived::AssignTimestampImpl() with the index of the
  // current entry, then advances the index regardless of the outcome.
  Status AssignTimestamp(uint32_t cf, const Slice& key) {
    Status s = static_cast_with_check<Derived>(this)->AssignTimestampImpl(
        cf, key, idx_);
    ++idx_;
    return s;
  }

  // Runs the user-supplied checker for column family `cf`. `ts_sz` is
  // in/out: the checker may change it.
  Status CheckTimestampSize(uint32_t cf, size_t& ts_sz) {
    return checker_(cf, ts_sz);
  }

  // Writes `ts` over the last ts.size() bytes of `key` (and refreshes the
  // protection info) when `ts_sz` is non-zero; does nothing when it is zero.
  //
  // Returns InvalidArgument if `key` is too short to contain the timestamp.
  // Without this check `key.size() - ts.size()` would wrap around and the
  // memcpy in UpdateTimestamp() would write outside the key's buffer.
  Status UpdateTimestampIfNeeded(size_t ts_sz, const Slice& key,
                                 const Slice& ts) {
    if (ts_sz > 0) {
      assert(ts_sz == ts.size());
      if (key.size() < ts.size()) {
        return Status::InvalidArgument(
            "Key is too short to hold the timestamp");
      }
      UpdateProtectionInformationIfNeeded(key, ts);
      UpdateTimestamp(key, ts);
    }
    return Status::OK();
  }

  // If protection info is present, updates the entry at the current index to
  // reflect the key with its trailing ts.size() bytes replaced by `ts`.
  // Must be called before UpdateTimestamp(), while `key` still holds the old
  // bytes. Requires key.size() >= ts.size().
  void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) {
    if (prot_info_ != nullptr) {
      const size_t ts_sz = ts.size();
      assert(key.size() >= ts_sz);
      SliceParts old_key(&key, 1);
      Slice key_no_ts(key.data(), key.size() - ts_sz);
      std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
      SliceParts new_key(new_key_cmpts.data(), 2);
      prot_info_->entries_[idx_].UpdateK(old_key, new_key);
    }
  }

  // Overwrites the last ts.size() bytes of `key` in place. The const_cast is
  // deliberate: the bytes referenced by `key` are modified directly.
  // Requires key.size() >= ts.size().
  void UpdateTimestamp(const Slice& key, const Slice& ts) {
    const size_t ts_sz = ts.size();
    assert(key.size() >= ts_sz);
    char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
    assert(ptr);
    memcpy(ptr, ts.data(), ts_sz);
  }

  // No copy or move.
  TimestampAssignerBase(const TimestampAssignerBase&) = delete;
  TimestampAssignerBase(TimestampAssignerBase&&) = delete;
  TimestampAssignerBase& operator=(const TimestampAssignerBase&) = delete;
  TimestampAssignerBase& operator=(TimestampAssignerBase&&) = delete;

  // Optional protection info of the batch being processed; may be nullptr.
  WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
  // User-specified timestamp-size checker.
  const Checker checker_{};
  // Index of the entry currently being processed; advanced once per call to
  // AssignTimestamp().
  size_t idx_ = 0;
};
// Timestamp assigner that takes the timestamp for the i-th key of the batch
// from the i-th element of a caller-provided list.
//
// The list is held by reference, so it must outlive this object.
template <typename Checker>
class SimpleListTimestampAssigner
    : public TimestampAssignerBase<SimpleListTimestampAssigner<Checker>,
                                   Checker> {
  using Base =
      TimestampAssignerBase<SimpleListTimestampAssigner<Checker>, Checker>;

 public:
  explicit SimpleListTimestampAssigner(WriteBatch::ProtectionInfo* prot_info,
                                       Checker checker,
                                       const std::vector<Slice>& timestamps)
      : Base(prot_info, std::move(checker)), timestamps_(timestamps) {}

  ~SimpleListTimestampAssigner() override {}

 private:
  friend class TimestampAssignerBase<SimpleListTimestampAssigner<Checker>,
                                     Checker>;

  // Assigns timestamps_[idx] to `key`. Fails with InvalidArgument when the
  // list has no element at `idx`; otherwise propagates any checker failure,
  // and on success lets the base class rewrite the key.
  Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx) {
    const size_t available = timestamps_.size();
    if (!(idx < available)) {
      return Status::InvalidArgument("Need more timestamps for the assignment");
    }

    const Slice& current_ts = timestamps_[idx];
    // In/out: the checker may adjust this size.
    size_t effective_sz = current_ts.size();
    const Status check = this->CheckTimestampSize(cf, effective_sz);
    if (check.ok()) {
      return this->UpdateTimestampIfNeeded(effective_sz, key, current_ts);
    }
    return check;
  }

  // One timestamp per key, in batch order. Not owned.
  const std::vector<Slice>& timestamps_;
};
// Timestamp assigner that applies one and the same timestamp to every key of
// the batch.
template <typename Checker>
class TimestampAssigner
    : public TimestampAssignerBase<TimestampAssigner<Checker>, Checker> {
  using Base = TimestampAssignerBase<TimestampAssigner<Checker>, Checker>;

 public:
  // `ts` must not be empty (asserted in debug builds and rejected again at
  // assignment time). The Slice is copied, not the bytes it refers to.
  explicit TimestampAssigner(WriteBatch::ProtectionInfo* prot_info,
                             Checker checker, const Slice& ts)
      : Base(prot_info, std::move(checker)), timestamp_(ts) {
    assert(!timestamp_.empty());
  }

  ~TimestampAssigner() override {}

 private:
  friend class TimestampAssignerBase<TimestampAssigner<Checker>, Checker>;

  // Assigns the stored timestamp to `key`; the entry index is not needed.
  // Fails with InvalidArgument for an empty timestamp, otherwise propagates
  // any checker failure, and on success lets the base class rewrite the key.
  Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) {
    if (timestamp_.empty()) {
      return Status::InvalidArgument("Timestamp is empty");
    }

    // In/out: the checker may adjust this size.
    size_t effective_sz = timestamp_.size();
    const Status check = this->CheckTimestampSize(cf, effective_sz);
    if (check.ok()) {
      return this->UpdateTimestampIfNeeded(effective_sz, key, timestamp_);
    }
    return check;
  }

  // The timestamp applied to every key.
  const Slice timestamp_;
};
// Assigns the single timestamp `ts` to every key in this batch by iterating
// over the batch with a TimestampAssigner.
//
// `checker` is invoked as `Status checker(uint32_t cf, size_t& ts_sz)` for
// each key. It is taken by value and moved into the assigner, so it is not
// copied a second time and move-only callables are accepted.
//
// Returns the status produced by Iterate().
template <typename Checker>
Status WriteBatch::AssignTimestamp(const Slice& ts, Checker checker) {
  TimestampAssigner<Checker> ts_assigner(prot_info_.get(), std::move(checker),
                                         ts);
  return Iterate(&ts_assigner);
}
// Assigns ts_list[i] to the i-th key in this batch by iterating over the
// batch with a SimpleListTimestampAssigner. The assigner reports
// InvalidArgument when the list has fewer elements than there are keys.
//
// `checker` is invoked as `Status checker(uint32_t cf, size_t& ts_sz)` for
// each key. It is taken by value and moved into the assigner, so it is not
// copied a second time and move-only callables are accepted.
//
// Returns the status produced by Iterate().
template <typename Checker>
Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list,
                                    Checker checker) {
  SimpleListTimestampAssigner<Checker> ts_assigner(
      prot_info_.get(), std::move(checker), ts_list);
  return Iterate(&ts_assigner);
}
} // namespace ROCKSDB_NAMESPACE