From d4d338de33da7fa95336afa15c46cf61c2325c3a Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 15:47:02 -0700 Subject: [PATCH] Add timeout_hint_us to WriteOptions and introduce Status::TimedOut. Summary: This diff adds timeout_hint_us to WriteOptions. If it's non-zero, then 1) writes associated with these options MAY be aborted when they have been waiting for longer than the specified time. If a write is aborted, it will return Status::TimedOut. 2) the stall time of the associated write caused by flush or compaction will be limited by timeout_hint_us. The default value of timeout_hint_us is 0 (i.e., OFF). The number of timed-out writes will be recorded in the WRITE_TIMEDOUT ticker. Test Plan: export ROCKSDB_TESTS=WriteTimeoutAndDelayTest make db_test ./db_test Reviewers: igor, ljin, haobo, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D18837 --- db/db_impl.cc | 121 +++++++++++++++++++++++++++++----- db/db_impl.h | 6 +- db/db_test.cc | 122 +++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 13 +++- include/rocksdb/statistics.h | 2 + include/rocksdb/status.h | 11 +++- port/port_posix.cc | 5 +- util/status.cc | 3 + util/stop_watch.h | 2 +- 9 files changed, 262 insertions(+), 23 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ce1cf78ff..a15568432 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -80,7 +80,9 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool disableWAL; + bool in_batch_group; bool done; + uint64_t timeout_hint_us; port::CondVar cv; explicit Writer(port::Mutex* mu) : cv(mu) { } @@ -3729,13 +3731,41 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.batch = my_batch; w.sync = options.sync; w.disableWAL = options.disableWAL; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = options.timeout_hint_us; + + uint64_t expiration_time = 0; + if (w.timeout_hint_us == 0) { + w.timeout_hint_us = 
kNoTimeOut; + } else { + expiration_time = env_->NowMicros() + w.timeout_hint_us; + } w.done = false; - StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false); mutex_.Lock(); + // the following code block pushes the current writer "w" into the writer + // queue "writers_" and wait until one of the following conditions met: + // 1. the job of "w" has been done by some other writers. + // 2. "w" becomes the first writer in "writers_" + // 3. "w" timed-out. writers_.push_back(&w); + + bool timed_out = false; while (!w.done && &w != writers_.front()) { - w.cv.Wait(); + if (expiration_time == 0) { + w.cv.Wait(); + } else if (w.cv.TimedWait(expiration_time)) { + if (w.in_batch_group) { + // then it means the front writer is currently doing the + // write on behalf of this "timed-out" writer. Then it + // should wait until the write completes. + expiration_time = 0; + } else { + timed_out = true; + break; + } + } } if (!options.disableWAL) { @@ -3746,10 +3776,33 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { mutex_.Unlock(); RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; + } else if (timed_out) { + bool found = false; + for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { + if (*iter == &w) { + writers_.erase(iter); + found = true; + break; + } + } + assert(found); + // writers_.front() might still be in cond_wait without a time-out. + // As a result, we need to signal it to wake it up. Otherwise no + // one else will wake him up, and RocksDB will hang. + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } + mutex_.Unlock(); + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + return Status::TimedOut(); } else { RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); } + // Once reaches this point, the current writer "w" will try to do its write + // job. 
It may also pick up some of the remaining writers in the "writers_" + // when it finds suitable, and finish them in the same write batch. + // This is how a write job could be done by the other writer. assert(!single_column_family_mode_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); @@ -3774,8 +3827,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (LIKELY(single_column_family_mode_)) { // fast path - status = MakeRoomForWrite(default_cf_handle_->cfd(), my_batch == nullptr, - &superversions_to_free, &logs_to_free); + status = MakeRoomForWrite( + default_cf_handle_->cfd(), my_batch == nullptr, + &superversions_to_free, &logs_to_free, + expiration_time); } else { // refcounting cfd in iteration bool dead_cfd = false; @@ -3786,8 +3841,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { (flush_column_family_if_log_file != 0 && cfd->GetLogNumber() <= flush_column_family_if_log_file); // May temporarily unlock and wait. - status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free, - &logs_to_free); + status = MakeRoomForWrite( + cfd, force_flush, &superversions_to_free, &logs_to_free, + expiration_time); if (cfd->Unref()) { dead_cfd = true; } @@ -3883,11 +3939,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } } - if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) { + if (options_.paranoid_checks && !status.ok() && + !status.IsTimedOut() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } - while (true) { + // Pop out the current writer and all writers being pushed before the + // current writer from the writer queue. 
+ while (!writers_.empty()) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { @@ -3904,6 +3963,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } mutex_.Unlock(); + if (status.IsTimedOut()) { + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + } + for (auto& sv : superversions_to_free) { delete sv; } @@ -3915,6 +3978,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { return status; } +// This function will be called only when the first writer succeeds. +// All writers in the to-be-built batch group will be processed. +// // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-nullptr batch void DBImpl::BuildBatchGroup(Writer** last_writer, @@ -3950,6 +4016,12 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, break; } + if (w->timeout_hint_us < first->timeout_hint_us) { + // Do not include those writes with shorter timeout. Otherwise, we might + // execute a write that should instead be aborted because of timeout. 
+ break; + } + if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { @@ -3959,6 +4031,7 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, write_batch_group->push_back(w->batch); } + w->in_batch_group = true; *last_writer = w; } } @@ -4000,7 +4073,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { Status DBImpl::MakeRoomForWrite( ColumnFamilyData* cfd, bool force, autovector* superversions_to_free, - autovector* logs_to_free) { + autovector* logs_to_free, + uint64_t expiration_time) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -4013,12 +4087,16 @@ Status DBImpl::MakeRoomForWrite( // might generate a tight feedback loop, constantly scheduling more background // work, even if additional background work is not needed bool schedule_background_work = true; + bool has_timeout = (expiration_time > 0); while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; + } else if (has_timeout && env_->NowMicros() > expiration_time) { + s = Status::TimedOut(); + break; } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. 
Rather than delaying a single write by several @@ -4063,7 +4141,11 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_MEMTABLE_COMPACTION_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), @@ -4078,10 +4160,15 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_L0_NUM_FILES_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } - RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); + RecordTick(options_.statistics.get(), + STALL_L0_NUM_FILES_MICROS, stall); cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { @@ -4112,18 +4199,18 @@ Status DBImpl::MakeRoomForWrite( score = cfd->current()->MaxCompactionScore(); // Delay a write when the compaction score for any level is too large. 
// TODO: add statistics + uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit, + cfd->options()->hard_rate_limit); mutex_.Unlock(); { StopWatch sw(env_, options_.statistics.get(), SOFT_RATE_LIMIT_DELAY_COUNT); - env_->SleepForMicroseconds( - SlowdownAmount(score, cfd->options()->soft_rate_limit, - cfd->options()->hard_rate_limit)); - rate_limit_delay_millis += sw.ElapsedMicros(); + env_->SleepForMicroseconds(slowdown); + slowdown = sw.ElapsedMicros(); + rate_limit_delay_millis += slowdown; } allow_soft_rate_limit_delay = false; mutex_.Lock(); - } else { unique_ptr lfile; log::Writer* new_log = nullptr; diff --git a/db/db_impl.h b/db/db_impl.h index fb0bdb4af..48bf4de37 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "rocksdb/transaction_log.h" #include "util/autovector.h" #include "util/stats_logger.h" +#include "util/stop_watch.h" #include "util/thread_local.h" #include "db/internal_stats.h" @@ -345,7 +347,8 @@ class DBImpl : public DB { Status MakeRoomForWrite(ColumnFamilyData* cfd, bool force /* flush even if there is room? */, autovector* superversions_to_free, - autovector* logs_to_free); + autovector* logs_to_free, + uint64_t expiration_time); void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); @@ -578,6 +581,7 @@ class DBImpl : public DB { bool flush_on_destroy_; // Used when disableWAL is true. 
static const int KEEP_LOG_FILE_NUM = 1000; + static const uint64_t kNoTimeOut = std::numeric_limits::max(); std::string db_absolute_path_; // count of the number of contiguous delaying writes diff --git a/db/db_test.cc b/db/db_test.cc index 8010eaa81..575768db1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7001,6 +7001,128 @@ TEST(DBTest, FIFOCompactionTest) { } } } + +TEST(DBTest, SimpleWriteTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.max_background_flushes = 0; + options.max_write_buffer_number = 2; + options.min_write_buffer_number_to_merge = 3; + options.max_total_wal_size = std::numeric_limits::max(); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 500; + DestroyAndReopen(&options); + // fill the two write buffer + ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); + ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); + // As the only two write buffers are full in this moment, the third + // Put is expected to be timed-out. 
+ ASSERT_TRUE( + Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); +} + +// Multi-threaded Timeout Test +namespace { + +static const int kValueSize = 1000; +static const int kWriteBufferSize = 100000; + +struct TimeoutWriterState { + int id; + DB* db; + std::atomic done; + std::map success_kvs; +}; + +static void RandomTimeoutWriter(void* arg) { + TimeoutWriterState* state = reinterpret_cast(arg); + static const uint64_t kTimerBias = 50; + int thread_id = state->id; + DB* db = state->db; + + Random rnd(1000 + thread_id); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 500; + int timeout_count = 0; + int num_keys = kNumKeys * 5; + + for (int k = 0; k < num_keys; ++k) { + int key = k + thread_id * num_keys; + std::string value = RandomString(&rnd, kValueSize); + // only the second-half is randomized + if (k > num_keys / 2) { + switch (rnd.Next() % 5) { + case 0: + write_opt.timeout_hint_us = 500 * thread_id; + break; + case 1: + write_opt.timeout_hint_us = num_keys - k; + break; + case 2: + write_opt.timeout_hint_us = 1; + break; + default: + write_opt.timeout_hint_us = 0; + state->success_kvs.insert({key, value}); + } + } + + uint64_t time_before_put = db->GetEnv()->NowMicros(); + Status s = db->Put(write_opt, Key(key), value); + uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put; + if (write_opt.timeout_hint_us == 0 || + put_duration + kTimerBias < write_opt.timeout_hint_us) { + ASSERT_OK(s); + std::string result; + } + if (s.IsTimedOut()) { + timeout_count++; + ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us); + } + } + + state->done = true; +} + +TEST(DBTest, MTRandomTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + 
DestroyAndReopen(&options); + + TimeoutWriterState thread_states[kNumThreads]; + for (int tid = 0; tid < kNumThreads; ++tid) { + thread_states[tid].id = tid; + thread_states[tid].db = db_; + thread_states[tid].done = false; + env_->StartThread(RandomTimeoutWriter, &thread_states[tid]); + } + + for (int tid = 0; tid < kNumThreads; ++tid) { + while (thread_states[tid].done == false) { + env_->SleepForMicroseconds(100000); + } + } + + Flush(); + + for (int tid = 0; tid < kNumThreads; ++tid) { + auto& success_kvs = thread_states[tid].success_kvs; + for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) { + ASSERT_EQ(Get(Key(it->first)), it->second); + } + } +} + +} // anonymous namespace + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index df7383d25..928149332 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -975,7 +975,18 @@ struct WriteOptions { // and the write may got lost after a crash. bool disableWAL; - WriteOptions() : sync(false), disableWAL(false) {} + // If non-zero, then associated write waiting longer than the specified + // time MAY be aborted and returns Status::TimedOut. A write that takes + // less than the specified time is guaranteed to not fail with + // Status::TimedOut. + // + // The number of times a write call encounters a timeout is recorded in + // Statistics.WRITE_TIMEDOUT + // + // Default: 0 + uint64_t timeout_hint_us; + + WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {} }; // Options that control flush operations diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 7d5235f65..77f0b0388 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -115,6 +115,7 @@ enum Tickers { // head of the writers queue. WRITE_DONE_BY_SELF, WRITE_DONE_BY_OTHER, + WRITE_TIMEDOUT, // Number of writes ending up with timed-out. 
WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction COMPACT_WRITE_BYTES, // Bytes written during compaction @@ -176,6 +177,7 @@ const std::vector> TickersNameMap = { {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, + {WRITE_TIMEDOUT, "rocksdb.write.timedout"}, {WRITE_WITH_WAL, "rocksdb.write.wal"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 0298a2838..b20689a77 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -65,6 +65,12 @@ class Status { const Slice& msg2 = Slice()) { return Status(kShutdownInProgress, msg, msg2); } + static Status TimedOut() { + return Status(kTimedOut); + } + static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kTimedOut, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -93,6 +99,8 @@ class Status { // Returns true iff the status indicates Incomplete bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } + bool IsTimedOut() const { return code() == kTimedOut; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. 
std::string ToString() const; @@ -106,7 +114,8 @@ class Status { kIOError = 5, kMergeInProgress = 6, kIncomplete = 7, - kShutdownInProgress = 8 + kShutdownInProgress = 8, + kTimedOut = 9 }; Code code() const { diff --git a/port/port_posix.cc b/port/port_posix.cc index 90dde3227..f353c475e 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -20,11 +20,12 @@ namespace rocksdb { namespace port { -static void PthreadCall(const char* label, int result) { - if (result != 0) { +static int PthreadCall(const char* label, int result) { + if (result != 0 && result != ETIMEDOUT) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); abort(); } + return result; } Mutex::Mutex(bool adaptive) { diff --git a/util/status.cc b/util/status.cc index 2a5f05a4b..3165a497d 100644 --- a/util/status.cc +++ b/util/status.cc @@ -68,6 +68,9 @@ std::string Status::ToString() const { case kShutdownInProgress: type = "Shutdown in progress: "; break; + case kTimedOut: + type = "Operation timed out: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code())); diff --git a/util/stop_watch.h b/util/stop_watch.h index 48e1b01c2..bc31cfc46 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -24,7 +24,7 @@ class StopWatch { - uint64_t ElapsedMicros() { + uint64_t ElapsedMicros() const { return env_->NowMicros() - start_time_; }