// Copyright (c) 2015, Facebook, Inc.  All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE

#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"

namespace rocksdb {

class Env;
class Logger;
class Statistics;

Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value,
                                                  Slice* blob) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // Reached end of batch.
    return Status::NotFound();
  }

  if (data_offset > GetDataSize()) {
    return Status::InvalidArgument("data offset exceeds write batch size");
  }
  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag;
  uint32_t column_family;
  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
                                      blob);
  if (!s.ok()) {
    // Do not inspect `tag` if the record could not be parsed.
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family > entry2->column_family) {
    return 1;
  } else if (entry1->column_family < entry2->column_family) {
    return -1;
  }

  if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
    return -1;
  } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
    return 1;
  }

  Status s;
  Slice key1, key2;
  if (entry1->search_key == nullptr) {
    Slice value, blob;
    WriteType write_type;
    s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type,
                                             &key1, &value, &blob);
    if (!s.ok()) {
      return 1;
    }
  } else {
    key1 = *(entry1->search_key);
  }
  if (entry2->search_key == nullptr) {
    Slice value, blob;
    WriteType write_type;
    s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type,
                                             &key2, &value, &blob);
    if (!s.ok()) {
      return -1;
    }
  } else {
    key2 = *(entry2->search_key);
  }

  int cmp = CompareKey(entry1->column_family, key1, key2);
  if (cmp != 0) {
    return cmp;
  } else if (entry1->offset > entry2->offset) {
    return 1;
  } else if (entry1->offset < entry2->offset) {
    return -1;
  }
  return 0;
}

int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                          const Slice& key1,
                                          const Slice& key2) const {
  auto comparator_for_cf = cf_comparator_map_.find(column_family);
  if (comparator_for_cf != cf_comparator_map_.end()) {
    return comparator_for_cf->second->Compare(key1, key2);
  } else {
    return default_comparator_->Compare(key1, key2);
  }
}
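// Illustrative sketch (not part of the upstream file): how a caller might
// read one record back out of a batch with GetEntryFromDataOffset(). The
// starting offset would normally come from a WriteBatchIndexEntry in the
// skip-list index; `entry_offset` is a hypothetical name here.
//
//   ReadableWriteBatch* batch = ...;  // assumed to be available
//   WriteType write_type;
//   Slice entry_key, entry_value, entry_blob;
//   Status s = batch->GetEntryFromDataOffset(entry_offset, &write_type,
//                                            &entry_key, &entry_value,
//                                            &entry_blob);
//   if (s.ok() && write_type == kPutRecord) {
//     // entry_key/entry_value alias the batch's internal buffer (rep_), so
//     // they are only valid while the batch itself is alive and unchanged.
//   }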
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const DBOptions& options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch.  Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry& entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!(*s).ok()) {
    return WriteBatchWithIndexInternal::Result::kError;
  }

  if (!iter->Valid()) {
    // Read past end of results.  Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  const Slice* entry_value = nullptr;
  while (iter->Valid()) {
    const WriteEntry& entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = &entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                  ToString(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result.  Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if ((*s).ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete.  Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          *s = Status::InvalidArgument("Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = options.statistics.get();
        Env* env = options.env;
        Logger* logger = options.info_log.get();

        *s = MergeHelper::TimedFullMerge(key, entry_value,
                                         merge_context->GetOperands(),
                                         merge_operator, statistics, env,
                                         logger, value);
        if ((*s).ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value->data(), entry_value->size());
        }
      }
    }
  }

  return result;
}

}  // namespace rocksdb

#endif  // !ROCKSDB_LITE
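// Usage sketch (illustrative, not part of the upstream file): the public
// WriteBatchWithIndex::GetFromBatch() drives the helper above and maps its
// Result values onto a Status, roughly as follows. Note that resolving Merge
// records requires a non-null column family handle so the helper can find a
// merge_operator; `db_options` is a hypothetical DBOptions instance.
//
//   WriteBatchWithIndex wbwi;
//   wbwi.Put("key", "v1");
//   std::string value;
//   Status s = wbwi.GetFromBatch(db_options, "key", &value);
//   // kFound  -> s.ok() and value == "v1"
//   wbwi.Delete("key");
//   s = wbwi.GetFromBatch(db_options, "key", &value);
//   // kDeleted / kNotFound -> s.IsNotFound()
//   // kMergeInProgress     -> the base value must come from the DB;
//   //                         see GetFromBatchAndDB()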