From c217e0b9c7348bfb55ceef2fd6d7c645174818ff Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 13 Jun 2017 14:51:22 -0700 Subject: [PATCH] Call RateLimiter for compaction reads Summary: Allow users to rate limit background work based on read bytes, written bytes, or sum of read and written bytes. Support these by changing the RateLimiter API, so no additional options were needed. Closes https://github.com/facebook/rocksdb/pull/2433 Differential Revision: D5216946 Pulled By: ajkr fbshipit-source-id: aec57a8357dbb4bfde2003261094d786d94f724e --- HISTORY.md | 1 + db/db_impl_write.cc | 5 +- db/db_test2.cc | 65 +++++++++++++++++ db/table_cache.cc | 9 ++- db/table_cache.h | 3 +- include/rocksdb/rate_limiter.h | 70 +++++++++++++++--- options/cf_options.cc | 1 + options/cf_options.h | 2 + tools/db_bench_tool.cc | 53 +++++++++----- tools/db_stress.cc | 17 ++++- util/file_reader_writer.cc | 100 +++++++++++++++++--------- util/file_reader_writer.h | 24 ++++--- util/rate_limiter.cc | 30 ++++++-- util/rate_limiter.h | 5 +- util/rate_limiter_test.cc | 32 ++++++++- utilities/backupable/backupable_db.cc | 3 +- 16 files changed, 333 insertions(+), 87 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c541fad7d..cabf93b73 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### New Features * Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property. +* RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions. ## 5.6.0 (06/06/2017) ### Public API Change diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 477fc7469..324fcd1da 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -769,8 +769,9 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, // is that in case the write is heavy, low pri writes may never have // a chance to run. Now we guarantee we are still slowly making // progress. - write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(), - Env::IO_HIGH, nullptr); + write_controller_.low_pri_rate_limiter()->Request( + my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); } } return Status::OK(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 42403f2aa..b2b814961 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2236,6 +2236,71 @@ TEST_F(DBTest2, LowPriWrite) { Put("", "", wo); ASSERT_EQ(1, rate_limit_count.load()); } + +TEST_F(DBTest2, RateLimitedCompactionReads) { + // compaction input has 512KB data + const int kNumKeysPerFile = 128; + const int kBytesPerKey = 1024; + const int kNumL0Files = 4; + + for (auto use_direct_io : {false, true}) { + if (use_direct_io && !IsDirectIOSupported()) { + continue; + } + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options.new_table_reader_for_compaction_inputs = true; + // takes roughly one second, split into 100 x 10ms intervals. Each interval + // permits 5.12KB, which is smaller than the block size, so this test + // exercises the code for chunking reads. + options.rate_limiter.reset(NewGenericRateLimiter( + static_cast(kNumL0Files * kNumKeysPerFile * + kBytesPerKey) /* rate_bytes_per_sec */, + 10 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kReadsOnly)); + options.use_direct_io_for_flush_and_compaction = use_direct_io; + BlockBasedTableOptions bbto; + bbto.block_size = 16384; + bbto.no_block_cache = true; + options.table_factory.reset(new BlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + for (int i = 0; i < kNumL0Files; ++i) { + for (int j = 0; j <= kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey))); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + + ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH)); + // should be slightly above 512KB due to non-data blocks read. Arbitrarily + // chose 1MB as the upper bound on the total bytes read. + size_t rate_limited_bytes = + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW); + ASSERT_GE( + rate_limited_bytes, + static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files)); + ASSERT_LT( + rate_limited_bytes, + static_cast(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files)); + + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey)); + } + delete iter; + // bytes read for user iterator shouldn't count against the rate limit. + ASSERT_EQ(rate_limited_bytes, + static_cast( + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/table_cache.cc b/db/table_cache.cc index be5b9f038..287f3dea9 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -91,7 +91,8 @@ Status TableCache::GetTableReader( const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, size_t readahead, bool record_read_stats, HistogramImpl* file_read_hist, unique_ptr* table_reader, - bool skip_filters, int level, bool prefetch_index_and_filter_in_cache) { + bool skip_filters, int level, bool prefetch_index_and_filter_in_cache, + bool for_compaction) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; @@ -109,7 +110,8 @@ Status TableCache::GetTableReader( std::unique_ptr file_reader( new RandomAccessFileReader(std::move(file), ioptions_.env, ioptions_.statistics, record_read_stats, - file_read_hist)); + file_read_hist, ioptions_.rate_limiter, + for_compaction)); s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, env_options, internal_comparator, skip_filters, level), @@ -205,7 +207,8 @@ InternalIterator* TableCache::NewIterator( s = GetTableReader( env_options, icomparator, fd, true /* sequential_mode */, readahead, !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr, - false /* skip_filters */, level); + false /* skip_filters */, level, + true /* prefetch_index_and_filter_in_cache */, for_compaction); if (s.ok()) { table_reader = table_reader_unique_ptr.release(); } diff --git a/db/table_cache.h b/db/table_cache.h index 68dbfa74e..85adba510 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -136,7 +136,8 @@ class TableCache { HistogramImpl* file_read_hist, unique_ptr* table_reader, bool skip_filters = false, int level = -1, - bool prefetch_index_and_filter_in_cache = true); + bool prefetch_index_and_filter_in_cache = true, + bool for_compaction = false); const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 1fec5e081..03976a9f6 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -18,22 +18,40 @@ namespace rocksdb { class RateLimiter { public: + enum class OpType { + // Limitation: we currently only invoke Request() with OpType::kRead for + // compactions when DBOptions::new_table_reader_for_compaction_inputs is set + kRead, + kWrite, + }; + enum class Mode { + kReadsOnly, + kWritesOnly, + kAllIo, + }; + + // For API compatibility, default to rate-limiting writes only. + explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {} + virtual ~RateLimiter() {} // This API allows user to dynamically change rate limiter's bytes per second. // REQUIRED: bytes_per_second > 0 virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; - // Request for token to write bytes. If this request can not be satisfied, - // the call is blocked. Caller is responsible to make sure + // Deprecated. New RateLimiter derived classes should override + // Request(const int64_t, const Env::IOPriority, Statistics*) or + // Request(const int64_t, const Env::IOPriority, Statistics*, OpType) + // instead. + // + // Request for token for bytes. If this request can not be satisfied, the call + // is blocked. Caller is responsible to make sure // bytes <= GetSingleBurstBytes() virtual void Request(const int64_t bytes, const Env::IOPriority pri) { - // Deprecated. New RateLimiter derived classes should override - // Request(const int64_t, const Env::IOPriority, Statistics*) instead. assert(false); } - // Request for token to write bytes and potentially update statistics. If this + // Request for token for bytes and potentially update statistics. If this // request can not be satisfied, the call is blocked. Caller is responsible to // make sure bytes <= GetSingleBurstBytes(). virtual void Request(const int64_t bytes, const Env::IOPriority pri, @@ -43,6 +61,25 @@ class RateLimiter { Request(bytes, pri); } + // Requests token to read or write bytes and potentially updates statistics. + // + // If this request can not be satisfied, the call is blocked. Caller is + // responsible to make sure bytes <= GetSingleBurstBytes(). + virtual void Request(const int64_t bytes, const Env::IOPriority pri, + Statistics* stats, OpType op_type) { + if (IsRateLimited(op_type)) { + Request(bytes, pri, stats); + } + } + + // Requests token to read or write bytes and potentially updates statistics. + // Takes into account GetSingleBurstBytes() and alignment (e.g., in case of + // direct I/O) to allocate an appropriate number of bytes, which may be less + // than the number of bytes requested. + virtual size_t RequestToken(size_t bytes, size_t alignment, + Env::IOPriority io_priority, Statistics* stats, + RateLimiter::OpType op_type); + // Max bytes can be granted in a single burst virtual int64_t GetSingleBurstBytes() const = 0; @@ -55,6 +92,22 @@ class RateLimiter { const Env::IOPriority pri = Env::IO_TOTAL) const = 0; virtual int64_t GetBytesPerSecond() const = 0; + + virtual bool IsRateLimited(OpType op_type) { + if ((mode_ == RateLimiter::Mode::kWritesOnly && + op_type == RateLimiter::OpType::kRead) || + (mode_ == RateLimiter::Mode::kReadsOnly && + op_type == RateLimiter::OpType::kWrite)) { + return false; + } + return true; + } + + protected: + Mode GetMode() { return mode_; } + + private: + const Mode mode_; }; // Create a RateLimiter object, which can be shared among RocksDB instances to @@ -75,9 +128,10 @@ class RateLimiter { // continuously. This fairness parameter grants low-pri requests permission by // 1/fairness chance even though high-pri requests exist to avoid starvation. // You should be good by leaving it at default 10. +// @mode: Mode indicates which types of operations count against the limit. extern RateLimiter* NewGenericRateLimiter( - int64_t rate_bytes_per_sec, - int64_t refill_period_us = 100 * 1000, - int32_t fairness = 10); + int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000, + int32_t fairness = 10, + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly); } // namespace rocksdb diff --git a/options/cf_options.cc b/options/cf_options.cc index 79e60abb5..d0c0a6f5d 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -45,6 +45,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, inplace_callback(cf_options.inplace_callback), info_log(db_options.info_log.get()), statistics(db_options.statistics.get()), + rate_limiter(db_options.rate_limiter.get()), env(db_options.env), allow_mmap_reads(db_options.allow_mmap_reads), allow_mmap_writes(db_options.allow_mmap_writes), diff --git a/options/cf_options.h b/options/cf_options.h index 397ee5d6f..b1e11d7bb 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -59,6 +59,8 @@ struct ImmutableCFOptions { Statistics* statistics; + RateLimiter* rate_limiter; + InfoLogLevel info_log_level; Env* env; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 27adaf76b..6ea3fc47f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -785,6 +785,9 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); +DEFINE_bool(rate_limit_bg_reads, false, + "Use options.rate_limiter on compaction reads"); + DEFINE_uint64( benchmark_write_rate_limit, 0, "If non-zero, db_bench will rate-limit the writes going into RocksDB. This " @@ -2579,8 +2582,9 @@ void VerifyDBFromDB(std::string& truth_db_name) { NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit)); } if (FLAGS_benchmark_read_rate_limit > 0) { - shared.read_rate_limiter.reset( - NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit)); + shared.read_rate_limiter.reset(NewGenericRateLimiter( + FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kReadsOnly)); } std::unique_ptr reporter_agent; @@ -3132,8 +3136,18 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.enable_thread_tracking = true; } if (FLAGS_rate_limiter_bytes_per_sec > 0) { - options.rate_limiter.reset( - NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec)); + if (FLAGS_rate_limit_bg_reads && + !FLAGS_new_table_reader_for_compaction_inputs) { + fprintf(stderr, + "rate limit compaction reads must have " + "new_table_reader_for_compaction_inputs set\n"); + exit(1); + } + options.rate_limiter.reset(NewGenericRateLimiter( + FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, + FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly + : RateLimiter::Mode::kWritesOnly)); } #ifndef ROCKSDB_LITE @@ -3423,7 +3437,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->write_rate_limiter.get() != nullptr) { thread->shared->write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); // Set time at which last op finished to Now() to hide latency and // sleep from rate limiter. Also, do the check once per batch, not // once per write. @@ -3833,7 +3847,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && i % 1024 == 1023) { thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, + RateLimiter::OpType::kRead); } } @@ -3865,7 +3880,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && i % 1024 == 1023) { thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, + RateLimiter::OpType::kRead); } } delete iter; @@ -3906,8 +3922,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { } } if (thread->shared->read_rate_limiter.get() != nullptr) { - thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(nullptr, db, 100, kRead); @@ -3991,8 +4007,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && read % 256 == 255) { - thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead); @@ -4048,7 +4064,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && num_multireads % 256 == 255) { thread->shared->read_rate_limiter->Request( - 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */); + 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); } thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead); } @@ -4145,8 +4162,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && read % 256 == 255) { - thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(&db_, db_.db, 1, kSeek); @@ -4294,7 +4311,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (FLAGS_benchmark_write_rate_limit > 0) { write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); } } thread->stats.AddBytes(bytes); @@ -4965,8 +4982,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { found += key_found; if (thread->shared->read_rate_limiter.get() != nullptr) { - thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } } delete iter; @@ -5037,7 +5054,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (FLAGS_benchmark_write_rate_limit > 0) { write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); } } } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 44ff1bbda..ef7273ee8 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -245,7 +245,7 @@ DEFINE_uint64(subcompactions, 1, "Maximum number of subcompactions to divide L0-L1 compactions " "into."); -DEFINE_bool(allow_concurrent_memtable_write, true, +DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); DEFINE_bool(enable_write_thread_adaptive_yield, true, @@ -326,6 +326,11 @@ DEFINE_double(max_bytes_for_level_multiplier, 2, DEFINE_int32(range_deletion_width, 10, "The width of the range deletion intervals."); +DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); + +DEFINE_bool(rate_limit_bg_reads, false, + "Use options.rate_limiter on compaction reads"); + // Temporarily disable this to allows it to detect new bugs DEFINE_int32(compact_files_one_in, 0, "If non-zero, then CompactFiles() will be called one for every N " @@ -2182,6 +2187,16 @@ class StressTest { FLAGS_allow_concurrent_memtable_write; options_.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; + if (FLAGS_rate_limiter_bytes_per_sec > 0) { + options_.rate_limiter.reset(NewGenericRateLimiter( + FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, + 10 /* fairness */, + FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly + : RateLimiter::Mode::kWritesOnly)); + if (FLAGS_rate_limit_bg_reads) { + options_.new_table_reader_for_compaction_inputs = true; + } + } if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) { fprintf(stderr, diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index a29fe9715..b578b642a 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -84,22 +84,63 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t offset_advance = offset - aligned_offset; - size_t size = Roundup(offset + n, alignment) - aligned_offset; - size_t r = 0; + size_t read_size = Roundup(offset + n, alignment) - aligned_offset; AlignedBuffer buf; buf.Alignment(alignment); - buf.AllocateNewBuffer(size); - Slice tmp; - s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart()); - if (s.ok() && offset_advance < tmp.size()) { - buf.Size(tmp.size()); - r = buf.Read(scratch, offset_advance, - std::min(tmp.size() - offset_advance, n)); + buf.AllocateNewBuffer(read_size); + while (buf.CurrentSize() < read_size) { + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + buf.Capacity() - buf.CurrentSize(), buf.Alignment(), + Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); + } else { + assert(buf.CurrentSize() == 0); + allowed = read_size; + } + Slice tmp; + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, + buf.Destination()); + buf.Size(buf.CurrentSize() + tmp.size()); + if (!s.ok() || tmp.size() < allowed) { + break; + } } - *result = Slice(scratch, r); + size_t res_len = 0; + if (s.ok() && offset_advance < buf.CurrentSize()) { + res_len = buf.Read(scratch, offset_advance, + std::min(buf.CurrentSize() - offset_advance, n)); + } + *result = Slice(scratch, res_len); #endif // !ROCKSDB_LITE } else { - s = file_->Read(offset, n, result, scratch); + size_t pos = 0; + const char* res_scratch = nullptr; + while (pos < n) { + size_t allowed; + if (for_compaction_ && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, + Env::IOPriority::IO_LOW, stats_, + RateLimiter::OpType::kRead); + } else { + allowed = n; + } + Slice tmp_result; + s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + if (res_scratch == nullptr) { + // we can't simply use `scratch` because reads of mmap'd files return + // data in a different buffer. + res_scratch = tmp_result.data(); + } else { + // make sure chunks are inserted contiguously into `res_scratch`. + assert(tmp_result.data() == res_scratch + pos); + } + pos += tmp_result.size(); + if (!s.ok() || tmp_result.size() < allowed) { + break; + } + } + *result = Slice(res_scratch, s.ok() ? pos : 0); } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); } @@ -319,25 +360,6 @@ Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { return writable_file_->RangeSync(offset, nbytes); } -size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { - Env::IOPriority io_priority; - if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) < - Env::IO_TOTAL) { - bytes = std::min( - bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); - - if (align) { - // Here we may actually require more than burst and block - // but we can not write less than one page at a time on direct I/O - // thus we may want not to use ratelimiter - size_t alignment = buf_.Alignment(); - bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); - } - rate_limiter_->Request(bytes, io_priority, stats_); - } - return bytes; -} - // This method writes to disk the specified data and makes use of the rate // limiter if available Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { @@ -347,7 +369,14 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { size_t left = size; while (left > 0) { - size_t allowed = RequestToken(left, false); + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + } else { + allowed = left; + } { IOSTATS_TIMER_GUARD(write_nanos); @@ -403,7 +432,14 @@ Status WritableFileWriter::WriteDirect() { while (left > 0) { // Check how much is allowed - size_t size = RequestToken(left, true); + size_t size; + if (rate_limiter_ != nullptr) { + size = rate_limiter_->RequestToken(left, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + } else { + size = left; + } { IOSTATS_TIMER_GUARD(write_nanos); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 4a82e1ddc..b3c65c95c 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -13,6 +13,7 @@ #include #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" #include "util/aligned_buffer.h" namespace rocksdb { @@ -53,9 +54,6 @@ class SequentialFileReader { SequentialFile* file() { return file_.get(); } bool use_direct_io() const { return file_->use_direct_io(); } - - protected: - Status DirectRead(size_t n, Slice* result, char* scratch); }; class RandomAccessFileReader { @@ -65,29 +63,38 @@ class RandomAccessFileReader { Statistics* stats_; uint32_t hist_type_; HistogramImpl* file_read_hist_; + RateLimiter* rate_limiter_; + bool for_compaction_; public: explicit RandomAccessFileReader(std::unique_ptr&& raf, Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, - HistogramImpl* file_read_hist = nullptr) + HistogramImpl* file_read_hist = nullptr, + RateLimiter* rate_limiter = nullptr, + bool for_compaction = false) : file_(std::move(raf)), env_(env), stats_(stats), hist_type_(hist_type), - file_read_hist_(file_read_hist) {} + file_read_hist_(file_read_hist), + rate_limiter_(rate_limiter), + for_compaction_(for_compaction) {} RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); } - RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{ + RandomAccessFileReader& operator=(RandomAccessFileReader&& o) + ROCKSDB_NOEXCEPT { file_ = std::move(o.file_); env_ = std::move(o.env_); stats_ = std::move(o.stats_); hist_type_ = std::move(o.hist_type_); file_read_hist_ = std::move(o.file_read_hist_); + rate_limiter_ = std::move(o.rate_limiter_); + for_compaction_ = std::move(o.for_compaction_); return *this; } @@ -103,10 +110,6 @@ class RandomAccessFileReader { RandomAccessFile* file() { return file_.get(); } bool use_direct_io() const { return file_->use_direct_io(); } - - protected: - Status DirectRead(uint64_t offset, size_t n, Slice* result, - char* scratch) const; }; // Use posix write to write data to a file. @@ -187,7 +190,6 @@ class WritableFileWriter { // Normal write Status WriteBuffered(const char* data, size_t size); Status RangeSync(uint64_t offset, uint64_t nbytes); - size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 064764cb6..e8d4cdf87 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -13,10 +13,27 @@ #include "monitoring/statistics.h" #include "port/port.h" #include "rocksdb/env.h" +#include "util/aligned_buffer.h" #include "util/sync_point.h" namespace rocksdb { +size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, + Env::IOPriority io_priority, Statistics* stats, + RateLimiter::OpType op_type) { + if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) { + bytes = std::min(bytes, static_cast(GetSingleBurstBytes())); + + if (alignment > 0) { + // Here we may actually require more than burst and block + // but we can not write less than one page at a time on direct I/O + // thus we may want not to use ratelimiter + bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); + } + Request(bytes, io_priority, stats, op_type); + } + return bytes; +} // Pending request struct GenericRateLimiter::Req { @@ -30,8 +47,9 @@ struct GenericRateLimiter::Req { GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us, - int32_t fairness) - : refill_period_us_(refill_period_us), + int32_t fairness, RateLimiter::Mode mode) + : RateLimiter(mode), + refill_period_us_(refill_period_us), rate_bytes_per_sec_(rate_bytes_per_sec), refill_bytes_per_period_( CalculateRefillBytesPerPeriod(rate_bytes_per_sec)), @@ -241,12 +259,14 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( } RateLimiter* NewGenericRateLimiter( - int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { + int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, + int32_t fairness /* = 10 */, + RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) { assert(rate_bytes_per_sec > 0); assert(refill_period_us > 0); assert(fairness > 0); - return new GenericRateLimiter( - rate_bytes_per_sec, refill_period_us, fairness); + return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, + mode); } } // namespace rocksdb diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 238be5c3d..106e25c59 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -24,8 +24,9 @@ namespace rocksdb { class GenericRateLimiter : public RateLimiter { public: - GenericRateLimiter(int64_t refill_bytes, - int64_t refill_period_us, int32_t fairness); + GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, + int32_t fairness, + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly); virtual ~GenericRateLimiter(); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index c20806ddf..4d4d0f03f 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -35,6 +35,30 @@ TEST_F(RateLimiterTest, StartStop) { std::unique_ptr limiter(NewGenericRateLimiter(100, 100, 10)); } +TEST_F(RateLimiterTest, Modes) { + for (auto mode : {RateLimiter::Mode::kWritesOnly, + RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { + GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */, + 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, mode); + limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); + if (mode == RateLimiter::Mode::kWritesOnly) { + ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } else { + ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } + + limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); + if (mode == RateLimiter::Mode::kAllIo) { + ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } else { + ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } + } +} + TEST_F(RateLimiterTest, Rate) { auto* env = Env::Default(); struct Arg { @@ -57,10 +81,11 @@ TEST_F(RateLimiterTest, Rate) { while (thread_env->NowMicros() < until) { for (int i = 0; i < static_cast(r.Skewed(arg->burst) + 1); ++i) { arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, - Env::IO_HIGH, nullptr /* stats */); + Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); } arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); } }; @@ -113,7 +138,8 @@ TEST_F(RateLimiterTest, LimitChangeTest) { auto writer = [](void* p) { auto* arg = static_cast(p); - arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */); + arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */, + RateLimiter::OpType::kWrite); }; for (uint32_t i = 1; i <= 16; i <<= 1) { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 233b34388..1377e94a6 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1214,7 +1214,8 @@ Status BackupEngineImpl::CopyOrCreateFile( } s = dest_writer->Append(data); if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */); + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); } if (processed_buffer_size > options_.callback_trigger_interval_size) { processed_buffer_size -= options_.callback_trigger_interval_size;