rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
Manuel Ung 8ad63a4b86 WriteUnPrepared: Add new WAL marker kTypeBeginUnprepareXID (#4069)
Summary:
This adds a new WAL marker of type kTypeBeginUnprepareXID.

Also, DBImpl now contains a field called batch_per_txn (meaning one WriteBatch per transaction, or possibly multiple WriteBatches). This would also indicate that this DB is using WriteUnprepared policy.

Recovery code would be able to make use of this extra field on DBImpl in a separate diff. For now, it is just used to determine whether the WAL is compatible or not.
Closes https://github.com/facebook/rocksdb/pull/4069

Differential Revision: D8675099

Pulled By: lth

fbshipit-source-id: ca27cae1738e46d65f2bb92860fc759deb874749
2018-06-28 18:58:29 -07:00

274 lines
8.6 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace rocksdb {
class Env;
class Logger;
class Statistics;
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
Slice* value, Slice* blob,
Slice* xid) const {
if (type == nullptr || Key == nullptr || value == nullptr ||
blob == nullptr || xid == nullptr) {
return Status::InvalidArgument("Output parameters cannot be null");
}
if (data_offset == GetDataSize()) {
// reached end of batch.
return Status::NotFound();
}
if (data_offset > GetDataSize()) {
return Status::InvalidArgument("data offset exceed write batch size");
}
Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
char tag;
uint32_t column_family;
Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
blob, xid);
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
*type = kPutRecord;
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
*type = kDeleteRecord;
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
*type = kSingleDeleteRecord;
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
*type = kDeleteRangeRecord;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
*type = kMergeRecord;
break;
case kTypeLogData:
*type = kLogDataRecord;
break;
case kTypeNoop:
case kTypeBeginPrepareXID:
case kTypeBeginPersistedPrepareXID:
case kTypeBeginUnprepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
case kTypeRollbackXID:
*type = kXIDRecord;
break;
default:
return Status::Corruption("unknown WriteBatch tag ",
ToString(static_cast<unsigned int>(tag)));
}
return Status::OK();
}
int WriteBatchEntryComparator::operator()(
const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const {
if (entry1->column_family > entry2->column_family) {
return 1;
} else if (entry1->column_family < entry2->column_family) {
return -1;
}
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
return -1;
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
return 1;
}
Slice key1, key2;
if (entry1->search_key == nullptr) {
key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
entry1->key_size);
} else {
key1 = *(entry1->search_key);
}
if (entry2->search_key == nullptr) {
key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
entry2->key_size);
} else {
key2 = *(entry2->search_key);
}
int cmp = CompareKey(entry1->column_family, key1, key2);
if (cmp != 0) {
return cmp;
} else if (entry1->offset > entry2->offset) {
return 1;
} else if (entry1->offset < entry2->offset) {
return -1;
}
return 0;
}
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
const Slice& key1,
const Slice& key2) const {
if (column_family < cf_comparators_.size() &&
cf_comparators_[column_family] != nullptr) {
return cf_comparators_[column_family]->Compare(key1, key2);
} else {
return default_comparator_->Compare(key1, key2);
}
}
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
std::string* value, bool overwrite_key, Status* s) {
uint32_t cf_id = GetColumnFamilyID(column_family);
*s = Status::OK();
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::Result::kNotFound;
std::unique_ptr<WBWIIterator> iter =
std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));
// We want to iterate in the reverse order that the writes were added to the
// batch. Since we don't have a reverse iterator, we must seek past the end.
// TODO(agiardullo): consider adding support for reverse iteration
iter->Seek(key);
while (iter->Valid()) {
const WriteEntry entry = iter->Entry();
if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
break;
}
iter->Next();
}
if (!(*s).ok()) {
return WriteBatchWithIndexInternal::Result::kError;
}
if (!iter->Valid()) {
// Read past end of results. Reposition on last result.
iter->SeekToLast();
} else {
iter->Prev();
}
Slice entry_value;
while (iter->Valid()) {
const WriteEntry entry = iter->Entry();
if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
// Unexpected error or we've reached a different next key
break;
}
switch (entry.type) {
case kPutRecord: {
result = WriteBatchWithIndexInternal::Result::kFound;
entry_value = entry.value;
break;
}
case kMergeRecord: {
result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
merge_context->PushOperand(entry.value);
break;
}
case kDeleteRecord:
case kSingleDeleteRecord: {
result = WriteBatchWithIndexInternal::Result::kDeleted;
break;
}
case kLogDataRecord:
case kXIDRecord: {
// ignore
break;
}
default: {
result = WriteBatchWithIndexInternal::Result::kError;
(*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
ToString(entry.type));
break;
}
}
if (result == WriteBatchWithIndexInternal::Result::kFound ||
result == WriteBatchWithIndexInternal::Result::kDeleted ||
result == WriteBatchWithIndexInternal::Result::kError) {
// We can stop iterating once we find a PUT or DELETE
break;
}
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
overwrite_key == true) {
// Since we've overwritten keys, we do not know what other operations are
// in this batch for this key, so we cannot do a Merge to compute the
// result. Instead, we will simply return MergeInProgress.
break;
}
iter->Prev();
}
if ((*s).ok()) {
if (result == WriteBatchWithIndexInternal::Result::kFound ||
result == WriteBatchWithIndexInternal::Result::kDeleted) {
// Found a Put or Delete. Merge if necessary.
if (merge_context->GetNumOperands() > 0) {
const MergeOperator* merge_operator;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
merge_operator = cfh->cfd()->ioptions()->merge_operator;
} else {
*s = Status::InvalidArgument("Must provide a column_family");
result = WriteBatchWithIndexInternal::Result::kError;
return result;
}
Statistics* statistics = immuable_db_options.statistics.get();
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
if (merge_operator) {
*s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
merge_context->GetOperands(), value,
logger, statistics, env);
} else {
*s = Status::InvalidArgument("Options::merge_operator must be set");
}
if ((*s).ok()) {
result = WriteBatchWithIndexInternal::Result::kFound;
} else {
result = WriteBatchWithIndexInternal::Result::kError;
}
} else { // nothing to merge
if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT
value->assign(entry_value.data(), entry_value.size());
}
}
}
}
return result;
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE