diff --git a/db/db_impl.h b/db/db_impl.h index ff61577c4..3259189e5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -550,9 +550,18 @@ class DBImpl : public DB { WriteBatch* batch_; // The seq number of the first key in the batch SequenceNumber seq_; + // Number of sub-batched. A new sub-batch is created if we txn attempts to + // inserts a duplicate key,seq to memtable. This is currently used in + // WritePrparedTxn + size_t batch_cnt_; explicit RecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch, SequenceNumber seq) - : log_number_(log), name_(name), batch_(batch), seq_(seq) {} + WriteBatch* batch, SequenceNumber seq, + size_t batch_cnt) + : log_number_(log), + name_(name), + batch_(batch), + seq_(seq), + batch_cnt_(batch_cnt) {} ~RecoveredTransaction() { delete batch_; } }; @@ -574,9 +583,10 @@ class DBImpl : public DB { } void InsertRecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch, SequenceNumber seq) { + WriteBatch* batch, SequenceNumber seq, + size_t batch_cnt) { recovered_transactions_[name] = - new RecoveredTransaction(log, name, batch, seq); + new RecoveredTransaction(log, name, batch, seq, batch_cnt); MarkLogAsContainingPrepSection(log); } diff --git a/db/write_batch.cc b/db/write_batch.cc index 4e257b319..ff4c1a96d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -978,6 +978,60 @@ Status WriteBatch::PopSavePoint() { return Status::OK(); } +// TODO(myabandeh): move it to util +namespace { +// During recovery if the memtable is flushed we cannot rely on its help on +// duplicate key detection and as key insert will not be attempted. This class +// will be used as a emulator of memtable to tell if insertion of a key/seq +// would have resulted in duplication. +class DuplicateDetector { + public: + explicit DuplicateDetector(DBImpl* db) : db_(db) {} + bool IsDuplicateKeySeq(uint32_t cf, const Slice& key, SequenceNumber seq) { + assert(seq >= batch_seq_); + if (batch_seq_ != seq) { // it is a new batch + keys_.clear(); + } + batch_seq_ = seq; + CFKeys& cf_keys = keys_[cf]; + if (cf_keys.size() == 0) { // just inserted + InitWithComp(cf); + } + auto it = cf_keys.insert(key); + if (it.second == false) { // second is false if a element already existed. + keys_.clear(); + InitWithComp(cf); + keys_[cf].insert(key); + return true; + } + return false; + } + + private: + SequenceNumber batch_seq_ = 0; + DBImpl* db_; + // A comparator to be used in std::set + struct SetComparator { + explicit SetComparator() : user_comparator_(BytewiseComparator()) {} + explicit SetComparator(const Comparator* user_comparator) + : user_comparator_(user_comparator ? user_comparator + : BytewiseComparator()) {} + bool operator()(const Slice& lhs, const Slice& rhs) const { + return user_comparator_->Compare(lhs, rhs) < 0; + } + + private: + const Comparator* user_comparator_; + }; + using CFKeys = std::set; + std::map keys_; + void InitWithComp(const uint32_t cf) { + auto cmp = db_->GetColumnFamilyHandle(cf)->GetComparator(); + keys_[cf] = CFKeys(SetComparator(cmp)); + } +}; +} // anonymous namespace + class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; @@ -1008,6 +1062,7 @@ class MemTableInserter : public WriteBatch::Handler { bool seq_per_batch_; // Whether the memtable write will be done only after the commit bool write_after_commit_; + DuplicateDetector duplicate_detector_; MemPostInfoMap& GetPostMap() { assert(concurrent_memtable_writes_); @@ -1045,7 +1100,8 @@ class MemTableInserter : public WriteBatch::Handler { // Write after commit currently uses one seq per key (instead of per // batch). So seq_per_batch being false indicates write_after_commit // approach. - write_after_commit_(!seq_per_batch) { + write_after_commit_(!seq_per_batch), + duplicate_detector_(db_) { assert(cf_mems_); } @@ -1135,17 +1191,25 @@ class MemTableInserter : public WriteBatch::Handler { Status PutCFImpl(uint32_t column_family_id, const Slice& key, const Slice& value, ValueType value_type) { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } Status ret_status; @@ -1215,6 +1279,13 @@ class MemTableInserter : public WriteBatch::Handler { } } } + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + } // Since all Puts are logged in trasaction logs (if enabled), always bump // sequence number. Even if the update eventually fails and does not result // in memtable add/update. @@ -1248,57 +1319,102 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } - return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion); + auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + } + return ret_status; } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, + key); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } - return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion); + auto ret_status = + DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); + } + return ret_status; } virtual Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, begin_key, end_key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, + begin_key, end_key); + // TODO(myabandeh): when transctional DeleteRange support is added, + // check if end_key must also be added. + batch_boundry = duplicate_detector_.IsDuplicateKeySeq( + column_family_id, begin_key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } if (db_ != nullptr) { @@ -1315,23 +1431,42 @@ class MemTableInserter : public WriteBatch::Handler { } } - return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion); + auto ret_status = + DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, + begin_key, end_key); + } + return ret_status; } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { assert(!concurrent_memtable_writes_); - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, + value); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } @@ -1412,6 +1547,13 @@ class MemTableInserter : public WriteBatch::Handler { } } + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); + } MaybeAdvanceSeq(); CheckMemtableFull(); return ret_status; @@ -1466,8 +1608,13 @@ class MemTableInserter : public WriteBatch::Handler { if (recovering_log_number_ != 0) { assert(db_->allow_2pc()); + size_t batch_cnt = + write_after_commit_ + ? 0 // 0 will disable further checks + : static_cast(sequence_ - rebuilding_trx_seq_ + 1); db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), - rebuilding_trx_, rebuilding_trx_seq_); + rebuilding_trx_, rebuilding_trx_seq_, + batch_cnt); rebuilding_trx_ = nullptr; } else { assert(rebuilding_trx_ == nullptr); diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index a0c9635da..0ce166890 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -227,6 +227,7 @@ class WriteBatchWithIndex : public WriteBatchBase { void SetMaxBytes(size_t max_bytes) override; private: + friend class PessimisticTransactionDB; friend class WritePreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; // Returns the number of sub-batches inside the write batch. A sub-batch diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index cd75209f1..a3b3a622a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -142,6 +142,11 @@ Status PessimisticTransactionDB::Initialize( } s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); + // WriteCommitted set this to to disable this check that is specific to + // WritePrepared txns + assert(recovered_trx->batch_cnt_ == 0 || + real_trx->GetWriteBatch()->SubBatchCnt() == + recovered_trx->batch_cnt_); real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { break; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 1386b5c22..af4114cc2 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -135,6 +135,7 @@ class PessimisticTransactionDB : public TransactionDB { friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; friend class TransactionTest_DoubleEmptyWrite_Test; + friend class TransactionTest_DuplicateKeys_Test; friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; friend class TransactionTest_TwoPhaseLongPrepareTest_Test; friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 2015d314c..9d9d06082 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5051,6 +5051,36 @@ TEST_P(TransactionTest, Optimizations) { } } +// A comparator that uses only the first three bytes +class ThreeBytewiseComparator : public Comparator { + public: + ThreeBytewiseComparator() {} + virtual const char* Name() const override { + return "test.ThreeBytewiseComparator"; + } + virtual int Compare(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na.compare(nb); + } + virtual bool Equal(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na == nb; + } + // This methods below dont seem relevant to this test. Implement them if + // proven othersize. + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortestSeparator(start, limit); + } + void FindShortSuccessor(std::string* key) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortSuccessor(key); + } +}; + // Test that the transactional db can handle duplicate keys in the write batch TEST_P(TransactionTest, DuplicateKeys) { ColumnFamilyOptions cf_options; @@ -5090,35 +5120,6 @@ TEST_P(TransactionTest, DuplicateKeys) { // Test with non-bytewise comparator { - // A comparator that uses only the first three bytes - class ThreeBytewiseComparator : public Comparator { - public: - ThreeBytewiseComparator() {} - virtual const char* Name() const override { - return "test.ThreeBytewiseComparator"; - } - virtual int Compare(const Slice& a, const Slice& b) const override { - Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); - Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); - return na.compare(nb); - } - virtual bool Equal(const Slice& a, const Slice& b) const override { - Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); - Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); - return na == nb; - } - // This methods below dont seem relevant to this test. Implement them if - // proven othersize. - void FindShortestSeparator(std::string* start, - const Slice& limit) const override { - const Comparator* bytewise_comp = BytewiseComparator(); - bytewise_comp->FindShortestSeparator(start, limit); - } - void FindShortSuccessor(std::string* key) const override { - const Comparator* bytewise_comp = BytewiseComparator(); - bytewise_comp->FindShortSuccessor(key); - } - }; ReOpen(); std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); @@ -5128,6 +5129,8 @@ TEST_P(TransactionTest, DuplicateKeys) { batch.Put(cf_handle, Slice("key"), Slice("value")); // The first three bytes are the same, do it must be counted as duplicate batch.Put(cf_handle, Slice("key2"), Slice("value2")); + // check for 2nd duplicate key in cf with non-default comparator + batch.Put(cf_handle, Slice("key2b"), Slice("value2b")); ASSERT_OK(db->Write(write_options, &batch)); // The value must be the most recent value for all the keys equal to "key", @@ -5135,7 +5138,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ReadOptions ropt; PinnableSlice pinnable_val; ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val)); - ASSERT_TRUE(pinnable_val == ("value2")); + ASSERT_TRUE(pinnable_val == ("value2b")); // Test duplicate keys with rollback TransactionOptions txn_options; @@ -5145,7 +5148,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4"))); ASSERT_OK(txn0->Rollback()); ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val)); - ASSERT_TRUE(pinnable_val == ("value2")); + ASSERT_TRUE(pinnable_val == ("value2b")); delete txn0; delete cf_handle; @@ -5321,6 +5324,212 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Commit()); delete txn0; } + + // Test sucessfull recovery after a crash + { + ReOpen(); + TransactionOptions txn_options; + WriteOptions write_options; + ReadOptions ropt; + Transaction* txn0; + PinnableSlice pinnable_val; + Status s; + + std::unique_ptr comp_gc(new ThreeBytewiseComparator()); + cf_options.comparator = comp_gc.get(); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + delete cf_handle; + std::vector cfds{ + ColumnFamilyDescriptor(kDefaultColumnFamilyName, + ColumnFamilyOptions(options)), + ColumnFamilyDescriptor(cf_name, cf_options), + }; + std::vector handles; + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + + ASSERT_OK(db->Put(write_options, "foo0", "init")); + ASSERT_OK(db->Put(write_options, "foo1", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init")); + + // one entry + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0a")); + + // two entries, no duplicate + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "fol1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + + // one duplicate with ::Put + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0d")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1c")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1d")); + + // Duplicate with ::Put, ::Delete + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::SingleDelete + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g"))); + ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->SingleDelete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::Merge + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i"))); + ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f"))); + ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0f,bar0g")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1i,bar1j")); + + for (auto h : handles) { + delete h; + } + delete db; + db = nullptr; + } } } // namespace rocksdb diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 950af6c13..beec0df40 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -101,6 +101,27 @@ class TransactionTestBase : public ::testing::Test { return s; } + Status ReOpenNoDelete(std::vector& cfs, + std::vector* handles) { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete db; + db = nullptr; + env->AssertNoOpenFile(); + env->DropUnsyncedFileData(); + env->ResetState(); + Status s; + if (use_stackable_db_ == false) { + s = TransactionDB::Open(options, txn_db_options, dbname, cfs, handles, + &db); + } else { + s = OpenWithStackableDB(cfs, handles); + } + return s; + } + Status ReOpen() { delete db; DestroyDB(dbname, options); @@ -113,6 +134,24 @@ class TransactionTestBase : public ::testing::Test { return s; } + Status OpenWithStackableDB(std::vector& cfs, + std::vector* handles) { + std::vector compaction_enabled_cf_indices; + TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices); + DB* root_db; + Options options_copy(options); + const bool use_seq_per_batch = + txn_db_options.write_policy == WRITE_PREPARED; + Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, + use_seq_per_batch); + if (s.ok()) { + s = TransactionDB::WrapStackableDB( + new StackableDB(root_db), txn_db_options, + compaction_enabled_cf_indices, *handles, &db); + } + return s; + } + Status OpenWithStackableDB() { std::vector compaction_enabled_cf_indices; std::vector column_families{ColumnFamilyDescriptor( diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index d91bdccaa..accc75338 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -475,7 +475,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, CommitEntry64b evicted_64b; CommitEntry evicted; bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); - if (to_be_evicted) { + if (LIKELY(to_be_evicted)) { + assert(evicted.prep_seq != prepare_seq); auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); ROCKS_LOG_DETAILS(info_log_, "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, @@ -491,7 +492,11 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, } bool succ = ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq}); - if (!succ) { + if (UNLIKELY(!succ)) { + ROCKS_LOG_ERROR(info_log_, + "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64 + ",%" PRIu64 " retrying...", + indexed_seq, prepare_seq, commit_seq); // A very rare event, in which the commit entry is updated before we do. // Here we apply a very simple solution of retrying. if (loop_cnt > 100) { @@ -783,16 +788,21 @@ WritePreparedTxnDB::~WritePreparedTxnDB() { db_impl_->CancelAllBackgroundWork(true /*wait*/); } +void SubBatchCounter::InitWithComp(const uint32_t cf) { + auto cmp = comparators_[cf]; + keys_[cf] = CFKeys(SetComparator(cmp)); +} + void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) { CFKeys& cf_keys = keys_[cf]; if (cf_keys.size() == 0) { // just inserted - auto cmp = comparators_[cf]; - keys_[cf] = CFKeys(SetComparator(cmp)); + InitWithComp(cf); } auto it = cf_keys.insert(key); if (it.second == false) { // second is false if a element already existed. batches_++; keys_.clear(); + InitWithComp(cf); keys_[cf].insert(key); } } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 63b66a753..04f66bf48 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -541,6 +541,7 @@ struct SubBatchCounter : public WriteBatch::Handler { size_t batches_; size_t BatchCount() { return batches_; } void AddKey(const uint32_t cf, const Slice& key); + void InitWithComp(const uint32_t cf); Status MarkNoop(bool) override { return Status::OK(); } Status MarkEndPrepare(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice&) override { return Status::OK(); }