[CF] Handle failure in WriteBatch::Handler

Summary:
* Add ColumnFamilyHandle::GetID() function. Client needs to know column family's ID to be able to construct WriteBatch
* Handle WriteBatch::Handler failure gracefully. Since WriteBatch is not a very smart function (it takes raw CF id), client can add data to WriteBatch for column family that doesn't exist. In that case, we need to gracefully return failure status from DB::Write(). To do that, I added a return Status to WriteBatch functions PutCF, DeleteCF and MergeCF.

Test Plan: Added test to column_family_test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16323
This commit is contained in:
Igor Canadi 2014-02-25 17:30:54 -08:00
parent 944ff673d6
commit 8b7ab9951c
10 changed files with 98 additions and 48 deletions

View File

@ -6,7 +6,6 @@
executed in high priority thread pool.
## Unreleased (will be relased in 2.8)
## Unreleased
### Public API changes

View File

@ -44,6 +44,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
}
}
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
namespace {
// Fix user-supplied options to be reasonable
template <class T, class V>

View File

@ -41,6 +41,8 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual uint32_t GetID() const override;
private:
ColumnFamilyData* cfd_;
DBImpl* db_;

View File

@ -259,6 +259,17 @@ TEST(ColumnFamilyTest, DropTest) {
}
}
TEST(ColumnFamilyTest, WriteBatchFailure) {
Open();
WriteBatch batch;
batch.Put(1, Slice("non-existing"), Slice("column-family"));
Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument());
CreateColumnFamilies({"one"});
ASSERT_OK(db_->Write(WriteOptions(), &batch));
Close();
}
TEST(ColumnFamilyTest, ReadWrite) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one", "two"});

View File

@ -3304,11 +3304,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
if (!status.ok()) {
// Panic for in-memory corruptions
// Iteration failed (either in-memory writebatch corruption (very
// bad), or the client specified invalid column family). Return
// failure.
// Note that existing logic was not sound. Any partial failure writing
// into the memtable would result in a state that some write ops might
// have succeeded in memtable but Status reports error for all writes.
throw std::runtime_error("In memory WriteBatch corruption!");
return status;
}
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
last_sequence);
@ -3822,24 +3824,21 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
// and we allocate 11 extra bytes for key length, as well as value length.
WriteBatch batch(key.size() + value.size() + 24);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Put(cfh->cfd()->GetID(), key, value);
batch.Put(column_family->GetID(), key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Delete(cfh->cfd()->GetID(), key);
batch.Delete(column_family->GetID(), key);
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Merge(cfh->cfd()->GetID(), key, value);
batch.Merge(column_family->GetID(), key, value);
return Write(opt, &batch);
}

View File

