BlockBasedTableReader: automatically adjust tail prefetch size (#4156)

Summary:
Right now we use one hard-coded prefetch size to prefetch data from the tail of the SST files. However, this may introduce a waste for some use cases, while not efficient for others.
Introduce a way to adjust this prefetch size by tracking 32 recent times, and pick a value with which the wasted read is less than 10%
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4156

Differential Revision: D8916847

Pulled By: siying

fbshipit-source-id: 8413f9eb3987e0033ed0bd910f83fc2eeaaf5758
This commit is contained in:
Siying Dong 2018-07-20 14:31:27 -07:00 committed by Facebook Github Bot
parent ab35505e21
commit 8425c8bd4d
8 changed files with 336 additions and 21 deletions

View File

@ -2321,9 +2321,9 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
// Include the explicit prefetch of the footer in direct I/O case.
size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
ASSERT_GE(rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files +
direct_io_extra));
ASSERT_GE(
rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
ASSERT_LT(
rate_limited_bytes,
static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
@ -2547,6 +2547,88 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}
TEST_F(DBTest2, TestBBTTailPrefetch) {
std::atomic<bool> called(false);
size_t expected_lower_bound = 512 * 1024;
size_t expected_higher_bound = 512 * 1024;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
EXPECT_LE(expected_lower_bound, *prefetch_size);
EXPECT_GE(expected_higher_bound, *prefetch_size);
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Put("1", "1");
Put("9", "1");
Flush();
expected_lower_bound = 0;
expected_higher_bound = 8 * 1024;
Put("1", "1");
Put("9", "1");
Flush();
Put("1", "1");
Put("9", "1");
Flush();
// Full compaction to make sure there is no L0 file after the open.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_TRUE(called.load());
called = false;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
std::atomic<bool> first_call(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
size_t* prefetch_size = static_cast<size_t*>(arg);
if (first_call) {
EXPECT_EQ(4 * 1024, *prefetch_size);
first_call = false;
} else {
EXPECT_GE(4 * 1024, *prefetch_size);
}
called = true;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.max_file_opening_threads = 1; // one thread
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.max_open_files = -1;
Reopen(options);
Put("1", "1");
Put("9", "1");
Flush();
Put("1", "1");
Put("9", "1");
Flush();
ASSERT_TRUE(called.load());
called = false;
// Parallel loading SST files
options.max_file_opening_threads = 16;
Reopen(options);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_TRUE(called.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -27,10 +27,141 @@
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
namespace rocksdb {
void TailPrefetchStats::RecordEffectiveSize(size_t len) {
MutexLock l(&mutex_);
if (num_records_ < kNumTracked) {
num_records_++;
}
records_[next_++] = len;
if (next_ == kNumTracked) {
next_ = 0;
}
}
size_t TailPrefetchStats::GetSuggestedPrefetchSize() {
std::vector<size_t> sorted;
{
MutexLock l(&mutex_);
if (num_records_ == 0) {
return 0;
}
sorted.assign(records_, records_ + num_records_);
}
// Of the historic size, we find the maximum one that satisifis the condtiion
// that if prefetching all, less than 1/8 will be wasted.
std::sort(sorted.begin(), sorted.end());
// Assuming we have 5 data points, and after sorting it looks like this:
//
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// +---+ | | | |
// | | | | | |
// +---+ | | | | | |
// | | | | | | | |
// +---+ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// and we use every of the value as a candidate, and estimate how much we
// wasted, compared to read. For example, when we use the 3rd record
// as candiate. This area is what we read:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ***+ *** *** *** *** **
// * | | | | | |
// +---+ | | | | | *
// * | | | | | | | |
// +---+ | | | | | | | *
// * | | | | X | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// | | | | | | | | | *
// * | | | | | | | | |
// *** *** ***-*** ***--*** ***--*** +****
// which is (size of the record) X (number of records).
//
// While wasted is this area:
// +---+
// +---+ | |
// | | | |
// | | | |
// | | | |
// | | | |
// *** *** *** ****---+ | | | |
// * * | | | | |
// * *-*** *** | | | | |
// * * | | | | | | |
// *--** *** | | | | | | |
// | | | | | X | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// Which can be calculated iteratively.
// The difference between wasted using 4st and 3rd record, will
// be following area:
// +---+
// +--+ +-+ ++ +-+ +-+ +---+ | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// + xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// | xxxxxxxxxxxxxxxxxxxxxxxx | | | |
// +-+ +-+ +-+ ++ +---+ +--+ | | |
// | | | | | | |
// +---+ ++ | | | | | |
// | | | | | | X | | |
// +---+ ++ | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// | | | | | | | | | |
// +---+ +---+ +---+ +---+ +---+
//
// which will be the size difference between 4st and 3rd record,
// times 3, which is number of records before the 4st.
// Here we assume that all data within the prefetch range will be useful. In
// reality, it may not be the case when a partial block is inside the range,
// or there are data in the middle that is not read. We ignore those cases
// for simplicity.
assert(!sorted.empty());
size_t prev_size = sorted[0];
size_t max_qualified_size = sorted[0];
size_t wasted = 0;
for (size_t i = 1; i < sorted.size(); i++) {
size_t read = sorted[i] * sorted.size();
wasted += (sorted[i] - prev_size) * i;
if (wasted <= read / 8) {
max_qualified_size = sorted[i];
}
prev_size = sorted[i];
}
const size_t kMaxPrefetchSize = 512 * 1024; // Never exceed 512KB
return std::min(kMaxPrefetchSize, max_qualified_size);
}
BlockBasedTableFactory::BlockBasedTableFactory(
const BlockBasedTableOptions& _table_options)
: table_options_(_table_options) {
@ -71,7 +202,8 @@ Status BlockBasedTableFactory::NewTableReader(
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal);
table_reader_options.level, table_reader_options.immortal,
&tail_prefetch_stats_);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(

View File

@ -26,6 +26,22 @@ struct EnvOptions;
using std::unique_ptr;
class BlockBasedTableBuilder;
// A class used to track actual bytes written from the tail in the recent SST
// file opens, and provide a suggestion for following open.
class TailPrefetchStats {
public:
void RecordEffectiveSize(size_t len);
// 0 indicates no information to determine.
size_t GetSuggestedPrefetchSize();
private:
const static size_t kNumTracked = 32;
size_t records_[kNumTracked];
port::Mutex mutex_;
size_t next_ = 0;
size_t num_records_ = 0;
};
class BlockBasedTableFactory : public TableFactory {
public:
explicit BlockBasedTableFactory(
@ -64,6 +80,7 @@ class BlockBasedTableFactory : public TableFactory {
private:
BlockBasedTableOptions table_options_;
mutable TailPrefetchStats tail_prefetch_stats_;
};
extern const std::string kHashIndexPrefixesBlock;

View File

@ -731,7 +731,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level,
const bool immortal_table) {
const bool immortal_table,
TailPrefetchStats* tail_prefetch_stats) {
table_reader->reset();
Footer footer;
@ -741,29 +742,40 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
const bool preload_all = !table_options.cache_index_and_filter_blocks;
// Before read footer, readahead backwards to prefetch data. Do more readahead
// if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled. That's
// because we need to issue readahead before we read the properties, at which
// point we don't yet know the index type.
const size_t kTailPrefetchSize =
prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
size_t tail_prefetch_size = 0;
if (tail_prefetch_stats != nullptr) {
// Multiple threads may get a 0 (no history) when running in parallel,
// but it will get cleared after the first of them finishes.
tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
}
if (tail_prefetch_size == 0) {
// Before read footer, readahead backwards to prefetch data. Do more
// readahead if we're going to read index/filter.
// TODO: This may incorrectly select small readahead in case partitioned
// index/filter is enabled and top-level partition pinning is enabled.
// That's because we need to issue readahead before we read the properties,
// at which point we don't yet know the index type.
tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
}
size_t prefetch_off;
size_t prefetch_len;
if (file_size < kTailPrefetchSize) {
if (file_size < tail_prefetch_size) {
prefetch_off = 0;
prefetch_len = static_cast<size_t>(file_size);
} else {
prefetch_off = static_cast<size_t>(file_size - kTailPrefetchSize);
prefetch_len = kTailPrefetchSize;
prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
prefetch_len = tail_prefetch_size;
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
if (!file->use_direct_io()) {
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer.reset(new FilePrefetchBuffer());
prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
}
s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer,
@ -1060,6 +1072,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}
if (s.ok()) {
assert(prefetch_buffer.get() != nullptr);
if (tail_prefetch_stats != nullptr) {
assert(prefetch_buffer->min_offset_read() < file_size);
tail_prefetch_stats->RecordEffectiveSize(
file_size - prefetch_buffer->min_offset_read());
}
*table_reader = std::move(new_table);
}

View File

@ -23,6 +23,7 @@
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
@ -93,7 +94,8 @@ class BlockBasedTable : public TableReader {
const SliceTransform* prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true,
bool skip_filters = false, int level = -1,
const bool immortal_table = false);
const bool immortal_table = false,
TailPrefetchStats* tail_prefetch_stats = nullptr);
bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,

View File

@ -1075,6 +1075,7 @@ class BlockBasedTableTest
};
class PlainTableTest : public TableTest {};
class TablePropertyTest : public testing::Test {};
class BBTTailPrefetchTest : public TableTest {};
INSTANTIATE_TEST_CASE_P(FormatDef, BlockBasedTableTest,
testing::Values(test::kDefaultFormatVersion));
@ -3594,6 +3595,49 @@ TEST_P(BlockBasedTableTest, BadOptions) {
ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
}
TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {
TailPrefetchStats tpstats;
ASSERT_EQ(0, tpstats.GetSuggestedPrefetchSize());
tpstats.RecordEffectiveSize(size_t{1000});
tpstats.RecordEffectiveSize(size_t{1005});
tpstats.RecordEffectiveSize(size_t{1002});
ASSERT_EQ(1005, tpstats.GetSuggestedPrefetchSize());
// One single super large value shouldn't influence much
tpstats.RecordEffectiveSize(size_t{1002000});
tpstats.RecordEffectiveSize(size_t{999});
ASSERT_LE(1005, tpstats.GetSuggestedPrefetchSize());
ASSERT_GT(1200, tpstats.GetSuggestedPrefetchSize());
// Only history of 32 is kept
for (int i = 0; i < 32; i++) {
tpstats.RecordEffectiveSize(size_t{100});
}
ASSERT_EQ(100, tpstats.GetSuggestedPrefetchSize());
// 16 large values and 16 small values. The result should be closer
// to the small value as the algorithm.
for (int i = 0; i < 16; i++) {
tpstats.RecordEffectiveSize(size_t{1000});
}
tpstats.RecordEffectiveSize(size_t{10});
tpstats.RecordEffectiveSize(size_t{20});
for (int i = 0; i < 6; i++) {
tpstats.RecordEffectiveSize(size_t{100});
}
ASSERT_LE(80, tpstats.GetSuggestedPrefetchSize());
ASSERT_GT(200, tpstats.GetSuggestedPrefetchSize());
}
TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
TailPrefetchStats tpstats;
FilePrefetchBuffer buffer(nullptr, 0, 0, false, true);
buffer.TryReadFromCache(500, 10, nullptr);
buffer.TryReadFromCache(480, 10, nullptr);
buffer.TryReadFromCache(490, 10, nullptr);
ASSERT_EQ(480, buffer.min_offset_read());
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -714,7 +714,10 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
Slice* result) {
if (offset < buffer_offset_) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = offset;
}
if (!enable_ || offset < buffer_offset_) {
return false;
}

View File

@ -213,21 +213,38 @@ class WritableFileWriter {
// readahead_size will be doubled on every IO, until max_readahead_size.
class FilePrefetchBuffer {
public:
// If `track_min_offset` is true, track minimum offset ever read.
FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readadhead_size = 0, size_t max_readahead_size = 0)
size_t readadhead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false)
: buffer_offset_(0),
file_reader_(file_reader),
readahead_size_(readadhead_size),
max_readahead_size_(max_readahead_size) {}
max_readahead_size_(max_readahead_size),
min_offset_read_(port::kMaxSizet),
enable_(enable),
track_min_offset_(track_min_offset) {}
Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n);
bool TryReadFromCache(uint64_t offset, size_t n, Slice* result);
// The minimum `offset` ever passed to TryReadFromCache(). Only be tracked
// if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
size_t max_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache().
size_t min_offset_read_;
// if false, TryReadFromCache() always return false, and we only take stats
// for track_min_offset_ if track_min_offset_ = true
bool enable_;
// If true, track minimum `offset` ever passed to TryReadFromCache(), which
// can be fetched from min_offset_read().
bool track_min_offset_;
};
extern Status NewWritableFile(Env* env, const std::string& fname,