WriteBatch Save Points

Summary:
Support RollbackToSavePoint() in WriteBatch and WriteBatchWithIndex.  Support for partial transaction rollback is needed for MyRocks.

An alternate implementation of Transaction::RollbackToSavePoint() exists in D40869.  However, the other implementation is messier because it is implemented outside of WriteBatch.  This implementation is much cleaner and also exposes a potentially useful feature to WriteBatch.

Test Plan: Added unit tests

Reviewers: IslamAbdelRahman, kradhakrishnan, maykov, yoshinorim, hermanlee4, spetrunia, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42723
This commit is contained in:
agiardullo 2015-07-10 20:15:45 -07:00
parent 7bfae3a723
commit 8161bdb5a0
10 changed files with 442 additions and 6 deletions

View File

@ -2,6 +2,9 @@
## Unreleased
### New Features
* RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex
### Public API Changes
* Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it.
* Deprecated purge_redundant_kvs_while_flush option.

View File

@ -23,6 +23,10 @@
// data: uint8[len]
#include "rocksdb/write_batch.h"
#include <stack>
#include <stdexcept>
#include "rocksdb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
@ -32,7 +36,6 @@
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "util/statistics.h"
#include <stdexcept>
#include "util/perf_context_imp.h"
namespace rocksdb {
@ -40,12 +43,26 @@ namespace rocksdb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
WriteBatch::WriteBatch(size_t reserved_bytes) {
struct SavePoint {
size_t size; // size of rep_
int count; // count of elements in rep_
SavePoint(size_t s, int c) : size(s), count(c) {}
};
struct SavePoints {
std::stack<SavePoint> stack;
};
WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) {
rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader);
Clear();
}
WriteBatch::~WriteBatch() { }
WriteBatch::~WriteBatch() {
if (save_points_ != nullptr) {
delete save_points_;
}
}
WriteBatch::Handler::~Handler() { }
@ -61,6 +78,12 @@ bool WriteBatch::Handler::Continue() {
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(kHeader);
if (save_points_ != nullptr) {
while (!save_points_->stack.empty()) {
save_points_->stack.pop();
}
}
}
int WriteBatch::Count() const {
@ -188,6 +211,8 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; }
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
@ -301,6 +326,38 @@ void WriteBatch::PutLogData(const Slice& blob) {
PutLengthPrefixedSlice(&rep_, blob);
}
void WriteBatch::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_ = new SavePoints();
}
// Record length and count of current batch of writes.
save_points_->stack.push(SavePoint(GetDataSize(), Count()));
}
Status WriteBatch::RollbackToSavePoint() {
if (save_points_ == nullptr || save_points_->stack.size() == 0) {
return Status::NotFound();
}
// Pop the most recent savepoint off the stack
SavePoint savepoint = save_points_->stack.top();
save_points_->stack.pop();
assert(savepoint.size <= rep_.size());
if (savepoint.size == rep_.size()) {
// No changes to rollback
} else if (savepoint.size == 0) {
// Rollback everything
Clear();
} else {
rep_.resize(savepoint.size);
WriteBatchInternal::SetCount(this, savepoint.count);
}
return Status::OK();
}
namespace {
// This class can *only* be used from a single-threaded write thread, because it
// calls ColumnFamilyMemTablesImpl::Seek()

View File

@ -92,6 +92,10 @@ class WriteBatchInternal {
// this batch.
static void SetSequence(WriteBatch* batch, SequenceNumber seq);
// Returns the offset of the first entry in the batch.
// This offset is only valid if the batch is not empty.
static size_t GetFirstOffset(WriteBatch* batch);
static Slice Contents(const WriteBatch* batch) {
return Slice(batch->rep_);
}

View File

@ -446,6 +446,111 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
}
#endif // !ROCKSDB_LITE
TEST_F(WriteBatchTest, SavePointTest) {
Status s;
WriteBatch batch;
batch.SetSavePoint();
batch.Put("A", "a");
batch.Put("B", "b");
batch.SetSavePoint();
batch.Put("C", "c");
batch.Delete("A");
batch.SetSavePoint();
batch.SetSavePoint();
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"Delete(A)@3"
"Put(A, a)@0"
"Put(B, b)@1"
"Put(C, c)@2",
PrintContents(&batch));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"Put(A, a)@0"
"Put(B, b)@1",
PrintContents(&batch));
batch.Delete("A");
batch.Put("B", "bb");
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ("", PrintContents(&batch));
s = batch.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("", PrintContents(&batch));
batch.Put("D", "d");
batch.Delete("A");
batch.SetSavePoint();
batch.Put("A", "aaa");
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"Delete(A)@1"
"Put(D, d)@0",
PrintContents(&batch));
batch.SetSavePoint();
batch.Put("D", "d");
batch.Delete("A");
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"Delete(A)@1"
"Put(D, d)@0",
PrintContents(&batch));
s = batch.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(
"Delete(A)@1"
"Put(D, d)@0",
PrintContents(&batch));
WriteBatch batch2;
s = batch2.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("", PrintContents(&batch2));
batch2.Delete("A");
batch2.SetSavePoint();
s = batch2.RollbackToSavePoint();
ASSERT_OK(s);
ASSERT_EQ("Delete(A)@0", PrintContents(&batch2));
batch2.Clear();
ASSERT_EQ("", PrintContents(&batch2));
batch2.SetSavePoint();
batch2.Delete("B");
ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
batch2.SetSavePoint();
s = batch2.RollbackToSavePoint();
ASSERT_OK(s);
ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
s = batch2.RollbackToSavePoint();
ASSERT_OK(s);
ASSERT_EQ("", PrintContents(&batch2));
s = batch2.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("", PrintContents(&batch2));
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -157,6 +157,22 @@ class WriteBatchWithIndex : public WriteBatchBase {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value);
// Records the state of the batch for future calls to RollbackToSavePoint().
// May be called multiple times to set multiple save points.
void SetSavePoint() override;
// Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
// most recent call to SetSavePoint() and removes the most recent save point.
// If there is no previous call to SetSavePoint(), behaves the same as
// Clear().
//
// Calling RollbackToSavePoint invalidates any open iterators on this batch.
//
// Returns Status::OK() on success,
// Status::NotFound() if no previous call to SetSavePoint(),
// or other Status on corruption.
Status RollbackToSavePoint() override;
private:
struct Rep;
Rep* rep;

