diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 9b820f1a3..99fa47b0b 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -1359,10 +1359,7 @@ TEST_F(DBBasicTest, GetMergeOperands) {
   private:
    size_t limit_ = 0;
  };
-  std::vector<int> rest;
-  int* a = new int();
-  *a = 5;
-  rest.push_back(*a);
+
   Options options;
   options.create_if_missing = true;
   // Use only the latest two merge operands.
@@ -1370,22 +1367,26 @@ TEST_F(DBBasicTest, GetMergeOperands) {
       std::make_shared<LimitedStringAppendMergeOp>(2, ',');
   options.env = env_;
   Reopen(options);
+  int size = 4;
+
   // All K1 values are in memtable.
   ASSERT_OK(Merge("k1", "a"));
-  Put("k1", "asd");
+  Put("k1", "x");
   ASSERT_OK(Merge("k1", "b"));
   ASSERT_OK(Merge("k1", "c"));
   ASSERT_OK(Merge("k1", "d"));
-  std::vector<PinnableSlice> values;
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
+  std::vector<PinnableSlice> values(size);
+  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), size);
+  ASSERT_EQ(values[0], "x");
+  ASSERT_EQ(values[1], "b");
+  ASSERT_EQ(values[2], "c");
+  ASSERT_EQ(values[3], "d");
   for(PinnableSlice& value: values) {
     std::cout << *value.GetSelf() << "\n";
   }
-  std::string value;
-  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
-  // Make sure that only the latest two merge operands are used. If this was
-  // not the case the value would be "a,b,c,d".
-  ASSERT_EQ(value, "c,d");
+  // Size is less than the number of merge operands, so status should be Aborted.
+  Status status = db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), size-1);
+  ASSERT_EQ(status.IsAborted(), true);

   // All K2 values are flushed to L0 into a single file.
   ASSERT_OK(Merge("k2", "a"));
@@ -1393,13 +1394,14 @@ TEST_F(DBBasicTest, GetMergeOperands) {
   ASSERT_OK(Merge("k2", "c"));
   ASSERT_OK(Merge("k2", "d"));
   ASSERT_OK(Flush());
-  std::vector<PinnableSlice> values2(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
-  for(PinnableSlice& psl: values2) {
+  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", values.data(), size);
+  ASSERT_EQ(values[0], "a");
+  ASSERT_EQ(values[1], "b");
+  ASSERT_EQ(values[2], "c");
+  ASSERT_EQ(values[3], "d");
+  for(PinnableSlice& psl: values) {
     std::cout << *psl.GetSelf() << "\n";
   }
-  ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
-  ASSERT_EQ(value, "c,d");

   // All K3 values are flushed and are in different files.
   ASSERT_OK(Merge("k3", "ab"));
@@ -1409,15 +1411,15 @@ TEST_F(DBBasicTest, GetMergeOperands) {
   ASSERT_OK(Merge("k3", "cd"));
   ASSERT_OK(Flush());
   ASSERT_OK(Merge("k3", "de"));
-  std::vector<PinnableSlice> values3(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
-  for(PinnableSlice& psl: values3) {
+  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", values.data(), size);
+  ASSERT_EQ(values[0], "ab");
+  ASSERT_EQ(values[1], "bc");
+  ASSERT_EQ(values[2], "cd");
+  ASSERT_EQ(values[3], "de");
+  for(PinnableSlice& psl: values) {
     std::cout << *psl.GetSelf() << "\n";
   }
-  ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
-  ASSERT_EQ(value, "cd,de");
-
   // All K4 values are in different levels
   ASSERT_OK(Merge("k4", "ab"));
   ASSERT_OK(Flush());
@@ -1429,14 +1431,32 @@ TEST_F(DBBasicTest, GetMergeOperands) {
   ASSERT_OK(Flush());
   MoveFilesToLevel(1);
   ASSERT_OK(Merge("k4", "de"));
-  ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
-  ASSERT_EQ(value, "cd,de");
-
-  std::vector<PinnableSlice> values4(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
-  for(PinnableSlice& psl: values4) {
+  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", values.data(), size);
+  ASSERT_EQ(values[0], "ab");
+  ASSERT_EQ(values[1], "bc");
+  ASSERT_EQ(values[2], "cd");
+  ASSERT_EQ(values[3], "de");
+  for(PinnableSlice& psl: values) {
     std::cout << *psl.GetSelf() << "\n";
   }
+
+//  ASSERT_OK(Merge("k5", "a"));
+//  ASSERT_OK(Merge("k5", "b"));
+//  ASSERT_OK(Merge("k5", "c"));
+//  ASSERT_OK(Merge("k5", "d"));
+//  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+//      {{"DBBasicTest.GetMergeOperands", "FlushJob::Start"}}
+//  );
+//  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+//  ASSERT_OK(Flush());
+//  std::vector<PinnableSlice> values5(4);
+//  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k5", values5.data(), 4);
+//  for(PinnableSlice& psl: values5) {
+//    std::cout << *psl.GetSelf() << "\n";
+//  }
+//  TEST_SYNC_POINT("DBBasicTest.GetMergeOperands");
+//  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
 }

 class DBBasicTestWithParallelIO
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 68abb549c..1fb9f1cda 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1457,116 +1457,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
                        ColumnFamilyHandle* column_family, const Slice& key,
                        PinnableSlice* pinnable_val, bool* value_found,
                        ReadCallback* callback, bool* is_blob_index) {
-  assert(pinnable_val != nullptr);
-  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
-  StopWatch sw(env_, stats_, DB_GET);
-  PERF_TIMER_GUARD(get_snapshot_time);
-
-  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  auto cfd = cfh->cfd();
-
-  if (tracer_) {
-    // TODO: This mutex should be removed later, to improve performance when
-    // tracing is enabled.
-    InstrumentedMutexLock lock(&trace_mutex_);
-    if (tracer_) {
-      tracer_->Get(column_family, key);
-    }
-  }
-
-  // Acquire SuperVersion
-  SuperVersion* sv = GetAndRefSuperVersion(cfd);
-
-  TEST_SYNC_POINT("DBImpl::GetImpl:1");
-  TEST_SYNC_POINT("DBImpl::GetImpl:2");
-
-  SequenceNumber snapshot;
-  if (read_options.snapshot != nullptr) {
-    if (callback) {
-      // Already calculated based on read_options.snapshot
-      snapshot = callback->max_visible_seq();
-    } else {
-      snapshot =
-          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
-    }
-  } else {
-    // Note that the snapshot is assigned AFTER referencing the super
-    // version because otherwise a flush happening in between may compact away
-    // data for the snapshot, so the reader would see neither data that was be
-    // visible to the snapshot before compaction nor the newer data inserted
-    // afterwards.
-    snapshot = last_seq_same_as_publish_seq_
-                   ? versions_->LastSequence()
-                   : versions_->LastPublishedSequence();
-    if (callback) {
-      callback->Refresh(snapshot);
-    }
-  }
-  TEST_SYNC_POINT("DBImpl::GetImpl:3");
-  TEST_SYNC_POINT("DBImpl::GetImpl:4");
-
-  // Prepare to store a list of merge operations if merge occurs.
-  MergeContext merge_context;
-  SequenceNumber max_covering_tombstone_seq = 0;
-
-  Status s;
-  // First look in the memtable, then in the immutable memtable (if any).
-  // s is both in/out. When in, s could either be OK or MergeInProgress.
-  // merge_operands will contain the sequence of merges in the latter case.
-  LookupKey lkey(key, snapshot);
-  PERF_TIMER_STOP(get_snapshot_time);
-
-  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
-                        has_unpersisted_data_.load(std::memory_order_relaxed));
-  bool done = false;
-  if (!skip_memtable) {
-    if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
-                     &max_covering_tombstone_seq, read_options, callback,
-                     is_blob_index)) {
-      done = true;
-      pinnable_val->PinSelf();
-      RecordTick(stats_, MEMTABLE_HIT);
-    } else if ((s.ok() || s.IsMergeInProgress()) &&
-               sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
-                            &max_covering_tombstone_seq, read_options, callback,
-                            is_blob_index)) {
-      done = true;
-      pinnable_val->PinSelf();
-      RecordTick(stats_, MEMTABLE_HIT);
-    }
-    if (!done && !s.ok() && !s.IsMergeInProgress()) {
-      ReturnAndCleanupSuperVersion(cfd, sv);
-      return s;
-    }
-  }
-  if (!done) {
-    PERF_TIMER_GUARD(get_from_output_files_time);
-    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
-                     &max_covering_tombstone_seq, value_found, nullptr, nullptr,
-                     callback, is_blob_index);
-    RecordTick(stats_, MEMTABLE_MISS);
-  }
-
-  {
-    PERF_TIMER_GUARD(get_post_process_time);
-
-    ReturnAndCleanupSuperVersion(cfd, sv);
-
-    RecordTick(stats_, NUMBER_KEYS_READ);
-    size_t size = 0;
-    if (s.ok()) {
-      size = pinnable_val->size();
-      RecordTick(stats_, BYTES_READ, size);
-      PERF_COUNTER_ADD(get_read_bytes, size);
-    }
-    RecordInHistogram(stats_, BYTES_PER_READ, size);
-  }
-  return s;
-}
-
-Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
-    ColumnFamilyHandle* column_family, const Slice& key,
-    std::vector<PinnableSlice>* pinnable_val) {
+//  return GetValOrGetMergeOperands(read_options, column_family, key, pinnable_val,
+//      0, value_found, callback, is_blob_index, true);
   assert(pinnable_val != nullptr);
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
   StopWatch sw(env_, stats_, DB_GET);
@@ -1587,10 +1479,18 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
   // Acquire SuperVersion
   SuperVersion* sv = GetAndRefSuperVersion(cfd);
+  TEST_SYNC_POINT("DBImpl::GetImpl:1");
+  TEST_SYNC_POINT("DBImpl::GetImpl:2");
+
   SequenceNumber snapshot;
   if (read_options.snapshot != nullptr) {
+    if (callback) {
+      // Already calculated based on read_options.snapshot
+      snapshot = callback->max_visible_seq();
+    } else {
     snapshot =
         reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+    }
   } else {
     // Note that the snapshot is assigned AFTER referencing the super
     // version because otherwise a flush happening in between may compact away
@@ -1600,7 +1500,12 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
     snapshot = last_seq_same_as_publish_seq_
                    ? versions_->LastSequence()
                    : versions_->LastPublishedSequence();
+    if (callback) {
+      callback->Refresh(snapshot);
+    }
   }
+  TEST_SYNC_POINT("DBImpl::GetImpl:3");
+  TEST_SYNC_POINT("DBImpl::GetImpl:4");

   // Prepare to store a list of merge operations if merge occurs.
   MergeContext merge_context;
@@ -1617,15 +1522,19 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
                         has_unpersisted_data_.load(std::memory_order_relaxed));
   bool done = false;
   if (!skip_memtable) {
-    if (sv->mem->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
-                                  &max_covering_tombstone_seq, read_options)) {
-      done = true;
-      RecordTick(stats_, MEMTABLE_HIT);
+    if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                     &max_covering_tombstone_seq, read_options, callback,
+                     is_blob_index)) {
+      done = true;
+      pinnable_val->PinSelf();
+      RecordTick(stats_, MEMTABLE_HIT);
     } else if ((s.ok() || s.IsMergeInProgress()) &&
-               sv->imm->GetMergeOperands(lkey, pinnable_val, &s, &merge_context,
-                                         &max_covering_tombstone_seq, read_options)) {
-      done = true;
-      RecordTick(stats_, MEMTABLE_HIT);
+               sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                            &max_covering_tombstone_seq, read_options, callback,
+                            is_blob_index)) {
+      done = true;
+      pinnable_val->PinSelf();
+      RecordTick(stats_, MEMTABLE_HIT);
     }
     if (!done && !s.ok() && !s.IsMergeInProgress()) {
       ReturnAndCleanupSuperVersion(cfd, sv);
@@ -1634,9 +1543,9 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
   }
   if (!done) {
     PERF_TIMER_GUARD(get_from_output_files_time);
-    sv->current->GetMergeOperands(read_options, lkey, pinnable_val, &s, &merge_context,
-                                  &max_covering_tombstone_seq, nullptr, nullptr, nullptr,
-                                  nullptr, nullptr);
+    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
+                     &max_covering_tombstone_seq, value_found, nullptr, nullptr,
+                     callback, is_blob_index);
     RecordTick(stats_, MEMTABLE_MISS);
   }
@@ -1657,6 +1566,160 @@ Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
   return s;
 }

+Status DBImpl::GetValOrGetMergeOperands(const ReadOptions& read_options,
+    ColumnFamilyHandle* column_family, const Slice& key,
+    PinnableSlice* pinnable_val, int num_records, bool* value_found,
+    ReadCallback* callback, bool* is_blob_index, bool get_val) {
+
+  assert(pinnable_val != nullptr);
+  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
+  StopWatch sw(env_, stats_, DB_GET);
+  PERF_TIMER_GUARD(get_snapshot_time);
+
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfd = cfh->cfd();
+
+  if (tracer_) {
+    // TODO: This mutex should be removed later, to improve performance when
+    // tracing is enabled.
+    InstrumentedMutexLock lock(&trace_mutex_);
+    if (tracer_) {
+      tracer_->Get(column_family, key);
+    }
+  }
+
+  // Acquire SuperVersion
+  SuperVersion* sv = GetAndRefSuperVersion(cfd);
+
+  TEST_SYNC_POINT("DBImpl::GetImpl:1");
+  TEST_SYNC_POINT("DBImpl::GetImpl:2");
+
+  SequenceNumber snapshot;
+  if (read_options.snapshot != nullptr) {
+    if (callback) {
+      // Already calculated based on read_options.snapshot
+      snapshot = callback->max_visible_seq();
+    } else {
+      snapshot =
+          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+    }
+  } else {
+    // Note that the snapshot is assigned AFTER referencing the super
+    // version because otherwise a flush happening in between may compact away
+    // data for the snapshot, so the reader would see neither data that was be
+    // visible to the snapshot before compaction nor the newer data inserted
+    // afterwards.
+    snapshot = last_seq_same_as_publish_seq_
+                   ? versions_->LastSequence()
+                   : versions_->LastPublishedSequence();
+    if (callback) {
+      callback->Refresh(snapshot);
+    }
+  }
+  TEST_SYNC_POINT("DBImpl::GetImpl:3");
+  TEST_SYNC_POINT("DBImpl::GetImpl:4");
+
+  // Prepare to store a list of merge operations if merge occurs.
+  MergeContext merge_context;
+  SequenceNumber max_covering_tombstone_seq = 0;
+
+  Status s;
+  // First look in the memtable, then in the immutable memtable (if any).
+  // s is both in/out. When in, s could either be OK or MergeInProgress.
+  // merge_operands will contain the sequence of merges in the latter case.
+  LookupKey lkey(key, snapshot);
+  PERF_TIMER_STOP(get_snapshot_time);
+
+  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
+                        has_unpersisted_data_.load(std::memory_order_relaxed));
+  bool done = false;
+  if (!skip_memtable) {
+    if (get_val) {
+      if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                       &max_covering_tombstone_seq, read_options, callback,
+                       is_blob_index)) {
+        done = true;
+        pinnable_val->PinSelf();
+        RecordTick(stats_, MEMTABLE_HIT);
+      } else if ((s.ok() || s.IsMergeInProgress()) &&
+                 sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                              &max_covering_tombstone_seq, read_options, callback,
+                              is_blob_index)) {
+        done = true;
+        pinnable_val->PinSelf();
+        RecordTick(stats_, MEMTABLE_HIT);
+      }
+      if (!done && !s.ok() && !s.IsMergeInProgress()) {
+        ReturnAndCleanupSuperVersion(cfd, sv);
+        return s;
+      }
+    } else {
+      if (sv->mem->GetMergeOperands(lkey, pinnable_val, num_records, &s, &merge_context,
+                                    &max_covering_tombstone_seq, read_options)) {
+        done = true;
+        RecordTick(stats_, MEMTABLE_HIT);
+      } else if ((s.ok() || s.IsMergeInProgress()) &&
+                 sv->imm->GetMergeOperands(lkey, pinnable_val, num_records, &s, &merge_context,
+                                           &max_covering_tombstone_seq, read_options)) {
+        done = true;
+        RecordTick(stats_, MEMTABLE_HIT);
+      }
+      if (!done && !s.ok() && !s.IsMergeInProgress()) {
+        ReturnAndCleanupSuperVersion(cfd, sv);
+        return s;
+      }
+    }
+  }
+  if (!done) {
+    PERF_TIMER_GUARD(get_from_output_files_time);
+    if (get_val) {
+      sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
+                       &max_covering_tombstone_seq, value_found, nullptr, nullptr,
+                       callback, is_blob_index);
+
+    } else {
+      sv->current->GetMergeOperands(read_options, lkey, pinnable_val, num_records, &s, &merge_context,
+                                    &max_covering_tombstone_seq, nullptr, nullptr, nullptr,
+                                    nullptr, nullptr);
+    }
+    RecordTick(stats_, MEMTABLE_MISS);
+  }
+
+  {
+    PERF_TIMER_GUARD(get_post_process_time);
+
+    ReturnAndCleanupSuperVersion(cfd, sv);
+
+    RecordTick(stats_, NUMBER_KEYS_READ);
+    size_t size = 0;
+    if (s.ok()) {
+      if (get_val) {
+        size = pinnable_val->size();
+      } else {
+        int itr = 0;
+        while (itr < num_records) {
+          size += pinnable_val->size();
+          itr++;
+          pinnable_val++;
+        }
+      }
+
+      RecordTick(stats_, BYTES_READ, size);
+      PERF_COUNTER_ADD(get_read_bytes, size);
+    }
+    RecordInHistogram(stats_, BYTES_PER_READ, size);
+  }
+  return s;
+
+}
+
+Status DBImpl::GetMergeOperands(const ReadOptions& read_options,
+    ColumnFamilyHandle* column_family, const Slice& key,
+    PinnableSlice* slice, int size) {
+  return GetValOrGetMergeOperands(read_options, column_family, key, slice, size,
+      nullptr, nullptr, nullptr, false);
+}
+
 std::vector<Status> DBImpl::MultiGet(
     const ReadOptions& read_options,
     const std::vector<ColumnFamilyHandle*>& column_family,
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 3bb9521f1..e68a7c911 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -162,7 +162,7 @@ class DBImpl : public DB {
   using DB::GetMergeOperands;
   virtual Status GetMergeOperands(const ReadOptions& options,
       ColumnFamilyHandle* column_family, const Slice& key,
-      std::vector<PinnableSlice>* value) override;
+      PinnableSlice* slice, int size) override;

   using DB::MultiGet;
   virtual std::vector<Status> MultiGet(
@@ -406,6 +406,11 @@ class DBImpl : public DB {
                  bool* value_found = nullptr, ReadCallback* callback = nullptr,
                  bool* is_blob_index = nullptr);

+  Status GetValOrGetMergeOperands(const ReadOptions& read_options,
+      ColumnFamilyHandle* column_family, const Slice& key,
+      PinnableSlice* pinnable_val, int num_records = 0, bool* value_found = nullptr,
+      ReadCallback* callback = nullptr, bool* is_blob_index = nullptr, bool get_val = false);
+
   ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
                                       ColumnFamilyData* cfd,
                                       SequenceNumber snapshot,
diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc
index f6d6c4588..49ec115ed 100644
--- a/db/db_merge_operator_test.cc
+++ b/db/db_merge_operator_test.cc
@@ -86,15 +86,9 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
   Reopen(options);
   // All K1 values are in memtable.
   ASSERT_OK(Merge("k1", "a"));
-  Put("k1", "asd");
   ASSERT_OK(Merge("k1", "b"));
   ASSERT_OK(Merge("k1", "c"));
   ASSERT_OK(Merge("k1", "d"));
-  std::vector<PinnableSlice> values;
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values);
-  for(PinnableSlice& value: values) {
-    std::cout << *value.GetSelf() << "\n";
-  }
   std::string value;
   ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
   // Make sure that only the latest two merge operands are used. If this was
@@ -107,11 +101,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
   ASSERT_OK(Merge("k2", "c"));
   ASSERT_OK(Merge("k2", "d"));
   ASSERT_OK(Flush());
-  std::vector<PinnableSlice> values2(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2);
-  for(PinnableSlice& psl: values2) {
-    std::cout << *psl.GetSelf() << "\n";
-  }
   ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
   ASSERT_EQ(value, "c,d");

@@ -123,12 +112,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
   ASSERT_OK(Merge("k3", "cd"));
   ASSERT_OK(Flush());
   ASSERT_OK(Merge("k3", "de"));
-  std::vector<PinnableSlice> values3(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3);
-  for(PinnableSlice& psl: values3) {
-    std::cout << *psl.GetSelf() << "\n";
-  }
-
   ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
   ASSERT_EQ(value, "cd,de");

@@ -146,12 +129,6 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
   ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
   ASSERT_EQ(value, "cd,de");

-  std::vector<PinnableSlice> values4(4);
-  db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4);
-  for(PinnableSlice& psl: values4) {
-    std::cout << *psl.GetSelf() << "\n";
-  }
-
 }

 TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
diff --git a/db/db_test.cc b/db/db_test.cc
index 75f40fc1b..2e9dbcba0 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2469,12 +2469,13 @@ class ModelDB : public DB {
   using DB::GetMergeOperands;
   virtual Status GetMergeOperands(const ReadOptions& options,
       ColumnFamilyHandle* column_family, const Slice& key,
-      std::vector<PinnableSlice>* value) override {
+      PinnableSlice* slice, int size) override {
     (void)options;
     (void) column_family;
     (void) key;
-    (void) value;
+    (void) slice;
+    (void) size;
     return Status::NotSupported(key);
   }
diff --git a/db/memtable.cc b/db/memtable.cc
index 42504ee0e..3973d441d 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -830,7 +830,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
 }

 bool MemTable::GetMergeOperands(const LookupKey& key,
-                                std::vector<PinnableSlice>* pinnable_val,
+                                PinnableSlice* slice, int size,
                                 Status* s, MergeContext* merge_context,
                                 SequenceNumber* max_covering_tombstone_seq,
                                 const ReadOptions& read_opts,
@@ -894,10 +894,13 @@ bool MemTable::GetMergeOperands(const LookupKey& key,
     saver.callback_ = callback;
     saver.is_blob_index = is_blob_index;
     table_->Get(key, &saver, SaveValue, false);
-    PinnableSlice* psliceptr = pinnable_val->data();
-    for (Slice slice : saver.merge_context->GetOperands()) {
-      psliceptr->PinSelf(slice);
-      psliceptr++;
+    if (saver.merge_context->GetNumOperands() > (unsigned)size) {
+      *s = Status::Aborted("Number of merge operands: "
+          + std::to_string(saver.merge_context->GetNumOperands()) + " more than size of vector");
+    }
+    for (Slice sl : saver.merge_context->GetOperands()) {
+      slice->PinSelf(sl);
+      slice++;
     }
   }
diff --git a/db/memtable.h b/db/memtable.h
index 5bdc66c74..fd4c72dc3 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -205,7 +205,7 @@ class MemTable {
   }

   bool GetMergeOperands(const LookupKey& key,
-                        std::vector<PinnableSlice>* pinnable_val,
+                        PinnableSlice* slice, int size,
                         Status* s, MergeContext* merge_context,
                         SequenceNumber* max_covering_tombstone_seq,
                         const ReadOptions& read_opts,
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 3a41b241a..70f53ea44 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -110,14 +110,14 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
 }

 bool MemTableListVersion::GetMergeOperands(const LookupKey& key,
-                                           std::vector<PinnableSlice>* pinnable_val,
+                                           PinnableSlice* slice, int size,
                                            Status* s, MergeContext* merge_context,
                                            SequenceNumber* max_covering_tombstone_seq,
                                            const ReadOptions& read_opts,
                                            ReadCallback* callback, bool* is_blob_index) {
   for (MemTable* memtable : memlist_) {
-    bool done = memtable->GetMergeOperands(key, pinnable_val, s, merge_context,
+    bool done = memtable->GetMergeOperands(key, slice, size, s, merge_context,
                                            max_covering_tombstone_seq, read_opts,
                                            callback, is_blob_index);
     if (done) {
       return true;
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 7d1a49a75..0f876814c 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -72,7 +72,7 @@ class MemTableListVersion {
   }

   bool GetMergeOperands(const LookupKey& key,
-                        std::vector<PinnableSlice>* pinnable_val,
+                        PinnableSlice* slice, int size,
                         Status* s, MergeContext* merge_context,
                         SequenceNumber* max_covering_tombstone_seq,
                         const ReadOptions& read_opts,
diff --git a/db/version_set.cc b/db/version_set.cc
index 30fbb65cf..a977e9cdc 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1784,7 +1784,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
 }

 void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey& k,
-                  std::vector<PinnableSlice>* pinnable_val, Status* status,
+                  PinnableSlice* slice, int size, Status* status,
                   MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
                   bool* value_found, bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
@@ -1804,7 +1804,7 @@ void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey&
       user_comparator(), merge_operator_, info_log_, db_statistics_,
       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
       nullptr, value_found, merge_context, max_covering_tombstone_seq, this->env_,
-      seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, false);
+      seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, 0, false);

   // Pin blocks that we read to hold merge operands
   if (merge_operator_) {
@@ -1890,10 +1890,13 @@ void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey&
   if (db_statistics_ != nullptr) {
     get_context.ReportCounters();
   }
-  PinnableSlice* pin_slice = pinnable_val->data();
-  for (Slice slice : merge_context->GetOperands()){
-    pin_slice->PinSelf(slice);
-    pin_slice++;
+  if (merge_context->GetNumOperands() > (unsigned)size) {
+    *status = Status::Aborted("Number of merge operands: "
+        + std::to_string(merge_context->GetNumOperands()) + " more than size of vector");
+  }
+  for (Slice sl : merge_context->GetOperands()){
+    slice->PinSelf(sl);
+    slice++;
   }
 }
diff --git a/db/version_set.h b/db/version_set.h
index 722cec289..47b570991 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -584,7 +584,7 @@ class Version {
            bool* is_blob = nullptr);

   void GetMergeOperands(const ReadOptions&, const LookupKey& key,
-                        std::vector<PinnableSlice>* pinnable_val,
+                        PinnableSlice* slice, int size,
                         Status* status, MergeContext* merge_context,
                         SequenceNumber* max_covering_tombstone_seq,
                         bool* value_found = nullptr, bool* key_exists = nullptr,
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index fd0e7d2ef..2eba9e61d 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -405,7 +405,7 @@ class DB {
   virtual Status GetMergeOperands(const ReadOptions& options,
       ColumnFamilyHandle* column_family, const Slice& key,
-      std::vector<PinnableSlice>* value) = 0;
+      PinnableSlice* slice, int size) = 0;

   // If keys[i] does not exist in the database, then the i'th returned
   // status will be one for which Status::IsNotFound() is true, and
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 92ed2e984..4f189a54c 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -91,8 +91,8 @@ class StackableDB : public DB {
   using DB::GetMergeOperands;
   virtual Status GetMergeOperands(const ReadOptions& options,
       ColumnFamilyHandle* column_family, const Slice& key,
-      std::vector<PinnableSlice>* value) override {
-    return db_->GetMergeOperands(options, column_family, key, value);
+      PinnableSlice* slice, int size) override {
+    return db_->GetMergeOperands(options, column_family, key, slice, size);
   }

   using DB::MultiGet;
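Reviewer note: for context, below is a minimal, hypothetical usage sketch of the API as changed by this patch. It assumes only what the diff shows: GetMergeOperands now takes a caller-allocated PinnableSlice buffer plus its capacity, and reports Status::Aborted when the key has more merge operands than the buffer can hold. The db pointer, key, and kNumOperands are illustrative and not part of the patch; error handling is sketched, not prescribed.

  // Sketch only; assumes <iostream>, <vector>, and "rocksdb/db.h" are included
  // and that `db` is a rocksdb::DB* opened with a merge operator configured.
  const int kNumOperands = 4;  // caller-chosen capacity (illustrative)
  std::vector<rocksdb::PinnableSlice> operands(kNumOperands);
  rocksdb::Status s = db->GetMergeOperands(
      rocksdb::ReadOptions(), db->DefaultColumnFamily(), "k1",
      operands.data(), kNumOperands);
  if (s.IsAborted()) {
    // More operands exist for "k1" than kNumOperands; per this patch the call
    // is aborted, so the caller would retry with a larger buffer.
  } else if (s.ok()) {
    // Assuming exactly kNumOperands operands were pinned, as in the tests above.
    for (const rocksdb::PinnableSlice& operand : operands) {
      std::cout << operand.ToString() << "\n";
    }
  }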