Create log::Writer out of DB Mutex

Summary: Our measurement shows that sometimes new log::Write's constructor can take hundreds of milliseconds. It's unclear why but just simply move it out of DB mutex.

Test Plan: make all check

Reviewers: haobo, ljin, igor

Reviewed By: haobo

CC: nkg-, yhchiang, leveldb

Differential Revision: https://reviews.facebook.net/D17487
This commit is contained in:
sdong 2014-04-03 22:04:42 -07:00
parent c90d446ee7
commit ea0198fe9a
2 changed files with 18 additions and 5 deletions

View File

@ -3703,7 +3703,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// May temporarily unlock and wait. // May temporarily unlock and wait.
SuperVersion* superversion_to_free = nullptr; SuperVersion* superversion_to_free = nullptr;
Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free); log::Writer* old_log = nullptr;
Status status = MakeRoomForWrite(my_batch == nullptr,
&superversion_to_free,
&old_log);
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
@ -3804,6 +3807,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
writers_.front()->cv.Signal(); writers_.front()->cv.Signal();
} }
mutex_.Unlock(); mutex_.Unlock();
delete old_log;
delete superversion_to_free; delete superversion_to_free;
BumpPerfTime(&perf_context.write_pre_and_post_process_time, BumpPerfTime(&perf_context.write_pre_and_post_process_time,
&pre_post_process_timer); &pre_post_process_timer);
@ -3893,7 +3897,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force, Status DBImpl::MakeRoomForWrite(bool force,
SuperVersion** superversion_to_free) { SuperVersion** superversion_to_free,
log::Writer** old_log) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(!writers_.empty()); assert(!writers_.empty());
bool allow_delay = !force; bool allow_delay = !force;
@ -4015,6 +4020,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
} else { } else {
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr; MemTable* new_mem = nullptr;
// Attempt to switch to a new memtable and trigger flush of old. // Attempt to switch to a new memtable and trigger flush of old.
@ -4032,6 +4038,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
new_log = new log::Writer(std::move(lfile));
new_mem = new MemTable(internal_comparator_, options_); new_mem = new MemTable(internal_comparator_, options_);
new_superversion = new SuperVersion(); new_superversion = new SuperVersion();
} }
@ -4044,10 +4051,13 @@ Status DBImpl::MakeRoomForWrite(bool force,
// Avoid chewing through file number space in a tight loop. // Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number); versions_->ReuseFileNumber(new_log_number);
assert (!new_mem); assert (!new_mem);
assert(new_log == nullptr);
break; break;
} }
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile))); assert(new_log != nullptr);
*old_log = log_.release();
log_.reset(new_log);
mem_->SetNextLogNumber(logfile_number_); mem_->SetNextLogNumber(logfile_number_);
imm_.Add(mem_); imm_.Add(mem_);
if (force) { if (force) {

View File

@ -338,9 +338,12 @@ class DBImpl : public DB {
uint64_t SlowdownAmount(int n, double bottom, double top); uint64_t SlowdownAmount(int n, double bottom, double top);
// MakeRoomForWrite will return superversion_to_free through an arugment, // MakeRoomForWrite will return superversion_to_free through an arugment,
// which the caller needs to delete. We do it because caller can delete // which the caller needs to delete. We do it because caller can delete
// the superversion outside of mutex // the superversion outside of mutex.
// old_log if not nullptr is the old log writer that should be safely
// closed whenever DB mutex is released.
Status MakeRoomForWrite(bool force /* compact even if there is room? */, Status MakeRoomForWrite(bool force /* compact even if there is room? */,
SuperVersion** superversion_to_free); SuperVersion** superversion_to_free,
log::Writer** old_log);
void BuildBatchGroup(Writer** last_writer, void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group); autovector<WriteBatch*>* write_batch_group);