diff --git a/HISTORY.md b/HISTORY.md index d945115c6..ad2050af0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,8 @@ * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN". * Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status. +### New Features +* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader which performs reads of the given size. ## 4.7.0 (4/8/2016) ### Public API Change diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 48f7cf9e1..7635900e8 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/iostats_context.h" #include "rocksdb/perf_context.h" namespace rocksdb { @@ -1525,6 +1526,75 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes); } +TEST_F(DBIteratorTest, ReadAhead) { + Options options; + auto env = new SpecialEnv(Env::Default()); + env->count_random_reads_ = true; + options.env = env; + options.disable_auto_compactions = true; + options.write_buffer_size = 4 << 20; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.block_size = 1024; + table_options.no_block_cache = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + + std::string value(1024, 'a'); + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + for (int i = 0; i < 100; i++) { + Put(Key(i), value); + } + ASSERT_OK(Flush()); + ASSERT_EQ("1,1,1", FilesPerLevel()); + + env->random_read_bytes_counter_ = 0; + options.statistics->setTickerCount(NO_FILE_OPENS, 0); + ReadOptions read_options; + auto* iter = db_->NewIterator(read_options); + iter->SeekToFirst(); + int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS); + int64_t bytes_read = env->random_read_bytes_counter_; + delete iter; + + env->random_read_bytes_counter_ = 0; + options.statistics->setTickerCount(NO_FILE_OPENS, 0); + read_options.readahead_size = 1024 * 10; + iter = db_->NewIterator(read_options); + iter->SeekToFirst(); + int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS); + int64_t bytes_read_readahead = env->random_read_bytes_counter_; + delete iter; + ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead); + ASSERT_GT(bytes_read_readahead, bytes_read); + ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3); + + // Verify correctness. + iter = db_->NewIterator(read_options); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(value, iter->value()); + count++; + } + ASSERT_EQ(100, count); + for (int i = 0; i < 100; i++) { + iter->Seek(Key(i)); + ASSERT_EQ(value, iter->value()); + } + delete iter; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test_util.h b/db/db_test_util.h index dffcc9037..4f2cea29c 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -358,23 +358,30 @@ class SpecialEnv : public EnvWrapper { class CountingFile : public RandomAccessFile { public: CountingFile(unique_ptr&& target, - anon::AtomicCounter* counter) - : target_(std::move(target)), counter_(counter) {} + anon::AtomicCounter* counter, + std::atomic* bytes_read) + : target_(std::move(target)), + counter_(counter), + bytes_read_(bytes_read) {} virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { counter_->Increment(); - return target_->Read(offset, n, result, scratch); + Status s = target_->Read(offset, n, result, scratch); + *bytes_read_ += result->size(); + return s; } private: unique_ptr target_; anon::AtomicCounter* counter_; + std::atomic* bytes_read_; }; Status s = target()->NewRandomAccessFile(f, r, soptions); random_file_open_counter_++; if (s.ok() && count_random_reads_) { - r->reset(new CountingFile(std::move(*r), &random_read_counter_)); + r->reset(new CountingFile(std::move(*r), &random_read_counter_, + &random_read_bytes_counter_)); } return s; } @@ -464,6 +471,7 @@ class SpecialEnv : public EnvWrapper { bool count_random_reads_; anon::AtomicCounter random_read_counter_; + std::atomic random_read_bytes_counter_; std::atomic random_file_open_counter_; bool count_sequential_reads_; diff --git a/db/table_cache.cc b/db/table_cache.cc index f8c81f9f9..9b0e8edb9 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -87,15 +87,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, - bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, - unique_ptr* table_reader, bool skip_filters, int level) { + bool sequential_mode, size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr* table_reader, + bool skip_filters, int level) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); - if (sequential_mode && ioptions_.compaction_readahead_size > 0) { - file = NewReadaheadRandomAccessFile(std::move(file), - ioptions_.compaction_readahead_size); + + if (readahead > 0) { + file = NewReadaheadRandomAccessFile(std::move(file), readahead); } RecordTick(ioptions_.statistics, NO_FILE_OPENS); if (s.ok()) { @@ -143,8 +144,9 @@ Status TableCache::FindTable(const EnvOptions& env_options, } unique_ptr table_reader; s = GetTableReader(env_options, internal_comparator, fd, - false /* sequential mode */, record_read_stats, - file_read_hist, &table_reader, skip_filters, level); + false /* sequential mode */, 0 /* readahead */, + record_read_stats, file_read_hist, &table_reader, + skip_filters, level); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -175,13 +177,24 @@ InternalIterator* TableCache::NewIterator( TableReader* table_reader = nullptr; Cache::Handle* handle = nullptr; - bool create_new_table_reader = - (for_compaction && ioptions_.new_table_reader_for_compaction_inputs); + + size_t readahead = 0; + bool create_new_table_reader = false; + if (for_compaction) { + if (ioptions_.new_table_reader_for_compaction_inputs) { + readahead = ioptions_.compaction_readahead_size; + create_new_table_reader = true; + } + } else { + readahead = options.readahead_size; + create_new_table_reader = readahead > 0; + } + if (create_new_table_reader) { unique_ptr table_reader_unique_ptr; Status s = GetTableReader( - env_options, icomparator, fd, /* sequential mode */ true, - /* record stats */ false, nullptr, &table_reader_unique_ptr, + env_options, icomparator, fd, true /* sequential_mode */, readahead, + !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr, false /* skip_filters */, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); diff --git a/db/table_cache.h b/db/table_cache.h index fbb7cacbf..18882c6a2 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -111,7 +111,8 @@ class TableCache { Status GetTableReader(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, - bool record_read_stats, HistogramImpl* file_read_hist, + size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr* table_reader, bool skip_filters = false, int level = -1); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 67ab2d8f5..d93af2314 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1466,6 +1466,12 @@ struct ReadOptions { // Default: false bool pin_data; + // If non-zero, NewIterator will create a new table reader which + // performs reads of the given size. Using a large size (> 2MB) can + // improve the performance of forward iteration on spinning disks. + // Default: 0 + size_t readahead_size; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/util/options.cc b/util/options.cc index 44c7604eb..7eef1ab27 100644 --- a/util/options.cc +++ b/util/options.cc @@ -794,7 +794,8 @@ ReadOptions::ReadOptions() managed(false), total_order_seek(false), prefix_same_as_start(false), - pin_data(false) { + pin_data(false), + readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); } @@ -809,7 +810,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache) managed(false), total_order_seek(false), prefix_same_as_start(false), - pin_data(false) { + pin_data(false), + readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); }