@ -4804,19 +4804,22 @@ TEST(DBTest, TransactionLogIteratorBlobs) {
auto res = OpenTransactionLogIter(0)->GetBatch();
struct Handler : public WriteBatch::Handler {
std::string seen;
virtual void PutCF(uint32_t cf, const Slice& key, const Slice& value) {
virtual Status PutCF(uint32_t cf, const Slice& key, const Slice& value) {
seen += "Put(" + std::to_string(cf) + ", " + key.ToString() + ", " +
std::to_string(value.size()) + ")";
return Status::OK();
}
virtual void MergeCF(uint32_t cf, const Slice& key, const Slice& value) {
virtual Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) {
seen += "Merge(" + std::to_string(cf) + ", " + key.ToString() + ", " +
std::to_string(value.size()) + ")";
return Status::OK();
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void DeleteCF(uint32_t cf, const Slice& key) {
virtual Status DeleteCF(uint32_t cf, const Slice& key) {
seen += "Delete(" + std::to_string(cf) + ", " + key.ToString() + ")";
return Status::OK();
}
} handler;
res.writeBatchPtr->Iterate(&handler);

View File

@ -89,7 +89,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
input.remove_prefix(kHeader);
Slice key, value, blob;
int found = 0;
while (!input.empty() && handler->Continue()) {
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
char tag = input[0];
input.remove_prefix(1);
uint32_t column_family = 0; // default
@ -98,11 +99,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Put");
}
// intentional fallthrough
// intentional fallthrough
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->PutCF(column_family, key, value);
s = handler->PutCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Put");
@ -112,10 +113,10 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Delete");
}
// intentional fallthrough
// intentional fallthrough
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->DeleteCF(column_family, key);
s = handler->DeleteCF(column_family, key);
found++;
} else {
return Status::Corruption("bad WriteBatch Delete");
@ -125,11 +126,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Merge");
}
// intentional fallthrough
// intentional fallthrough
case kTypeMerge:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->MergeCF(column_family, key, value);
s = handler->MergeCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Merge");
@ -146,7 +147,10 @@ Status WriteBatch::Iterate(Handler* handler) const {
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
if (!s.ok()) {
return s;
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
@ -251,13 +255,15 @@ class MemTableInserter : public WriteBatch::Handler {
return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber();
}
virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
bool found = cf_mems_->Seek(column_family_id);
// TODO(icanadi) if found = false somehow return the error to caller
// Will need to change public API to do this
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
@ -304,13 +310,18 @@ class MemTableInserter : public WriteBatch::Handler {
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
return Status::OK();
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
bool found = cf_mems_->Seek(column_family_id);
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
@ -372,12 +383,17 @@ class MemTableInserter : public WriteBatch::Handler {
}
sequence_++;
return Status::OK();
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
bool found = cf_mems_->Seek(column_family_id);
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
@ -393,11 +409,12 @@ class MemTableInserter : public WriteBatch::Handler {
}
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
return;
return Status::OK();
}
}
mem->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
return Status::OK();
}
};
} // namespace

View File

@ -145,7 +145,7 @@ TEST(WriteBatchTest, Append) {
namespace {
struct TestHandler : public WriteBatch::Handler {
std::string seen;
virtual void PutCF(uint32_t column_family_id, const Slice& key,
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
@ -153,8 +153,9 @@ namespace {
seen += "PutCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ", " + value.ToString() + ")";
}
return Status::OK();
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
@ -162,17 +163,19 @@ namespace {
seen += "MergeCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ", " + value.ToString() + ")";
}
return Status::OK();
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
seen += "Delete(" + key.ToString() + ")";
} else {
seen += "DeleteCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ")";
}
return Status::OK();
}
};
}
@ -212,23 +215,23 @@ TEST(WriteBatchTest, Continue) {
struct Handler : public TestHandler {
int num_seen = 0;
virtual void PutCF(uint32_t column_family_id, const Slice& key,
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
++num_seen;
TestHandler::PutCF(column_family_id, key, value);
return TestHandler::PutCF(column_family_id, key, value);
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
++num_seen;
TestHandler::MergeCF(column_family_id, key, value);
return TestHandler::MergeCF(column_family_id, key, value);
}
virtual void LogData(const Slice& blob) {
++num_seen;
TestHandler::LogData(blob);
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
++num_seen;
TestHandler::DeleteCF(column_family_id, key);
return TestHandler::DeleteCF(column_family_id, key);
}
virtual bool Continue() override {
return num_seen < 3;

View File

@ -27,6 +27,8 @@ using std::unique_ptr;
class ColumnFamilyHandle {
public:
virtual ~ColumnFamilyHandle() {}
virtual uint32_t GetID() const = 0;
};
extern const std::string default_column_family_name;

View File

@ -88,29 +88,41 @@ class WriteBatch {
// default implementation will just call Put without column family for
// backwards compatibility. If the column family is not default,
// the function is noop
virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
// Put() historically doesn't return status. We didn't want to be
// backwards incompatible so we didn't change the return status
// (this is a public API). We do an ordinary get and return Status::OK()
Put(key, value);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and PutCF not implemented");
}
virtual void Put(const Slice& key, const Slice& value);
// Merge and LogData are not pure virtual. Otherwise, we would break
// existing clients of Handler on a source code level. The default
// implementation of Merge simply throws a runtime exception.
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
Merge(key, value);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and MergeCF not implemented");
}
virtual void Merge(const Slice& key, const Slice& value);
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
Delete(key);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and DeleteCF not implemented");
}
virtual void Delete(const Slice& key);
// Continue is called by WriteBatch::Iterate. If it returns false,