diff --git a/db/write_batch.cc b/db/write_batch.cc index ce8d9ee64..4e257b319 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -404,8 +404,9 @@ Status WriteBatch::Iterate(Handler* handler) const { Status s; char tag = 0; uint32_t column_family = 0; // default - while ((s.ok() || s.IsTryAgain()) && !input.empty() && handler->Continue()) { - if (!s.IsTryAgain()) { + while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() && + handler->Continue()) { + if (LIKELY(!s.IsTryAgain())) { tag = 0; column_family = 0; // default @@ -425,7 +426,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); s = handler->PutCF(column_family, key, value); - if (s.ok()) { + if (LIKELY(s.ok())) { empty_batch = false; found++; } @@ -435,7 +436,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); s = handler->DeleteCF(column_family, key); - if (s.ok()) { + if (LIKELY(s.ok())) { empty_batch = false; found++; } @@ -445,7 +446,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); s = handler->SingleDeleteCF(column_family, key); - if (s.ok()) { + if (LIKELY(s.ok())) { empty_batch = false; found++; } @@ -455,7 +456,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE)); s = handler->DeleteRangeCF(column_family, key, value); - if (s.ok()) { + if (LIKELY(s.ok())) { empty_batch = false; found++; } @@ -465,7 +466,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); s = handler->MergeCF(column_family, key, value); - if (s.ok()) { + if (LIKELY(s.ok())) { empty_batch = false; found++; } @@ -475,7 +476,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX)); s = handler->PutBlobIndexCF(column_family, key, value); - if (s.ok()) { + if (LIKELY(s.ok())) { found++; } break; @@ -1158,7 +1159,7 @@ class MemTableInserter : public WriteBatch::Handler { bool mem_res = mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); - if (!mem_res) { + if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); const bool BATCH_BOUNDRY = true; @@ -1234,7 +1235,7 @@ class MemTableInserter : public WriteBatch::Handler { bool mem_res = mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); - if (!mem_res) { + if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); const bool BATCH_BOUNDRY = true; @@ -1391,7 +1392,7 @@ class MemTableInserter : public WriteBatch::Handler { } else { // 3) Add value to memtable bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value); - if (!mem_res) { + if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); const bool BATCH_BOUNDRY = true; @@ -1403,7 +1404,7 @@ class MemTableInserter : public WriteBatch::Handler { if (!perform_merge) { // Add merge operator to memtable bool mem_res = mem->Add(sequence_, kTypeMerge, key, value); - if (!mem_res) { + if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); const bool BATCH_BOUNDRY = true; diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 0dd48d01e..cd75209f1 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -175,30 +175,6 @@ TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( return validated; } -void PessimisticTransactionDB::UpdateCFComparatorMap( - const std::vector& handles) { - auto cf_map = new std::map(); - for (auto h : handles) { - auto id = h->GetID(); - const Comparator* comparator = h->GetComparator(); - (*cf_map)[id] = comparator; - } - cf_map_.store(cf_map); - cf_map_gc_.reset(cf_map); -} - -void PessimisticTransactionDB::UpdateCFComparatorMap( - const ColumnFamilyHandle* h) { - auto old_cf_map_ptr = cf_map_.load(); - assert(old_cf_map_ptr); - auto cf_map = new std::map(*old_cf_map_ptr); - auto id = h->GetID(); - const Comparator* comparator = h->GetComparator(); - (*cf_map)[id] = comparator; - cf_map_.store(cf_map); - cf_map_gc_.reset(cf_map); -} - Status TransactionDB::Open(const Options& options, const TransactionDBOptions& txn_db_options, const std::string& dbname, TransactionDB** dbptr) { @@ -228,7 +204,7 @@ Status TransactionDB::Open( Status s; DB* db; - ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is " PRId32, + ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, static_cast(txn_db_options.write_policy)); std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 90360d7a3..2e1dc04d6 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -113,20 +113,17 @@ class PessimisticTransactionDB : public TransactionDB { std::vector GetDeadlockInfoBuffer() override; void SetDeadlockInfoBufferSize(uint32_t target_size) override; - void UpdateCFComparatorMap(const std::vector& handles); - void UpdateCFComparatorMap(const ColumnFamilyHandle* handle); - std::map* GetCFComparatorMap() { - return cf_map_.load(); - } + // The default implementation does nothing. The actual implementation is moved + // to the child classes that actually need this information. This was due to + // an odd performance drop we observed when the added std::atomic member to + // the base class even when the subclass do not read it in the fast path. + virtual void UpdateCFComparatorMap(const std::vector&) {} + virtual void UpdateCFComparatorMap(const ColumnFamilyHandle*) {} protected: DBImpl* db_impl_; std::shared_ptr info_log_; const TransactionDBOptions txn_db_options_; - // A cache of the cf comparators - std::atomic*> cf_map_; - // GC of the object above - std::unique_ptr> cf_map_gc_; void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 4ec6219fd..7aedb1a15 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -72,7 +72,9 @@ Status WritePreparedTxn::PrepareInternal() { uint64_t seq_used = kMaxSequenceNumber; // For each duplicate key we account for a new sub-batch prepare_batch_cnt_ = 1; - if (GetWriteBatch()->HasDuplicateKeys()) { + if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Duplicate key overhead"); SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter); assert(s.ok()); @@ -97,7 +99,7 @@ Status WritePreparedTxn::PrepareInternal() { Status WritePreparedTxn::CommitWithoutPrepareInternal() { // For each duplicate key we account for a new sub-batch size_t batch_cnt = 1; - if (GetWriteBatch()->HasDuplicateKeys()) { + if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { batch_cnt = 0; // this will trigger a batch cnt compute } return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); @@ -129,7 +131,9 @@ Status WritePreparedTxn::CommitInternal() { const bool includes_data = !empty && !for_recovery; assert(prepare_batch_cnt_); size_t commit_batch_cnt = 0; - if (includes_data) { + if (UNLIKELY(includes_data)) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Duplicate key overhead"); SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); auto s = working_batch->Iterate(&counter); assert(s.ok()); @@ -143,7 +147,7 @@ Status WritePreparedTxn::CommitInternal() { // a connection between the memtable and its WAL, so there is no need to // redundantly reference the log that contains the prepared data. const uint64_t zero_log_number = 0ull; - size_t batch_cnt = commit_batch_cnt ? commit_batch_cnt : 1; + size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, zero_log_number, disable_memtable, &seq_used, batch_cnt, &update_commit_map); @@ -320,7 +324,9 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); prepare_batch_cnt_ = 1; - if (GetWriteBatch()->HasDuplicateKeys()) { + if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Duplicate key overhead"); SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter); assert(s.ok()); diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 0e0581173..df2fb8c20 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -62,7 +62,7 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family) override; protected: - // Override the protected SetId to make it visible to the firend class + // Override the protected SetId to make it visible to the friend class // WritePreparedTxnDB inline void SetId(uint64_t id) override { Transaction::SetId(id); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index e6edca844..882b7fe14 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -104,6 +104,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, } if (batch_cnt == 0) { // not provided, then compute it // TODO(myabandeh): add an option to allow user skipping this cost + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Duplicate key overhead"); SubBatchCounter counter(*GetCFComparatorMap()); auto s = batch->Iterate(&counter); assert(s.ok()); @@ -145,16 +147,15 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, "CommitBatchInternal 2nd write prepare_seq: %" PRIu64, prepare_seq); - // TODO(myabandeh): Note: we skip AddPrepared here. This could be further - // optimized by skip erasing prepare_seq from prepared_txn_ in the following - // callback. // TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and // readers assume the prepared data as committed? Almost zero probability. // Commit the batch by writing an empty batch to the 2nd queue that will // release the commit sequence number to readers. + const size_t ZERO_COMMITS = 0; + const bool PREP_HEAP_SKIPPED = true; WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( - this, db_impl_, prepare_seq, batch_cnt); + this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, PREP_HEAP_SKIPPED); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); const size_t ONE_BATCH = 1; @@ -184,6 +185,31 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, &callback); } +void WritePreparedTxnDB::UpdateCFComparatorMap( + const std::vector& handles) { + auto cf_map = new std::map(); + for (auto h : handles) { + auto id = h->GetID(); + const Comparator* comparator = h->GetComparator(); + (*cf_map)[id] = comparator; + } + cf_map_.store(cf_map); + cf_map_gc_.reset(cf_map); +} + +void WritePreparedTxnDB::UpdateCFComparatorMap( + const ColumnFamilyHandle* h) { + auto old_cf_map_ptr = cf_map_.load(); + assert(old_cf_map_ptr); + auto cf_map = new std::map(*old_cf_map_ptr); + auto id = h->GetID(); + const Comparator* comparator = h->GetComparator(); + (*cf_map)[id] = comparator; + cf_map_.store(cf_map); + cf_map_gc_.reset(cf_map); +} + + std::vector WritePreparedTxnDB::MultiGet( const ReadOptions& options, const std::vector& column_family, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index f2f5a5953..6e73496c2 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -216,6 +216,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Struct to hold ownership of snapshot and read callback for cleanup. struct IteratorState; + std::map* GetCFComparatorMap() { + return cf_map_.load(); + } + void UpdateCFComparatorMap( + const std::vector& handles) override; + void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override; + protected: virtual Status VerifyCFOptions( const ColumnFamilyOptions& cf_options) override; @@ -394,6 +401,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { mutable port::RWMutex old_commit_map_mutex_; mutable port::RWMutex commit_cache_mutex_; mutable port::RWMutex snapshots_mutex_; + // A cache of the cf comparators + std::atomic*> cf_map_; + // GC of the object above + std::unique_ptr> cf_map_gc_; }; class WritePreparedTxnReadCallback : public ReadCallback { @@ -420,12 +431,14 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { DBImpl* db_impl, SequenceNumber prep_seq, size_t prep_batch_cnt, - size_t data_batch_cnt = 0) + size_t data_batch_cnt = 0, + bool prep_heap_skipped = false) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), prep_batch_cnt_(prep_batch_cnt), data_batch_cnt_(data_batch_cnt), + prep_heap_skipped_(prep_heap_skipped), includes_data_(data_batch_cnt_ > 0) { assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); @@ -438,7 +451,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { : commit_seq + data_batch_cnt_ - 1; if (prep_seq_ != kMaxSequenceNumber) { for (size_t i = 0; i < prep_batch_cnt_; i++) { - db_->AddCommitted(prep_seq_ + i, last_commit_seq); + db_->AddCommitted(prep_seq_ + i, last_commit_seq, prep_heap_skipped_); } } // else there was no prepare phase if (includes_data_) { @@ -471,6 +484,9 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { SequenceNumber prep_seq_; size_t prep_batch_cnt_; size_t data_batch_cnt_; + // An optimization that indicates that there is no need to update the prepare + // heap since the prepare sequence number was not added to it. + bool prep_heap_skipped_; // Either because it is commit without prepare or it has a // CommitTimeWriteBatch bool includes_data_;