Add timestamp support to DBImplReadOnly
This commit is contained in:
parent
3f263ef536
commit
f147bb7f91
@ -1723,17 +1723,6 @@ Status DBImpl::Get(const ReadOptions& read_options,
|
||||
return s;
|
||||
}
|
||||
|
||||
namespace {
|
||||
class GetWithTimestampReadCallback : public ReadCallback {
|
||||
public:
|
||||
explicit GetWithTimestampReadCallback(SequenceNumber seq)
|
||||
: ReadCallback(seq) {}
|
||||
bool IsVisibleFullCheck(SequenceNumber seq) override {
|
||||
return seq <= max_visible_seq_;
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
|
||||
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
||||
GetImplOptions& get_impl_options) {
|
||||
assert(get_impl_options.value != nullptr ||
|
||||
|
@ -2395,6 +2395,15 @@ class DBImpl : public DB {
|
||||
std::unique_ptr<StallInterface> wbm_stall_;
|
||||
};
|
||||
|
||||
class GetWithTimestampReadCallback : public ReadCallback {
|
||||
public:
|
||||
explicit GetWithTimestampReadCallback(SequenceNumber seq)
|
||||
: ReadCallback(seq) {}
|
||||
bool IsVisibleFullCheck(SequenceNumber seq) override {
|
||||
return seq <= max_visible_seq_;
|
||||
}
|
||||
};
|
||||
|
||||
extern Options SanitizeOptions(const std::string& db, const Options& src,
|
||||
bool read_only = false);
|
||||
|
||||
|
@ -33,20 +33,39 @@ DBImplReadOnly::~DBImplReadOnly() {}
|
||||
Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pinnable_val) {
|
||||
return Get(read_options, column_family, key, pinnable_val,
|
||||
/*timestamp*/ nullptr);
|
||||
}
|
||||
|
||||
Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pinnable_val,
|
||||
std::string* timestamp) {
|
||||
assert(pinnable_val != nullptr);
|
||||
// TODO: stopwatch DB_GET needed?, perf timer needed?
|
||||
PERF_TIMER_GUARD(get_snapshot_time);
|
||||
|
||||
assert(column_family);
|
||||
const Comparator* ucmp = column_family->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() || read_options.timestamp) {
|
||||
// TODO: support timestamp
|
||||
return Status::NotSupported();
|
||||
if (read_options.timestamp) {
|
||||
const Status s =
|
||||
FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
} else {
|
||||
const Status s = FailIfCfHasTs(column_family);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
std::string* ts = column_family->GetComparator()->timestamp_size() > 0
|
||||
? timestamp
|
||||
: nullptr;
|
||||
|
||||
Status s;
|
||||
SequenceNumber snapshot = versions_->LastSequence();
|
||||
GetWithTimestampReadCallback read_cb(snapshot);
|
||||
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
|
||||
auto cfd = cfh->cfd();
|
||||
if (tracer_) {
|
||||
@ -58,19 +77,23 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
SuperVersion* super_version = cfd->GetSuperVersion();
|
||||
MergeContext merge_context;
|
||||
SequenceNumber max_covering_tombstone_seq = 0;
|
||||
LookupKey lkey(key, snapshot);
|
||||
LookupKey lkey(key, snapshot, read_options.timestamp);
|
||||
PERF_TIMER_STOP(get_snapshot_time);
|
||||
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
|
||||
/*timestamp=*/nullptr, &s, &merge_context,
|
||||
&max_covering_tombstone_seq, read_options)) {
|
||||
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
|
||||
&merge_context, &max_covering_tombstone_seq,
|
||||
read_options, &read_cb)) {
|
||||
pinnable_val->PinSelf();
|
||||
RecordTick(stats_, MEMTABLE_HIT);
|
||||
} else {
|
||||
PERF_TIMER_GUARD(get_from_output_files_time);
|
||||
PinnedIteratorsManager pinned_iters_mgr;
|
||||
super_version->current->Get(read_options, lkey, pinnable_val,
|
||||
/*timestamp=*/nullptr, &s, &merge_context,
|
||||
&max_covering_tombstone_seq, &pinned_iters_mgr);
|
||||
super_version->current->Get(
|
||||
read_options, lkey, pinnable_val, ts, &s, &merge_context,
|
||||
&max_covering_tombstone_seq, &pinned_iters_mgr,
|
||||
/*value_found*/ nullptr,
|
||||
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
|
||||
/*is_blob*/ nullptr,
|
||||
/*do_merge*/ true);
|
||||
RecordTick(stats_, MEMTABLE_MISS);
|
||||
}
|
||||
RecordTick(stats_, NUMBER_KEYS_READ);
|
||||
@ -84,11 +107,17 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
|
||||
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family) {
|
||||
assert(column_family);
|
||||
const Comparator* ucmp = column_family->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() || read_options.timestamp) {
|
||||
// TODO: support timestamp
|
||||
return NewErrorIterator(Status::NotSupported());
|
||||
if (read_options.timestamp) {
|
||||
const Status s =
|
||||
FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
|
||||
if (!s.ok()) {
|
||||
return NewErrorIterator(s);
|
||||
}
|
||||
} else {
|
||||
const Status s = FailIfCfHasTs(column_family);
|
||||
if (!s.ok()) {
|
||||
return NewErrorIterator(s);
|
||||
}
|
||||
}
|
||||
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
|
||||
auto cfd = cfh->cfd();
|
||||
@ -118,16 +147,19 @@ Status DBImplReadOnly::NewIterators(
|
||||
const std::vector<ColumnFamilyHandle*>& column_families,
|
||||
std::vector<Iterator*>* iterators) {
|
||||
if (read_options.timestamp) {
|
||||
// TODO: support timestamp
|
||||
return Status::NotSupported();
|
||||
for (auto* cf : column_families) {
|
||||
assert(cf);
|
||||
const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto* cf : column_families) {
|
||||
assert(cf);
|
||||
const Comparator* ucmp = cf->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size()) {
|
||||
// TODO: support timestamp
|
||||
return Status::NotSupported();
|
||||
const Status s = FailIfCfHasTs(cf);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,9 @@ class DBImplReadOnly : public DBImpl {
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value) override;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value, std::string* timestamp) override;
|
||||
|
||||
// TODO: Implement ReadOnly MultiGet?
|
||||
|
||||
|
@ -704,6 +704,155 @@ TEST_F(DBBasicTestWithTimestamp, SimpleIterate) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, ReadOnlyDBSimpleIterateAndGet) {
|
||||
const int kNumKeysPerFile = 128;
|
||||
const uint64_t kMaxKey = 1024;
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||
TestComparator test_cmp(kTimestampSize);
|
||||
options.comparator = &test_cmp;
|
||||
options.memtable_factory.reset(
|
||||
test::NewSpecialSkipListFactory(kNumKeysPerFile));
|
||||
DestroyAndReopen(options);
|
||||
const std::vector<uint64_t> start_keys = {1, 0};
|
||||
const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
|
||||
Timestamp(3, 0)};
|
||||
const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
|
||||
Timestamp(4, 0)};
|
||||
for (size_t i = 0; i < write_timestamps.size(); ++i) {
|
||||
WriteOptions write_opts;
|
||||
for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
|
||||
Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
|
||||
"value" + std::to_string(i));
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
}
|
||||
|
||||
// Reopen the database in read only mode to verify it has timestamp support.
|
||||
Close();
|
||||
ASSERT_OK(ReadOnlyReopen(options));
|
||||
|
||||
auto get_value_and_check = [](DB* db, ReadOptions read_opts, Slice key,
|
||||
Slice expected_value, std::string expected_ts) {
|
||||
std::string value_from_get;
|
||||
std::string timestamp;
|
||||
ASSERT_OK(db->Get(read_opts, key.ToString(), &value_from_get, ×tamp));
|
||||
ASSERT_EQ(expected_value, value_from_get);
|
||||
ASSERT_EQ(expected_ts, timestamp);
|
||||
};
|
||||
for (size_t i = 0; i < read_timestamps.size(); ++i) {
|
||||
ReadOptions read_opts;
|
||||
Slice read_ts = read_timestamps[i];
|
||||
read_opts.timestamp = &read_ts;
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
|
||||
int count = 0;
|
||||
uint64_t key = 0;
|
||||
// Forward iterate.
|
||||
for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
|
||||
it->Next(), ++count, ++key) {
|
||||
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
|
||||
"value" + std::to_string(i), write_timestamps[i]);
|
||||
get_value_and_check(db_, read_opts, it->key(), it->value(),
|
||||
write_timestamps[i]);
|
||||
}
|
||||
size_t expected_count = kMaxKey - start_keys[i] + 1;
|
||||
ASSERT_EQ(expected_count, count);
|
||||
|
||||
// Backward iterate.
|
||||
count = 0;
|
||||
for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
|
||||
it->Prev(), ++count, --key) {
|
||||
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
|
||||
"value" + std::to_string(i), write_timestamps[i]);
|
||||
get_value_and_check(db_, read_opts, it->key(), it->value(),
|
||||
write_timestamps[i]);
|
||||
}
|
||||
ASSERT_EQ(static_cast<size_t>(kMaxKey) - start_keys[i] + 1, count);
|
||||
|
||||
// SeekToFirst()/SeekToLast() with lower/upper bounds.
|
||||
// Then iter with lower and upper bounds.
|
||||
uint64_t l = 0;
|
||||
uint64_t r = kMaxKey + 1;
|
||||
while (l < r) {
|
||||
std::string lb_str = Key1(l);
|
||||
Slice lb = lb_str;
|
||||
std::string ub_str = Key1(r);
|
||||
Slice ub = ub_str;
|
||||
read_opts.iterate_lower_bound = &lb;
|
||||
read_opts.iterate_upper_bound = &ub;
|
||||
it.reset(db_->NewIterator(read_opts));
|
||||
for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
|
||||
it->Valid(); it->Next(), ++key, ++count) {
|
||||
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
|
||||
"value" + std::to_string(i), write_timestamps[i]);
|
||||
get_value_and_check(db_, read_opts, it->key(), it->value(),
|
||||
write_timestamps[i]);
|
||||
}
|
||||
ASSERT_EQ(r - std::max(l, start_keys[i]), count);
|
||||
|
||||
for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0;
|
||||
it->Valid(); it->Prev(), --key, ++count) {
|
||||
CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue,
|
||||
"value" + std::to_string(i), write_timestamps[i]);
|
||||
get_value_and_check(db_, read_opts, it->key(), it->value(),
|
||||
write_timestamps[i]);
|
||||
}
|
||||
l += (kMaxKey / 100);
|
||||
r -= (kMaxKey / 100);
|
||||
}
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, ReadOnlyDBIterators) {
|
||||
const int kNumKeysPerFile = 128;
|
||||
const uint64_t kMaxKey = 1024;
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||
TestComparator test_cmp(kTimestampSize);
|
||||
options.comparator = &test_cmp;
|
||||
options.memtable_factory.reset(
|
||||
test::NewSpecialSkipListFactory(kNumKeysPerFile));
|
||||
DestroyAndReopen(options);
|
||||
const std::string write_timestamp = Timestamp(1, 0);
|
||||
const std::string read_timestamp = Timestamp(2, 0);
|
||||
WriteOptions write_opts;
|
||||
for (uint64_t key = 0; key <= kMaxKey; ++key) {
|
||||
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
|
||||
"value" + std::to_string(key));
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
|
||||
// Reopen the database in read only mode to verify it has timestamp support.
|
||||
Close();
|
||||
ASSERT_OK(ReadOnlyReopen(options));
|
||||
ReadOptions read_opts;
|
||||
Slice read_ts = read_timestamp;
|
||||
read_opts.timestamp = &read_ts;
|
||||
std::vector<Iterator*> iters;
|
||||
ASSERT_OK(db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters));
|
||||
ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
|
||||
|
||||
int count = 0;
|
||||
uint64_t key = 0;
|
||||
// Forward iterate.
|
||||
for (iters[0]->Seek(Key1(0)), key = 0; iters[0]->Valid();
|
||||
iters[0]->Next(), ++count, ++key) {
|
||||
CheckIterUserEntry(iters[0], Key1(key), kTypeValue,
|
||||
"value" + std::to_string(key), write_timestamp);
|
||||
}
|
||||
|
||||
size_t expected_count = kMaxKey - 0 + 1;
|
||||
ASSERT_EQ(expected_count, count);
|
||||
delete iters[0];
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
|
Loading…
Reference in New Issue
Block a user