From 8c9fff917ca083ef3f9b19ca1e9bb42658cb3c6b Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Fri, 25 Sep 2020 09:40:51 -0700 Subject: [PATCH] MultiGet() with timestamp should respect snapshot (#7404) Summary: Similar to PR https://github.com/facebook/rocksdb/issues/7227, add read callback to filter out rows with higher sequence number. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7404 Reviewed By: riversand963 Differential Revision: D23790762 Pulled By: jay-zhuang fbshipit-source-id: bce854307612f1a22f985ffc934da627d0a139c2 --- db/db_impl/db_impl.cc | 47 ++++- db/db_with_timestamp_basic_test.cc | 273 ++++++++++++++++++++++++++++- 2 files changed, 310 insertions(+), 10 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2d88a9a83..eb3e4187d 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1815,6 +1815,9 @@ std::vector<Status> DBImpl::MultiGet( read_options, nullptr, iter_deref_lambda, &multiget_cf_data, &consistent_seqnum); + TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1"); + TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2"); + // Contain a list of merge operations if merge occurs. 
MergeContext merge_context; @@ -1837,6 +1840,14 @@ std::vector<Status> DBImpl::MultiGet( size_t num_found = 0; size_t keys_read; uint64_t curr_value_size = 0; + + GetWithTimestampReadCallback timestamp_read_callback(0); + ReadCallback* read_callback = nullptr; + if (read_options.timestamp && read_options.timestamp->size() > 0) { + timestamp_read_callback.Refresh(consistent_seqnum); + read_callback = &timestamp_read_callback; + } + for (keys_read = 0; keys_read < num_keys; ++keys_read) { merge_context.Clear(); Status& s = stat_list[keys_read]; @@ -1857,12 +1868,14 @@ std::vector<Status> DBImpl::MultiGet( bool done = false; if (!skip_memtable) { if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context, - &max_covering_tombstone_seq, read_options)) { + &max_covering_tombstone_seq, read_options, + read_callback)) { done = true; RecordTick(stats_, MEMTABLE_HIT); - } else if (super_version->imm->Get( - lkey, value, timestamp, &s, &merge_context, - &max_covering_tombstone_seq, read_options)) { + } else if (super_version->imm->Get(lkey, value, timestamp, &s, + &merge_context, + &max_covering_tombstone_seq, + read_options, read_callback)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } @@ -1870,9 +1883,11 @@ std::vector<Status> DBImpl::MultiGet( if (!done) { PinnableSlice pinnable_val; PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, &pinnable_val, timestamp, - &s, &merge_context, - &max_covering_tombstone_seq); + super_version->current->Get( + read_options, lkey, &pinnable_val, timestamp, &s, &merge_context, + &max_covering_tombstone_seq, /*value_found=*/nullptr, + /*key_exists=*/nullptr, + /*seq=*/nullptr, read_callback); value->assign(pinnable_val.data(), pinnable_val.size()); RecordTick(stats_, MEMTABLE_MISS); } @@ -2130,12 +2145,19 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, read_options, nullptr, iter_deref_lambda, &multiget_cf_data, &consistent_seqnum); + GetWithTimestampReadCallback 
timestamp_read_callback(0); + ReadCallback* read_callback = nullptr; + if (read_options.timestamp && read_options.timestamp->size() > 0) { + timestamp_read_callback.Refresh(consistent_seqnum); + read_callback = &timestamp_read_callback; + } + Status s; auto cf_iter = multiget_cf_data.begin(); for (; cf_iter != multiget_cf_data.end(); ++cf_iter) { s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys, cf_iter->super_version, consistent_seqnum, - nullptr, nullptr); + read_callback, nullptr); if (!s.ok()) { break; } @@ -2298,9 +2320,16 @@ void DBImpl::MultiGetWithCallback( consistent_seqnum = callback->max_visible_seq(); } + GetWithTimestampReadCallback timestamp_read_callback(0); + ReadCallback* read_callback = nullptr; + if (read_options.timestamp && read_options.timestamp->size() > 0) { + timestamp_read_callback.Refresh(consistent_seqnum); + read_callback = &timestamp_read_callback; + } + Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, multiget_cf_data[0].super_version, consistent_seqnum, - nullptr, nullptr); + read_callback, nullptr); assert(s.ok() || s.IsTimedOut() || s.IsAborted()); ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, multiget_cf_data[0].super_version); diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 8c6838a05..9235acf34 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -590,8 +590,132 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { class DataVisibilityTest : public DBBasicTestWithTimestampBase { public: - DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {} + DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") { + // Initialize test data + for (int i = 0; i < kTestDataSize; i++) { + test_data_[i].key = "key" + ToString(i); + test_data_[i].value = "value" + ToString(i); + test_data_[i].timestamp = Timestamp(i, 0); + test_data_[i].ts = i; + 
test_data_[i].seq_num = kMaxSequenceNumber; + } + } + + protected: + struct TestData { + std::string key; + std::string value; + int ts; + std::string timestamp; + SequenceNumber seq_num; + }; + + constexpr static int kTestDataSize = 3; + TestData test_data_[kTestDataSize]; + + void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) { + ASSERT_LE(index, kTestDataSize); + WriteOptions write_opts; + Slice ts_slice = test_data_[index].timestamp; + write_opts.timestamp = &ts_slice; + + if (cfh == nullptr) { + ASSERT_OK( + db_->Put(write_opts, test_data_[index].key, test_data_[index].value)); + const Snapshot* snap = db_->GetSnapshot(); + test_data_[index].seq_num = snap->GetSequenceNumber(); + if (index > 0) { + ASSERT_GT(test_data_[index].seq_num, test_data_[index - 1].seq_num); + } + db_->ReleaseSnapshot(snap); + } else { + ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key, + test_data_[index].value)); + } + } + + void AssertVisibility(int ts, SequenceNumber seq, + std::vector<Status> statuses) { + ASSERT_EQ(kTestDataSize, statuses.size()); + for (int i = 0; i < kTestDataSize; i++) { + if (test_data_[i].seq_num <= seq && test_data_[i].ts <= ts) { + ASSERT_OK(statuses[i]); + } else { + ASSERT_TRUE(statuses[i].IsNotFound()); + } + } + } + + std::vector<Slice> GetKeys() { + std::vector<Slice> ret(kTestDataSize); + for (int i = 0; i < kTestDataSize; i++) { + ret[i] = test_data_[i].key; + } + return ret; + } + + void VerifyDefaultCF(int ts, const Snapshot* snap = nullptr) { + ReadOptions read_opts; + std::string read_ts = Timestamp(ts, 0); + Slice read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + read_opts.snapshot = snap; + + ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); + std::vector<ColumnFamilyHandle*> cfs(kTestDataSize, cfh); + SequenceNumber seq = + snap ? snap->GetSequenceNumber() : kMaxSequenceNumber - 1; + + // There're several MultiGet interfaces with not exactly the same + // implementations, query data with all of them. 
+ auto keys = GetKeys(); + std::vector<std::string> values; + auto s1 = db_->MultiGet(read_opts, cfs, keys, &values); + AssertVisibility(ts, seq, s1); + + auto s2 = db_->MultiGet(read_opts, keys, &values); + AssertVisibility(ts, seq, s2); + + std::vector<std::string> timestamps; + auto s3 = db_->MultiGet(read_opts, cfs, keys, &values, &timestamps); + AssertVisibility(ts, seq, s3); + + auto s4 = db_->MultiGet(read_opts, keys, &values, &timestamps); + AssertVisibility(ts, seq, s4); + + std::vector<PinnableSlice> values_ps5(kTestDataSize); + std::vector<Status> s5(kTestDataSize); + db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps5.data(), + s5.data()); + AssertVisibility(ts, seq, s5); + + std::vector<PinnableSlice> values_ps6(kTestDataSize); + std::vector<Status> s6(kTestDataSize); + std::vector<std::string> timestamps_array(kTestDataSize); + db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps6.data(), + timestamps_array.data(), s6.data()); + AssertVisibility(ts, seq, s6); + + std::vector<PinnableSlice> values_ps7(kTestDataSize); + std::vector<Status> s7(kTestDataSize); + db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(), + values_ps7.data(), s7.data()); + AssertVisibility(ts, seq, s7); + + std::vector<PinnableSlice> values_ps8(kTestDataSize); + std::vector<Status> s8(kTestDataSize); + db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(), + values_ps8.data(), timestamps_array.data(), s8.data()); + AssertVisibility(ts, seq, s8); + } + + void VerifyDefaultCF(const Snapshot* snap = nullptr) { + for (int i = 0; i <= kTestDataSize; i++) { + VerifyDefaultCF(i, snap); + } + } }; +constexpr int DataVisibilityTest::kTestDataSize; // Application specifies timestamp but not snapshot. // reader writer @@ -906,6 +1030,153 @@ TEST_F(DataVisibilityTest, RangeScanWithSnapshot) { Close(); } +// Application specifies both timestamp and snapshot. +// Query each combination and make sure for MultiGet key <k, t1, s1>, only +// return keys that ts>=t1 AND seq>=s1. 
+TEST_F(DataVisibilityTest, MultiGetWithTimestamp) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + const Snapshot* snap0 = db_->GetSnapshot(); + PutTestData(0); + VerifyDefaultCF(); + VerifyDefaultCF(snap0); + + const Snapshot* snap1 = db_->GetSnapshot(); + PutTestData(1); + VerifyDefaultCF(); + VerifyDefaultCF(snap0); + VerifyDefaultCF(snap1); + + Flush(); + + const Snapshot* snap2 = db_->GetSnapshot(); + PutTestData(2); + VerifyDefaultCF(); + VerifyDefaultCF(snap0); + VerifyDefaultCF(snap1); + VerifyDefaultCF(snap2); + + db_->ReleaseSnapshot(snap0); + db_->ReleaseSnapshot(snap1); + db_->ReleaseSnapshot(snap2); + + Close(); +} + +// Application specifies timestamp but not snapshot. +// reader writer +// ts'=0, 1 +// ts=3 +// seq=10 +// seq'=11, 12 +// write finishes +// MultiGet(ts,seq) +// For MultiGet <k, t1, s1>, only return keys that ts>=t1 AND seq>=s1. 
+TEST_F(DataVisibilityTest, MultiGetWithoutSnapshot) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::MultiGet:AfterGetSeqNum1", + "DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"}, + {"DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut", + "DBImpl::MultiGet:AfterGetSeqNum2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer_thread([this]() { + TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"); + PutTestData(0); + PutTestData(1); + TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut"); + }); + + ReadOptions read_opts; + std::string read_ts = Timestamp(kTestDataSize, 0); + Slice read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + auto keys = GetKeys(); + std::vector<std::string> values; + auto ss = db_->MultiGet(read_opts, keys, &values); + + writer_thread.join(); + for (auto s : ss) { + ASSERT_TRUE(s.IsNotFound()); + } + VerifyDefaultCF(); + Close(); +} + +TEST_F(DataVisibilityTest, MultiGetCrossCF) { + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + CreateAndReopenWithCF({"second"}, options); + ColumnFamilyHandle* second_cf = handles_[1]; + + const Snapshot* snap0 = db_->GetSnapshot(); + PutTestData(0); + PutTestData(0, second_cf); + VerifyDefaultCF(); + VerifyDefaultCF(snap0); + + const Snapshot* snap1 = db_->GetSnapshot(); + PutTestData(1); + PutTestData(1, second_cf); + VerifyDefaultCF(); + VerifyDefaultCF(snap0); + VerifyDefaultCF(snap1); + + Flush(); + + const Snapshot* snap2 = db_->GetSnapshot(); + PutTestData(2); + PutTestData(2, second_cf); + VerifyDefaultCF(); + 
VerifyDefaultCF(snap0); + VerifyDefaultCF(snap1); + VerifyDefaultCF(snap2); + + ReadOptions read_opts; + std::string read_ts = Timestamp(kTestDataSize, 0); + Slice read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + read_opts.snapshot = snap1; + auto keys = GetKeys(); + auto keys2 = GetKeys(); + keys.insert(keys.end(), keys2.begin(), keys2.end()); + std::vector<ColumnFamilyHandle*> cfs(kTestDataSize, + db_->DefaultColumnFamily()); + std::vector<ColumnFamilyHandle*> cfs2(kTestDataSize, second_cf); + cfs.insert(cfs.end(), cfs2.begin(), cfs2.end()); + + std::vector<std::string> values; + auto ss = db_->MultiGet(read_opts, cfs, keys, &values); + for (int i = 0; i < 2 * kTestDataSize; i++) { + if (i % 3 == 0) { + // only the first key for each column family should be returned + ASSERT_OK(ss[i]); + } else { + ASSERT_TRUE(ss[i].IsNotFound()); + } + } + + db_->ReleaseSnapshot(snap0); + db_->ReleaseSnapshot(snap1); + db_->ReleaseSnapshot(snap2); + Close(); +} + class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface<