return timestamp from get (#6409)

Summary:
Added new Get() methods that return timestamp. Dummy implementation is given so that classes derived from DB don't need to be touched to provide their implementation. MultiGet is not included.

ReadRandom perf test (10 minutes) on the same development machine ram drive with the same DB data shows no regression (within marge of error). The test is adapted from https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.
    base line (commit 72ee067b9):
        101.712 micros/op 314602 ops/sec;   36.0 MB/s (5658999 of 5658999 found)
    This PR:
        100.288 micros/op 319071 ops/sec;   36.5 MB/s (5674999 of 5674999 found)

./db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --delete_obsolete_files_period_micros=314572800 --max_background_compactions=4 --max_background_flushes=0 --level0_slowdown_writes_trigger=16 --level0_stop_writes_trigger=24 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --mmap_read=1 --mmap_write=0 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=readrandom --use_existing_db=1 --num=25000000 --threads=32
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6409

Differential Revision: D20200086

Pulled By: riversand963

fbshipit-source-id: 490edd74d924f62bd8ae9c29c2a6bbbb8410ca50
This commit is contained in:
Huisheng Liu 2020-03-02 15:58:32 -08:00 committed by Facebook Github Bot
parent 8637bc1eea
commit 904a60ff63
19 changed files with 305 additions and 139 deletions

View File

@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) { const Slice& key, PinnableSlice* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr,
true, nullptr, nullptr); nullptr, true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber); LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(), files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr); &get_context, nullptr);
@ -70,7 +70,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
std::string& value = (*values)[idx]; std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val, GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, true, nullptr, nullptr); nullptr, nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber); LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr); r->Get(options, lkey.internal_key(), &get_context, nullptr);
value.assign(pinnable_val.data(), pinnable_val.size()); value.assign(pinnable_val.data(), pinnable_val.size());

View File

@ -2356,6 +2356,22 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
std::vector<Slice> write_ts_list; std::vector<Slice> write_ts_list;
std::vector<Slice> read_ts_list; std::vector<Slice> read_ts_list;
const auto& verify_record_func = [&](size_t i, size_t k,
ColumnFamilyHandle* cfh) {
std::string value;
std::string timestamp;
ReadOptions ropts;
ropts.timestamp = &read_ts_list[i];
std::string expected_timestamp =
std::string(write_ts_list[i].data(), write_ts_list[i].size());
ASSERT_OK(
db_->Get(ropts, cfh, "key" + std::to_string(k), &value, &timestamp));
ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value);
ASSERT_EQ(expected_timestamp, timestamp);
};
for (size_t i = 0; i != kNumTimestamps; ++i) { for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i])); write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i])); read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
@ -2363,11 +2379,17 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
WriteOptions wopts; WriteOptions wopts;
wopts.timestamp = &write_ts; wopts.timestamp = &write_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) { for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
size_t memtable_get_start = 0;
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
ASSERT_OK(Put(cf, "key" + std::to_string(j), ASSERT_OK(Put(cf, "key" + std::to_string(j),
"value_" + std::to_string(j) + "_" + std::to_string(i), "value_" + std::to_string(j) + "_" + std::to_string(i),
wopts)); wopts));
if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
for (size_t k = memtable_get_start; k <= j; ++k) {
verify_record_func(i, k, handles_[cf]);
}
memtable_get_start = j + 1;
// flush all keys with the same timestamp to two sst files, split at // flush all keys with the same timestamp to two sst files, split at
// incremental positions such that lowerlevel[1].smallest.userkey == // incremental positions such that lowerlevel[1].smallest.userkey ==
// higherlevel[0].largest.userkey // higherlevel[0].largest.userkey
@ -2390,13 +2412,12 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
for (size_t i = 0; i != kNumTimestamps; ++i) { for (size_t i = 0; i != kNumTimestamps; ++i) {
ReadOptions ropts; ReadOptions ropts;
ropts.timestamp = &read_ts_list[i]; ropts.timestamp = &read_ts_list[i];
std::string expected_timestamp(write_ts_list[i].data(),
write_ts_list[i].size());
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) { for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
ColumnFamilyHandle* cfh = handles_[cf]; ColumnFamilyHandle* cfh = handles_[cf];
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
std::string value; verify_record_func(i, j, cfh);
ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
value);
} }
} }
} }

View File

