diff --git a/db/db_impl.cc b/db/db_impl.cc index b50eb4c44..37e8d7582 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -56,6 +56,7 @@ #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/autovector.h" namespace rocksdb { @@ -2969,12 +2970,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions - // TODO: BuildBatchGroup physically concatenate/copy all write batches into - // a new one. Mem copy is done with the lock held. Ideally, we only need - // the lock to obtain the last_writer and the references to all batches. - // Creation (copy) of the merged batch could have been done outside of the - // lock protected region. - WriteBatch* updates = BuildBatchGroup(&last_writer); + autovector write_batch_group; + BuildBatchGroup(&last_writer, &write_batch_group); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging @@ -2982,6 +2979,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // into mem_. { mutex_.Unlock(); + WriteBatch* updates = nullptr; + if (write_batch_group.size() == 1) { + updates = write_batch_group[0]; + } else { + updates = &tmp_batch_; + for (size_t i = 0; i < write_batch_group.size(); ++i) { + WriteBatchInternal::Append(updates, write_batch_group[i]); + } + } + const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); @@ -3027,12 +3034,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, last_sequence); } + if (updates == &tmp_batch_) tmp_batch_.Clear(); mutex_.Lock(); if (status.ok()) { versions_->SetLastSequence(last_sequence); } } - if (updates == &tmp_batch_) tmp_batch_.Clear(); } if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes @@ -3060,13 +3067,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-nullptr batch -WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { +void DBImpl::BuildBatchGroup(Writer** last_writer, + autovector* write_batch_group) { assert(!writers_.empty()); Writer* first = writers_.front(); - WriteBatch* result = first->batch; - assert(result != nullptr); + assert(first->batch != nullptr); size_t size = WriteBatchInternal::ByteSize(first->batch); + write_batch_group->push_back(first->batch); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow @@ -3099,18 +3107,10 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { break; } - // Append to *reuslt - if (result == first->batch) { - // Switch to temporary batch instead of disturbing caller's batch - result = &tmp_batch_; - assert(WriteBatchInternal::Count(result) == 0); - WriteBatchInternal::Append(result, first->batch); - } - WriteBatchInternal::Append(result, w->batch); + write_batch_group->push_back(w->batch); } *last_writer = w; } - return result; } // This function computes the amount of time in microseconds by which a write diff --git a/db/db_impl.h b/db/db_impl.h index d33efd19e..d74b77aa4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -22,6 +22,7 @@ #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" +#include "util/autovector.h" namespace rocksdb { @@ -291,7 +292,8 @@ class DBImpl : public DB { // the superversion outside of mutex Status MakeRoomForWrite(bool force /* compact even if there is room? */, SuperVersion** superversion_to_free); - WriteBatch* BuildBatchGroup(Writer** last_writer); + void BuildBatchGroup(Writer** last_writer, + autovector* write_batch_group); // Force current memtable contents to be flushed. Status FlushMemTable(const FlushOptions& options); diff --git a/db/db_test.cc b/db/db_test.cc index a0b3d9aaa..560311ae3 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4333,6 +4333,69 @@ TEST(DBTest, MultiThreaded) { } while (ChangeOptions()); } +// Group commit test: +namespace { + +static const int kGCNumThreads = 4; +static const int kGCNumKeys = 1000; + +struct GCThread { + DB* db; + int id; + std::atomic done; +}; + +static void GCThreadBody(void* arg) { + GCThread* t = reinterpret_cast(arg); + int id = t->id; + DB* db = t->db; + WriteOptions wo; + + for (int i = 0; i < kGCNumKeys; ++i) { + std::string kv(std::to_string(i + id * kGCNumKeys)); + ASSERT_OK(db->Put(wo, kv, kv)); + } + t->done = true; +} + +} // namespace + +TEST(DBTest, GroupCommitTest) { + do { + // Start threads + GCThread thread[kGCNumThreads]; + for (int id = 0; id < kGCNumThreads; id++) { + thread[id].id = id; + thread[id].db = db_; + thread[id].done = false; + env_->StartThread(GCThreadBody, &thread[id]); + } + + for (int id = 0; id < kGCNumThreads; id++) { + while (thread[id].done == false) { + env_->SleepForMicroseconds(100000); + } + } + + std::vector expected_db; + for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) { + expected_db.push_back(std::to_string(i)); + } + sort(expected_db.begin(), expected_db.end()); + + Iterator* itr = db_->NewIterator(ReadOptions()); + itr->SeekToFirst(); + for (auto x : expected_db) { + ASSERT_TRUE(itr->Valid()); + ASSERT_EQ(itr->key().ToString(), x); + ASSERT_EQ(itr->value().ToString(), x); + itr->Next(); + } + ASSERT_TRUE(!itr->Valid()); + + } while (ChangeOptions()); +} + namespace { typedef std::map KVMap; }