Fix bug in rocksdb internal automatic prefetching (#9234)

Summary:
After introducing adaptive_readahead, the original flow got
broken. Readahead size was set to 0 because of which rocksdb wasn't be
able to do automatic prefetching which it enables after seeing
sequential reads. This PR fixes it.

----------------------------------------------------------------------------------------------------

Before this patch:
b_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 6.27
Date:       Tue Nov 30 11:56:50 2021
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
WARNING: Assertions are enabled; benchmarks unnecessarily slow
------------------------------------------------
DB path: [/tmp/prefix_scan]

seekrandom   : 5356367.174 micros/op 0 ops/sec;   29.4 MB/s (23 of 23 found)

----------------------------------------------------------------------------------------------------

After the patch:
./db_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 6.27
Date:       Tue Nov 30 14:38:33 2021
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
WARNING: Assertions are enabled; benchmarks unnecessarily slow
------------------------------------------------
DB path: [/tmp/prefix_scan]
seekrandom   :  456504.277 micros/op 2 ops/sec;  359.8 MB/s (264 of 264 found)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9234

Test Plan:
Ran ./db_bench -db=/data/mysql/rocksdb/prefix_scan
-benchmarks="fillseq" -key_size=32 -value_size=512 -num=5000000 -use_d
irect_io_for_flush_and_compaction=true -target_file_size_base=16777216
and then ./db_bench -use_existing_db=true
-db=/data/mysql/rocksdb/prefix_scan -benchmarks="seekrandom"
-key_size=32 -value_siz
e=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680
-duration=120 -ops_between_duration_checks=1
 and compared the results.

Reviewed By: anand1976

Differential Revision: D32743965

Pulled By: akankshamahajan15

fbshipit-source-id: b950fba68c91963b7deb5c20acdf471bc60251f5
This commit is contained in:
Akanksha Mahajan 2021-11-30 22:52:14 -08:00 committed by akankshamahajan
parent e15c8e6819
commit db0435772f
6 changed files with 55 additions and 23 deletions

View File

@ -123,6 +123,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
// If readahead is enabled: prefetch the remaining bytes + readahead bytes // If readahead is enabled: prefetch the remaining bytes + readahead bytes
// and satisfy the request. // and satisfy the request.
// If readahead is not enabled: return false. // If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
if (readahead_size_ > 0) { if (readahead_size_ > 0) {
assert(reader != nullptr); assert(reader != nullptr);
@ -161,8 +163,6 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
#endif #endif
return false; return false;
} }
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else { } else {
return false; return false;

View File

@ -30,6 +30,8 @@ class RandomAccessFileReader;
class FilePrefetchBuffer { class FilePrefetchBuffer {
public: public:
static const int kMinNumFileReadsToStartAutoReadahead = 2; static const int kMinNumFileReadsToStartAutoReadahead = 2;
static const size_t kInitAutoReadaheadSize = 8 * 1024;
// Constructor. // Constructor.
// //
// All arguments are optional. // All arguments are optional.
@ -57,7 +59,6 @@ class FilePrefetchBuffer {
: buffer_offset_(0), : buffer_offset_(0),
readahead_size_(readahead_size), readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size), max_readahead_size_(max_readahead_size),
initial_readahead_size_(readahead_size),
min_offset_read_(port::kMaxSizet), min_offset_read_(port::kMaxSizet),
enable_(enable), enable_(enable),
track_min_offset_(track_min_offset), track_min_offset_(track_min_offset),
@ -95,6 +96,7 @@ class FilePrefetchBuffer {
// tracked if track_min_offset = true. // tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; } size_t min_offset_read() const { return min_offset_read_; }
// Called in case of implicit auto prefetching.
void UpdateReadPattern(const uint64_t& offset, const size_t& len, void UpdateReadPattern(const uint64_t& offset, const size_t& len,
bool is_adaptive_readahead = false) { bool is_adaptive_readahead = false) {
if (is_adaptive_readahead) { if (is_adaptive_readahead) {
@ -111,9 +113,10 @@ class FilePrefetchBuffer {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
} }
// Called in case of implicit auto prefetching.
void ResetValues() { void ResetValues() {
num_file_reads_ = 1; num_file_reads_ = 1;
readahead_size_ = initial_readahead_size_; readahead_size_ = kInitAutoReadaheadSize;
} }
void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) {
@ -136,8 +139,9 @@ class FilePrefetchBuffer {
if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) && if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) &&
IsBlockSequential(offset) && IsBlockSequential(offset) &&
(num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) {
size_t initial_auto_readahead_size = kInitAutoReadaheadSize;
readahead_size_ = readahead_size_ =
std::max(initial_readahead_size_, std::max(initial_auto_readahead_size,
(readahead_size_ >= value ? readahead_size_ - value : 0)); (readahead_size_ >= value ? readahead_size_ - value : 0));
} }
} }
@ -150,7 +154,6 @@ class FilePrefetchBuffer {
// FilePrefetchBuffer object won't be created from Iterator flow if // FilePrefetchBuffer object won't be created from Iterator flow if
// max_readahead_size_ = 0. // max_readahead_size_ = 0.
size_t max_readahead_size_; size_t max_readahead_size_;
size_t initial_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache(). // The minimum `offset` ever passed to TryReadFromCache().
size_t min_offset_read_; size_t min_offset_read_;
// if false, TryReadFromCache() always return false, and we only take stats // if false, TryReadFromCache() always return false, and we only take stats

View File

@ -670,13 +670,16 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
Close(); Close();
} }
class PrefetchTest1 : public DBTestBase, class PrefetchTest1
public ::testing::WithParamInterface<bool> { : public DBTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public: public:
PrefetchTest1() : DBTestBase("prefetch_test1", true) {} PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
}; };
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_P(PrefetchTest1, DBIterLevelReadAhead) { TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
@ -686,12 +689,13 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
std::make_shared<MockFS>(env_->GetFileSystem(), false); std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options = CurrentOptions(); Options options = CurrentOptions();
options.write_buffer_size = 1024; options.write_buffer_size = 1024;
options.create_if_missing = true; options.create_if_missing = true;
options.compression = kNoCompression; options.compression = kNoCompression;
options.env = env.get(); options.env = env.get();
if (GetParam()) { if (std::get<0>(GetParam())) {
options.use_direct_reads = true; options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true; options.use_direct_io_for_flush_and_compaction = true;
} }
@ -704,7 +708,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options); Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { if (std::get<0>(GetParam()) &&
(s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test // If direct IO is not supported, skip the test
return; return;
} else { } else {
@ -748,12 +753,15 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg); current_readahead_size = *reinterpret_cast<size_t*>(arg);
ASSERT_GT(current_readahead_size, 0);
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro; ReadOptions ro;
ro.adaptive_readahead = true; if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0; int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -763,14 +771,28 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
ASSERT_GT(buff_prefetch_count, 0); ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0; buff_prefetch_count = 0;
// For index and data blocks. // For index and data blocks.
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
Close(); Close();
} }
#endif //! ROCKSDB_LITE
TEST_P(PrefetchTest1, NonSequentialReads) { class PrefetchTest2 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTest2() : DBTestBase("prefetch_test2", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest2, PrefetchTest2, ::testing::Bool());
#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest2, NonSequentialReads) {
const int kNumKeys = 1000; const int kNumKeys = 1000;
// Set options // Set options
std::shared_ptr<MockFS> fs = std::shared_ptr<MockFS> fs =
@ -856,7 +878,7 @@ TEST_P(PrefetchTest1, NonSequentialReads) {
} }
#endif //! ROCKSDB_LITE #endif //! ROCKSDB_LITE
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
const int kNumKeys = 2000; const int kNumKeys = 2000;
// Set options // Set options
std::shared_ptr<MockFS> fs = std::shared_ptr<MockFS> fs =

View File

@ -161,10 +161,12 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
} }
void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
block_prefetcher_.SetReadaheadState( if (read_options_.adaptive_readahead) {
&(readahead_file_info->data_block_readahead_info)); block_prefetcher_.SetReadaheadState(
if (index_iter_) { &(readahead_file_info->data_block_readahead_info));
index_iter_->SetReadaheadState(readahead_file_info); if (index_iter_) {
index_iter_->SetReadaheadState(readahead_file_info);
}
} }
} }

View File

@ -30,8 +30,11 @@ class BlockPrefetcher {
void ResetValues() { void ResetValues() {
num_file_reads_ = 1; num_file_reads_ = 1;
readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; // Since initial_auto_readahead_size_ can be different from
initial_auto_readahead_size_ = readahead_size_; // kInitAutoReadaheadSize in case of adaptive_readahead, so fallback the
// readahead_size_ to kInitAutoReadaheadSize in case of reset.
initial_auto_readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
readahead_size_ = initial_auto_readahead_size_;
readahead_limit_ = 0; readahead_limit_ = 0;
return; return;
} }

View File

@ -123,8 +123,10 @@ class PartitionedIndexIterator : public InternalIteratorBase<IndexValue> {
} }
void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
block_prefetcher_.SetReadaheadState( if (read_options_.adaptive_readahead) {
&(readahead_file_info->index_block_readahead_info)); block_prefetcher_.SetReadaheadState(
&(readahead_file_info->index_block_readahead_info));
}
} }
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_; std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;