Add timeout_hint_us to WriteOptions and introduce Status::TimeOut.

Summary:
This diff adds timeout_hint_us to WriteOptions.  If it's non-zero, then
1) writes associated with this options MAY be aborted when it has been
  waiting for longer than the specified time.  If an abortion happens,
  associated writes will return Status::TimeOut.
2) the stall time of the associated write caused by flush or compaction
  will be limited by timeout_hint_us.

The default value of timeout_hint_us is 0 (i.e., OFF.)

The statistics of timeout writes will be recorded in WRITE_TIMEDOUT.

Test Plan:
export ROCKSDB_TESTS=WriteTimeoutAndDelayTest
make db_test
./db_test

Reviewers: igor, ljin, haobo, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D18837
This commit is contained in:
Yueh-Hsuan Chiang 2014-07-03 15:47:02 -07:00
parent 4203431e71
commit d4d338de33
9 changed files with 262 additions and 23 deletions

View File

@ -80,7 +80,9 @@ struct DBImpl::Writer {
WriteBatch* batch;
bool sync;
bool disableWAL;
bool in_batch_group;
bool done;
uint64_t timeout_hint_us;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
@ -3729,13 +3731,41 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.batch = my_batch;
w.sync = options.sync;
w.disableWAL = options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = options.timeout_hint_us;
uint64_t expiration_time = 0;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
}
w.done = false;
StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false);
mutex_.Lock();
// the following code block pushes the current writer "w" into the writer
// queue "writers_" and wait until one of the following conditions met:
// 1. the job of "w" has been done by some other writers.
// 2. "w" becomes the first writer in "writers_"
// 3. "w" timed-out.
writers_.push_back(&w);
bool timed_out = false;
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
if (expiration_time == 0) {
w.cv.Wait();
} else if (w.cv.TimedWait(expiration_time)) {
if (w.in_batch_group) {
// then it means the front writer is currently doing the
// write on behalf of this "timed-out" writer. Then it
// should wait until the write completes.
expiration_time = 0;
} else {
timed_out = true;
break;
}
}
}
if (!options.disableWAL) {
@ -3746,10 +3776,33 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
return w.status;
} else if (timed_out) {
bool found = false;
for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
if (*iter == &w) {
writers_.erase(iter);
found = true;
break;
}
}
assert(found);
// writers_.front() might still be in cond_wait without a time-out.
// As a result, we need to signal it to wake it up. Otherwise no
// one else will wake him up, and RocksDB will hang.
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1);
return Status::TimedOut();
} else {
RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
}
// Once reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
assert(!single_column_family_mode_ ||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
@ -3774,8 +3827,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (LIKELY(single_column_family_mode_)) {
// fast path
status = MakeRoomForWrite(default_cf_handle_->cfd(), my_batch == nullptr,
&superversions_to_free, &logs_to_free);
status = MakeRoomForWrite(
default_cf_handle_->cfd(), my_batch == nullptr,
&superversions_to_free, &logs_to_free,
expiration_time);
} else {
// refcounting cfd in iteration
bool dead_cfd = false;
@ -3786,8 +3841,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
(flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file);
// May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free,
&logs_to_free);
status = MakeRoomForWrite(
cfd, force_flush, &superversions_to_free, &logs_to_free,
expiration_time);
if (cfd->Unref()) {
dead_cfd = true;
}
@ -3883,11 +3939,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
}
if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
if (options_.paranoid_checks && !status.ok() &&
!status.IsTimedOut() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
while (true) {
// Pop out the current writer and all writers being pushed before the
// current writer from the writer queue.
while (!writers_.empty()) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
@ -3904,6 +3963,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
mutex_.Unlock();
if (status.IsTimedOut()) {
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1);
}
for (auto& sv : superversions_to_free) {
delete sv;
}
@ -3915,6 +3978,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
return status;
}
// This function will be called only when the first writer succeeds.
// All writers in the to-be-built batch group will be processed.
//
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch
void DBImpl::BuildBatchGroup(Writer** last_writer,
@ -3950,6 +4016,12 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
break;
}
if (w->timeout_hint_us < first->timeout_hint_us) {
// Do not include those writes with shorter timeout. Otherwise, we might
// execute a write that should instead be aborted because of timeout.
break;
}
if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
@ -3959,6 +4031,7 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
write_batch_group->push_back(w->batch);
}
w->in_batch_group = true;
*last_writer = w;
}
}
@ -4000,7 +4073,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
Status DBImpl::MakeRoomForWrite(
ColumnFamilyData* cfd, bool force,
autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free) {
autovector<log::Writer*>* logs_to_free,
uint64_t expiration_time) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
@ -4013,12 +4087,16 @@ Status DBImpl::MakeRoomForWrite(
// might generate a tight feedback loop, constantly scheduling more background
// work, even if additional background work is not needed
bool schedule_background_work = true;
bool has_timeout = (expiration_time > 0);
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (has_timeout && env_->NowMicros() > expiration_time) {
s = Status::TimedOut();
break;
} else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
@ -4063,7 +4141,11 @@ Status DBImpl::MakeRoomForWrite(
{
StopWatch sw(env_, options_.statistics.get(),
STALL_MEMTABLE_COMPACTION_COUNT);
bg_cv_.Wait();
if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics.get(),
@ -4078,10 +4160,15 @@ Status DBImpl::MakeRoomForWrite(
{
StopWatch sw(env_, options_.statistics.get(),
STALL_L0_NUM_FILES_COUNT);
bg_cv_.Wait();
if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
RecordTick(options_.statistics.get(),
STALL_L0_NUM_FILES_MICROS, stall);
cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES,
stall);
} else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) {
@ -4112,18 +4199,18 @@ Status DBImpl::MakeRoomForWrite(
score = cfd->current()->MaxCompactionScore();
// Delay a write when the compaction score for any level is too large.
// TODO: add statistics
uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit,
cfd->options()->hard_rate_limit);
mutex_.Unlock();
{
StopWatch sw(env_, options_.statistics.get(),
SOFT_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(
SlowdownAmount(score, cfd->options()->soft_rate_limit,
cfd->options()->hard_rate_limit));
rate_limit_delay_millis += sw.ElapsedMicros();
env_->SleepForMicroseconds(slowdown);
slowdown = sw.ElapsedMicros();
rate_limit_delay_millis += slowdown;
}
allow_soft_rate_limit_delay = false;
mutex_.Lock();
} else {
unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;

View File

@ -10,6 +10,7 @@
#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
@ -28,6 +29,7 @@
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/stats_logger.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "db/internal_stats.h"
@ -345,7 +347,8 @@ class DBImpl : public DB {
Status MakeRoomForWrite(ColumnFamilyData* cfd,
bool force /* flush even if there is room? */,
autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free);
autovector<log::Writer*>* logs_to_free,
uint64_t expiration_time);
void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);
@ -578,6 +581,7 @@ class DBImpl : public DB {
bool flush_on_destroy_; // Used when disableWAL is true.
static const int KEEP_LOG_FILE_NUM = 1000;
static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
std::string db_absolute_path_;
// count of the number of contiguous delaying writes

View File

@ -7001,6 +7001,128 @@ TEST(DBTest, FIFOCompactionTest) {
}
}
}
TEST(DBTest, SimpleWriteTimeoutTest) {
Options options;
options.env = env_;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.max_background_flushes = 0;
options.max_write_buffer_number = 2;
options.min_write_buffer_number_to_merge = 3;
options.max_total_wal_size = std::numeric_limits<uint64_t>::max();
WriteOptions write_opt = WriteOptions();
write_opt.timeout_hint_us = 500;
DestroyAndReopen(&options);
// fill the two write buffer
ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt));
ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt));
// As the only two write buffers are full in this moment, the third
// Put is expected to be timed-out.
ASSERT_TRUE(
Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut());
}
// Multi-threaded Timeout Test
namespace {
static const int kValueSize = 1000;
static const int kWriteBufferSize = 100000;
struct TimeoutWriterState {
int id;
DB* db;
std::atomic<bool> done;
std::map<int, std::string> success_kvs;
};
static void RandomTimeoutWriter(void* arg) {
TimeoutWriterState* state = reinterpret_cast<TimeoutWriterState*>(arg);
static const uint64_t kTimerBias = 50;
int thread_id = state->id;
DB* db = state->db;
Random rnd(1000 + thread_id);
WriteOptions write_opt = WriteOptions();
write_opt.timeout_hint_us = 500;
int timeout_count = 0;
int num_keys = kNumKeys * 5;
for (int k = 0; k < num_keys; ++k) {
int key = k + thread_id * num_keys;
std::string value = RandomString(&rnd, kValueSize);
// only the second-half is randomized
if (k > num_keys / 2) {
switch (rnd.Next() % 5) {
case 0:
write_opt.timeout_hint_us = 500 * thread_id;
break;
case 1:
write_opt.timeout_hint_us = num_keys - k;
break;
case 2:
write_opt.timeout_hint_us = 1;
break;
default:
write_opt.timeout_hint_us = 0;
state->success_kvs.insert({key, value});
}
}
uint64_t time_before_put = db->GetEnv()->NowMicros();
Status s = db->Put(write_opt, Key(key), value);
uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put;
if (write_opt.timeout_hint_us == 0 ||
put_duration + kTimerBias < write_opt.timeout_hint_us) {
ASSERT_OK(s);
std::string result;
}
if (s.IsTimedOut()) {
timeout_count++;
ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us);
}
}
state->done = true;
}
TEST(DBTest, MTRandomTimeoutTest) {
Options options;
options.env = env_;
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
options.level0_slowdown_writes_trigger = 10;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
DestroyAndReopen(&options);
TimeoutWriterState thread_states[kNumThreads];
for (int tid = 0; tid < kNumThreads; ++tid) {
thread_states[tid].id = tid;
thread_states[tid].db = db_;
thread_states[tid].done = false;
env_->StartThread(RandomTimeoutWriter, &thread_states[tid]);
}
for (int tid = 0; tid < kNumThreads; ++tid) {
while (thread_states[tid].done == false) {
env_->SleepForMicroseconds(100000);
}
}
Flush();
for (int tid = 0; tid < kNumThreads; ++tid) {
auto& success_kvs = thread_states[tid].success_kvs;
for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) {
ASSERT_EQ(Get(Key(it->first)), it->second);
}
}
}
} // anonymous namespace
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -975,7 +975,18 @@ struct WriteOptions {
// and the write may got lost after a crash.
bool disableWAL;
WriteOptions() : sync(false), disableWAL(false) {}
// If non-zero, then associated write waiting longer than the specified
// time MAY be aborted and returns Status::TimedOut. A write that takes
// less than the specified time is guaranteed to not fail with
// Status::TimedOut.
//
// The number of times a write call encounters a timeout is recorded in
// Statistics.WRITE_TIMEDOUT
//
// Default: 0
uint64_t timeout_hint_us;
WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {}
};
// Options that control flush operations

