diff --git a/db/db_impl.cc b/db/db_impl.cc index 1a8c3bc06..6be7d9c53 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3524,6 +3524,8 @@ const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { return *cfh->cfd()->options(); } +const DBOptions& DBImpl::GetDBOptions() const { return db_options_; } + bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) { bool is_int_property = false; diff --git a/db/db_impl.h b/db/db_impl.h index e17f404ef..91a59635e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -150,6 +150,8 @@ class DBImpl : public DB { using DB::GetOptions; virtual const Options& GetOptions( ColumnFamilyHandle* column_family) const override; + using DB::GetDBOptions; + virtual const DBOptions& GetDBOptions() const override; using DB::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; diff --git a/db/db_test.cc b/db/db_test.cc index 076033425..11aeeee9d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8753,6 +8753,9 @@ class ModelDB: public DB { return options_; } + using DB::GetDBOptions; + virtual const DBOptions& GetDBOptions() const override { return options_; } + using DB::Flush; virtual Status Flush(const rocksdb::FlushOptions& options, ColumnFamilyHandle* column_family) override { diff --git a/db/merge_context.h b/db/merge_context.h index bf483a827..f8609da75 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -66,4 +66,3 @@ private: }; } // namespace rocksdb - diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index db1331e44..e5b4838c8 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -495,6 +495,8 @@ class DB { return GetOptions(DefaultColumnFamily()); } + virtual const DBOptions& GetDBOptions() const = 0; + // Flush all mem-table data. virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) = 0; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 368f12f31..158aa329e 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -175,6 +175,11 @@ class StackableDB : public DB { return db_->GetOptions(column_family); } + using DB::GetDBOptions; + virtual const DBOptions& GetDBOptions() const override { + return db_->GetDBOptions(); + } + using DB::Flush; virtual Status Flush(const FlushOptions& fopts, ColumnFamilyHandle* column_family) override { diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index dabbe1189..7c17534aa 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -11,6 +11,8 @@ #pragma once +#include + #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" #include "rocksdb/slice.h" @@ -22,6 +24,9 @@ namespace rocksdb { class ColumnFamilyHandle; class Comparator; +class DB; +struct ReadOptions; +struct DBOptions; enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord }; @@ -118,6 +123,37 @@ class WriteBatchWithIndex : public WriteBatchBase { // default column family Iterator* NewIteratorWithBase(Iterator* base_iterator); + // Similar to DB::Get() but will only read the key from this batch. + // If the batch does not have enough data to resolve Merge operations, + // MergeInProgress status may be returned. + Status GetFromBatch(ColumnFamilyHandle* column_family, + const DBOptions& options, const Slice& key, + std::string* value); + + // Similar to previous function but does not require a column_family. + // Note: An InvalidArgument status will be returned if there are any Merge + // operators for this key. + Status GetFromBatch(const DBOptions& options, const Slice& key, + std::string* value) { + return GetFromBatch(nullptr, options, key, value); + } + + // Similar to DB::Get() but will also read writes from this batch. + // + // This function will query both this batch and the DB and then merge + // the results using the DB's merge operator (if the batch contains any + // merge requests). + // + // Setting read_options.snapshot will affect what is read from the DB + // but will NOT change which keys are read from the batch (the keys in + // this batch do not yet belong to any snapshot and will be fetched + // regardless). + Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, + const Slice& key, std::string* value); + Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value); + private: struct Rep; Rep* rep; diff --git a/src.mk b/src.mk index f4410e87b..c4be8d3f5 100644 --- a/src.mk +++ b/src.mk @@ -110,6 +110,7 @@ LIB_SOURCES = \ utilities/spatialdb/spatial_db.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ + utilities/write_batch_with_index/write_batch_with_index_internal.cc \ util/event_logger.cc \ util/ldb_cmd.cc \ util/ldb_tool.cc \ diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index 2a4f7b14e..a90185338 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -658,7 +658,7 @@ class SpatialDBImpl : public SpatialDB { }; namespace { -DBOptions GetDBOptions(const SpatialDBOptions& options) { +DBOptions GetDBOptionsFromSpatialDBOptions(const SpatialDBOptions& options) { DBOptions db_options; db_options.max_open_files = 50000; db_options.max_background_compactions = 3 * options.num_threads / 4; @@ -760,7 +760,7 @@ class MetadataStorage { Status SpatialDB::Create( const SpatialDBOptions& options, const std::string& name, const std::vector& spatial_indexes) { - DBOptions db_options = GetDBOptions(options); + DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options); db_options.create_if_missing = true; db_options.create_missing_column_families = true; db_options.error_if_exists = true; @@ -805,7 +805,7 @@ Status SpatialDB::Create( Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name, SpatialDB** db, bool read_only) { - DBOptions db_options = GetDBOptions(options); + DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options); auto block_cache = NewLRUCache(options.cache_size); ColumnFamilyOptions column_family_options = GetColumnFamilyOptions(options, block_cache); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 75d9e9dd5..0c3e02f3e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -10,8 +10,11 @@ #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" #include "db/column_family.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" #include "db/skiplist.h" #include "util/arena.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace rocksdb { @@ -287,60 +290,6 @@ class BaseDeltaIterator : public Iterator { const Comparator* comparator_; // not owned }; -class ReadableWriteBatch : public WriteBatch { - public: - explicit ReadableWriteBatch(size_t reserved_bytes = 0) - : WriteBatch(reserved_bytes) {} - // Retrieve some information from a write entry in the write batch, given - // the start offset of the write entry. - Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, - Slice* value, Slice* blob) const; -}; - -// Key used by skip list, as the binary searchable index of WriteBatchWithIndex. -struct WriteBatchIndexEntry { - WriteBatchIndexEntry(size_t o, uint32_t c) - : offset(o), column_family(c), search_key(nullptr) {} - WriteBatchIndexEntry(const Slice* sk, uint32_t c) - : offset(0), column_family(c), search_key(sk) {} - - // If this flag appears in the offset, it indicates a key that is smaller - // than any other entry for the same column family - static const size_t kFlagMin = std::numeric_limits::max(); - - size_t offset; // offset of an entry in write batch's string buffer. - uint32_t column_family; // column family of the entry - const Slice* search_key; // if not null, instead of reading keys from - // write batch, use it to compare. This is used - // for lookup key. -}; - -class WriteBatchEntryComparator { - public: - WriteBatchEntryComparator(const Comparator* _default_comparator, - const ReadableWriteBatch* write_batch) - : default_comparator_(_default_comparator), write_batch_(write_batch) {} - // Compare a and b. Return a negative value if a is less than b, 0 if they - // are equal, and a positive value if a is greater than b - int operator()(const WriteBatchIndexEntry* entry1, - const WriteBatchIndexEntry* entry2) const; - - int CompareKey(uint32_t column_family, const Slice& key1, - const Slice& key2) const; - - void SetComparatorForCF(uint32_t column_family_id, - const Comparator* comparator) { - cf_comparator_map_[column_family_id] = comparator; - } - - const Comparator* default_comparator() { return default_comparator_; } - - private: - const Comparator* default_comparator_; - std::unordered_map cf_comparator_map_; - const ReadableWriteBatch* write_batch_; -}; - typedef SkipList WriteBatchEntrySkipList; @@ -535,45 +484,6 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { last_entry_offset = 0; } -Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, - WriteType* type, Slice* Key, - Slice* value, - Slice* blob) const { - if (type == nullptr || Key == nullptr || value == nullptr || - blob == nullptr) { - return Status::InvalidArgument("Output parameters cannot be null"); - } - - if (data_offset >= GetDataSize()) { - return Status::InvalidArgument("data offset exceed write batch size"); - } - Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); - char tag; - uint32_t column_family; - Status s = - ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob); - - switch (tag) { - case kTypeColumnFamilyValue: - case kTypeValue: - *type = kPutRecord; - break; - case kTypeColumnFamilyDeletion: - case kTypeDeletion: - *type = kDeleteRecord; - break; - case kTypeColumnFamilyMerge: - case kTypeMerge: - *type = kMergeRecord; - break; - case kTypeLogData: - *type = kLogDataRecord; - break; - default: - return Status::Corruption("unknown WriteBatch tag"); - } - return Status::OK(); -} WriteBatchWithIndex::WriteBatchWithIndex( const Comparator* default_index_comparator, size_t reserved_bytes, @@ -659,66 +569,97 @@ void WriteBatchWithIndex::Delete(const Slice& key) { void WriteBatchWithIndex::Clear() { rep->Clear(); } -int WriteBatchEntryComparator::operator()( - const WriteBatchIndexEntry* entry1, - const WriteBatchIndexEntry* entry2) const { - if (entry1->column_family > entry2->column_family) { - return 1; - } else if (entry1->column_family < entry2->column_family) { - return -1; - } - - if (entry1->offset == WriteBatchIndexEntry::kFlagMin) { - return -1; - } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) { - return 1; - } - +Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, + const DBOptions& options, + const Slice& key, std::string* value) { Status s; - Slice key1, key2; - if (entry1->search_key == nullptr) { - Slice value, blob; - WriteType write_type; - s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1, - &value, &blob); - if (!s.ok()) { - return 1; - } - } else { - key1 = *(entry1->search_key); - } - if (entry2->search_key == nullptr) { - Slice value, blob; - WriteType write_type; - s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2, - &value, &blob); - if (!s.ok()) { - return -1; - } - } else { - key2 = *(entry2->search_key); + MergeContext merge_context; + + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family, + key, &merge_context, + &rep->comparator, value, &s); + + switch (result) { + case WriteBatchWithIndexInternal::Result::kFound: + case WriteBatchWithIndexInternal::Result::kError: + return s; + case WriteBatchWithIndexInternal::Result::kDeleted: + case WriteBatchWithIndexInternal::Result::kNotFound: + return Status::NotFound(); + case WriteBatchWithIndexInternal::Result::kMergeInProgress: + return Status::MergeInProgress(""); + default: + assert(false); } - int cmp = CompareKey(entry1->column_family, key1, key2); - if (cmp != 0) { - return cmp; - } else if (entry1->offset > entry2->offset) { - return 1; - } else if (entry1->offset < entry2->offset) { - return -1; - } - return 0; + return s; } -int WriteBatchEntryComparator::CompareKey(uint32_t column_family, - const Slice& key1, - const Slice& key2) const { - auto comparator_for_cf = cf_comparator_map_.find(column_family); - if (comparator_for_cf != cf_comparator_map_.end()) { - return comparator_for_cf->second->Compare(key1, key2); - } else { - return default_comparator_->Compare(key1, key2); +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + const Slice& key, + std::string* value) { + return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, + value); +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + std::string* value) { + Status s; + MergeContext merge_context; + const DBOptions& options = db->GetDBOptions(); + + std::string batch_value; + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::GetFromBatch( + options, this, column_family, key, &merge_context, &rep->comparator, + &batch_value, &s); + + if (result == WriteBatchWithIndexInternal::Result::kFound) { + value->assign(batch_value.data(), batch_value.size()); + return s; } + if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + return Status::NotFound(); + } + if (result == WriteBatchWithIndexInternal::Result::kError) { + return s; + } + assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || + result == WriteBatchWithIndexInternal::Result::kNotFound); + + // Did not find key in batch OR could not resolve Merges. Try DB. + s = db->Get(read_options, column_family, key, value); + + if (s.ok() || s.IsNotFound()) { // DB Get Suceeded + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { + // Merge result from DB with merges in Batch + auto cfh = reinterpret_cast(column_family); + const MergeOperator* merge_operator = + cfh->cfd()->ioptions()->merge_operator; + Statistics* statistics = options.statistics.get(); + Env* env = options.env; + Logger* logger = options.info_log.get(); + + Slice db_slice(*value); + Slice* merge_data; + if (s.ok()) { + merge_data = &db_slice; + } else { // Key not present in db (s.IsNotFound()) + merge_data = nullptr; + } + + s = MergeHelper::TimedFullMerge( + key, merge_data, merge_context.GetOperands(), merge_operator, + statistics, env, logger, value); + } + } + + return s; } } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc new file mode 100644 index 000000000..b9cf644af --- /dev/null +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -0,0 +1,242 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/column_family.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/coding.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + +namespace rocksdb { + +class Env; +class Logger; +class Statistics; + +Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, + WriteType* type, Slice* Key, + Slice* value, + Slice* blob) const { + if (type == nullptr || Key == nullptr || value == nullptr || + blob == nullptr) { + return Status::InvalidArgument("Output parameters cannot be null"); + } + + if (data_offset >= GetDataSize()) { + return Status::InvalidArgument("data offset exceed write batch size"); + } + Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); + char tag; + uint32_t column_family; + Status s = + ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob); + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + *type = kPutRecord; + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + *type = kDeleteRecord; + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + *type = kMergeRecord; + break; + case kTypeLogData: + *type = kLogDataRecord; + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + return Status::OK(); +} + +int WriteBatchEntryComparator::operator()( + const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const { + if (entry1->column_family > entry2->column_family) { + return 1; + } else if (entry1->column_family < entry2->column_family) { + return -1; + } + + if (entry1->offset == WriteBatchIndexEntry::kFlagMin) { + return -1; + } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) { + return 1; + } + + Status s; + Slice key1, key2; + if (entry1->search_key == nullptr) { + Slice value, blob; + WriteType write_type; + s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1, + &value, &blob); + if (!s.ok()) { + return 1; + } + } else { + key1 = *(entry1->search_key); + } + if (entry2->search_key == nullptr) { + Slice value, blob; + WriteType write_type; + s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2, + &value, &blob); + if (!s.ok()) { + return -1; + } + } else { + key2 = *(entry2->search_key); + } + + int cmp = CompareKey(entry1->column_family, key1, key2); + if (cmp != 0) { + return cmp; + } else if (entry1->offset > entry2->offset) { + return 1; + } else if (entry1->offset < entry2->offset) { + return -1; + } + return 0; +} + +int WriteBatchEntryComparator::CompareKey(uint32_t column_family, + const Slice& key1, + const Slice& key2) const { + auto comparator_for_cf = cf_comparator_map_.find(column_family); + if (comparator_for_cf != cf_comparator_map_.end()) { + return comparator_for_cf->second->Compare(key1, key2); + } else { + return default_comparator_->Compare(key1, key2); + } +} + +WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( + const DBOptions& options, WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family, const Slice& key, + MergeContext* merge_context, WriteBatchEntryComparator* cmp, + std::string* value, Status* s) { + uint32_t cf_id = GetColumnFamilyID(column_family); + *s = Status::OK(); + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::Result::kNotFound; + + std::unique_ptr iter = + std::unique_ptr(batch->NewIterator(column_family)); + + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the end. + // TODO(agiardullo): consider adding support for reverse iteration + iter->Seek(key); + while (iter->Valid()) { + const WriteEntry& entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + break; + } + + iter->Next(); + } + + if (!(*s).ok()) { + return WriteBatchWithIndexInternal::Result::kError; + } + + if (!iter->Valid()) { + // Read past end of results. Reposition on last result. + iter->SeekToLast(); + } else { + iter->Prev(); + } + + const Slice* entry_value = nullptr; + while (iter->Valid()) { + const WriteEntry& entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + // Unexpected error or we've reached a different next key + break; + } + + switch (entry.type) { + case kPutRecord: { + result = WriteBatchWithIndexInternal::Result::kFound; + entry_value = &entry.value; + break; + } + case kMergeRecord: { + result = WriteBatchWithIndexInternal::Result::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + } + case kDeleteRecord: { + result = WriteBatchWithIndexInternal::Result::kDeleted; + break; + } + case kLogDataRecord: { + // ignore + break; + } + default: { + result = WriteBatchWithIndexInternal::Result::kError; + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + std::to_string(entry.type)); + break; + } + } + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted || + result == WriteBatchWithIndexInternal::Result::kError) { + // We can stop iterating once we find a PUT or DELETE + break; + } + + iter->Prev(); + } + + if ((*s).ok()) { + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted) { + // Found a Put or Delete. Merge if necessary. + if (merge_context->GetNumOperands() > 0) { + const MergeOperator* merge_operator; + + if (column_family != nullptr) { + auto cfh = reinterpret_cast(column_family); + merge_operator = cfh->cfd()->ioptions()->merge_operator; + } else { + *s = Status::InvalidArgument("Must provide a column_family"); + result = WriteBatchWithIndexInternal::Result::kError; + return result; + } + Statistics* statistics = options.statistics.get(); + Env* env = options.env; + Logger* logger = options.info_log.get(); + + *s = MergeHelper::TimedFullMerge( + key, entry_value, merge_context->GetOperands(), merge_operator, + statistics, env, logger, value); + if ((*s).ok()) { + result = WriteBatchWithIndexInternal::Result::kFound; + } else { + result = WriteBatchWithIndexInternal::Result::kError; + } + } else { // nothing to merge + if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT + value->assign(entry_value->data(), entry_value->size()); + } + } + } + } + + return result; +} + +} // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h new file mode 100644 index 000000000..a98ddd67e --- /dev/null +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -0,0 +1,96 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include + +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +namespace rocksdb { + +class MergeContext; +struct Options; + +// Key used by skip list, as the binary searchable index of WriteBatchWithIndex. +struct WriteBatchIndexEntry { + WriteBatchIndexEntry(size_t o, uint32_t c) + : offset(o), column_family(c), search_key(nullptr) {} + WriteBatchIndexEntry(const Slice* sk, uint32_t c) + : offset(0), column_family(c), search_key(sk) {} + + // If this flag appears in the offset, it indicates a key that is smaller + // than any other entry for the same column family + static const size_t kFlagMin = std::numeric_limits::max(); + + size_t offset; // offset of an entry in write batch's string buffer. + uint32_t column_family; // column family of the entry + const Slice* search_key; // if not null, instead of reading keys from + // write batch, use it to compare. This is used + // for lookup key. +}; + +class ReadableWriteBatch : public WriteBatch { + public: + explicit ReadableWriteBatch(size_t reserved_bytes = 0) + : WriteBatch(reserved_bytes) {} + // Retrieve some information from a write entry in the write batch, given + // the start offset of the write entry. + Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, + Slice* value, Slice* blob) const; +}; + +class WriteBatchEntryComparator { + public: + WriteBatchEntryComparator(const Comparator* _default_comparator, + const ReadableWriteBatch* write_batch) + : default_comparator_(_default_comparator), write_batch_(write_batch) {} + // Compare a and b. Return a negative value if a is less than b, 0 if they + // are equal, and a positive value if a is greater than b + int operator()(const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const; + + int CompareKey(uint32_t column_family, const Slice& key1, + const Slice& key2) const; + + void SetComparatorForCF(uint32_t column_family_id, + const Comparator* comparator) { + cf_comparator_map_[column_family_id] = comparator; + } + + const Comparator* default_comparator() { return default_comparator_; } + + private: + const Comparator* default_comparator_; + std::unordered_map cf_comparator_map_; + const ReadableWriteBatch* write_batch_; +}; + +class WriteBatchWithIndexInternal { + public: + enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; + + // If batch contains a value for key, store it in *value and return kFound. + // If batch contains a deletion for key, return Deleted. + // If batch contains Merge operations as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // prepend the current merge operands to *operands, + // and return kMergeInProgress + // If batch does not contain this key, return kNotFound + // Else, return kError on error with error Status stored in *s. + static WriteBatchWithIndexInternal::Result GetFromBatch( + const DBOptions& options, WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family, const Slice& key, + MergeContext* merge_context, WriteBatchEntryComparator* cmp, + std::string* value, Status* s); +}; + +} // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index c6da1612a..5e9ff772c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -13,6 +13,8 @@ #include "db/column_family.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" namespace rocksdb { @@ -907,6 +909,279 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { } } +TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) { + Options options; + WriteBatchWithIndex batch; + Status s; + std::string value; + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put("a", "a"); + batch.Put("b", "b"); + batch.Put("c", "c"); + batch.Put("a", "z"); + batch.Delete("c"); + batch.Delete("d"); + batch.Delete("e"); + batch.Put("e", "e"); + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + s = batch.GetFromBatch(options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("z", value); + + s = batch.GetFromBatch(options, "c", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "d", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e", value); + + batch.Merge("z", "z"); + + s = batch.GetFromBatch(options, "z", &value); + ASSERT_NOK(s); // No merge operator specified. + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { + DB* db; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + options.create_if_missing = true; + + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); + WriteBatchWithIndex batch; + std::string value; + + s = batch.GetFromBatch(options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put("x", "X"); + std::string expected = "X"; + + for (int i = 0; i < 5; i++) { + batch.Merge("x", std::to_string(i)); + expected = expected + "," + std::to_string(i); + + if (i % 2 == 0) { + batch.Put("y", std::to_string(i / 2)); + } + + batch.Merge("z", "z"); + + s = batch.GetFromBatch(column_family, options, "x", &value); + ASSERT_OK(s); + ASSERT_EQ(expected, value); + + s = batch.GetFromBatch(column_family, options, "y", &value); + ASSERT_OK(s); + ASSERT_EQ(std::to_string(i / 2), value); + + s = batch.GetFromBatch(column_family, options, "z", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + } + + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { + DB* db; + Options options; + options.create_if_missing = true; + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + WriteBatchWithIndex batch; + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = db->Put(write_options, "a", "a"); + ASSERT_OK(s); + + s = db->Put(write_options, "b", "b"); + ASSERT_OK(s); + + s = db->Put(write_options, "c", "c"); + ASSERT_OK(s); + + batch.Put("a", "batch.a"); + batch.Delete("b"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("batch.a", value); + + s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c", value); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + db->Delete(write_options, "x"); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { + DB* db; + Options options; + + options.create_if_missing = true; + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + WriteBatchWithIndex batch; + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = db->Put(write_options, "a", "a0"); + ASSERT_OK(s); + + s = db->Put(write_options, "b", "b0"); + ASSERT_OK(s); + + s = db->Merge(write_options, "b", "b1"); + ASSERT_OK(s); + + s = db->Merge(write_options, "c", "c0"); + ASSERT_OK(s); + + s = db->Merge(write_options, "d", "d0"); + ASSERT_OK(s); + + batch.Merge("a", "a1"); + batch.Merge("a", "a2"); + batch.Merge("b", "b2"); + batch.Merge("d", "d1"); + batch.Merge("e", "e0"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a0,a1,a2", value); + + s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b0,b1,b2", value); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = batch.GetFromBatchAndDB(db, read_options, "d", &value); + ASSERT_OK(s); + ASSERT_EQ("d0,d1", value); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = db->Delete(write_options, "x"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + const Snapshot* snapshot = db->GetSnapshot(); + ReadOptions snapshot_read_options; + snapshot_read_options.snapshot = snapshot; + + s = db->Delete(write_options, "a"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a1,a2", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a0,a1,a2", value); + + batch.Delete("a"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Merge(write_options, "c", "c1"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0,c1", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = db->Put(write_options, "e", "e1"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e1,e0", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = db->Delete(write_options, "e"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + db->ReleaseSnapshot(snapshot); + delete db; + DestroyDB(dbname, options); +} + } // namespace int main(int argc, char** argv) {