diff --git a/db/db_bench.cc b/db/db_bench.cc index 27fcd726d..e24cc1779 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -122,6 +122,16 @@ static int FLAGS_write_buffer_size = 0; // This is initialized to default value of 2 in "main" function. static int FLAGS_max_write_buffer_number = 0; +// The minimum number of write buffers that will be merged together +// before writing to storage. This is cheap because it is an +// in-memory merge. If this feature is not enabled, then all these +// write buffers are fushed to L0 as seperate files and this increases +// read amplification because a get request has to check in all of these +// files. Also, an in-memory merge may result in writing lesser +// data to storage if there are duplicate records in each of these +// individual write buffers. +static int FLAGS_min_write_buffer_number_to_merge = 0; + // The maximum number of concurrent background compactions // that can occur in parallel. // This is initialized to default value of 1 in "main" function. @@ -1122,6 +1132,8 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") } options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; + options.min_write_buffer_number_to_merge = + FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; @@ -1999,6 +2011,8 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number; + FLAGS_min_write_buffer_number_to_merge = + leveldb::Options().min_write_buffer_number_to_merge; FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; @@ -2055,6 +2069,9 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = n; } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { FLAGS_max_write_buffer_number = n; + } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c", + &n, &junk) == 1) { + FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 90a6cae51..c8827abaa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -126,6 +126,9 @@ Options SanitizeOptions(const std::string& dbname, ClipToRange(&result.write_buffer_size, ((size_t)64)<<10, ((size_t)64)<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); + + result.min_write_buffer_number_to_merge = std::min( + result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1); if (result.info_log == nullptr) { Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, result, &result.info_log); @@ -217,7 +220,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish - if (flush_on_destroy_) { + if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { FlushMemTable(FlushOptions()); } mutex_.Lock(); @@ -794,7 +797,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { } -Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, +Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); @@ -802,15 +805,21 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, meta.number = versions_->NewFileNumber(); *filenumber = meta.number; pending_outputs_.insert(meta.number); - Iterator* iter = mem->NewIterator(); + + std::vector list; + for (MemTable* m : mems) { + list.push_back(m->NewIterator()); + } + Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0], + list.size()); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = - mem->GetFirstSequenceNumber(); + mems[0]->GetFirstSequenceNumber(); Log(options_.info_log, "Level-0 flush table #%llu: started", (unsigned long long) meta.number); Version* base = versions_->current(); - base->Ref(); + base->Ref(); // it is likely that we do not need this reference Status s; { mutex_.Unlock(); @@ -868,7 +877,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { mutex_.AssertHeld(); assert(imm_.size() != 0); - if (!imm_.IsFlushPending()) { + if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "Memcompaction already in progress"); Status s = Status::IOError("Memcompaction already in progress"); return s; @@ -877,19 +886,21 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { // Save the contents of the earliest memtable as a new Table // This will release and re-acquire the mutex. uint64_t file_number; - MemTable* m = imm_.PickMemtableToFlush(); - if (m == nullptr) { + std::vector mems; + imm_.PickMemtablesToFlush(&mems); + if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); Status s = Status::IOError("Nothing in memstore to flush"); return s; } // record the logfile_number_ before we release the mutex + MemTable* m = mems[0]; VersionEdit* edit = m->GetEdits(); edit->SetPrevLogNumber(0); - edit->SetLogNumber(logfile_number_); // Earlier logs no longer needed + edit->SetLogNumber(m->GetLogNumber()); // Earlier logs no longer needed - Status s = WriteLevel0Table(m, edit, &file_number); + Status s = WriteLevel0Table(mems, edit, &file_number); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::IOError( @@ -899,7 +910,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( - m, versions_.get(), s, &mutex_, options_.info_log.get(), + mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, pending_outputs_); if (s.ok()) { @@ -1256,7 +1267,7 @@ void DBImpl::MaybeScheduleCompaction() { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (!imm_.IsFlushPending() && + } else if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge) && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done @@ -1327,7 +1338,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *madeProgress = false; mutex_.AssertHeld(); - while (imm_.IsFlushPending()) { + while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCompaction doing CompactMemTable, compaction slots available %d", options_.max_background_compactions - bg_compaction_scheduled_); @@ -1691,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_.IsFlushPending()) { + if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { CompactMemTable(); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2422,6 +2433,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); + mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); mem_ = new MemTable(internal_comparator_, NumberLevels()); mem_->Ref(); diff --git a/db/db_impl.h b/db/db_impl.h index ca0095337..df04607d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -152,7 +152,7 @@ class DBImpl : public DB { // for the entire period. The second method WriteLevel0Table supports // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, + Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber); Status MakeRoomForWrite(bool force /* compact even if there is room? */); diff --git a/db/memtable.cc b/db/memtable.cc index 7a7c6bb7a..c6f8f26c2 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -27,7 +27,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) flush_completed_(false), file_number_(0), edit_(numlevel), - first_seqno_(0) { + first_seqno_(0), + mem_logfile_number_(0) { } MemTable::~MemTable() { diff --git a/db/memtable.h b/db/memtable.h index 8fb9ce943..6b3c68bf6 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -77,6 +77,14 @@ class MemTable { // into the memtable SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } + // Returns the logfile number that can be safely deleted when this + // memstore is flushed to storage + uint64_t GetLogNumber() { return mem_logfile_number_; } + + // Sets the logfile number that can be safely deleted when this + // memstore is flushed to storage + void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; } + private: ~MemTable(); // Private since only Unref() should be used to delete it @@ -108,6 +116,9 @@ class MemTable { // The sequence number of the kv that was inserted first SequenceNumber first_seqno_; + // The log files earlier than this number can be deleted. + uint64_t mem_logfile_number_; + // No copying allowed MemTable(const MemTable&); void operator=(const MemTable&); diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c12995726..eb427eb16 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -43,17 +43,16 @@ int MemTableList::size() { // Returns true if there is at least one memtable on which flush has // not yet started. -bool MemTableList::IsFlushPending() { - if (num_flush_not_started_ > 0) { +bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { + if (num_flush_not_started_ >= min_write_buffer_number_to_merge) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); return true; } return false; } -// Returns the earliest memtable that needs to be flushed. -// Returns null, if no such memtable exist. -MemTable* MemTableList::PickMemtableToFlush() { +// Returns the memtables that need to be flushed. +void MemTableList::PickMemtablesToFlush(std::vector* ret) { for (list::reverse_iterator it = memlist_.rbegin(); it != memlist_.rend(); it++) { MemTable* m = *it; @@ -64,37 +63,48 @@ MemTable* MemTableList::PickMemtableToFlush() { imm_flush_needed.Release_Store(nullptr); } m->flush_in_progress_ = true; // flushing will start very soon - return m; + ret->push_back(m); } } - return nullptr; } // Record a successful flush in the manifest file -Status MemTableList::InstallMemtableFlushResults(MemTable* m, +Status MemTableList::InstallMemtableFlushResults( + const std::vector &mems, VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set& pending_outputs) { mu->AssertHeld(); - assert(m->flush_in_progress_); - assert(m->file_number_ == 0); // If the flush was not successful, then just reset state. // Maybe a suceeding attempt to flush will be successful. if (!flushStatus.ok()) { - m->flush_in_progress_ = false; - m->flush_completed_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - imm_flush_needed.Release_Store((void *)1); - pending_outputs.erase(file_number); + for (MemTable* m : mems) { + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + imm_flush_needed.Release_Store((void *)1); + pending_outputs.erase(file_number); + } return flushStatus; } // flush was sucessful - m->flush_completed_ = true; - m->file_number_ = file_number; + bool first = true; + for (MemTable* m : mems) { + + // All the edits are associated with the first memtable of this batch. + assert(first || m->GetEdits()->NumEntries() == 0); + first = false; + + m->flush_completed_ = true; + m->file_number_ = file_number; + } // if some other thread is already commiting, then return Status s; @@ -106,12 +116,15 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m, commit_in_progress_ = true; // scan all memtables from the earliest, and commit those - // (in that order) that have finished flushing. - while (!memlist_.empty()) { - m = memlist_.back(); // get the last element + // (in that order) that have finished flushing. Memetables + // are always committed in the order that they were created. + while (!memlist_.empty() && s.ok()) { + MemTable* m = memlist_.back(); // get the last element if (!m->flush_completed_) { break; } + first = true; + Log(info_log, "Level-0 commit table #%llu: started", (unsigned long long)m->file_number_); @@ -119,33 +132,39 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m, // this can release and reacquire the mutex. s = vset->LogAndApply(&m->edit_, mu); - if (s.ok()) { // commit new state - Log(info_log, "Level-0 commit table #%llu: done", - (unsigned long long)m->file_number_); - memlist_.remove(m); - assert(m->file_number_ > 0); + // All the later memtables that have the same filenum + // are part of the same batch. They can be committed now. + do { + if (s.ok()) { // commit new state + Log(info_log, "Level-0 commit table #%llu: done %s", + (unsigned long long)m->file_number_, + first ? "": "bulk"); + memlist_.remove(m); + assert(m->file_number_ > 0); - // pending_outputs can be cleared only after the newly created file - // has been written to a committed version so that other concurrently - // executing compaction threads do not mistakenly assume that this - // file is not live. - pending_outputs.erase(m->file_number_); - m->Unref(); - size_--; - } else { - //commit failed. setup state so that we can flush again. - Log(info_log, "Level-0 commit table #%llu: failed", - (unsigned long long)m->file_number_); - m->flush_completed_ = false; - m->flush_in_progress_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - pending_outputs.erase(m->file_number_); - m->file_number_ = 0; - imm_flush_needed.Release_Store((void *)1); - s = Status::IOError("Unable to commit flushed memtable"); - break; - } + // pending_outputs can be cleared only after the newly created file + // has been written to a committed version so that other concurrently + // executing compaction threads do not mistakenly assume that this + // file is not live. + pending_outputs.erase(m->file_number_); + m->Unref(); + size_--; + } else { + //commit failed. setup state so that we can flush again. + Log(info_log, "Level-0 commit table #%llu: failed", + (unsigned long long)m->file_number_); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + pending_outputs.erase(m->file_number_); + m->file_number_ = 0; + imm_flush_needed.Release_Store((void *)1); + s = Status::IOError("Unable to commit flushed memtable"); + } + first = false; + } while (!memlist_.empty() && (m = memlist_.back()) && + m->file_number_ == file_number); } commit_in_progress_ = false; return s; diff --git a/db/memtablelist.h b/db/memtablelist.h index de27150ef..31831deac 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -18,7 +18,7 @@ class Mutex; class MemTableListIterator; // -// This class stores refeernces to all the immutable memtables. +// This class stores references to all the immutable memtables. // The memtables are flushed to L0 as soon as possible and in // any order. If there are more than one immutable memtable, their // flushes can occur concurrently. However, they are 'committed' @@ -49,14 +49,13 @@ class MemTableList { // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(); + bool IsFlushPending(int min_write_buffer_number_to_merge); - // Returns the earliest memtable that needs to be flushed. - // Returns null, if no such memtable exist. - MemTable* PickMemtableToFlush(); + // Returns the earliest memtables that needs to be flushed. + void PickMemtablesToFlush(std::vector* mems); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(MemTable* m, + Status InstallMemtableFlushResults(const std::vector &m, VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, diff --git a/db/version_edit.h b/db/version_edit.h index 3776c2179..2743e9e0d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -81,6 +81,11 @@ class VersionEdit { deleted_files_.insert(std::make_pair(level, file)); } + // Number of edits + int NumEntries() { + return new_files_.size() + deleted_files_.size(); + } + void EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 92b9d3726..c2ad73267 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -131,6 +131,15 @@ struct Options { // Default: 2 int max_write_buffer_number; + // The minimum number of write buffers that will be merged together + // before writing to storage. If set to 1, then + // all write buffers are fushed to L0 as individual files and this increases + // read amplification because a get request has to check in all of these + // files. Also, an in-memory merge may result in writing lesser + // data to storage if there are duplicate records in each of these + // individual write buffers. Default: 1 + int min_write_buffer_number_to_merge; + // Number of open files that can be used by the DB. You may need to // increase this if your database has a large working set (budget // one open file per 2MB of working set). diff --git a/util/options.cc b/util/options.cc index 022299caf..92a064cde 100644 --- a/util/options.cc +++ b/util/options.cc @@ -26,6 +26,7 @@ Options::Options() info_log(nullptr), write_buffer_size(4<<20), max_write_buffer_number(2), + min_write_buffer_number_to_merge(1), max_open_files(1000), block_size(4096), block_restart_interval(16), @@ -127,6 +128,8 @@ Options::Dump(Logger* log) const Log(log," Options.allow_readahead: %d", allow_readahead); Log(log," Options.allow_mmap_reads: %d", allow_mmap_reads); Log(log," Options.allow_mmap_writes: %d", allow_mmap_writes); + Log(log," Options.min_write_buffer_number_to_merge: %d", + min_write_buffer_number_to_merge); Log(log," Options.allow_readahead_compactions: %d", allow_readahead_compactions); Log(log," Options.purge_redundant_kvs_while_flush: %d",