diff --git a/db/db_bench.cc b/db/db_bench.cc
index 7832be6f4..53e204057 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -357,6 +357,11 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files,
              "Maximum number of files to keep open at the same time"
              " (use default if == 0)");
 
+DEFINE_int32(new_table_reader_for_compaction_inputs, true,
+             "If true, uses a separate file handle for compaction inputs");
+
+DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
+
 DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
              " use default settings.");
 DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
@@ -2191,6 +2196,9 @@ class Benchmark {
     options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
     options.bloom_locality = FLAGS_bloom_locality;
     options.max_open_files = FLAGS_open_files;
+    options.new_table_reader_for_compaction_inputs =
+        FLAGS_new_table_reader_for_compaction_inputs;
+    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
     options.statistics = dbstats;
     if (FLAGS_enable_io_prio) {
       FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
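
Once built, the two new flags can be exercised directly from the db_bench command line. An illustrative invocation (flag values are examples, not recommendations; fillrandom and compact are standard db_bench benchmarks):

    ./db_bench --benchmarks=fillrandom,compact \
        --new_table_reader_for_compaction_inputs=1 \
        --compaction_readahead_size=2097152
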
diff --git a/db/db_impl.cc b/db/db_impl.cc
index a41210888..597be5739 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -156,6 +156,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
   }
 
+  if (result.compaction_readahead_size > 0) {
+    result.new_table_reader_for_compaction_inputs = true;
+  }
+
   return result;
 }
 
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 35250c66e..ac2f473cc 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -85,15 +85,19 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
 Status TableCache::GetTableReader(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
-    bool advise_random_on_open, bool record_read_stats,
-    HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader) {
+    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
+    unique_ptr<TableReader>* table_reader) {
   std::string fname =
       TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
   unique_ptr<RandomAccessFile> file;
   Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
+  if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
+    file = NewReadaheadRandomAccessFile(std::move(file),
+                                        ioptions_.compaction_readahead_size);
+  }
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   if (s.ok()) {
-    if (advise_random_on_open) {
+    if (!sequential_mode && ioptions_.advise_random_on_open) {
       file->Hint(RandomAccessFile::RANDOM);
     }
     StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
@@ -128,7 +132,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
     }
     unique_ptr<TableReader> table_reader;
     s = GetTableReader(env_options, internal_comparator, fd,
-                       ioptions_.advise_random_on_open, record_read_stats,
+                       false /* sequential mode */, record_read_stats,
                        file_read_hist, &table_reader);
     if (!s.ok()) {
       assert(table_reader == nullptr);
@@ -162,8 +166,9 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
       (for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
   if (create_new_table_reader) {
     unique_ptr<TableReader> table_reader_unique_ptr;
-    Status s = GetTableReader(env_options, icomparator, fd, false, false,
-                              nullptr, &table_reader_unique_ptr);
+    Status s = GetTableReader(
+        env_options, icomparator, fd, /* sequential mode */ true,
+        /* record stats */ false, nullptr, &table_reader_unique_ptr);
     if (!s.ok()) {
       return NewErrorIterator(s, arena);
     }
diff --git a/db/table_cache.h b/db/table_cache.h
index 60851e502..d9ae01348 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -61,13 +61,6 @@ class TableCache {
   // Evict any entry for the specified file number
   static void Evict(Cache* cache, uint64_t file_number);
 
-  // Build a table reader
-  Status GetTableReader(const EnvOptions& env_options,
-                        const InternalKeyComparator& internal_comparator,
-                        const FileDescriptor& fd, bool advise_random_on_open,
-                        bool record_read_stats, HistogramImpl* file_read_hist,
-                        unique_ptr<TableReader>* table_reader);
-
   // Find table reader
   Status FindTable(const EnvOptions& toptions,
                    const InternalKeyComparator& internal_comparator,
@@ -101,6 +94,13 @@ class TableCache {
   void ReleaseHandle(Cache::Handle* handle);
 
  private:
+  // Build a table reader
+  Status GetTableReader(const EnvOptions& env_options,
+                        const InternalKeyComparator& internal_comparator,
+                        const FileDescriptor& fd, bool sequential_mode,
+                        bool record_read_stats, HistogramImpl* file_read_hist,
+                        unique_ptr<TableReader>* table_reader);
+
   const ImmutableCFOptions& ioptions_;
   const EnvOptions& env_options_;
   Cache* const cache_;
diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h
index 051f1ff8d..589f14e99 100644
--- a/include/rocksdb/immutable_options.h
+++ b/include/rocksdb/immutable_options.h
@@ -91,6 +91,8 @@ struct ImmutableCFOptions {
   bool new_table_reader_for_compaction_inputs;
 
+  size_t compaction_readahead_size;
+
   int num_levels;
 
   bool optimize_filters_for_hits;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 9522f44f0..dcf22907a 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1038,6 +1038,16 @@ struct DBOptions {
   // Default: false
   bool new_table_reader_for_compaction_inputs;
 
+  // If non-zero, we perform bigger reads when doing compaction. If you're
+  // running RocksDB on spinning disks, you should set this to at least 2MB.
+  // That way RocksDB's compaction is doing sequential instead of random reads.
+  //
+  // When non-zero, we also force new_table_reader_for_compaction_inputs to
+  // true.
+  //
+  // Default: 0
+  size_t compaction_readahead_size;
+
   // Use adaptive mutex, which spins in the user space before resorting
   // to kernel. This could reduce context switch when the mutex is not
   // heavily contended. However, if the mutex is hot, we could end up
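
Taken together with the SanitizeOptions() change above, a typical application-side configuration needs only the one new field. A minimal sketch (the 2MB figure comes from the doc comment above; the database path is hypothetical):

    #include "rocksdb/db.h"

    rocksdb::Options options;
    options.create_if_missing = true;
    // Spinning disk: read compaction inputs in 2MB chunks so compaction
    // does sequential instead of random I/O.
    options.compaction_readahead_size = 2 * 1024 * 1024;
    // Setting new_table_reader_for_compaction_inputs explicitly is not
    // required: SanitizeOptions() (see db_impl.cc above) forces it to true
    // during DB::Open() whenever compaction_readahead_size is non-zero.
    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/readahead_demo", &db);
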
diff --git a/util/db_test_util.cc b/util/db_test_util.cc
index ded303383..cff448817 100644
--- a/util/db_test_util.cc
+++ b/util/db_test_util.cc
@@ -231,6 +231,7 @@ Options DBTestBase::CurrentOptions(
     case kFullFilterWithNewTableReaderForCompactions:
       table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
       options.new_table_reader_for_compaction_inputs = true;
+      options.compaction_readahead_size = 10 * 1024 * 1024;
       break;
     case kUncompressed:
       options.compression = kNoCompression;
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index 502b55ce2..fb3fb8dfd 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -10,6 +10,8 @@
 #include "util/file_reader_writer.h"
 
 #include <algorithm>
+#include <mutex>
+
 #include "port/port.h"
 #include "util/histogram.h"
 #include "util/iostats_context_imp.h"
@@ -222,4 +224,86 @@ size_t WritableFileWriter::RequestToken(size_t bytes) {
   }
   return bytes;
 }
+
+namespace {
+class ReadaheadRandomAccessFile : public RandomAccessFile {
+ public:
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        buffer_(new char[readahead_size_]),
+        buffer_offset_(0),
+        buffer_len_(0) {}
+
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                      char* scratch) const override {
+    if (n >= readahead_size_) {
+      return file_->Read(offset, n, result, scratch);
+    }
+
+    std::unique_lock<std::mutex> lk(lock_);
+
+    size_t copied = 0;
+    // if offset is in the range [buffer_offset_, buffer_offset_ + buffer_len_)
+    if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
+      uint64_t offset_in_buffer = offset - buffer_offset_;
+      copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
+                        static_cast<uint64_t>(n));
+      memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
+      if (copied == n) {
+        // fully cached
+        *result = Slice(scratch, n);
+        return Status::OK();
+      }
+    }
+    Slice readahead_result;
+    Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
+                           buffer_.get());
+    if (!s.ok()) {
+      return s;
+    }
+
+    auto left_to_copy = std::min(readahead_result.size(), n - copied);
+    memcpy(scratch + copied, readahead_result.data(), left_to_copy);
+    *result = Slice(scratch, copied + left_to_copy);
+
+    if (readahead_result.data() == buffer_.get()) {
+      buffer_offset_ = offset + copied;
+      buffer_len_ = readahead_result.size();
+    } else {
+      buffer_len_ = 0;
+    }
+
+    return Status::OK();
+  }
+
+  virtual size_t GetUniqueId(char* id, size_t max_size) const override {
+    return file_->GetUniqueId(id, max_size);
+  }
+
+  virtual void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
+
+  virtual Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset, length);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessFile> file_;
+  size_t readahead_size_;
+
+  mutable std::mutex lock_;
+  mutable std::unique_ptr<char[]> buffer_;
+  mutable uint64_t buffer_offset_;
+  mutable size_t buffer_len_;
+};
+}  // namespace
+
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
+  std::unique_ptr<ReadaheadRandomAccessFile> wrapped_file(
+      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
+  return std::move(wrapped_file);
+}
+
 }  // namespace rocksdb
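
The wrapper above keeps a single readahead_size-byte buffer: reads smaller than readahead_size are served from the buffer when possible (refilling it with one large read on a miss), while reads of readahead_size or more bypass buffering entirely. A minimal usage sketch (hypothetical file path, default Env):

    #include "rocksdb/env.h"
    #include "util/file_reader_writer.h"

    rocksdb::Env* env = rocksdb::Env::Default();
    rocksdb::EnvOptions env_options;
    std::unique_ptr<rocksdb::RandomAccessFile> raw;
    env->NewRandomAccessFile("/tmp/example.sst", &raw, env_options);

    // Wrap with a 2MB readahead buffer, as the compaction path does.
    std::unique_ptr<rocksdb::RandomAccessFile> file =
        rocksdb::NewReadaheadRandomAccessFile(std::move(raw), 2 << 20);

    char scratch[4096];
    rocksdb::Slice result;
    file->Read(0, sizeof(scratch), &result, scratch);     // fills the buffer
    file->Read(4096, sizeof(scratch), &result, scratch);  // buffer hit, no I/O
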
diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index c0a3b5aa8..873c0e29d 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -14,6 +14,9 @@ namespace rocksdb {
 class Statistics;
 class HistogramImpl;
 
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size);
+
 class SequentialFileReader {
  private:
   std::unique_ptr<SequentialFile> file_;
diff --git a/util/options.cc b/util/options.cc
index e7376cd2e..7f3bf75c7 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -71,6 +71,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       new_table_reader_for_compaction_inputs(
           options.new_table_reader_for_compaction_inputs),
+      compaction_readahead_size(options.compaction_readahead_size),
       num_levels(options.num_levels),
       optimize_filters_for_hits(options.optimize_filters_for_hits),
       listeners(options.listeners),
@@ -241,6 +242,7 @@ DBOptions::DBOptions()
      db_write_buffer_size(0),
      access_hint_on_compaction_start(NORMAL),
      new_table_reader_for_compaction_inputs(false),
+     compaction_readahead_size(0),
      use_adaptive_mutex(false),
      bytes_per_sync(0),
      wal_bytes_per_sync(0),
@@ -294,6 +296,7 @@ DBOptions::DBOptions(const Options& options)
      access_hint_on_compaction_start(options.access_hint_on_compaction_start),
      new_table_reader_for_compaction_inputs(
          options.new_table_reader_for_compaction_inputs),
+     compaction_readahead_size(options.compaction_readahead_size),
      use_adaptive_mutex(options.use_adaptive_mutex),
      bytes_per_sync(options.bytes_per_sync),
      wal_bytes_per_sync(options.wal_bytes_per_sync),
@@ -371,6 +374,10 @@ void DBOptions::Dump(Logger* log) const {
          access_hints[access_hint_on_compaction_start]);
     Warn(log, "    Options.new_table_reader_for_compaction_inputs: %d",
          new_table_reader_for_compaction_inputs);
+    Warn(log,
+         "                 Options.compaction_readahead_size: "
+         "%" ROCKSDB_PRIszt,
+         compaction_readahead_size);
     Warn(log, "                        Options.use_adaptive_mutex: %d",
          use_adaptive_mutex);
     Warn(log, "                             Options.rate_limiter: %p",
diff --git a/util/options_helper.h b/util/options_helper.h
index e944aec13..bc3d463a2 100644
--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -85,6 +85,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
     {"new_table_reader_for_compaction_inputs",
      {offsetof(struct DBOptions, new_table_reader_for_compaction_inputs),
       OptionType::kBoolean}},
+    {"compaction_readahead_size",
+     {offsetof(struct DBOptions, compaction_readahead_size),
+      OptionType::kSizeT}},
     {"use_adaptive_mutex",
      {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean}},
     {"use_fsync",
diff --git a/util/options_test.cc b/util/options_test.cc
index 93e471263..b7d8ca9b5 100644
--- a/util/options_test.cc
+++ b/util/options_test.cc
@@ -172,9 +172,9 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
         {"advise_random_on_open", "true"},
         {"use_adaptive_mutex", "false"},
         {"new_table_reader_for_compaction_inputs", "true"},
+        {"compaction_readahead_size", "100"},
         {"bytes_per_sync", "47"},
-        {"wal_bytes_per_sync", "48"},
-  };
+        {"wal_bytes_per_sync", "48"}, };
 
   ColumnFamilyOptions base_cf_opt;
   ColumnFamilyOptions new_cf_opt;
@@ -279,6 +279,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.advise_random_on_open, true);
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
+  ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
   ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
   ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
 }
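
As the test exercises, the new entry in db_options_type_info also makes the option loadable through the string/map-based options API. A small sketch, assuming the bool-returning GetDBOptionsFromMap() declared in rocksdb/utilities/convenience.h in this era:

    #include <cassert>
    #include <string>
    #include <unordered_map>
    #include "rocksdb/utilities/convenience.h"

    rocksdb::DBOptions base, loaded;
    std::unordered_map<std::string, std::string> opts_map = {
        {"compaction_readahead_size", "100"},  // parsed as OptionType::kSizeT
    };
    bool ok = rocksdb::GetDBOptionsFromMap(base, opts_map, &loaded);
    assert(ok && loaded.compaction_readahead_size == 100);
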