MultiGet() with timestamp should respect snapshot (#7404)
Summary: Similar to PR https://github.com/facebook/rocksdb/issues/7227, add read callback to filter out rows with with higher sequence number. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7404 Reviewed By: riversand963 Differential Revision: D23790762 Pulled By: jay-zhuang fbshipit-source-id: bce854307612f1a22f985ffc934da627d0a139c2
This commit is contained in:
parent
c5b3128f15
commit
8c9fff917c
@ -1815,6 +1815,9 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
|
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
|
||||||
&consistent_seqnum);
|
&consistent_seqnum);
|
||||||
|
|
||||||
|
TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
|
||||||
|
TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");
|
||||||
|
|
||||||
// Contain a list of merge operations if merge occurs.
|
// Contain a list of merge operations if merge occurs.
|
||||||
MergeContext merge_context;
|
MergeContext merge_context;
|
||||||
|
|
||||||
@ -1837,6 +1840,14 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
size_t num_found = 0;
|
size_t num_found = 0;
|
||||||
size_t keys_read;
|
size_t keys_read;
|
||||||
uint64_t curr_value_size = 0;
|
uint64_t curr_value_size = 0;
|
||||||
|
|
||||||
|
GetWithTimestampReadCallback timestamp_read_callback(0);
|
||||||
|
ReadCallback* read_callback = nullptr;
|
||||||
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
|
timestamp_read_callback.Refresh(consistent_seqnum);
|
||||||
|
read_callback = ×tamp_read_callback;
|
||||||
|
}
|
||||||
|
|
||||||
for (keys_read = 0; keys_read < num_keys; ++keys_read) {
|
for (keys_read = 0; keys_read < num_keys; ++keys_read) {
|
||||||
merge_context.Clear();
|
merge_context.Clear();
|
||||||
Status& s = stat_list[keys_read];
|
Status& s = stat_list[keys_read];
|
||||||
@ -1857,12 +1868,14 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
bool done = false;
|
bool done = false;
|
||||||
if (!skip_memtable) {
|
if (!skip_memtable) {
|
||||||
if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
|
if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
|
||||||
&max_covering_tombstone_seq, read_options)) {
|
&max_covering_tombstone_seq, read_options,
|
||||||
|
read_callback)) {
|
||||||
done = true;
|
done = true;
|
||||||
RecordTick(stats_, MEMTABLE_HIT);
|
RecordTick(stats_, MEMTABLE_HIT);
|
||||||
} else if (super_version->imm->Get(
|
} else if (super_version->imm->Get(lkey, value, timestamp, &s,
|
||||||
lkey, value, timestamp, &s, &merge_context,
|
&merge_context,
|
||||||
&max_covering_tombstone_seq, read_options)) {
|
&max_covering_tombstone_seq,
|
||||||
|
read_options, read_callback)) {
|
||||||
done = true;
|
done = true;
|
||||||
RecordTick(stats_, MEMTABLE_HIT);
|
RecordTick(stats_, MEMTABLE_HIT);
|
||||||
}
|
}
|
||||||
@ -1870,9 +1883,11 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
if (!done) {
|
if (!done) {
|
||||||
PinnableSlice pinnable_val;
|
PinnableSlice pinnable_val;
|
||||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||||
super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
|
super_version->current->Get(
|
||||||
&s, &merge_context,
|
read_options, lkey, &pinnable_val, timestamp, &s, &merge_context,
|
||||||
&max_covering_tombstone_seq);
|
&max_covering_tombstone_seq, /*value_found=*/nullptr,
|
||||||
|
/*key_exists=*/nullptr,
|
||||||
|
/*seq=*/nullptr, read_callback);
|
||||||
value->assign(pinnable_val.data(), pinnable_val.size());
|
value->assign(pinnable_val.data(), pinnable_val.size());
|
||||||
RecordTick(stats_, MEMTABLE_MISS);
|
RecordTick(stats_, MEMTABLE_MISS);
|
||||||
}
|
}
|
||||||
@ -2130,12 +2145,19 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
|||||||
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
|
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
|
||||||
&consistent_seqnum);
|
&consistent_seqnum);
|
||||||
|
|
||||||
|
GetWithTimestampReadCallback timestamp_read_callback(0);
|
||||||
|
ReadCallback* read_callback = nullptr;
|
||||||
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
|
timestamp_read_callback.Refresh(consistent_seqnum);
|
||||||
|
read_callback = ×tamp_read_callback;
|
||||||
|
}
|
||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
auto cf_iter = multiget_cf_data.begin();
|
auto cf_iter = multiget_cf_data.begin();
|
||||||
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
|
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
|
||||||
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
|
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
|
||||||
&sorted_keys, cf_iter->super_version, consistent_seqnum,
|
&sorted_keys, cf_iter->super_version, consistent_seqnum,
|
||||||
nullptr, nullptr);
|
read_callback, nullptr);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2298,9 +2320,16 @@ void DBImpl::MultiGetWithCallback(
|
|||||||
consistent_seqnum = callback->max_visible_seq();
|
consistent_seqnum = callback->max_visible_seq();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
GetWithTimestampReadCallback timestamp_read_callback(0);
|
||||||
|
ReadCallback* read_callback = nullptr;
|
||||||
|
if (read_options.timestamp && read_options.timestamp->size() > 0) {
|
||||||
|
timestamp_read_callback.Refresh(consistent_seqnum);
|
||||||
|
read_callback = ×tamp_read_callback;
|
||||||
|
}
|
||||||
|
|
||||||
Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
|
Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
|
||||||
multiget_cf_data[0].super_version, consistent_seqnum,
|
multiget_cf_data[0].super_version, consistent_seqnum,
|
||||||
nullptr, nullptr);
|
read_callback, nullptr);
|
||||||
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
|
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
|
||||||
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
|
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
|
||||||
multiget_cf_data[0].super_version);
|
multiget_cf_data[0].super_version);
|
||||||
|
@ -590,8 +590,132 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
|
|||||||
|
|
||||||
class DataVisibilityTest : public DBBasicTestWithTimestampBase {
|
class DataVisibilityTest : public DBBasicTestWithTimestampBase {
|
||||||
public:
|
public:
|
||||||
DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {}
|
DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {
|
||||||
|
// Initialize test data
|
||||||
|
for (int i = 0; i < kTestDataSize; i++) {
|
||||||
|
test_data_[i].key = "key" + ToString(i);
|
||||||
|
test_data_[i].value = "value" + ToString(i);
|
||||||
|
test_data_[i].timestamp = Timestamp(i, 0);
|
||||||
|
test_data_[i].ts = i;
|
||||||
|
test_data_[i].seq_num = kMaxSequenceNumber;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
struct TestData {
|
||||||
|
std::string key;
|
||||||
|
std::string value;
|
||||||
|
int ts;
|
||||||
|
std::string timestamp;
|
||||||
|
SequenceNumber seq_num;
|
||||||
|
};
|
||||||
|
|
||||||
|
constexpr static int kTestDataSize = 3;
|
||||||
|
TestData test_data_[kTestDataSize];
|
||||||
|
|
||||||
|
void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) {
|
||||||
|
ASSERT_LE(index, kTestDataSize);
|
||||||
|
WriteOptions write_opts;
|
||||||
|
Slice ts_slice = test_data_[index].timestamp;
|
||||||
|
write_opts.timestamp = &ts_slice;
|
||||||
|
|
||||||
|
if (cfh == nullptr) {
|
||||||
|
ASSERT_OK(
|
||||||
|
db_->Put(write_opts, test_data_[index].key, test_data_[index].value));
|
||||||
|
const Snapshot* snap = db_->GetSnapshot();
|
||||||
|
test_data_[index].seq_num = snap->GetSequenceNumber();
|
||||||
|
if (index > 0) {
|
||||||
|
ASSERT_GT(test_data_[index].seq_num, test_data_[index - 1].seq_num);
|
||||||
|
}
|
||||||
|
db_->ReleaseSnapshot(snap);
|
||||||
|
} else {
|
||||||
|
ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key,
|
||||||
|
test_data_[index].value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AssertVisibility(int ts, SequenceNumber seq,
|
||||||
|
std::vector<Status> statuses) {
|
||||||
|
ASSERT_EQ(kTestDataSize, statuses.size());
|
||||||
|
for (int i = 0; i < kTestDataSize; i++) {
|
||||||
|
if (test_data_[i].seq_num <= seq && test_data_[i].ts <= ts) {
|
||||||
|
ASSERT_OK(statuses[i]);
|
||||||
|
} else {
|
||||||
|
ASSERT_TRUE(statuses[i].IsNotFound());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<Slice> GetKeys() {
|
||||||
|
std::vector<Slice> ret(kTestDataSize);
|
||||||
|
for (int i = 0; i < kTestDataSize; i++) {
|
||||||
|
ret[i] = test_data_[i].key;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerifyDefaultCF(int ts, const Snapshot* snap = nullptr) {
|
||||||
|
ReadOptions read_opts;
|
||||||
|
std::string read_ts = Timestamp(ts, 0);
|
||||||
|
Slice read_ts_slice = read_ts;
|
||||||
|
read_opts.timestamp = &read_ts_slice;
|
||||||
|
read_opts.snapshot = snap;
|
||||||
|
|
||||||
|
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
|
||||||
|
std::vector<ColumnFamilyHandle*> cfs(kTestDataSize, cfh);
|
||||||
|
SequenceNumber seq =
|
||||||
|
snap ? snap->GetSequenceNumber() : kMaxSequenceNumber - 1;
|
||||||
|
|
||||||
|
// There're several MultiGet interfaces with not exactly the same
|
||||||
|
// implementations, query data with all of them.
|
||||||
|
auto keys = GetKeys();
|
||||||
|
std::vector<std::string> values;
|
||||||
|
auto s1 = db_->MultiGet(read_opts, cfs, keys, &values);
|
||||||
|
AssertVisibility(ts, seq, s1);
|
||||||
|
|
||||||
|
auto s2 = db_->MultiGet(read_opts, keys, &values);
|
||||||
|
AssertVisibility(ts, seq, s2);
|
||||||
|
|
||||||
|
std::vector<std::string> timestamps;
|
||||||
|
auto s3 = db_->MultiGet(read_opts, cfs, keys, &values, ×tamps);
|
||||||
|
AssertVisibility(ts, seq, s3);
|
||||||
|
|
||||||
|
auto s4 = db_->MultiGet(read_opts, keys, &values, ×tamps);
|
||||||
|
AssertVisibility(ts, seq, s4);
|
||||||
|
|
||||||
|
std::vector<PinnableSlice> values_ps5(kTestDataSize);
|
||||||
|
std::vector<Status> s5(kTestDataSize);
|
||||||
|
db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps5.data(),
|
||||||
|
s5.data());
|
||||||
|
AssertVisibility(ts, seq, s5);
|
||||||
|
|
||||||
|
std::vector<PinnableSlice> values_ps6(kTestDataSize);
|
||||||
|
std::vector<Status> s6(kTestDataSize);
|
||||||
|
std::vector<std::string> timestamps_array(kTestDataSize);
|
||||||
|
db_->MultiGet(read_opts, cfh, kTestDataSize, keys.data(), values_ps6.data(),
|
||||||
|
timestamps_array.data(), s6.data());
|
||||||
|
AssertVisibility(ts, seq, s6);
|
||||||
|
|
||||||
|
std::vector<PinnableSlice> values_ps7(kTestDataSize);
|
||||||
|
std::vector<Status> s7(kTestDataSize);
|
||||||
|
db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(),
|
||||||
|
values_ps7.data(), s7.data());
|
||||||
|
AssertVisibility(ts, seq, s7);
|
||||||
|
|
||||||
|
std::vector<PinnableSlice> values_ps8(kTestDataSize);
|
||||||
|
std::vector<Status> s8(kTestDataSize);
|
||||||
|
db_->MultiGet(read_opts, kTestDataSize, cfs.data(), keys.data(),
|
||||||
|
values_ps8.data(), timestamps_array.data(), s8.data());
|
||||||
|
AssertVisibility(ts, seq, s8);
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerifyDefaultCF(const Snapshot* snap = nullptr) {
|
||||||
|
for (int i = 0; i <= kTestDataSize; i++) {
|
||||||
|
VerifyDefaultCF(i, snap);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
constexpr int DataVisibilityTest::kTestDataSize;
|
||||||
|
|
||||||
// Application specifies timestamp but not snapshot.
|
// Application specifies timestamp but not snapshot.
|
||||||
// reader writer
|
// reader writer
|
||||||
@ -906,6 +1030,153 @@ TEST_F(DataVisibilityTest, RangeScanWithSnapshot) {
|
|||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Application specifies both timestamp and snapshot.
|
||||||
|
// Query each combination and make sure for MultiGet key <k, t1, s1>, only
|
||||||
|
// return keys that ts>=t1 AND seq>=s1.
|
||||||
|
TEST_F(DataVisibilityTest, MultiGetWithTimestamp) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||||
|
TestComparator test_cmp(kTimestampSize);
|
||||||
|
options.comparator = &test_cmp;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
const Snapshot* snap0 = db_->GetSnapshot();
|
||||||
|
PutTestData(0);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
|
||||||
|
const Snapshot* snap1 = db_->GetSnapshot();
|
||||||
|
PutTestData(1);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
VerifyDefaultCF(snap1);
|
||||||
|
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
const Snapshot* snap2 = db_->GetSnapshot();
|
||||||
|
PutTestData(2);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
VerifyDefaultCF(snap1);
|
||||||
|
VerifyDefaultCF(snap2);
|
||||||
|
|
||||||
|
db_->ReleaseSnapshot(snap0);
|
||||||
|
db_->ReleaseSnapshot(snap1);
|
||||||
|
db_->ReleaseSnapshot(snap2);
|
||||||
|
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Application specifies timestamp but not snapshot.
|
||||||
|
// reader writer
|
||||||
|
// ts'=0, 1
|
||||||
|
// ts=3
|
||||||
|
// seq=10
|
||||||
|
// seq'=11, 12
|
||||||
|
// write finishes
|
||||||
|
// MultiGet(ts,seq)
|
||||||
|
// For MultiGet <k, t1, s1>, only return keys that ts>=t1 AND seq>=s1.
|
||||||
|
TEST_F(DataVisibilityTest, MultiGetWithoutSnapshot) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||||
|
TestComparator test_cmp(kTimestampSize);
|
||||||
|
options.comparator = &test_cmp;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->LoadDependency({
|
||||||
|
{"DBImpl::MultiGet:AfterGetSeqNum1",
|
||||||
|
"DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut"},
|
||||||
|
{"DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut",
|
||||||
|
"DBImpl::MultiGet:AfterGetSeqNum2"},
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
port::Thread writer_thread([this]() {
|
||||||
|
TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:BeforePut");
|
||||||
|
PutTestData(0);
|
||||||
|
PutTestData(1);
|
||||||
|
TEST_SYNC_POINT("DataVisibilityTest::MultiGetWithoutSnapshot:AfterPut");
|
||||||
|
});
|
||||||
|
|
||||||
|
ReadOptions read_opts;
|
||||||
|
std::string read_ts = Timestamp(kTestDataSize, 0);
|
||||||
|
Slice read_ts_slice = read_ts;
|
||||||
|
read_opts.timestamp = &read_ts_slice;
|
||||||
|
auto keys = GetKeys();
|
||||||
|
std::vector<std::string> values;
|
||||||
|
auto ss = db_->MultiGet(read_opts, keys, &values);
|
||||||
|
|
||||||
|
writer_thread.join();
|
||||||
|
for (auto s : ss) {
|
||||||
|
ASSERT_TRUE(s.IsNotFound());
|
||||||
|
}
|
||||||
|
VerifyDefaultCF();
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(DataVisibilityTest, MultiGetCrossCF) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||||
|
TestComparator test_cmp(kTimestampSize);
|
||||||
|
options.comparator = &test_cmp;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
CreateAndReopenWithCF({"second"}, options);
|
||||||
|
ColumnFamilyHandle* second_cf = handles_[1];
|
||||||
|
|
||||||
|
const Snapshot* snap0 = db_->GetSnapshot();
|
||||||
|
PutTestData(0);
|
||||||
|
PutTestData(0, second_cf);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
|
||||||
|
const Snapshot* snap1 = db_->GetSnapshot();
|
||||||
|
PutTestData(1);
|
||||||
|
PutTestData(1, second_cf);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
VerifyDefaultCF(snap1);
|
||||||
|
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
const Snapshot* snap2 = db_->GetSnapshot();
|
||||||
|
PutTestData(2);
|
||||||
|
PutTestData(2, second_cf);
|
||||||
|
VerifyDefaultCF();
|
||||||
|
VerifyDefaultCF(snap0);
|
||||||
|
VerifyDefaultCF(snap1);
|
||||||
|
VerifyDefaultCF(snap2);
|
||||||
|
|
||||||
|
ReadOptions read_opts;
|
||||||
|
std::string read_ts = Timestamp(kTestDataSize, 0);
|
||||||
|
Slice read_ts_slice = read_ts;
|
||||||
|
read_opts.timestamp = &read_ts_slice;
|
||||||
|
read_opts.snapshot = snap1;
|
||||||
|
auto keys = GetKeys();
|
||||||
|
auto keys2 = GetKeys();
|
||||||
|
keys.insert(keys.end(), keys2.begin(), keys2.end());
|
||||||
|
std::vector<ColumnFamilyHandle*> cfs(kTestDataSize,
|
||||||
|
db_->DefaultColumnFamily());
|
||||||
|
std::vector<ColumnFamilyHandle*> cfs2(kTestDataSize, second_cf);
|
||||||
|
cfs.insert(cfs.end(), cfs2.begin(), cfs2.end());
|
||||||
|
|
||||||
|
std::vector<std::string> values;
|
||||||
|
auto ss = db_->MultiGet(read_opts, cfs, keys, &values);
|
||||||
|
for (int i = 0; i < 2 * kTestDataSize; i++) {
|
||||||
|
if (i % 3 == 0) {
|
||||||
|
// only the first key for each column family should be returned
|
||||||
|
ASSERT_OK(ss[i]);
|
||||||
|
} else {
|
||||||
|
ASSERT_TRUE(ss[i].IsNotFound());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
db_->ReleaseSnapshot(snap0);
|
||||||
|
db_->ReleaseSnapshot(snap1);
|
||||||
|
db_->ReleaseSnapshot(snap2);
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
class DBBasicTestWithTimestampCompressionSettings
|
class DBBasicTestWithTimestampCompressionSettings
|
||||||
: public DBBasicTestWithTimestampBase,
|
: public DBBasicTestWithTimestampBase,
|
||||||
public testing::WithParamInterface<
|
public testing::WithParamInterface<
|
||||||
|
Loading…
x
Reference in New Issue
Block a user