From 5e9f3a9aa720b796cd633c257e4d3e88f33aae40 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 17 Sep 2013 14:11:04 -0700 Subject: [PATCH] Better locking in vectorrep that increases throughput to match speed of storage. Summary: There is a use-case where we want to insert data into rocksdb as fast as possible. Vector rep is used for this purpose. The background flush thread needs to flush the vectorrep to storage. It acquires the dblock then sorts the vector, releases the dblock and then writes the sorted vector to storage. This is suboptimal because the lock is held during the sort, which prevents new writes from occurring. This patch moves the sorting of the vector rep to outside the db mutex. Performance is now as fast as the underlying storage system. If you are doing buffered writes to rocksdb files, then you can observe throughput upwards of 200 MB/sec writes. This is an early draft and not yet ready to be reviewed. Test Plan: make check Task ID: # Blame Rev: Reviewers: haobo Reviewed By: haobo CC: leveldb, haobo Differential Revision: https://reviews.facebook.net/D12987 --- db/db_bench.cc | 57 +++++++++++++++++++++++++++++++++++++++-- util/vectorrep.cc | 65 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 102 insertions(+), 20 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 096b18377..1bc1caa73 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -13,6 +13,7 @@ #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" #include "rocksdb/write_batch.h" #include "rocksdb/statistics.h" #include "port/port.h" @@ -79,6 +80,7 @@ static const char* FLAGS_benchmarks = "snappycomp," "snappyuncomp," "acquireload," + "fillfromstdin," ; // the maximum size of key in bytes static const int MAX_KEY_SIZE = 128; @@ -906,6 +908,9 @@ class Benchmark { } else if (name == Slice("fillrandom")) { fresh_db = true; method = &Benchmark::WriteRandom; + } else if (name == Slice("fillfromstdin")) { 
+ fresh_db = true; + method = &Benchmark::WriteFromStdin; } else if (name == Slice("filluniquerandom")) { fresh_db = true; if (num_threads > 1) { @@ -1342,6 +1347,54 @@ class Benchmark { DoWrite(thread, UNIQUE_RANDOM); } + void writeOrFail(WriteBatch& batch) { + Status s = db_->Write(write_options_, &batch); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + } + + void WriteFromStdin(ThreadState* thread) { + size_t count = 0; + WriteBatch batch; + const size_t bufferLen = 32 << 20; + unique_ptr line = unique_ptr(new char[bufferLen]); + char* linep = line.get(); + const int batchSize = 100 << 10; + const char columnSeparator = '\t'; + const char lineSeparator = '\n'; + + while (fgets(linep, bufferLen, stdin) != nullptr) { + ++count; + char* tab = std::find(linep, linep + bufferLen, columnSeparator); + if (tab == linep + bufferLen) { + fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count); + continue; + } + Slice key(linep, tab - linep); + tab++; + char* endLine = std::find(tab, linep + bufferLen, lineSeparator); + if (endLine == linep + bufferLen) { + fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count); + continue; + } + Slice value(tab, endLine - tab); + thread->stats.FinishedSingleOp(db_); + thread->stats.AddBytes(endLine - linep - 1); + + if (batch.Count() < batchSize) { + batch.Put(key, value); + continue; + } + writeOrFail(batch); + batch.Clear(); + } + if (batch.Count() > 0) { + writeOrFail(batch); + } + } + void DoWrite(ThreadState* thread, WriteMode write_mode) { const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0; const int num_ops = writes_ == 0 ? 
num_ : writes_ ; @@ -2320,8 +2373,8 @@ int main(int argc, char** argv) { } else { FLAGS_key_size = n; } - } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { - FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--write_buffer_size=%lld%c", &ll, &junk) == 1) { + FLAGS_write_buffer_size = ll; } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c", diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 14443c731..8770f60b9 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -36,11 +36,15 @@ class VectorRep : public MemTableRep { virtual ~VectorRep() override { } class Iterator : public MemTableRep::Iterator { + class VectorRep* vrep_; std::shared_ptr> bucket_; - typename std::vector::const_iterator cit_; + typename std::vector::const_iterator mutable cit_; const KeyComparator& compare_; + bool mutable sorted_; + void DoSort() const; public: - explicit Iterator(std::shared_ptr> bucket, + explicit Iterator(class VectorRep* vrep, + std::shared_ptr> bucket, const KeyComparator& compare); // Initialize an iterator over the specified collection. 
@@ -82,11 +86,12 @@ class VectorRep : public MemTableRep { virtual std::shared_ptr GetIterator() override; private: + friend class Iterator; typedef std::vector Bucket; std::shared_ptr bucket_; mutable port::RWMutex rwlock_; - bool immutable_ = false; - bool sorted_ = false; + bool immutable_; + bool sorted_; const KeyComparator& compare_; }; @@ -119,16 +124,42 @@ size_t VectorRep::ApproximateMemoryUsage() { VectorRep::VectorRep(const KeyComparator& compare, Arena* arena, size_t count) : bucket_(new Bucket(count)), + immutable_(false), + sorted_(false), compare_(compare) { } -VectorRep::Iterator::Iterator(std::shared_ptr> bucket, +VectorRep::Iterator::Iterator(class VectorRep* vrep, + std::shared_ptr> bucket, const KeyComparator& compare) -: bucket_(bucket), - cit_(bucket_->begin()), - compare_(compare) { } +: vrep_(vrep), + bucket_(bucket), + cit_(nullptr), + compare_(compare), + sorted_(false) { } + +void VectorRep::Iterator::DoSort() const { + // vrep is non-null means that we are working on an immutable memtable + if (!sorted_ && vrep_ != nullptr) { + WriteLock l(&vrep_->rwlock_); + if (!vrep_->sorted_) { + std::sort(bucket_->begin(), bucket_->end(), Compare(compare_)); + cit_ = bucket_->begin(); + vrep_->sorted_ = true; + } + sorted_ = true; + } + if (!sorted_) { + std::sort(bucket_->begin(), bucket_->end(), Compare(compare_)); + cit_ = bucket_->begin(); + sorted_ = true; + } + assert(sorted_); + assert(vrep_ == nullptr || vrep_->sorted_); +} // Returns true iff the iterator is positioned at a valid node. 
bool VectorRep::Iterator::Valid() const { + DoSort(); return cit_ != bucket_->end(); } @@ -165,6 +196,7 @@ void VectorRep::Iterator::Prev() { // Advance to the first entry with a key >= target void VectorRep::Iterator::Seek(const char* target) { + DoSort(); // Do binary search to find first value not less than the target cit_ = std::equal_range(bucket_->begin(), bucket_->end(), @@ -177,12 +209,14 @@ void VectorRep::Iterator::Seek(const char* target) { // Position at the first entry in collection. // Final state of iterator is Valid() iff collection is not empty. void VectorRep::Iterator::SeekToFirst() { + DoSort(); cit_ = bucket_->begin(); } // Position at the last entry in collection. // Final state of iterator is Valid() iff collection is not empty. void VectorRep::Iterator::SeekToLast() { + DoSort(); cit_ = bucket_->end(); if (bucket_->size() != 0) { --cit_; @@ -190,21 +224,16 @@ void VectorRep::Iterator::SeekToLast() { } std::shared_ptr VectorRep::GetIterator() { - std::shared_ptr tmp; ReadLock l(&rwlock_); + // Do not sort here. The sorting would be done the first time + // a Seek is performed on the iterator. if (immutable_) { - rwlock_.Unlock(); - rwlock_.WriteLock(); - tmp = bucket_; - if (!sorted_) { - std::sort(tmp->begin(), tmp->end(), Compare(compare_)); - sorted_ = true; - } + return std::make_shared(this, bucket_, compare_); } else { + std::shared_ptr tmp; tmp.reset(new Bucket(*bucket_)); // make a copy - std::sort(tmp->begin(), tmp->end(), Compare(compare_)); + return std::make_shared(nullptr, tmp, compare_); } - return std::make_shared(tmp, compare_); } } // anon namespace