From ae152ee666c34b31c4bb0fa5a8fdf46a6b5ea93b Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 25 Jul 2019 15:23:46 -0700 Subject: [PATCH] Avoid user key copying for Get/Put/Write with user-timestamp (#5502) Summary: In previous https://github.com/facebook/rocksdb/issues/5079, we added user-specified timestamp to `DB::Get()` and `DB::Put()`. Limitation is that these two functions may cause extra memory allocation and key copy. The reason is that `WriteBatch` does not allocate extra memory for timestamps because it is not aware of timestamp size, and we did not provide an API to assign/update timestamp of each key within a `WriteBatch`. We address these issues in this PR by doing the following. 1. Add a `timestamp_size_` to `WriteBatch` so that `WriteBatch` can take timestamps into account when calling `WriteBatch::Put`, `WriteBatch::Delete`, etc. 2. Add APIs `WriteBatch::AssignTimestamp` and `WriteBatch::AssignTimestamps` so that application can assign/update timestamps for each key in a `WriteBatch`. 3. Avoid key copy in `GetImpl` by adding new constructor to `LookupKey`. Test plan (on devserver): ``` $make clean && COMPILE_WITH_ASAN=1 make -j32 all $./db_basic_test --gtest_filter=Timestamp/DBBasicTestWithTimestampWithParam.PutAndGet/* $make check ``` If the API extension looks good, I will add more unit tests. Some simple benchmark using db_bench. ``` $rm -rf /dev/shm/dbbench/* && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillseq,readrandom -num=1000000 $rm -rf /dev/shm/dbbench/* && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -num=1000000 -disable_wal=true ``` Master is at a78503bd6c80a3c4137df1962a972fe406b4d90b. ``` | | readrandom | fillrandom | | master | 15.53 MB/s | 25.97 MB/s | | PR5502 | 16.70 MB/s | 25.80 MB/s | ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5502 Differential Revision: D16340894 Pulled By: riversand963 fbshipit-source-id: 51132cf792be07d1efc3ac33f5768c4ee2608bb8 --- .gitignore | 1 + db/db_impl/db_impl.cc | 13 +-- db/db_impl/db_impl_write.cc | 12 +-- db/dbformat.cc | 12 ++- db/dbformat.h | 16 ---- db/lookup_key.h | 3 +- db/write_batch.cc | 146 ++++++++++++++++++++++++++++++++-- include/rocksdb/write_batch.h | 9 +++ util/coding.h | 16 +++- 9 files changed, 183 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index 7a799c09a..c8672a8b3 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ rocksdb_undump db_test2 trace_analyzer trace_analyzer_test +block_cache_trace_analyzer .DS_Store java/out diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8132d5a0b..54e401ddd 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1441,16 +1441,7 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - if (nullptr == read_options.timestamp) { - return GetImpl(read_options, column_family, key, value); - } - Slice akey; - std::string buf; - Status s = AppendTimestamp(key, *(read_options.timestamp), &akey, &buf); - if (s.ok()) { - s = GetImpl(read_options, column_family, akey, value); - } - return s; + return GetImpl(read_options, column_family, key, value); } Status DBImpl::GetImpl(const ReadOptions& read_options, @@ -1528,7 +1519,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. - LookupKey lkey(key, snapshot); + LookupKey lkey(key, snapshot, read_options.timestamp); PERF_TIMER_STOP(get_snapshot_time); bool skip_memtable = (read_options.read_tier == kPersistedTier && diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 95a1b31c7..0ad2a3e9a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1734,14 +1734,16 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, } return Write(opt, &batch); } - Slice akey; - std::string buf; - Status s = AppendTimestamp(key, *(opt.timestamp), &akey, &buf); + const Slice* ts = opt.timestamp; + assert(nullptr != ts); + size_t ts_sz = ts->size(); + WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0, + ts_sz); + Status s = batch.Put(column_family, key, value); if (!s.ok()) { return s; } - WriteBatch batch(akey.size() + value.size() + 24); - s = batch.Put(column_family, akey, value); + s = batch.AssignTimestamp(*ts); if (!s.ok()) { return s; } diff --git a/db/dbformat.cc b/db/dbformat.cc index bfaea868b..130ba4e8a 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -159,9 +159,11 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const { } } -LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) { +LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s, + const Slice* ts) { size_t usize = _user_key.size(); - size_t needed = usize + 13; // A conservative estimate + size_t ts_sz = (nullptr == ts) ? 0 : ts->size(); + size_t needed = usize + ts_sz + 13; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; @@ -170,10 +172,14 @@ LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) { } start_ = dst; // NOTE: We don't support users keys of more than 2GB :) - dst = EncodeVarint32(dst, static_cast(usize + 8)); + dst = EncodeVarint32(dst, static_cast(usize + ts_sz + 8)); kstart_ = dst; memcpy(dst, _user_key.data(), usize); dst += usize; + if (nullptr != ts) { + memcpy(dst, ts->data(), ts_sz); + dst += ts_sz; + } EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); dst += 8; end_ = dst; diff --git a/db/dbformat.h b/db/dbformat.h index c6ee5677c..1d9b7ef7e 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -669,20 +669,4 @@ struct ParsedInternalKeyComparator { const InternalKeyComparator* cmp; }; -// TODO (yanqin): this causes extra memory allocation and copy. Should be -// addressed in the future. -inline Status AppendTimestamp(const Slice& key, const Slice& timestamp, - Slice* ret_key, std::string* ret_buf) { - assert(ret_key != nullptr); - assert(ret_buf != nullptr); - if (key.data() + key.size() == timestamp.data()) { - *ret_key = Slice(key.data(), key.size() + timestamp.size()); - } else { - ret_buf->assign(key.data(), key.size()); - ret_buf->append(timestamp.data(), timestamp.size()); - *ret_key = Slice(*ret_buf); - } - return Status::OK(); -} - } // namespace rocksdb diff --git a/db/lookup_key.h b/db/lookup_key.h index ddf4ff0e9..1b0f6f562 100644 --- a/db/lookup_key.h +++ b/db/lookup_key.h @@ -21,7 +21,8 @@ class LookupKey { public: // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. - LookupKey(const Slice& _user_key, SequenceNumber sequence); + LookupKey(const Slice& _user_key, SequenceNumber sequence, + const Slice* ts = nullptr); ~LookupKey(); diff --git a/db/write_batch.cc b/db/write_batch.cc index d7a2e792a..2c2d81e87 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -135,6 +135,105 @@ struct BatchContentClassifier : public WriteBatch::Handler { } }; +class TimestampAssigner : public WriteBatch::Handler { + public: + explicit TimestampAssigner(const Slice& ts) + : timestamp_(ts), timestamps_(kEmptyTimestampList) {} + explicit TimestampAssigner(const std::vector& ts_list) + : timestamps_(ts_list) { + SanityCheck(); + } + ~TimestampAssigner() override {} + + Status PutCF(uint32_t, const Slice& key, const Slice&) override { + AssignTimestamp(key); + ++idx_; + return Status::OK(); + } + + Status DeleteCF(uint32_t, const Slice& key) override { + AssignTimestamp(key); + ++idx_; + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t, const Slice& key) override { + AssignTimestamp(key); + ++idx_; + return Status::OK(); + } + + Status DeleteRangeCF(uint32_t, const Slice& begin_key, + const Slice& end_key) override { + AssignTimestamp(begin_key); + AssignTimestamp(end_key); + ++idx_; + return Status::OK(); + } + + Status MergeCF(uint32_t, const Slice& key, const Slice&) override { + AssignTimestamp(key); + ++idx_; + return Status::OK(); + } + + Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { + // TODO (yanqin): support blob db in the future. + return Status::OK(); + } + + Status MarkBeginPrepare(bool) override { + // TODO (yanqin): support in the future. + return Status::OK(); + } + + Status MarkEndPrepare(const Slice&) override { + // TODO (yanqin): support in the future. + return Status::OK(); + } + + Status MarkCommit(const Slice&) override { + // TODO (yanqin): support in the future. + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { + // TODO (yanqin): support in the future. + return Status::OK(); + } + + private: + void SanityCheck() const { + assert(!timestamps_.empty()); +#ifndef NDEBUG + const size_t ts_sz = timestamps_[0].size(); + for (size_t i = 1; i != timestamps_.size(); ++i) { + assert(ts_sz == timestamps_[i].size()); + } +#endif // !NDEBUG + } + + void AssignTimestamp(const Slice& key) { + assert(timestamps_.empty() || idx_ < timestamps_.size()); + const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_]; + size_t ts_sz = ts.size(); + char* ptr = const_cast(key.data() + key.size() - ts_sz); + memcpy(ptr, ts.data(), ts_sz); + } + + static const std::vector kEmptyTimestampList; + const Slice timestamp_; + const std::vector& timestamps_; + size_t idx_ = 0; + + // No copy or move. + TimestampAssigner(const TimestampAssigner&) = delete; + TimestampAssigner(TimestampAssigner&&) = delete; + TimestampAssigner& operator=(const TimestampAssigner&) = delete; + TimestampAssigner&& operator=(TimestampAssigner&&) = delete; +}; +const std::vector TimestampAssigner::kEmptyTimestampList; + } // anon namespace struct SavePoints { @@ -142,7 +241,15 @@ struct SavePoints { }; WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes) - : content_flags_(0), max_bytes_(max_bytes), rep_() { + : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) { + rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) + ? reserved_bytes + : WriteBatchInternal::kHeader); + rep_.resize(WriteBatchInternal::kHeader); +} + +WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz) + : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) { rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); @@ -151,18 +258,21 @@ WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes) WriteBatch::WriteBatch(const std::string& rep) : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), - rep_(rep) {} + rep_(rep), + timestamp_size_(0) {} WriteBatch::WriteBatch(std::string&& rep) : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), - rep_(std::move(rep)) {} + rep_(std::move(rep)), + timestamp_size_(0) {} WriteBatch::WriteBatch(const WriteBatch& src) : wal_term_point_(src.wal_term_point_), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), max_bytes_(src.max_bytes_), - rep_(src.rep_) { + rep_(src.rep_), + timestamp_size_(src.timestamp_size_) { if (src.save_points_ != nullptr) { save_points_.reset(new SavePoints()); save_points_->stack = src.save_points_->stack; @@ -174,7 +284,8 @@ WriteBatch::WriteBatch(WriteBatch&& src) noexcept wal_term_point_(std::move(src.wal_term_point_)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), max_bytes_(src.max_bytes_), - rep_(std::move(src.rep_)) {} + rep_(std::move(src.rep_)), + timestamp_size_(src.timestamp_size_) {} WriteBatch& WriteBatch::operator=(const WriteBatch& src) { if (&src != this) { @@ -643,7 +754,14 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, b->rep_.push_back(static_cast(kTypeColumnFamilyValue)); PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSlice(&b->rep_, key); + if (0 == b->timestamp_size_) { + PutLengthPrefixedSlice(&b->rep_, key); + } else { + PutVarint32(&b->rep_, + static_cast(key.size() + b->timestamp_size_)); + b->rep_.append(key.data(), key.size()); + b->rep_.append(b->timestamp_size_, '\0'); + } PutLengthPrefixedSlice(&b->rep_, value); b->content_flags_.store( b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, @@ -692,7 +810,11 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, b->rep_.push_back(static_cast(kTypeColumnFamilyValue)); PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSliceParts(&b->rep_, key); + if (0 == b->timestamp_size_) { + PutLengthPrefixedSliceParts(&b->rep_, key); + } else { + PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_); + } PutLengthPrefixedSliceParts(&b->rep_, value); b->content_flags_.store( b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, @@ -1038,6 +1160,16 @@ Status WriteBatch::PopSavePoint() { return Status::OK(); } +Status WriteBatch::AssignTimestamp(const Slice& ts) { + TimestampAssigner ts_assigner(ts); + return Iterate(&ts_assigner); +} + +Status WriteBatch::AssignTimestamps(const std::vector& ts_list) { + TimestampAssigner ts_assigner(ts_list); + return Iterate(&ts_assigner); +} + class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 29b660d19..393c5d9c6 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -28,6 +28,7 @@ #include #include #include +#include #include "rocksdb/status.h" #include "rocksdb/write_batch_base.h" @@ -60,6 +61,7 @@ struct SavePoint { class WriteBatch : public WriteBatchBase { public: explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); + explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz); ~WriteBatch() override; using WriteBatchBase::Put; @@ -311,6 +313,12 @@ class WriteBatch : public WriteBatchBase { // Returns trie if MarkRollback will be called during Iterate bool HasRollback() const; + // Assign timestamp to write batch + Status AssignTimestamp(const Slice& ts); + + // Assign timestamps to write batch + Status AssignTimestamps(const std::vector& ts_list); + using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } @@ -361,6 +369,7 @@ class WriteBatch : public WriteBatchBase { protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ + const size_t timestamp_size_; // Intentionally copyable }; diff --git a/util/coding.h b/util/coding.h index 9427d5261..3ad6d9570 100644 --- a/util/coding.h +++ b/util/coding.h @@ -50,6 +50,8 @@ extern void PutVarint32Varint32Varint64(std::string* dst, uint32_t value1, extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value); extern void PutLengthPrefixedSliceParts(std::string* dst, const SliceParts& slice_parts); +extern void PutLengthPrefixedSlicePartsWithPadding( + std::string* dst, const SliceParts& slice_parts, size_t pad_sz); // Standard Get... routines parse a value from the beginning of a Slice // and advance the slice past the parsed value. @@ -306,9 +308,8 @@ inline void PutLengthPrefixedSlice(std::string* dst, const Slice& value) { dst->append(value.data(), value.size()); } -inline void PutLengthPrefixedSliceParts(std::string* dst, +inline void PutLengthPrefixedSliceParts(std::string* dst, size_t total_bytes, const SliceParts& slice_parts) { - size_t total_bytes = 0; for (int i = 0; i < slice_parts.num_parts; ++i) { total_bytes += slice_parts.parts[i].size(); } @@ -318,6 +319,17 @@ inline void PutLengthPrefixedSliceParts(std::string* dst, } } +inline void PutLengthPrefixedSliceParts(std::string* dst, + const SliceParts& slice_parts) { + PutLengthPrefixedSliceParts(dst, /*total_bytes=*/0, slice_parts); +} + +inline void PutLengthPrefixedSlicePartsWithPadding( + std::string* dst, const SliceParts& slice_parts, size_t pad_sz) { + PutLengthPrefixedSliceParts(dst, /*total_bytes=*/pad_sz, slice_parts); + dst->append(pad_sz, '\0'); +} + inline int VarintLength(uint64_t v) { int len = 1; while (v >= 128) {