From 20357988345b02efcef303bc274089111507e160 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 15 Nov 2021 12:50:42 -0800 Subject: [PATCH] Update TransactionUtil::CheckKeyForConflict to also use timestamps (#9162) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9162 Existing TransactionUtil::CheckKeyForConflict() performs only seq-based conflict checking. If user-defined timestamp is enabled, it should perform conflict checking based on timestamps too. Update TransactionUtil::CheckKey-related methods to verify the timestamp of the latest version of a key is smaller than the read timestamp. Note that CheckKeysForConflict() is not updated since it's used only by optimistic transaction, and we do not plan to update it in this upcoming batch of diffs. Existing GetLatestSequenceForKey() returns the sequence of the latest version of a specific user key. Since we support user-defined timestamp, we need to update this method to also return the timestamp (if enabled) of the latest version of the key. This will be needed for snapshot validation. Reviewed By: ltamasi Differential Revision: D31567960 fbshipit-source-id: 2e4a14aed267435a9aa91bc632d2411c01946d44 --- HISTORY.md | 1 + db/column_family.cc | 3 +- db/db_impl/db_impl.cc | 59 ++++++++++++----- db/db_impl/db_impl.h | 12 +++- db/db_test2.cc | 63 +++++++++++++++++++ .../transactions/pessimistic_transaction.cc | 4 +- utilities/transactions/transaction_util.cc | 38 ++++++++--- utilities/transactions/transaction_util.h | 17 +++-- utilities/transactions/write_prepared_txn.cc | 7 ++- .../transactions/write_unprepared_txn.cc | 8 ++- 10 files changed, 168 insertions(+), 44 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 709cb9701..dc3f1b2b4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,7 @@ ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. 
+* `TransactionUtil::CheckKeyForConflicts` can also perform conflict-checking based on user-defined timestamps in addition to sequence numbers. ### Public Interface Change * When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly. diff --git a/db/column_family.cc b/db/column_family.cc index 2aa1fbd6d..e749cb9ee 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -211,7 +211,8 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, size_t clamp_max = std::conditional< sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>, std::integral_constant<uint64_t, 64ull << 30>>::type::value; - ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max); + ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10, + clamp_max); // if user sets arena_block_size, we trust user to use this value. 
Otherwise, // calculate a proper value from writer_buffer_size; if (result.arena_block_size <= 0) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8e3e0decc..4ed47d224 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4376,25 +4376,38 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv, return earliest_seq; } -Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, - bool cache_only, - SequenceNumber lower_bound_seq, - SequenceNumber* seq, - bool* found_record_for_key, - bool* is_blob_index) { +Status DBImpl::GetLatestSequenceForKey( + SuperVersion* sv, const Slice& key, bool cache_only, + SequenceNumber lower_bound_seq, SequenceNumber* seq, std::string* timestamp, + bool* found_record_for_key, bool* is_blob_index) { Status s; MergeContext merge_context; SequenceNumber max_covering_tombstone_seq = 0; ReadOptions read_options; SequenceNumber current_seq = versions_->LastSequence(); - LookupKey lkey(key, current_seq); + + ColumnFamilyData* cfd = sv->cfd; + assert(cfd); + const Comparator* const ucmp = cfd->user_comparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + std::string ts_buf; + if (ts_sz > 0) { + assert(timestamp); + ts_buf.assign(ts_sz, '\xff'); + } else { + assert(!timestamp); + } + Slice ts(ts_buf); + + LookupKey lkey(key, current_seq, ts_sz == 0 ? 
nullptr : &ts); *seq = kMaxSequenceNumber; *found_record_for_key = false; // Check if there is a record for this key in the latest memtable - sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context, + sv->mem->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context, &max_covering_tombstone_seq, seq, read_options, nullptr /*read_callback*/, is_blob_index); @@ -4406,6 +4419,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, return s; } + assert(!ts_sz || + (*seq != kMaxSequenceNumber && + *timestamp != std::string(ts_sz, '\xff')) || + (*seq == kMaxSequenceNumber && timestamp->empty())); if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check immutable memtables @@ -4421,7 +4438,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context, + sv->imm->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context, &max_covering_tombstone_seq, seq, read_options, nullptr /*read_callback*/, is_blob_index); @@ -4434,6 +4451,11 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, return s; } + assert(!ts_sz || + (*seq != kMaxSequenceNumber && + *timestamp != std::string(ts_sz, '\xff')) || + (*seq == kMaxSequenceNumber && timestamp->empty())); + if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check memtable history *found_record_for_key = true; @@ -4448,9 +4470,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context, - &max_covering_tombstone_seq, seq, read_options, - is_blob_index); + sv->imm->GetFromHistory(lkey, /*value=*/nullptr, timestamp, &s, + &merge_context, &max_covering_tombstone_seq, seq, + read_options, is_blob_index); if (!(s.ok() || s.IsNotFound() || 
s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -4462,8 +4484,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, return s; } + assert(!ts_sz || + (*seq != kMaxSequenceNumber && + *timestamp != std::string(ts_sz, '\xff')) || + (*seq == kMaxSequenceNumber && timestamp->empty())); if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check SST files + assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff')); *found_record_for_key = true; return Status::OK(); } @@ -4476,10 +4503,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // SST files if cache_only=true? if (!cache_only) { // Check tables - sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context, - &max_covering_tombstone_seq, nullptr /* value_found */, - found_record_for_key, seq, nullptr /*read_callback*/, - is_blob_index); + sv->current->Get(read_options, lkey, /*value=*/nullptr, timestamp, &s, + &merge_context, &max_covering_tombstone_seq, + nullptr /* value_found */, found_record_for_key, seq, + nullptr /*read_callback*/, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading SST files diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 5dc8fa8f8..f06a0c7c2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -582,9 +582,15 @@ class DBImpl : public DB { // in the memtables, including memtable history. If cache_only is false, // SST files will also be checked. // + // `key` should NOT have user-defined timestamp appended to user key even if + // timestamp is enabled. + // // If a key is found, *found_record_for_key will be set to true and // *seq will be set to the stored sequence number for the latest - // operation on this key or kMaxSequenceNumber if unknown. + // operation on this key or kMaxSequenceNumber if unknown. 
If user-defined + // timestamp is enabled for this column family and timestamp is not nullptr, + // then *timestamp will be set to the stored timestamp for the latest + // operation on this key. // If no key is found, *found_record_for_key will be set to false. // // Note: If cache_only=false, it is possible for *seq to be set to 0 if @@ -608,9 +614,9 @@ class DBImpl : public DB { Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool cache_only, SequenceNumber lower_bound_seq, - SequenceNumber* seq, + SequenceNumber* seq, std::string* timestamp, bool* found_record_for_key, - bool* is_blob_index = nullptr); + bool* is_blob_index); Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key, const Slice& lower_bound, const Slice upper_bound); diff --git a/db/db_test2.cc b/db/db_test2.cc index d599694dc..5904865f6 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -6763,6 +6763,69 @@ TEST_F(DBTest2, RenameDirectory) { Destroy(options); dbname_ = old_dbname; } + +#ifndef ROCKSDB_LITE +TEST_F(DBTest2, GetLatestSeqAndTsForKey) { + Destroy(last_options_); + + Options options = CurrentOptions(); + options.max_write_buffer_size_to_maintain = 64 << 10; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.comparator = test::ComparatorWithU64Ts(); + options.statistics = CreateDBStatistics(); + + Reopen(options); + + constexpr uint64_t kTsU64Value = 12; + + for (uint64_t key = 0; key < 100; ++key) { + std::string ts_str; + PutFixed64(&ts_str, kTsU64Value); + Slice ts = ts_str; + WriteOptions write_opts; + write_opts.timestamp = &ts; + + std::string key_str; + PutFixed64(&key_str, key); + std::reverse(key_str.begin(), key_str.end()); + ASSERT_OK(Put(key_str, "value", write_opts)); + } + + ASSERT_OK(Flush()); + + constexpr bool cache_only = true; + constexpr SequenceNumber lower_bound_seq = 0; + auto* cfhi = static_cast_with_check( + dbfull()->DefaultColumnFamily()); + assert(cfhi); + assert(cfhi->cfd()); + SuperVersion* 
sv = cfhi->cfd()->GetSuperVersion(); + for (uint64_t key = 0; key < 100; ++key) { + std::string key_str; + PutFixed64(&key_str, key); + std::reverse(key_str.begin(), key_str.end()); + std::string ts; + SequenceNumber seq = kMaxSequenceNumber; + bool found_record_for_key = false; + bool is_blob_index = false; + + const Status s = dbfull()->GetLatestSequenceForKey( + sv, key_str, cache_only, lower_bound_seq, &seq, &ts, + &found_record_for_key, &is_blob_index); + ASSERT_OK(s); + std::string expected_ts; + PutFixed64(&expected_ts, kTsU64Value); + ASSERT_EQ(expected_ts, ts); + ASSERT_TRUE(found_record_for_key); + ASSERT_FALSE(is_blob_index); + } + + // Verify that no read to SST files. + ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0)); +} +#endif // ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index e4dc2ad7a..f0f630a48 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -174,7 +174,6 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { } Status PessimisticTransaction::Prepare() { - if (name_.empty()) { return Status::InvalidArgument( "Cannot prepare a transaction that has not been named."); @@ -712,8 +711,9 @@ Status PessimisticTransaction::ValidateSnapshot( ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); + // TODO (yanqin): support conflict checking based on timestamp. 
return TransactionUtil::CheckKeyForConflicts( - db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */); + db_impl_, cfh, key.ToString(), snap_seq, nullptr, false /* cache_only */); } bool PessimisticTransaction::TryStealingLocks() { diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 494f132e7..f1d72ec07 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -21,8 +21,8 @@ namespace ROCKSDB_NAMESPACE { Status TransactionUtil::CheckKeyForConflicts( DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker, - SequenceNumber min_uncommitted) { + SequenceNumber snap_seq, const std::string* const read_ts, bool cache_only, + ReadCallback* snap_checker, SequenceNumber min_uncommitted) { Status result; auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); @@ -38,8 +38,8 @@ Status TransactionUtil::CheckKeyForConflicts( SequenceNumber earliest_seq = db_impl->GetEarliestMemTableSequenceNumber(sv, true); - result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only, - snap_checker, min_uncommitted); + result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, read_ts, + cache_only, snap_checker, min_uncommitted); db_impl->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -50,8 +50,9 @@ Status TransactionUtil::CheckKeyForConflicts( Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber snap_seq, - const std::string& key, bool cache_only, - ReadCallback* snap_checker, + const std::string& key, + const std::string* const read_ts, + bool cache_only, ReadCallback* snap_checker, SequenceNumber min_uncommitted) { // When `min_uncommitted` is provided, keys are not always committed // in sequence number order, and `snap_checker` is used to check whether @@ -105,6 +106,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, 
SuperVersion* sv, if (result.ok()) { SequenceNumber seq = kMaxSequenceNumber; + std::string timestamp; bool found_record_for_key = false; // When min_uncommitted == kMaxSequenceNumber, writes are committed in @@ -117,9 +119,10 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, // keys lower than min_uncommitted can be skipped. SequenceNumber lower_bound_seq = (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted; - Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst, - lower_bound_seq, &seq, - &found_record_for_key); + Status s = db_impl->GetLatestSequenceForKey( + sv, key, !need_to_read_sst, lower_bound_seq, &seq, + !read_ts ? nullptr : &timestamp, &found_record_for_key, + /*is_blob_index=*/nullptr); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { result = s; @@ -127,6 +130,17 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, bool write_conflict = snap_checker == nullptr ? snap_seq < seq : !snap_checker->IsVisible(seq); + // Perform conflict checking based on timestamp if applicable. + if (!write_conflict && read_ts != nullptr) { + ColumnFamilyData* cfd = sv->cfd; + assert(cfd); + const Comparator* const ucmp = cfd->user_comparator(); + assert(ucmp); + assert(read_ts->size() == ucmp->timestamp_size()); + assert(read_ts->size() == timestamp.size()); + // Write conflict if *ts < timestamp. + write_conflict = ucmp->CompareTimestamp(*read_ts, timestamp) < 0; + } if (write_conflict) { result = Status::Busy(); } @@ -167,7 +181,11 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, PointLockStatus status = tracker.GetPointLockStatus(cf, key); const SequenceNumber key_seq = status.seq; - result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only); + // TODO: support timestamp-based conflict checking. + // CheckKeysForConflicts() is currently used only by optimistic + // transactions. 
+ result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, + /*read_ts=*/nullptr, cache_only); if (!result.ok()) { break; } diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index 707d487c5..a349ba87a 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -27,20 +27,22 @@ class WriteBatchWithIndex; class TransactionUtil { public: // Verifies there have been no commits to this key in the db since this - // sequence number. + // sequence number. If user-defined timestamp is enabled, then also check + // no commits to this key in the db since the given ts. // // If cache_only is true, then this function will not attempt to read any // SST files. This will make it more likely this function will // return an error if it is unable to determine if there are any conflicts. // - // See comment of CheckKey() for explanation of `snap_seq`, `snap_checker` - // and `min_uncommitted`. + // See comment of CheckKey() for explanation of `snap_seq`, `ts`, + // `snap_checker` and `min_uncommitted`. // // Returns OK on success, BUSY if there is a conflicting write, or other error // status for any unexpected errors. static Status CheckKeyForConflicts( DBImpl* db_impl, ColumnFamilyHandle* column_family, - const std::string& key, SequenceNumber snap_seq, bool cache_only, + const std::string& key, SequenceNumber snap_seq, + const std::string* const ts, bool cache_only, ReadCallback* snap_checker = nullptr, SequenceNumber min_uncommitted = kMaxSequenceNumber); @@ -68,10 +70,13 @@ class TransactionUtil { // seq < `min_uncommitted`: no conflict // seq > `snap_seq`: applicable to conflict // `min_uncommitted` <= seq <= `snap_seq`: call `snap_checker` to determine. + // + // If user-defined timestamp is enabled, a write conflict is detected if an + // operation for `key` with timestamp greater than `ts` exists. 
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber snap_seq, - const std::string& key, bool cache_only, - ReadCallback* snap_checker = nullptr, + const std::string& key, const std::string* const ts, + bool cache_only, ReadCallback* snap_checker = nullptr, SequenceNumber min_uncommitted = kMaxSequenceNumber); }; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 4c6b82616..25c7c0f40 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -453,9 +453,10 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted, kBackedByDBSnapshot); - return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), - snap_seq, false /* cache_only */, - &snap_checker, min_uncommitted); + // TODO(yanqin): support user-defined timestamp + return TransactionUtil::CheckKeyForConflicts( + db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr, + false /* cache_only */, &snap_checker, min_uncommitted); } void WritePreparedTxn::SetSnapshot() { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 990e63a1a..b1a33741e 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include "utilities/transactions/write_unprepared_txn.h" + #include "db/db_impl/db_impl.h" #include "util/cast_util.h" #include "utilities/transactions/write_unprepared_txn_db.h" @@ -1025,9 +1026,10 @@ Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, WriteUnpreparedTxnReadCallback snap_checker( wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot); - return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), - snap_seq, false /* 
cache_only */, - &snap_checker, min_uncommitted); + // TODO(yanqin): Support user-defined timestamp. + return TransactionUtil::CheckKeyForConflicts( + db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr, + false /* cache_only */, &snap_checker, min_uncommitted); } const std::map&