@ -1496,14 +1496,22 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) { PinnableSlice* value) {
return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
GetImplOptions get_impl_options; GetImplOptions get_impl_options;
get_impl_options.column_family = column_family; get_impl_options.column_family = column_family;
get_impl_options.value = value; get_impl_options.value = value;
return GetImpl(read_options, key, get_impl_options); get_impl_options.timestamp = timestamp;
Status s = GetImpl(read_options, key, get_impl_options);
return s;
} }
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions get_impl_options) { GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr || assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr); get_impl_options.merge_operands != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
@ -1583,10 +1591,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
bool skip_memtable = (read_options.read_tier == kPersistedTier && bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed)); has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false; bool done = false;
const Comparator* comparator =
get_impl_options.column_family->GetComparator();
size_t ts_sz = comparator->timestamp_size();
std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
if (!skip_memtable) { if (!skip_memtable) {
// Get value associated with key // Get value associated with key
if (get_impl_options.get_value) { if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s, if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
&merge_context, &max_covering_tombstone_seq, &merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback, read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) { get_impl_options.is_blob_index)) {
@ -1594,9 +1606,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
get_impl_options.value->PinSelf(); get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) && } else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s, sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
&merge_context, &max_covering_tombstone_seq, timestamp, &s, &merge_context,
read_options, get_impl_options.callback, &max_covering_tombstone_seq, read_options,
get_impl_options.callback,
get_impl_options.is_blob_index)) { get_impl_options.is_blob_index)) {
done = true; done = true;
get_impl_options.value->PinSelf(); get_impl_options.value->PinSelf();
@ -1605,9 +1618,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
} else { } else {
// Get Merge Operands associated with key, Merge Operands should not be // Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user. // merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, nullptr, &s, &merge_context, if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
&max_covering_tombstone_seq, read_options, nullptr, &merge_context, &max_covering_tombstone_seq,
nullptr, false)) { read_options, nullptr, nullptr, false)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) && } else if ((s.ok() || s.IsMergeInProgress()) &&
@ -1626,8 +1639,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
if (!done) { if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get( sv->current->Get(
read_options, lkey, get_impl_options.value, &s, &merge_context, read_options, lkey, get_impl_options.value, timestamp, &s,
&max_covering_tombstone_seq, &merge_context, &max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr, get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr, nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr, get_impl_options.get_value ? get_impl_options.callback : nullptr,
@ -1738,13 +1751,14 @@ std::vector<Status> DBImpl::MultiGet(
has_unpersisted_data_.load(std::memory_order_relaxed)); has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false; bool done = false;
if (!skip_memtable) { if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, &s, &merge_context, if (super_version->mem->Get(lkey, value, /*timestamp=*/nullptr, &s,
&max_covering_tombstone_seq, read_options)) { &merge_context, &max_covering_tombstone_seq,
read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if (super_version->imm->Get(lkey, value, &s, &merge_context, } else if (super_version->imm->Get(
&max_covering_tombstone_seq, lkey, value, nullptr, &s, &merge_context,
read_options)) { &max_covering_tombstone_seq, read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} }
@ -1752,8 +1766,9 @@ std::vector<Status> DBImpl::MultiGet(
if (!done) { if (!done) {
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, &pinnable_val, &s, super_version->current->Get(read_options, lkey, &pinnable_val,
&merge_context, &max_covering_tombstone_seq); /*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq);
value->assign(pinnable_val.data(), pinnable_val.size()); value->assign(pinnable_val.data(), pinnable_val.size());
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
@ -2436,7 +2451,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
bool DBImpl::KeyMayExist(const ReadOptions& read_options, bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) { std::string* value, std::string* timestamp,
bool* value_found) {
assert(value != nullptr); assert(value != nullptr);
if (value_found != nullptr) { if (value_found != nullptr) {
// falsify later if key-may-exist but can't fetch value // falsify later if key-may-exist but can't fetch value
@ -2449,6 +2465,7 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
get_impl_options.column_family = column_family; get_impl_options.column_family = column_family;
get_impl_options.value = &pinnable_val; get_impl_options.value = &pinnable_val;
get_impl_options.value_found = value_found; get_impl_options.value_found = value_found;
get_impl_options.timestamp = timestamp;
auto s = GetImpl(roptions, key, get_impl_options); auto s = GetImpl(roptions, key, get_impl_options);
value->assign(pinnable_val.data(), pinnable_val.size()); value->assign(pinnable_val.data(), pinnable_val.size());
@ -3819,8 +3836,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
*found_record_for_key = false; *found_record_for_key = false;
// Check if there is a record for this key in the latest memtable // Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq, sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
seq, read_options, nullptr /*read_callback*/, is_blob_index); &max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable. // unexpected error reading memtable.
@ -3845,8 +3863,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
} }
// Check if there is a record for this key in the immutable memtables // Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq, sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
seq, read_options, nullptr /*read_callback*/, is_blob_index); &max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable. // unexpected error reading memtable.
@ -3871,7 +3890,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
} }
// Check if there is a record for this key in the immutable memtables // Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options, &max_covering_tombstone_seq, seq, read_options,
is_blob_index); is_blob_index);
@ -3899,7 +3918,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// SST files if cache_only=true? // SST files if cache_only=true?
if (!cache_only) { if (!cache_only) {
// Check tables // Check tables
sv->current->Get(read_options, lkey, nullptr, &s, &merge_context, sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, nullptr /* value_found */, &max_covering_tombstone_seq, nullptr /* value_found */,
found_record_for_key, seq, nullptr /*read_callback*/, found_record_for_key, seq, nullptr /*read_callback*/,
is_blob_index); is_blob_index);

View File

@ -163,6 +163,9 @@ class DBImpl : public DB {
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override; PinnableSlice* value) override;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;
using DB::GetMergeOperands; using DB::GetMergeOperands;
Status GetMergeOperands(const ReadOptions& options, Status GetMergeOperands(const ReadOptions& options,
@ -230,7 +233,7 @@ class DBImpl : public DB {
using DB::KeyMayExist; using DB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options, virtual bool KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, std::string* value, std::string* timestamp,
bool* value_found = nullptr) override; bool* value_found = nullptr) override;
using DB::NewIterator; using DB::NewIterator;
@ -433,6 +436,7 @@ class DBImpl : public DB {
struct GetImplOptions { struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr; ColumnFamilyHandle* column_family = nullptr;
PinnableSlice* value = nullptr; PinnableSlice* value = nullptr;
std::string* timestamp = nullptr;
bool* value_found = nullptr; bool* value_found = nullptr;
ReadCallback* callback = nullptr; ReadCallback* callback = nullptr;
bool* is_blob_index = nullptr; bool* is_blob_index = nullptr;
@ -455,7 +459,7 @@ class DBImpl : public DB {
// If get_impl_options.get_value = false get merge operands associated with // If get_impl_options.get_value = false get merge operands associated with
// get_impl_options.key via get_impl_options.merge_operands // get_impl_options.key via get_impl_options.merge_operands
Status GetImpl(const ReadOptions& options, const Slice& key, Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions get_impl_options); GetImplOptions& get_impl_options);
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd, ColumnFamilyData* cfd,

View File

@ -48,14 +48,16 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
SequenceNumber max_covering_tombstone_seq = 0; SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time); PERF_TIMER_STOP(get_snapshot_time);
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) { &max_covering_tombstone_seq, read_options)) {
pinnable_val->PinSelf(); pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else { } else {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, pinnable_val, &s, super_version->current->Get(read_options, lkey, pinnable_val,
&merge_context, &max_covering_tombstone_seq); /*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq);
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
RecordTick(stats_, NUMBER_KEYS_READ); RecordTick(stats_, NUMBER_KEYS_READ);

View File

@ -340,15 +340,16 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
PERF_TIMER_STOP(get_snapshot_time); PERF_TIMER_STOP(get_snapshot_time);
bool done = false; bool done = false;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) { &max_covering_tombstone_seq, read_options)) {
done = true; done = true;
pinnable_val->PinSelf(); pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) && } else if ((s.ok() || s.IsMergeInProgress()) &&
super_version->imm->Get( super_version->imm->Get(
lkey, pinnable_val->GetSelf(), &s, &merge_context, lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s,
&max_covering_tombstone_seq, read_options)) { &merge_context, &max_covering_tombstone_seq, read_options)) {
done = true; done = true;
pinnable_val->PinSelf(); pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
@ -359,8 +360,9 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
} }
if (!done) { if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, pinnable_val, &s, super_version->current->Get(read_options, lkey, pinnable_val,
&merge_context, &max_covering_tombstone_seq); /*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq);
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
{ {

View File

@ -261,7 +261,7 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
ReadOptions roptions; ReadOptions roptions;
SequenceNumber max_covering_tombstone_seq = 0; SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey("key", kMaxSequenceNumber); LookupKey lkey("key", kMaxSequenceNumber);
res = mem->Get(lkey, &value, &status, &merge_context, res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, &merge_context,
&max_covering_tombstone_seq, roptions); &max_covering_tombstone_seq, roptions);
ASSERT_TRUE(res); ASSERT_TRUE(res);
uint64_t ivalue = DecodeFixed64(Slice(value).data()); uint64_t ivalue = DecodeFixed64(Slice(value).data());

View File

@ -162,6 +162,11 @@ inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
return Slice(user_key.data(), user_key.size() - ts_sz); return Slice(user_key.data(), user_key.size() - ts_sz);
} }
inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
assert(user_key.size() >= ts_sz);
return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
}
inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) { inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
assert(internal_key.size() >= 8); assert(internal_key.size() >= 8);
const size_t n = internal_key.size(); const size_t n = internal_key.size();

View File

@ -600,6 +600,7 @@ struct Saver {
bool* merge_in_progress; bool* merge_in_progress;
std::string* value; std::string* value;
SequenceNumber seq; SequenceNumber seq;
std::string* timestamp;
const MergeOperator* merge_operator; const MergeOperator* merge_operator;
// the merge operations encountered; // the merge operations encountered;
MergeContext* merge_context; MergeContext* merge_context;
@ -643,9 +644,11 @@ static bool SaveValue(void* arg, const char* entry) {
uint32_t key_length; uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
Slice user_key_slice = Slice(key_ptr, key_length - 8); Slice user_key_slice = Slice(key_ptr, key_length - 8);
if (s->mem->GetInternalKeyComparator() const Comparator* user_comparator =
.user_comparator() s->mem->GetInternalKeyComparator().user_comparator();
->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) { size_t ts_sz = user_comparator->timestamp_size();
if (user_comparator->CompareWithoutTimestamp(user_key_slice,
s->key->user_key()) == 0) {
// Correct user key // Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
ValueType type; ValueType type;
@ -713,6 +716,11 @@ static bool SaveValue(void* arg, const char* entry) {
if (s->is_blob_index != nullptr) { if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex); *(s->is_blob_index) = (type == kTypeBlobIndex);
} }
if (ts_sz > 0 && s->timestamp != nullptr) {
Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
s->timestamp->assign(ts.data(), ts.size());
}
return false; return false;
} }
case kTypeDeletion: case kTypeDeletion:
@ -767,7 +775,8 @@ static bool SaveValue(void* arg, const char* entry) {
return false; return false;
} }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bool MemTable::Get(const LookupKey& key, std::string* value,
std::string* timestamp, Status* s,
MergeContext* merge_context, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, SequenceNumber* seq, const ReadOptions& read_opts,
@ -816,7 +825,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
} }
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
is_blob_index, value, s, merge_context, seq, is_blob_index, value, timestamp, s, merge_context, seq,
&found_final_value, &merge_in_progress); &found_final_value, &merge_in_progress);
} }
@ -831,7 +840,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
void MemTable::GetFromTable(const LookupKey& key, void MemTable::GetFromTable(const LookupKey& key,
SequenceNumber max_covering_tombstone_seq, SequenceNumber max_covering_tombstone_seq,
bool do_merge, ReadCallback* callback, bool do_merge, ReadCallback* callback,
bool* is_blob_index, std::string* value, Status* s, bool* is_blob_index, std::string* value,
std::string* timestamp, Status* s,
MergeContext* merge_context, SequenceNumber* seq, MergeContext* merge_context, SequenceNumber* seq,
bool* found_final_value, bool* merge_in_progress) { bool* found_final_value, bool* merge_in_progress) {
Saver saver; Saver saver;
@ -840,6 +850,7 @@ void MemTable::GetFromTable(const LookupKey& key,
saver.merge_in_progress = merge_in_progress; saver.merge_in_progress = merge_in_progress;
saver.key = &key; saver.key = &key;
saver.value = value; saver.value = value;
saver.timestamp = timestamp;
saver.seq = kMaxSequenceNumber; saver.seq = kMaxSequenceNumber;
saver.mem = this; saver.mem = this;
saver.merge_context = merge_context; saver.merge_context = merge_context;
@ -908,9 +919,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
} }
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, is_blob, iter->value->GetSelf(), iter->s, callback, is_blob, iter->value->GetSelf(),
&(iter->merge_context), &seq, &found_final_value, /*timestamp=*/nullptr, iter->s, &(iter->merge_context), &seq,
&merge_in_progress); &found_final_value, &merge_in_progress);
if (!found_final_value && merge_in_progress) { if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress(); *(iter->s) = Status::MergeInProgress();

View File

@ -212,16 +212,27 @@ class MemTable {
MergeContext* merge_context, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr, const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true) {
return Get(key, value, /*timestamp=*/nullptr, s, merge_context,
max_covering_tombstone_seq, seq, read_opts, callback,
is_blob_index, do_merge);
}
bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true); bool* is_blob_index = nullptr, bool do_merge = true);
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
MergeContext* merge_context, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr, const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true) { bool* is_blob_index = nullptr, bool do_merge = true) {
SequenceNumber seq; SequenceNumber seq;
return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, return Get(key, value, timestamp, s, merge_context,
read_opts, callback, is_blob_index, do_merge); max_covering_tombstone_seq, &seq, read_opts, callback,
is_blob_index, do_merge);
} }
void MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
@ -532,9 +543,9 @@ class MemTable {
void GetFromTable(const LookupKey& key, void GetFromTable(const LookupKey& key,
SequenceNumber max_covering_tombstone_seq, bool do_merge, SequenceNumber max_covering_tombstone_seq, bool do_merge,
ReadCallback* callback, bool* is_blob_index, ReadCallback* callback, bool* is_blob_index,
std::string* value, Status* s, MergeContext* merge_context, std::string* value, std::string* timestamp, Status* s,
SequenceNumber* seq, bool* found_final_value, MergeContext* merge_context, SequenceNumber* seq,
bool* merge_in_progress); bool* found_final_value, bool* merge_in_progress);
}; };
extern const char* EncodeKey(std::string* scratch, const Slice& target); extern const char* EncodeKey(std::string* scratch, const Slice& target);

