From 7d87f02799bd0a8fd36df24fab5baa4968615c86 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Fri, 14 Aug 2015 16:59:07 -0700 Subject: [PATCH] support for concurrent adds to memtable Summary: This diff adds support for concurrent adds to the skiplist memtable implementations. Memory allocation is made thread-safe by the addition of a spinlock, with small per-core buffers to avoid contention. Concurrent memtable writes are made via an additional method and don't impose a performance overhead on the non-concurrent case, so parallelism can be selected on a per-batch basis. Write thread synchronization is an increasing bottleneck for higher levels of concurrency, so this diff adds --enable_write_thread_adaptive_yield (default off). This feature causes threads joining a write batch group to spin for a short time (default 100 usec) using sched_yield, rather than going to sleep on a mutex. If the timing of the yield calls indicates that another thread has actually run during the yield then spinning is avoided. This option improves performance for concurrent situations even without parallel adds, although it has the potential to increase CPU usage (and the heuristic adaptation is not yet mature). Parallel writes are not currently compatible with inplace updates, update callbacks, or delete filtering. Enable it with --allow_concurrent_memtable_write (and --enable_write_thread_adaptive_yield). Parallel memtable writes are performance neutral when there is no actual parallelism, and in my experiments (SSD server-class Linux and varying contention and key sizes for fillrandom) they are always a performance win when there is more than one thread. Statistics are updated earlier in the write path, dropping the number of DB mutex acquisitions from 2 to 1 for almost all cases. This diff was motivated and inspired by Yahoo's cLSM work. It is more conservative than cLSM: RocksDB's write batch group leader role is preserved (along with all of the existing flush and write throttling logic) and concurrent writers are blocked until all memtable insertions have completed and the sequence number has been advanced, to preserve linearizability. My test config is "db_bench -benchmarks=fillrandom -threads=$T -batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -disable_auto_compactions --max_write_buffer_number=8 -max_background_flushes=8 --disable_wal --write_buffer_size=160000000 --block_size=16384 --allow_concurrent_memtable_write" on a two-socket Xeon E5-2660 @ 2.2Ghz with lots of memory and an SSD hard drive. With 1 thread I get ~440Kops/sec. Peak performance for 1 socket (numactl -N1) is slightly more than 1Mops/sec, at 16 threads. Peak performance across both sockets happens at 30 threads, and is ~900Kops/sec, although with fewer threads there is less performance loss when the system has background work. Test Plan: 1. concurrent stress tests for InlineSkipList and DynamicBloom 2. make clean; make check 3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench 4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench 5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench 6. make clean; OPT=-DROCKSDB_LITE make check 7. verify no perf regressions when disabled Reviewers: igor, sdong Reviewed By: sdong Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba Differential Revision: https://reviews.facebook.net/D50589 --- CMakeLists.txt | 1 + db/column_family.cc | 21 +- db/column_family.h | 49 +++-- db/db_bench.cc | 20 ++ db/db_impl.cc | 366 +++++++++++++++++++++------------- db/flush_scheduler.cc | 82 +++++--- db/flush_scheduler.h | 30 ++- db/inlineskiplist.h | 130 ++++++++++-- db/inlineskiplist_test.cc | 114 +++++++++-- db/internal_stats.cc | 16 +- db/internal_stats.h | 30 ++- db/memtable.cc | 109 +++++++--- db/memtable.h | 72 ++++--- db/memtable_allocator.cc | 28 ++- db/memtable_allocator.h | 9 +- db/repair.cc | 2 +- db/write_batch.cc | 82 +++++--- db/write_batch_internal.h | 21 +- db/write_batch_test.cc | 2 +- db/write_thread.cc | 298 ++++++++++++++++++++++++--- db/write_thread.h | 167 ++++++++++++---- db/writebuffer.h | 18 +- include/rocksdb/memtablerep.h | 38 +++- include/rocksdb/options.h | 41 ++++ include/rocksdb/statistics.h | 2 +- port/port_posix.cc | 16 ++ port/port_posix.h | 19 +- port/win/port_win.cc | 2 + port/win/port_win.h | 9 + src.mk | 1 + table/table_test.cc | 3 +- util/arena.cc | 2 +- util/arena.h | 3 +- util/concurrent_arena.cc | 49 +++++ util/concurrent_arena.h | 192 ++++++++++++++++++ util/dynamic_bloom.cc | 17 +- util/dynamic_bloom.h | 51 ++++- util/dynamic_bloom_test.cc | 163 ++++++++++++--- util/mutexlock.h | 39 ++++ util/options.cc | 17 ++ util/skiplistrep.cc | 6 + 41 files changed, 1827 insertions(+), 510 deletions(-) create mode 100644 util/concurrent_arena.cc create mode 100644 util/concurrent_arena.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ca7c7a0b9..ca570ccd2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -191,6 +191,7 @@ set(SOURCES util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc + util/concurrent_arena.cc util/crc32c.cc util/db_info_dumper.cc util/delete_scheduler_impl.cc diff --git a/db/column_family.cc b/db/column_family.cc index 2ef5a907f..0b74e4ebe 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -110,6 +110,20 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) { return Status::OK(); } +Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) { + if (cf_options.inplace_update_support) { + return Status::InvalidArgument( + "In-place memtable updates (inplace_update_support) is not compatible " + "with concurrent writes (allow_concurrent_memtable_write)"); + } + if (cf_options.filter_deletes) { + return Status::InvalidArgument( + "Delete filtering (filter_deletes) is not compatible with concurrent " + "memtable writes (allow_concurrent_memtable_writes)"); + } + return Status::OK(); +} + ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src) { @@ -916,13 +930,6 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { return &handle_; } -void ColumnFamilyMemTablesImpl::CheckMemtableFull() { - if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) { - flush_scheduler_->ScheduleFlush(current_); - current_->mem()->MarkFlushScheduled(); - } -} - uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { uint32_t column_family_id = 0; if (column_family != nullptr) { diff --git a/db/column_family.h b/db/column_family.h index 64bb1c9a1..4ba154779 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -19,12 +19,10 @@ #include "db/write_controller.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" -#include "db/flush_scheduler.h" #include "rocksdb/compaction_job_stats.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/options.h" -#include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" #include "util/thread_local.h" @@ -134,6 +132,9 @@ struct SuperVersion { extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options); +extern Status CheckConcurrentWritesSupported( + const ColumnFamilyOptions& cf_options); + extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src); @@ -158,14 +159,16 @@ class ColumnFamilyData { // thread-safe const std::string& GetName() const { return name_; } - // Ref() can only be called whily holding a DB mutex or during a - // single-threaded write. + // Ref() can only be called from a context where the caller can guarantee + // that ColumnFamilyData is alive (while holding a non-zero ref already, + // holding a DB mutex, or as the leader in a write batch group). void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } - // will just decrease reference count to 0, but will not delete it. returns - // true if the ref count was decreased to zero. in that case, it can be - // deleted by the caller immediately, or later, by calling - // FreeDeadColumnFamilies() - // Unref() can only be called while holding a DB mutex + + // Unref decreases the reference count, but does not handle deletion + // when the count goes to 0. If this method returns true then the + // caller should delete the instance immediately, or later, by calling + // FreeDeadColumnFamilies(). Unref() can only be called while holding + // a DB mutex, or during single-threaded recovery. bool Unref() { int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed); assert(old_refs > 0); @@ -497,15 +500,18 @@ class ColumnFamilySet { // memtables of different column families (specified by ID in the write batch) class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: - explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set, - FlushScheduler* flush_scheduler) - : column_family_set_(column_family_set), - current_(nullptr), - flush_scheduler_(flush_scheduler) {} + explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) + : column_family_set_(column_family_set), current_(nullptr) {} + + // Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed + // with the arguments used to construct *orig. + explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig) + : column_family_set_(orig->column_family_set_), current_(nullptr) {} // sets current_ to ColumnFamilyData with column_family_id // returns false if column family doesn't exist - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use this function of DBImpl::column_family_memtables_ should be + // under a DB mutex OR from a write thread bool Seek(uint32_t column_family_id) override; // Returns log number of the selected column family @@ -513,20 +519,23 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { uint64_t GetLogNumber() const override; // REQUIRES: Seek() called first - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use this function of DBImpl::column_family_memtables_ should be + // under a DB mutex OR from a write thread virtual MemTable* GetMemTable() const override; // Returns column family handle for the selected column family - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use this function of DBImpl::column_family_memtables_ should be + // under a DB mutex OR from a write thread virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; - // REQUIRES: under a DB mutex OR from a write thread - virtual void CheckMemtableFull() override; + // Cannot be called while another thread is calling Seek(). + // REQUIRES: use this function of DBImpl::column_family_memtables_ should be + // under a DB mutex OR from a write thread + virtual ColumnFamilyData* current() { return current_; } private: ColumnFamilySet* column_family_set_; ColumnFamilyData* current_; - FlushScheduler* flush_scheduler_; ColumnFamilyHandleInternal handle_; }; diff --git a/db/db_bench.cc b/db/db_bench.cc index 42667b362..0f455d9c7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -646,6 +646,20 @@ DEFINE_uint64(delayed_write_rate, 8388608u, "Limited bytes allowed to DB when soft_rate_limit or " "level0_slowdown_writes_trigger triggers"); +DEFINE_bool(allow_concurrent_memtable_write, false, + "Allow multi-writers to update mem tables in parallel."); + +DEFINE_bool(enable_write_thread_adaptive_yield, false, + "Use a yielding spin loop for brief writer thread waits."); + +DEFINE_uint64( + write_thread_max_yield_usec, 100, + "Maximum microseconds for enable_write_thread_adaptive_yield operation."); + +DEFINE_uint64(write_thread_slow_yield_usec, 3, + "The threshold at which a slow yield is considered a signal that " + "other processes or threads want the core."); + DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, "When hard_rate_limit is set then this is the max time a put will" " be stalled."); @@ -2552,6 +2566,12 @@ class Benchmark { options.hard_pending_compaction_bytes_limit = FLAGS_hard_pending_compaction_bytes_limit; options.delayed_write_rate = FLAGS_delayed_write_rate; + options.allow_concurrent_memtable_write = + FLAGS_allow_concurrent_memtable_write; + options.enable_write_thread_adaptive_yield = + FLAGS_enable_write_thread_adaptive_yield; + options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; + options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec; options.rate_limit_delay_max_milliseconds = FLAGS_rate_limit_delay_max_milliseconds; options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; diff --git a/db/db_impl.cc b/db/db_impl.cc index 25f0fa066..fb179e04b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -247,6 +247,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_(options.db_write_buffer_size), + write_thread_(options.enable_write_thread_adaptive_yield + ? options.write_thread_max_yield_usec + : 0, + options.write_thread_slow_yield_usec), write_controller_(options.delayed_write_rate), last_batch_group_size_(0), unscheduled_flushes_(0), @@ -282,8 +286,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_, &write_controller_)); - column_family_memtables_.reset(new ColumnFamilyMemTablesImpl( - versions_->GetColumnFamilySet(), &flush_scheduler_)); + column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); DumpRocksDBBuildVersion(db_options_.info_log.get()); DumpDBFileSummary(db_options_, dbname_); @@ -1235,8 +1239,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // insert. We don't want to fail the whole write batch in that case -- // we just ignore the update. // That's why we set ignore missing column families to true - status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), true, log_number); + status = + WriteBatchInternal::InsertInto(&batch, column_family_memtables_.get(), + &flush_scheduler_, true, log_number); MaybeIgnoreError(&status); if (!status.ok()) { @@ -1257,7 +1262,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // DB and there is only a single thread operating on DB ColumnFamilyData* cfd; - while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { cfd->Unref(); // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families @@ -3623,6 +3628,9 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, *handle = nullptr; s = CheckCompressionSupported(cf_options); + if (s.ok() && db_options_.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cf_options); + } if (!s.ok()) { return s; } @@ -4071,7 +4079,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.sync = write_options.sync; w.disableWAL = write_options.disableWAL; w.in_batch_group = false; - w.done = false; w.has_callback = (callback != nullptr) ? true : false; if (!write_options.disableWAL) { @@ -4081,12 +4088,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE); write_thread_.JoinBatchGroup(&w); - if (w.done) { - // write was done by someone else, no need to grab mutex + if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) { + // we are a non-leader in a parallel group + PERF_TIMER_GUARD(write_memtable_time); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + w.status = WriteBatchInternal::InsertInto( + w.batch, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); + + if (write_thread_.CompleteParallelWorker(&w)) { + // we're responsible for early exit + auto last_sequence = w.parallel_group->last_writer->sequence; + SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); + versions_->SetLastSequence(last_sequence); + write_thread_.EarlyExitParallelGroup(&w); + } + assert(w.state == WriteThread::STATE_COMPLETED); + // STATE_COMPLETED conditional below handles exit + } + if (w.state == WriteThread::STATE_COMPLETED) { + // write is complete and leader has updated sequence RecordTick(stats_, WRITE_DONE_BY_OTHER); return w.status; } // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); WriteContext context; mutex_.Lock(); @@ -4108,9 +4138,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0) ? 4 * max_total_in_memory_state_ : db_options_.max_total_wal_size; - if (UNLIKELY(!single_column_family_mode_) && - alive_log_files_.begin()->getting_flushed == false && - total_log_size_ > max_total_wal_size) { + if (UNLIKELY(!single_column_family_mode_ && + alive_log_files_.begin()->getting_flushed == false && + total_log_size_ > max_total_wal_size)) { uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -4175,8 +4205,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = ScheduleFlushes(&context); } - if (UNLIKELY(status.ok()) && - (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { + if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || + write_controller_.NeedsDelay()))) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size @@ -4194,9 +4224,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_dir_sync = need_log_sync && !log_dir_synced_; if (status.ok()) { - last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( - &w, &last_writer, &write_batch_group); - if (need_log_sync) { while (logs_.front().getting_synced) { log_sync_cv_.Wait(); @@ -4226,153 +4253,203 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // At this point the mutex is unlocked + bool exit_completed_early = false; + last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( + &w, &last_writer, &write_batch_group); + if (status.ok()) { - int total_count = 0; - uint64_t total_byte_size = 0; - for (auto b : write_batch_group) { - total_count += WriteBatchInternal::Count(b); - total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(b)); - } + // Rules for when we can update the memtable concurrently + // 1. supported by memtable + // 2. Puts are not okay if inplace_update_support + // 3. Deletes or SingleDeletes are not okay if filtering deletes + // (controlled by both batch and memtable setting) + // 4. Merges are not okay + // + // Rules 1..3 are enforced by checking the options + // during startup (CheckConcurrentWritesSupported), so if + // options.allow_concurrent_memtable_write is true then they can be + // assumed to be true. Rule 4 is checked for each batch. We could + // relax rules 2 and 3 if we could prevent write batches from referring + // more than once to a particular key. + bool parallel = db_options_.allow_concurrent_memtable_write && + write_batch_group.size() > 1; + int total_count = 0; + uint64_t total_byte_size = 0; + for (auto b : write_batch_group) { + total_count += WriteBatchInternal::Count(b); + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(b)); + parallel = parallel && !b->HasMerge(); + } - const SequenceNumber current_sequence = last_sequence + 1; - last_sequence += total_count; + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += total_count; - // Record statistics - RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); - RecordTick(stats_, BYTES_WRITTEN, total_byte_size); - PERF_TIMER_STOP(write_pre_and_post_process_time); + // Record statistics + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); - if (write_options.disableWAL) { - flush_on_destroy_ = true; - } + if (write_options.disableWAL) { + flush_on_destroy_ = true; + } - uint64_t log_size = 0; - if (!write_options.disableWAL) { - PERF_TIMER_GUARD(write_wal_time); + uint64_t log_size = 0; + if (!write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); - WriteBatch* merged_batch = nullptr; - if (write_batch_group.size() == 1) { - merged_batch = write_batch_group[0]; - } else { - // WAL needs all of the batches flattened into a single batch. - // We could avoid copying here with an iov-like AddRecord - // interface - merged_batch = &tmp_batch_; - for (auto b : write_batch_group) { - WriteBatchInternal::Append(merged_batch, b); - } - } - WriteBatchInternal::SetSequence(merged_batch, current_sequence); - - assert(WriteBatchInternal::Count(merged_batch) == total_count); - assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); - - Slice log_entry = WriteBatchInternal::Contents(merged_batch); - status = logs_.back().writer->AddRecord(log_entry); - total_log_size_ += log_entry.size(); - alive_log_files_.back().AddSize(log_entry.size()); - log_empty_ = false; - log_size = log_entry.size(); - RecordTick(stats_, WAL_FILE_BYTES, log_size); - if (status.ok() && need_log_sync) { - RecordTick(stats_, WAL_FILE_SYNCED); - StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); - // It's safe to access logs_ with unlocked mutex_ here because: - // - we've set getting_synced=true for all logs, - // so other threads won't pop from logs_ while we're here, - // - only writer thread can push to logs_, and we're in - // writer thread, so no one will push to logs_, - // - as long as other threads don't modify it, it's safe to read - // from std::deque from multiple threads concurrently. - for (auto& log : logs_) { - status = log.writer->file()->Sync(db_options_.use_fsync); - if (!status.ok()) { - break; - } - } - if (status.ok() && need_log_dir_sync) { - // We only sync WAL directory the first time WAL syncing is - // requested, so that in case users never turn on WAL sync, - // we can avoid the disk I/O in the write code path. - status = directories_.GetWalDir()->Fsync(); - } - } - - if (merged_batch == &tmp_batch_) { - tmp_batch_.Clear(); + WriteBatch* merged_batch = nullptr; + if (write_batch_group.size() == 1) { + merged_batch = write_batch_group[0]; + } else { + // WAL needs all of the batches flattened into a single batch. + // We could avoid copying here with an iov-like AddRecord + // interface + merged_batch = &tmp_batch_; + for (auto b : write_batch_group) { + WriteBatchInternal::Append(merged_batch, b); } } - if (status.ok()) { - PERF_TIMER_GUARD(write_memtable_time); + WriteBatchInternal::SetSequence(merged_batch, current_sequence); + assert(WriteBatchInternal::Count(merged_batch) == total_count); + assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); + + Slice log_entry = WriteBatchInternal::Contents(merged_batch); + status = logs_.back().writer->AddRecord(log_entry); + total_log_size_ += log_entry.size(); + alive_log_files_.back().AddSize(log_entry.size()); + log_empty_ = false; + log_size = log_entry.size(); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + if (status.ok() && need_log_sync) { + RecordTick(stats_, WAL_FILE_SYNCED); + StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); + // It's safe to access logs_ with unlocked mutex_ here because: + // - we've set getting_synced=true for all logs, + // so other threads won't pop from logs_ while we're here, + // - only writer thread can push to logs_, and we're in + // writer thread, so no one will push to logs_, + // - as long as other threads don't modify it, it's safe to read + // from std::deque from multiple threads concurrently. + for (auto& log : logs_) { + status = log.writer->file()->Sync(db_options_.use_fsync); + if (!status.ok()) { + break; + } + } + if (status.ok() && need_log_dir_sync) { + // We only sync WAL directory the first time WAL syncing is + // requested, so that in case users never turn on WAL sync, + // we can avoid the disk I/O in the write code path. + status = directories_.GetWalDir()->Fsync(); + } + } + + if (merged_batch == &tmp_batch_) { + tmp_batch_.Clear(); + } + } + if (status.ok()) { + PERF_TIMER_GUARD(write_memtable_time); + + { + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early in + // some cases. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + if (!write_options.disableWAL) { + if (write_options.sync) { + stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1); + } + stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); + } + uint64_t for_other = write_batch_group.size() - 1; + if (for_other > 0) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other); + if (!write_options.disableWAL) { + stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other); + } + } + } + + if (!parallel) { status = WriteBatchInternal::InsertInto( write_batch_group, current_sequence, column_family_memtables_.get(), - write_options.ignore_missing_column_families, - /*log_number*/ 0, this, /*dont_filter_deletes*/ false); + &flush_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, false /*dont_filter_deletes*/); + } else { + WriteThread::ParallelGroup pg{}; + pg.leader = &w; + pg.last_writer = last_writer; + pg.early_exit_allowed = !need_log_sync; + pg.running.store(write_batch_group.size(), std::memory_order_relaxed); + write_thread_.LaunchParallelFollowers(&pg, current_sequence); - // A non-OK status here indicates that the state implied by the - // WAL has diverged from the in-memory state. This could be - // because of a corrupt write_batch (very bad), or because the - // client specified an invalid column family and didn't specify - // ignore_missing_column_families. - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. - if (!status.ok() && bg_error_.ok()) { - bg_error_ = status; - } + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + assert(w.sequence == current_sequence); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + w.status = WriteBatchInternal::InsertInto( + w.batch, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, + this, true /*dont_filter_deletes*/, + true /*concurrent_memtable_writes*/); + assert(last_writer->sequence == last_sequence); + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + exit_completed_early = !write_thread_.CompleteParallelWorker(&w); + status = w.status; + assert(status.ok() || !exit_completed_early); + } + + if (status.ok() && !exit_completed_early) { SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); - } - PERF_TIMER_START(write_pre_and_post_process_time); - mutex_.Lock(); - - // internal stats - default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN, - total_byte_size); - default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, - total_count); - if (!write_options.disableWAL) { - if (write_options.sync) { - default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED, - 1); - } - default_cf_internal_stats_->AddDBStats( - InternalStats::WAL_FILE_BYTES, log_size); - } - if (status.ok()) { versions_->SetLastSequence(last_sequence); + if (!need_log_sync) { + write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); + exit_completed_early = true; + } } - } else { - // Operation failed. Make sure sure mutex is held for cleanup code below. - mutex_.Lock(); - } - if (db_options_.paranoid_checks && !status.ok() && !callback_failed && - !status.IsBusy() && bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes - } - - mutex_.AssertHeld(); - - if (need_log_sync) { - MarkLogsSynced(logfile_number_, need_log_dir_sync, status); - } - - uint64_t writes_for_other = write_batch_group.size() - 1; - if (writes_for_other > 0) { - default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, - writes_for_other); - if (!write_options.disableWAL) { - default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, - writes_for_other); + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. + // + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if (!status.ok() && bg_error_.ok()) { + bg_error_ = status; + } } } + PERF_TIMER_START(write_pre_and_post_process_time); - mutex_.Unlock(); + if (db_options_.paranoid_checks && !status.ok() && !callback_failed && + !status.IsBusy()) { + mutex_.Lock(); + if (bg_error_.ok()) { + bg_error_ = status; // stop compaction & fail any further writes + } + mutex_.Unlock(); + } - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); + if (need_log_sync) { + mutex_.Lock(); + MarkLogsSynced(logfile_number_, need_log_dir_sync, status); + mutex_.Unlock(); + } + + if (!exit_completed_early) { + write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); + } return status; } @@ -4411,7 +4488,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes) { Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; - while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { auto status = SwitchMemtable(cfd, context); if (cfd->Unref()) { delete cfd; @@ -5084,6 +5161,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, for (auto& cfd : column_families) { s = CheckCompressionSupported(cfd.options); + if (s.ok() && db_options.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cfd.options); + } if (!s.ok()) { return s; } diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 56816159e..f970f1ca8 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -13,51 +13,69 @@ namespace rocksdb { void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { #ifndef NDEBUG - assert(column_families_set_.find(cfd) == column_families_set_.end()); - column_families_set_.insert(cfd); + { + std::lock_guard lock(checking_mutex_); + assert(checking_set_.count(cfd) == 0); + checking_set_.insert(cfd); + } #endif // NDEBUG cfd->Ref(); - column_families_.push_back(cfd); + Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)}; + while (!head_.compare_exchange_strong( + node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) { + // failing CAS updates the first param, so we are already set for + // retry. TakeNextColumnFamily won't happen until after another + // inter-thread synchronization, so we don't even need release + // semantics for this CAS + } } -ColumnFamilyData* FlushScheduler::GetNextColumnFamily() { - ColumnFamilyData* cfd = nullptr; - while (column_families_.size() > 0) { - cfd = column_families_.front(); - column_families_.pop_front(); - if (cfd->IsDropped()) { - if (cfd->Unref()) { - delete cfd; - cfd = nullptr; - } - } else { - break; +ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { + while (true) { + if (Empty()) { + return nullptr; } - } -#ifndef NDEBUG - if (cfd != nullptr) { - auto itr = column_families_set_.find(cfd); - assert(itr != column_families_set_.end()); - column_families_set_.erase(itr); - } -#endif // NDEBUG - return cfd; -} -bool FlushScheduler::Empty() { return column_families_.empty(); } + // dequeue the head + Node* node = head_.load(std::memory_order_relaxed); + head_.store(node->next, std::memory_order_relaxed); + ColumnFamilyData* cfd = node->column_family; + delete node; -void FlushScheduler::Clear() { - for (auto cfd : column_families_) { #ifndef NDEBUG - auto itr = column_families_set_.find(cfd); - assert(itr != column_families_set_.end()); - column_families_set_.erase(itr); + { + auto iter = checking_set_.find(cfd); + assert(iter != checking_set_.end()); + checking_set_.erase(iter); + } #endif // NDEBUG + + if (!cfd->IsDropped()) { + // success + return cfd; + } + + // no longer relevant, retry if (cfd->Unref()) { delete cfd; } } - column_families_.clear(); +} + +bool FlushScheduler::Empty() { + auto rv = head_.load(std::memory_order_relaxed) == nullptr; + assert(rv == checking_set_.empty()); + return rv; +} + +void FlushScheduler::Clear() { + ColumnFamilyData* cfd; + while ((cfd = TakeNextColumnFamily()) != nullptr) { + if (cfd->Unref()) { + delete cfd; + } + } + assert(Empty()); } } // namespace rocksdb diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index 0c96709b9..dd439e410 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -6,34 +6,42 @@ #pragma once #include -#include +#include +#include #include -#include namespace rocksdb { class ColumnFamilyData; -// This class is thread-compatible. It's should only be accessed from single -// write thread (between BeginWrite() and EndWrite()) +// Unless otherwise noted, all methods on FlushScheduler should be called +// only with the DB mutex held or from a single-threaded recovery context. class FlushScheduler { public: - FlushScheduler() = default; - ~FlushScheduler() = default; + FlushScheduler() : head_(nullptr) {} + // May be called from multiple threads at once, but not concurrent with + // any other method calls on this instance void ScheduleFlush(ColumnFamilyData* cfd); - // Returns Ref()-ed column family. Client needs to Unref() - // REQUIRES: db mutex is held (exception is single-threaded recovery) - ColumnFamilyData* GetNextColumnFamily(); + + // Removes and returns Ref()-ed column family. Client needs to Unref(). + // Filters column families that have been dropped. + ColumnFamilyData* TakeNextColumnFamily(); bool Empty(); void Clear(); private: - std::deque column_families_; + struct Node { + ColumnFamilyData* column_family; + Node* next; + }; + + std::atomic head_; #ifndef NDEBUG - std::set column_families_set_; + std::mutex checking_mutex_; + std::set checking_set_; #endif // NDEBUG }; diff --git a/db/inlineskiplist.h b/db/inlineskiplist.h index a4fb5ab89..201580b10 100644 --- a/db/inlineskiplist.h +++ b/db/inlineskiplist.h @@ -20,10 +20,12 @@ // // Thread safety ------------- // -// Writes require external synchronization, most likely a mutex. Reads -// require a guarantee that the InlineSkipList will not be destroyed while -// the read is in progress. Apart from that, reads progress without any -// internal locking or synchronization. +// Writes via Insert require external synchronization, most likely a mutex. +// InsertConcurrently can be safely called concurrently with reads and +// with other concurrent inserts. Reads require a guarantee that the +// InlineSkipList will not be destroyed while the read is in progress. +// Apart from that, reads progress without any internal locking or +// synchronization. // // Invariants: // @@ -63,16 +65,21 @@ class InlineSkipList { int32_t max_height = 12, int32_t branching_factor = 4); - // Allocates a key and a skip-list node, returning a pointer to the - // key portion of the node. + // Allocates a key and a skip-list node, returning a pointer to the key + // portion of the node. This method is thread-safe if the allocator + // is thread-safe. char* AllocateKey(size_t key_size); // Inserts a key allocated by AllocateKey, after the actual key value // has been filled in. // // REQUIRES: nothing that compares equal to key is currently in the list. + // REQUIRES: no concurrent calls to INSERT void Insert(const char* key); + // Like Insert, but external synchronization is not required. + void InsertConcurrently(const char* key); + // Returns true iff an entry that compares equal to key is in the list. bool Contains(const char* key) const; @@ -124,6 +131,8 @@ class InlineSkipList { }; private: + enum MaxPossibleHeightEnum : uint16_t { kMaxPossibleHeight = 32 }; + const uint16_t kMaxHeight_; const uint16_t kBranching_; const uint32_t kScaledInverseBranching_; @@ -139,11 +148,11 @@ class InlineSkipList { std::atomic max_height_; // Height of the entire list // Used for optimizing sequential insert patterns. Tricky. prev_[i] for - // i up to max_height_ is the predecessor of prev_[0] and prev_height_ - // is the height of prev_[0]. prev_[0] can only be equal to head before - // insertion, in which case max_height_ and prev_height_ are 1. + // i up to max_height_ - 1 (inclusive) is the predecessor of prev_[0]. + // prev_height_ is the height of prev_[0]. prev_[0] can only be equal + // to head when max_height_ and prev_height_ are both 1. Node** prev_; - int32_t prev_height_; + std::atomic prev_height_; inline int GetMaxHeight() const { return max_height_.load(std::memory_order_relaxed); @@ -175,6 +184,15 @@ class InlineSkipList { // Return head_ if list is empty. Node* FindLast() const; + // Traverses a single level of the list, setting *out_prev to the last + // node before the key and *out_next to the first node after. Assumes + // that the key is not present in the skip list. On entry, before should + // point to a node that is before the key, and after should point to + // a node that is after the key. after should be nullptr if a good after + // node isn't conveniently available. + void FindLevelSplice(const char* key, Node* before, Node* after, int level, + Node** out_prev, Node** out_next); + // No copying allowed InlineSkipList(const InlineSkipList&); InlineSkipList& operator=(const InlineSkipList&); @@ -223,6 +241,11 @@ struct InlineSkipList::Node { next_[-n].store(x, std::memory_order_release); } + bool CASNext(int n, Node* expected, Node* x) { + assert(n >= 0); + return next_[-n].compare_exchange_strong(expected, x); + } + // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next(int n) { assert(n >= 0); @@ -305,11 +328,13 @@ int InlineSkipList::RandomHeight() { // Increase height with probability 1 in kBranching int height = 1; - while (height < kMaxHeight_ && rnd->Next() < kScaledInverseBranching_) { + while (height < kMaxHeight_ && height < kMaxPossibleHeight && + rnd->Next() < kScaledInverseBranching_) { height++; } assert(height > 0); assert(height <= kMaxHeight_); + assert(height <= kMaxPossibleHeight); return height; } @@ -440,7 +465,7 @@ InlineSkipList::InlineSkipList(const Comparator cmp, max_height_(1), prev_height_(1) { assert(max_height > 0 && kMaxHeight_ == static_cast(max_height)); - assert(branching_factor > 0 && + assert(branching_factor > 1 && kBranching_ == static_cast(branching_factor)); assert(kScaledInverseBranching_ > 0); // Allocate the prev_ Node* array, directly from the passed-in allocator. @@ -485,16 +510,23 @@ InlineSkipList::AllocateNode(size_t key_size, int height) { template void InlineSkipList::Insert(const char* key) { + // InsertConcurrently can't maintain the prev_ invariants when it needs + // to increase max_height_. In that case it sets prev_height_ to zero, + // letting us know that we should ignore it. A relaxed load suffices + // here because write thread synchronization separates Insert calls + // from InsertConcurrently calls. + auto prev_height = prev_height_.load(std::memory_order_relaxed); + // fast path for sequential insertion - if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && + if (prev_height > 0 && !KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) { - assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1)); + assert(prev_[0] != head_ || (prev_height == 1 && GetMaxHeight() == 1)); // Outside of this method prev_[1..max_height_] is the predecessor // of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert // prev_[0..max_height - 1] is the predecessor of key. Switch from // the external state to the internal - for (int i = 1; i < prev_height_; i++) { + for (int i = 1; i < prev_height; i++) { prev_[i] = prev_[0]; } } else { @@ -534,7 +566,73 @@ void InlineSkipList::Insert(const char* key) { prev_[i]->SetNext(i, x); } prev_[0] = x; - prev_height_ = height; + prev_height_.store(height, std::memory_order_relaxed); +} + +template +void InlineSkipList::FindLevelSplice(const char* key, Node* before, + Node* after, int level, + Node** out_prev, + Node** out_next) { + while (true) { + Node* next = before->Next(level); + assert(before == head_ || next == nullptr || + KeyIsAfterNode(next->Key(), before)); + assert(before == head_ || KeyIsAfterNode(key, before)); + if (next == after || !KeyIsAfterNode(key, next)) { + // found it + *out_prev = before; + *out_next = next; + return; + } + before = next; + } +} + +template +void InlineSkipList::InsertConcurrently(const char* key) { + Node* x = reinterpret_cast(const_cast(key)) - 1; + int height = x->UnstashHeight(); + assert(height >= 1 && height <= kMaxHeight_); + + int max_height = max_height_.load(std::memory_order_relaxed); + while (height > max_height) { + if (max_height_.compare_exchange_strong(max_height, height)) { + // successfully updated it + max_height = height; + + // we dont have a lock-free algorithm for fixing up prev_, so just + // mark it invalid + prev_height_.store(0, std::memory_order_relaxed); + break; + } + // else retry, possibly exiting the loop because somebody else + // increased it + } + assert(max_height <= kMaxPossibleHeight); + + Node* prev[kMaxPossibleHeight + 1]; + Node* next[kMaxPossibleHeight + 1]; + prev[max_height] = head_; + next[max_height] = nullptr; + for (int i = max_height - 1; i >= 0; --i) { + FindLevelSplice(key, prev[i + 1], next[i + 1], i, &prev[i], &next[i]); + } + for (int i = 0; i < height; ++i) { + while (true) { + x->NoBarrier_SetNext(i, next[i]); + if (prev[i]->CASNext(i, next[i], x)) { + // success + break; + } + // CAS failed, we need to recompute prev and next. It is unlikely + // to be helpful to try to use a different level as we redo the + // search, because it should be unlikely that lots of nodes have + // been inserted between prev[i] and next[i]. No point in using + // next[i] as the after hint, because we know it is stale. + FindLevelSplice(key, prev[i], nullptr, i, &prev[i], &next[i]); + } + } } template diff --git a/db/inlineskiplist_test.cc b/db/inlineskiplist_test.cc index 70fd97a88..5c2dd6fa5 100644 --- a/db/inlineskiplist_test.cc +++ b/db/inlineskiplist_test.cc @@ -10,7 +10,7 @@ #include "db/inlineskiplist.h" #include #include "rocksdb/env.h" -#include "util/arena.h" +#include "util/concurrent_arena.h" #include "util/hash.h" #include "util/random.h" #include "util/testharness.h" @@ -67,7 +67,7 @@ TEST_F(InlineSkipTest, InsertAndLookup) { const int R = 5000; Random rnd(1000); std::set keys; - Arena arena; + ConcurrentArena arena; TestComparator cmp; InlineSkipList list(cmp, &arena); for (int i = 0; i < N; i++) { @@ -167,9 +167,10 @@ TEST_F(InlineSkipTest, InsertAndLookup) { // check that it is either expected given the initial snapshot or has // been concurrently added since the iterator started. class ConcurrentTest { - private: - static const uint32_t K = 4; + public: + static const uint32_t K = 8; + private: static uint64_t key(Key key) { return (key >> 40); } static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } static uint64_t hash(Key key) { return key & 0xff; } @@ -222,7 +223,7 @@ class ConcurrentTest { // Current state of the test State current_; - Arena arena_; + ConcurrentArena arena_; // InlineSkipList is not protected by mu_. We just use a single writer // thread to modify it. @@ -231,7 +232,7 @@ class ConcurrentTest { public: ConcurrentTest() : list_(TestComparator(), &arena_) {} - // REQUIRES: External synchronization + // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep void WriteStep(Random* rnd) { const uint32_t k = rnd->Next() % K; const int g = current_.Get(k) + 1; @@ -242,6 +243,17 @@ class ConcurrentTest { current_.Set(k, g); } + // REQUIRES: No concurrent calls for the same k + void ConcurrentWriteStep(uint32_t k) { + const int g = current_.Get(k) + 1; + const Key new_key = MakeKey(k, g); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, &new_key, sizeof(Key)); + list_.InsertConcurrently(buf); + ASSERT_EQ(g, current_.Get(k) + 1); + current_.Set(k, g); + } + void ReadStep(Random* rnd) { // Remember the initial committed state of the skiplist. State initial_state; @@ -304,7 +316,7 @@ const uint32_t ConcurrentTest::K; // Simple test that does single-threaded testing of the ConcurrentTest // scaffolding. -TEST_F(InlineSkipTest, ConcurrentWithoutThreads) { +TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) { ConcurrentTest test; Random rnd(test::RandomSeed()); for (int i = 0; i < 10000; i++) { @@ -313,16 +325,33 @@ TEST_F(InlineSkipTest, ConcurrentWithoutThreads) { } } +TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 10000; i++) { + test.ReadStep(&rnd); + uint32_t base = rnd.Next(); + for (int j = 0; j < 4; ++j) { + test.ConcurrentWriteStep((base + j) % ConcurrentTest::K); + } + } +} + class TestState { public: ConcurrentTest t_; int seed_; std::atomic quit_flag_; + std::atomic next_writer_; enum ReaderState { STARTING, RUNNING, DONE }; explicit TestState(int s) - : seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {} + : seed_(s), + quit_flag_(false), + state_(STARTING), + pending_writers_(0), + state_cv_(&mu_) {} void Wait(ReaderState s) { mu_.Lock(); @@ -339,9 +368,27 @@ class TestState { mu_.Unlock(); } + void AdjustPendingWriters(int delta) { + mu_.Lock(); + pending_writers_ += delta; + if (pending_writers_ == 0) { + state_cv_.Signal(); + } + mu_.Unlock(); + } + + void WaitForPendingWriters() { + mu_.Lock(); + while (pending_writers_ != 0) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + private: port::Mutex mu_; ReaderState state_; + int pending_writers_; port::CondVar state_cv_; }; @@ -357,7 +404,14 @@ static void ConcurrentReader(void* arg) { state->Change(TestState::DONE); } -static void RunConcurrent(int run) { +static void ConcurrentWriter(void* arg) { + TestState* state = reinterpret_cast(arg); + uint32_t k = state->next_writer_++ % ConcurrentTest::K; + state->t_.ConcurrentWriteStep(k); + state->AdjustPendingWriters(-1); +} + +static void RunConcurrentRead(int run) { const int seed = test::RandomSeed() + (run * 100); Random rnd(seed); const int N = 1000; @@ -369,7 +423,7 @@ static void RunConcurrent(int run) { TestState state(seed + 1); Env::Default()->Schedule(ConcurrentReader, &state); state.Wait(TestState::RUNNING); - for (int k = 0; k < kSize; k++) { + for (int k = 0; k < kSize; ++k) { state.t_.WriteStep(&rnd); } state.quit_flag_.store(true, std::memory_order_release); @@ -377,11 +431,41 @@ static void RunConcurrent(int run) { } } -TEST_F(InlineSkipTest, Concurrent1) { RunConcurrent(1); } -TEST_F(InlineSkipTest, Concurrent2) { RunConcurrent(2); } -TEST_F(InlineSkipTest, Concurrent3) { RunConcurrent(3); } -TEST_F(InlineSkipTest, Concurrent4) { RunConcurrent(4); } -TEST_F(InlineSkipTest, Concurrent5) { RunConcurrent(5); } +static void RunConcurrentInsert(int run, int write_parallelism = 4) { + Env::Default()->SetBackgroundThreads(1 + write_parallelism, + Env::Priority::LOW); + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestState state(seed + 1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int k = 0; k < kSize; k += write_parallelism) { + state.next_writer_ = rnd.Next(); + state.AdjustPendingWriters(write_parallelism); + for (int p = 0; p < write_parallelism; ++p) { + Env::Default()->Schedule(ConcurrentWriter, &state); + } + state.WaitForPendingWriters(); + } + state.quit_flag_.store(true, std::memory_order_release); + state.Wait(TestState::DONE); + } +} + +TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); } +TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); } +TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); } +TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); } +TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); } +TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); } +TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); } +TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); } } // namespace rocksdb diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 3ee0eb931..74aac3649 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -485,14 +485,14 @@ void InternalStats::DumpDBStats(std::string* value) { seconds_up, interval_seconds_up); value->append(buf); // Cumulative - uint64_t user_bytes_written = db_stats_[InternalStats::BYTES_WRITTEN]; - uint64_t num_keys_written = db_stats_[InternalStats::NUMBER_KEYS_WRITTEN]; - uint64_t write_other = db_stats_[InternalStats::WRITE_DONE_BY_OTHER]; - uint64_t write_self = db_stats_[InternalStats::WRITE_DONE_BY_SELF]; - uint64_t wal_bytes = db_stats_[InternalStats::WAL_FILE_BYTES]; - uint64_t wal_synced = db_stats_[InternalStats::WAL_FILE_SYNCED]; - uint64_t write_with_wal = db_stats_[InternalStats::WRITE_WITH_WAL]; - uint64_t write_stall_micros = db_stats_[InternalStats::WRITE_STALL_MICROS]; + uint64_t user_bytes_written = GetDBStats(InternalStats::BYTES_WRITTEN); + uint64_t num_keys_written = GetDBStats(InternalStats::NUMBER_KEYS_WRITTEN); + uint64_t write_other = GetDBStats(InternalStats::WRITE_DONE_BY_OTHER); + uint64_t write_self = GetDBStats(InternalStats::WRITE_DONE_BY_SELF); + uint64_t wal_bytes = GetDBStats(InternalStats::WAL_FILE_BYTES); + uint64_t wal_synced = GetDBStats(InternalStats::WAL_FILE_SYNCED); + uint64_t write_with_wal = GetDBStats(InternalStats::WRITE_WITH_WAL); + uint64_t write_stall_micros = GetDBStats(InternalStats::WRITE_STALL_MICROS); uint64_t compact_bytes_read = 0; uint64_t compact_bytes_write = 0; uint64_t compact_micros = 0; diff --git a/db/internal_stats.h b/db/internal_stats.h index 16aee45a8..9c4414ef1 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -109,24 +109,16 @@ class InternalStats { }; InternalStats(int num_levels, Env* env, ColumnFamilyData* cfd) - : db_stats_(INTERNAL_DB_STATS_ENUM_MAX), - cf_stats_value_(INTERNAL_CF_STATS_ENUM_MAX), - cf_stats_count_(INTERNAL_CF_STATS_ENUM_MAX), + : db_stats_{}, + cf_stats_value_{}, + cf_stats_count_{}, comp_stats_(num_levels), file_read_latency_(num_levels), bg_error_count_(0), number_levels_(num_levels), env_(env), cfd_(cfd), - started_at_(env->NowMicros()) { - for (int i = 0; i< INTERNAL_DB_STATS_ENUM_MAX; ++i) { - db_stats_[i] = 0; - } - for (int i = 0; i< INTERNAL_CF_STATS_ENUM_MAX; ++i) { - cf_stats_value_[i] = 0; - cf_stats_count_[i] = 0; - } - } + started_at_(env->NowMicros()) {} // Per level compaction stats. comp_stats_[level] stores the stats for // compactions that produced data for the specified "level". @@ -239,7 +231,13 @@ class InternalStats { } void AddDBStats(InternalDBStatsType type, uint64_t value) { - db_stats_[type] += value; + auto& v = db_stats_[type]; + v.store(v.load(std::memory_order_relaxed) + value, + std::memory_order_relaxed); + } + + uint64_t GetDBStats(InternalDBStatsType type) { + return db_stats_[type].load(std::memory_order_relaxed); } HistogramImpl* GetFileReadHist(int level) { @@ -264,10 +262,10 @@ class InternalStats { void DumpCFStats(std::string* value); // Per-DB stats - std::vector db_stats_; + std::atomic db_stats_[INTERNAL_DB_STATS_ENUM_MAX]; // Per-ColumnFamily stats - std::vector cf_stats_value_; - std::vector cf_stats_count_; + uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX]; + uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX]; // Per-ColumnFamily/level compaction stats std::vector comp_stats_; std::vector file_read_latency_; diff --git a/db/memtable.cc b/db/memtable.cc index 484e18e91..120a39d1b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -60,7 +60,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), - arena_(moptions_.arena_block_size), + arena_(moptions_.arena_block_size, 0), allocator_(&arena_, write_buffer), table_(ioptions.memtable_factory->CreateMemTableRep( comparator_, &allocator_, ioptions.prefix_extractor, @@ -78,12 +78,12 @@ MemTable::MemTable(const InternalKeyComparator& cmp, ? moptions_.inplace_update_num_locks : 0), prefix_extractor_(ioptions.prefix_extractor), - should_flush_(ShouldFlushNow()), - flush_scheduled_(false), + flush_state_(FLUSH_NOT_REQUESTED), env_(ioptions.env) { - // if should_flush_ == true without an entry inserted, something must have - // gone wrong already. - assert(!should_flush_); + UpdateFlushState(); + // something went wrong if we need to flush before inserting anything + assert(!ShouldScheduleFlush()); + if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( &allocator_, @@ -167,6 +167,17 @@ bool MemTable::ShouldFlushNow() const { return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; } +void MemTable::UpdateFlushState() { + auto state = flush_state_.load(std::memory_order_relaxed); + if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { + // ignore CAS failure, because that means somebody else requested + // a flush + flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. @@ -335,7 +346,7 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey, void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ - const Slice& value) { + const Slice& value, bool allow_concurrent) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -349,7 +360,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, val_size; char* buf = nullptr; KeyHandle handle = table_->Allocate(encoded_len, &buf); - assert(buf != nullptr); + char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; @@ -359,32 +370,64 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); - table_->Insert(handle); - num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, + if (!allow_concurrent) { + table_->Insert(handle); + + // this is a bit ugly, but is the way to avoid locked instructions + // when incrementing an atomic + num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, std::memory_order_relaxed); - data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, - std::memory_order_relaxed); - if (type == kTypeDeletion) { - num_deletes_++; - } - - if (prefix_bloom_) { - assert(prefix_extractor_); - prefix_bloom_->Add(prefix_extractor_->Transform(key)); - } - - // The first sequence number inserted into the memtable - assert(first_seqno_ == 0 || s > first_seqno_); - if (first_seqno_ == 0) { - first_seqno_ = s; - - if (earliest_seqno_ == kMaxSequenceNumber) { - earliest_seqno_ = first_seqno_; + if (type == kTypeDeletion) { + num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + } + + if (prefix_bloom_) { + assert(prefix_extractor_); + prefix_bloom_->Add(prefix_extractor_->Transform(key)); + } + + // The first sequence number inserted into the memtable + assert(first_seqno_ == 0 || s > first_seqno_); + if (first_seqno_ == 0) { + first_seqno_.store(s, std::memory_order_relaxed); + + if (earliest_seqno_ == kMaxSequenceNumber) { + earliest_seqno_.store(GetFirstSequenceNumber(), + std::memory_order_relaxed); + } + assert(first_seqno_.load() >= earliest_seqno_.load()); + } + } else { + table_->InsertConcurrently(handle); + + num_entries_.fetch_add(1, std::memory_order_relaxed); + data_size_.fetch_add(encoded_len, std::memory_order_relaxed); + if (type == kTypeDeletion) { + num_deletes_.fetch_add(1, std::memory_order_relaxed); + } + + if (prefix_bloom_) { + assert(prefix_extractor_); + prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key)); + } + + // atomically update first_seqno_ and earliest_seqno_. + uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); + while ((cur_seq_num == 0 || s < cur_seq_num) && + !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { + } + uint64_t cur_earliest_seqno = + earliest_seqno_.load(std::memory_order_relaxed); + while ( + (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && + !first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { } - assert(first_seqno_ >= earliest_seqno_); } - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); } // Callback from MemTable::Get() @@ -685,16 +728,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } } RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } else if (status == UpdateStatus::UPDATED) { Add(seq, kTypeValue, key, Slice(str_value)); RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } else if (status == UpdateStatus::UPDATE_FAILED) { // No action required. Return. - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } } diff --git a/db/memtable.h b/db/memtable.h index af3b1f945..110985620 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -8,10 +8,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include -#include -#include +#include #include +#include +#include +#include #include #include "db/dbformat.h" #include "db/skiplist.h" @@ -21,8 +22,9 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/immutable_options.h" #include "db/memtable_allocator.h" -#include "util/arena.h" +#include "util/concurrent_arena.h" #include "util/dynamic_bloom.h" +#include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" namespace rocksdb { @@ -124,10 +126,17 @@ class MemTable { // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { - return flush_scheduled_ == false && should_flush_; + return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED; } - void MarkFlushScheduled() { flush_scheduled_ = true; } + // Returns true if a flush should be scheduled and the caller should + // be the one to schedule it + bool MarkFlushScheduled() { + auto before = FLUSH_REQUESTED; + return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } // Return an iterator that yields the contents of the memtable. // @@ -147,11 +156,10 @@ class MemTable { // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. // - // REQUIRES: external synchronization to prevent simultaneous - // operations on the same MemTable. - void Add(SequenceNumber seq, ValueType type, - const Slice& key, - const Slice& value); + // REQUIRES: if allow_concurrent = false, external synchronization to prevent + // simultaneous operations on the same MemTable. + void Add(SequenceNumber seq, ValueType type, const Slice& key, + const Slice& value, bool allow_concurrent = false); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error @@ -220,7 +228,9 @@ class MemTable { // Get total number of deletes in the mem table. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable (unless this Memtable is immutable). - uint64_t num_deletes() const { return num_deletes_; } + uint64_t num_deletes() const { + return num_deletes_.load(std::memory_order_relaxed); + } // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } @@ -234,7 +244,9 @@ class MemTable { // into the memtable. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable (unless this Memtable is immutable). - SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } + SequenceNumber GetFirstSequenceNumber() { + return first_seqno_.load(std::memory_order_relaxed); + } // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into this @@ -243,7 +255,9 @@ class MemTable { // // If the earliest sequence number could not be determined, // kMaxSequenceNumber will be returned. - SequenceNumber GetEarliestSequenceNumber() { return earliest_seqno_; } + SequenceNumber GetEarliestSequenceNumber() { + return earliest_seqno_.load(std::memory_order_relaxed); + } // Returns the next active logfile number when this memtable is about to // be flushed to storage @@ -290,8 +304,7 @@ class MemTable { const MemTableOptions* GetMemTableOptions() const { return &moptions_; } private: - // Dynamically check if we can add more incoming entries - bool ShouldFlushNow() const; + enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; friend class MemTableIterator; friend class MemTableBackwardIterator; @@ -301,14 +314,14 @@ class MemTable { const MemTableOptions moptions_; int refs_; const size_t kArenaBlockSize; - Arena arena_; + ConcurrentArena arena_; MemTableAllocator allocator_; unique_ptr table_; // Total data size of all data inserted std::atomic data_size_; std::atomic num_entries_; - uint64_t num_deletes_; + std::atomic num_deletes_; // These are used to manage memtable flushes to storage bool flush_in_progress_; // started the flush @@ -320,11 +333,11 @@ class MemTable { VersionEdit edit_; // The sequence number of the kv that was inserted first - SequenceNumber first_seqno_; + std::atomic first_seqno_; // The db sequence number at the time of creation or kMaxSequenceNumber // if not set. - SequenceNumber earliest_seqno_; + std::atomic earliest_seqno_; // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_; @@ -332,19 +345,22 @@ class MemTable { // rw locks for inplace updates std::vector locks_; - // No copying allowed - MemTable(const MemTable&); - void operator=(const MemTable&); - const SliceTransform* const prefix_extractor_; std::unique_ptr prefix_bloom_; - // a flag indicating if a memtable has met the criteria to flush - bool should_flush_; + std::atomic flush_state_; - // a flag indicating if flush has been scheduled - bool flush_scheduled_; Env* env_; + + // Returns a heuristic flush decision + bool ShouldFlushNow() const; + + // Updates flush_state_ using ShouldFlushNow() + void UpdateFlushState(); + + // No copying allowed + MemTable(const MemTable&); + MemTable& operator=(const MemTable&); }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/memtable_allocator.cc b/db/memtable_allocator.cc index d3ecea2fd..1ed2019b6 100644 --- a/db/memtable_allocator.cc +++ b/db/memtable_allocator.cc @@ -7,46 +7,42 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include - #include "db/memtable_allocator.h" + +#include #include "db/writebuffer.h" #include "util/arena.h" namespace rocksdb { -MemTableAllocator::MemTableAllocator(Arena* arena, WriteBuffer* write_buffer) - : arena_(arena), write_buffer_(write_buffer), bytes_allocated_(0) { -} +MemTableAllocator::MemTableAllocator(Allocator* allocator, + WriteBuffer* write_buffer) + : allocator_(allocator), write_buffer_(write_buffer), bytes_allocated_(0) {} -MemTableAllocator::~MemTableAllocator() { - DoneAllocating(); -} +MemTableAllocator::~MemTableAllocator() { DoneAllocating(); } char* MemTableAllocator::Allocate(size_t bytes) { assert(write_buffer_ != nullptr); - bytes_allocated_ += bytes; + bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_->ReserveMem(bytes); - return arena_->Allocate(bytes); + return allocator_->Allocate(bytes); } char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, Logger* logger) { assert(write_buffer_ != nullptr); - bytes_allocated_ += bytes; + bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_->ReserveMem(bytes); - return arena_->AllocateAligned(bytes, huge_page_size, logger); + return allocator_->AllocateAligned(bytes, huge_page_size, logger); } void MemTableAllocator::DoneAllocating() { if (write_buffer_ != nullptr) { - write_buffer_->FreeMem(bytes_allocated_); + write_buffer_->FreeMem(bytes_allocated_.load(std::memory_order_relaxed)); write_buffer_ = nullptr; } } -size_t MemTableAllocator::BlockSize() const { - return arena_->BlockSize(); -} +size_t MemTableAllocator::BlockSize() const { return allocator_->BlockSize(); } } // namespace rocksdb diff --git a/db/memtable_allocator.h b/db/memtable_allocator.h index fa8ee1287..c2cf130cc 100644 --- a/db/memtable_allocator.h +++ b/db/memtable_allocator.h @@ -11,17 +11,18 @@ // to WriteBuffer so we can track and enforce overall write buffer limits. #pragma once + +#include #include "util/allocator.h" namespace rocksdb { -class Arena; class Logger; class WriteBuffer; class MemTableAllocator : public Allocator { public: - explicit MemTableAllocator(Arena* arena, WriteBuffer* write_buffer); + explicit MemTableAllocator(Allocator* allocator, WriteBuffer* write_buffer); ~MemTableAllocator(); // Allocator interface @@ -35,9 +36,9 @@ class MemTableAllocator : public Allocator { void DoneAllocating(); private: - Arena* arena_; + Allocator* allocator_; WriteBuffer* write_buffer_; - size_t bytes_allocated_; + std::atomic bytes_allocated_; // No copying allowed MemTableAllocator(const MemTableAllocator&); diff --git a/db/repair.cc b/db/repair.cc index db8650e18..1805059a7 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -270,7 +270,7 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, cf_mems_default); + status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { diff --git a/db/write_batch.cc b/db/write_batch.cc index ade89aa31..0565c0599 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -28,10 +28,12 @@ #include #include +#include #include "db/column_family.h" #include "db/db_impl.h" #include "db/dbformat.h" +#include "db/flush_scheduler.h" #include "db/memtable.h" #include "db/snapshot_impl.h" #include "db/write_batch_internal.h" @@ -536,35 +538,42 @@ Status WriteBatch::RollbackToSavePoint() { } namespace { -// This class can *only* be used from a single-threaded write thread, because it -// calls ColumnFamilyMemTablesImpl::Seek() class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; - ColumnFamilyMemTables* cf_mems_; - bool ignore_missing_column_families_; - uint64_t log_number_; + ColumnFamilyMemTables* const cf_mems_; + FlushScheduler* const flush_scheduler_; + const bool ignore_missing_column_families_; + const uint64_t log_number_; DBImpl* db_; const bool dont_filter_deletes_; + const bool concurrent_memtable_writes_; + // cf_mems should not be shared with concurrent inserters MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, - DB* db, const bool dont_filter_deletes) + DB* db, const bool dont_filter_deletes, + bool concurrent_memtable_writes) : sequence_(sequence), cf_mems_(cf_mems), + flush_scheduler_(flush_scheduler), ignore_missing_column_families_(ignore_missing_column_families), log_number_(log_number), db_(reinterpret_cast(db)), - dont_filter_deletes_(dont_filter_deletes) { - assert(cf_mems); + dont_filter_deletes_(dont_filter_deletes), + concurrent_memtable_writes_(concurrent_memtable_writes) { + assert(cf_mems_); if (!dont_filter_deletes_) { assert(db_); } } bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { - // We are only allowed to call this from a single-threaded write thread - // (or while holding DB mutex) + // If we are in a concurrent mode, it is the caller's responsibility + // to clone the original ColumnFamilyMemTables so that each thread + // has its own instance. Otherwise, it must be guaranteed that there + // is no concurrent access bool found = cf_mems_->Seek(column_family_id); if (!found) { if (ignore_missing_column_families_) { @@ -598,11 +607,13 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value); + mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_); } else if (moptions->inplace_callback == nullptr) { + assert(!concurrent_memtable_writes_); mem->Update(sequence_, key, value); RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED); } else { + assert(!concurrent_memtable_writes_); if (mem->UpdateCallback(sequence_, key, value)) { } else { // key not found in memtable. Do sst get, update, add @@ -640,7 +651,7 @@ class MemTableInserter : public WriteBatch::Handler { // sequence number. Even if the update eventually fails and does not result // in memtable add/update. sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } @@ -654,6 +665,7 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!dont_filter_deletes_ && moptions->filter_deletes) { + assert(!concurrent_memtable_writes_); SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; ReadOptions ropts; @@ -668,9 +680,9 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } } - mem->Add(sequence_, delete_type, key, Slice()); + mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_); sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } @@ -686,6 +698,7 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { + assert(!concurrent_memtable_writes_); Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; @@ -760,24 +773,38 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } + + void CheckMemtableFull() { + if (flush_scheduler_ != nullptr) { + auto* cfd = cf_mems_->current(); + assert(cfd != nullptr); + if (cfd->mem()->ShouldScheduleFlush() && + cfd->mem()->MarkFlushScheduled()) { + // MarkFlushScheduled only returns true if we are the one that + // should take action, so no need to dedup further + flush_scheduler_->ScheduleFlush(cfd); + } + } + } }; } // namespace // This function can only be called in these conditions: // 1) During Recovery() // 2) During Write(), in a single-threaded write thread +// 3) During Write(), in a concurrent context where memtables has been cloned // The reason is that it calls memtables->Seek(), which has a stateful cache -Status WriteBatchInternal::InsertInto(const autovector& batches, - SequenceNumber sequence, - ColumnFamilyMemTables* memtables, - bool ignore_missing_column_families, - uint64_t log_number, DB* db, - const bool dont_filter_deletes) { - MemTableInserter inserter(sequence, memtables, ignore_missing_column_families, - log_number, db, dont_filter_deletes); +Status WriteBatchInternal::InsertInto( + const autovector& batches, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + const bool dont_filter_deletes, bool concurrent_memtable_writes) { + MemTableInserter inserter(sequence, memtables, flush_scheduler, + ignore_missing_column_families, log_number, db, + dont_filter_deletes, concurrent_memtable_writes); Status rv = Status::OK(); for (size_t i = 0; i < batches.size() && rv.ok(); ++i) { rv = batches[i]->Iterate(&inserter); @@ -787,12 +814,15 @@ Status WriteBatchInternal::InsertInto(const autovector& batches, Status WriteBatchInternal::InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, - const bool dont_filter_deletes) { + const bool dont_filter_deletes, + bool concurrent_memtable_writes) { MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, - ignore_missing_column_families, log_number, db, - dont_filter_deletes); + flush_scheduler, ignore_missing_column_families, + log_number, db, dont_filter_deletes, + concurrent_memtable_writes); return batch->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 3ae4edc7a..d75d2ef65 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include "rocksdb/types.h" #include "rocksdb/write_batch.h" #include "rocksdb/db.h" @@ -17,6 +18,8 @@ namespace rocksdb { class MemTable; +class FlushScheduler; +class ColumnFamilyData; class ColumnFamilyMemTables { public: @@ -28,7 +31,7 @@ class ColumnFamilyMemTables { virtual uint64_t GetLogNumber() const = 0; virtual MemTable* GetMemTable() const = 0; virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; - virtual void CheckMemtableFull() = 0; + virtual ColumnFamilyData* current() { return nullptr; } }; class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { @@ -50,8 +53,6 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } - void CheckMemtableFull() override {} - private: bool ok_; MemTable* mem_; @@ -127,19 +128,29 @@ class WriteBatchInternal { // // If log_number is non-zero, the memtable will be updated only if // memtables->GetLogNumber() >= log_number. + // + // If flush_scheduler is non-null, it will be invoked if the memtable + // should be flushed. + // + // Under concurrent use, the caller is responsible for making sure that + // the memtables object itself is thread-local. static Status InsertInto(const autovector& batches, SequenceNumber sequence, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - const bool dont_filter_deletes = true); + const bool dont_filter_deletes = true, + bool concurrent_memtable_writes = false); // Convenience form of InsertInto when you have only one batch static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - const bool dont_filter_deletes = true); + const bool dont_filter_deletes = true, + bool concurrent_memtable_writes = false); static void Append(WriteBatch* dst, const WriteBatch* src); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 62830da48..5d008b3a4 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -37,7 +37,7 @@ static std::string PrintContents(WriteBatch* b) { mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem); - Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); + Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); int count = 0; int put_count = 0; int delete_count = 0; diff --git a/db/write_thread.cc b/db/write_thread.cc index cbfd2646f..a5285ce99 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -4,34 +4,197 @@ // of patent rights can be found in the PATENTS file in the same directory. #include "db/write_thread.h" +#include +#include +#include +#include "db/column_family.h" +#include "port/port.h" #include "util/sync_point.h" namespace rocksdb { -void WriteThread::Await(Writer* w) { - std::unique_lock guard(w->JoinMutex()); - w->JoinCV().wait(guard, [w] { return w->joined; }); +WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec) + : max_yield_usec_(max_yield_usec), + slow_yield_usec_(slow_yield_usec), + newest_writer_(nullptr) {} + +uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { + // We're going to block. Lazily create the mutex. We guarantee + // propagation of this construction to the waker via the + // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex + // or the condvar unless they CAS away the STATE_LOCKED_WAITING that + // we install below. + w->CreateMutex(); + + auto state = w->state.load(std::memory_order_acquire); + assert(state != STATE_LOCKED_WAITING); + if ((state & goal_mask) == 0 && + w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) { + // we have permission (and an obligation) to use StateMutex + std::unique_lock guard(w->StateMutex()); + w->StateCV().wait(guard, [w] { + return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING; + }); + state = w->state.load(std::memory_order_relaxed); + } + // else tricky. Goal is met or CAS failed. In the latter case the waker + // must have changed the state, and compare_exchange_strong has updated + // our local variable with the new one. At the moment WriteThread never + // waits for a transition across intermediate states, so we know that + // since a state change has occurred the goal must have been met. + assert((state & goal_mask) != 0); + return state; } -void WriteThread::MarkJoined(Writer* w) { - std::lock_guard guard(w->JoinMutex()); - assert(!w->joined); - w->joined = true; - w->JoinCV().notify_one(); +uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask, + AdaptationContext* ctx) { + uint8_t state; + + // On a modern Xeon each loop takes about 7 nanoseconds (most of which + // is the effect of the pause instruction), so 200 iterations is a bit + // more than a microsecond. This is long enough that waits longer than + // this can amortize the cost of accessing the clock and yielding. + for (uint32_t tries = 0; tries < 200; ++tries) { + state = w->state.load(std::memory_order_acquire); + if ((state & goal_mask) != 0) { + return state; + } + port::AsmVolatilePause(); + } + + // If we're only going to end up waiting a short period of time, + // it can be a lot more efficient to call std::this_thread::yield() + // in a loop than to block in StateMutex(). For reference, on my 4.0 + // SELinux test server with support for syscall auditing enabled, the + // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is + // 2.7 usec, and the average is more like 10 usec. That can be a big + // drag on RockDB's single-writer design. Of course, spinning is a + // bad idea if other threads are waiting to run or if we're going to + // wait for a long time. How do we decide? + // + // We break waiting into 3 categories: short-uncontended, + // short-contended, and long. If we had an oracle, then we would always + // spin for short-uncontended, always block for long, and our choice for + // short-contended might depend on whether we were trying to optimize + // RocksDB throughput or avoid being greedy with system resources. + // + // Bucketing into short or long is easy by measuring elapsed time. + // Differentiating short-uncontended from short-contended is a bit + // trickier, but not too bad. We could look for involuntary context + // switches using getrusage(RUSAGE_THREAD, ..), but it's less work + // (portability code and CPU) to just look for yield calls that take + // longer than we expect. sched_yield() doesn't actually result in any + // context switch overhead if there are no other runnable processes + // on the current core, in which case it usually takes less than + // a microsecond. + // + // There are two primary tunables here: the threshold between "short" + // and "long" waits, and the threshold at which we suspect that a yield + // is slow enough to indicate we should probably block. If these + // thresholds are chosen well then CPU-bound workloads that don't + // have more threads than cores will experience few context switches + // (voluntary or involuntary), and the total number of context switches + // (voluntary and involuntary) will not be dramatically larger (maybe + // 2x) than the number of voluntary context switches that occur when + // --max_yield_wait_micros=0. + // + // There's another constant, which is the number of slow yields we will + // tolerate before reversing our previous decision. Solitary slow + // yields are pretty common (low-priority small jobs ready to run), + // so this should be at least 2. We set this conservatively to 3 so + // that we can also immediately schedule a ctx adaptation, rather than + // waiting for the next update_ctx. + + const size_t kMaxSlowYieldsWhileSpinning = 3; + + bool update_ctx = false; + bool would_spin_again = false; + + if (max_yield_usec_ > 0) { + update_ctx = Random::GetTLSInstance()->OneIn(256); + + if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) { + // we're updating the adaptation statistics, or spinning has > + // 50% chance of being shorter than max_yield_usec_ and causing no + // involuntary context switches + auto spin_begin = std::chrono::steady_clock::now(); + + // this variable doesn't include the final yield (if any) that + // causes the goal to be met + size_t slow_yield_count = 0; + + auto iter_begin = spin_begin; + while ((iter_begin - spin_begin) <= + std::chrono::microseconds(max_yield_usec_)) { + std::this_thread::yield(); + + state = w->state.load(std::memory_order_acquire); + if ((state & goal_mask) != 0) { + // success + would_spin_again = true; + break; + } + + auto now = std::chrono::steady_clock::now(); + if (now == iter_begin || + now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { + // conservatively count it as a slow yield if our clock isn't + // accurate enough to measure the yield duration + ++slow_yield_count; + if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { + // Not just one ivcsw, but several. Immediately update ctx + // and fall back to blocking + update_ctx = true; + break; + } + } + iter_begin = now; + } + } + } + + if ((state & goal_mask) == 0) { + state = BlockingAwaitState(w, goal_mask); + } + + if (update_ctx) { + auto v = ctx->value.load(std::memory_order_relaxed); + // fixed point exponential decay with decay constant 1/1024, with +1 + // and -1 scaled to avoid overflow for int32_t + v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384; + ctx->value.store(v, std::memory_order_relaxed); + } + + assert((state & goal_mask) != 0); + return state; } -void WriteThread::LinkOne(Writer* w, bool* wait_needed) { - assert(!w->joined && !w->done); +void WriteThread::SetState(Writer* w, uint8_t new_state) { + auto state = w->state.load(std::memory_order_acquire); + if (state == STATE_LOCKED_WAITING || + !w->state.compare_exchange_strong(state, new_state)) { + assert(state == STATE_LOCKED_WAITING); + + std::lock_guard guard(w->StateMutex()); + assert(w->state.load(std::memory_order_relaxed) != new_state); + w->state.store(new_state, std::memory_order_relaxed); + w->StateCV().notify_one(); + } +} + +void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { + assert(w->state == STATE_INIT); Writer* writers = newest_writer_.load(std::memory_order_relaxed); while (true) { w->link_older = writers; - if (writers != nullptr) { - w->CreateMutex(); - } if (newest_writer_.compare_exchange_strong(writers, w)) { - // Success. - *wait_needed = (writers != nullptr); + if (writers == nullptr) { + // this isn't part of the WriteThread machinery, but helps with + // debugging and is checked by an assert in WriteImpl + w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); + } + *linked_as_leader = (writers == nullptr); return; } } @@ -50,11 +213,15 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) { } void WriteThread::JoinBatchGroup(Writer* w) { + static AdaptationContext ctx{"JoinBatchGroup"}; + assert(w->batch != nullptr); - bool wait_needed; - LinkOne(w, &wait_needed); - if (wait_needed) { - Await(w); + bool linked_as_leader; + LinkOne(w, &linked_as_leader); + if (!linked_as_leader) { + AwaitState(w, + STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, + &ctx); } } @@ -88,7 +255,7 @@ size_t WriteThread::EnterAsBatchGroupLeader( // This is safe regardless of any db mutex status of the caller. Previous // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks // (they emptied the list and then we added ourself as leader) or had to - // explicitly wake up us (the list was non-empty when we added ourself, + // explicitly wake us up (the list was non-empty when we added ourself, // so we have already received our MarkJoined). CreateMissingNewerLinks(newest_writer); @@ -135,6 +302,73 @@ size_t WriteThread::EnterAsBatchGroupLeader( return size; } +void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, + SequenceNumber sequence) { + // EnterAsBatchGroupLeader already created the links from leader to + // newer writers in the group + + pg->leader->parallel_group = pg; + + Writer* w = pg->leader; + w->sequence = sequence; + + while (w != pg->last_writer) { + sequence += WriteBatchInternal::Count(w->batch); + w = w->link_newer; + + w->sequence = sequence; + w->parallel_group = pg; + SetState(w, STATE_PARALLEL_FOLLOWER); + } +} + +bool WriteThread::CompleteParallelWorker(Writer* w) { + static AdaptationContext ctx{"CompleteParallelWorker"}; + + auto* pg = w->parallel_group; + if (!w->status.ok()) { + std::lock_guard guard(w->StateMutex()); + pg->status = w->status; + } + auto leader = pg->leader; + auto early_exit_allowed = pg->early_exit_allowed; + + if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { + // we're not the last one + AwaitState(w, STATE_COMPLETED, &ctx); + + // Caller only needs to perform exit duties if early exit doesn't + // apply and this is the leader. Can't touch pg here. Whoever set + // our state to STATE_COMPLETED copied pg->status to w.status for us. + return w == leader && !(early_exit_allowed && w->status.ok()); + } + // else we're the last parallel worker + + if (w == leader || (early_exit_allowed && pg->status.ok())) { + // this thread should perform exit duties + w->status = pg->status; + return true; + } else { + // We're the last parallel follower but early commit is not + // applicable. Wake up the leader and then wait for it to exit. + assert(w->state == STATE_PARALLEL_FOLLOWER); + SetState(leader, STATE_COMPLETED); + AwaitState(w, STATE_COMPLETED, &ctx); + return false; + } +} + +void WriteThread::EarlyExitParallelGroup(Writer* w) { + auto* pg = w->parallel_group; + + assert(w->state == STATE_PARALLEL_FOLLOWER); + assert(pg->status.ok()); + ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); + assert(w->state == STATE_COMPLETED); + assert(w->status.ok()); + SetState(pg->leader, STATE_COMPLETED); +} + void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, Status status) { assert(leader->link_older == nullptr); @@ -166,31 +400,35 @@ void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, // nullptr when they enqueued (we were definitely enqueued before them // and are still in the list). That means leader handoff occurs when // we call MarkJoined - MarkJoined(last_writer->link_newer); + SetState(last_writer->link_newer, STATE_GROUP_LEADER); } // else nobody else was waiting, although there might already be a new // leader now while (last_writer != leader) { last_writer->status = status; - last_writer->done = true; - // We must read link_older before calling MarkJoined, because as - // soon as it is marked the other thread's AwaitJoined may return - // and deallocate the Writer. + + // we need to read link_older before calling SetState, because as soon + // as it is marked committed the other thread's Await may return and + // deallocate the Writer. auto next = last_writer->link_older; - MarkJoined(last_writer); + SetState(last_writer, STATE_COMPLETED); + last_writer = next; } } void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { + static AdaptationContext ctx{"EnterUnbatched"}; + static std::atomic adaptation_history{}; + assert(w->batch == nullptr); - bool wait_needed; - LinkOne(w, &wait_needed); - if (wait_needed) { + bool linked_as_leader; + LinkOne(w, &linked_as_leader); + if (!linked_as_leader) { mu->Unlock(); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); - Await(w); + AwaitState(w, STATE_GROUP_LEADER, &ctx); mu->Lock(); } } diff --git a/db/write_thread.h b/db/write_thread.h index 3a15ea847..2fdd4a02f 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -8,11 +8,13 @@ #include #include #include +#include #include #include +#include #include -#include "rocksdb/status.h" #include "db/write_batch_internal.h" +#include "rocksdb/status.h" #include "util/autovector.h" #include "util/instrumented_mutex.h" @@ -20,19 +22,69 @@ namespace rocksdb { class WriteThread { public: + enum State : uint8_t { + // The initial state of a writer. This is a Writer that is + // waiting in JoinBatchGroup. This state can be left when another + // thread informs the waiter that it has become a group leader + // (-> STATE_GROUP_LEADER), when a leader that has chosen to be + // non-parallel informs a follower that its writes have been committed + // (-> STATE_COMPLETED), or when a leader that has chosen to perform + // updates in parallel and needs this Writer to apply its batch (-> + // STATE_PARALLEL_FOLLOWER). + STATE_INIT = 1, + + // The state used to inform a waiting Writer that it has become the + // leader, and it should now build a write batch group. Tricky: + // this state is not used if newest_writer_ is empty when a writer + // enqueues itself, because there is no need to wait (or even to + // create the mutex and condvar used to wait) in that case. This is + // a terminal state unless the leader chooses to make this a parallel + // batch, in which case the last parallel worker to finish will move + // the leader to STATE_COMPLETED. + STATE_GROUP_LEADER = 2, + + // A Writer that has returned as a follower in a parallel group. + // It should apply its batch to the memtable and then call + // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader + // or EarlyExitParallelGroup this state will get transitioned to + // STATE_COMPLETED. + STATE_PARALLEL_FOLLOWER = 4, + + // A follower whose writes have been applied, or a parallel leader + // whose followers have all finished their work. This is a terminal + // state. + STATE_COMPLETED = 8, + + // A state indicating that the thread may be waiting using StateMutex() + // and StateCondVar() + STATE_LOCKED_WAITING = 16, + }; + + struct Writer; + + struct ParallelGroup { + Writer* leader; + Writer* last_writer; + bool early_exit_allowed; + // before running goes to zero, status needs leader->StateMutex() + Status status; + std::atomic running; + }; + // Information kept for every waiting writer. struct Writer { WriteBatch* batch; bool sync; bool disableWAL; bool in_batch_group; - bool done; bool has_callback; + bool made_waitable; // records lazy construction of mutex and cv + std::atomic state; // write under StateMutex() or pre-link + ParallelGroup* parallel_group; + SequenceNumber sequence; // the sequence number to use Status status; - bool made_waitable; // records lazy construction of mutex and cv - bool joined; // read/write only under JoinMutex() (or pre-link) - std::aligned_storage::type join_mutex_bytes; - std::aligned_storage::type join_cv_bytes; + std::aligned_storage::type state_mutex_bytes; + std::aligned_storage::type state_cv_bytes; Writer* link_older; // read/write only before linking, or as leader Writer* link_newer; // lazy, read/write only before linking, or as leader @@ -41,44 +93,45 @@ class WriteThread { sync(false), disableWAL(false), in_batch_group(false), - done(false), has_callback(false), made_waitable(false), - joined(false), + state(STATE_INIT), link_older(nullptr), link_newer(nullptr) {} ~Writer() { if (made_waitable) { - JoinMutex().~mutex(); - JoinCV().~condition_variable(); + StateMutex().~mutex(); + StateCV().~condition_variable(); } } void CreateMutex() { - assert(!joined); if (!made_waitable) { + // Note that made_waitable is tracked separately from state + // transitions, because we can't atomically create the mutex and + // link into the list. made_waitable = true; - new (&join_mutex_bytes) std::mutex; - new (&join_cv_bytes) std::condition_variable; + new (&state_mutex_bytes) std::mutex; + new (&state_cv_bytes) std::condition_variable; } } - // No other mutexes may be acquired while holding JoinMutex(), it is + // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order - std::mutex& JoinMutex() { + std::mutex& StateMutex() { assert(made_waitable); - return *static_cast(static_cast(&join_mutex_bytes)); + return *static_cast(static_cast(&state_mutex_bytes)); } - std::condition_variable& JoinCV() { + std::condition_variable& StateCV() { assert(made_waitable); return *static_cast( - static_cast(&join_cv_bytes)); + static_cast(&state_cv_bytes)); } }; - WriteThread() : newest_writer_(nullptr) {} + WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec); // IMPORTANT: None of the methods in this class rely on the db mutex // for correctness. All of the methods except JoinBatchGroup and @@ -86,13 +139,16 @@ class WriteThread { // Correctness is maintained by ensuring that only a single thread is // a leader at a time. - // Registers w as ready to become part of a batch group, and blocks - // until some other thread has completed the write (in which case - // w->done will be set to true) or this write has become the leader - // of a batch group (w->done will remain unset). The db mutex SHOULD - // NOT be held when calling this function, because it will block. - // If !w->done then JoinBatchGroup should be followed by a call to - // EnterAsBatchGroupLeader and ExitAsBatchGroupLeader. + // Registers w as ready to become part of a batch group, waits until the + // caller should perform some work, and returns the current state of the + // writer. If w has become the leader of a write batch group, returns + // STATE_GROUP_LEADER. If w has been made part of a sequential batch + // group and the leader has performed the write, returns STATE_DONE. + // If w has been made part of a parallel batch group and is reponsible + // for updating the memtable, returns STATE_PARALLEL_FOLLOWER. + // + // The db mutex SHOULD NOT be held when calling this function, because + // it will block. // // Writer* w: Writer to be executed as part of a batch group void JoinBatchGroup(Writer* w); @@ -100,15 +156,35 @@ class WriteThread { // Constructs a write batch group led by leader, which should be a // Writer passed to JoinBatchGroup on the current thread. // - // Writer* leader: Writer passed to JoinBatchGroup, but !done - // Writer** last_writer: Out-param for use by ExitAsBatchGroupLeader + // Writer* leader: Writer that is STATE_GROUP_LEADER + // Writer** last_writer: Out-param that identifies the last follower // autovector* write_batch_group: Out-param of group members - // returns: Total batch group size + // returns: Total batch group byte size size_t EnterAsBatchGroupLeader(Writer* leader, Writer** last_writer, autovector* write_batch_group); - // Unlinks the Writer-s in a batch group, wakes up the non-leaders, and - // wakes up the next leader (if any). + // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the + // non-leader members of this write batch group. Sets Writer::sequence + // before waking them up. + // + // ParallalGroup* pg: Extra state used to coordinate the parallel add + // SequenceNumber sequence: Starting sequence number to assign to Writer-s + void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence); + + // Reports the completion of w's batch to the parallel group leader, and + // waits for the rest of the parallel batch to complete. Returns true + // if this thread is the last to complete, and hence should advance + // the sequence number and then call EarlyExitParallelGroup, false if + // someone else has already taken responsibility for that. + bool CompleteParallelWorker(Writer* w); + + // This method performs an early completion of a parallel write group, + // where the cleanup work of the leader is performed by a follower who + // happens to be the last parallel worker to complete. + void EarlyExitParallelGroup(Writer* w); + + // Unlinks the Writer-s in a batch group, wakes up the non-leaders, + // and wakes up the next leader (if any). // // Writer* leader: From EnterAsBatchGroupLeader // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader @@ -128,18 +204,35 @@ class WriteThread { // writers. void ExitUnbatched(Writer* w); + struct AdaptationContext { + const char* name; + std::atomic value; + }; + private: + uint64_t max_yield_usec_; + uint64_t slow_yield_usec_; + // Points to the newest pending Writer. Only leader can remove // elements, adding can be done lock-free by anybody std::atomic newest_writer_; - void Await(Writer* w); - void MarkJoined(Writer* w); + // Waits for w->state & goal_mask using w->StateMutex(). Returns + // the state that satisfies goal_mask. + uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); - // Links w into the newest_writer_ list. Sets *wait_needed to false - // if w was linked directly into the leader position, true otherwise. - // Safe to call from multiple threads without external locking. - void LinkOne(Writer* w, bool* wait_needed); + // Blocks until w->state & goal_mask, returning the state value + // that satisfied the predicate. Uses ctx to adaptively use + // std::this_thread::yield() to avoid mutex overheads. ctx should be + // a context-dependent static. + uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); + + void SetState(Writer* w, uint8_t new_state); + + // Links w into the newest_writer_ list. Sets *linked_as_leader to + // true if w was linked directly into the leader position. Safe to + // call from multiple threads without external locking. + void LinkOne(Writer* w, bool* linked_as_leader); // Computes any missing link_newer links. Should not be called // concurrently with itself. diff --git a/db/writebuffer.h b/db/writebuffer.h index 7047a9244..4fe51d8a7 100644 --- a/db/writebuffer.h +++ b/db/writebuffer.h @@ -11,16 +11,20 @@ #pragma once +#include + namespace rocksdb { class WriteBuffer { public: explicit WriteBuffer(size_t _buffer_size) - : buffer_size_(_buffer_size), memory_used_(0) {} + : buffer_size_(_buffer_size), memory_used_(0) {} ~WriteBuffer() {} - size_t memory_usage() const { return memory_used_; } + size_t memory_usage() const { + return memory_used_.load(std::memory_order_relaxed); + } size_t buffer_size() const { return buffer_size_; } // Should only be called from write thread @@ -29,12 +33,16 @@ class WriteBuffer { } // Should only be called from write thread - void ReserveMem(size_t mem) { memory_used_ += mem; } - void FreeMem(size_t mem) { memory_used_ -= mem; } + void ReserveMem(size_t mem) { + memory_used_.fetch_add(mem, std::memory_order_relaxed); + } + void FreeMem(size_t mem) { + memory_used_.fetch_sub(mem, std::memory_order_relaxed); + } private: const size_t buffer_size_; - size_t memory_used_; + std::atomic memory_used_; // No copying allowed WriteBuffer(const WriteBuffer&); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index f02c2d094..6cd92d823 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -36,7 +36,9 @@ #pragma once #include +#include #include +#include namespace rocksdb { @@ -68,25 +70,36 @@ class MemTableRep { explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {} - // Allocate a buf of len size for storing key. The idea is that a specific - // memtable representation knows its underlying data structure better. By - // allowing it to allocate memory, it can possibly put correlated stuff - // in consecutive memory area to make processor prefetching more efficient. + // Allocate a buf of len size for storing key. The idea is that a + // specific memtable representation knows its underlying data structure + // better. By allowing it to allocate memory, it can possibly put + // correlated stuff in consecutive memory area to make processor + // prefetching more efficient. virtual KeyHandle Allocate(const size_t len, char** buf); // Insert key into the collection. (The caller will pack key and value into a // single buffer and pass that in as the parameter to Insert). // REQUIRES: nothing that compares equal to key is currently in the - // collection. + // collection, and no concurrent modifications to the table in progress virtual void Insert(KeyHandle handle) = 0; + // Like Insert(handle), but may be called concurrent with other calls + // to InsertConcurrently for other handles + virtual void InsertConcurrently(KeyHandle handle) { +#ifndef ROCKSDB_LITE + throw std::runtime_error("concurrent insert not supported"); +#else + abort(); +#endif + } + // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; - // Notify this table rep that it will no longer be added to. By default, does - // nothing. After MarkReadOnly() is called, this table rep will not be - // written to (ie No more calls to Allocate(), Insert(), or any writes done - // directly to entries accessed through the iterator.) + // Notify this table rep that it will no longer be added to. By default, + // does nothing. After MarkReadOnly() is called, this table rep will + // not be written to (ie No more calls to Allocate(), Insert(), + // or any writes done directly to entries accessed through the iterator.) virtual void MarkReadOnly() { } // Look up key from the mem table, since the first key in the mem table whose @@ -94,6 +107,7 @@ class MemTableRep { // callback_args directly forwarded as the first parameter, and the mem table // key as the second parameter. If the return value is false, then terminates. // Otherwise, go through the next key. + // // It's safe for Get() to terminate after having finished all the potential // key for the k.user_key(), or not. // @@ -109,7 +123,7 @@ class MemTableRep { } // Report an approximation of how much memory has been used other than memory - // that was allocated through the allocator. + // that was allocated through the allocator. Safe to call from any thread. virtual size_t ApproximateMemoryUsage() = 0; virtual ~MemTableRep() { } @@ -174,6 +188,10 @@ class MemTableRep { // Default: true virtual bool IsSnapshotSupported() const { return true; } + // Return true if the current MemTableRep supports concurrent inserts + // Default: false + virtual bool IsInsertConcurrentlySupported() const { return false; } + protected: // When *key is an internal key concatenated with the value, returns the // user key. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index fb35b4108..e7064b3cb 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1173,6 +1173,47 @@ struct DBOptions { // Default: 2MB/s uint64_t delayed_write_rate; + // If true, allow multi-writers to update mem tables in parallel. + // Only some memtable_factory-s support concurrent writes; currently it + // is implemented only for SkipListFactory. Concurrent memtable writes + // are not compatible with inplace_update_support or filter_deletes. + // It is strongly recommended to set enable_write_thread_adaptive_yield + // if you are going to use this feature. + // + // THIS FEATURE IS NOT STABLE YET. + // + // Default: false + bool allow_concurrent_memtable_write; + + // If true, threads synchronizing with the write batch group leader will + // wait for up to write_thread_max_yield_usec before blocking on a mutex. + // This can substantially improve throughput for concurrent workloads, + // regardless of whether allow_concurrent_memtable_write is enabled. + // + // THIS FEATURE IS NOT STABLE YET. + // + // Default: false + bool enable_write_thread_adaptive_yield; + + // The maximum number of microseconds that a write operation will use + // a yielding spin loop to coordinate with other write threads before + // blocking on a mutex. (Assuming write_thread_slow_yield_usec is + // set properly) increasing this value is likely to increase RocksDB + // throughput at the expense of increased CPU usage. + // + // Default: 100 + uint64_t write_thread_max_yield_usec; + + // The latency in microseconds after which a std::this_thread::yield + // call (sched_yield on Linux) is considered to be a signal that + // other processes or threads would like to use the current core. + // Increasing this makes writer threads more likely to take CPU + // by spinning, which will show up as an increase in the number of + // involuntary context switches. + // + // Default: 3 + uint64_t write_thread_slow_yield_usec; + // If true, then DB::Open() will not update the statistics used to optimize // compaction decision by loading table properties from many files. // Turning off this feature will improve DBOpen time especially in diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 9a21fe174..15c49439c 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -145,7 +145,7 @@ enum Tickers : uint32_t { // Writes can be processed by requesting thread or by the thread at the // head of the writers queue. WRITE_DONE_BY_SELF, - WRITE_DONE_BY_OTHER, + WRITE_DONE_BY_OTHER, // Equivalent to writes done for others WRITE_TIMEDOUT, // Number of writes ending up with timed-out. WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction diff --git a/port/port_posix.cc b/port/port_posix.cc index 773c6f1c3..73ad3caf1 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -10,6 +10,9 @@ #include "port/port_posix.h" #include +#if defined(__i386__) || defined(__x86_64__) +#include +#endif #include #include #include @@ -132,6 +135,19 @@ void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&m void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); } +int PhysicalCoreID() { +#if defined(__i386__) || defined(__x86_64__) + // if you ever find that this function is hot on Linux, you can go from + // ~200 nanos to ~20 nanos by adding the machinery to use __vdso_getcpu + unsigned eax, ebx = 0, ecx, edx; + __get_cpuid(1, &eax, &ebx, &ecx, &edx); + return ebx >> 24; +#else + // getcpu or sched_getcpu could work here + return -1; +#endif +} + void InitOnce(OnceType* once, void (*initializer)()) { PthreadCall("once", pthread_once(once, initializer)); } diff --git a/port/port_posix.h b/port/port_posix.h index 8854e1c3f..efcd1aa8e 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -43,8 +43,9 @@ #include #include -#include #include +#include +#include #ifndef PLATFORM_IS_LITTLE_ENDIAN #define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) @@ -71,8 +72,6 @@ #define fdatasync fsync #endif -#include - namespace rocksdb { namespace port { @@ -142,6 +141,20 @@ class CondVar { Mutex* mu_; }; +static inline void AsmVolatilePause() { +#if defined(__i386__) || defined(__x86_64__) + asm volatile("pause"); +#elif defined(__aarch64__) + asm volatile("wfe"); +#elif defined(__powerpc64__) + asm volatile("or 27,27,27"); +#endif + // it's okay for other platforms to be no-ops +} + +// Returns -1 if not available on this platform +extern int PhysicalCoreID(); + typedef pthread_once_t OnceType; #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT extern void InitOnce(OnceType* once, void (*initializer)()); diff --git a/port/win/port_win.cc b/port/win/port_win.cc index 91c83b8c9..e08f0ec22 100644 --- a/port/win/port_win.cc +++ b/port/win/port_win.cc @@ -100,6 +100,8 @@ void CondVar::Signal() { cv_.notify_one(); } void CondVar::SignalAll() { cv_.notify_all(); } +int PhysicalCoreID() { return GetCurrentProcessorNumber(); } + void InitOnce(OnceType* once, void (*initializer)()) { std::call_once(once->flag_, initializer); } diff --git a/port/win/port_win.h b/port/win/port_win.h index 6b7920023..9ee7d96be 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -243,6 +243,15 @@ extern void InitOnce(OnceType* once, void (*initializer)()); #define CACHE_LINE_SIZE 64U +static inline void AsmVolatilePause() { +#if defined(_M_IX86) || defined(_M_X64) + ::_mm_pause(); +#endif + // it would be nice to get "wfe" on ARM here +} + +extern int PhysicalCoreID(); + // For Thread Local Storage abstraction typedef DWORD pthread_key_t; diff --git a/src.mk b/src.mk index 40f7f0d7a..369890258 100644 --- a/src.mk +++ b/src.mk @@ -88,6 +88,7 @@ LIB_SOURCES = \ util/coding.cc \ util/comparator.cc \ util/compaction_job_stats_impl.cc \ + util/concurrent_arena.cc \ util/crc32c.cc \ util/db_info_dumper.cc \ util/delete_scheduler_impl.cc \ diff --git a/table/table_test.cc b/table/table_test.cc index 58607bbb2..cf58a2477 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2041,7 +2041,8 @@ TEST_F(MemTableTest, Simple) { batch.Put(std::string("k3"), std::string("v3")); batch.Put(std::string("largekey"), std::string("vlarge")); ColumnFamilyMemTablesDefault cf_mems_default(memtable); - ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok()); + ASSERT_TRUE( + WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok()); Arena arena; ScopedArenaIterator iter(memtable->NewIterator(ReadOptions(), &arena)); diff --git a/util/arena.cc b/util/arena.cc index 1fe455af5..1d292ec01 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -167,7 +167,7 @@ char* Arena::AllocateAligned(size_t bytes, size_t huge_page_size, aligned_alloc_ptr_ += needed; alloc_bytes_remaining_ -= needed; } else { - // AllocateFallback always returned aligned memory + // AllocateFallback always returns aligned memory result = AllocateFallback(bytes, true /* aligned */); } assert((reinterpret_cast(result) & (kAlignUnit - 1)) == 0); diff --git a/util/arena.h b/util/arena.h index 9149498c8..db2150a8f 100644 --- a/util/arena.h +++ b/util/arena.h @@ -21,6 +21,7 @@ #include #include #include "util/allocator.h" +#include "util/mutexlock.h" namespace rocksdb { @@ -76,7 +77,7 @@ class Arena : public Allocator { size_t BlockSize() const override { return kBlockSize; } private: - char inline_block_[kInlineSize]; + char inline_block_[kInlineSize] __attribute__((__aligned__(sizeof(void*)))); // Number of bytes allocated in one block const size_t kBlockSize; // Array of new[] allocated memory blocks diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc new file mode 100644 index 000000000..027124871 --- /dev/null +++ b/util/concurrent_arena.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/concurrent_arena.h" +#include +#include "port/likely.h" +#include "port/port.h" +#include "util/random.h" + +namespace rocksdb { + +#if ROCKSDB_SUPPORT_THREAD_LOCAL +__thread uint32_t ConcurrentArena::tls_cpuid = 0; +#endif + +ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size) + : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) { + // find a power of two >= num_cpus and >= 8 + auto num_cpus = std::thread::hardware_concurrency(); + index_mask_ = 7; + while (index_mask_ + 1 < num_cpus) { + index_mask_ = index_mask_ * 2 + 1; + } + + shards_.reset(new Shard[index_mask_ + 1]); + Fixup(); +} + +ConcurrentArena::Shard* ConcurrentArena::Repick() { + int cpuid = port::PhysicalCoreID(); + if (UNLIKELY(cpuid < 0)) { + // cpu id unavailable, just pick randomly + cpuid = Random::GetTLSInstance()->Uniform(index_mask_ + 1); + } +#if ROCKSDB_SUPPORT_THREAD_LOCAL + // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we + // have repicked + tls_cpuid = cpuid | (index_mask_ + 1); +#endif + return &shards_[cpuid & index_mask_]; +} + +} // namespace rocksdb diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h new file mode 100644 index 000000000..e3e1a3eb3 --- /dev/null +++ b/util/concurrent_arena.h @@ -0,0 +1,192 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include "port/likely.h" +#include "util/allocator.h" +#include "util/arena.h" +#include "util/mutexlock.h" +#include "util/thread_local.h" + +namespace rocksdb { + +class Logger; + +// ConcurrentArena wraps an Arena. It makes it thread safe using a fast +// inlined spinlock, and adds small per-core allocation caches to avoid +// contention for small allocations. To avoid any memory waste from the +// per-core shards, they are kept small, they are lazily instantiated +// only if ConcurrentArena actually notices concurrent use, and they +// adjust their size so that there is no fragmentation waste when the +// shard blocks are allocated from the underlying main arena. +class ConcurrentArena : public Allocator { + public: + // block_size and huge_page_size are the same as for Arena (and are + // in fact just passed to the constructor of arena_. The core-local + // shards compute their shard_block_size as a fraction of block_size + // that varies according to the hardware concurrency level. + explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize, + size_t huge_page_size = 0); + + char* Allocate(size_t bytes) override { + return AllocateImpl(bytes, false /*force_arena*/, + [=]() { return arena_.Allocate(bytes); }); + } + + char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, + Logger* logger = nullptr) override { + size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1; + assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) && + (rounded_up % sizeof(void*)) == 0); + + return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() { + return arena_.AllocateAligned(rounded_up, huge_page_size, logger); + }); + } + + size_t ApproximateMemoryUsage() const { + std::unique_lock lock(arena_mutex_, std::defer_lock); + if (index_mask_ != 0) { + lock.lock(); + } + return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused(); + } + + size_t MemoryAllocatedBytes() const { + return memory_allocated_bytes_.load(std::memory_order_relaxed); + } + + size_t AllocatedAndUnused() const { + return arena_allocated_and_unused_.load(std::memory_order_relaxed) + + ShardAllocatedAndUnused(); + } + + size_t IrregularBlockNum() const { + return irregular_block_num_.load(std::memory_order_relaxed); + } + + size_t BlockSize() const override { return arena_.BlockSize(); } + + private: + struct Shard { + char padding[40]; + mutable SpinMutex mutex; + char* free_begin_; + std::atomic allocated_and_unused_; + + Shard() : allocated_and_unused_(0) {} + }; + +#if ROCKSDB_SUPPORT_THREAD_LOCAL + static __thread uint32_t tls_cpuid; +#else + enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 }; +#endif + + char padding0[56]; + + size_t shard_block_size_; + + // shards_[i & index_mask_] is valid + size_t index_mask_; + std::unique_ptr shards_; + + Arena arena_; + mutable SpinMutex arena_mutex_; + std::atomic arena_allocated_and_unused_; + std::atomic memory_allocated_bytes_; + std::atomic irregular_block_num_; + + char padding1[56]; + + Shard* Repick(); + + size_t ShardAllocatedAndUnused() const { + size_t total = 0; + for (size_t i = 0; i <= index_mask_; ++i) { + total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed); + } + return total; + } + + template + char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) { + uint32_t cpu; + + // Go directly to the arena if the allocation is too large, or if + // we've never needed to Repick() and the arena mutex is available + // with no waiting. This keeps the fragmentation penalty of + // concurrency zero unless it might actually confer an advantage. + std::unique_lock arena_lock(arena_mutex_, std::defer_lock); + if (bytes > shard_block_size_ / 4 || force_arena || + ((cpu = tls_cpuid) == 0 && + !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) && + arena_lock.try_lock())) { + if (!arena_lock.owns_lock()) { + arena_lock.lock(); + } + auto rv = func(); + Fixup(); + return rv; + } + + // pick a shard from which to allocate + Shard* s = &shards_[cpu & index_mask_]; + if (!s->mutex.try_lock()) { + s = Repick(); + s->mutex.lock(); + } + std::unique_lock lock(s->mutex, std::adopt_lock); + + size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed); + if (avail < bytes) { + // reload + std::lock_guard reload_lock(arena_mutex_); + + // If the arena's current block is within a factor of 2 of the right + // size, we adjust our request to avoid arena waste. + auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed); + assert(exact == arena_.AllocatedAndUnused()); + avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2 + ? exact + : shard_block_size_; + s->free_begin_ = arena_.AllocateAligned(avail); + Fixup(); + } + s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed); + + char* rv; + if ((bytes % sizeof(void*)) == 0) { + // aligned allocation from the beginning + rv = s->free_begin_; + s->free_begin_ += bytes; + } else { + // unaligned from the end + rv = s->free_begin_ + avail - bytes; + } + return rv; + } + + void Fixup() { + arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(), + std::memory_order_relaxed); + memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(), + std::memory_order_relaxed); + irregular_block_num_.store(arena_.IrregularBlockNum(), + std::memory_order_relaxed); + } + + ConcurrentArena(const ConcurrentArena&) = delete; + ConcurrentArena& operator=(const ConcurrentArena&) = delete; +}; + +} // namespace rocksdb diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index ffe8157cc..4df81d527 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -48,7 +48,7 @@ DynamicBloom::DynamicBloom(uint32_t num_probes, void DynamicBloom::SetRawData(unsigned char* raw_data, uint32_t total_bits, uint32_t num_blocks) { - data_ = raw_data; + data_ = reinterpret_cast*>(raw_data); kTotalBits = total_bits; kNumBlocks = num_blocks; } @@ -69,15 +69,14 @@ void DynamicBloom::SetTotalBits(Allocator* allocator, sz += CACHE_LINE_SIZE - 1; } assert(allocator); - raw_ = reinterpret_cast( - allocator->AllocateAligned(sz, huge_page_tlb_size, logger)); - memset(raw_, 0, sz); - if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { - data_ = raw_ + CACHE_LINE_SIZE - - reinterpret_cast(raw_) % CACHE_LINE_SIZE; - } else { - data_ = raw_; + + char* raw = allocator->AllocateAligned(sz, huge_page_tlb_size, logger); + memset(raw, 0, sz); + auto cache_line_offset = reinterpret_cast(raw) % CACHE_LINE_SIZE; + if (kNumBlocks > 0 && cache_line_offset > 0) { + raw += CACHE_LINE_SIZE - cache_line_offset; } + data_ = reinterpret_cast*>(raw); } } // rocksdb diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index e2ac56e76..8d1b7b4af 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -51,9 +51,15 @@ class DynamicBloom { // Assuming single threaded access to this function. void Add(const Slice& key); + // Like Add, but may be called concurrent with other functions. + void AddConcurrently(const Slice& key); + // Assuming single threaded access to this function. void AddHash(uint32_t hash); + // Like AddHash, but may be called concurrent with other functions. + void AddHashConcurrently(uint32_t hash); + // Multithreaded access to this function is OK bool MayContain(const Slice& key) const; @@ -81,12 +87,40 @@ class DynamicBloom { const uint32_t kNumProbes; uint32_t (*hash_func_)(const Slice& key); - unsigned char* data_; - unsigned char* raw_; + std::atomic* data_; + + // or_func(ptr, mask) should effect *ptr |= mask with the appropriate + // concurrency safety, working with bytes. + template + void AddHash(uint32_t hash, const OrFunc& or_func); }; inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } +inline void DynamicBloom::AddConcurrently(const Slice& key) { + AddHashConcurrently(hash_func_(key)); +} + +inline void DynamicBloom::AddHash(uint32_t hash) { + AddHash(hash, [](std::atomic* ptr, uint8_t mask) { + ptr->store(ptr->load(std::memory_order_relaxed) | mask, + std::memory_order_relaxed); + }); +} + +inline void DynamicBloom::AddHashConcurrently(uint32_t hash) { + AddHash(hash, [](std::atomic* ptr, uint8_t mask) { + // Happens-before between AddHash and MaybeContains is handled by + // access to versions_->LastSequence(), so all we have to do here is + // avoid races (so we don't give the compiler a license to mess up + // our code) and not lose bits. std::memory_order_relaxed is enough + // for that. + if ((mask & ptr->load(std::memory_order_relaxed)) != mask) { + ptr->fetch_or(mask, std::memory_order_relaxed); + } + }); +} + inline bool DynamicBloom::MayContain(const Slice& key) const { return (MayContainHash(hash_func_(key))); } @@ -107,7 +141,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const { // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized // to a simple and operation by compiler. const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8)); - if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed); + if ((byteval & (1 << (bitpos % 8))) == 0) { return false; } // Rotate h so that we don't reuse the same bytes. @@ -118,7 +153,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const { } else { for (uint32_t i = 0; i < kNumProbes; ++i) { const uint32_t bitpos = h % kTotalBits; - if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed); + if ((byteval & (1 << (bitpos % 8))) == 0) { return false; } h += delta; @@ -127,7 +163,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const { return true; } -inline void DynamicBloom::AddHash(uint32_t h) { +template +inline void DynamicBloom::AddHash(uint32_t h, const OrFunc& or_func) { assert(IsInitialized()); const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits if (kNumBlocks != 0) { @@ -136,7 +173,7 @@ inline void DynamicBloom::AddHash(uint32_t h) { // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized // to a simple and operation by compiler. const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8)); - data_[bitpos / 8] |= (1 << (bitpos % 8)); + or_func(&data_[bitpos / 8], (1 << (bitpos % 8))); // Rotate h so that we don't reuse the same bytes. h = h / (CACHE_LINE_SIZE * 8) + (h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE); @@ -145,7 +182,7 @@ inline void DynamicBloom::AddHash(uint32_t h) { } else { for (uint32_t i = 0; i < kNumProbes; ++i) { const uint32_t bitpos = h % kTotalBits; - data_[bitpos / 8] |= (1 << (bitpos % 8)); + or_func(&data_[bitpos / 8], (1 << (bitpos % 8))); h += delta; } } diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index cb3836661..e5ef4aab7 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -17,6 +17,10 @@ int main() { #include #include +#include +#include +#include +#include #include #include "dynamic_bloom.h" @@ -72,6 +76,25 @@ TEST_F(DynamicBloomTest, Small) { ASSERT_TRUE(!bloom2.MayContain("foo")); } +TEST_F(DynamicBloomTest, SmallConcurrentAdd) { + Arena arena; + DynamicBloom bloom1(&arena, 100, 0, 2); + bloom1.AddConcurrently("hello"); + bloom1.AddConcurrently("world"); + ASSERT_TRUE(bloom1.MayContain("hello")); + ASSERT_TRUE(bloom1.MayContain("world")); + ASSERT_TRUE(!bloom1.MayContain("x")); + ASSERT_TRUE(!bloom1.MayContain("foo")); + + DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + bloom2.AddConcurrently("hello"); + bloom2.AddConcurrently("world"); + ASSERT_TRUE(bloom2.MayContain("hello")); + ASSERT_TRUE(bloom2.MayContain("world")); + ASSERT_TRUE(!bloom2.MayContain("x")); + ASSERT_TRUE(!bloom2.MayContain("foo")); +} + static uint32_t NextNum(uint32_t num) { if (num < 10) { num += 1; @@ -93,8 +116,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) { int good_filters = 0; uint32_t num_probes = static_cast(FLAGS_num_probes); - fprintf(stderr, "bits_per_key: %d num_probes: %d\n", - FLAGS_bits_per_key, num_probes); + fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key, + num_probes); for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) { for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { @@ -114,8 +137,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) { // All added keys must match for (uint64_t i = 0; i < num; i++) { - ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) - << "Num " << num << "; key " << i; + ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) << "Num " << num + << "; key " << i; } // Check false positive rate @@ -139,9 +162,9 @@ TEST_F(DynamicBloomTest, VaryingLengths) { good_filters++; } - fprintf(stderr, "Filters: %d good, %d mediocre\n", - good_filters, mediocre_filters); - ASSERT_LE(mediocre_filters, good_filters/5); + fprintf(stderr, "Filters: %d good, %d mediocre\n", good_filters, + mediocre_filters); + ASSERT_LE(mediocre_filters, good_filters / 5); } } @@ -161,7 +184,7 @@ TEST_F(DynamicBloomTest, perf) { DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes); timer.Start(); - for (uint32_t i = 1; i <= num_keys; ++i) { + for (uint64_t i = 1; i <= num_keys; ++i) { std_bloom.Add(Slice(reinterpret_cast(&i), 8)); } @@ -171,7 +194,7 @@ TEST_F(DynamicBloomTest, perf) { uint32_t count = 0; timer.Start(); - for (uint32_t i = 1; i <= num_keys; ++i) { + for (uint64_t i = 1; i <= num_keys; ++i) { if (std_bloom.MayContain(Slice(reinterpret_cast(&i), 8))) { ++count; } @@ -184,31 +207,125 @@ TEST_F(DynamicBloomTest, perf) { // Locality enabled version DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes); + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + blocked_bloom.Add(Slice(reinterpret_cast(&i), 8)); + } + + elapsed = timer.ElapsedNanos(); + fprintf(stderr, + "blocked bloom(enable locality), avg add latency %" PRIu64 "\n", + elapsed / num_keys); + + count = 0; + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + if (blocked_bloom.MayContain( + Slice(reinterpret_cast(&i), 8))) { + ++count; + } + } + + elapsed = timer.ElapsedNanos(); + fprintf(stderr, + "blocked bloom(enable locality), avg query latency %" PRIu64 "\n", + elapsed / count); + ASSERT_TRUE(count == num_keys); + } +} + +TEST_F(DynamicBloomTest, concurrent_with_perf) { + StopWatchNano timer(Env::Default()); + uint32_t num_probes = static_cast(FLAGS_num_probes); + + uint32_t m_limit = FLAGS_enable_perf ? 8 : 1; + uint32_t locality_limit = FLAGS_enable_perf ? 1 : 0; + + uint32_t num_threads = 4; + std::vector threads; + + for (uint32_t m = 1; m <= m_limit; ++m) { + for (uint32_t locality = 0; locality <= locality_limit; ++locality) { + Arena arena; + const uint32_t num_keys = m * 8 * 1024 * 1024; + fprintf(stderr, "testing %" PRIu32 "M keys with %" PRIu32 " locality\n", + m * 8, locality); + + DynamicBloom std_bloom(&arena, num_keys * 10, locality, num_probes); + timer.Start(); - for (uint32_t i = 1; i <= num_keys; ++i) { - blocked_bloom.Add(Slice(reinterpret_cast(&i), 8)); + + auto adder = [&](size_t t) { + for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) { + std_bloom.AddConcurrently( + Slice(reinterpret_cast(&i), 8)); + } + }; + for (size_t t = 0; t < num_threads; ++t) { + // TSAN currently complains of a race between an allocation + // made bythis race and the eventual shutdown of the thread. + // It is a false positive. + threads.emplace_back(adder, t); + } + while (threads.size() > 0) { + threads.back().join(); + threads.pop_back(); } - elapsed = timer.ElapsedNanos(); - fprintf(stderr, - "blocked bloom(enable locality), avg add latency %" PRIu64 "\n", + uint64_t elapsed = timer.ElapsedNanos(); + fprintf(stderr, "standard bloom, avg parallel add latency %" PRIu64 + " nanos/key\n", elapsed / num_keys); - count = 0; timer.Start(); - for (uint32_t i = 1; i <= num_keys; ++i) { - if (blocked_bloom.MayContain( - Slice(reinterpret_cast(&i), 8))) { - ++count; + + auto hitter = [&](size_t t) { + for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) { + bool f = + std_bloom.MayContain(Slice(reinterpret_cast(&i), 8)); + ASSERT_TRUE(f); } + }; + for (size_t t = 0; t < num_threads; ++t) { + threads.emplace_back(hitter, t); + } + while (threads.size() > 0) { + threads.back().join(); + threads.pop_back(); } elapsed = timer.ElapsedNanos(); - fprintf(stderr, - "blocked bloom(enable locality), avg query latency %" PRIu64 "\n", - elapsed / count); - ASSERT_TRUE(count == num_keys); + fprintf(stderr, "standard bloom, avg parallel hit latency %" PRIu64 + " nanos/key\n", + elapsed / num_keys); + + timer.Start(); + + std::atomic false_positives(0); + auto misser = [&](size_t t) { + for (uint64_t i = num_keys + 1 + t; i <= 2 * num_keys; + i += num_threads) { + bool f = + std_bloom.MayContain(Slice(reinterpret_cast(&i), 8)); + if (f) { + ++false_positives; + } + } + }; + for (size_t t = 0; t < num_threads; ++t) { + threads.emplace_back(misser, t); + } + while (threads.size() > 0) { + threads.back().join(); + threads.pop_back(); + } + + elapsed = timer.ElapsedNanos(); + fprintf(stderr, "standard bloom, avg parallel miss latency %" PRIu64 + " nanos/key, %f%% false positive rate\n", + elapsed / num_keys, false_positives.load() * 100.0 / num_keys); } + } } } // namespace rocksdb diff --git a/util/mutexlock.h b/util/mutexlock.h index 6121ec1ec..63a0f5ce1 100644 --- a/util/mutexlock.h +++ b/util/mutexlock.h @@ -8,6 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include +#include +#include +#include #include "port/port.h" namespace rocksdb { @@ -75,4 +79,39 @@ class WriteLock { void operator=(const WriteLock&); }; +// +// SpinMutex has very low overhead for low-contention cases. Method names +// are chosen so you can use std::unique_lock or std::lock_guard with it. +// +class SpinMutex { + public: + SpinMutex() : locked_(false) {} + + bool try_lock() { + auto currently_locked = locked_.load(std::memory_order_relaxed); + return !currently_locked && + locked_.compare_exchange_weak(currently_locked, true, + std::memory_order_acquire, + std::memory_order_relaxed); + } + + void lock() { + for (size_t tries = 0;; ++tries) { + if (try_lock()) { + // success + break; + } + port::AsmVolatilePause(); + if (tries > 100) { + std::this_thread::yield(); + } + } + } + + void unlock() { locked_.store(false, std::memory_order_release); } + + private: + std::atomic locked_; +}; + } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 09ae10360..c925153fd 100644 --- a/util/options.cc +++ b/util/options.cc @@ -262,6 +262,10 @@ DBOptions::DBOptions() listeners(), enable_thread_tracking(false), delayed_write_rate(2 * 1024U * 1024U), + allow_concurrent_memtable_write(false), + enable_write_thread_adaptive_yield(false), + write_thread_max_yield_usec(100), + write_thread_slow_yield_usec(3), skip_stats_update_on_db_open(false), wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords), row_cache(nullptr), @@ -325,6 +329,11 @@ DBOptions::DBOptions(const Options& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), delayed_write_rate(options.delayed_write_rate), + allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), + enable_write_thread_adaptive_yield( + options.enable_write_thread_adaptive_yield), + write_thread_max_yield_usec(options.write_thread_max_yield_usec), + write_thread_slow_yield_usec(options.write_thread_slow_yield_usec), skip_stats_update_on_db_open(options.skip_stats_update_on_db_open), wal_recovery_mode(options.wal_recovery_mode), row_cache(options.row_cache), @@ -435,6 +444,14 @@ void DBOptions::Dump(Logger* log) const { wal_recovery_mode); Header(log, " Options.enable_thread_tracking: %d", enable_thread_tracking); + Header(log, " Options.allow_concurrent_memtable_write: %d", + allow_concurrent_memtable_write); + Header(log, " Options.enable_write_thread_adaptive_yield: %d", + enable_write_thread_adaptive_yield); + Header(log, " Options.write_thread_max_yield_usec: %" PRIu64, + write_thread_max_yield_usec); + Header(log, " Options.write_thread_slow_yield_usec: %" PRIu64, + write_thread_slow_yield_usec); if (row_cache) { Header(log, " Options.row_cache: %" PRIu64, row_cache->GetCapacity()); diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index 83472c699..7108008a8 100644 --- a/util/skiplistrep.cc +++ b/util/skiplistrep.cc @@ -25,6 +25,8 @@ public: transform_(transform), lookahead_(lookahead) { } + virtual bool IsInsertConcurrentlySupported() const override { return true; } + virtual KeyHandle Allocate(const size_t len, char** buf) override { *buf = skip_list_.AllocateKey(len); return static_cast(*buf); @@ -36,6 +38,10 @@ public: skip_list_.Insert(static_cast(handle)); } + virtual void InsertConcurrently(KeyHandle handle) override { + skip_list_.InsertConcurrently(static_cast(handle)); + } + // Returns true iff an entry that compares equal to key is in the list. virtual bool Contains(const char* key) const override { return skip_list_.Contains(key);