Make initial auto readahead_size configurable (#9836)

Summary:
Make initial auto readahead_size configurable

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9836

Test Plan:
Added new unit test
Ran regression:
Without change:

```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.0
Date:       Thu Mar 17 13:11:34 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom   :  483618.390 micros/op 2 ops/sec;  338.9 MB/s (249 of 249 found)
```

With this change:
```
 ./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Set seed to 1649895440554504 because --seed was 0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.2
Date:       Wed Apr 13 17:17:20 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
... finished 100 ops
seekrandom   :  476892.488 micros/op 2 ops/sec;  344.6 MB/s (252 of 252 found)
```

Reviewed By: anand1976

Differential Revision: D35632815

Pulled By: akankshamahajan15

fbshipit-source-id: c8057a88f9294c9d03b1d434b03affe02f74d796
This commit is contained in:
Akanksha Mahajan 2022-04-15 17:28:09 -07:00 committed by Facebook GitHub Bot
parent d5dfa8c6fe
commit 0c7f455f85
11 changed files with 214 additions and 28 deletions

View File

@ -20,6 +20,7 @@
* Enable async prefetching if ReadOptions.readahead_size is set along with ReadOptions.async_io in FilePrefetchBuffer. * Enable async prefetching if ReadOptions.readahead_size is set along with ReadOptions.async_io in FilePrefetchBuffer.
* Add event listener support on remote compaction compactor side. * Add event listener support on remote compaction compactor side.
* Added a dedicated integer DB property `rocksdb.live-blob-file-garbage-size` that exposes the total amount of garbage in the blob files in the current version. * Added a dedicated integer DB property `rocksdb.live-blob-file-garbage-size` that exposes the total amount of garbage in the blob files in the current version.
* RocksDB does internal auto prefetching if it notices sequential reads. It starts with a readahead size of `initial_auto_readahead_size`, which can now be configured through BlockBasedTableOptions.
### Behavior changes ### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794). * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).

View File

@ -36,7 +36,6 @@ struct BufferInfo {
class FilePrefetchBuffer { class FilePrefetchBuffer {
public: public:
static const int kMinNumFileReadsToStartAutoReadahead = 2; static const int kMinNumFileReadsToStartAutoReadahead = 2;
static const size_t kInitAutoReadaheadSize = 8 * 1024;
// Constructor. // Constructor.
// //
@ -68,6 +67,7 @@ class FilePrefetchBuffer {
bool async_io = false, FileSystem* fs = nullptr) bool async_io = false, FileSystem* fs = nullptr)
: curr_(0), : curr_(0),
readahead_size_(readahead_size), readahead_size_(readahead_size),
initial_auto_readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size), max_readahead_size_(max_readahead_size),
min_offset_read_(port::kMaxSizet), min_offset_read_(port::kMaxSizet),
enable_(enable), enable_(enable),
@ -184,9 +184,8 @@ class FilePrefetchBuffer {
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) && bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) &&
IsBlockSequential(offset) && IsBlockSequential(offset) &&
(num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) {
size_t initial_auto_readahead_size = kInitAutoReadaheadSize;
readahead_size_ = readahead_size_ =
std::max(initial_auto_readahead_size, std::max(initial_auto_readahead_size_,
(readahead_size_ >= value ? readahead_size_ - value : 0)); (readahead_size_ >= value ? readahead_size_ - value : 0));
} }
} }
@ -238,7 +237,7 @@ class FilePrefetchBuffer {
// Called in case of implicit auto prefetching. // Called in case of implicit auto prefetching.
void ResetValues() { void ResetValues() {
num_file_reads_ = 1; num_file_reads_ = 1;
readahead_size_ = kInitAutoReadaheadSize; readahead_size_ = initial_auto_readahead_size_;
} }
std::vector<BufferInfo> bufs_; std::vector<BufferInfo> bufs_;
@ -246,6 +245,7 @@ class FilePrefetchBuffer {
// consumed currently. // consumed currently.
uint32_t curr_; uint32_t curr_;
size_t readahead_size_; size_t readahead_size_;
size_t initial_auto_readahead_size_;
// FilePrefetchBuffer object won't be created from Iterator flow if // FilePrefetchBuffer object won't be created from Iterator flow if
// max_readahead_size_ = 0. // max_readahead_size_ = 0.
size_t max_readahead_size_; size_t max_readahead_size_;

View File

@ -275,8 +275,8 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
break; break;
case 1: case 1:
// max_auto_readahead_size is set less than // max_auto_readahead_size is set less than
// BlockBasedTable::kInitAutoReadaheadSize. So readahead_size remains // initial_auto_readahead_size. So readahead_size remains equal to
// equal to max_auto_readahead_size. // max_auto_readahead_size.
ASSERT_OK(db_->SetOptions({{"block_based_table_factory", ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
"{max_auto_readahead_size=4096;}"}})); "{max_auto_readahead_size=4096;}"}}));
break; break;
@ -321,6 +321,145 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
Close(); Close();
} }
// Verifies that BlockBasedTableOptions::initial_auto_readahead_size can be
// changed through DB::SetOptions and that it influences implicit auto
// prefetching during iteration:
//   - level 2 is scanned with initial_auto_readahead_size=65536,
//   - level 1 with initial_auto_readahead_size == max_auto_readahead_size
//     (4096),
//   - level 0 with initial_auto_readahead_size=0, for which no prefetching is
//     expected.
TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.initial_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  // Number of times the "FilePrefetchBuffer::Prefetch:Start" sync point fired.
  int buff_prefetch_count = 0;

  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20.
  // Table cache is allocated with max_open_files - 10 as capacity. So
  // override max_open_files to shrink the table cache capacity.
  // This will prevent file open during DB open and force the file to be
  // opened during Iteration.
  // NOTE(review): the callback below sets 11 (not 10), which by the formula
  // above yields a capacity of 1 rather than 0 - confirm the intended value.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test. The registered callbacks
    // capture locals of this function by reference, so they must be removed
    // before returning.
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
    return;
  } else {
    ASSERT_OK(s);
  }

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();

  // A failed reopen would leave db_ unusable below, so check the status
  // instead of dropping it.
  ASSERT_OK(TryReopen(options));
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;
    // Per-level snapshot of buff_prefetch_count, taken right after the level
    // has been scanned.
    std::vector<int> buff_prefetch_level_count = {0, 0, 0};

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // initial_auto_readahead_size is set 0 so data and index blocks are
          // not prefetched.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // initial_auto_readahead_size and max_auto_readahead_size are set
          // same so readahead_size remains same.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=4096;"
                                      "max_auto_readahead_size=4096;}"}}));
          break;
        case 2:
          // Start with a larger initial readahead (64KB) than level 1 uses;
          // the final check expects fewer buffer prefetches here than for
          // level 1.
          ASSERT_OK(
              db_->SetOptions({{"block_based_table_factory",
                                "{initial_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        // Prefetching is expected to go through the file system.
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        // Prefetching is expected to go through FilePrefetchBuffer instead.
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
    if (!support_prefetch) {
      ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
TEST_P(PrefetchTest, PrefetchWhenReseek) { TEST_P(PrefetchTest, PrefetchWhenReseek) {

View File

@ -501,14 +501,15 @@ struct BlockBasedTableOptions {
// RocksDB does auto-readahead for iterators on noticing more than two reads // RocksDB does auto-readahead for iterators on noticing more than two reads
// for a table file if user doesn't provide readahead_size. The readahead // for a table file if user doesn't provide readahead_size. The readahead
// starts at 8KB and doubles on every additional read upto // starts at BlockBasedTableOptions.initial_auto_readahead_size (default: 8KB)
// max_auto_readahead_size and max_auto_readahead_size can be configured. // and doubles on every additional read upto max_auto_readahead_size and
// max_auto_readahead_size can be configured.
// //
// Special Value: 0 - If max_auto_readahead_size is set 0 then no implicit // Special Value: 0 - If max_auto_readahead_size is set 0 then it will disable
// auto prefetching will be done. If max_auto_readahead_size provided is less // the implicit auto prefetching.
// than 8KB (which is initial readahead size used by rocksdb in case of // If max_auto_readahead_size provided is less
// auto-readahead), readahead size will remain same as // than initial_auto_readahead_size, then RocksDB will sanitize the
// max_auto_readahead_size. // initial_auto_readahead_size and set it to max_auto_readahead_size.
// //
// Value should be provided along with KB i.e. 256 * 1024 as it will prefetch // Value should be provided along with KB i.e. 256 * 1024 as it will prefetch
// the blocks. // the blocks.
@ -547,6 +548,35 @@ struct BlockBasedTableOptions {
PrepopulateBlockCache prepopulate_block_cache = PrepopulateBlockCache prepopulate_block_cache =
PrepopulateBlockCache::kDisable; PrepopulateBlockCache::kDisable;
// RocksDB does auto-readahead for iterators on noticing more than two reads
// for a table file if user doesn't provide readahead_size. The readahead size
// starts at initial_auto_readahead_size and doubles on every additional read
// upto BlockBasedTableOptions.max_auto_readahead_size.
// max_auto_readahead_size can also be configured.
//
// Scenarios:
// - If initial_auto_readahead_size is set to 0 then it will disable the
// implicit auto prefetching irrespective of max_auto_readahead_size.
// - If max_auto_readahead_size is set 0, it will disable the internal
// prefetching irrespective of initial_auto_readahead_size.
// - If initial_auto_readahead_size > max_auto_readahead_size, then RocksDB
// will sanitize the value of initial_auto_readahead_size to
// max_auto_readahead_size and readahead_size will be
// max_auto_readahead_size.
//
// Value should be provided along with KB i.e. 8 * 1024 as it will prefetch
// the blocks.
//
// This parameter can be changed dynamically by
// DB::SetOptions({{"block_based_table_factory",
// "{initial_auto_readahead_size=0;}"}}));
//
// Changing the value dynamically will only affect files opened after the
// change.
//
// Default: 8 KB (8 * 1024).
size_t initial_auto_readahead_size = 8 * 1024;
}; };
// Table Properties that are specific to block-based table properties. // Table Properties that are specific to block-based table properties.

View File

@ -195,7 +195,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"enable_index_compression=false;" "enable_index_compression=false;"
"block_align=true;" "block_align=true;"
"max_auto_readahead_size=0;" "max_auto_readahead_size=0;"
"prepopulate_block_cache=kDisable", "prepopulate_block_cache=kDisable;"
"initial_auto_readahead_size=0",
new_bbto)); new_bbto));
ASSERT_EQ(unset_bytes_base, ASSERT_EQ(unset_bytes_base,

View File

@ -413,6 +413,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct BlockBasedTableOptions, prepopulate_block_cache), offsetof(struct BlockBasedTableOptions, prepopulate_block_cache),
&block_base_table_prepopulate_block_cache_string_map, &block_base_table_prepopulate_block_cache_string_map,
OptionTypeFlags::kMutable)}, OptionTypeFlags::kMutable)},
{"initial_auto_readahead_size",
{offsetof(struct BlockBasedTableOptions, initial_auto_readahead_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
}; };
@ -815,6 +819,10 @@ std::string BlockBasedTableFactory::GetPrintableOptions() const {
snprintf(buffer, kBufferSize, " prepopulate_block_cache: %d\n", snprintf(buffer, kBufferSize, " prepopulate_block_cache: %d\n",
static_cast<int>(table_options_.prepopulate_block_cache)); static_cast<int>(table_options_.prepopulate_block_cache));
ret.append(buffer); ret.append(buffer);
snprintf(buffer, kBufferSize,
" initial_auto_readahead_size: %" ROCKSDB_PRIszt "\n",
table_options_.initial_auto_readahead_size);
ret.append(buffer);
return ret; return ret;
} }