View File

@ -104,11 +104,12 @@ int MemTableList::NumFlushed() const {
// Return the most recent value found, if any. // Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far. // Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value, bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context, std::string* timestamp, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback, bool* is_blob_index) { ReadCallback* callback, bool* is_blob_index) {
return GetFromList(&memlist_, key, value, s, merge_context, return GetFromList(&memlist_, key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, seq, read_opts, callback, max_covering_tombstone_seq, seq, read_opts, callback,
is_blob_index); is_blob_index);
} }
@ -128,9 +129,9 @@ bool MemTableListVersion::GetMergeOperands(
const LookupKey& key, Status* s, MergeContext* merge_context, const LookupKey& key, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
for (MemTable* memtable : memlist_) { for (MemTable* memtable : memlist_) {
bool done = memtable->Get(key, nullptr, s, merge_context, bool done = memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
max_covering_tombstone_seq, read_opts, nullptr, merge_context, max_covering_tombstone_seq,
nullptr, false); read_opts, nullptr, nullptr, false);
if (done) { if (done) {
return true; return true;
} }
@ -139,17 +140,17 @@ bool MemTableListVersion::GetMergeOperands(
} }
bool MemTableListVersion::GetFromHistory( bool MemTableListVersion::GetFromHistory(
const LookupKey& key, std::string* value, Status* s, const LookupKey& key, std::string* value, std::string* timestamp, Status* s,
MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) { SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
return GetFromList(&memlist_history_, key, value, s, merge_context, return GetFromList(&memlist_history_, key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, seq, read_opts, max_covering_tombstone_seq, seq, read_opts,
nullptr /*read_callback*/, is_blob_index); nullptr /*read_callback*/, is_blob_index);
} }
bool MemTableListVersion::GetFromList( bool MemTableListVersion::GetFromList(
std::list<MemTable*>* list, const LookupKey& key, std::string* value, std::list<MemTable*>* list, const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context, std::string* timestamp, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) { const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
*seq = kMaxSequenceNumber; *seq = kMaxSequenceNumber;
@ -157,9 +158,9 @@ bool MemTableListVersion::GetFromList(
for (auto& memtable : *list) { for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber; SequenceNumber current_seq = kMaxSequenceNumber;
bool done = bool done = memtable->Get(key, value, timestamp, s, merge_context,
memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq, max_covering_tombstone_seq, &current_seq,
&current_seq, read_opts, callback, is_blob_index); read_opts, callback, is_blob_index);
if (*seq == kMaxSequenceNumber) { if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key. // Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to // Since we only care about the most recent change, we only need to

View File

@ -58,20 +58,21 @@ class MemTableListVersion {
// If any operation was found for this key, its most recent sequence number // If any operation was found for this key, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is // will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber. // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
MergeContext* merge_context, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr, const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr); bool* is_blob_index = nullptr);
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
MergeContext* merge_context, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr, const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr) { bool* is_blob_index = nullptr) {
SequenceNumber seq; SequenceNumber seq;
return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, return Get(key, value, timestamp, s, merge_context,
read_opts, callback, is_blob_index); max_covering_tombstone_seq, &seq, read_opts, callback,
is_blob_index);
} }
void MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
@ -88,18 +89,20 @@ class MemTableListVersion {
// have already been flushed. Should only be used from in-memory only // have already been flushed. Should only be used from in-memory only
// queries (such as Transaction validation) as the history may contain // queries (such as Transaction validation) as the history may contain
// writes that are also present in the SST files. // writes that are also present in the SST files.
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, bool GetFromHistory(const LookupKey& key, std::string* value,
std::string* timestamp, Status* s,
MergeContext* merge_context, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, SequenceNumber* seq, const ReadOptions& read_opts,
bool* is_blob_index = nullptr); bool* is_blob_index = nullptr);
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, bool GetFromHistory(const LookupKey& key, std::string* value,
std::string* timestamp, Status* s,
MergeContext* merge_context, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, const ReadOptions& read_opts,
bool* is_blob_index = nullptr) { bool* is_blob_index = nullptr) {
SequenceNumber seq; SequenceNumber seq;
return GetFromHistory(key, value, s, merge_context, return GetFromHistory(key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, &seq, read_opts, max_covering_tombstone_seq, &seq, read_opts,
is_blob_index); is_blob_index);
} }
@ -148,7 +151,8 @@ class MemTableListVersion {
void TrimHistory(autovector<MemTable*>* to_delete, size_t usage); void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key, bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
std::string* value, Status* s, MergeContext* merge_context, std::string* value, std::string* timestamp, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback = nullptr, ReadCallback* callback = nullptr,

View File

@ -221,8 +221,9 @@ TEST_F(MemTableListTest, GetTest) {
autovector<MemTable*> to_delete; autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq); LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, &s, &merge_context, bool found = list.current()->Get(
&max_covering_tombstone_seq, ReadOptions()); lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
// Create a MemTable // Create a MemTable
@ -244,19 +245,22 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch the newly written keys // Fetch the newly written keys
merge_context.Clear(); merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, found = mem->Get(LookupKey("key1", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value1"); ASSERT_EQ(value, "value1");
merge_context.Clear(); merge_context.Clear();
found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context, found = mem->Get(LookupKey("key1", 2), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
// MemTable found out that this key is *not* found (at this sequence#) // MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, found = mem->Get(LookupKey("key2", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2"); ASSERT_EQ(value, "value2.2");
@ -283,28 +287,29 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch keys via MemTableList // Fetch keys via MemTableList
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s,
&max_covering_tombstone_seq, ReadOptions()); &merge_context, &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s, found = list.current()->Get(
&merge_context, &max_covering_tombstone_seq, LookupKey("key1", saved_seq), &value, /*timestamp*/nullptr,
ReadOptions()); &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value1", value); ASSERT_EQ("value1", value);
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s,
&max_covering_tombstone_seq, ReadOptions()); &merge_context, &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.3"); ASSERT_EQ(value, "value2.3");
merge_context.Clear(); merge_context.Clear();
found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context, found = list.current()->Get(
&max_covering_tombstone_seq, ReadOptions()); LookupKey("key2", 1), &value, /*timestamp*/nullptr, &s,
&merge_context, &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
ASSERT_EQ(2, list.NumNotFlushed()); ASSERT_EQ(2, list.NumNotFlushed());
@ -333,8 +338,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
autovector<MemTable*> to_delete; autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq); LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, &s, &merge_context, bool found = list.current()->Get(
&max_covering_tombstone_seq, ReadOptions()); lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
// Create a MemTable // Create a MemTable
@ -355,13 +361,15 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch the newly written keys // Fetch the newly written keys
merge_context.Clear(); merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, found = mem->Get(LookupKey("key1", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
// MemTable found out that this key is *not* found (at this sequence#) // MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, found = mem->Get(LookupKey("key2", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2"); ASSERT_EQ(value, "value2.2");
@ -372,15 +380,15 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch keys via MemTableList // Fetch keys via MemTableList
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key1", seq), &value,
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key2", seq), &value,
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(s.ok() && found); ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value2.2", value); ASSERT_EQ("value2.2", value);
@ -400,27 +408,27 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are no longer in MemTableList // Verify keys are no longer in MemTableList
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key1", seq), &value,
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key2", seq), &value,
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
// Verify keys are present in history // Verify keys are present in history
merge_context.Clear(); merge_context.Clear();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key1", seq), &value, &s, &merge_context, LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key2", seq), &value, &s, &merge_context, LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found); ASSERT_TRUE(found);
ASSERT_EQ("value2.2", value); ASSERT_EQ("value2.2", value);
@ -462,42 +470,42 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are no longer in MemTableList // Verify keys are no longer in MemTableList
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key1", seq), &value,
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key2", seq), &value,
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key3", seq), &value,
list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
// Verify that the second memtable's keys are in the history // Verify that the second memtable's keys are in the history
merge_context.Clear(); merge_context.Clear();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key1", seq), &value, &s, &merge_context, LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key3", seq), &value, &s, &merge_context, LookupKey("key3", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_TRUE(found); ASSERT_TRUE(found);
ASSERT_EQ("value3", value); ASSERT_EQ("value3", value);
// Verify that key2 from the first memtable is no longer in the history // Verify that key2 from the first memtable is no longer in the history
merge_context.Clear(); merge_context.Clear();
found = found = list.current()->Get(LookupKey("key2", seq), &value,
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, /*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
// Cleanup // Cleanup

View File

@ -1754,7 +1754,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
version_number_(version_number) {} version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k, void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, Status* status, PinnableSlice* value, std::string* timestamp, Status* status,
MergeContext* merge_context, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found, SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback, bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
@ -1778,8 +1778,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetContext get_context( GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
do_merge ? value : nullptr, value_found, merge_context, do_merge, do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
max_covering_tombstone_seq, this->env_, seq, merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_get_id); tracing_get_id);
@ -1918,8 +1918,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
get_ctx.emplace_back( get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, nullptr, &(iter->merge_context), true, iter->value, /*timestamp*/ nullptr, nullptr, &(iter->merge_context),
&iter->max_covering_tombstone_seq, this->env_, nullptr, true, &iter->max_covering_tombstone_seq, this->env_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_mget_id); tracing_mget_id);
// MergeInProgress status, if set, has been transferred to the get_context // MergeInProgress status, if set, has been transferred to the get_context

