diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index dedf93851..f2cfceae8 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -925,44 +925,44 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) { #endif class TestEnv : public EnvWrapper { - public: - explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {} + public: + explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {} - class TestLogger : public Logger { - public: - using Logger::Logv; - explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } - ~TestLogger() override { - if (!closed_) { - CloseHelper(); - } - } - void Logv(const char* /*format*/, va_list /*ap*/) override {} - - protected: - Status CloseImpl() override { return CloseHelper(); } - - private: - Status CloseHelper() { - env->CloseCountInc(); - ; - return Status::IOError(); - } - TestEnv* env; - }; - - void CloseCountInc() { close_count++; } - - int GetCloseCount() { return close_count; } - - Status NewLogger(const std::string& /*fname*/, - std::shared_ptr* result) override { - result->reset(new TestLogger(this)); - return Status::OK(); + class TestLogger : public Logger { + public: + using Logger::Logv; + explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } + ~TestLogger() override { + if (!closed_) { + CloseHelper(); + } } + void Logv(const char* /*format*/, va_list /*ap*/) override {} + + protected: + Status CloseImpl() override { return CloseHelper(); } private: - int close_count; + Status CloseHelper() { + env->CloseCountInc(); + ; + return Status::IOError(); + } + TestEnv* env; + }; + + void CloseCountInc() { close_count++; } + + int GetCloseCount() { return close_count; } + + Status NewLogger(const std::string& /*fname*/, + std::shared_ptr* result) override { + result->reset(new TestLogger(this)); + return Status::OK(); + } + + private: + int close_count; }; TEST_F(DBBasicTest, DBClose) { @@ -1014,7 +1014,7 @@ TEST_F(DBBasicTest, DBCloseFlushError) { Options options = GetDefaultOptions(); options.create_if_missing = true; options.manual_wal_flush = true; - options.write_buffer_size=100; + options.write_buffer_size = 100; options.env = fault_injection_env.get(); Reopen(options); @@ -1464,7 +1464,8 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) { ASSERT_EQ(0, num_keys); for (int i = 0; i < 128; i += 9) { - ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i))); + ASSERT_OK( + Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i))); } std::vector keys; @@ -1702,8 +1703,8 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) { BlockBasedTableOptions table_options; table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.block_size = 16 * 1024; - assert(table_options.block_size > - BlockBasedTable::kMultiGetReadStackBufSize); + ASSERT_TRUE(table_options.block_size > + BlockBasedTable::kMultiGetReadStackBufSize); options.table_factory.reset(new BlockBasedTableFactory(table_options)); Reopen(options); @@ -1938,7 +1939,7 @@ class DBBasicTestWithParallelIO if (!Snappy_Supported()) { compression_enabled_ = false; } -#endif //ROCKSDB_LITE +#endif // ROCKSDB_LITE table_options.block_cache = uncompressed_cache_; if (table_options.block_cache == nullptr) { @@ -2275,13 +2276,13 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) { ro.fill_cache = fill_cache(); SyncPoint::GetInstance()->SetCallBack( - "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) { - Status* s = static_cast(status); - read_count++; - if (read_count == 2) { - *s = Status::Corruption(); - } - }); + "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) { + Status* s = static_cast(status); + read_count++; + if (read_count == 2) { + *s = Status::Corruption(); + } + }); SyncPoint::GetInstance()->EnableProcessing(); // Warm up the cache first @@ -2294,7 +2295,7 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) { dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); ASSERT_TRUE(CheckValue(0, values[0].ToString())); - //ASSERT_TRUE(CheckValue(50, values[1].ToString())); + // ASSERT_TRUE(CheckValue(50, values[1].ToString())); ASSERT_EQ(statuses[0], Status::OK()); ASSERT_EQ(statuses[1], Status::Corruption()); @@ -2312,10 +2313,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) { ro.fill_cache = fill_cache(); SyncPoint::GetInstance()->SetCallBack( - "TableCache::MultiGet:FindTable", [&](void *status) { - Status* s = static_cast(status); - *s = Status::IOError(); - }); + "TableCache::MultiGet:FindTable", [&](void* status) { + Status* s = static_cast(status); + *s = Status::IOError(); + }); // DB open will create table readers unless we reduce the table cache // capacity. // SanitizeOptions will set max_open_files to minimum of 20. Table cache @@ -2324,10 +2325,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) { // prevent file open during DB open and force the file to be opened // during MultiGet SyncPoint::GetInstance()->SetCallBack( - "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) { - int* max_open_files = (int*)arg; - *max_open_files = 11; - }); + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); SyncPoint::GetInstance()->EnableProcessing(); Reopen(CurrentOptions()); @@ -2347,15 +2348,15 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) { SyncPoint::GetInstance()->DisableProcessing(); } -INSTANTIATE_TEST_CASE_P( - ParallelIO, DBBasicTestWithParallelIO, - // Params are as follows - - // Param 0 - Compressed cache enabled - // Param 1 - Uncompressed cache enabled - // Param 2 - Data compression enabled - // Param 3 - ReadOptions::fill_cache - ::testing::Combine(::testing::Bool(), ::testing::Bool(), - ::testing::Bool(), ::testing::Bool())); +INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO, + // Params are as follows - + // Param 0 - Compressed cache enabled + // Param 1 - Uncompressed cache enabled + // Param 2 - Data compression enabled + // Param 3 - ReadOptions::fill_cache + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Bool(), + ::testing::Bool())); } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index af18ee868..bc753223c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1687,12 +1687,20 @@ std::vector DBImpl::MultiGet( const ReadOptions& read_options, const std::vector& column_family, const std::vector& keys, std::vector* values) { + return MultiGet(read_options, column_family, keys, values, + /*timestamps*/ nullptr); +} + +std::vector DBImpl::MultiGet( + const ReadOptions& read_options, + const std::vector& column_family, + const std::vector& keys, std::vector* values, + std::vector* timestamps) { PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); StopWatch sw(env_, stats_, DB_MULTIGET); PERF_TIMER_GUARD(get_snapshot_time); SequenceNumber consistent_seqnum; - ; std::unordered_map multiget_cf_data( column_family.size()); @@ -1723,6 +1731,9 @@ std::vector DBImpl::MultiGet( size_t num_keys = keys.size(); std::vector stat_list(num_keys); values->resize(num_keys); + if (timestamps) { + timestamps->resize(num_keys); + } // Keep track of bytes that we read for statistics-recording later uint64_t bytes_read = 0; @@ -1737,8 +1748,9 @@ std::vector DBImpl::MultiGet( merge_context.Clear(); Status& s = stat_list[i]; std::string* value = &(*values)[i]; + std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr; - LookupKey lkey(keys[i], consistent_seqnum); + LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp); auto cfh = reinterpret_cast(column_family[i]); SequenceNumber max_covering_tombstone_seq = 0; auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); @@ -1750,13 +1762,12 @@ std::vector DBImpl::MultiGet( has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; if (!skip_memtable) { - if (super_version->mem->Get(lkey, value, /*timestamp=*/nullptr, &s, - &merge_context, &max_covering_tombstone_seq, - read_options)) { + if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } else if (super_version->imm->Get( - lkey, value, nullptr, &s, &merge_context, + lkey, value, timestamp, &s, &merge_context, &max_covering_tombstone_seq, read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); @@ -1765,8 +1776,8 @@ std::vector DBImpl::MultiGet( if (!done) { PinnableSlice pinnable_val; PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, &pinnable_val, - /*timestamp=*/nullptr, &s, &merge_context, + super_version->current->Get(read_options, lkey, &pinnable_val, timestamp, + &s, &merge_context, &max_covering_tombstone_seq); value->assign(pinnable_val.data(), pinnable_val.size()); RecordTick(stats_, MEMTABLE_MISS); @@ -1929,6 +1940,14 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, ColumnFamilyHandle** column_families, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input) { + return MultiGet(read_options, num_keys, column_families, keys, values, + /*timestamps*/ nullptr, statuses, sorted_input); +} + +void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, const bool sorted_input) { if (num_keys == 0) { return; } @@ -1937,7 +1956,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { key_context.emplace_back(column_families[i], keys[i], &values[i], - &statuses[i]); + ×tamps[i], &statuses[i]); } for (size_t i = 0; i < num_keys; ++i) { sorted_keys[i] = &key_context[i]; @@ -2057,11 +2076,22 @@ void DBImpl::MultiGet(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const size_t num_keys, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input) { + return MultiGet(read_options, column_family, num_keys, keys, values, + /*timestamp=*/nullptr, statuses, sorted_input); +} + +void DBImpl::MultiGet(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const size_t num_keys, + const Slice* keys, PinnableSlice* values, + std::string* timestamps, Status* statuses, + const bool sorted_input) { autovector key_context; autovector sorted_keys; sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { - key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); + key_context.emplace_back(column_family, keys[i], &values[i], + timestamps ? ×tamps[i] : nullptr, + &statuses[i]); } for (size_t i = 0; i < num_keys; ++i) { sorted_keys[i] = &key_context[i]; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 978ee8c9c..2a057a873 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -188,6 +188,11 @@ class DBImpl : public DB { const std::vector& column_family, const std::vector& keys, std::vector* values) override; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values, + std::vector* timestamps) override; // This MultiGet is a batched version, which may be faster than calling Get // multiple times, especially if the keys have some spatial locality that @@ -201,11 +206,22 @@ class DBImpl : public DB { const size_t num_keys, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input = false) override; + virtual void MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, + const bool sorted_input = false) override; virtual void MultiGet(const ReadOptions& options, const size_t num_keys, ColumnFamilyHandle** column_families, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input = false) override; + virtual void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, + const bool sorted_input = false) override; virtual void MultiGetWithCallback( const ReadOptions& options, ColumnFamilyHandle* column_family, diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 51678f787..184f08b17 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -580,7 +580,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { std::vector write_ts_list; std::vector read_ts_list; - const auto& verify_record_func = [&](size_t i, size_t k, + const auto& verify_records_func = [&](size_t i, size_t begin, size_t end, ColumnFamilyHandle* cfh) { std::string value; std::string timestamp; @@ -591,9 +591,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { std::string expected_timestamp = std::string(write_ts_list[i].data(), write_ts_list[i].size()); - ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, ×tamp)); - ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value); - ASSERT_EQ(expected_timestamp, timestamp); + for (size_t j = begin; j <= end; ++j) { + ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, ×tamp)); + ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value); + ASSERT_EQ(expected_timestamp, timestamp); + } }; for (size_t i = 0; i != kNumTimestamps; ++i) { @@ -609,9 +611,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { "value_" + std::to_string(j) + "_" + std::to_string(i), wopts)); if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { - for (size_t k = memtable_get_start; k <= j; ++k) { - verify_record_func(i, k, handles_[cf]); - } + verify_records_func(i, memtable_get_start, j, handles_[cf]); memtable_get_start = j + 1; // flush all keys with the same timestamp to two sst files, split at @@ -641,15 +641,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { write_ts_list[i].size()); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; - for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { - verify_record_func(i, j, cfh); - } + verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh); } } }; verify_db_func(); Close(); } + +TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) { + const int kNumKeysPerFile = 8192; + const size_t kNumTimestamps = 2; + const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps; + Options options = CurrentOptions(); + options.create_if_missing = true; + options.env = env_; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + + size_t ts_sz = Timestamp(0, 0).size(); + TestComparator test_cmp(ts_sz); + options.comparator = &test_cmp; + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy( + 10 /*bits_per_key*/, false /*use_block_based_builder*/)); + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(2, num_cfs); + std::vector write_ts_list; + std::vector read_ts_list; + + const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) { + std::vector keys; + std::vector key_vals; + std::vector values; + std::vector timestamps; + + for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { + key_vals.push_back(Key1(j)); + } + for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { + keys.push_back(key_vals[j]); + } + + ReadOptions ropts; + const Slice read_ts = read_ts_list[i]; + ropts.timestamp = &read_ts; + std::string expected_timestamp(write_ts_list[i].data(), + write_ts_list[i].size()); + + std::vector cfhs(keys.size(), cfh); + std::vector statuses = + db_->MultiGet(ropts, cfhs, keys, &values, ×tamps); + for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { + ASSERT_OK(statuses[j]); + ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), + values[j]); + ASSERT_EQ(expected_timestamp, timestamps[j]); + } + }; + + for (size_t i = 0; i != kNumTimestamps; ++i) { + write_ts_list.push_back(Timestamp(i * 2, 0)); + read_ts_list.push_back(Timestamp(1 + i * 2, 0)); + const Slice& write_ts = write_ts_list.back(); + for (int cf = 0; cf != static_cast(num_cfs); ++cf) { + WriteOptions wopts; + WriteBatch batch(0, 0, ts_sz); + for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { + ASSERT_OK( + batch.Put(handles_[cf], Key1(j), + "value_" + std::to_string(j) + "_" + std::to_string(i))); + } + batch.AssignTimestamp(write_ts); + ASSERT_OK(db_->Write(wopts, &batch)); + + verify_records_func(i, handles_[cf]); + + ASSERT_OK(Flush(cf)); + } + } + + const auto& verify_db_func = [&]() { + for (size_t i = 0; i != kNumTimestamps; ++i) { + ReadOptions ropts; + const Slice read_ts = read_ts_list[i]; + ropts.timestamp = &read_ts; + for (int cf = 0; cf != static_cast(num_cfs); ++cf) { + ColumnFamilyHandle* cfh = handles_[cf]; + verify_records_func(i, cfh); + } + } + }; + verify_db_func(); + Close(); +} + #endif // !ROCKSDB_LITE INSTANTIATE_TEST_CASE_P( diff --git a/db/memtable.cc b/db/memtable.cc index 82b0cc928..efc73b7ca 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -919,9 +919,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); } GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, - callback, is_blob, iter->value->GetSelf(), - /*timestamp=*/nullptr, iter->s, &(iter->merge_context), &seq, - &found_final_value, &merge_in_progress); + callback, is_blob, iter->value->GetSelf(), iter->timestamp, + iter->s, &(iter->merge_context), &seq, &found_final_value, + &merge_in_progress); if (!found_final_value && merge_in_progress) { *(iter->s) = Status::MergeInProgress(); diff --git a/db/version_set.cc b/db/version_set.cc index 91d60f615..c8f50b333 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1897,8 +1897,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, get_ctx.emplace_back( user_comparator(), merge_operator_, info_log_, db_statistics_, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, - iter->value, /*timestamp*/ nullptr, nullptr, &(iter->merge_context), - true, &iter->max_covering_tombstone_seq, this->env_, nullptr, + iter->value, iter->timestamp, nullptr, &(iter->merge_context), true, + &iter->max_covering_tombstone_seq, this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, tracing_mget_id); // MergeInProgress status, if set, has been transferred to the get_context diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index cbc17c205..6de90fa0b 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -480,6 +480,25 @@ class DB { keys, values); } + virtual std::vector MultiGet( + const ReadOptions& /*options*/, + const std::vector& /*column_family*/, + const std::vector& keys, std::vector* /*values*/, + std::vector* /*timestamps*/) { + return std::vector( + keys.size(), Status::NotSupported( + "MultiGet() returning timestamps not implemented.")); + } + virtual std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values, + std::vector* timestamps) { + return MultiGet( + options, + std::vector(keys.size(), DefaultColumnFamily()), + keys, values, timestamps); + } + // Overloaded MultiGet API that improves performance by batching operations // in the read path for greater efficiency. Currently, only the block based // table format with full filters are supported. Other table formats such @@ -521,6 +540,30 @@ class DB { } } + virtual void MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, const bool /*sorted_input*/ = false) { + std::vector cf; + std::vector user_keys; + std::vector status; + std::vector vals; + std::vector tss; + + for (size_t i = 0; i < num_keys; ++i) { + cf.emplace_back(column_family); + user_keys.emplace_back(keys[i]); + } + status = MultiGet(options, cf, user_keys, &vals, &tss); + std::copy(status.begin(), status.end(), statuses); + std::copy(tss.begin(), tss.end(), timestamps); + for (auto& value : vals) { + values->PinSelf(value); + values++; + } + } + // Overloaded MultiGet API that improves performance by batching operations // in the read path for greater efficiency. Currently, only the block based // table format with full filters are supported. Other table formats such @@ -560,6 +603,28 @@ class DB { values++; } } + virtual void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, const bool /*sorted_input*/ = false) { + std::vector cf; + std::vector user_keys; + std::vector status; + std::vector vals; + std::vector tss; + + for (size_t i = 0; i < num_keys; ++i) { + cf.emplace_back(column_families[i]); + user_keys.emplace_back(keys[i]); + } + status = MultiGet(options, cf, user_keys, &vals, &tss); + std::copy(status.begin(), status.end(), statuses); + std::copy(tss.begin(), tss.end(), timestamps); + for (auto& value : vals) { + values->PinSelf(value); + values++; + } + } // If the key definitely does not exist in the database, then this method // returns false, else true. If the caller wants to obtain value when the key diff --git a/table/multiget_context.h b/table/multiget_context.h index 0c5848c82..3d30bf200 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -29,10 +29,11 @@ struct KeyContext { bool key_exists; void* cb_arg; PinnableSlice* value; + std::string* timestamp; GetContext* get_context; KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key, - PinnableSlice* val, Status* stat) + PinnableSlice* val, std::string* ts, Status* stat) : key(&user_key), lkey(nullptr), column_family(col_family), @@ -41,6 +42,7 @@ struct KeyContext { key_exists(false), cb_arg(nullptr), value(val), + timestamp(ts), get_context(nullptr) {} KeyContext() = default; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 2df6bcaf3..675e9752d 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -982,7 +982,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || result == WriteBatchWithIndexInternal::Result::kNotFound); - key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); + key_context.emplace_back(column_family, keys[i], &values[i], + /*timestamp*/ nullptr, &statuses[i]); merges.emplace_back(result, std::move(merge_context)); }