Support WBWI for keys having timestamps (#9603)
Summary: This PR supports inserting keys into a `WriteBatchWithIndex` for column families that enable user-defined timestamps and reading the keys back. **The index does not have timestamps.** Writing a key to WBWI is unchanged, because the underlying WriteBatch already supports it. When reading the keys back, we need to make sure to distinguish between keys with and without timestamps before comparison. When the user calls `GetFromBatchAndDB()`, no timestamp is needed to query the batch, but a timestamp has to be provided to query the DB. The assumption is that data in the batch must be newer than data from the DB. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9603 Test Plan: make check Reviewed By: ltamasi Differential Revision: D34354849 Pulled By: riversand963 fbshipit-source-id: d25d1f84e2240ce543e521fa30595082fb8db9a0
This commit is contained in:
parent
8ca433f912
commit
6f12599863
@ -1,4 +1,8 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
|
## Unreleased
|
||||||
|
### New Features
|
||||||
|
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamps.
|
||||||
|
|
||||||
## 7.0.0 (02/20/2022)
|
## 7.0.0 (02/20/2022)
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
* Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.)
|
* Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.)
|
||||||
|
@ -800,6 +800,7 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
|
|||||||
}
|
}
|
||||||
|
|
||||||
needs_in_place_update_ts_ = true;
|
needs_in_place_update_ts_ = true;
|
||||||
|
has_key_with_ts_ = true;
|
||||||
std::string dummy_ts(ts_sz, '\0');
|
std::string dummy_ts(ts_sz, '\0');
|
||||||
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
||||||
return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
|
return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
|
||||||
@ -812,6 +813,7 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
has_key_with_ts_ = true;
|
||||||
assert(column_family);
|
assert(column_family);
|
||||||
uint32_t cf_id = column_family->GetID();
|
uint32_t cf_id = column_family->GetID();
|
||||||
std::array<Slice, 2> key_with_ts{{key, ts}};
|
std::array<Slice, 2> key_with_ts{{key, ts}};
|
||||||
@ -1002,6 +1004,7 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
needs_in_place_update_ts_ = true;
|
needs_in_place_update_ts_ = true;
|
||||||
|
has_key_with_ts_ = true;
|
||||||
std::string dummy_ts(ts_sz, '\0');
|
std::string dummy_ts(ts_sz, '\0');
|
||||||
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
||||||
return WriteBatchInternal::Delete(this, cf_id,
|
return WriteBatchInternal::Delete(this, cf_id,
|
||||||
@ -1015,6 +1018,7 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key,
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
assert(column_family);
|
assert(column_family);
|
||||||
|
has_key_with_ts_ = true;
|
||||||
uint32_t cf_id = column_family->GetID();
|
uint32_t cf_id = column_family->GetID();
|
||||||
std::array<Slice, 2> key_with_ts{{key, ts}};
|
std::array<Slice, 2> key_with_ts{{key, ts}};
|
||||||
return WriteBatchInternal::Delete(this, cf_id,
|
return WriteBatchInternal::Delete(this, cf_id,
|
||||||
@ -1115,6 +1119,7 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
|
|||||||
}
|
}
|
||||||
|
|
||||||
needs_in_place_update_ts_ = true;
|
needs_in_place_update_ts_ = true;
|
||||||
|
has_key_with_ts_ = true;
|
||||||
std::string dummy_ts(ts_sz, '\0');
|
std::string dummy_ts(ts_sz, '\0');
|
||||||
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
|
||||||
return WriteBatchInternal::SingleDelete(this, cf_id,
|
return WriteBatchInternal::SingleDelete(this, cf_id,
|
||||||
@ -1127,6 +1132,7 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
has_key_with_ts_ = true;
|
||||||
assert(column_family);
|
assert(column_family);
|
||||||
uint32_t cf_id = column_family->GetID();
|
uint32_t cf_id = column_family->GetID();
|
||||||
std::array<Slice, 2> key_with_ts{{key, ts}};
|
std::array<Slice, 2> key_with_ts{{key, ts}};
|
||||||
|
@ -228,6 +228,10 @@ class WriteBatchInternal {
|
|||||||
static bool TimestampsUpdateNeeded(const WriteBatch& wb) {
|
static bool TimestampsUpdateNeeded(const WriteBatch& wb) {
|
||||||
return wb.needs_in_place_update_ts_;
|
return wb.needs_in_place_update_ts_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool HasKeyWithTimestamp(const WriteBatch& wb) {
|
||||||
|
return wb.has_key_with_ts_;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// LocalSavePoint is similar to a scope guard
|
// LocalSavePoint is similar to a scope guard
|
||||||
|
@ -1009,6 +1009,29 @@ TEST_F(WriteBatchTest, UpdateTimestamps) {
|
|||||||
{4, cf4.GetComparator()},
|
{4, cf4.GetComparator()},
|
||||||
{5, cf5.GetComparator()}};
|
{5, cf5.GetComparator()}};
|
||||||
|
|
||||||
|
static constexpr size_t timestamp_size = sizeof(uint64_t);
|
||||||
|
|
||||||
|
{
|
||||||
|
WriteBatch wb1, wb2, wb3, wb4, wb5, wb6, wb7;
|
||||||
|
ASSERT_OK(wb1.Put(&cf0, "key", "value"));
|
||||||
|
ASSERT_FALSE(WriteBatchInternal::HasKeyWithTimestamp(wb1));
|
||||||
|
ASSERT_OK(wb2.Put(&cf4, "key", "value"));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb2));
|
||||||
|
ASSERT_OK(wb3.Put(&cf4, "key", /*ts=*/std::string(timestamp_size, '\xfe'),
|
||||||
|
"value"));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb3));
|
||||||
|
ASSERT_OK(wb4.Delete(&cf4, "key",
|
||||||
|
/*ts=*/std::string(timestamp_size, '\xfe')));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb4));
|
||||||
|
ASSERT_OK(wb5.Delete(&cf4, "key"));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb5));
|
||||||
|
ASSERT_OK(wb6.SingleDelete(&cf4, "key"));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb6));
|
||||||
|
ASSERT_OK(wb7.SingleDelete(&cf4, "key",
|
||||||
|
/*ts=*/std::string(timestamp_size, '\xfe')));
|
||||||
|
ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb7));
|
||||||
|
}
|
||||||
|
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
// Write to the batch. We will assign timestamps later.
|
// Write to the batch. We will assign timestamps later.
|
||||||
for (const auto& key_str : key_strs) {
|
for (const auto& key_str : key_strs) {
|
||||||
@ -1017,7 +1040,6 @@ TEST_F(WriteBatchTest, UpdateTimestamps) {
|
|||||||
ASSERT_OK(batch.Put(&cf5, key_str, "value"));
|
ASSERT_OK(batch.Put(&cf5, key_str, "value"));
|
||||||
}
|
}
|
||||||
|
|
||||||
static constexpr size_t timestamp_size = sizeof(uint64_t);
|
|
||||||
const auto checker1 = [](uint32_t cf) {
|
const auto checker1 = [](uint32_t cf) {
|
||||||
if (cf == 4 || cf == 5) {
|
if (cf == 4 || cf == 5) {
|
||||||
return timestamp_size;
|
return timestamp_size;
|
||||||
|
@ -275,6 +275,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
|||||||
friend class WritePreparedTxn;
|
friend class WritePreparedTxn;
|
||||||
friend class WriteUnpreparedTxn;
|
friend class WriteUnpreparedTxn;
|
||||||
friend class WriteBatchWithIndex_SubBatchCnt_Test;
|
friend class WriteBatchWithIndex_SubBatchCnt_Test;
|
||||||
|
friend class WriteBatchWithIndexInternal;
|
||||||
// Returns the number of sub-batches inside the write batch. A sub-batch
|
// Returns the number of sub-batches inside the write batch. A sub-batch
|
||||||
// starts right before inserting a key that is a duplicate of a key in the
|
// starts right before inserting a key that is a duplicate of a key in the
|
||||||
// last sub-batch.
|
// last sub-batch.
|
||||||
|
@ -444,6 +444,10 @@ class WriteBatch : public WriteBatchBase {
|
|||||||
// timestamps to desired values.
|
// timestamps to desired values.
|
||||||
bool needs_in_place_update_ts_ = false;
|
bool needs_in_place_update_ts_ = false;
|
||||||
|
|
||||||
|
// True if the write batch contains at least one key from a column family
|
||||||
|
// that enables user-defined timestamp.
|
||||||
|
bool has_key_with_ts_ = false;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::string rep_; // See comment in write_batch.cc for the format of rep_
|
std::string rep_; // See comment in write_batch.cc for the format of rep_
|
||||||
};
|
};
|
||||||
|
@ -125,7 +125,9 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
|
|||||||
|
|
||||||
void TransactionBaseImpl::SetSavePoint() {
|
void TransactionBaseImpl::SetSavePoint() {
|
||||||
if (save_points_ == nullptr) {
|
if (save_points_ == nullptr) {
|
||||||
save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
|
save_points_.reset(
|
||||||
|
new std::stack<TransactionBaseImpl::SavePoint,
|
||||||
|
autovector<TransactionBaseImpl::SavePoint>>());
|
||||||
}
|
}
|
||||||
save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
|
save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
|
||||||
num_puts_, num_deletes_, num_merges_,
|
num_puts_, num_deletes_, num_merges_,
|
||||||
|
@ -26,7 +26,8 @@ namespace ROCKSDB_NAMESPACE {
|
|||||||
struct WriteBatchWithIndex::Rep {
|
struct WriteBatchWithIndex::Rep {
|
||||||
explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
||||||
size_t max_bytes = 0, bool _overwrite_key = false)
|
size_t max_bytes = 0, bool _overwrite_key = false)
|
||||||
: write_batch(reserved_bytes, max_bytes),
|
: write_batch(reserved_bytes, max_bytes, /*protection_bytes_per_key=*/0,
|
||||||
|
index_comparator ? index_comparator->timestamp_size() : 0),
|
||||||
comparator(index_comparator, &write_batch),
|
comparator(index_comparator, &write_batch),
|
||||||
skip_list(comparator, &arena),
|
skip_list(comparator, &arena),
|
||||||
overwrite_key(_overwrite_key),
|
overwrite_key(_overwrite_key),
|
||||||
@ -144,9 +145,11 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
|
|||||||
wb_data.size() - last_entry_offset);
|
wb_data.size() - last_entry_offset);
|
||||||
// Extract key
|
// Extract key
|
||||||
Slice key;
|
Slice key;
|
||||||
bool success __attribute__((__unused__));
|
bool success =
|
||||||
success =
|
|
||||||
ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
|
ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
|
||||||
|
#ifdef NDEBUG
|
||||||
|
(void)success;
|
||||||
|
#endif
|
||||||
assert(success);
|
assert(success);
|
||||||
|
|
||||||
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
||||||
@ -239,6 +242,7 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() {
|
|||||||
case kTypeBeginUnprepareXID:
|
case kTypeBeginUnprepareXID:
|
||||||
case kTypeEndPrepareXID:
|
case kTypeEndPrepareXID:
|
||||||
case kTypeCommitXID:
|
case kTypeCommitXID:
|
||||||
|
case kTypeCommitXIDAndTimestamp:
|
||||||
case kTypeRollbackXID:
|
case kTypeRollbackXID:
|
||||||
case kTypeNoop:
|
case kTypeNoop:
|
||||||
break;
|
break;
|
||||||
@ -491,6 +495,12 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
|||||||
Status WriteBatchWithIndex::GetFromBatchAndDB(
|
Status WriteBatchWithIndex::GetFromBatchAndDB(
|
||||||
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
|
const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
|
||||||
|
const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
|
||||||
|
size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
|
||||||
|
if (ts_sz > 0 && !read_options.timestamp) {
|
||||||
|
return Status::InvalidArgument("Must specify timestamp");
|
||||||
|
}
|
||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
WriteBatchWithIndexInternal wbwii(db, column_family);
|
WriteBatchWithIndexInternal wbwii(db, column_family);
|
||||||
|
|
||||||
@ -555,6 +565,15 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
|
|||||||
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
const size_t num_keys, const Slice* keys, PinnableSlice* values,
|
const size_t num_keys, const Slice* keys, PinnableSlice* values,
|
||||||
Status* statuses, bool sorted_input, ReadCallback* callback) {
|
Status* statuses, bool sorted_input, ReadCallback* callback) {
|
||||||
|
const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
|
||||||
|
size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
|
||||||
|
if (ts_sz > 0 && !read_options.timestamp) {
|
||||||
|
for (size_t i = 0; i < num_keys; ++i) {
|
||||||
|
statuses[i] = Status::InvalidArgument("Must specify timestamp");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
WriteBatchWithIndexInternal wbwii(db, column_family);
|
WriteBatchWithIndexInternal wbwii(db, column_family);
|
||||||
|
|
||||||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||||
@ -657,5 +676,11 @@ size_t WriteBatchWithIndex::GetDataSize() const {
|
|||||||
return rep->write_batch.GetDataSize();
|
return rep->write_batch.GetDataSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const Comparator* WriteBatchWithIndexInternal::GetUserComparator(
|
||||||
|
const WriteBatchWithIndex& wbwi, uint32_t cf_id) {
|
||||||
|
const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator;
|
||||||
|
return ucmps.GetComparator(cf_id);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
@ -33,6 +33,7 @@ BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
|
|||||||
comparator_(comparator),
|
comparator_(comparator),
|
||||||
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
|
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
|
||||||
: nullptr) {
|
: nullptr) {
|
||||||
|
assert(comparator_);
|
||||||
wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
|
wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,8 +96,9 @@ void BaseDeltaIterator::Next() {
|
|||||||
AdvanceBase();
|
AdvanceBase();
|
||||||
}
|
}
|
||||||
if (DeltaValid() && BaseValid()) {
|
if (DeltaValid() && BaseValid()) {
|
||||||
if (comparator_->Equal(delta_iterator_->Entry().key,
|
if (0 == comparator_->CompareWithoutTimestamp(
|
||||||
base_iterator_->key())) {
|
delta_iterator_->Entry().key, /*a_has_ts=*/false,
|
||||||
|
base_iterator_->key(), /*b_has_ts=*/false)) {
|
||||||
equal_keys_ = true;
|
equal_keys_ = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,8 +133,9 @@ void BaseDeltaIterator::Prev() {
|
|||||||
AdvanceBase();
|
AdvanceBase();
|
||||||
}
|
}
|
||||||
if (DeltaValid() && BaseValid()) {
|
if (DeltaValid() && BaseValid()) {
|
||||||
if (comparator_->Equal(delta_iterator_->Entry().key,
|
if (0 == comparator_->CompareWithoutTimestamp(
|
||||||
base_iterator_->key())) {
|
delta_iterator_->Entry().key, /*a_has_ts=*/false,
|
||||||
|
base_iterator_->key(), /*b_has_ts=*/false)) {
|
||||||
equal_keys_ = true;
|
equal_keys_ = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,8 +221,9 @@ void BaseDeltaIterator::AssertInvariants() {
|
|||||||
// we don't support those yet
|
// we don't support those yet
|
||||||
assert(delta_iterator_->Entry().type != kMergeRecord &&
|
assert(delta_iterator_->Entry().type != kMergeRecord &&
|
||||||
delta_iterator_->Entry().type != kLogDataRecord);
|
delta_iterator_->Entry().type != kLogDataRecord);
|
||||||
int compare =
|
int compare = comparator_->CompareWithoutTimestamp(
|
||||||
comparator_->Compare(delta_iterator_->Entry().key, base_iterator_->key());
|
delta_iterator_->Entry().key, /*a_has_ts=*/false, base_iterator_->key(),
|
||||||
|
/*b_has_ts=*/false);
|
||||||
if (forward_) {
|
if (forward_) {
|
||||||
// current_at_base -> compare < 0
|
// current_at_base -> compare < 0
|
||||||
assert(!current_at_base_ || compare < 0);
|
assert(!current_at_base_ || compare < 0);
|
||||||
@ -301,7 +305,9 @@ void BaseDeltaIterator::UpdateCurrent() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (iterate_upper_bound_) {
|
if (iterate_upper_bound_) {
|
||||||
if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >= 0) {
|
if (comparator_->CompareWithoutTimestamp(
|
||||||
|
delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_,
|
||||||
|
/*b_has_ts=*/false) >= 0) {
|
||||||
// out of upper bound -> finished.
|
// out of upper bound -> finished.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -319,8 +325,9 @@ void BaseDeltaIterator::UpdateCurrent() {
|
|||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
int compare =
|
int compare =
|
||||||
(forward_ ? 1 : -1) *
|
(forward_ ? 1 : -1) * comparator_->CompareWithoutTimestamp(
|
||||||
comparator_->Compare(delta_entry.key, base_iterator_->key());
|
delta_entry.key, /*a_has_ts=*/false,
|
||||||
|
base_iterator_->key(), /*b_has_ts=*/false);
|
||||||
if (compare <= 0) { // delta bigger or equal
|
if (compare <= 0) { // delta bigger or equal
|
||||||
if (compare == 0) {
|
if (compare == 0) {
|
||||||
equal_keys_ = true;
|
equal_keys_ = true;
|
||||||
@ -572,12 +579,28 @@ int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
|
|||||||
const Slice& key2) const {
|
const Slice& key2) const {
|
||||||
if (column_family < cf_comparators_.size() &&
|
if (column_family < cf_comparators_.size() &&
|
||||||
cf_comparators_[column_family] != nullptr) {
|
cf_comparators_[column_family] != nullptr) {
|
||||||
return cf_comparators_[column_family]->Compare(key1, key2);
|
return cf_comparators_[column_family]->CompareWithoutTimestamp(
|
||||||
|
key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
|
||||||
} else {
|
} else {
|
||||||
return default_comparator_->Compare(key1, key2);
|
return default_comparator_->CompareWithoutTimestamp(
|
||||||
|
key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const Comparator* WriteBatchEntryComparator::GetComparator(
|
||||||
|
const ColumnFamilyHandle* column_family) const {
|
||||||
|
return column_family ? column_family->GetComparator() : default_comparator_;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Comparator* WriteBatchEntryComparator::GetComparator(
|
||||||
|
uint32_t column_family) const {
|
||||||
|
if (column_family < cf_comparators_.size() &&
|
||||||
|
cf_comparators_[column_family]) {
|
||||||
|
return cf_comparators_[column_family];
|
||||||
|
}
|
||||||
|
return default_comparator_;
|
||||||
|
}
|
||||||
|
|
||||||
WriteEntry WBWIIteratorImpl::Entry() const {
|
WriteEntry WBWIIteratorImpl::Entry() const {
|
||||||
WriteEntry ret;
|
WriteEntry ret;
|
||||||
Slice blob, xid;
|
Slice blob, xid;
|
||||||
@ -591,6 +614,12 @@ WriteEntry WBWIIteratorImpl::Entry() const {
|
|||||||
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
|
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
|
||||||
ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
|
ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
|
||||||
ret.type == kMergeRecord);
|
ret.type == kMergeRecord);
|
||||||
|
// Make sure entry.key does not include user-defined timestamp.
|
||||||
|
const Comparator* const ucmp = comparator_->GetComparator(column_family_id_);
|
||||||
|
size_t ts_sz = ucmp->timestamp_size();
|
||||||
|
if (ts_sz > 0) {
|
||||||
|
ret.key = StripTimestampFromUserKey(ret.key, ts_sz);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,8 +137,11 @@ struct WriteBatchIndexEntry {
|
|||||||
|
|
||||||
class ReadableWriteBatch : public WriteBatch {
|
class ReadableWriteBatch : public WriteBatch {
|
||||||
public:
|
public:
|
||||||
explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0)
|
explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0,
|
||||||
: WriteBatch(reserved_bytes, max_bytes) {}
|
size_t protection_bytes_per_key = 0,
|
||||||
|
size_t default_cf_ts_sz = 0)
|
||||||
|
: WriteBatch(reserved_bytes, max_bytes, protection_bytes_per_key,
|
||||||
|
default_cf_ts_sz) {}
|
||||||
// Retrieve some information from a write entry in the write batch, given
|
// Retrieve some information from a write entry in the write batch, given
|
||||||
// the start offset of the write entry.
|
// the start offset of the write entry.
|
||||||
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
|
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
|
||||||
@ -168,10 +171,15 @@ class WriteBatchEntryComparator {
|
|||||||
|
|
||||||
const Comparator* default_comparator() { return default_comparator_; }
|
const Comparator* default_comparator() { return default_comparator_; }
|
||||||
|
|
||||||
|
const Comparator* GetComparator(
|
||||||
|
const ColumnFamilyHandle* column_family) const;
|
||||||
|
|
||||||
|
const Comparator* GetComparator(uint32_t column_family) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const Comparator* default_comparator_;
|
const Comparator* const default_comparator_;
|
||||||
std::vector<const Comparator*> cf_comparators_;
|
std::vector<const Comparator*> cf_comparators_;
|
||||||
const ReadableWriteBatch* write_batch_;
|
const ReadableWriteBatch* const write_batch_;
|
||||||
};
|
};
|
||||||
|
|
||||||
using WriteBatchEntrySkipList =
|
using WriteBatchEntrySkipList =
|
||||||
@ -179,7 +187,13 @@ using WriteBatchEntrySkipList =
|
|||||||
|
|
||||||
class WBWIIteratorImpl : public WBWIIterator {
|
class WBWIIteratorImpl : public WBWIIterator {
|
||||||
public:
|
public:
|
||||||
enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };
|
enum Result : uint8_t {
|
||||||
|
kFound,
|
||||||
|
kDeleted,
|
||||||
|
kNotFound,
|
||||||
|
kMergeInProgress,
|
||||||
|
kError
|
||||||
|
};
|
||||||
WBWIIteratorImpl(uint32_t column_family_id,
|
WBWIIteratorImpl(uint32_t column_family_id,
|
||||||
WriteBatchEntrySkipList* skip_list,
|
WriteBatchEntrySkipList* skip_list,
|
||||||
const ReadableWriteBatch* write_batch,
|
const ReadableWriteBatch* write_batch,
|
||||||
@ -251,9 +265,9 @@ class WBWIIteratorImpl : public WBWIIterator {
|
|||||||
|
|
||||||
bool MatchesKey(uint32_t cf_id, const Slice& key);
|
bool MatchesKey(uint32_t cf_id, const Slice& key);
|
||||||
|
|
||||||
// Moves the to first entry of the previous key.
|
// Moves the iterator to first entry of the previous key.
|
||||||
void PrevKey();
|
void PrevKey();
|
||||||
// Moves the to first entry of the next key.
|
// Moves the iterator to first entry of the next key.
|
||||||
void NextKey();
|
void NextKey();
|
||||||
|
|
||||||
// Moves the iterator to the Update (Put or Delete) for the current key
|
// Moves the iterator to the Update (Put or Delete) for the current key
|
||||||
@ -280,6 +294,9 @@ class WBWIIteratorImpl : public WBWIIterator {
|
|||||||
|
|
||||||
class WriteBatchWithIndexInternal {
|
class WriteBatchWithIndexInternal {
|
||||||
public:
|
public:
|
||||||
|
static const Comparator* GetUserComparator(const WriteBatchWithIndex& wbwi,
|
||||||
|
uint32_t cf_id);
|
||||||
|
|
||||||
// For GetFromBatchAndDB or similar
|
// For GetFromBatchAndDB or similar
|
||||||
explicit WriteBatchWithIndexInternal(DB* db,
|
explicit WriteBatchWithIndexInternal(DB* db,
|
||||||
ColumnFamilyHandle* column_family);
|
ColumnFamilyHandle* column_family);
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include "db/column_family.h"
|
#include "db/column_family.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
#include "test_util/testharness.h"
|
#include "test_util/testharness.h"
|
||||||
|
#include "test_util/testutil.h"
|
||||||
#include "util/random.h"
|
#include "util/random.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
#include "utilities/merge_operators.h"
|
#include "utilities/merge_operators.h"
|
||||||
@ -116,7 +117,8 @@ class KVIter : public Iterator {
|
|||||||
};
|
};
|
||||||
|
|
||||||
static std::string PrintContents(WriteBatchWithIndex* batch,
|
static std::string PrintContents(WriteBatchWithIndex* batch,
|
||||||
ColumnFamilyHandle* column_family) {
|
ColumnFamilyHandle* column_family,
|
||||||
|
bool hex = false) {
|
||||||
std::string result;
|
std::string result;
|
||||||
|
|
||||||
WBWIIterator* iter;
|
WBWIIterator* iter;
|
||||||
@ -132,22 +134,22 @@ static std::string PrintContents(WriteBatchWithIndex* batch,
|
|||||||
|
|
||||||
if (e.type == kPutRecord) {
|
if (e.type == kPutRecord) {
|
||||||
result.append("PUT(");
|
result.append("PUT(");
|
||||||
result.append(e.key.ToString());
|
result.append(e.key.ToString(hex));
|
||||||
result.append("):");
|
result.append("):");
|
||||||
result.append(e.value.ToString());
|
result.append(e.value.ToString(hex));
|
||||||
} else if (e.type == kMergeRecord) {
|
} else if (e.type == kMergeRecord) {
|
||||||
result.append("MERGE(");
|
result.append("MERGE(");
|
||||||
result.append(e.key.ToString());
|
result.append(e.key.ToString(hex));
|
||||||
result.append("):");
|
result.append("):");
|
||||||
result.append(e.value.ToString());
|
result.append(e.value.ToString(hex));
|
||||||
} else if (e.type == kSingleDeleteRecord) {
|
} else if (e.type == kSingleDeleteRecord) {
|
||||||
result.append("SINGLE-DEL(");
|
result.append("SINGLE-DEL(");
|
||||||
result.append(e.key.ToString());
|
result.append(e.key.ToString(hex));
|
||||||
result.append(")");
|
result.append(")");
|
||||||
} else {
|
} else {
|
||||||
assert(e.type == kDeleteRecord);
|
assert(e.type == kDeleteRecord);
|
||||||
result.append("DEL(");
|
result.append("DEL(");
|
||||||
result.append(e.key.ToString());
|
result.append(e.key.ToString(hex));
|
||||||
result.append(")");
|
result.append(")");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2254,6 +2256,137 @@ TEST_F(WBWIOverwriteTest, TestBadMergeOperator) {
|
|||||||
ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value));
|
ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(WriteBatchWithIndexTest, ColumnFamilyWithTimestamp) {
|
||||||
|
ColumnFamilyHandleImplDummy cf2(2,
|
||||||
|
test::BytewiseComparatorWithU64TsWrapper());
|
||||||
|
|
||||||
|
// Sanity checks
|
||||||
|
ASSERT_TRUE(batch_->Put(&cf2, "key", "ts", "value").IsNotSupported());
|
||||||
|
ASSERT_TRUE(batch_->Put(/*column_family=*/nullptr, "key", "ts", "value")
|
||||||
|
.IsInvalidArgument());
|
||||||
|
ASSERT_TRUE(batch_->Delete(&cf2, "key", "ts").IsNotSupported());
|
||||||
|
ASSERT_TRUE(batch_->Delete(/*column_family=*/nullptr, "key", "ts")
|
||||||
|
.IsInvalidArgument());
|
||||||
|
ASSERT_TRUE(batch_->SingleDelete(&cf2, "key", "ts").IsNotSupported());
|
||||||
|
ASSERT_TRUE(batch_->SingleDelete(/*column_family=*/nullptr, "key", "ts")
|
||||||
|
.IsInvalidArgument());
|
||||||
|
{
|
||||||
|
std::string value;
|
||||||
|
ASSERT_TRUE(batch_
|
||||||
|
->GetFromBatchAndDB(
|
||||||
|
/*db=*/nullptr, ReadOptions(), &cf2, "key", &value)
|
||||||
|
.IsInvalidArgument());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
constexpr size_t num_keys = 2;
|
||||||
|
std::array<Slice, num_keys> keys{{Slice(), Slice()}};
|
||||||
|
std::array<PinnableSlice, num_keys> pinnable_vals{
|
||||||
|
{PinnableSlice(), PinnableSlice()}};
|
||||||
|
std::array<Status, num_keys> statuses{{Status(), Status()}};
|
||||||
|
constexpr bool sorted_input = false;
|
||||||
|
batch_->MultiGetFromBatchAndDB(/*db=*/nullptr, ReadOptions(), &cf2,
|
||||||
|
num_keys, keys.data(), pinnable_vals.data(),
|
||||||
|
statuses.data(), sorted_input);
|
||||||
|
for (const auto& s : statuses) {
|
||||||
|
ASSERT_TRUE(s.IsInvalidArgument());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
constexpr uint32_t kMaxKey = 10;
|
||||||
|
|
||||||
|
const auto ts_sz_lookup = [&cf2](uint32_t id) {
|
||||||
|
if (cf2.GetID() == id) {
|
||||||
|
return sizeof(uint64_t);
|
||||||
|
} else {
|
||||||
|
return std::numeric_limits<size_t>::max();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Put keys
|
||||||
|
for (uint32_t i = 0; i < kMaxKey; ++i) {
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, i);
|
||||||
|
Status s = batch_->Put(&cf2, key, "value" + std::to_string(i));
|
||||||
|
ASSERT_OK(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
WriteBatch* wb = batch_->GetWriteBatch();
|
||||||
|
assert(wb);
|
||||||
|
ASSERT_OK(
|
||||||
|
wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\0'), ts_sz_lookup));
|
||||||
|
|
||||||
|
// Point lookup
|
||||||
|
for (uint32_t i = 0; i < kMaxKey; ++i) {
|
||||||
|
std::string value;
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, i);
|
||||||
|
Status s = batch_->GetFromBatch(&cf2, Options(), key, &value);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_EQ("value" + std::to_string(i), value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterator
|
||||||
|
{
|
||||||
|
std::unique_ptr<WBWIIterator> it(batch_->NewIterator(&cf2));
|
||||||
|
uint32_t start = 0;
|
||||||
|
for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) {
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, start);
|
||||||
|
ASSERT_OK(it->status());
|
||||||
|
ASSERT_EQ(key, it->Entry().key);
|
||||||
|
ASSERT_EQ("value" + std::to_string(start), it->Entry().value);
|
||||||
|
ASSERT_EQ(WriteType::kPutRecord, it->Entry().type);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(kMaxKey, start);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the keys with Delete() or SingleDelete()
|
||||||
|
for (uint32_t i = 0; i < kMaxKey; ++i) {
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, i);
|
||||||
|
Status s;
|
||||||
|
if (0 == (i % 2)) {
|
||||||
|
s = batch_->Delete(&cf2, key);
|
||||||
|
} else {
|
||||||
|
s = batch_->SingleDelete(&cf2, key);
|
||||||
|
}
|
||||||
|
ASSERT_OK(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\xfe'),
|
||||||
|
ts_sz_lookup));
|
||||||
|
|
||||||
|
for (uint32_t i = 0; i < kMaxKey; ++i) {
|
||||||
|
std::string value;
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, i);
|
||||||
|
Status s = batch_->GetFromBatch(&cf2, Options(), key, &value);
|
||||||
|
ASSERT_TRUE(s.IsNotFound());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterator
|
||||||
|
{
|
||||||
|
const bool overwrite = GetParam();
|
||||||
|
std::unique_ptr<WBWIIterator> it(batch_->NewIterator(&cf2));
|
||||||
|
uint32_t start = 0;
|
||||||
|
for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) {
|
||||||
|
std::string key;
|
||||||
|
PutFixed32(&key, start);
|
||||||
|
ASSERT_EQ(key, it->Entry().key);
|
||||||
|
if (!overwrite) {
|
||||||
|
ASSERT_EQ(WriteType::kPutRecord, it->Entry().type);
|
||||||
|
it->Next();
|
||||||
|
ASSERT_TRUE(it->Valid());
|
||||||
|
}
|
||||||
|
if (0 == (start % 2)) {
|
||||||
|
ASSERT_EQ(WriteType::kDeleteRecord, it->Entry().type);
|
||||||
|
} else {
|
||||||
|
ASSERT_EQ(WriteType::kSingleDeleteRecord, it->Entry().type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
|
INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user