Fix timestamp support for MultiGet (#6748)
Summary: 1. Avoid nullptr dereference when passing timestamp to KeyContext creation. 2. Construct LookupKey correctly with timestamp when creating MultiGetContext. 3. Compare without timestamp when sorting KeyContexts. Fixes https://github.com/facebook/rocksdb/issues/6745 Test plan (dev server): make check Pull Request resolved: https://github.com/facebook/rocksdb/pull/6748 Reviewed By: pdillinger Differential Revision: D21258691 Pulled By: riversand963 fbshipit-source-id: 44e65b759c18b9986947783edf03be4f890bb004
This commit is contained in:
parent
4cd859edf1
commit
d4398e08fc
@ -5,6 +5,9 @@
|
||||
* Finish implementation of BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey. It's now ready for use. Significantly reduces read amplification in some setups, especially for iterator seeks.
|
||||
* Fix a bug by updating CURRENT file so that it points to the correct MANIFEST file after best-efforts recovery.
|
||||
* Fixed a bug where ColumnFamilyHandle objects were not cleaned up in case an error happened during BlobDB's open after the base DB had been opened.
|
||||
* Fix a potential undefined behavior caused by trying to dereference nullable pointer (timestamp argument) in DB::MultiGet.
|
||||
* Fix a bug caused by not including user timestamp in MultiGet LookupKey construction. This can lead to wrong query result since the trailing bytes of a user key, if not shorter than timestamp, will be mistaken for user timestamp.
|
||||
* Fix a bug caused by using wrong compare function when sorting the input keys of MultiGet with timestamps.
|
||||
|
||||
### Public API Change
|
||||
* Add a ConfigOptions argument to the APIs dealing with converting options to and from strings and files. The ConfigOptions is meant to replace some of the options (such as input_strings_escaped and ignore_unknown_options) and allow for more parameters to be passed in the future without changing the function signature.
|
||||
|
@ -1535,6 +1535,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
|
||||
return Status::NotSupported("ReadOptions deadline is not supported");
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
assert(get_impl_options.column_family);
|
||||
ColumnFamilyHandle* cf = get_impl_options.column_family;
|
||||
const Comparator* const ucmp = cf->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() > 0) {
|
||||
assert(read_options.timestamp);
|
||||
assert(read_options.timestamp->size() == ucmp->timestamp_size());
|
||||
} else {
|
||||
assert(!read_options.timestamp);
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
|
||||
StopWatch sw(env_, stats_, DB_GET);
|
||||
PERF_TIMER_GUARD(get_snapshot_time);
|
||||
@ -1710,7 +1723,7 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
const std::vector<ColumnFamilyHandle*>& column_family,
|
||||
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
||||
return MultiGet(read_options, column_family, keys, values,
|
||||
/*timestamps*/ nullptr);
|
||||
/*timestamps=*/nullptr);
|
||||
}
|
||||
|
||||
std::vector<Status> DBImpl::MultiGet(
|
||||
@ -1722,6 +1735,20 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
StopWatch sw(env_, stats_, DB_MULTIGET);
|
||||
PERF_TIMER_GUARD(get_snapshot_time);
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (const auto* cfh : column_family) {
|
||||
assert(cfh);
|
||||
const Comparator* const ucmp = cfh->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() > 0) {
|
||||
assert(read_options.timestamp);
|
||||
assert(ucmp->timestamp_size() == read_options.timestamp->size());
|
||||
} else {
|
||||
assert(!read_options.timestamp);
|
||||
}
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
SequenceNumber consistent_seqnum;
|
||||
|
||||
std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
|
||||
@ -1981,7 +2008,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
||||
PinnableSlice* values, Status* statuses,
|
||||
const bool sorted_input) {
|
||||
return MultiGet(read_options, num_keys, column_families, keys, values,
|
||||
/*timestamps*/ nullptr, statuses, sorted_input);
|
||||
/*timestamps=*/nullptr, statuses, sorted_input);
|
||||
}
|
||||
|
||||
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
||||
@ -1991,12 +2018,29 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
||||
if (num_keys == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
ColumnFamilyHandle* cfh = column_families[i];
|
||||
assert(cfh);
|
||||
const Comparator* const ucmp = cfh->GetComparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() > 0) {
|
||||
assert(read_options.timestamp);
|
||||
assert(read_options.timestamp->size() == ucmp->timestamp_size());
|
||||
} else {
|
||||
assert(!read_options.timestamp);
|
||||
}
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
||||
sorted_keys.resize(num_keys);
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
key_context.emplace_back(column_families[i], keys[i], &values[i],
|
||||
×tamps[i], &statuses[i]);
|
||||
timestamps ? ×tamps[i] : nullptr,
|
||||
&statuses[i]);
|
||||
}
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
sorted_keys[i] = &key_context[i];
|
||||
@ -2083,7 +2127,8 @@ struct CompareKeyContext {
|
||||
}
|
||||
|
||||
// Both keys are from the same column family
|
||||
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
|
||||
int cmp = comparator->CompareWithoutTimestamp(
|
||||
*(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
|
||||
if (cmp < 0) {
|
||||
return true;
|
||||
}
|
||||
@ -2115,7 +2160,8 @@ void DBImpl::PrepareMultiGetKeys(
|
||||
}
|
||||
|
||||
// Both keys are from the same column family
|
||||
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
|
||||
int cmp = comparator->CompareWithoutTimestamp(
|
||||
*(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
|
||||
assert(cmp <= 0);
|
||||
}
|
||||
index++;
|
||||
@ -2244,7 +2290,7 @@ Status DBImpl::MultiGetImpl(
|
||||
? MultiGetContext::MAX_BATCH_SIZE
|
||||
: keys_left;
|
||||
MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
|
||||
batch_size, snapshot);
|
||||
batch_size, snapshot, read_options);
|
||||
MultiGetRange range = ctx.GetMultiGetRange();
|
||||
bool lookup_current = false;
|
||||
|
||||
|
@ -1820,6 +1820,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
|
||||
const Slice* ts = opt.timestamp;
|
||||
assert(nullptr != ts);
|
||||
size_t ts_sz = ts->size();
|
||||
assert(column_family->GetComparator());
|
||||
assert(ts_sz == column_family->GetComparator()->timestamp_size());
|
||||
WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
|
||||
ts_sz);
|
||||
Status s = batch.Put(column_family, key, value);
|
||||
|
@ -774,6 +774,55 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, MultiGetNoReturnTs) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||
TestComparator test_cmp(kTimestampSize);
|
||||
options.comparator = &test_cmp;
|
||||
DestroyAndReopen(options);
|
||||
WriteOptions write_opts;
|
||||
std::string ts_str = Timestamp(1, 0);
|
||||
Slice ts = ts_str;
|
||||
write_opts.timestamp = &ts;
|
||||
ASSERT_OK(db_->Put(write_opts, "foo", "value"));
|
||||
ASSERT_OK(db_->Put(write_opts, "bar", "value"));
|
||||
ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", "value"));
|
||||
ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", "value"));
|
||||
ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
|
||||
ts_str = Timestamp(2, 0);
|
||||
ts = ts_str;
|
||||
ReadOptions read_opts;
|
||||
read_opts.timestamp = &ts;
|
||||
{
|
||||
ColumnFamilyHandle* column_families[] = {cfh, cfh};
|
||||
Slice keys[] = {"foo", "bar"};
|
||||
PinnableSlice values[] = {PinnableSlice(), PinnableSlice()};
|
||||
Status statuses[] = {Status::OK(), Status::OK()};
|
||||
dbfull()->MultiGet(read_opts, /*num_keys=*/2, &column_families[0], &keys[0],
|
||||
&values[0], &statuses[0], /*sorted_input=*/false);
|
||||
for (const auto& s : statuses) {
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
}
|
||||
{
|
||||
ColumnFamilyHandle* column_families[] = {cfh, cfh, cfh, cfh};
|
||||
// Make user keys longer than configured timestamp size (16 bytes) to
|
||||
// verify RocksDB does not use the trailing bytes 'x' as timestamp.
|
||||
Slice keys[] = {"fooxxxxxxxxxxxxxxxx", "barxxxxxxxxxxxxxxxx", "foo", "bar"};
|
||||
PinnableSlice values[] = {PinnableSlice(), PinnableSlice(), PinnableSlice(),
|
||||
PinnableSlice()};
|
||||
Status statuses[] = {Status::OK(), Status::OK(), Status::OK(),
|
||||
Status::OK()};
|
||||
dbfull()->MultiGet(read_opts, /*num_keys=*/4, &column_families[0], &keys[0],
|
||||
&values[0], &statuses[0], /*sorted_input=*/false);
|
||||
for (const auto& s : statuses) {
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
|
@ -92,7 +92,8 @@ class MultiGetContext {
|
||||
static const int MAX_BATCH_SIZE = 32;
|
||||
|
||||
MultiGetContext(autovector<KeyContext*, MAX_BATCH_SIZE>* sorted_keys,
|
||||
size_t begin, size_t num_keys, SequenceNumber snapshot)
|
||||
size_t begin, size_t num_keys, SequenceNumber snapshot,
|
||||
const ReadOptions& read_opts)
|
||||
: num_keys_(num_keys),
|
||||
value_mask_(0),
|
||||
lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
|
||||
@ -106,7 +107,7 @@ class MultiGetContext {
|
||||
// autovector may not be contiguous storage, so make a copy
|
||||
sorted_keys_[iter] = (*sorted_keys)[begin + iter];
|
||||
sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[iter])
|
||||
LookupKey(*sorted_keys_[iter]->key, snapshot);
|
||||
LookupKey(*sorted_keys_[iter]->key, snapshot, read_opts.timestamp);
|
||||
sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key();
|
||||
sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user