[CF] Fix CF bugs in WriteBatch
Summary: This diff fixes two bugs: * Increase sequence number even if WriteBatch fails. This is important because WriteBatches in WAL logs have implictly increasing sequence number, even if one update in a write batch fails. This caused some writes to get lost in my CF stress testing * Tolerate 'invalid column family' errors on recovery. When a column family is dropped, processing WAL logs can have some WriteBatches that still refer to the dropped column family. In recovery environment, we want to ignore those errors. In client's Write() code path, however, we want to return the failure to the client if he's trying to add data to invalid column family. Test Plan: db_stress's verification works now Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16533
This commit is contained in:
parent
8ea21a778b
commit
f9b2f0ad79
@ -955,7 +955,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
|
||||
WriteBatchInternal::SetContents(&batch, record);
|
||||
|
||||
status = WriteBatchInternal::InsertInto(
|
||||
&batch, column_family_memtables_.get(), log_number);
|
||||
&batch, column_family_memtables_.get(), true, log_number);
|
||||
|
||||
MaybeIgnoreError(&status);
|
||||
if (!status.ok()) {
|
||||
@ -3311,7 +3311,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
|
||||
|
||||
StartPerfTimer(&write_memtable_timer);
|
||||
status = WriteBatchInternal::InsertInto(
|
||||
updates, column_family_memtables_.get(), 0, this, false);
|
||||
updates, column_family_memtables_.get(), false, 0, this, false);
|
||||
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
|
||||
|
||||
if (!status.ok()) {
|
||||
|
@ -234,14 +234,17 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
public:
|
||||
SequenceNumber sequence_;
|
||||
ColumnFamilyMemTables* cf_mems_;
|
||||
bool recovery_;
|
||||
uint64_t log_number_;
|
||||
DBImpl* db_;
|
||||
const bool dont_filter_deletes_;
|
||||
|
||||
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
|
||||
uint64_t log_number, DB* db, const bool dont_filter_deletes)
|
||||
bool recovery, uint64_t log_number, DB* db,
|
||||
const bool dont_filter_deletes)
|
||||
: sequence_(sequence),
|
||||
cf_mems_(cf_mems),
|
||||
recovery_(recovery),
|
||||
log_number_(log_number),
|
||||
db_(reinterpret_cast<DBImpl*>(db)),
|
||||
dont_filter_deletes_(dont_filter_deletes) {
|
||||
@ -251,19 +254,39 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
}
|
||||
}
|
||||
|
||||
bool IgnoreUpdate() {
|
||||
return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber();
|
||||
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
|
||||
bool found = cf_mems_->Seek(column_family_id);
|
||||
if (recovery_ && (!found || log_number_ < cf_mems_->GetLogNumber())) {
|
||||
// if in recovery envoronment:
|
||||
// * If column family was not found, it might mean that the WAL write
|
||||
// batch references to the column family that was dropped after the
|
||||
// insert. We don't want to fail the whole write batch in that case -- we
|
||||
// just ignore the update.
|
||||
// * If log_number_ < cf_mems_->GetLogNumber(), this means that column
|
||||
// family already contains updates from this log. We can't apply updates
|
||||
// twice because of update-in-place or merge workloads -- ignore the
|
||||
// update
|
||||
*s = Status::OK();
|
||||
return false;
|
||||
}
|
||||
if (!found) {
|
||||
assert(!recovery_);
|
||||
// If the column family was not found in non-recovery enviornment
|
||||
// (client's write code-path), we have to fail the write and return
|
||||
// the failure status to the client.
|
||||
*s = Status::InvalidArgument(
|
||||
"Invalid column family specified in write batch");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
||||
const Slice& value) {
|
||||
bool found = cf_mems_->Seek(column_family_id);
|
||||
if (!found) {
|
||||
return Status::InvalidArgument(
|
||||
"Invalid column family specified in write batch");
|
||||
}
|
||||
if (IgnoreUpdate()) {
|
||||
return Status::OK();
|
||||
Status seek_status;
|
||||
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
|
||||
++sequence_;
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
const Options* options = cf_mems_->GetFullOptions();
|
||||
@ -315,13 +338,10 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
|
||||
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
|
||||
const Slice& value) {
|
||||
bool found = cf_mems_->Seek(column_family_id);
|
||||
if (!found) {
|
||||
return Status::InvalidArgument(
|
||||
"Invalid column family specified in write batch");
|
||||
}
|
||||
if (IgnoreUpdate()) {
|
||||
return Status::OK();
|
||||
Status seek_status;
|
||||
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
|
||||
++sequence_;
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
const Options* options = cf_mems_->GetFullOptions();
|
||||
@ -387,13 +407,10 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
}
|
||||
|
||||
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
|
||||
bool found = cf_mems_->Seek(column_family_id);
|
||||
if (!found) {
|
||||
return Status::InvalidArgument(
|
||||
"Invalid column family specified in write batch");
|
||||
}
|
||||
if (IgnoreUpdate()) {
|
||||
return Status::OK();
|
||||
Status seek_status;
|
||||
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
|
||||
++sequence_;
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
const Options* options = cf_mems_->GetFullOptions();
|
||||
@ -421,10 +438,10 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
|
||||
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
|
||||
ColumnFamilyMemTables* memtables,
|
||||
uint64_t log_number, DB* db,
|
||||
const bool dont_filter_deletes) {
|
||||
bool recovery, uint64_t log_number,
|
||||
DB* db, const bool dont_filter_deletes) {
|
||||
MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables,
|
||||
log_number, db, dont_filter_deletes);
|
||||
recovery, log_number, db, dont_filter_deletes);
|
||||
return b->Iterate(&inserter);
|
||||
}
|
||||
|
||||
|
@ -90,12 +90,18 @@ class WriteBatchInternal {
|
||||
// Inserts batch entries into memtable
|
||||
// If dont_filter_deletes is false AND options.filter_deletes is true,
|
||||
// then --> Drops deletes in batch if db->KeyMayExist returns false
|
||||
// If log_number is not-null, the memtable will be updated only if
|
||||
// If recovery == true, this means InsertInto is executed on a recovery
|
||||
// code-path. WriteBatch referencing a dropped column family can be
|
||||
// found on a recovery code-path and should be ignored (recovery should not
|
||||
// fail). Additionally, the memtable will be updated only if
|
||||
// memtables->GetLogNumber() >= log_number
|
||||
// See MemTableInserter::IgnoreUpdate()
|
||||
// However, if recovery == false, any WriteBatch referencing
|
||||
// non-existing column family will return a failure. Also, log_number is
|
||||
// ignored in that case
|
||||
static Status InsertInto(const WriteBatch* batch,
|
||||
ColumnFamilyMemTables* memtables,
|
||||
uint64_t log_number = 0, DB* db = nullptr,
|
||||
bool recovery = false, uint64_t log_number = 0,
|
||||
DB* db = nullptr,
|
||||
const bool dont_filter_deletes = true);
|
||||
|
||||
static void Append(WriteBatch* dst, const WriteBatch* src);
|
||||
|
Loading…
x
Reference in New Issue
Block a user