5455cacd18
Summary: As title, Closes https://github.com/facebook/rocksdb/issues/9272 Since TimestampAssigner-related classes needs to access `WriteBatch::ProtectionInfo` objects which is for internal use only, it's difficult to make `AssignTimestamp` methods a template and put them in the same public header, `include/rocksdb/write_batch.h`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9278 Test Plan: ``` make check # Also manually test following the repro-steps in issue 9272 ``` Reviewed By: ltamasi Differential Revision: D33012686 Pulled By: riversand963 fbshipit-source-id: 89f24a86a1170125bd0b94ef3b32e69aa08bd949
430 lines
15 KiB
C++
430 lines
15 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
#include <array>
|
|
#include <vector>
|
|
|
|
#include "db/flush_scheduler.h"
|
|
#include "db/kv_checksum.h"
|
|
#include "db/trim_history_scheduler.h"
|
|
#include "db/write_thread.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/types.h"
|
|
#include "rocksdb/write_batch.h"
|
|
#include "util/autovector.h"
|
|
#include "util/cast_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class MemTable;
|
|
class FlushScheduler;
|
|
class ColumnFamilyData;
|
|
|
|
class ColumnFamilyMemTables {
|
|
public:
|
|
virtual ~ColumnFamilyMemTables() {}
|
|
virtual bool Seek(uint32_t column_family_id) = 0;
|
|
// returns true if the update to memtable should be ignored
|
|
// (useful when recovering from log whose updates have already
|
|
// been processed)
|
|
virtual uint64_t GetLogNumber() const = 0;
|
|
virtual MemTable* GetMemTable() const = 0;
|
|
virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
|
|
virtual ColumnFamilyData* current() { return nullptr; }
|
|
};
|
|
|
|
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
|
|
public:
|
|
explicit ColumnFamilyMemTablesDefault(MemTable* mem)
|
|
: ok_(false), mem_(mem) {}
|
|
|
|
bool Seek(uint32_t column_family_id) override {
|
|
ok_ = (column_family_id == 0);
|
|
return ok_;
|
|
}
|
|
|
|
uint64_t GetLogNumber() const override { return 0; }
|
|
|
|
MemTable* GetMemTable() const override {
|
|
assert(ok_);
|
|
return mem_;
|
|
}
|
|
|
|
ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
|
|
|
|
private:
|
|
bool ok_;
|
|
MemTable* mem_;
|
|
};
|
|
|
|
struct WriteBatch::ProtectionInfo {
|
|
// `WriteBatch` usually doesn't contain a huge number of keys so protecting
|
|
// with a fixed, non-configurable eight bytes per key may work well enough.
|
|
autovector<ProtectionInfoKVOC64> entries_;
|
|
|
|
size_t GetBytesPerKey() const { return 8; }
|
|
};
|
|
|
|
// WriteBatchInternal provides static methods for manipulating a
|
|
// WriteBatch that we don't want in the public WriteBatch interface.
|
|
class WriteBatchInternal {
|
|
public:
|
|
|
|
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
|
|
static const size_t kHeader = 12;
|
|
|
|
// WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
|
|
static Status Put(WriteBatch* batch, uint32_t column_family_id,
|
|
const Slice& key, const Slice& value);
|
|
|
|
static Status Put(WriteBatch* batch, uint32_t column_family_id,
|
|
const SliceParts& key, const SliceParts& value);
|
|
|
|
static Status Delete(WriteBatch* batch, uint32_t column_family_id,
|
|
const SliceParts& key);
|
|
|
|
static Status Delete(WriteBatch* batch, uint32_t column_family_id,
|
|
const Slice& key);
|
|
|
|
static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
|
|
const SliceParts& key);
|
|
|
|
static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
|
|
const Slice& key);
|
|
|
|
static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
|
|
const Slice& begin_key, const Slice& end_key);
|
|
|
|
static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
|
|
const SliceParts& begin_key,
|
|
const SliceParts& end_key);
|
|
|
|
static Status Merge(WriteBatch* batch, uint32_t column_family_id,
|
|
const Slice& key, const Slice& value);
|
|
|
|
static Status Merge(WriteBatch* batch, uint32_t column_family_id,
|
|
const SliceParts& key, const SliceParts& value);
|
|
|
|
static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
|
|
const Slice& key, const Slice& value);
|
|
|
|
static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
|
|
const bool write_after_commit = true,
|
|
const bool unprepared_batch = false);
|
|
|
|
static Status MarkRollback(WriteBatch* batch, const Slice& xid);
|
|
|
|
static Status MarkCommit(WriteBatch* batch, const Slice& xid);
|
|
|
|
static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid,
|
|
const Slice& commit_ts);
|
|
|
|
static Status InsertNoop(WriteBatch* batch);
|
|
|
|
// Return the number of entries in the batch.
|
|
static uint32_t Count(const WriteBatch* batch);
|
|
|
|
// Set the count for the number of entries in the batch.
|
|
static void SetCount(WriteBatch* batch, uint32_t n);
|
|
|
|
// Return the sequence number for the start of this batch.
|
|
static SequenceNumber Sequence(const WriteBatch* batch);
|
|
|
|
// Store the specified number as the sequence number for the start of
|
|
// this batch.
|
|
static void SetSequence(WriteBatch* batch, SequenceNumber seq);
|
|
|
|
// Returns the offset of the first entry in the batch.
|
|
// This offset is only valid if the batch is not empty.
|
|
static size_t GetFirstOffset(WriteBatch* batch);
|
|
|
|
static Slice Contents(const WriteBatch* batch) {
|
|
return Slice(batch->rep_);
|
|
}
|
|
|
|
static size_t ByteSize(const WriteBatch* batch) {
|
|
return batch->rep_.size();
|
|
}
|
|
|
|
static Status SetContents(WriteBatch* batch, const Slice& contents);
|
|
|
|
static Status CheckSlicePartsLength(const SliceParts& key,
|
|
const SliceParts& value);
|
|
|
|
// Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
|
|
//
|
|
// If ignore_missing_column_families == true. WriteBatch
|
|
// referencing non-existing column family will be ignored.
|
|
// If ignore_missing_column_families == false, processing of the
|
|
// batches will be stopped if a reference is found to a non-existing
|
|
// column family and InvalidArgument() will be returned. The writes
|
|
// in batches may be only partially applied at that point.
|
|
//
|
|
// If log_number is non-zero, the memtable will be updated only if
|
|
// memtables->GetLogNumber() >= log_number.
|
|
//
|
|
// If flush_scheduler is non-null, it will be invoked if the memtable
|
|
// should be flushed.
|
|
//
|
|
// Under concurrent use, the caller is responsible for making sure that
|
|
// the memtables object itself is thread-local.
|
|
static Status InsertInto(
|
|
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
|
|
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
|
|
TrimHistoryScheduler* trim_history_scheduler,
|
|
bool ignore_missing_column_families = false, uint64_t log_number = 0,
|
|
DB* db = nullptr, bool concurrent_memtable_writes = false,
|
|
bool seq_per_batch = false, bool batch_per_txn = true);
|
|
|
|
// Convenience form of InsertInto when you have only one batch
|
|
// next_seq returns the seq after last sequence number used in MemTable insert
|
|
static Status InsertInto(
|
|
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
|
|
FlushScheduler* flush_scheduler,
|
|
TrimHistoryScheduler* trim_history_scheduler,
|
|
bool ignore_missing_column_families = false, uint64_t log_number = 0,
|
|
DB* db = nullptr, bool concurrent_memtable_writes = false,
|
|
SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
|
|
bool seq_per_batch = false, bool batch_per_txn = true);
|
|
|
|
static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
|
|
ColumnFamilyMemTables* memtables,
|
|
FlushScheduler* flush_scheduler,
|
|
TrimHistoryScheduler* trim_history_scheduler,
|
|
bool ignore_missing_column_families = false,
|
|
uint64_t log_number = 0, DB* db = nullptr,
|
|
bool concurrent_memtable_writes = false,
|
|
bool seq_per_batch = false, size_t batch_cnt = 0,
|
|
bool batch_per_txn = true,
|
|
bool hint_per_batch = false);
|
|
|
|
static Status Append(WriteBatch* dst, const WriteBatch* src,
|
|
const bool WAL_only = false);
|
|
|
|
// Returns the byte size of appending a WriteBatch with ByteSize
|
|
// leftByteSize and a WriteBatch with ByteSize rightByteSize
|
|
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
|
|
|
|
// Iterate over [begin, end) range of a write batch
|
|
static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
|
|
size_t begin, size_t end);
|
|
|
|
// This write batch includes the latest state that should be persisted. Such
|
|
// state meant to be used only during recovery.
|
|
static void SetAsLatestPersistentState(WriteBatch* b);
|
|
static bool IsLatestPersistentState(const WriteBatch* b);
|
|
};
|
|
|
|
// LocalSavePoint is similar to a scope guard
|
|
class LocalSavePoint {
|
|
public:
|
|
explicit LocalSavePoint(WriteBatch* batch)
|
|
: batch_(batch),
|
|
savepoint_(batch->GetDataSize(), batch->Count(),
|
|
batch->content_flags_.load(std::memory_order_relaxed))
|
|
#ifndef NDEBUG
|
|
,
|
|
committed_(false)
|
|
#endif
|
|
{
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
~LocalSavePoint() { assert(committed_); }
|
|
#endif
|
|
Status commit() {
|
|
#ifndef NDEBUG
|
|
committed_ = true;
|
|
#endif
|
|
if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
|
|
batch_->rep_.resize(savepoint_.size);
|
|
WriteBatchInternal::SetCount(batch_, savepoint_.count);
|
|
if (batch_->prot_info_ != nullptr) {
|
|
batch_->prot_info_->entries_.resize(savepoint_.count);
|
|
}
|
|
batch_->content_flags_.store(savepoint_.content_flags,
|
|
std::memory_order_relaxed);
|
|
return Status::MemoryLimit();
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
private:
|
|
WriteBatch* batch_;
|
|
SavePoint savepoint_;
|
|
#ifndef NDEBUG
|
|
bool committed_;
|
|
#endif
|
|
};
|
|
|
|
template <typename Derived>
|
|
class TimestampAssignerBase : public WriteBatch::Handler {
|
|
public:
|
|
explicit TimestampAssignerBase(
|
|
WriteBatch::ProtectionInfo* prot_info,
|
|
std::function<Status(uint32_t, size_t&)>&& checker)
|
|
: prot_info_(prot_info), checker_(std::move(checker)) {}
|
|
|
|
~TimestampAssignerBase() override {}
|
|
|
|
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
|
|
return AssignTimestamp(cf, key);
|
|
}
|
|
|
|
Status DeleteCF(uint32_t cf, const Slice& key) override {
|
|
return AssignTimestamp(cf, key);
|
|
}
|
|
|
|
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
|
|
return AssignTimestamp(cf, key);
|
|
}
|
|
|
|
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
|
|
const Slice&) override {
|
|
return AssignTimestamp(cf, begin_key);
|
|
}
|
|
|
|
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
|
|
return AssignTimestamp(cf, key);
|
|
}
|
|
|
|
Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override {
|
|
return AssignTimestamp(cf, key);
|
|
}
|
|
|
|
Status MarkBeginPrepare(bool) override { return Status::OK(); }
|
|
|
|
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
|
|
|
|
Status MarkCommit(const Slice&) override { return Status::OK(); }
|
|
|
|
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
|
|
return Status::OK();
|
|
}
|
|
|
|
Status MarkRollback(const Slice&) override { return Status::OK(); }
|
|
|
|
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
|
|
|
|
protected:
|
|
Status AssignTimestamp(uint32_t cf, const Slice& key) {
|
|
Status s = static_cast_with_check<Derived>(this)->AssignTimestampImpl(
|
|
cf, key, idx_);
|
|
++idx_;
|
|
return s;
|
|
}
|
|
|
|
Status CheckTimestampSize(uint32_t cf, size_t& ts_sz) {
|
|
return checker_(cf, ts_sz);
|
|
}
|
|
|
|
Status UpdateTimestampIfNeeded(size_t ts_sz, const Slice& key,
|
|
const Slice& ts) {
|
|
if (ts_sz > 0) {
|
|
assert(ts_sz == ts.size());
|
|
UpdateProtectionInformationIfNeeded(key, ts);
|
|
UpdateTimestamp(key, ts);
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) {
|
|
if (prot_info_ != nullptr) {
|
|
const size_t ts_sz = ts.size();
|
|
SliceParts old_key(&key, 1);
|
|
Slice key_no_ts(key.data(), key.size() - ts_sz);
|
|
std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
|
|
SliceParts new_key(new_key_cmpts.data(), 2);
|
|
prot_info_->entries_[idx_].UpdateK(old_key, new_key);
|
|
}
|
|
}
|
|
|
|
void UpdateTimestamp(const Slice& key, const Slice& ts) {
|
|
const size_t ts_sz = ts.size();
|
|
char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
|
|
assert(ptr);
|
|
memcpy(ptr, ts.data(), ts_sz);
|
|
}
|
|
|
|
// No copy or move.
|
|
TimestampAssignerBase(const TimestampAssignerBase&) = delete;
|
|
TimestampAssignerBase(TimestampAssignerBase&&) = delete;
|
|
TimestampAssignerBase& operator=(const TimestampAssignerBase&) = delete;
|
|
TimestampAssignerBase& operator=(TimestampAssignerBase&&) = delete;
|
|
|
|
WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
|
|
const std::function<Status(uint32_t, size_t&)> checker_{};
|
|
size_t idx_ = 0;
|
|
};
|
|
|
|
class SimpleListTimestampAssigner
|
|
: public TimestampAssignerBase<SimpleListTimestampAssigner> {
|
|
public:
|
|
explicit SimpleListTimestampAssigner(
|
|
WriteBatch::ProtectionInfo* prot_info,
|
|
std::function<Status(uint32_t, size_t&)>&& checker,
|
|
const std::vector<Slice>& timestamps)
|
|
: TimestampAssignerBase<SimpleListTimestampAssigner>(prot_info,
|
|
std::move(checker)),
|
|
timestamps_(timestamps) {}
|
|
|
|
~SimpleListTimestampAssigner() override {}
|
|
|
|
private:
|
|
friend class TimestampAssignerBase<SimpleListTimestampAssigner>;
|
|
|
|
Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx) {
|
|
if (idx >= timestamps_.size()) {
|
|
return Status::InvalidArgument("Need more timestamps for the assignment");
|
|
}
|
|
const Slice& ts = timestamps_[idx];
|
|
size_t ts_sz = ts.size();
|
|
const Status s = this->CheckTimestampSize(cf, ts_sz);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
return this->UpdateTimestampIfNeeded(ts_sz, key, ts);
|
|
}
|
|
|
|
const std::vector<Slice>& timestamps_;
|
|
};
|
|
|
|
class TimestampAssigner : public TimestampAssignerBase<TimestampAssigner> {
|
|
public:
|
|
explicit TimestampAssigner(WriteBatch::ProtectionInfo* prot_info,
|
|
std::function<Status(uint32_t, size_t&)>&& checker,
|
|
const Slice& ts)
|
|
: TimestampAssignerBase<TimestampAssigner>(prot_info, std::move(checker)),
|
|
timestamp_(ts) {
|
|
assert(!timestamp_.empty());
|
|
}
|
|
~TimestampAssigner() override {}
|
|
|
|
private:
|
|
friend class TimestampAssignerBase<TimestampAssigner>;
|
|
|
|
Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) {
|
|
if (timestamp_.empty()) {
|
|
return Status::InvalidArgument("Timestamp is empty");
|
|
}
|
|
size_t ts_sz = timestamp_.size();
|
|
const Status s = this->CheckTimestampSize(cf, ts_sz);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
return this->UpdateTimestampIfNeeded(ts_sz, key, timestamp_);
|
|
}
|
|
|
|
const Slice timestamp_;
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|