View File

@ -595,7 +595,7 @@ class Version {
// merge_context.operands_list and don't merge the operands // merge_context.operands_list and don't merge the operands
// REQUIRES: lock is not held // REQUIRES: lock is not held
void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value,
Status* status, MergeContext* merge_context, std::string* timestamp, Status* status, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* max_covering_tombstone_seq,
bool* value_found = nullptr, bool* key_exists = nullptr, bool* value_found = nullptr, bool* key_exists = nullptr,
SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,

View File

@ -388,6 +388,9 @@ class DB {
// If the database contains an entry for "key" store the // If the database contains an entry for "key" store the
// corresponding value in *value and return OK. // corresponding value in *value and return OK.
// //
// If timestamp is enabled and a non-null timestamp pointer is passed in,
// timestamp is returned.
//
// If there is no entry for "key" leave *value unchanged and return // If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true. // a status for which Status::IsNotFound() returns true.
// //
@ -412,6 +415,32 @@ class DB {
return Get(options, DefaultColumnFamily(), key, value); return Get(options, DefaultColumnFamily(), key, value);
} }
// Get() methods that return timestamp. Derived DB classes don't need to worry
// about this group of methods if they don't care about timestamp feature.
virtual inline Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, std::string* timestamp) {
assert(value != nullptr);
PinnableSlice pinnable_val(value);
assert(!pinnable_val.IsPinned());
auto s = Get(options, column_family, key, &pinnable_val, timestamp);
if (s.ok() && pinnable_val.IsPinned()) {
value->assign(pinnable_val.data(), pinnable_val.size());
} // else value is already assigned
return s;
}
virtual Status Get(const ReadOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/, PinnableSlice* /*value*/,
std::string* /*timestamp*/) {
return Status::NotSupported(
"Get() that returns timestamp is not implemented.");
}
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value, std::string* timestamp) {
return Get(options, DefaultColumnFamily(), key, value, timestamp);
}
// Returns all the merge operands corresponding to the key. If the // Returns all the merge operands corresponding to the key. If the
// number of merge operands in DB is greater than // number of merge operands in DB is greater than
// merge_operands_options.expected_max_number_of_operands // merge_operands_options.expected_max_number_of_operands
@ -542,17 +571,33 @@ class DB {
virtual bool KeyMayExist(const ReadOptions& /*options*/, virtual bool KeyMayExist(const ReadOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/, std::string* /*value*/, const Slice& /*key*/, std::string* /*value*/,
std::string* /*timestamp*/,
bool* value_found = nullptr) { bool* value_found = nullptr) {
if (value_found != nullptr) { if (value_found != nullptr) {
*value_found = false; *value_found = false;
} }
return true; return true;
} }
virtual bool KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found = nullptr) {
return KeyMayExist(options, column_family, key, value,
/*timestamp=*/nullptr, value_found);
}
virtual bool KeyMayExist(const ReadOptions& options, const Slice& key, virtual bool KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found = nullptr) { std::string* value, bool* value_found = nullptr) {
return KeyMayExist(options, DefaultColumnFamily(), key, value, value_found); return KeyMayExist(options, DefaultColumnFamily(), key, value, value_found);
} }
virtual bool KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, std::string* timestamp,
bool* value_found = nullptr) {
return KeyMayExist(options, DefaultColumnFamily(), key, value, timestamp,
value_found);
}
// Return a heap-allocated iterator over the contents of the database. // Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it). // call one of the Seek methods on the iterator before using it).

