diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 5bcf1c8c7..a3f9f6303 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -205,6 +205,19 @@ class Transaction { const std::vector& keys, std::vector* values) = 0; + // Batched version of MultiGet - see DBImpl::MultiGet(). Sub-classes are + // expected to override this with an implementation that calls + // DBImpl::MultiGet() + virtual void MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + const bool /*sorted_input*/ = false) { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Get(options, column_family, keys[i], &values[i]); + } + } + // Read this key and ensure that this transaction will only // be able to be committed if this key is not written outside this // transaction after it has first been read (or after the snapshot if a diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index d25b9513b..34e6c4689 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -14,6 +14,7 @@ #include #include +#include #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" @@ -207,6 +208,12 @@ class WriteBatchWithIndex : public WriteBatchBase { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value); + void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input); + // Records the state of the batch for future calls to RollbackToSavePoint(). // May be called multiple times to set multiple save points. void SetSavePoint() override; @@ -246,6 +253,11 @@ class WriteBatchWithIndex : public WriteBatchBase { Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, ReadCallback* callback); + void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input, ReadCallback* callback); struct Rep; std::unique_ptr rep; }; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index fc2e6b520..68b87b5aa 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -281,6 +281,16 @@ std::vector TransactionBaseImpl::MultiGet( return stat_list; } +void TransactionBaseImpl::MultiGet(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input) { + write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family, + num_keys, keys, values, statuses, + sorted_input); +} + std::vector TransactionBaseImpl::MultiGetForUpdate( const ReadOptions& read_options, const std::vector& column_family, diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index cc3d0cdc3..04274866a 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -47,7 +47,7 @@ class TransactionBaseImpl : public Transaction { void SetSavePoint() override; Status RollbackToSavePoint() override; - + Status PopSavePoint() override; using Transaction::Get; @@ -80,6 +80,7 @@ class TransactionBaseImpl : public Transaction { exclusive, do_validate); } + using Transaction::MultiGet; std::vector MultiGet( const ReadOptions& options, const std::vector& column_family, @@ -94,6 +95,11 @@ class TransactionBaseImpl : public Transaction { keys, values); } + void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + Status* statuses, bool sorted_input = false) override; + + using Transaction::MultiGetForUpdate; std::vector MultiGetForUpdate( const ReadOptions& options, const std::vector& column_family, diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 79f7f8899..1a5bf2d66 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2676,7 +2676,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { handles[0], handles[2]}; std::vector multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; std::vector values(4); - std::vector results = txn->MultiGetForUpdate( snapshot_read_options, multiget_cfh, multiget_keys, &values); ASSERT_OK(results[0]); @@ -2736,6 +2735,92 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { } } +TEST_P(TransactionTest, MultiGetBatchedTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + string value; + Status s; + + ColumnFamilyHandle* cf; + ColumnFamilyOptions cf_options; + + // Create a new column families + s = db->CreateColumnFamily(cf_options, "CF", &cf); + ASSERT_OK(s); + + delete cf; + delete db; + db = nullptr; + + // open DB with three column families + std::vector column_families; + // have to open default column family + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + // open the new column families + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); + column_families.push_back(ColumnFamilyDescriptor("CF", cf_options)); + + std::vector handles; + + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + s = TransactionDB::Open(options, txn_db_options, dbname, column_families, + &handles, &db); + assert(db != nullptr); + ASSERT_OK(s); + + // Write some data to the db + WriteBatch batch; + batch.Put(handles[1], "aaa", "val1"); + batch.Put(handles[1], "bbb", "val2"); + batch.Put(handles[1], "ccc", "val3"); + batch.Put(handles[1], "ddd", "foo"); + batch.Put(handles[1], "eee", "val5"); + batch.Put(handles[1], "fff", "val6"); + batch.Merge(handles[1], "ggg", "foo"); + s = db->Write(write_options, &batch); + ASSERT_OK(s); + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + txn->SetSnapshot(); + snapshot_read_options.snapshot = txn->GetSnapshot(); + + txn_options.set_snapshot = true; + // Write some data to the db + s = txn->Delete(handles[1], "bbb"); + ASSERT_OK(s); + s = txn->Put(handles[1], "ccc", "val3_new"); + ASSERT_OK(s); + s = txn->Merge(handles[1], "ddd", "bar"); + ASSERT_OK(s); + + std::vector keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"}; + std::vector values(keys.size()); + std::vector statuses(keys.size()); + + txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(), + values.data(), statuses.data()); + ASSERT_TRUE(statuses[0].ok()); + ASSERT_EQ(values[0], "val1"); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_TRUE(statuses[2].ok()); + ASSERT_EQ(values[2], "val3_new"); + ASSERT_TRUE(statuses[3].IsMergeInProgress()); + ASSERT_TRUE(statuses[4].ok()); + ASSERT_EQ(values[4], "val5"); + ASSERT_TRUE(statuses[5].ok()); + ASSERT_EQ(values[5], "val6"); + ASSERT_TRUE(statuses[6].ok()); + ASSERT_EQ(values[6], "foo"); + delete txn; + for (auto handle : handles) { + delete handle; + } +} + TEST_P(TransactionTest, ColumnFamiliesTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index c620ebd4d..adec3475c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -922,6 +922,109 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( return s; } +void WriteBatchWithIndex::MultiGetFromBatchAndDB( + DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + Status* statuses, bool sorted_input) { + MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys, + values, statuses, sorted_input, nullptr); +} + +void WriteBatchWithIndex::MultiGetFromBatchAndDB( + DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + Status* statuses, bool sorted_input, ReadCallback* callback) { + const ImmutableDBOptions& immuable_db_options = + static_cast_with_check(db->GetRootDB()) + ->immutable_db_options(); + + autovector key_context; + // To hold merges from the write batch + autovector, + MultiGetContext::MAX_BATCH_SIZE> + merges; + // Since the lifetime of the WriteBatch is the same as that of the transaction + // we cannot pin it as otherwise the returned value will not be available + // after the transaction finishes. + for (size_t i = 0; i < num_keys; ++i) { + MergeContext merge_context; + PinnableSlice* pinnable_val = &values[i]; + std::string& batch_value = *pinnable_val->GetSelf(); + Status* s = &statuses[i]; + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::GetFromBatch( + immuable_db_options, this, column_family, keys[i], &merge_context, + &rep->comparator, &batch_value, rep->overwrite_key, s); + + if (result == WriteBatchWithIndexInternal::Result::kFound) { + pinnable_val->PinSelf(); + continue; + } + if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + *s = Status::NotFound(); + continue; + } + if (result == WriteBatchWithIndexInternal::Result::kError) { + continue; + } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + rep->overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + *s = Status::MergeInProgress(); + continue; + } + + assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || + result == WriteBatchWithIndexInternal::Result::kNotFound); + key_context.emplace_back(keys[i], &values[i], &statuses[i]); + merges.emplace_back(result, std::move(merge_context)); + } + + // Did not find key in batch OR could not resolve Merges. Try DB. + static_cast_with_check(db->GetRootDB()) + ->MultiGetImpl(read_options, column_family, key_context, sorted_input, + callback); + + ColumnFamilyHandleImpl* cfh = + reinterpret_cast(column_family); + const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator; + for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) { + KeyContext& key = *iter; + if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded + size_t index = iter - key_context.begin(); + std::pair& + merge_result = merges[index]; + if (merge_result.first == + WriteBatchWithIndexInternal::Result::kMergeInProgress) { + // Merge result from DB with merges in Batch + Statistics* statistics = immuable_db_options.statistics.get(); + Env* env = immuable_db_options.env; + Logger* logger = immuable_db_options.info_log.get(); + + Slice* merge_data; + if (key.s->ok()) { + merge_data = iter->value; + } else { // Key not present in db (s.IsNotFound()) + merge_data = nullptr; + } + + if (merge_operator) { + *key.s = MergeHelper::TimedFullMerge( + merge_operator, *key.key, merge_data, + merge_result.second.GetOperands(), key.value->GetSelf(), logger, + statistics, env); + key.value->PinSelf(); + } else { + *key.s = + Status::InvalidArgument("Options::merge_operator must be set"); + } + } + } + } +} + void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } Status WriteBatchWithIndex::RollbackToSavePoint() {