View File

@ -25,6 +25,7 @@
#ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
#define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
#include <stack>
#include <string>
#include <stdint.h>
#include "rocksdb/status.h"
@ -34,6 +35,7 @@ namespace rocksdb {
class Slice;
class ColumnFamilyHandle;
struct SavePoints;
struct SliceParts;
class WriteBatch : public WriteBatchBase {
@ -101,6 +103,17 @@ class WriteBatch : public WriteBatchBase {
// Clear all updates buffered in this batch.
void Clear() override;
// Records the state of the batch for future calls to RollbackToSavePoint().
// May be called multiple times to set multiple save points.
void SetSavePoint() override;
// Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
// most recent call to SetSavePoint() and removes the most recent save point.
// If there is no previous call to SetSavePoint(), Status::NotFound()
// will be returned.
// Oterwise returns Status::OK().
Status RollbackToSavePoint() override;
// Support for iterating over the contents of a batch.
class Handler {
public:
@ -168,10 +181,11 @@ class WriteBatch : public WriteBatchBase {
WriteBatch* GetWriteBatch() override { return this; }
// Constructor with a serialized string object
explicit WriteBatch(std::string rep): rep_(rep) {}
explicit WriteBatch(std::string rep) : save_points_(nullptr), rep_(rep) {}
private:
friend class WriteBatchInternal;
SavePoints* save_points_;
protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_

View File

@ -11,6 +11,7 @@
namespace rocksdb {
class Slice;
class Status;
class ColumnFamilyHandle;
class WriteBatch;
struct SliceParts;
@ -72,6 +73,16 @@ class WriteBatchBase {
// converting any WriteBatchBase(eg WriteBatchWithIndex) into a basic
// WriteBatch.
virtual WriteBatch* GetWriteBatch() = 0;
// Records the state of the batch for future calls to RollbackToSavePoint().
// May be called multiple times to set multiple save points.
virtual void SetSavePoint() = 0;
// Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
// most recent call to SetSavePoint() and removes the most recent save point.
// If there is no previous call to SetSavePoint(), behaves the same as
// Clear().
virtual Status RollbackToSavePoint() = 0;
};
} // namespace rocksdb

View File

@ -400,6 +400,11 @@ struct WriteBatchWithIndex::Rep {
// Clear all updates buffered in this batch.
void Clear();
void ClearIndex();
// Rebuild index by reading all records from the batch.
// Returns non-ok status on corruption.
Status ReBuildIndex();
};
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
@ -455,13 +460,73 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
void WriteBatchWithIndex::Rep::Clear() {
write_batch.Clear();
ClearIndex();
}
void WriteBatchWithIndex::Rep::ClearIndex() {
skip_list.~WriteBatchEntrySkipList();
arena.~Arena();
new (&arena) Arena();
skip_list.~WriteBatchEntrySkipList();
new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
last_entry_offset = 0;
}
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
Status s;
ClearIndex();
if (write_batch.Count() == 0) {
// Nothing to re-index
return s;
}
size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
Slice input(write_batch.Data());
input.remove_prefix(offset);
// Loop through all entries in Rep and add each one to the index
int found = 0;
while (s.ok() && !input.empty()) {
Slice key, value, blob;
uint32_t column_family_id = 0; // default
char tag = 0;
// set offset of current entry for call to AddNewEntry()
last_entry_offset = input.data() - write_batch.Data().data();
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
&value, &blob);
if (!s.ok()) {
break;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
case kTypeColumnFamilyMerge:
case kTypeMerge:
found++;
if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
AddNewEntry(column_family_id);
}
break;
case kTypeLogData:
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (s.ok() && found != write_batch.Count()) {
s = Status::Corruption("WriteBatch has wrong count");
}
return s;
}
WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes,
@ -640,5 +705,17 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
return s;
}
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
Status WriteBatchWithIndex::RollbackToSavePoint() {
Status s = rep->write_batch.RollbackToSavePoint();
if (s.ok()) {
s = rep->ReBuildIndex();
}
return s;
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -30,7 +30,12 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
return Status::InvalidArgument("Output parameters cannot be null");
}
if (data_offset >= GetDataSize()) {
if (data_offset == GetDataSize()) {
// reached end of batch.
return Status::NotFound();
}
if (data_offset > GetDataSize()) {
return Status::InvalidArgument("data offset exceed write batch size");
}
Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);

View File

@ -1364,6 +1364,150 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) {
}
}
}
static std::string PrintContents(WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family) {
std::string result;
WBWIIterator* iter;
if (column_family == nullptr) {
iter = batch->NewIterator();
} else {
iter = batch->NewIterator(column_family);
}
iter->SeekToFirst();
while (iter->Valid()) {
WriteEntry e = iter->Entry();
if (e.type == kPutRecord) {
result.append("PUT(");
result.append(e.key.ToString());
result.append("):");
result.append(e.value.ToString());
} else if (e.type == kMergeRecord) {
result.append("MERGE(");
result.append(e.key.ToString());
result.append("):");
result.append(e.value.ToString());
} else {
assert(e.type == kDeleteRecord);
result.append("DEL(");
result.append(e.key.ToString());
result.append(")");
}
result.append(",");
iter->Next();
}
delete iter;
return result;
}
TEST_F(WriteBatchWithIndexTest, SavePointTest) {
WriteBatchWithIndex batch;
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
Status s;
batch.Put("A", "a");
batch.Put("B", "b");
batch.Put("A", "aa");
batch.Put(&cf1, "A", "a1");
batch.Delete(&cf1, "B");
batch.Put(&cf1, "C", "c1");
batch.SetSavePoint();
batch.Put("C", "cc");
batch.Put("B", "bb");
batch.Delete("A");
batch.Put(&cf1, "B", "b1");
batch.Delete(&cf1, "A");
batch.SetSavePoint();
batch.Put("A", "aaa");
batch.Put("A", "xxx");
batch.Delete("B");
batch.Put(&cf1, "B", "b2");
batch.Delete(&cf1, "C");
batch.SetSavePoint();
batch.SetSavePoint();
batch.Delete("D");
batch.Delete(&cf1, "D");
ASSERT_EQ(
"PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
"B)"
",PUT(C):cc,DEL(D),",
PrintContents(&batch, nullptr));
ASSERT_EQ(
"PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),"
"DEL(D),",
PrintContents(&batch, &cf1));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
"B)"
",PUT(C):cc,",
PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),",
PrintContents(&batch, &cf1));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(
"PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL("
"B)"
",PUT(C):cc,",
PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),",
PrintContents(&batch, &cf1));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,",
PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,",
PrintContents(&batch, &cf1));
batch.SetSavePoint();
batch.Put("X", "x");
ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,",
PrintContents(&batch, nullptr));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,",
PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,",
PrintContents(&batch, &cf1));
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1));
s = batch.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr));
ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1));
batch.SetSavePoint();
batch.Clear();
ASSERT_EQ("", PrintContents(&batch, nullptr));
ASSERT_EQ("", PrintContents(&batch, &cf1));
s = batch.RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
}
} // namespace
int main(int argc, char** argv) {