View File

@ -41,10 +41,11 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
GetContext::GetContext( GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key, Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env, MergeContext* merge_context, bool do_merge,
SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr, SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq,
ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id) PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
: ucmp_(ucmp), : ucmp_(ucmp),
merge_operator_(merge_operator), merge_operator_(merge_operator),
logger_(logger), logger_(logger),
@ -52,6 +53,7 @@ GetContext::GetContext(
state_(init_state), state_(init_state),
user_key_(user_key), user_key_(user_key),
pinnable_val_(pinnable_val), pinnable_val_(pinnable_val),
timestamp_(timestamp),
value_found_(value_found), value_found_(value_found),
merge_context_(merge_context), merge_context_(merge_context),
max_covering_tombstone_seq_(_max_covering_tombstone_seq), max_covering_tombstone_seq_(_max_covering_tombstone_seq),
@ -69,6 +71,18 @@ GetContext::GetContext(
sample_ = should_sample_file_read(); sample_ = should_sample_file_read();
} }
GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env,
SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
: GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
pinnable_val, nullptr, value_found, merge_context, do_merge,
_max_covering_tombstone_seq, env, seq, _pinned_iters_mgr,
callback, is_blob_index, tracing_get_id) {}
// Called from TableCache::Get and Table::Get when file/block in which // Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this // key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do // case we can't guarantee that key does not exist and are not permitted to do
@ -256,6 +270,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
push_operand(value, value_pinner); push_operand(value, value_pinner);
} }
} }
if (state_ == kFound) {
size_t ts_sz = ucmp_->timestamp_size();
if (ts_sz > 0 && timestamp_ != nullptr) {
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
timestamp_->assign(ts.data(), ts.size());
}
}
if (is_blob_index_ != nullptr) { if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex); *is_blob_index_ = (type == kTypeBlobIndex);
} }

