From 5f4166c90e8ec69afef28ae825512d964db312b2 Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Wed, 26 Aug 2015 15:25:59 -0700
Subject: [PATCH] ReadaheadRandomAccessFile -- userspace readahead

Summary:
ReadaheadRandomAccessFile acts as a transparent layer on top of
RandomAccessFile. When a Read() request is issued, it issues a much bigger
request to the OS and caches the result. When a new request comes in and we
already have the data cached, it doesn't have to issue any requests to the
OS.

We add the ReadaheadRandomAccessFile layer only when a file is read during
compactions.
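
For illustration (editorial sketch, not part of this diff): turning the new
option on from application code would look roughly like this -- the option
names are the ones added below; the DB path and the 2MB value are arbitrary:

    #include "rocksdb/db.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Ask for 2MB readahead on compaction inputs. Per SanitizeOptions() in
      // db/db_impl.cc below, a non-zero value also forces
      // new_table_reader_for_compaction_inputs to true.
      options.compaction_readahead_size = 2 * 1024 * 1024;
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/readahead_db", &db);
      delete db;
      return s.ok() ? 0 : 1;
    }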
" @@ -2191,6 +2196,9 @@ class Benchmark { options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; options.bloom_locality = FLAGS_bloom_locality; options.max_open_files = FLAGS_open_files; + options.new_table_reader_for_compaction_inputs = + FLAGS_new_table_reader_for_compaction_inputs; + options.compaction_readahead_size = FLAGS_compaction_readahead_size; options.statistics = dbstats; if (FLAGS_enable_io_prio) { FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); diff --git a/db/db_impl.cc b/db/db_impl.cc index a41210888..597be5739 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -156,6 +156,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.db_paths.emplace_back(dbname, std::numeric_limits::max()); } + if (result.compaction_readahead_size > 0) { + result.new_table_reader_for_compaction_inputs = true; + } + return result; } diff --git a/db/table_cache.cc b/db/table_cache.cc index 35250c66e..ac2f473cc 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -85,15 +85,19 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, - bool advise_random_on_open, bool record_read_stats, - HistogramImpl* file_read_hist, unique_ptr* table_reader) { + bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, + unique_ptr* table_reader) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); + if (sequential_mode && ioptions_.compaction_readahead_size > 0) { + file = NewReadaheadRandomAccessFile(std::move(file), + ioptions_.compaction_readahead_size); + } RecordTick(ioptions_.statistics, NO_FILE_OPENS); if (s.ok()) { - if (advise_random_on_open) { + if (!sequential_mode && ioptions_.advise_random_on_open) { file->Hint(RandomAccessFile::RANDOM); } StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS); @@ -128,7 +132,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, } unique_ptr table_reader; s = GetTableReader(env_options, internal_comparator, fd, - ioptions_.advise_random_on_open, record_read_stats, + false /* sequential mode */, record_read_stats, file_read_hist, &table_reader); if (!s.ok()) { assert(table_reader == nullptr); @@ -162,8 +166,9 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, (for_compaction && ioptions_.new_table_reader_for_compaction_inputs); if (create_new_table_reader) { unique_ptr table_reader_unique_ptr; - Status s = GetTableReader(env_options, icomparator, fd, false, false, - nullptr, &table_reader_unique_ptr); + Status s = GetTableReader( + env_options, icomparator, fd, /* sequential mode */ true, + /* record stats */ false, nullptr, &table_reader_unique_ptr); if (!s.ok()) { return NewErrorIterator(s, arena); } diff --git a/db/table_cache.h b/db/table_cache.h index 60851e502..d9ae01348 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -61,13 +61,6 @@ class TableCache { // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); - // Build a table reader - Status GetTableReader(const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, - const FileDescriptor& fd, bool advise_random_on_open, - bool record_read_stats, HistogramImpl* file_read_hist, - unique_ptr* table_reader); - // Find table reader Status FindTable(const EnvOptions& 
                   const InternalKeyComparator& internal_comparator,
@@ -101,6 +94,13 @@ class TableCache {
   void ReleaseHandle(Cache::Handle* handle);
 
  private:
+  // Build a table reader
+  Status GetTableReader(const EnvOptions& env_options,
+                        const InternalKeyComparator& internal_comparator,
+                        const FileDescriptor& fd, bool sequential_mode,
+                        bool record_read_stats, HistogramImpl* file_read_hist,
+                        unique_ptr<TableReader>* table_reader);
+
   const ImmutableCFOptions& ioptions_;
   const EnvOptions& env_options_;
   Cache* const cache_;
diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h
index 051f1ff8d..589f14e99 100644
--- a/include/rocksdb/immutable_options.h
+++ b/include/rocksdb/immutable_options.h
@@ -91,6 +91,8 @@ struct ImmutableCFOptions {
 
   bool new_table_reader_for_compaction_inputs;
 
+  size_t compaction_readahead_size;
+
   int num_levels;
 
   bool optimize_filters_for_hits;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 9522f44f0..dcf22907a 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1038,6 +1038,16 @@ struct DBOptions {
   // Default: false
   bool new_table_reader_for_compaction_inputs;
 
+  // If non-zero, we perform bigger reads when doing compaction. If you're
+  // running RocksDB on spinning disks, you should set this to at least 2MB.
+  // That way RocksDB's compaction is doing sequential instead of random reads.
+  //
+  // When non-zero, we also force new_table_reader_for_compaction_inputs to
+  // true.
+  //
+  // Default: 0
+  size_t compaction_readahead_size;
+
   // Use adaptive mutex, which spins in the user space before resorting
   // to kernel. This could reduce context switch when the mutex is not
   // heavily contended. However, if the mutex is hot, we could end up
diff --git a/util/db_test_util.cc b/util/db_test_util.cc
index ded303383..cff448817 100644
--- a/util/db_test_util.cc
+++ b/util/db_test_util.cc
@@ -231,6 +231,7 @@ Options DBTestBase::CurrentOptions(
     case kFullFilterWithNewTableReaderForCompactions:
       table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
       options.new_table_reader_for_compaction_inputs = true;
+      options.compaction_readahead_size = 10 * 1024 * 1024;
       break;
     case kUncompressed:
       options.compression = kNoCompression;
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index 502b55ce2..fb3fb8dfd 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -10,6 +10,8 @@
 #include "util/file_reader_writer.h"
 
 #include <algorithm>
+#include <mutex>
+
 #include "port/port.h"
 #include "util/histogram.h"
 #include "util/iostats_context_imp.h"
@@ -222,4 +224,86 @@ size_t WritableFileWriter::RequestToken(size_t bytes) {
   }
   return bytes;
 }
+
+namespace {
+class ReadaheadRandomAccessFile : public RandomAccessFile {
+ public:
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        buffer_(new char[readahead_size_]),
+        buffer_offset_(0),
+        buffer_len_(0) {}
+
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                      char* scratch) const override {
+    if (n >= readahead_size_) {
+      return file_->Read(offset, n, result, scratch);
+    }
+
+    std::unique_lock<std::mutex> lk(lock_);
+
+    size_t copied = 0;
+    // if offset between [buffer_offset_, buffer_offset_ + buffer_len>
+    if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
+      uint64_t offset_in_buffer = offset - buffer_offset_;
+      copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
+                        static_cast<uint64_t>(n));
+      memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
+      if (copied == n) {
+        // fully cached
+        *result = Slice(scratch, n);
+        return Status::OK();
+      }
+    }
+    Slice readahead_result;
+    Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
+                           buffer_.get());
+    if (!s.ok()) {
+      return s;
+    }
+
+    auto left_to_copy = std::min(readahead_result.size(), n - copied);
+    memcpy(scratch + copied, readahead_result.data(), left_to_copy);
+    *result = Slice(scratch, copied + left_to_copy);
+
+    if (readahead_result.data() == buffer_.get()) {
+      buffer_offset_ = offset + copied;
+      buffer_len_ = readahead_result.size();
+    } else {
+      buffer_len_ = 0;
+    }
+
+    return Status::OK();
+  }
+
+  virtual size_t GetUniqueId(char* id, size_t max_size) const override {
+    return file_->GetUniqueId(id, max_size);
+  }
+
+  virtual void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
+
+  virtual Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset, length);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessFile> file_;
+  size_t readahead_size_;
+
+  mutable std::mutex lock_;
+  mutable std::unique_ptr<char[]> buffer_;
+  mutable uint64_t buffer_offset_;
+  mutable size_t buffer_len_;
+};
+}  // namespace
+
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
+  std::unique_ptr<RandomAccessFile> wrapped_file(
+      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
+  return std::move(wrapped_file);
+}
+
 }  // namespace rocksdb
diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index c0a3b5aa8..873c0e29d 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -14,6 +14,9 @@ namespace rocksdb {
 class Statistics;
 class HistogramImpl;
 
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size);
+
 class SequentialFileReader {
  private:
   std::unique_ptr<SequentialFile> file_;
diff --git a/util/options.cc b/util/options.cc
index e7376cd2e..7f3bf75c7 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -71,6 +71,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       new_table_reader_for_compaction_inputs(
          options.new_table_reader_for_compaction_inputs),
+      compaction_readahead_size(options.compaction_readahead_size),
       num_levels(options.num_levels),
       optimize_filters_for_hits(options.optimize_filters_for_hits),
       listeners(options.listeners),
@@ -241,6 +242,7 @@ DBOptions::DBOptions()
       db_write_buffer_size(0),
       access_hint_on_compaction_start(NORMAL),
       new_table_reader_for_compaction_inputs(false),
+      compaction_readahead_size(0),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
       wal_bytes_per_sync(0),
@@ -294,6 +296,7 @@ DBOptions::DBOptions(const Options& options)
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       new_table_reader_for_compaction_inputs(
          options.new_table_reader_for_compaction_inputs),
+      compaction_readahead_size(options.compaction_readahead_size),
       use_adaptive_mutex(options.use_adaptive_mutex),
       bytes_per_sync(options.bytes_per_sync),
       wal_bytes_per_sync(options.wal_bytes_per_sync),
@@ -371,6 +374,10 @@ void DBOptions::Dump(Logger* log) const {
          access_hints[access_hint_on_compaction_start]);
     Warn(log, "  Options.new_table_reader_for_compaction_inputs: %d",
          new_table_reader_for_compaction_inputs);
+    Warn(log,
+         "               Options.compaction_readahead_size: %" ROCKSDB_PRIszt
+         "d",
+         compaction_readahead_size);
     Warn(log, "                        Options.use_adaptive_mutex: %d",
          use_adaptive_mutex);
     Warn(log, "                            Options.rate_limiter: %p",
diff --git a/util/options_helper.h b/util/options_helper.h
index e944aec13..bc3d463a2 100644
--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -85,6 +85,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
     {"new_table_reader_for_compaction_inputs",
      {offsetof(struct DBOptions, new_table_reader_for_compaction_inputs),
       OptionType::kBoolean}},
+    {"compaction_readahead_size",
+     {offsetof(struct DBOptions, compaction_readahead_size),
+      OptionType::kSizeT}},
     {"use_adaptive_mutex",
      {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean}},
     {"use_fsync",
diff --git a/util/options_test.cc b/util/options_test.cc
index 93e471263..b7d8ca9b5 100644
--- a/util/options_test.cc
+++ b/util/options_test.cc
@@ -172,9 +172,9 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
     {"advise_random_on_open", "true"},
     {"use_adaptive_mutex", "false"},
     {"new_table_reader_for_compaction_inputs", "true"},
+    {"compaction_readahead_size", "100"},
     {"bytes_per_sync", "47"},
-    {"wal_bytes_per_sync", "48"},
-  };
+    {"wal_bytes_per_sync", "48"}, };
 
   ColumnFamilyOptions base_cf_opt;
   ColumnFamilyOptions new_cf_opt;
@@ -279,6 +279,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.advise_random_on_open, true);
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
+  ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
   ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
   ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
 }
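
Editorial appendix (not part of the diff): a self-contained sketch of the
behavior the new wrapper should exhibit, assuming the RandomAccessFile
interface from include/rocksdb/env.h of this era. CountingFile is a
hypothetical in-memory stand-in for a real file that counts how many read
requests actually reach the "OS"; wrapping it with
NewReadaheadRandomAccessFile collapses many small sequential reads into a
single readahead-sized read:

    #include <algorithm>
    #include <cstring>
    #include <iostream>
    #include <memory>
    #include <string>

    #include "rocksdb/env.h"
    #include "rocksdb/slice.h"
    #include "util/file_reader_writer.h"

    namespace {
    class CountingFile : public rocksdb::RandomAccessFile {
     public:
      explicit CountingFile(std::string data) : data_(std::move(data)) {}

      rocksdb::Status Read(uint64_t offset, size_t n, rocksdb::Slice* result,
                           char* scratch) const override {
        ++reads_;  // one more request reached the "OS"
        n = std::min(n, data_.size() - static_cast<size_t>(offset));
        memcpy(scratch, data_.data() + offset, n);
        *result = rocksdb::Slice(scratch, n);
        return rocksdb::Status::OK();
      }

      mutable int reads_ = 0;

     private:
      std::string data_;
    };
    }  // namespace

    int main() {
      auto* counting = new CountingFile(std::string(1 << 20, 'x'));
      std::unique_ptr<rocksdb::RandomAccessFile> file(counting);
      std::unique_ptr<rocksdb::RandomAccessFile> readahead =
          rocksdb::NewReadaheadRandomAccessFile(std::move(file), 64 * 1024);

      char scratch[100];
      rocksdb::Slice result;
      // Hundreds of small sequential reads, as a compaction would issue...
      for (uint64_t off = 0; off + 100 <= 64 * 1024; off += 100) {
        readahead->Read(off, 100, &result, scratch);
      }
      // ...but only one 64KB read reaches CountingFile: everything after the
      // first request is served from the userspace buffer.
      std::cout << "reads that reached the file: " << counting->reads_ << "\n";
      return 0;
    }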