Enable configurable readahead for iterators
Summary: Add an option `iterator_readahead_size` to `ReadOptions` to enable configurable readahead for iterators similar to the corresponding option for compaction. Test Plan: ``` make commit_prereq ``` Reviewers: kumar.rangarajan, ott, igor, sdong Reviewed By: sdong Subscribers: yiwu, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D55419
This commit is contained in:
parent
2137377f0e
commit
2182bf2828
@ -5,6 +5,8 @@
|
||||
* Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
|
||||
* Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".
|
||||
* Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status.
|
||||
### New Features
|
||||
* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader which performs reads of the given size.
|
||||
|
||||
## 4.7.0 (4/8/2016)
|
||||
### Public API Change
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/iostats_context.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -1525,6 +1526,75 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
|
||||
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
|
||||
}
|
||||
|
||||
TEST_F(DBIteratorTest, ReadAhead) {
|
||||
Options options;
|
||||
auto env = new SpecialEnv(Env::Default());
|
||||
env->count_random_reads_ = true;
|
||||
options.env = env;
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 4 << 20;
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 1024;
|
||||
table_options.no_block_cache = true;
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
||||
std::string value(1024, 'a');
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Put(Key(i), value);
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
MoveFilesToLevel(2);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Put(Key(i), value);
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
MoveFilesToLevel(1);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Put(Key(i), value);
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_EQ("1,1,1", FilesPerLevel());
|
||||
|
||||
env->random_read_bytes_counter_ = 0;
|
||||
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
|
||||
ReadOptions read_options;
|
||||
auto* iter = db_->NewIterator(read_options);
|
||||
iter->SeekToFirst();
|
||||
int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
|
||||
int64_t bytes_read = env->random_read_bytes_counter_;
|
||||
delete iter;
|
||||
|
||||
env->random_read_bytes_counter_ = 0;
|
||||
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
|
||||
read_options.readahead_size = 1024 * 10;
|
||||
iter = db_->NewIterator(read_options);
|
||||
iter->SeekToFirst();
|
||||
int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS);
|
||||
int64_t bytes_read_readahead = env->random_read_bytes_counter_;
|
||||
delete iter;
|
||||
ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead);
|
||||
ASSERT_GT(bytes_read_readahead, bytes_read);
|
||||
ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);
|
||||
|
||||
// Verify correctness.
|
||||
iter = db_->NewIterator(read_options);
|
||||
int count = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ASSERT_EQ(value, iter->value());
|
||||
count++;
|
||||
}
|
||||
ASSERT_EQ(100, count);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
iter->Seek(Key(i));
|
||||
ASSERT_EQ(value, iter->value());
|
||||
}
|
||||
delete iter;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -358,23 +358,30 @@ class SpecialEnv : public EnvWrapper {
|
||||
class CountingFile : public RandomAccessFile {
|
||||
public:
|
||||
CountingFile(unique_ptr<RandomAccessFile>&& target,
|
||||
anon::AtomicCounter* counter)
|
||||
: target_(std::move(target)), counter_(counter) {}
|
||||
anon::AtomicCounter* counter,
|
||||
std::atomic<int64_t>* bytes_read)
|
||||
: target_(std::move(target)),
|
||||
counter_(counter),
|
||||
bytes_read_(bytes_read) {}
|
||||
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
||||
char* scratch) const override {
|
||||
counter_->Increment();
|
||||
return target_->Read(offset, n, result, scratch);
|
||||
Status s = target_->Read(offset, n, result, scratch);
|
||||
*bytes_read_ += result->size();
|
||||
return s;
|
||||
}
|
||||
|
||||
private:
|
||||
unique_ptr<RandomAccessFile> target_;
|
||||
anon::AtomicCounter* counter_;
|
||||
std::atomic<int64_t>* bytes_read_;
|
||||
};
|
||||
|
||||
Status s = target()->NewRandomAccessFile(f, r, soptions);
|
||||
random_file_open_counter_++;
|
||||
if (s.ok() && count_random_reads_) {
|
||||
r->reset(new CountingFile(std::move(*r), &random_read_counter_));
|
||||
r->reset(new CountingFile(std::move(*r), &random_read_counter_,
|
||||
&random_read_bytes_counter_));
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -464,6 +471,7 @@ class SpecialEnv : public EnvWrapper {
|
||||
|
||||
bool count_random_reads_;
|
||||
anon::AtomicCounter random_read_counter_;
|
||||
std::atomic<int64_t> random_read_bytes_counter_;
|
||||
std::atomic<int> random_file_open_counter_;
|
||||
|
||||
bool count_sequential_reads_;
|
||||
|
@ -87,15 +87,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
|
||||
Status TableCache::GetTableReader(
|
||||
const EnvOptions& env_options,
|
||||
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
|
||||
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
|
||||
unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
|
||||
bool sequential_mode, size_t readahead, bool record_read_stats,
|
||||
HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
|
||||
bool skip_filters, int level) {
|
||||
std::string fname =
|
||||
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
|
||||
unique_ptr<RandomAccessFile> file;
|
||||
Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
|
||||
if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
|
||||
file = NewReadaheadRandomAccessFile(std::move(file),
|
||||
ioptions_.compaction_readahead_size);
|
||||
|
||||
if (readahead > 0) {
|
||||
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
|
||||
}
|
||||
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
|
||||
if (s.ok()) {
|
||||
@ -143,8 +144,9 @@ Status TableCache::FindTable(const EnvOptions& env_options,
|
||||
}
|
||||
unique_ptr<TableReader> table_reader;
|
||||
s = GetTableReader(env_options, internal_comparator, fd,
|
||||
false /* sequential mode */, record_read_stats,
|
||||
file_read_hist, &table_reader, skip_filters, level);
|
||||
false /* sequential mode */, 0 /* readahead */,
|
||||
record_read_stats, file_read_hist, &table_reader,
|
||||
skip_filters, level);
|
||||
if (!s.ok()) {
|
||||
assert(table_reader == nullptr);
|
||||
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
|
||||
@ -175,13 +177,24 @@ InternalIterator* TableCache::NewIterator(
|
||||
|
||||
TableReader* table_reader = nullptr;
|
||||
Cache::Handle* handle = nullptr;
|
||||
bool create_new_table_reader =
|
||||
(for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
|
||||
|
||||
size_t readahead = 0;
|
||||
bool create_new_table_reader = false;
|
||||
if (for_compaction) {
|
||||
if (ioptions_.new_table_reader_for_compaction_inputs) {
|
||||
readahead = ioptions_.compaction_readahead_size;
|
||||
create_new_table_reader = true;
|
||||
}
|
||||
} else {
|
||||
readahead = options.readahead_size;
|
||||
create_new_table_reader = readahead > 0;
|
||||
}
|
||||
|
||||
if (create_new_table_reader) {
|
||||
unique_ptr<TableReader> table_reader_unique_ptr;
|
||||
Status s = GetTableReader(
|
||||
env_options, icomparator, fd, /* sequential mode */ true,
|
||||
/* record stats */ false, nullptr, &table_reader_unique_ptr,
|
||||
env_options, icomparator, fd, true /* sequential_mode */, readahead,
|
||||
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
|
||||
false /* skip_filters */, level);
|
||||
if (!s.ok()) {
|
||||
return NewErrorInternalIterator(s, arena);
|
||||
|
@ -111,7 +111,8 @@ class TableCache {
|
||||
Status GetTableReader(const EnvOptions& env_options,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
const FileDescriptor& fd, bool sequential_mode,
|
||||
bool record_read_stats, HistogramImpl* file_read_hist,
|
||||
size_t readahead, bool record_read_stats,
|
||||
HistogramImpl* file_read_hist,
|
||||
unique_ptr<TableReader>* table_reader,
|
||||
bool skip_filters = false, int level = -1);
|
||||
|
||||
|
@ -1466,6 +1466,12 @@ struct ReadOptions {
|
||||
// Default: false
|
||||
bool pin_data;
|
||||
|
||||
// If non-zero, NewIterator will create a new table reader which
|
||||
// performs reads of the given size. Using a large size (> 2MB) can
|
||||
// improve the performance of forward iteration on spinning disks.
|
||||
// Default: 0
|
||||
size_t readahead_size;
|
||||
|
||||
ReadOptions();
|
||||
ReadOptions(bool cksum, bool cache);
|
||||
};
|
||||
|
@ -794,7 +794,8 @@ ReadOptions::ReadOptions()
|
||||
managed(false),
|
||||
total_order_seek(false),
|
||||
prefix_same_as_start(false),
|
||||
pin_data(false) {
|
||||
pin_data(false),
|
||||
readahead_size(0) {
|
||||
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
|
||||
reinterpret_cast<ReadOptions*>(this));
|
||||
}
|
||||
@ -809,7 +810,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
|
||||
managed(false),
|
||||
total_order_seek(false),
|
||||
prefix_same_as_start(false),
|
||||
pin_data(false) {
|
||||
pin_data(false),
|
||||
readahead_size(0) {
|
||||
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
|
||||
reinterpret_cast<ReadOptions*>(this));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user