View File

@ -87,7 +87,17 @@ class GetContext {
// merge_context and they are never merged. The value pointer is untouched. // merge_context and they are never merged. The value pointer is untouched.
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state, Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value, bool* value_found, const Slice& user_key, PinnableSlice* value,
bool* value_found, MergeContext* merge_context, bool do_merge,
SequenceNumber* max_covering_tombstone_seq, Env* env,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
uint64_t tracing_get_id = 0);
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value,
std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge, MergeContext* merge_context, bool do_merge,
SequenceNumber* max_covering_tombstone_seq, Env* env, SequenceNumber* max_covering_tombstone_seq, Env* env,
SequenceNumber* seq = nullptr, SequenceNumber* seq = nullptr,
@ -159,6 +169,7 @@ class GetContext {
GetState state_; GetState state_;
Slice user_key_; Slice user_key_;
PinnableSlice* pinnable_val_; PinnableSlice* pinnable_val_;
std::string* timestamp_;
bool* value_found_; // Is value set correctly? Used by KeyMayExist bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_; MergeContext* merge_context_;
SequenceNumber* max_covering_tombstone_seq_; SequenceNumber* max_covering_tombstone_seq_;

View File

@ -1552,7 +1552,8 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
Status BlobDBImpl::Get(const ReadOptions& read_options, Status BlobDBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) { PinnableSlice* value) {
return Get(read_options, column_family, key, value, nullptr /*expiration*/); return Get(read_options, column_family, key, value,
static_cast<uint64_t*>(nullptr) /*expiration*/);
} }
Status BlobDBImpl::Get(const ReadOptions& read_options, Status BlobDBImpl::Get(const ReadOptions& read_options,