diff --git a/HISTORY.md b/HISTORY.md index cb25e8987..a4ef5d659 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,7 +4,8 @@ ### New Features * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). - +* RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by the 2X average value size, which makes compaction to be triggerred by deletion entries more easily. +* Add TimeOut API to write. Now WriteOptions have a variable called timeout_hint_us. With timeout_hint_us set to non-zero, any write associated with this timeout_hint_us may be aborted when it runs longer than the specified timeout_hint_us, and it is guaranteed that any write completes earlier than the specified time-out will not be aborted due to the time-out condition. ## 3.2.0 (06/20/2014) diff --git a/db/db_impl.cc b/db/db_impl.cc index ce1cf78ff..6862f9a77 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -65,6 +65,7 @@ #include "util/log_buffer.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" +#include "util/iostats_context_imp.h" #include "util/stop_watch.h" #include "util/sync_point.h" @@ -80,7 +81,9 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool disableWAL; + bool in_batch_group; bool done; + uint64_t timeout_hint_us; port::CondVar cv; explicit Writer(port::Mutex* mu) : cv(mu) { } @@ -1602,6 +1605,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // true, mark DB read-only bg_error_ = s; } + RecordFlushIOStats(); return s; } @@ -1918,11 +1922,28 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +void DBImpl::RecordFlushIOStats() { + RecordTick(options_.statistics.get(), FLUSH_WRITE_BYTES, + iostats_context.bytes_written); + IOSTATS_RESET(bytes_written); +} + +void DBImpl::RecordCompactionIOStats() { + RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, + IOSTATS(bytes_read)); + IOSTATS_RESET(bytes_read); + RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, + IOSTATS(bytes_written)); + IOSTATS_RESET(bytes_written); +} + void DBImpl::BGWorkFlush(void* db) { + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); reinterpret_cast(db)->BackgroundCallFlush(); } void DBImpl::BGWorkCompaction(void* db) { + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); reinterpret_cast(db)->BackgroundCallCompaction(); } @@ -2022,6 +2043,7 @@ void DBImpl::BackgroundCallFlush() { // that case, all DB variables will be dealloacated and referencing them // will cause trouble. } + RecordFlushIOStats(); } void DBImpl::BackgroundCallCompaction() { @@ -2557,6 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction( while (input->Valid() && !shutting_down_.Acquire_Load() && !cfd->IsDropped()) { + RecordCompactionIOStats(); // FLUSH preempts compaction // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on @@ -2815,6 +2838,8 @@ Status DBImpl::ProcessKeyValueCompaction( } } + RecordCompactionIOStats(); + return status; } @@ -3122,22 +3147,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, for (int i = 0; i < compact->compaction->num_input_files(0); i++) { stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize(); - RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, - compact->compaction->input(0, i)->fd.GetFileSize()); } for (int i = 0; i < compact->compaction->num_input_files(1); i++) { stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize(); - RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, - compact->compaction->input(1, i)->fd.GetFileSize()); } for (int i = 0; i < num_output_files; i++) { stats.bytes_written += compact->outputs[i].file_size; - RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, - compact->outputs[i].file_size); } + RecordCompactionIOStats(); + LogFlush(options_.info_log); mutex_.Lock(); cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(), @@ -3729,13 +3750,41 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.batch = my_batch; w.sync = options.sync; w.disableWAL = options.disableWAL; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = options.timeout_hint_us; + + uint64_t expiration_time = 0; + if (w.timeout_hint_us == 0) { + w.timeout_hint_us = kNoTimeOut; + } else { + expiration_time = env_->NowMicros() + w.timeout_hint_us; + } w.done = false; - StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false); mutex_.Lock(); + // the following code block pushes the current writer "w" into the writer + // queue "writers_" and wait until one of the following conditions met: + // 1. the job of "w" has been done by some other writers. + // 2. "w" becomes the first writer in "writers_" + // 3. "w" timed-out. writers_.push_back(&w); + + bool timed_out = false; while (!w.done && &w != writers_.front()) { - w.cv.Wait(); + if (expiration_time == 0) { + w.cv.Wait(); + } else if (w.cv.TimedWait(expiration_time)) { + if (w.in_batch_group) { + // then it means the front writer is currently doing the + // write on behalf of this "timed-out" writer. Then it + // should wait until the write completes. + expiration_time = 0; + } else { + timed_out = true; + break; + } + } } if (!options.disableWAL) { @@ -3746,10 +3795,39 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { mutex_.Unlock(); RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; + } else if (timed_out) { +#ifndef NDEBUG + bool found = false; +#endif + for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { + if (*iter == &w) { + writers_.erase(iter); +#ifndef NDEBUG + found = true; +#endif + break; + } + } +#ifndef NDEBUG + assert(found); +#endif + // writers_.front() might still be in cond_wait without a time-out. + // As a result, we need to signal it to wake it up. Otherwise no + // one else will wake him up, and RocksDB will hang. + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } + mutex_.Unlock(); + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + return Status::TimedOut(); } else { RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); } + // Once reaches this point, the current writer "w" will try to do its write + // job. It may also pick up some of the remaining writers in the "writers_" + // when it finds suitable, and finish them in the same write batch. + // This is how a write job could be done by the other writer. assert(!single_column_family_mode_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); @@ -3774,8 +3852,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (LIKELY(single_column_family_mode_)) { // fast path - status = MakeRoomForWrite(default_cf_handle_->cfd(), my_batch == nullptr, - &superversions_to_free, &logs_to_free); + status = MakeRoomForWrite( + default_cf_handle_->cfd(), my_batch == nullptr, + &superversions_to_free, &logs_to_free, + expiration_time); } else { // refcounting cfd in iteration bool dead_cfd = false; @@ -3786,8 +3866,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { (flush_column_family_if_log_file != 0 && cfd->GetLogNumber() <= flush_column_family_if_log_file); // May temporarily unlock and wait. - status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free, - &logs_to_free); + status = MakeRoomForWrite( + cfd, force_flush, &superversions_to_free, &logs_to_free, + expiration_time); if (cfd->Unref()) { dead_cfd = true; } @@ -3883,11 +3964,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } } - if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) { + if (options_.paranoid_checks && !status.ok() && + !status.IsTimedOut() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } - while (true) { + // Pop out the current writer and all writers being pushed before the + // current writer from the writer queue. + while (!writers_.empty()) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { @@ -3904,6 +3988,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } mutex_.Unlock(); + if (status.IsTimedOut()) { + RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); + } + for (auto& sv : superversions_to_free) { delete sv; } @@ -3915,6 +4003,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { return status; } +// This function will be called only when the first writer succeeds. +// All writers in the to-be-built batch group will be processed. +// // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-nullptr batch void DBImpl::BuildBatchGroup(Writer** last_writer, @@ -3950,6 +4041,12 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, break; } + if (w->timeout_hint_us < first->timeout_hint_us) { + // Do not include those writes with shorter timeout. Otherwise, we might + // execute a write that should instead be aborted because of timeout. + break; + } + if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { @@ -3959,6 +4056,7 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, write_batch_group->push_back(w->batch); } + w->in_batch_group = true; *last_writer = w; } } @@ -4000,7 +4098,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { Status DBImpl::MakeRoomForWrite( ColumnFamilyData* cfd, bool force, autovector* superversions_to_free, - autovector* logs_to_free) { + autovector* logs_to_free, + uint64_t expiration_time) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -4013,12 +4112,16 @@ Status DBImpl::MakeRoomForWrite( // might generate a tight feedback loop, constantly scheduling more background // work, even if additional background work is not needed bool schedule_background_work = true; + bool has_timeout = (expiration_time > 0); while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; + } else if (has_timeout && env_->NowMicros() > expiration_time) { + s = Status::TimedOut(); + break; } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several @@ -4063,7 +4166,11 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_MEMTABLE_COMPACTION_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), @@ -4078,10 +4185,15 @@ Status DBImpl::MakeRoomForWrite( { StopWatch sw(env_, options_.statistics.get(), STALL_L0_NUM_FILES_COUNT); - bg_cv_.Wait(); + if (!has_timeout) { + bg_cv_.Wait(); + } else { + bg_cv_.TimedWait(expiration_time); + } stall = sw.ElapsedMicros(); } - RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); + RecordTick(options_.statistics.get(), + STALL_L0_NUM_FILES_MICROS, stall); cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { @@ -4112,18 +4224,18 @@ Status DBImpl::MakeRoomForWrite( score = cfd->current()->MaxCompactionScore(); // Delay a write when the compaction score for any level is too large. // TODO: add statistics + uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit, + cfd->options()->hard_rate_limit); mutex_.Unlock(); { StopWatch sw(env_, options_.statistics.get(), SOFT_RATE_LIMIT_DELAY_COUNT); - env_->SleepForMicroseconds( - SlowdownAmount(score, cfd->options()->soft_rate_limit, - cfd->options()->hard_rate_limit)); - rate_limit_delay_millis += sw.ElapsedMicros(); + env_->SleepForMicroseconds(slowdown); + slowdown = sw.ElapsedMicros(); + rate_limit_delay_millis += slowdown; } allow_soft_rate_limit_delay = false; mutex_.Lock(); - } else { unique_ptr lfile; log::Writer* new_log = nullptr; diff --git a/db/db_impl.h b/db/db_impl.h index fb0bdb4af..6ac1d97f4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "rocksdb/transaction_log.h" #include "util/autovector.h" #include "util/stats_logger.h" +#include "util/stop_watch.h" #include "util/thread_local.h" #include "db/internal_stats.h" @@ -345,7 +347,8 @@ class DBImpl : public DB { Status MakeRoomForWrite(ColumnFamilyData* cfd, bool force /* flush even if there is room? */, autovector* superversions_to_free, - autovector* logs_to_free); + autovector* logs_to_free, + uint64_t expiration_time); void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); @@ -356,6 +359,9 @@ class DBImpl : public DB { // Wait for memtable flushed Status WaitForFlushMemTable(ColumnFamilyData* cfd); + void RecordFlushIOStats(); + void RecordCompactionIOStats(); + void MaybeScheduleLogDBDeployStats(); #ifndef ROCKSDB_LITE @@ -578,6 +584,7 @@ class DBImpl : public DB { bool flush_on_destroy_; // Used when disableWAL is true. static const int KEEP_LOG_FILE_NUM = 1000; + static const uint64_t kNoTimeOut = std::numeric_limits::max(); std::string db_absolute_path_; // count of the number of contiguous delaying writes diff --git a/db/db_test.cc b/db/db_test.cc index 8010eaa81..025e04f24 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7001,6 +7001,129 @@ TEST(DBTest, FIFOCompactionTest) { } } } + +TEST(DBTest, SimpleWriteTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.max_background_flushes = 0; + options.max_write_buffer_number = 2; + options.min_write_buffer_number_to_merge = 3; + options.max_total_wal_size = std::numeric_limits::max(); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 0; + DestroyAndReopen(&options); + // fill the two write buffer + ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); + ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); + // As the only two write buffers are full in this moment, the third + // Put is expected to be timed-out. + write_opt.timeout_hint_us = 300; + ASSERT_TRUE( + Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); +} + +// Multi-threaded Timeout Test +namespace { + +static const int kValueSize = 1000; +static const int kWriteBufferSize = 100000; + +struct TimeoutWriterState { + int id; + DB* db; + std::atomic done; + std::map success_kvs; +}; + +static void RandomTimeoutWriter(void* arg) { + TimeoutWriterState* state = reinterpret_cast(arg); + static const uint64_t kTimerBias = 50; + int thread_id = state->id; + DB* db = state->db; + + Random rnd(1000 + thread_id); + WriteOptions write_opt = WriteOptions(); + write_opt.timeout_hint_us = 500; + int timeout_count = 0; + int num_keys = kNumKeys * 5; + + for (int k = 0; k < num_keys; ++k) { + int key = k + thread_id * num_keys; + std::string value = RandomString(&rnd, kValueSize); + // only the second-half is randomized + if (k > num_keys / 2) { + switch (rnd.Next() % 5) { + case 0: + write_opt.timeout_hint_us = 500 * thread_id; + break; + case 1: + write_opt.timeout_hint_us = num_keys - k; + break; + case 2: + write_opt.timeout_hint_us = 1; + break; + default: + write_opt.timeout_hint_us = 0; + state->success_kvs.insert({key, value}); + } + } + + uint64_t time_before_put = db->GetEnv()->NowMicros(); + Status s = db->Put(write_opt, Key(key), value); + uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put; + if (write_opt.timeout_hint_us == 0 || + put_duration + kTimerBias < write_opt.timeout_hint_us) { + ASSERT_OK(s); + std::string result; + } + if (s.IsTimedOut()) { + timeout_count++; + ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us); + } + } + + state->done = true; +} + +TEST(DBTest, MTRandomTimeoutTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + DestroyAndReopen(&options); + + TimeoutWriterState thread_states[kNumThreads]; + for (int tid = 0; tid < kNumThreads; ++tid) { + thread_states[tid].id = tid; + thread_states[tid].db = db_; + thread_states[tid].done = false; + env_->StartThread(RandomTimeoutWriter, &thread_states[tid]); + } + + for (int tid = 0; tid < kNumThreads; ++tid) { + while (thread_states[tid].done == false) { + env_->SleepForMicroseconds(100000); + } + } + + Flush(); + + for (int tid = 0; tid < kNumThreads; ++tid) { + auto& success_kvs = thread_states[tid].success_kvs; + for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) { + ASSERT_EQ(Get(Key(it->first)), it->second); + } + } +} + +} // anonymous namespace + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index b68923bc0..507a018bb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1472,7 +1472,7 @@ class VersionSet::Builder { } } if (!found) { - fprintf(stderr, "not found %ld\n", number); + fprintf(stderr, "not found %" PRIu64 "\n", number); } assert(found); #endif diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h new file mode 100644 index 000000000..0a220b53a --- /dev/null +++ b/include/rocksdb/iostats_context.h @@ -0,0 +1,34 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ +#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ + +#include +#include + +// A thread local context for gathering io-stats efficiently and transparently. +namespace rocksdb { + +struct IOStatsContext { + // reset all io-stats counter to zero + void Reset(); + + std::string ToString() const; + + // the thread pool id + uint64_t thread_pool_id; + + // number of bytes that has been written. + uint64_t bytes_written; + // number of bytes that has been read. + uint64_t bytes_read; +}; + +extern __thread IOStatsContext iostats_context; + +} // namespace rocksdb + +#endif // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_ diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index df7383d25..928149332 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -975,7 +975,18 @@ struct WriteOptions { // and the write may got lost after a crash. bool disableWAL; - WriteOptions() : sync(false), disableWAL(false) {} + // If non-zero, then associated write waiting longer than the specified + // time MAY be aborted and returns Status::TimedOut. A write that takes + // less than the specified time is guaranteed to not fail with + // Status::TimedOut. + // + // The number of times a write call encounters a timeout is recorded in + // Statistics.WRITE_TIMEDOUT + // + // Default: 0 + uint64_t timeout_hint_us; + + WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {} }; // Options that control flush operations diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 7d5235f65..c205f1b8c 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -115,9 +115,11 @@ enum Tickers { // head of the writers queue. WRITE_DONE_BY_SELF, WRITE_DONE_BY_OTHER, + WRITE_TIMEDOUT, // Number of writes ending up with timed-out. WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction COMPACT_WRITE_BYTES, // Bytes written during compaction + FLUSH_WRITE_BYTES, // Bytes written during flush // Number of table's properties loaded directly from file, without creating // table reader object. @@ -176,7 +178,9 @@ const std::vector> TickersNameMap = { {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, + {WRITE_TIMEDOUT, "rocksdb.write.timedout"}, {WRITE_WITH_WAL, "rocksdb.write.wal"}, + {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 0298a2838..b20689a77 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -65,6 +65,12 @@ class Status { const Slice& msg2 = Slice()) { return Status(kShutdownInProgress, msg, msg2); } + static Status TimedOut() { + return Status(kTimedOut); + } + static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kTimedOut, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -93,6 +99,8 @@ class Status { // Returns true iff the status indicates Incomplete bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } + bool IsTimedOut() const { return code() == kTimedOut; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -106,7 +114,8 @@ class Status { kIOError = 5, kMergeInProgress = 6, kIncomplete = 7, - kShutdownInProgress = 8 + kShutdownInProgress = 8, + kTimedOut = 9 }; Code code() const { diff --git a/port/port_posix.cc b/port/port_posix.cc index 90dde3227..f353c475e 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -20,11 +20,12 @@ namespace rocksdb { namespace port { -static void PthreadCall(const char* label, int result) { - if (result != 0) { +static int PthreadCall(const char* label, int result) { + if (result != 0 && result != ETIMEDOUT) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); abort(); } + return result; } Mutex::Mutex(bool adaptive) { diff --git a/table/plain_table_key_coding.cc b/table/plain_table_key_coding.cc index 51849b3e3..eedf58aea 100644 --- a/table/plain_table_key_coding.cc +++ b/table/plain_table_key_coding.cc @@ -198,7 +198,8 @@ Status PlainTableKeyDecoder::NextPlainEncodingKey( user_key_size = static_cast(tmp_size); *bytes_read = key_ptr - start; } - bool decoded_internal_key_valid; + // dummy initial value to avoid compiler complain + bool decoded_internal_key_valid = true; Slice decoded_internal_key; Status s = ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read, @@ -227,7 +228,8 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey( bool expect_suffix = false; do { size_t size = 0; - bool decoded_internal_key_valid; + // dummy initial value to avoid compiler complain + bool decoded_internal_key_valid = true; const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size); if (pos == nullptr) { return Status::Corruption("Unexpected EOF when reading size of the key"); diff --git a/util/env_posix.cc b/util/env_posix.cc index 3bfeb0ea0..a73ec6b0e 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -40,6 +40,7 @@ #include "util/logging.h" #include "util/posix_logger.h" #include "util/random.h" +#include "util/iostats_context_imp.h" #include // Get nano time for mach systems @@ -178,6 +179,7 @@ class PosixSequentialFile: public SequentialFile { do { r = fread_unlocked(scratch, 1, n, file_); } while (r == 0 && ferror(file_) && errno == EINTR); + IOSTATS_ADD(bytes_read, r); *result = Slice(scratch, r); if (r < n) { if (feof(file_)) { @@ -241,6 +243,7 @@ class PosixRandomAccessFile: public RandomAccessFile { do { r = pread(fd_, scratch, n, static_cast(offset)); } while (r < 0 && errno == EINTR); + IOSTATS_ADD_IF_POSITIVE(bytes_read, r); *result = Slice(scratch, (r < 0) ? 0 : r); if (r < 0) { // An error: return a non-ok status @@ -488,6 +491,7 @@ class PosixMmapFile : public WritableFile { size_t n = (left <= avail) ? left : avail; memcpy(dst_, src, n); + IOSTATS_ADD(bytes_written, n); dst_ += n; src += n; left -= n; @@ -694,6 +698,7 @@ class PosixWritableFile : public WritableFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); TEST_KILL_RANDOM(rocksdb_kill_odds); left -= done; @@ -744,6 +749,7 @@ class PosixWritableFile : public WritableFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); left -= done; src += done; @@ -877,6 +883,7 @@ class PosixRandomRWFile : public RandomRWFile { } return IOError(filename_, errno); } + IOSTATS_ADD(bytes_written, done); left -= done; src += done; @@ -890,6 +897,7 @@ class PosixRandomRWFile : public RandomRWFile { char* scratch) const { Status s; ssize_t r = pread(fd_, scratch, n, static_cast(offset)); + IOSTATS_ADD_IF_POSITIVE(bytes_read, r); *result = Slice(scratch, (r < 0) ? 0 : r); if (r < 0) { s = IOError(filename_, errno); diff --git a/util/iostats_context.cc b/util/iostats_context.cc new file mode 100644 index 000000000..610831779 --- /dev/null +++ b/util/iostats_context.cc @@ -0,0 +1,30 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include "rocksdb/env.h" +#include "util/iostats_context_imp.h" + +namespace rocksdb { + +__thread IOStatsContext iostats_context; + +void IOStatsContext::Reset() { + thread_pool_id = Env::Priority::TOTAL; + bytes_read = 0; + bytes_written = 0; +} + +#define OUTPUT(counter) #counter << " = " << counter << ", " + +std::string IOStatsContext::ToString() const { + std::ostringstream ss; + ss << OUTPUT(thread_pool_id) + << OUTPUT(bytes_read) + << OUTPUT(bytes_written); + return ss.str(); +} + +} // namespace rocksdb diff --git a/util/iostats_context_imp.h b/util/iostats_context_imp.h new file mode 100644 index 000000000..ed34037d3 --- /dev/null +++ b/util/iostats_context_imp.h @@ -0,0 +1,32 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#pragma once +#include "rocksdb/iostats_context.h" + +// increment a specific counter by the specified value +#define IOSTATS_ADD(metric, value) \ + (iostats_context.metric += value) + +// Increase metric value only when it is positive +#define IOSTATS_ADD_IF_POSITIVE(metric, value) \ + if (value > 0) { IOSTATS_ADD(metric, value); } + +// reset a specific counter to zero +#define IOSTATS_RESET(metric) \ + (iostats_context.metric = 0) + +// reset all counters to zero +#define IOSTATS_RESET_ALL() \ + (iostats_context.Reset()) + +#define IOSTATS_SET_THREAD_POOL_ID(value) \ + (iostats_context.thread_pool_id = value) + +#define IOSTATS_THREAD_POOL_ID() \ + (iostats_context.thread_pool_id) + +#define IOSTATS(metric) \ + (iostats_context.metric) diff --git a/util/status.cc b/util/status.cc index 2a5f05a4b..3165a497d 100644 --- a/util/status.cc +++ b/util/status.cc @@ -68,6 +68,9 @@ std::string Status::ToString() const { case kShutdownInProgress: type = "Shutdown in progress: "; break; + case kTimedOut: + type = "Operation timed out: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code())); diff --git a/util/stop_watch.h b/util/stop_watch.h index 48e1b01c2..bc31cfc46 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -24,7 +24,7 @@ class StopWatch { - uint64_t ElapsedMicros() { + uint64_t ElapsedMicros() const { return env_->NowMicros() - start_time_; }