From 2c1f95291d0142957d74042aea14682539085501 Mon Sep 17 00:00:00 2001 From: Reid Horuff Date: Fri, 7 Oct 2016 11:31:26 -0700 Subject: [PATCH] Add facility to write only a portion of WriteBatch to WAL Summary: When constructing a write batch a client may now call MarkWalTerminationPoint() on that batch. No batch operations after this call will be added written to the WAL but will still be inserted into the Memtable. This facility is used to remove one of the three WriteImpl calls in 2PC transactions. This produces a ~1% perf improvement. ``` RocksDB - unoptimized 2pc, sync_binlog=1, disable_2pc=off INFO 2016-08-31 14:30:38,814 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2619 seconds. Requests/second = 28628 RocksDB - optimized 2pc , sync_binlog=1, disable_2pc=off INFO 2016-08-31 16:26:59,442 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2581 seconds. Requests/second = 29054 ``` Test Plan: Two unit tests added. Reviewers: sdong, yiwu, IslamAbdelRahman Reviewed By: yiwu Subscribers: hermanlee4, dhruba, andrewkr Differential Revision: https://reviews.facebook.net/D64599 --- db/db_impl.cc | 9 +++- db/db_wal_test.cc | 23 +++++++++++ db/fault_injection_test.cc | 24 +++++++++++ db/write_batch.cc | 48 +++++++++++++++------- db/write_batch_internal.h | 3 +- db/write_batch_test.cc | 21 ++++++++++ include/rocksdb/write_batch.h | 29 +++++++++++++ utilities/transactions/transaction_impl.cc | 28 ++++++------- 8 files changed, 152 insertions(+), 33 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 46b03d27e..cec473b24 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4768,7 +4768,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_wal_time); WriteBatch* merged_batch = nullptr; - if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL()) { + if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() && + write_group[0]->batch->GetWalTerminationPoint().is_cleared()) { + // we simply write the first WriteBatch to WAL if the group only + // contains one batch, that batch should be written to the WAL, + // and the batch is not wanting to be truncated merged_batch = write_group[0]->batch; write_group[0]->log_used = logfile_number_; } else { @@ -4778,7 +4782,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, merged_batch = &tmp_batch_; for (auto writer : write_group) { if (writer->ShouldWriteToWAL()) { - WriteBatchInternal::Append(merged_batch, writer->batch); + WriteBatchInternal::Append(merged_batch, writer->batch, + /*WAL_only*/ true); } writer->log_used = logfile_number_; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index b0ca938a9..425a26b71 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1130,6 +1130,29 @@ TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) { #endif // ROCKSDB_LITE +TEST_F(DBWALTest, WalTermTest) { + Options options = CurrentOptions(); + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "foo", "bar")); + + WriteOptions wo; + wo.sync = true; + wo.disableWAL = false; + + WriteBatch batch; + batch.Put("foo", "bar"); + batch.MarkWalTerminationPoint(); + batch.Put("foo2", "bar2"); + + ASSERT_OK(dbfull()->Write(wo, &batch)); + + // make sure we can re-open it. + ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); + ASSERT_EQ("bar", Get(1, "foo")); + ASSERT_EQ("NOT_FOUND", Get(1, "foo2")); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index d4d4932aa..8200de1b6 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -503,6 +503,30 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) { ASSERT_EQ(value_space, val); } +TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) { + ReadOptions ro; + Options options = CurrentOptions(); + options.env = env_; + + WriteOptions wo; + wo.sync = true; + wo.disableWAL = false; + WriteBatch batch; + batch.Put("cats", "dogs"); + batch.MarkWalTerminationPoint(); + batch.Put("boys", "girls"); + ASSERT_OK(db_->Write(wo, &batch)); + + env_->SetFilesystemActive(false); + NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); + ASSERT_OK(OpenDB()); + + std::string val; + ASSERT_OK(db_->Get(ro, "cats", &val)); + ASSERT_EQ("dogs", val); + ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound()); +} + INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool()); } // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index 2ed3cd85b..078d9e6c6 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -118,13 +118,6 @@ struct BatchContentClassifier : public WriteBatch::Handler { } // anon namespace - -struct SavePoint { - size_t size; // size of rep_ - int count; // count of elements in rep_ - uint32_t content_flags; -}; - struct SavePoints { std::stack stack; }; @@ -143,11 +136,13 @@ WriteBatch::WriteBatch(const std::string& rep) WriteBatch::WriteBatch(const WriteBatch& src) : save_points_(src.save_points_), + wal_term_point_(src.wal_term_point_), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), rep_(src.rep_) {} WriteBatch::WriteBatch(WriteBatch&& src) : save_points_(std::move(src.save_points_)), + wal_term_point_(std::move(src.wal_term_point_)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), rep_(std::move(src.rep_)) {} @@ -191,6 +186,8 @@ void WriteBatch::Clear() { save_points_->stack.pop(); } } + + wal_term_point_.clear(); } int WriteBatch::Count() const { @@ -213,6 +210,12 @@ uint32_t WriteBatch::ComputeContentFlags() const { return rv; } +void WriteBatch::MarkWalTerminationPoint() { + wal_term_point_.size = GetDataSize(); + wal_term_point_.count = Count(); + wal_term_point_.content_flags = content_flags_; +} + bool WriteBatch::HasPut() const { return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; } @@ -729,8 +732,8 @@ void WriteBatch::SetSavePoint() { save_points_ = new SavePoints(); } // Record length and count of current batch of writes. - save_points_->stack.push(SavePoint{ - GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)}); + save_points_->stack.push(SavePoint( + GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed))); } Status WriteBatch::RollbackToSavePoint() { @@ -1252,14 +1255,29 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); } -void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { - SetCount(dst, Count(dst) + Count(src)); +void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, + const bool wal_only) { + size_t src_len; + int src_count; + uint32_t src_flags; + + const SavePoint& batch_end = src->GetWalTerminationPoint(); + + if (wal_only && !batch_end.is_cleared()) { + src_len = batch_end.size - WriteBatchInternal::kHeader; + src_count = batch_end.count; + src_flags = batch_end.content_flags; + } else { + src_len = src->rep_.size() - WriteBatchInternal::kHeader; + src_count = Count(src); + src_flags = src->content_flags_.load(std::memory_order_relaxed); + } + + SetCount(dst, Count(dst) + src_count); assert(src->rep_.size() >= WriteBatchInternal::kHeader); - dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, - src->rep_.size() - WriteBatchInternal::kHeader); + dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len); dst->content_flags_.store( - dst->content_flags_.load(std::memory_order_relaxed) | - src->content_flags_.load(std::memory_order_relaxed), + dst->content_flags_.load(std::memory_order_relaxed) | src_flags, std::memory_order_relaxed); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 1602fdbcc..77e46ecff 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -177,7 +177,8 @@ class WriteBatchInternal { uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false); - static void Append(WriteBatch* dst, const WriteBatch* src); + static void Append(WriteBatch* dst, const WriteBatch* src, + const bool WAL_only = false); // Returns the byte size of appending a WriteBatch with ByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 05e7c0e7b..75d9e3e7f 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -185,6 +185,27 @@ TEST_F(WriteBatchTest, Append) { "Delete(foo)@203", PrintContents(&b1)); ASSERT_EQ(4, b1.Count()); + b2.Clear(); + b2.Put("c", "cc"); + b2.Put("d", "dd"); + b2.MarkWalTerminationPoint(); + b2.Put("e", "ee"); + WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true); + ASSERT_EQ( + "Put(a, va)@200" + "Put(b, vb)@202" + "Put(b, vb)@201" + "Put(c, cc)@204" + "Put(d, dd)@205" + "Delete(foo)@203", + PrintContents(&b1)); + ASSERT_EQ(6, b1.Count()); + ASSERT_EQ( + "Put(c, cc)@0" + "Put(d, dd)@1" + "Put(e, ee)@2", + PrintContents(&b2)); + ASSERT_EQ(3, b2.Count()); } TEST_F(WriteBatchTest, SingleDeletion) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 01f2108f1..89f9e5017 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -39,6 +39,25 @@ class ColumnFamilyHandle; struct SavePoints; struct SliceParts; +struct SavePoint { + size_t size; // size of rep_ + int count; // count of elements in rep_ + uint32_t content_flags; + + SavePoint() : size(0), count(0), content_flags(0) {} + + SavePoint(size_t _size, int _count, uint32_t _flags) + : size(_size), count(_count), content_flags(_flags) {} + + void clear() { + size = 0; + count = 0; + content_flags = 0; + } + + bool is_cleared() const { return (size | count | content_flags) == 0; } +}; + class WriteBatch : public WriteBatchBase { public: explicit WriteBatch(size_t reserved_bytes = 0); @@ -280,10 +299,20 @@ class WriteBatch : public WriteBatchBase { WriteBatch& operator=(const WriteBatch& src); WriteBatch& operator=(WriteBatch&& src); + // marks this point in the WriteBatch as the last record to + // be inserted into the WAL, provided the WAL is enabled + void MarkWalTerminationPoint(); + const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; } + private: friend class WriteBatchInternal; SavePoints* save_points_; + // When sending a WriteBatch through WriteImpl we might want to + // specify that only the first x records of the batch be written to + // the WAL. + SavePoint wal_term_point_; + // For HasXYZ. Mutable to allow lazy computation of results mutable std::atomic content_flags_; diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index fe4a959b3..8676582e9 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -250,25 +250,23 @@ Status TransactionImpl::Commit() { } } else if (commit_prepared) { exec_status_.store(AWAITING_COMMIT); - WriteOptions write_options = write_options_; - // insert prepared batch into Memtable only. - // Memtable will ignore BeginPrepare/EndPrepare markers - // in non recovery mode and simply insert the values - write_options.disableWAL = true; - assert(log_number_ > 0); - s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - nullptr, nullptr, log_number_); - if (!s.ok()) { - return s; - } // We take the commit-time batch and append the Commit marker. - // We then write this batch to both WAL and Memtable. // The Memtable will ignore the Commit marker in non-recovery mode - write_options.disableWAL = false; - WriteBatchInternal::MarkCommit(GetCommitTimeWriteBatch(), name_); - s = db_impl_->WriteImpl(write_options, GetCommitTimeWriteBatch()); + WriteBatch* working_batch = GetCommitTimeWriteBatch(); + WriteBatchInternal::MarkCommit(working_batch, name_); + + // any operations appended to this working_batch will be ignored from WAL + working_batch->MarkWalTerminationPoint(); + + // insert prepared batch into Memtable only skipping WAL. + // Memtable will ignore BeginPrepare/EndPrepare markers + // in non recovery mode and simply insert the values + WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); + + s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + log_number_); if (!s.ok()) { return s; }