From 6acbe0fc45fa212da6549f0e10c0301366ce9bf5 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 11 Jun 2013 14:23:58 -0700 Subject: [PATCH] Compact multiple memtables before flushing to storage. Summary: Merge multiple multiple memtables in memory before writing it out to a file in L0. There is a new config parameter min_write_buffer_number_to_merge that specifies the number of write buffers that should be merged together to a single file in storage. The system will not flush wrte buffers to storage unless at least these many buffers have accumulated in memory. The default value of this new parameter is 1, which means that a write buffer will be immediately flushed to disk as soon it is ready. Test Plan: make check Differential Revision: https://reviews.facebook.net/D11241 --- db/db_bench.cc | 17 ++++++ db/db_impl.cc | 40 +++++++++----- db/db_impl.h | 2 +- db/memtable.cc | 3 +- db/memtable.h | 11 ++++ db/memtablelist.cc | 113 ++++++++++++++++++++++---------------- db/memtablelist.h | 11 ++-- db/version_edit.h | 5 ++ include/leveldb/options.h | 9 +++ util/options.cc | 3 + 10 files changed, 145 insertions(+), 69 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 27fcd726d..e24cc1779 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -122,6 +122,16 @@ static int FLAGS_write_buffer_size = 0; // This is initialized to default value of 2 in "main" function. static int FLAGS_max_write_buffer_number = 0; +// The minimum number of write buffers that will be merged together +// before writing to storage. This is cheap because it is an +// in-memory merge. If this feature is not enabled, then all these +// write buffers are fushed to L0 as seperate files and this increases +// read amplification because a get request has to check in all of these +// files. Also, an in-memory merge may result in writing lesser +// data to storage if there are duplicate records in each of these +// individual write buffers. +static int FLAGS_min_write_buffer_number_to_merge = 0; + // The maximum number of concurrent background compactions // that can occur in parallel. // This is initialized to default value of 1 in "main" function. @@ -1122,6 +1132,8 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") } options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; + options.min_write_buffer_number_to_merge = + FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; @@ -1999,6 +2011,8 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number; + FLAGS_min_write_buffer_number_to_merge = + leveldb::Options().min_write_buffer_number_to_merge; FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; @@ -2055,6 +2069,9 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = n; } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { FLAGS_max_write_buffer_number = n; + } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c", + &n, &junk) == 1) { + FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 90a6cae51..c8827abaa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -126,6 +126,9 @@ Options SanitizeOptions(const std::string& dbname, ClipToRange(&result.write_buffer_size, ((size_t)64)<<10, ((size_t)64)<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); + + result.min_write_buffer_number_to_merge = std::min( + result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1); if (result.info_log == nullptr) { Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, result, &result.info_log); @@ -217,7 +220,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish - if (flush_on_destroy_) { + if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } mutex_.Lock(); @@ -794,7 +797,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { } -Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, +Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); @@ -802,15 +805,21 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, meta.number = versions_->NewFileNumber(); *filenumber = meta.number; pending_outputs_.insert(meta.number); - Iterator* iter = mem->NewIterator(); + + std::vector list; + for (MemTable* m : mems) { + list.push_back(m->NewIterator()); + } + Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0], + list.size()); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = - mem->GetFirstSequenceNumber(); + mems[0]->GetFirstSequenceNumber(); Log(options_.info_log, "Level-0 flush table #%llu: started", (unsigned long long) meta.number); Version* base = versions_->current(); - base->Ref(); + base->Ref(); // it is likely that we do not need this reference Status s; { mutex_.Unlock(); @@ -868,7 +877,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { mutex_.AssertHeld(); assert(imm_.size() != 0); - if (!imm_.IsFlushPending()) { + if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "Memcompaction already in progress"); Status s = Status::IOError("Memcompaction already in progress"); return s; @@ -877,19 +886,21 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { // Save the contents of the earliest memtable as a new Table // This will release and re-acquire the mutex. uint64_t file_number; - MemTable* m = imm_.PickMemtableToFlush(); - if (m == nullptr) { + std::vector mems; + imm_.PickMemtablesToFlush(&mems); + if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); return s; } // record the logfile_number_ before we release the mutex + MemTable* m = mems[0]; VersionEdit* edit = m->GetEdits(); edit->SetPrevLogNumber(0); - edit->SetLogNumber(logfile_number_); // Earlier logs no longer needed + edit->SetLogNumber(m->GetLogNumber()); // Earlier logs no longer needed - Status s = WriteLevel0Table(m, edit, &file_number); + Status s = WriteLevel0Table(mems, edit, &file_number); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::IOError( @@ -899,7 +910,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( - m, versions_.get(), s, &mutex_, options_.info_log.get(), + mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_); if (s.ok()) { @@ -1256,7 +1267,7 @@ void DBImpl::MaybeScheduleCompaction() { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (!imm_.IsFlushPending() && + } else if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge) && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done @@ -1327,7 +1338,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *madeProgress = false; mutex_.AssertHeld(); - while (imm_.IsFlushPending()) { + while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCompaction doing CompactMemTable, compaction slots available %d", options_.max_background_compactions - bg_compaction_scheduled_); @@ -1691,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_.IsFlushPending()) { + if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { CompactMemTable(); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2422,6 +2433,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); + mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); mem_ = new MemTable(internal_comparator_, NumberLevels()); mem_->Ref(); diff --git a/db/db_impl.h b/db/db_impl.h index ca0095337..df04607d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -152,7 +152,7 @@ class DBImpl : public DB { // for the entire period. The second method WriteLevel0Table supports // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, + Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber); Status MakeRoomForWrite(bool force /* compact even if there is room? */); diff --git a/db/memtable.cc b/db/memtable.cc index 7a7c6bb7a..c6f8f26c2 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -27,7 +27,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) flush_completed_(false), file_number_(0), edit_(numlevel), - first_seqno_(0) { + first_seqno_(0), + mem_logfile_number_(0) { } MemTable::~MemTable() { diff --git a/db/memtable.h b/db/memtable.h index 8fb9ce943..6b3c68bf6 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -77,6 +77,14 @@ class MemTable { // into the memtable SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } + // Returns the logfile number that can be safely deleted when this + // memstore is flushed to storage + uint64_t GetLogNumber() { return mem_logfile_number_; } + + // Sets the logfile number that can be safely deleted when this + // memstore is flushed to storage + void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; } + private: ~MemTable(); // Private since only Unref() should be used to delete it @@ -108,6 +116,9 @@ class MemTable { // The sequence number of the kv that was inserted first SequenceNumber first_seqno_; + // The log files earlier than this number can be deleted. + uint64_t mem_logfile_number_; + // No copying allowed MemTable(const MemTable&); void operator=(const MemTable&); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c12995726..eb427eb16 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -43,17 +43,16 @@ int MemTableList::size() { // Returns true if there is at least one memtable on which flush has // not yet started. -bool MemTableList::IsFlushPending() { - if (num_flush_not_started_ > 0) { +bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { + if (num_flush_not_started_ >= min_write_buffer_number_to_merge) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); return true; } return false; } -// Returns the earliest memtable that needs to be flushed. -// Returns null, if no such memtable exist. -MemTable* MemTableList::PickMemtableToFlush() { +// Returns the memtables that need to be flushed. +void MemTableList::PickMemtablesToFlush(std::vector* ret) { for (list::reverse_iterator it = memlist_.rbegin(); it != memlist_.rend(); it++) { MemTable* m = *it; @@ -64,37 +63,48 @@ MemTable* MemTableList::PickMemtableToFlush() { imm_flush_needed.Release_Store(nullptr); } m->flush_in_progress_ = true; // flushing will start very soon - return m; + ret->push_back(m); } } - return nullptr; } // Record a successful flush in the manifest file -Status MemTableList::InstallMemtableFlushResults(MemTable* m, +Status MemTableList::InstallMemtableFlushResults( + const std::vector &mems, VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set& pending_outputs) { mu->AssertHeld(); - assert(m->flush_in_progress_); - assert(m->file_number_ == 0); // If the flush was not successful, then just reset state. // Maybe a suceeding attempt to flush will be successful. if (!flushStatus.ok()) { - m->flush_in_progress_ = false; - m->flush_completed_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - imm_flush_needed.Release_Store((void *)1); - pending_outputs.erase(file_number); + for (MemTable* m : mems) { + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + imm_flush_needed.Release_Store((void *)1); + pending_outputs.erase(file_number); + } return flushStatus; } // flush was sucessful - m->flush_completed_ = true; - m->file_number_ = file_number; + bool first = true; + for (MemTable* m : mems) { + + // All the edits are associated with the first memtable of this batch. + assert(first || m->GetEdits()->NumEntries() == 0); + first = false; + + m->flush_completed_ = true; + m->file_number_ = file_number; + } // if some other thread is already commiting, then return Status s; @@ -106,12 +116,15 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m, commit_in_progress_ = true; // scan all memtables from the earliest, and commit those - // (in that order) that have finished flushing. - while (!memlist_.empty()) { - m = memlist_.back(); // get the last element + // (in that order) that have finished flushing. Memetables + // are always committed in the order that they were created. + while (!memlist_.empty() && s.ok()) { + MemTable* m = memlist_.back(); // get the last element if (!m->flush_completed_) { break; } + first = true; + Log(info_log, "Level-0 commit table #%llu: started", (unsigned long long)m->file_number_); @@ -119,33 +132,39 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m, // this can release and reacquire the mutex. s = vset->LogAndApply(&m->edit_, mu); - if (s.ok()) { // commit new state - Log(info_log, "Level-0 commit table #%llu: done", - (unsigned long long)m->file_number_); - memlist_.remove(m); - assert(m->file_number_ > 0); + // All the later memtables that have the same filenum + // are part of the same batch. They can be committed now. + do { + if (s.ok()) { // commit new state + Log(info_log, "Level-0 commit table #%llu: done %s", + (unsigned long long)m->file_number_, + first ? "": "bulk"); + memlist_.remove(m); + assert(m->file_number_ > 0); - // pending_outputs can be cleared only after the newly created file - // has been written to a committed version so that other concurrently - // executing compaction threads do not mistakenly assume that this - // file is not live. - pending_outputs.erase(m->file_number_); - m->Unref(); - size_--; - } else { - //commit failed. setup state so that we can flush again. - Log(info_log, "Level-0 commit table #%llu: failed", - (unsigned long long)m->file_number_); - m->flush_completed_ = false; - m->flush_in_progress_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - pending_outputs.erase(m->file_number_); - m->file_number_ = 0; - imm_flush_needed.Release_Store((void *)1); - s = Status::IOError("Unable to commit flushed memtable"); - break; - } + // pending_outputs can be cleared only after the newly created file + // has been written to a committed version so that other concurrently + // executing compaction threads do not mistakenly assume that this + // file is not live. + pending_outputs.erase(m->file_number_); + m->Unref(); + size_--; + } else { + //commit failed. setup state so that we can flush again. + Log(info_log, "Level-0 commit table #%llu: failed", + (unsigned long long)m->file_number_); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + pending_outputs.erase(m->file_number_); + m->file_number_ = 0; + imm_flush_needed.Release_Store((void *)1); + s = Status::IOError("Unable to commit flushed memtable"); + } + first = false; + } while (!memlist_.empty() && (m = memlist_.back()) && + m->file_number_ == file_number); } commit_in_progress_ = false; return s; diff --git a/db/memtablelist.h b/db/memtablelist.h index de27150ef..31831deac 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -18,7 +18,7 @@ class Mutex; class MemTableListIterator; // -// This class stores refeernces to all the immutable memtables. +// This class stores references to all the immutable memtables. // The memtables are flushed to L0 as soon as possible and in // any order. If there are more than one immutable memtable, their // flushes can occur concurrently. However, they are 'committed' @@ -49,14 +49,13 @@ class MemTableList { // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(); + bool IsFlushPending(int min_write_buffer_number_to_merge); - // Returns the earliest memtable that needs to be flushed. - // Returns null, if no such memtable exist. - MemTable* PickMemtableToFlush(); + // Returns the earliest memtables that needs to be flushed. + void PickMemtablesToFlush(std::vector* mems); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(MemTable* m, + Status InstallMemtableFlushResults(const std::vector &m, VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, diff --git a/db/version_edit.h b/db/version_edit.h index 3776c2179..2743e9e0d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -81,6 +81,11 @@ class VersionEdit { deleted_files_.insert(std::make_pair(level, file)); } + // Number of edits + int NumEntries() { + return new_files_.size() + deleted_files_.size(); + } + void EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 92b9d3726..c2ad73267 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -131,6 +131,15 @@ struct Options { // Default: 2 int max_write_buffer_number; + // The minimum number of write buffers that will be merged together + // before writing to storage. If set to 1, then + // all write buffers are fushed to L0 as individual files and this increases + // read amplification because a get request has to check in all of these + // files. Also, an in-memory merge may result in writing lesser + // data to storage if there are duplicate records in each of these + // individual write buffers. Default: 1 + int min_write_buffer_number_to_merge; + // Number of open files that can be used by the DB. You may need to // increase this if your database has a large working set (budget // one open file per 2MB of working set). diff --git a/util/options.cc b/util/options.cc index 022299caf..92a064cde 100644 --- a/util/options.cc +++ b/util/options.cc @@ -26,6 +26,7 @@ Options::Options() info_log(nullptr), write_buffer_size(4<<20), max_write_buffer_number(2), + min_write_buffer_number_to_merge(1), max_open_files(1000), block_size(4096), block_restart_interval(16), @@ -127,6 +128,8 @@ Options::Dump(Logger* log) const Log(log," Options.allow_readahead: %d", allow_readahead); Log(log," Options.allow_mmap_reads: %d", allow_mmap_reads); Log(log," Options.allow_mmap_writes: %d", allow_mmap_writes); + Log(log," Options.min_write_buffer_number_to_merge: %d", + min_write_buffer_number_to_merge); Log(log," Options.allow_readahead_compactions: %d", allow_readahead_compactions); Log(log," Options.purge_redundant_kvs_while_flush: %d",