diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index af9aea011..8132d5a0b 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1499,7 +1499,22 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
                    ? versions_->LastSequence()
                    : versions_->LastPublishedSequence();
     if (callback) {
+      // The unprep_seqs are not published for write unprepared, so it could be
+      // that max_visible_seq is larger. Seek to the std::max of the two.
+      // However, we still want our callback to contain the actual snapshot so
+      // that it can do the correct visibility filtering.
      callback->Refresh(snapshot);
+
+      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
+      // max_visible_seq = max(max_visible_seq, snapshot)
+      //
+      // Currently, the commented out assert is broken by
+      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
+      // the regular transaction flow, then this special read callback would not
+      // be needed.
+      //
+      // assert(callback->max_visible_seq() >= snapshot);
+      snapshot = callback->max_visible_seq();
     }
   }
   TEST_SYNC_POINT("DBImpl::GetImpl:3");
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index c0d320013..95a1b31c7 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -172,7 +172,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       w.status = WriteBatchInternal::InsertInto(
           &w, w.sequence, &column_family_memtables, &flush_scheduler_,
           write_options.ignore_missing_column_families, 0 /*log_number*/, this,
-          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
+          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
+          batch_per_txn_);
       PERF_TIMER_START(write_pre_and_post_process_time);
     }
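For reference, the snapshot widening that GetImpl performs above reduces to a small standalone sketch (simplified names and types, not part of the patch): the callback keeps the true snapshot for visibility filtering, while the lookup seeks with the wider max_visible_seq so the transaction's own unprepared writes can be found.

#include <algorithm>
#include <cstdint>
#include <cstdio>

using SequenceNumber = uint64_t;

// Simplified stand-in for WriteUnpreparedTxnReadCallback: constructed with
// the transaction's max unprepared seq, then Refresh()ed with the snapshot.
struct CallbackModel {
  SequenceNumber max_visible_seq;  // starts as the max unprepared seq
  SequenceNumber wup_snapshot = 0;

  void Refresh(SequenceNumber seq) {
    max_visible_seq = std::max(max_visible_seq, seq);  // widen the seek bound
    wup_snapshot = seq;  // keep the true snapshot for visibility filtering
  }
};

int main() {
  CallbackModel cb{/*max unprepared seq=*/15};
  SequenceNumber snapshot = 10;   // published sequence from versions_
  cb.Refresh(snapshot);
  snapshot = cb.max_visible_seq;  // what GetImpl now seeks with
  std::printf("seek bound %llu, filtering snapshot %llu\n",
              (unsigned long long)snapshot,
              (unsigned long long)cb.wup_snapshot);  // prints 15 and 10
  return 0;
}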
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 633724c57..060138fd6 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -263,12 +263,6 @@ class DBIter final: public Iterator {
   bool TooManyInternalKeysSkipped(bool increment = true);
   inline bool IsVisible(SequenceNumber sequence);
 
-  // CanReseekToSkip() returns whether the iterator can use the optimization
-  // where it reseek by sequence number to get the next key when there are too
-  // many versions. This is disabled for write unprepared because seeking to
-  // sequence number does not guarantee that it is visible.
-  inline bool CanReseekToSkip();
-
   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
   // is called
   void TempPinData() {
@@ -453,6 +447,11 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
   // greater than that,
   // - none of the above : saved_key_ can contain anything, it doesn't matter.
   uint64_t num_skipped = 0;
+  // For write unprepared, the target sequence number in reseek could be
+  // larger than the snapshot, and thus needs to be skipped again. This could
+  // result in an infinite loop of reseeks. To avoid that, we limit the number
+  // of reseeks to one.
+  bool reseek_done = false;
 
   is_blob_ = false;
 
@@ -498,6 +497,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
       assert(!skipping || user_comparator_.Compare(
                               ikey_.user_key, saved_key_.GetUserKey()) > 0);
       num_skipped = 0;
+      reseek_done = false;
       switch (ikey_.type) {
         case kTypeDeletion:
         case kTypeSingleDeletion:
@@ -551,6 +551,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
            // they are hidden by this deletion.
            skipping = true;
            num_skipped = 0;
+           reseek_done = false;
            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
          } else if (ikey_.type == kTypeBlobIndex) {
            if (!allow_blob_) {
@@ -581,6 +582,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
            // they are hidden by this deletion.
            skipping = true;
            num_skipped = 0;
+           reseek_done = false;
            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
          } else {
            // By now, we are sure the current ikey is going to yield a
@@ -611,14 +613,23 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
              !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
          skipping = false;
          num_skipped = 0;
+         reseek_done = false;
        }
      }
 
     // If we have sequentially iterated via numerous equal keys, then it's
     // better to seek so that we can avoid too many key comparisons.
-    if (num_skipped > max_skip_ && CanReseekToSkip()) {
+    //
+    // To avoid infinite loops, do not reseek if we have already attempted to
+    // reseek previously.
+    //
+    // TODO(lth): If we reseek to a sequence number greater than ikey_.sequence,
+    // then it does not make sense to reseek as we would actually land further
+    // away from the desired key. There is opportunity for optimization here.
+    if (num_skipped > max_skip_ && !reseek_done) {
       is_key_seqnum_zero_ = false;
       num_skipped = 0;
+      reseek_done = true;
       std::string last_key;
       if (skipping) {
         // We're looking for the next user-key but all we see are the same
@@ -937,7 +948,7 @@ bool DBIter::FindValueForCurrentKey() {
     // This user key has lots of entries.
     // We're going from old to new, and it's taking too long. Let's do a Seek()
     // and go from new to old. This helps when a key was overwritten many times.
-    if (num_skipped >= max_skip_ && CanReseekToSkip()) {
+    if (num_skipped >= max_skip_) {
       return FindValueForCurrentKeyUsingSeek();
     }
 
@@ -1234,7 +1245,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
     }
 
-    if (num_skipped >= max_skip_ && CanReseekToSkip()) {
+    if (num_skipped >= max_skip_) {
       num_skipped = 0;
       IterKey last_key;
       last_key.SetInternalKey(ParsedInternalKey(
@@ -1281,10 +1292,6 @@ bool DBIter::IsVisible(SequenceNumber sequence) {
   }
 }
 
-bool DBIter::CanReseekToSkip() {
-  return read_callback_ == nullptr || read_callback_->CanReseekToSkip();
-}
-
 void DBIter::Seek(const Slice& target) {
   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   StopWatch sw(env_, statistics_, DB_SEEK);
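The guard added to FindNextUserEntryInternal above can be illustrated with a toy model (an assumed simplification, not the real DBIter): reseeking to max_visible_seq may land on a version that is still invisible, namely one committed after the snapshot, so an unbounded skip-then-reseek rule would reseek to the same spot forever.

#include <cstdint>
#include <cstdio>
#include <vector>

using Seq = uint64_t;

int main() {
  const Seq snapshot = 3;         // transaction snapshot
  const Seq max_visible_seq = 9;  // max(snapshot, own max unprepared seq)
  // Versions of one user key, newest first. None belong to this transaction;
  // seqnos 8..4 were committed by others after our snapshot, so only seqno 2
  // is visible.
  const std::vector<Seq> versions = {8, 7, 6, 5, 4, 2};
  const uint64_t max_skip = 2;

  uint64_t num_skipped = 0;
  bool reseek_done = false;
  for (size_t i = 0; i < versions.size();) {
    if (num_skipped > max_skip && !reseek_done) {
      // Without this flag the loop would reseek forever: the reseek target
      // (first version with seqno <= max_visible_seq) is itself invisible,
      // so skipping immediately starts over from the same place.
      reseek_done = true;
      num_skipped = 0;
      size_t j = 0;
      while (j < versions.size() && versions[j] > max_visible_seq) ++j;
      i = j;  // the modeled reseek lands on seqno 8
      continue;
    }
    if (versions[i] <= snapshot) {
      std::printf("visible version found at seqno %llu\n",
                  (unsigned long long)versions[i]);
      break;
    }
    ++num_skipped;  // hidden version, keep skipping
    ++i;
  }
  return 0;
}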
diff --git a/db/read_callback.h b/db/read_callback.h
index 60f91ef87..d8801e651 100644
--- a/db/read_callback.h
+++ b/db/read_callback.h
@@ -42,9 +42,6 @@ class ReadCallback {
   // Refresh to a more recent visible seq
   virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; }
 
-  // Refer to DBIter::CanReseekToSkip
-  virtual bool CanReseekToSkip() { return true; }
-
  protected:
   // The max visible seq, it is usually the snapshot but could be larger if
   // transaction has its own writes written to db.
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index a410c5b51..7868d0060 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -3471,6 +3471,12 @@ TEST_P(TransactionTest, LockLimitTest) {
 }
 
 TEST_P(TransactionTest, IteratorTest) {
+  // This test does writes without snapshot validation, and then tries to
+  // create an iterator later, which is unsupported in write unprepared.
+  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
+    return;
+  }
+
   WriteOptions write_options;
   ReadOptions read_options, snapshot_read_options;
   std::string value;
@@ -3589,6 +3595,16 @@ TEST_P(TransactionTest, IteratorTest) {
 }
 
 TEST_P(TransactionTest, DisableIndexingTest) {
+  // Skip this test for write unprepared. It does not rely solely on WBWI for
+  // read your own writes, so depending on whether batches are flushed or not,
+  // only some writes will be visible.
+  //
+  // Also, write unprepared does not support creating iterators if there has
+  // been a txn->Put() without snapshot validation.
+  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
+    return;
+  }
+
   WriteOptions write_options;
   ReadOptions read_options;
   std::string value;
diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc
index faa6c7745..a2546229e 100644
--- a/utilities/transactions/write_unprepared_transaction_test.cc
+++ b/utilities/transactions/write_unprepared_transaction_test.cc
@@ -37,6 +37,9 @@ INSTANTIATE_TEST_CASE_P(
         std::make_tuple(false, true, WRITE_UNPREPARED)));
 
 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
+  // The following test checks whether reading your own writes in a
+  // transaction works for write unprepared, when there are uncommitted
+  // values written into DB.
   auto verify_state = [](Iterator* iter, const std::string& key,
                          const std::string& value) {
     ASSERT_TRUE(iter->Valid());
@@ -45,155 +48,251 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
     ASSERT_EQ(value, iter->value().ToString());
   };
 
-  options.disable_auto_compactions = true;
-  ReOpen();
+  // Test always reseeking vs never reseeking.
+  for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
+    options.max_sequential_skip_in_iterations = max_skip;
+    options.disable_auto_compactions = true;
+    ReOpen();
 
-  // The following tests checks whether reading your own write for
-  // a transaction works for write unprepared, when there are uncommitted
-  // values written into DB.
-  //
-  // Although the values written by DB::Put are technically committed, we add
-  // their seq num to unprep_seqs_ to pretend that they were written into DB
-  // as part of an unprepared batch, and then check if they are visible to the
-  // transaction.
-
-  auto snapshot0 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v2"));
-  auto snapshot2 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v3"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v4"));
-  auto snapshot4 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v5"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v6"));
-  auto snapshot6 = db->GetSnapshot();
-  ASSERT_OK(db->Put(WriteOptions(), "a", "v7"));
-  ASSERT_OK(db->Put(WriteOptions(), "b", "v8"));
-  auto snapshot8 = db->GetSnapshot();
+    TransactionOptions txn_options;
+    WriteOptions woptions;
+    ReadOptions roptions;
 
-  TransactionOptions txn_options;
-  WriteOptions write_options;
-  Transaction* txn = db->BeginTransaction(write_options, txn_options);
-  WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
+    ASSERT_OK(db->Put(woptions, "a", ""));
+    ASSERT_OK(db->Put(woptions, "b", ""));
 
-  ReadOptions roptions;
-  roptions.snapshot = snapshot0;
+    Transaction* txn = db->BeginTransaction(woptions, txn_options);
+    WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
+    txn->SetSnapshot();
 
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  auto iter = txn->GetIterator(roptions);
+    for (int i = 0; i < 5; i++) {
+      std::string stored_value = "v" + ToString(i);
+      ASSERT_OK(txn->Put("a", stored_value));
+      ASSERT_OK(txn->Put("b", stored_value));
+      wup_txn->FlushWriteBatchToDB(false);
 
-  // Test Get().
-  std::string value;
+      // Test Get()
+      std::string value;
+      ASSERT_OK(txn->Get(roptions, "a", &value));
+      ASSERT_EQ(value, stored_value);
+      ASSERT_OK(txn->Get(roptions, "b", &value));
+      ASSERT_EQ(value, stored_value);
 
-  ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
-  ASSERT_EQ(value, "v3");
+      // Test Next()
+      auto iter = txn->GetIterator(roptions);
+      iter->Seek("a");
+      verify_state(iter, "a", stored_value);
 
-  ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
-  ASSERT_EQ(value, "v4");
+      iter->Next();
+      verify_state(iter, "b", stored_value);
 
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
+      iter->SeekToFirst();
+      verify_state(iter, "a", stored_value);
 
-  ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
-  ASSERT_EQ(value, "v7");
+      iter->Next();
+      verify_state(iter, "b", stored_value);
 
-  ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
-  ASSERT_EQ(value, "v8");
+      delete iter;
 
-  wup_txn->unprep_seqs_.clear();
+      // Test Prev()
+      iter = txn->GetIterator(roptions);
+      iter->SeekForPrev("b");
+      verify_state(iter, "b", stored_value);
 
-  // Test Next().
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
+      iter->Prev();
+      verify_state(iter, "a", stored_value);
 
-  iter->Seek("a");
-  verify_state(iter, "a", "v3");
+      iter->SeekToLast();
+      verify_state(iter, "b", stored_value);
 
-  iter->Next();
-  verify_state(iter, "b", "v4");
+      iter->Prev();
+      verify_state(iter, "a", stored_value);
 
-  iter->SeekToFirst();
-  verify_state(iter, "a", "v3");
+      delete iter;
+    }
 
-  iter->Next();
-  verify_state(iter, "b", "v4");
+    delete txn;
+  }
+}
 
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
+TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) {
+  // This is a stress test where different threads write random keys, and
+  // then, before committing or aborting the transaction, validate that they
+  // can read back the keys they wrote, and that keys they did not write
+  // respect the snapshot. To avoid row lock contention (and simply stressing
+  // the locking system), each thread is mostly only writing to its own set
+  // of keys.
+  const uint32_t kNumIter = 1000;
+  const uint32_t kNumThreads = 10;
+  const uint32_t kNumKeys = 5;
 
-  iter->Seek("a");
-  verify_state(iter, "a", "v7");
+  std::default_random_engine rand(static_cast<uint32_t>(
+      std::hash<std::thread::id>()(std::this_thread::get_id())));
 
-  iter->Next();
-  verify_state(iter, "b", "v8");
+  enum Action { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
+  // Test with
+  // 1. no snapshots set
+  // 2. snapshot set on ReadOptions
+  // 3. snapshot set, and refreshing after every write.
+  for (Action a : {NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT}) {
+    WriteOptions write_options;
+    txn_db_options.transaction_lock_timeout = -1;
+    options.disable_auto_compactions = true;
+    ReOpen();
 
-  iter->SeekToFirst();
-  verify_state(iter, "a", "v7");
+    std::vector<std::string> keys;
+    for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
+      keys.push_back("k" + ToString(k));
+    }
+    std::shuffle(keys.begin(), keys.end(), rand);
 
-  iter->Next();
-  verify_state(iter, "b", "v8");
+    // This counter will act as a "sequence number" to help us validate
+    // visibility logic with snapshots. If we had direct access to the seqno
+    // of snapshots and key/values, then we should directly compare those
+    // instead.
+    std::atomic<int64_t> counter(0);
 
-  wup_txn->unprep_seqs_.clear();
+    std::function<void(int)> stress_thread = [&](int id) {
+      size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+      Random64 rnd(static_cast<uint64_t>(tid));
 
-  // Test Prev(). For Prev(), we need to adjust the snapshot to match what is
-  // possible in WriteUnpreparedTxn.
-  //
-  // Because of row locks and ValidateSnapshot, there cannot be any committed
-  // entries after snapshot, but before the first prepared key.
-  roptions.snapshot = snapshot2;
-  wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
-      snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
+      Transaction* txn;
+      TransactionOptions txn_options;
+      // batch_size of 1 causes writes to DB for every marker.
+      txn_options.max_write_batch_size = 1;
+      ReadOptions read_options;
 
-  iter->SeekForPrev("b");
-  verify_state(iter, "b", "v4");
+      for (uint32_t i = 0; i < kNumIter; i++) {
+        std::set<std::string> owned_keys(&keys[id * kNumKeys],
+                                         &keys[(id + 1) * kNumKeys]);
+        // Add unowned keys to make the workload more interesting, but this
+        // increases row lock contention, so just do it sometimes.
+        if (rnd.OneIn(2)) {
+          owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
+        }
 
-  iter->Prev();
-  verify_state(iter, "a", "v3");
+        txn = db->BeginTransaction(write_options, txn_options);
+        txn->SetName(ToString(id));
+        txn->SetSnapshot();
+        if (a >= RO_SNAPSHOT) {
+          read_options.snapshot = txn->GetSnapshot();
+          ASSERT_TRUE(read_options.snapshot != nullptr);
+        }
 
-  iter->SeekToLast();
-  verify_state(iter, "b", "v4");
+        uint64_t buf[2];
+        buf[0] = id;
 
-  iter->Prev();
-  verify_state(iter, "a", "v3");
+        // When scanning through the database, make sure that all unprepared
+        // keys have value >= snapshot and all other keys have value <
+        // snapshot.
+        int64_t snapshot_num = counter.fetch_add(1);
 
-  roptions.snapshot = snapshot6;
-  wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
-      snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
-  delete iter;
-  iter = txn->GetIterator(roptions);
+        Status s;
+        for (const auto& key : owned_keys) {
+          buf[1] = counter.fetch_add(1);
+          s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
+          if (!s.ok()) {
+            break;
+          }
+          if (a == REFRESH_SNAPSHOT) {
+            txn->SetSnapshot();
+            read_options.snapshot = txn->GetSnapshot();
+            snapshot_num = counter.fetch_add(1);
+          }
+        }
 
-  iter->SeekForPrev("b");
-  verify_state(iter, "b", "v8");
+        // Failure is possible due to snapshot validation. In this case,
+        // rollback and move onto the next iteration.
+        if (!s.ok()) {
+          ASSERT_TRUE(s.IsBusy());
+          ASSERT_OK(txn->Rollback());
+          delete txn;
+          continue;
+        }
 
-  iter->Prev();
-  verify_state(iter, "a", "v7");
+        auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
+                              const std::string& key,
+                              const std::string& value) {
+          if (owned_keys.count(key) > 0) {
+            ASSERT_EQ(value.size(), 16);
 
-  iter->SeekToLast();
-  verify_state(iter, "b", "v8");
+            // Since this key is part of owned_keys, it must have been written
+            // (unprepared) by this transaction, identified by 'id'.
+            ASSERT_EQ(((int64_t*)value.c_str())[0], id);
+            if (a == REFRESH_SNAPSHOT) {
+              // If refresh snapshot is true, then the snapshot is refreshed
+              // after every Put(), meaning that the current snapshot in
+              // snapshot_num must be greater than the "seqno" of any keys
+              // written by the current transaction.
+              ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+            } else {
+              // If refresh snapshot is not on, then the snapshot was taken
+              // at the beginning of the transaction, meaning all writes must
+              // come after snapshot_num.
+              ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
+            }
+          } else if (a >= RO_SNAPSHOT) {
+            // If this is not an unprepared key, just assert that the key
+            // "seqno" is smaller than the snapshot seqno.
+            ASSERT_EQ(value.size(), 16);
+            ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+          }
+        };
 
-  iter->Prev();
-  verify_state(iter, "a", "v7");
+        // Validate Get()/Next()/Prev(). Do only one of them to save time, and
+        // reduce lock contention.
+        switch (rnd.Uniform(3)) {
+          case 0:  // Validate Get()
+          {
+            for (const auto& key : keys) {
+              std::string value;
+              s = txn->Get(read_options, Slice(key), &value);
+              if (!s.ok()) {
+                ASSERT_TRUE(s.IsNotFound());
+                ASSERT_EQ(owned_keys.count(key), 0);
+              } else {
+                verify_key(key, value);
+              }
+            }
+            break;
+          }
+          case 1:  // Validate Next()
+          {
+            Iterator* iter = txn->GetIterator(read_options);
+            for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+              verify_key(iter->key().ToString(), iter->value().ToString());
+            }
+            delete iter;
+            break;
+          }
+          case 2:  // Validate Prev()
+          {
+            Iterator* iter = txn->GetIterator(read_options);
+            for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+              verify_key(iter->key().ToString(), iter->value().ToString());
+            }
+            delete iter;
+            break;
+          }
+          default:
+            ASSERT_TRUE(false);
+        }
 
-  // Since the unprep_seqs_ data were faked for testing, we do not want the
-  // destructor for the transaction to be rolling back data that did not
-  // exist.
-  wup_txn->unprep_seqs_.clear();
+        if (rnd.OneIn(2)) {
+          ASSERT_OK(txn->Commit());
+        } else {
+          ASSERT_OK(txn->Rollback());
+        }
+        delete txn;
+      }
+    };
 
-  db->ReleaseSnapshot(snapshot0);
-  db->ReleaseSnapshot(snapshot2);
-  db->ReleaseSnapshot(snapshot4);
-  db->ReleaseSnapshot(snapshot6);
-  db->ReleaseSnapshot(snapshot8);
-  delete iter;
-  delete txn;
+    std::vector<port::Thread> threads;
+    for (uint32_t i = 0; i < kNumThreads; i++) {
+      threads.emplace_back(stress_thread, i);
+    }
+
+    for (auto& t : threads) {
+      t.join();
+    }
+  }
 }
 
 // This tests how write unprepared behaves during recovery when the DB crashes
diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc
index d127220e4..4d1401b3a 100644
--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -32,7 +32,7 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
 
 SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
     WriteUnpreparedTxn* txn) {
-  auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
+  const auto& unprep_seqs = txn->GetUnpreparedSequenceNumbers();
   if (unprep_seqs.size()) {
     return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
   }
@@ -44,7 +44,8 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                        const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
-      recovered_txn_(false) {
+      recovered_txn_(false),
+      largest_validated_seq_(0) {
  max_write_batch_size_ = txn_options.max_write_batch_size;
  // We set max bytes to zero so that we don't get a memory limit error.
  // Instead of trying to keep write batch strictly under the size limit, we
@@ -85,75 +86,82 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
   write_batch_.SetMaxBytes(0);
   unprep_seqs_.clear();
   recovered_txn_ = false;
+  largest_validated_seq_ = 0;
+}
+
+Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
+  Status s = MaybeFlushWriteBatchToDB();
+  if (!s.ok()) {
+    return s;
+  }
+  s = do_write();
+  if (s.ok()) {
+    if (snapshot_) {
+      largest_validated_seq_ =
+          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
+    } else {
+      largest_validated_seq_ = kMaxSequenceNumber;
+    }
+  }
+  return s;
 }
 
 Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value,
                                const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                                const SliceParts& key, const SliceParts& value,
                                const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                  const Slice& key, const Slice& value,
                                  const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Merge(column_family, key, value,
+                                      assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                   const Slice& key, const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                   const SliceParts& key,
                                   const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                         const Slice& key,
                                         const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::SingleDelete(column_family, key,
+                                             assume_tracked);
+  });
 }
 
 Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                         const SliceParts& key,
                                         const bool assume_tracked) {
-  Status s = MaybeFlushWriteBatchToDB();
-  if (!s.ok()) {
-    return s;
-  }
-  return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
+  return HandleWrite([&]() {
+    return TransactionBaseImpl::SingleDelete(column_family, key,
+                                             assume_tracked);
+  });
 }
 
 // WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
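The write path refactoring above funnels every mutation through HandleWrite. A rough standalone model of that control flow (simplified Status and snapshot types, not the real classes) looks like this:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>

using SequenceNumber = uint64_t;
constexpr SequenceNumber kMaxSeq = UINT64_MAX;

struct TxnModel {
  const SequenceNumber* snapshot = nullptr;  // null when no snapshot is set
  SequenceNumber largest_validated_seq = 0;

  bool MaybeFlushWriteBatchToDB() { return true; }  // stub: flush if too big

  bool HandleWrite(const std::function<bool()>& do_write) {
    if (!MaybeFlushWriteBatchToDB()) {
      return false;
    }
    bool ok = do_write();  // the actual Put/Delete/Merge/SingleDelete
    if (ok) {
      // Record how far snapshot validation has covered this transaction's
      // writes; with no snapshot, validation never happened at all.
      largest_validated_seq =
          snapshot ? std::max(largest_validated_seq, *snapshot) : kMaxSeq;
    }
    return ok;
  }
};

int main() {
  TxnModel txn;
  SequenceNumber snap = 42;
  txn.snapshot = &snap;
  txn.HandleWrite([] { return true; });  // validated write
  std::printf("validated up to %llu\n",
              (unsigned long long)txn.largest_validated_seq);  // 42
  txn.snapshot = nullptr;
  txn.HandleWrite([] { return true; });  // unvalidated write
  std::printf("now %s\n", txn.largest_validated_seq == kMaxSeq
                              ? "kMaxSeq (poisoned)"
                              : "bounded");
  return 0;
}

A transaction that writes without a snapshot ends up with largest_validated_seq pinned to the maximum, which is what later lets NewIterator in write_unprepared_txn_db.cc refuse to create an iterator for it.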
diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h
index 15a76d134..b64fd81e6 100644
--- a/utilities/transactions/write_unprepared_txn.h
+++ b/utilities/transactions/write_unprepared_txn.h
@@ -17,6 +17,40 @@ namespace rocksdb {
 class WriteUnpreparedTxnDB;
 class WriteUnpreparedTxn;
 
+// WriteUnprepared transactions need to be able to read their own uncommitted
+// writes, and supporting this requires some careful consideration. Because
+// writes in the current transaction may be flushed to DB already, we cannot
+// rely on the contents of WriteBatchWithIndex to determine whether a key
+// should be visible or not, so we also have to check the DB for any
+// uncommitted keys that should be visible to us. First, we will need to change
+// the seek to snapshot logic, to seek to max_visible_seq = max(snap_seq,
+// max_unprep_seq). Any key with a sequence number greater than max_visible_seq
+// should not be visible, because it cannot be unprepared by the current
+// transaction and it is not in its snapshot.
+//
+// When we seek to max_visible_seq, one of these cases will happen:
+// 1. We hit an unprepared key from the current transaction.
+// 2. We hit an unprepared key from another transaction.
+// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
+// 4. We hit a committed key with seq <= snap_seq.
+//
+// IsVisibleFullCheck handles all cases correctly.
+//
+// Other notes:
+// Note that max_visible_seq is only calculated once at iterator construction
+// time, meaning that if the same transaction is adding more unprep seqs
+// through writes during iteration, these newer writes may not be visible.
+// This is not a problem for MySQL though, because it avoids modifying the
+// index as it is scanning through it to avoid the Halloween Problem. Instead,
+// it scans the index once up front, and modifies based on a temporary copy.
+//
+// In DBIter, there is a "reseek" optimization if the iterator skips over too
+// many keys. However, this assumes that the reseek seeks exactly to the
+// required key. In write unprepared, even after seeking directly to
+// max_visible_seq, some iteration may be required before hitting a visible
+// key, and special precautions must be taken to avoid performing another
+// reseek, which would lead to an infinite loop.
+//
 class WriteUnpreparedTxnReadCallback : public ReadCallback {
  public:
   WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db,
@@ -25,7 +59,7 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
                                  WriteUnpreparedTxn* txn)
       // Pass our last uncommitted seq as the snapshot to the parent class to
       // ensure that the parent will not prematurely filter out own writes. We
-      // will do the exact comparison agaisnt snapshots in IsVisibleFullCheck
+      // will do the exact comparison against snapshots in IsVisibleFullCheck
       // override.
       : ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted),
         db_(db),
@@ -34,12 +68,6 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
 
   virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
 
-  bool CanReseekToSkip() override {
-    return wup_snapshot_ == max_visible_seq_;
-    // Otherwise our own writes uncommitted are in db, and the assumptions
-    // behind reseek optimizations are no longer valid.
-  }
-
   void Refresh(SequenceNumber seq) override {
     max_visible_seq_ = std::max(max_visible_seq_, seq);
     wup_snapshot_ = seq;
   }
@@ -130,6 +158,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   Status MaybeFlushWriteBatchToDB();
   Status FlushWriteBatchToDB(bool prepared);
+  Status HandleWrite(std::function<Status()> do_write);
 
   // For write unprepared, we check on every writebatch append to see if
   // max_write_batch_size_ has been exceeded, and then call
@@ -153,6 +182,20 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // locked for efficiency reasons. For recovered transactions, skip unlocking
   // keys when transaction ends.
   bool recovered_txn_;
+
+  // Track the largest sequence number at which we performed snapshot
+  // validation. If snapshot validation was skipped because no snapshot was
+  // set, then this is set to kMaxSequenceNumber. This value is useful
+  // because it means that for keys that have unprepared seqnos, we can
+  // guarantee that no keys committed by other transactions can exist between
+  // largest_validated_seq_ and max_unprep_seq. See
+  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
+  // necessary for iterator Prev().
+  //
+  // Currently this value only increases during the lifetime of a
+  // transaction, but in some cases, we should be able to restore the
+  // previous largest value when calling RollbackToSavepoint.
+  SequenceNumber largest_validated_seq_;
 };
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc
index c4be058bb..c3fcd1f45 100644
--- a/utilities/transactions/write_unprepared_txn_db.cc
+++ b/utilities/transactions/write_unprepared_txn_db.cc
@@ -368,25 +368,77 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
   constexpr bool ALLOW_BLOB = true;
   constexpr bool ALLOW_REFRESH = true;
   std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
-  SequenceNumber snapshot_seq;
+  SequenceNumber snapshot_seq = kMaxSequenceNumber;
   SequenceNumber min_uncommitted = 0;
-  if (options.snapshot != nullptr) {
-    snapshot_seq = options.snapshot->GetSequenceNumber();
-    min_uncommitted =
-        static_cast_with_check<const SnapshotImpl, const Snapshot>(
-            options.snapshot)
-            ->min_uncommitted_;
-  } else {
-    auto* snapshot = GetSnapshot();
-    // We take a snapshot to make sure that the related data in the commit map
-    // are not deleted.
-    snapshot_seq = snapshot->GetSequenceNumber();
-    min_uncommitted =
-        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
-            ->min_uncommitted_;
+
+  // Currently, the Prev() iterator logic does not work well without snapshot
+  // validation. The logic simply iterates through values of a key in
+  // ascending seqno order, stopping at the first non-visible value and
+  // returning the last visible value.
+  //
+  // For example, if snapshot sequence is 3, and we have the following keys:
+  // foo: v1 1
+  // foo: v2 2
+  // foo: v3 3
+  // foo: v4 4
+  // foo: v5 5
+  //
+  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
+  // which is the last visible value.
+  //
+  // For unprepared transactions, if we have snap_seq = 3, but the current
+  // transaction has unprep_seq 5, then stopping at the first non-visible
+  // value would be incorrect, as we should return v5, and not v3. The problem
+  // is that there are committed keys at snapshot_seq < commit_seq < unprep_seq.
+  //
+  // Snapshot validation can prevent this problem by ensuring that no committed
+  // keys exist at snapshot_seq < commit_seq, and thus any value with a
+  // sequence number greater than snapshot_seq must be an unprepared key. For
+  // example, if the transaction had a snapshot at 3, then snapshot validation
+  // would be performed during the Put(v5) call. It would find v4, and the Put
+  // would fail with a snapshot validation failure.
+  //
+  // Because of this, if any writes have occurred, then the transaction
+  // snapshot must be used for the iterator. If no writes have occurred, we
+  // can simply create a snapshot. Later writes would then not be visible, but
+  // we don't support iterating while writing anyway.
+  //
+  // TODO(lth): Improve Prev() logic to continue iterating until
+  // max_visible_seq, and then return the last visible value, so that this
+  // restriction can be lifted.
+  const Snapshot* snapshot = nullptr;
+  if (options.snapshot == nullptr) {
+    snapshot = GetSnapshot();
     own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
+  } else {
+    snapshot = options.snapshot;
   }
+
+  snapshot_seq = snapshot->GetSequenceNumber();
   assert(snapshot_seq != kMaxSequenceNumber);
+  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We
+  // are guaranteed that for keys that were modified by this transaction (and
+  // thus might have unprepared versions), no committed versions exist at
+  // largest_validated_seq < commit_seq (or the contrapositive: any committed
+  // version must exist at commit_seq <= largest_validated_seq). This implies
+  // that commit_seq <= largest_validated_seq <= snapshot_seq, i.e.
+  // commit_seq <= snapshot_seq. As explained above, the problem with Prev()
+  // only happens when snapshot_seq < commit_seq.
+  //
+  // For keys that were not modified by this transaction,
+  // largest_validated_seq_ is meaningless, and Prev() should just work with
+  // the existing visibility logic.
+  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
+      !txn->unprep_seqs_.empty()) {
+    ROCKS_LOG_ERROR(info_log_,
+                    "WriteUnprepared iterator creation failed since the "
+                    "transaction has performed unvalidated writes");
+    return nullptr;
+  }
+  min_uncommitted =
+      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+          ->min_uncommitted_;
+
   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
   auto* state =
       new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
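The Prev() hazard that the NewIterator comment describes can be checked with a small standalone example (assumed simplified visibility rules, not RocksDB code): with snapshot 3 and an own unprepared write at seqno 5, stopping at the first invisible version returns v3, while scanning all versions up to max_visible_seq correctly returns v5.

#include <cstdint>
#include <cstdio>
#include <set>
#include <utility>
#include <vector>

using Seq = uint64_t;

int main() {
  const Seq snapshot = 3;
  const std::set<Seq> own_unprepared = {5};
  // Versions of "foo" in ascending seqno order, as Prev() walks them.
  // Seqno 4 was committed by another transaction after our snapshot; this is
  // exactly the case that snapshot validation is meant to rule out.
  const std::vector<std::pair<Seq, const char*>> versions = {
      {1, "v1"}, {2, "v2"}, {3, "v3"}, {4, "v4"}, {5, "v5"}};

  auto visible = [&](Seq s) {
    return s <= snapshot || own_unprepared.count(s) > 0;
  };

  // Naive rule: stop at the first invisible version, return the last visible.
  const char* naive = nullptr;
  for (const auto& v : versions) {
    if (!visible(v.first)) break;
    naive = v.second;
  }

  // Correct rule: scan all versions up to max_visible_seq and keep the last
  // visible one, so the transaction's own unprepared v5 wins.
  const char* correct = nullptr;
  for (const auto& v : versions) {
    if (visible(v.first)) correct = v.second;
  }

  std::printf("naive: %s, correct: %s\n", naive, correct);  // v3 vs v5
  return 0;
}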