View File

@ -115,6 +115,7 @@ enum Tickers {
// head of the writers queue.
WRITE_DONE_BY_SELF,
WRITE_DONE_BY_OTHER,
WRITE_TIMEDOUT, // Number of writes ending up with timed-out.
WRITE_WITH_WAL, // Number of Write calls that request WAL
COMPACT_READ_BYTES, // Bytes read during compaction
COMPACT_WRITE_BYTES, // Bytes written during compaction
@ -176,6 +177,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"},
{WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
{WRITE_TIMEDOUT, "rocksdb.write.timedout"},
{WRITE_WITH_WAL, "rocksdb.write.wal"},
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},

View File

@ -65,6 +65,12 @@ class Status {
const Slice& msg2 = Slice()) {
return Status(kShutdownInProgress, msg, msg2);
}
static Status TimedOut() {
return Status(kTimedOut);
}
static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimedOut, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@ -93,6 +99,8 @@ class Status {
// Returns true iff the status indicates Incomplete
bool IsShutdownInProgress() const { return code() == kShutdownInProgress; }
bool IsTimedOut() const { return code() == kTimedOut; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -106,7 +114,8 @@ class Status {
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7,
kShutdownInProgress = 8
kShutdownInProgress = 8,
kTimedOut = 9
};
Code code() const {

View File

@ -20,11 +20,12 @@
namespace rocksdb {
namespace port {
static void PthreadCall(const char* label, int result) {
if (result != 0) {
static int PthreadCall(const char* label, int result) {
if (result != 0 && result != ETIMEDOUT) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort();
}
return result;
}
Mutex::Mutex(bool adaptive) {

View File

@ -68,6 +68,9 @@ std::string Status::ToString() const {
case kShutdownInProgress:
type = "Shutdown in progress: ";
break;
case kTimedOut:
type = "Operation timed out: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));

View File

@ -24,7 +24,7 @@ class StopWatch {
uint64_t ElapsedMicros() {
uint64_t ElapsedMicros() const {
return env_->NowMicros() - start_time_;
}