From 90eca1e616d4225555ee25ce54462c5993cb79d2 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 22 Feb 2018 18:05:14 -0800 Subject: [PATCH] WritePrepared Txn: optimize SubBatchCnt Summary: Make use of the index in WriteBatchWithIndex to also count the number of sub-batches. This eliminates the need to separately scan the batch to count the number of sub-batches once a duplicate key is detected. Closes https://github.com/facebook/rocksdb/pull/3529 Differential Revision: D7049947 Pulled By: maysamyabandeh fbshipit-source-id: 81cbf12c4e662541c772c7265a8f91631e25c7cd --- .../utilities/write_batch_with_index.h | 7 +- .../write_prepared_transaction_test.cc | 96 +++++++++++++++++++ utilities/transactions/write_prepared_txn.cc | 25 +---- .../write_batch_with_index.cc | 27 ++++-- 4 files changed, 122 insertions(+), 33 deletions(-) diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 84b293815..a0c9635da 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -228,8 +228,11 @@ class WriteBatchWithIndex : public WriteBatchBase { private: friend class WritePreparedTxn; - // Returns true if there has been duplicate keys in the batch. - bool HasDuplicateKeys(); + friend class WriteBatchWithIndex_SubBatchCnt_Test; + // Returns the number of sub-batches inside the write batch. A sub-batch + // starts right before inserting a key that is a duplicate of a key in the + // last sub-batch. 
+ size_t SubBatchCnt(); Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8e1276558..1e2c8bb4a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -179,6 +179,102 @@ TEST(PreparedHeap, Concurrent) { } } +// Test that WriteBatchWithIndex correctly counts the number of sub-batches +TEST(WriteBatchWithIndex, SubBatchCnt) { + ColumnFamilyOptions cf_options; + std::string cf_name = "two"; + DB* db; + Options options; + options.create_if_missing = true; + const std::string dbname = test::TmpDir() + "/transaction_testdb"; + DestroyDB(dbname, options); + ASSERT_OK(DB::Open(options, dbname, &db)); + ColumnFamilyHandle* cf_handle = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + WriteOptions write_options; + size_t batch_cnt = 1; + size_t save_points = 0; + std::vector batch_cnt_at; + WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true, + 0); + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + batch_cnt_at.push_back(batch_cnt); + batch.SetSavePoint(); + save_points++; + batch.Put(Slice("key"), Slice("value")); + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + batch_cnt_at.push_back(batch_cnt); + batch.SetSavePoint(); + save_points++; + batch.Put(Slice("key2"), Slice("value2")); + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + // duplicate the keys + batch_cnt_at.push_back(batch_cnt); + batch.SetSavePoint(); + save_points++; + batch.Put(Slice("key"), Slice("value3")); + batch_cnt++; + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + // duplicate the 2nd key. It should not be counted as a duplicate since a + // sub-batch is cut after the last duplicate. 
+ batch_cnt_at.push_back(batch_cnt); + batch.SetSavePoint(); + save_points++; + batch.Put(Slice("key2"), Slice("value4")); + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + // duplicate the keys but in a different cf. It should not be counted as + // duplicate keys + batch_cnt_at.push_back(batch_cnt); + batch.SetSavePoint(); + save_points++; + batch.Put(cf_handle, Slice("key"), Slice("value5")); + ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); + + // Test that the number of sub-batches matches what we count with + // SubBatchCounter + std::map comparators; + comparators[0] = db->DefaultColumnFamily()->GetComparator(); + comparators[cf_handle->GetID()] = cf_handle->GetComparator(); + SubBatchCounter counter(comparators); + ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter)); + ASSERT_EQ(batch_cnt, counter.BatchCount()); + + // Test that RollbackToSavePoint will properly reset the number of + // sub-batches + for (size_t i = save_points; i > 0; i--) { + batch.RollbackToSavePoint(); + ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt()); + } + + // Test the count is right with random batches + { + const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms + Random rnd(1131); + std::string keys[TOTAL_KEYS]; + for (size_t k = 0; k < TOTAL_KEYS; k++) { + int len = static_cast(rnd.Uniform(50)); + keys[k] = test::RandomKey(&rnd, len); + } + for (size_t i = 0; i < 1000; i++) { // 1000 random batches + WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(), + 0, true, 0); + for (size_t k = 0; k < 10; k++) { // 10 keys per batch + size_t ki = static_cast(rnd.Uniform(TOTAL_KEYS)); + Slice key = Slice(keys[ki]); + std::string buffer; + Slice value = Slice(test::RandomString(&rnd, 16, &buffer)); + rndbatch.Put(key, value); + } + SubBatchCounter batch_counter(comparators); + ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter)); + ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount()); + } + } + + delete cf_handle; + delete db; +} + 
TEST(CommitEntry64b, BasicTest) { const size_t INDEX_BITS = static_cast(21); const size_t INDEX_SIZE = static_cast(1ull << INDEX_BITS); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 017a4d7d8..6f573c324 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -71,15 +71,7 @@ Status WritePreparedTxn::PrepareInternal() { const bool DISABLE_MEMTABLE = true; uint64_t seq_used = kMaxSequenceNumber; // For each duplicate key we account for a new sub-batch - prepare_batch_cnt_ = 1; - if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { - ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, - "Duplicate key overhead"); - SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); - auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter); - assert(s.ok()); - prepare_batch_cnt_ = counter.BatchCount(); - } + prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); Status s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), /*callback*/ nullptr, &log_number_, /*log ref*/ 0, @@ -98,10 +90,7 @@ Status WritePreparedTxn::PrepareInternal() { Status WritePreparedTxn::CommitWithoutPrepareInternal() { // For each duplicate key we account for a new sub-batch - size_t batch_cnt = 1; - if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { - batch_cnt = 0; // this will trigger a batch cnt compute - } + const size_t batch_cnt = GetWriteBatch()->SubBatchCnt(); return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); } @@ -326,15 +315,7 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); - prepare_batch_cnt_ = 1; - if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) { - ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, - "Duplicate key overhead"); - 
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); - auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter); - assert(s.ok()); - prepare_batch_cnt_ = counter.BatchCount(); - } + prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); return ret; } diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 8f3163ab3..31f7b3993 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -8,7 +8,6 @@ #include "rocksdb/utilities/write_batch_with_index.h" #include -#include #include "db/column_family.h" #include "db/db_impl.h" @@ -393,14 +392,21 @@ struct WriteBatchWithIndex::Rep { comparator(index_comparator, &write_batch), skip_list(comparator, &arena), overwrite_key(_overwrite_key), - last_entry_offset(0) {} + last_entry_offset(0), + last_sub_batch_offset(0), + sub_batch_cnt(1) {} ReadableWriteBatch write_batch; WriteBatchEntryComparator comparator; Arena arena; WriteBatchEntrySkipList skip_list; bool overwrite_key; size_t last_entry_offset; - std::vector obsolete_offsets; + // The starting offset of the last sub-batch. A sub-batch starts right before + // inserting a key that is a duplicate of a key in the last sub-batch. Zero, + // the default, means that no duplicate key is detected so far. + size_t last_sub_batch_offset; + // Total number of sub-batches in the write batch. Default is 1. + size_t sub_batch_cnt; // Remember current offset of internal write batch, which is used as // the starting offset of the next record. 
@@ -452,7 +458,10 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( } WriteBatchIndexEntry* non_const_entry = const_cast(iter.GetRawEntry()); - obsolete_offsets.push_back(non_const_entry->offset); + if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) { + last_sub_batch_offset = last_entry_offset; + sub_batch_cnt++; + } non_const_entry->offset = last_entry_offset; return true; } @@ -504,6 +513,8 @@ void WriteBatchWithIndex::Rep::ClearIndex() { new (&arena) Arena(); new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); last_entry_offset = 0; + last_sub_batch_offset = 0; + sub_batch_cnt = 1; } Status WriteBatchWithIndex::Rep::ReBuildIndex() { @@ -582,9 +593,7 @@ WriteBatchWithIndex::~WriteBatchWithIndex() {} WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } -bool WriteBatchWithIndex::HasDuplicateKeys() { - return rep->obsolete_offsets.size() > 0; -} +size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; } WBWIIterator* WriteBatchWithIndex::NewIterator() { return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); @@ -883,8 +892,8 @@ Status WriteBatchWithIndex::RollbackToSavePoint() { Status s = rep->write_batch.RollbackToSavePoint(); if (s.ok()) { - // obsolete_offsets will be rebuilt by ReBuildIndex - rep->obsolete_offsets.clear(); + rep->sub_batch_cnt = 1; + rep->last_sub_batch_offset = 0; s = rep->ReBuildIndex(); }