diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 97b565a13..6ad54f219 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -539,13 +539,15 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( // after the transaction finishes. for (size_t i = 0; i < num_keys; ++i) { MergeContext merge_context; - PinnableSlice* pinnable_val = &values[i]; - std::string& batch_value = *pinnable_val->GetSelf(); + std::string batch_value; Status* s = &statuses[i]; + PinnableSlice* pinnable_val = &values[i]; + pinnable_val->Reset(); auto result = wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s); if (result == WBWIIteratorImpl::kFound) { + *pinnable_val->GetSelf() = std::move(batch_value); pinnable_val->PinSelf(); continue; } @@ -581,15 +583,20 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( std::pair& merge_result = merges[index]; if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) { + std::string merged_value; // Merge result from DB with merges in Batch if (key.s->ok()) { *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, - key.value->GetSelf()); + &merged_value); } else { // Key not present in db (s.IsNotFound()) *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second, - key.value->GetSelf()); + &merged_value); + } + if (key.s->ok()) { + key.value->Reset(); + *key.value->GetSelf() = std::move(merged_value); + key.value->PinSelf(); } - key.value->PinSelf(); } } } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 81674cf09..6b264617c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -621,8 +621,7 @@ WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, const Slice* value, const MergeContext& context, - std::string* result, - Slice* result_operand) const { + std::string* result) const { if (column_family_ != nullptr) { auto cfh = static_cast_with_check(column_family_); const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get(); @@ -638,7 +637,7 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, SystemClock* clock = immutable_db_options.clock; return MergeHelper::TimedFullMerge(merge_operator, key, value, context.GetOperands(), result, logger, - statistics, clock, result_operand); + statistics, clock); } else if (db_options_ != nullptr) { Statistics* statistics = db_options_->statistics.get(); Env* env = db_options_->env; @@ -646,12 +645,12 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, SystemClock* clock = env->GetSystemClock().get(); return MergeHelper::TimedFullMerge(merge_operator, key, value, context.GetOperands(), result, logger, - statistics, clock, result_operand); + statistics, clock); } else { const auto cf_opts = cfh->cfd()->ioptions(); return MergeHelper::TimedFullMerge( merge_operator, key, value, context.GetOperands(), result, - cf_opts->logger, cf_opts->stats, cf_opts->clock, result_operand); + cf_opts->logger, cf_opts->stats, cf_opts->clock); } } else { return Status::InvalidArgument("Must provide a column_family"); diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 60ec66e30..b3360140b 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -306,13 +306,12 @@ class WriteBatchWithIndexInternal { const Slice& key, MergeContext* merge_context, std::string* value, Status* s); - Status MergeKey(const Slice& key, const Slice* value, std::string* result, - Slice* result_operand = nullptr) const { - return MergeKey(key, value, merge_context_, result, result_operand); + Status MergeKey(const Slice& key, const Slice* value, + std::string* result) const { + return MergeKey(key, value, merge_context_, result); } Status MergeKey(const Slice& key, const Slice* value, - const MergeContext& context, std::string* result, - Slice* result_operand = nullptr) const; + const MergeContext& context, std::string* result) const; size_t GetNumOperands() const { return merge_context_.GetNumOperands(); } MergeContext* GetMergeContext() { return &merge_context_; } Slice GetOperand(int index) const { return merge_context_.GetOperand(index); } diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index badfc471e..f39085efa 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -1412,6 +1412,49 @@ TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) { ASSERT_EQ(value, "1,2"); } +TEST_P(WriteBatchWithIndexTest, TestPinnedGetFromBatchAndDB) { + Status s = OpenDB(); + ASSERT_OK(s); + + PinnableSlice value; + + ASSERT_OK(db_->Put(write_opts_, "a", "a0")); + ASSERT_OK(db_->Put(write_opts_, "b", "b0")); + ASSERT_OK(db_->Merge(write_opts_, "b", "b1")); + ASSERT_OK(db_->Merge(write_opts_, "c", "c0")); + ASSERT_OK(db_->Merge(write_opts_, "d", "d0")); + ASSERT_OK(batch_->Merge("a", "a1")); + ASSERT_OK(batch_->Merge("a", "a2")); + ASSERT_OK(batch_->Merge("b", "b2")); + ASSERT_OK(batch_->Merge("d", "d1")); + ASSERT_OK(batch_->Merge("e", "e0")); + + for (int i = 0; i < 2; i++) { + if (i == 1) { + // Do it again with a flushed DB... + ASSERT_OK(db_->Flush(FlushOptions(), db_->DefaultColumnFamily())); + } + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); + ASSERT_EQ("a0,a1,a2", value.ToString()); + + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value)); + ASSERT_EQ("b0,b1,b2", value.ToString()); + + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); + ASSERT_EQ("c0", value.ToString()); + + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value)); + ASSERT_EQ("d0,d1", value.ToString()); + + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); + ASSERT_EQ("e0", value.ToString()); + ASSERT_OK(db_->Delete(write_opts_, "x")); + + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + void AssertKey(std::string key, WBWIIterator* iter) { ASSERT_TRUE(iter->Valid()); ASSERT_EQ(key, iter->Entry().key.ToString()); @@ -1808,12 +1851,12 @@ TEST_P(WriteBatchWithIndexTest, MultiGetTest) { // Write some data to the db for the even numbered keys { WriteBatch wb; - for (size_t i = 1; i < keys.size(); ++i) { + for (size_t i = 0; i < keys.size(); i += 2) { std::string val = "val" + std::to_string(i); ASSERT_OK(wb.Put(cf0, keys[i], val)); } ASSERT_OK(db_->Write(write_opts_, &wb)); - for (size_t i = 1; i < keys.size(); ++i) { + for (size_t i = 0; i < keys.size(); i += 2) { std::string value; ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value)); } @@ -1852,21 +1895,111 @@ TEST_P(WriteBatchWithIndexTest, MultiGetTest) { } else if ((i % 7) == 0) { // Merge after Put std::string val = "new" + std::to_string(i); ASSERT_EQ(values[i], val + ",merge"); - } else { + } else if ((i % 2) == 0) { std::string val = "val" + std::to_string(i); ASSERT_EQ(values[i], val + ",merge"); + } else { + ASSERT_EQ(values[i], "merge"); } } else if ((i % 5) == 0) { ASSERT_TRUE(statuses[i].IsNotFound()); } else if ((i % 7) == 0) { ASSERT_OK(statuses[i]); ASSERT_EQ(values[i], "new" + std::to_string(i)); - } else { + } else if ((i % 2) == 0) { ASSERT_OK(statuses[i]); ASSERT_EQ(values[i], "val" + std::to_string(i)); + } else { + ASSERT_TRUE(statuses[i].IsNotFound()); } } } +TEST_P(WriteBatchWithIndexTest, MultiGetTest2) { + // MultiGet a lot of keys in order to force std::vector reallocations + const int num_keys = 700; + const int keys_per_pass = 100; + std::vector keys; + for (size_t i = 0; i < num_keys; ++i) { + keys.emplace_back(std::to_string(i)); + } + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + // Keys 0- 99 have a PUT in the batch but not DB + // Keys 100-199 have a PUT in the DB + // Keys 200-299 Have a PUT/DELETE + // Keys 300-399 Have a PUT/DELETE/MERGE + // Keys 400-499 have a PUT/MERGE + // Keys 500-599 have a MERGE only + // Keys 600-699 were never written + { + WriteBatch wb; + for (size_t i = 100; i < 500; i++) { + std::string val = std::to_string(i); + ASSERT_OK(wb.Put(cf0, keys[i], val)); + } + ASSERT_OK(db_->Write(write_opts_, &wb)); + } + ASSERT_OK(db_->Flush(FlushOptions(), cf0)); + for (size_t i = 0; i < 100; i++) { + ASSERT_OK(batch_->Put(cf0, keys[i], keys[i])); + } + for (size_t i = 200; i < 400; i++) { + ASSERT_OK(batch_->Delete(cf0, keys[i])); + } + for (size_t i = 300; i < 600; i++) { + std::string val = std::to_string(i) + "m"; + ASSERT_OK(batch_->Merge(cf0, keys[i], val)); + } + + Random rnd(301); + std::vector values(keys_per_pass); + std::vector statuses(keys_per_pass); + for (int pass = 0; pass < 40; pass++) { + std::vector key_slices; + for (size_t i = 0; i < keys_per_pass; i++) { + int random = rnd.Uniform(num_keys); + key_slices.emplace_back(keys[random]); + } + batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, keys_per_pass, + key_slices.data(), values.data(), + statuses.data(), false); + for (size_t i = 0; i < keys_per_pass; i++) { + int key = ParseInt(key_slices[i].ToString()); + switch (key / 100) { + case 0: // 0-99 PUT only + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], key_slices[i].ToString()); + break; + case 1: // 100-199 PUT only + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], key_slices[i].ToString()); + break; + case 2: // 200-299 Deleted + ASSERT_TRUE(statuses[i].IsNotFound()); + break; + case 3: // 300-399 Delete+Merge + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], key_slices[i].ToString() + "m"); + break; + case 4: // 400-400 Put+ Merge + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], key_slices[i].ToString() + "," + + key_slices[i].ToString() + "m"); + break; + case 5: // Merge only + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], key_slices[i].ToString() + "m"); + break; + case 6: // Never written + ASSERT_TRUE(statuses[i].IsNotFound()); + break; + default: + assert(false); + } // end switch + } // End for each key + } // end for passes +} // This test has merges, but the merge does not play into the final result TEST_P(WriteBatchWithIndexTest, FakeMergeWithIteratorTest) {