diff --git a/db/memtable.cc b/db/memtable.cc index 8b6f49a7c..d4e2fcbff 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -352,7 +352,8 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey, void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ - const Slice& value, bool allow_concurrent) { + const Slice& value, bool allow_concurrent, + MemTablePostProcessInfo* post_process_info) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -406,13 +407,16 @@ void MemTable::Add(SequenceNumber s, ValueType type, } assert(first_seqno_.load() >= earliest_seqno_.load()); } + assert(post_process_info == nullptr); + UpdateFlushState(); } else { table_->InsertConcurrently(handle); - num_entries_.fetch_add(1, std::memory_order_relaxed); - data_size_.fetch_add(encoded_len, std::memory_order_relaxed); + assert(post_process_info != nullptr); + post_process_info->num_entries++; + post_process_info->data_size += encoded_len; if (type == kTypeDeletion) { - num_deletes_.fetch_add(1, std::memory_order_relaxed); + post_process_info->num_deletes++; } if (prefix_bloom_) { @@ -432,8 +436,6 @@ void MemTable::Add(SequenceNumber s, ValueType type, !first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { } } - - UpdateFlushState(); } // Callback from MemTable::Get() diff --git a/db/memtable.h b/db/memtable.h index e30c99976..d6d2a735f 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -54,6 +54,15 @@ struct MemTableOptions { Logger* info_log; }; +// Batched counters to updated when inserting keys in one write batch. +// In post process of the write batch, these can be updated together. +// Only used in concurrent memtable insert case. +struct MemTablePostProcessInfo { + uint64_t data_size = 0; + uint64_t num_entries = 0; + uint64_t num_deletes = 0; +}; + // Note: Many of the methods in this class have comments indicating that // external synchromization is required as these methods are not thread-safe. // It is up to higher layers of code to decide how to prevent concurrent @@ -157,7 +166,8 @@ class MemTable { // REQUIRES: if allow_concurrent = false, external synchronization to prevent // simultaneous operations on the same MemTable. void Add(SequenceNumber seq, ValueType type, const Slice& key, - const Slice& value, bool allow_concurrent = false); + const Slice& value, bool allow_concurrent = false, + MemTablePostProcessInfo* post_process_info = nullptr); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error @@ -216,6 +226,17 @@ class MemTable { // key in the memtable. size_t CountSuccessiveMergeEntries(const LookupKey& key); + // Update counters and flush status after inserting a whole write batch + // Used in concurrent memtable inserts. + void BatchPostProcess(const MemTablePostProcessInfo& update_counters) { + num_entries_.fetch_add(update_counters.num_entries, + std::memory_order_relaxed); + data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed); + num_deletes_.fetch_add(update_counters.num_deletes, + std::memory_order_relaxed); + UpdateFlushState(); + } + // Get total number of entries in the mem table. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable (unless this Memtable is immutable). diff --git a/db/write_batch.cc b/db/write_batch.cc index e42abc9a4..4d77832aa 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -31,6 +31,7 @@ #include "rocksdb/write_batch.h" +#include #include #include #include @@ -693,6 +694,8 @@ class MemTableInserter : public WriteBatch::Handler { uint64_t log_number_ref_; DBImpl* db_; const bool concurrent_memtable_writes_; + typedef std::map MemPostInfoMap; + MemPostInfoMap mem_post_info_map_; // current recovered transaction we are rebuilding (recovery) WriteBatch* rebuilding_trx_; @@ -718,6 +721,12 @@ class MemTableInserter : public WriteBatch::Handler { SequenceNumber get_final_sequence() { return sequence_; } + void PostProcess() { + for (auto& pair : mem_post_info_map_) { + pair.first->BatchPostProcess(pair.second); + } + } + bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { // If we are in a concurrent mode, it is the caller's responsibility // to clone the original ColumnFamilyMemTables so that each thread @@ -770,7 +779,8 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_); + mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_, + get_post_process_info(mem)); } else if (moptions->inplace_callback == nullptr) { assert(!concurrent_memtable_writes_); mem->Update(sequence_, key, value); @@ -821,7 +831,8 @@ class MemTableInserter : public WriteBatch::Handler { Status DeleteImpl(uint32_t column_family_id, const Slice& key, ValueType delete_type) { MemTable* mem = cf_mems_->GetMemTable(); - mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_); + mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_, + get_post_process_info(mem)); sequence_++; CheckMemtableFull(); return Status::OK(); @@ -1046,6 +1057,15 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + + private: + MemTablePostProcessInfo* get_post_process_info(MemTable* mem) { + if (!concurrent_memtable_writes_) { + // No need to batch counters locally if we don't use concurrent mode. + return nullptr; + } + return &mem_post_info_map_[mem]; + } }; // This function can only be called in these conditions: @@ -1087,7 +1107,11 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, concurrent_memtable_writes); assert(writer->ShouldWriteToMemtable()); inserter.set_log_number_ref(writer->log_ref); - return writer->batch->Iterate(&inserter); + Status s = writer->batch->Iterate(&inserter); + if (concurrent_memtable_writes) { + inserter.PostProcess(); + } + return s; } Status WriteBatchInternal::InsertInto(const WriteBatch* batch, @@ -1104,6 +1128,9 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* batch, if (last_seq_used != nullptr) { *last_seq_used = inserter.get_final_sequence(); } + if (concurrent_memtable_writes) { + inserter.PostProcess(); + } return s; }