Revert 6fbe4e981a3d74270a0160445bd993c464c23d76: If disable wal is set, then batch commits are avoided
Summary: Revert "If disable wal is set, then batch commits are avoided" because keeping the mutex while inserting into the skiplist means that readers and writes are all serialized on the mutex. Test Plan: Reviewers: CC: Task ID: # Blame Rev:
This commit is contained in:
parent
52d7ecfc78
commit
d7ba5bce37
@ -2273,26 +2273,20 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
|
||||
|
||||
StopWatch sw(env_, options_.statistics, DB_WRITE);
|
||||
MutexLock l(&mutex_);
|
||||
|
||||
// If WAL is disabled, we avoid any queueing.
|
||||
if (!options.disableWAL) {
|
||||
writers_.push_back(&w);
|
||||
while (!w.done && &w != writers_.front()) {
|
||||
w.cv.Wait();
|
||||
}
|
||||
if (w.done) {
|
||||
return w.status;
|
||||
}
|
||||
writers_.push_back(&w);
|
||||
while (!w.done && &w != writers_.front()) {
|
||||
w.cv.Wait();
|
||||
}
|
||||
if (w.done) {
|
||||
return w.status;
|
||||
}
|
||||
|
||||
// May temporarily unlock and wait.
|
||||
Status status = MakeRoomForWrite(my_batch == nullptr);
|
||||
uint64_t last_sequence = versions_->LastSequence();
|
||||
Writer* last_writer = &w;
|
||||
|
||||
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
|
||||
WriteBatch* updates = options.disableWAL ? my_batch :
|
||||
BuildBatchGroup(&last_writer);
|
||||
WriteBatch* updates = BuildBatchGroup(&last_writer);
|
||||
const SequenceNumber current_sequence = last_sequence + 1;
|
||||
WriteBatchInternal::SetSequence(updates, current_sequence);
|
||||
int my_batch_count = WriteBatchInternal::Count(updates);
|
||||
@ -2307,12 +2301,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
|
||||
// and protects against concurrent loggers and concurrent writes
|
||||
// into mem_.
|
||||
{
|
||||
mutex_.Unlock();
|
||||
if (options.disableWAL) {
|
||||
// If WAL is disabled, then we do not drop the mutex. We keep the
|
||||
// mutex to protect concurrent insertions into the memtable.
|
||||
flush_on_destroy_ = true;
|
||||
} else {
|
||||
mutex_.Unlock();
|
||||
}
|
||||
|
||||
if (!options.disableWAL) {
|
||||
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
|
||||
if (status.ok() && options.sync) {
|
||||
if (options_.use_fsync) {
|
||||
@ -2337,29 +2331,25 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
|
||||
versions_->SetLastSequence(last_sequence);
|
||||
last_flushed_sequence_ = current_sequence;
|
||||
}
|
||||
if (!options.disableWAL) {
|
||||
mutex_.Lock();
|
||||
}
|
||||
mutex_.Lock();
|
||||
}
|
||||
if (updates == &tmp_batch_) tmp_batch_.Clear();
|
||||
}
|
||||
|
||||
if (!options.disableWAL) {
|
||||
while (true) {
|
||||
Writer* ready = writers_.front();
|
||||
writers_.pop_front();
|
||||
if (ready != &w) {
|
||||
ready->status = status;
|
||||
ready->done = true;
|
||||
ready->cv.Signal();
|
||||
}
|
||||
if (ready == last_writer) break;
|
||||
while (true) {
|
||||
Writer* ready = writers_.front();
|
||||
writers_.pop_front();
|
||||
if (ready != &w) {
|
||||
ready->status = status;
|
||||
ready->done = true;
|
||||
ready->cv.Signal();
|
||||
}
|
||||
if (ready == last_writer) break;
|
||||
}
|
||||
|
||||
// Notify new head of write queue
|
||||
if (!writers_.empty()) {
|
||||
writers_.front()->cv.Signal();
|
||||
}
|
||||
// Notify new head of write queue
|
||||
if (!writers_.empty()) {
|
||||
writers_.front()->cv.Signal();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
@ -2423,6 +2413,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
|
||||
// REQUIRES: this thread is currently at the front of the writer queue
|
||||
Status DBImpl::MakeRoomForWrite(bool force) {
|
||||
mutex_.AssertHeld();
|
||||
assert(!writers_.empty());
|
||||
bool allow_delay = !force;
|
||||
bool allow_rate_limit_delay = !force;
|
||||
uint64_t rate_limit_delay_millis = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user