View File

@ -35,7 +35,9 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
pinned_iters_mgr_(nullptr), pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor), prefix_extractor_(prefix_extractor),
lookup_context_(caller), lookup_context_(caller),
block_prefetcher_(compaction_readahead_size), block_prefetcher_(
compaction_readahead_size,
table_->get_rep()->table_options.initial_auto_readahead_size),
allow_unprepared_value_(allow_unprepared_value), allow_unprepared_value_(allow_unprepared_value),
block_iter_points_to_real_block_(false), block_iter_points_to_real_block_(false),
check_filter_(check_filter), check_filter_(check_filter),

View File

@ -71,7 +71,6 @@ class BlockBasedTable : public TableReader {
static const std::string kPartitionedFilterBlockPrefix; static const std::string kPartitionedFilterBlockPrefix;
// All the below fields control iterator readahead // All the below fields control iterator readahead
static const size_t kInitAutoReadaheadSize = 8 * 1024;
static const int kMinNumFileReadsToStartAutoReadahead = 2; static const int kMinNumFileReadsToStartAutoReadahead = 2;
// 1-byte compression type + 32-bit checksum // 1-byte compression type + 32-bit checksum

View File

@ -34,7 +34,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
// If max_auto_readahead_size is set to be 0 by user, no data will be // If max_auto_readahead_size is set to be 0 by user, no data will be
// prefetched. // prefetched.
size_t max_auto_readahead_size = rep->table_options.max_auto_readahead_size; size_t max_auto_readahead_size = rep->table_options.max_auto_readahead_size;
if (max_auto_readahead_size == 0) { if (max_auto_readahead_size == 0 || initial_auto_readahead_size_ == 0) {
return; return;
} }
@ -50,7 +50,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
if (!IsBlockSequential(offset)) { if (!IsBlockSequential(offset)) {
UpdateReadPattern(offset, len); UpdateReadPattern(offset, len);
ResetValues(); ResetValues(rep->table_options.initial_auto_readahead_size);
return; return;
} }
UpdateReadPattern(offset, len); UpdateReadPattern(offset, len);

View File

@ -12,8 +12,12 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class BlockPrefetcher { class BlockPrefetcher {
public: public:
explicit BlockPrefetcher(size_t compaction_readahead_size) explicit BlockPrefetcher(size_t compaction_readahead_size,
: compaction_readahead_size_(compaction_readahead_size) {} size_t initial_auto_readahead_size)
: compaction_readahead_size_(compaction_readahead_size),
readahead_size_(initial_auto_readahead_size),
initial_auto_readahead_size_(initial_auto_readahead_size) {}
void PrefetchIfNeeded(const BlockBasedTable::Rep* rep, void PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle, size_t readahead_size, const BlockHandle& handle, size_t readahead_size,
bool is_for_compaction, bool async_io); bool is_for_compaction, bool async_io);
@ -28,12 +32,13 @@ class BlockPrefetcher {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
} }
void ResetValues() { void ResetValues(size_t initial_auto_readahead_size) {
num_file_reads_ = 1; num_file_reads_ = 1;
// Since initial_auto_readahead_size_ can be different from // Since initial_auto_readahead_size_ can be different from
// kInitAutoReadaheadSize in case of adaptive_readahead, so fallback the // the value passed to BlockBasedTableOptions.initial_auto_readahead_size in
// readahead_size_ to kInitAutoReadaheadSize in case of reset. // case of adaptive_readahead, so fallback the readahead_size_ to that value
initial_auto_readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; // in case of reset.
initial_auto_readahead_size_ = initial_auto_readahead_size;
readahead_size_ = initial_auto_readahead_size_; readahead_size_ = initial_auto_readahead_size_;
readahead_limit_ = 0; readahead_limit_ = 0;
return; return;
@ -52,12 +57,11 @@ class BlockPrefetcher {
size_t compaction_readahead_size_; size_t compaction_readahead_size_;
// readahead_size_ is used if underlying FS supports prefetching. // readahead_size_ is used if underlying FS supports prefetching.
size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; size_t readahead_size_;
size_t readahead_limit_ = 0; size_t readahead_limit_ = 0;
// initial_auto_readahead_size_ is used if RocksDB uses internal prefetch // initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
// buffer. // buffer.
uint64_t initial_auto_readahead_size_ = uint64_t initial_auto_readahead_size_;
BlockBasedTable::kInitAutoReadaheadSize;
int64_t num_file_reads_ = 0; int64_t num_file_reads_ = 0;
uint64_t prev_offset_ = 0; uint64_t prev_offset_ = 0;
size_t prev_len_ = 0; size_t prev_len_ = 0;

View File

@ -36,7 +36,9 @@ class PartitionedIndexIterator : public InternalIteratorBase<IndexValue> {
user_comparator_(icomp.user_comparator()), user_comparator_(icomp.user_comparator()),
block_iter_points_to_real_block_(false), block_iter_points_to_real_block_(false),
lookup_context_(caller), lookup_context_(caller),
block_prefetcher_(compaction_readahead_size) { block_prefetcher_(
compaction_readahead_size,
table_->get_rep()->table_options.initial_auto_readahead_size) {
} }
~PartitionedIndexIterator() override {} ~PartitionedIndexIterator() override {}