babe56ddba
Summary: Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working. `RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`. There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases I believe it would be good to let user control the priority some day (e.g., file footer reads), and no TODO in cases I believe it doesn't matter (e.g., trace file reads). The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424 Test Plan: - new unit tests - new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart. 
- setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true` - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true` - crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10` Reviewed By: hx235 Differential Revision: D33747386 Pulled By: ajkr fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
263 lines
7.9 KiB
C++
263 lines
7.9 KiB
C++
// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "port/stack_trace.h"
|
|
#include "util/file_checksum_helper.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class DBRateLimiterTest
|
|
: public DBTestBase,
|
|
public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
|
|
public:
|
|
DBRateLimiterTest()
|
|
: DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false),
|
|
use_direct_io_(std::get<0>(GetParam())),
|
|
use_block_cache_(std::get<1>(GetParam())),
|
|
use_readahead_(std::get<2>(GetParam())) {}
|
|
|
|
void Init() {
|
|
options_ = GetOptions();
|
|
Reopen(options_);
|
|
for (int i = 0; i < kNumFiles; ++i) {
|
|
for (int j = 0; j < kNumKeysPerFile; ++j) {
|
|
ASSERT_OK(Put(Key(i * kNumKeysPerFile + j), "val"));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
}
|
|
MoveFilesToLevel(1);
|
|
}
|
|
|
|
BlockBasedTableOptions GetTableOptions() {
|
|
BlockBasedTableOptions table_options;
|
|
table_options.no_block_cache = !use_block_cache_;
|
|
return table_options;
|
|
}
|
|
|
|
ReadOptions GetReadOptions() {
|
|
ReadOptions read_options;
|
|
read_options.rate_limiter_priority = Env::IO_USER;
|
|
read_options.readahead_size = use_readahead_ ? kReadaheadBytes : 0;
|
|
return read_options;
|
|
}
|
|
|
|
Options GetOptions() {
|
|
Options options = CurrentOptions();
|
|
options.disable_auto_compactions = true;
|
|
options.file_checksum_gen_factory.reset(new FileChecksumGenCrc32cFactory());
|
|
options.rate_limiter.reset(NewGenericRateLimiter(
|
|
1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
|
|
10 /* fairness */, RateLimiter::Mode::kAllIo));
|
|
options.table_factory.reset(NewBlockBasedTableFactory(GetTableOptions()));
|
|
options.use_direct_reads = use_direct_io_;
|
|
return options;
|
|
}
|
|
|
|
protected:
|
|
const static int kNumKeysPerFile = 1;
|
|
const static int kNumFiles = 3;
|
|
const static int kReadaheadBytes = 32 << 10; // 32KB
|
|
|
|
Options options_;
|
|
const bool use_direct_io_;
|
|
const bool use_block_cache_;
|
|
const bool use_readahead_;
|
|
};
|
|
|
|
// Builds a readable test-name suffix from the (use_direct_io, use_block_cache,
// use_readahead) parameter tuple, e.g. "DirectIO_NoBlockCache_Readahead".
std::string GetTestNameSuffix(
    ::testing::TestParamInfo<std::tuple<bool, bool, bool>> info) {
  const bool direct_io = std::get<0>(info.param);
  const bool block_cache = std::get<1>(info.param);
  const bool readahead = std::get<2>(info.param);

  std::string suffix = direct_io ? "DirectIO" : "BufferedIO";
  suffix += block_cache ? "_BlockCache" : "_NoBlockCache";
  suffix += readahead ? "_Readahead" : "_NoReadahead";
  return suffix;
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
|
|
::testing::Combine(::testing::Bool(), ::testing::Bool(),
|
|
::testing::Bool()),
|
|
GetTestNameSuffix);
|
|
#else // ROCKSDB_LITE
|
|
// Cannot use direct I/O in lite mode.
|
|
INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
|
|
::testing::Combine(::testing::Values(false),
|
|
::testing::Bool(),
|
|
::testing::Bool()),
|
|
GetTestNameSuffix);
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Checks the number of IO_USER rate limiter requests registered by `Get()`:
// the first lookup of a key is expected to add one request, and a repeat
// lookup is expected to add another only when the block cache is disabled.
TEST_P(DBRateLimiterTest, Get) {
  // Nothing to test when direct I/O is requested but unavailable.
  if (use_direct_io_ && !IsDirectIOSupported()) {
    return;
  }
  Init();

  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  int expected_requests = 0;
  for (int file = 0; file < kNumFiles; ++file) {
    // First lookup of the first key written to this file.
    {
      std::string first_value;
      ASSERT_OK(db_->Get(GetReadOptions(), Key(file * kNumKeysPerFile),
                         &first_value));
      expected_requests += 1;
    }
    ASSERT_EQ(expected_requests,
              options_.rate_limiter->GetTotalRequests(Env::IO_USER));

    // Repeat lookup of the same key.
    {
      std::string second_value;
      ASSERT_OK(db_->Get(GetReadOptions(), Key(file * kNumKeysPerFile),
                         &second_value));
      expected_requests += use_block_cache_ ? 0 : 1;
    }
    ASSERT_EQ(expected_requests,
              options_.rate_limiter->GetTotalRequests(Env::IO_USER));
  }
}
|
|
|
|
// The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
// yet support rate limiting. The test therefore expects every key to come back
// with a NotSupported status and no IO_USER requests to be registered.
TEST_P(DBRateLimiterTest, NewMultiGet) {
  // Nothing to test when direct I/O is requested but unavailable.
  if (use_direct_io_ && !IsDirectIOSupported()) {
    return;
  }
  Init();

  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  const int kNumKeys = kNumFiles * kNumKeysPerFile;
  {
    // `key_storage` owns the key bytes; `key_slices` refers into it. Capacity
    // is reserved up front so the strings are not relocated while slices
    // point at them.
    std::vector<std::string> key_storage;
    std::vector<Slice> key_slices;
    key_storage.reserve(kNumKeys);
    key_slices.reserve(kNumKeys);
    for (int k = 0; k < kNumKeys; ++k) {
      key_storage.emplace_back(Key(k));
      key_slices.emplace_back(key_storage.back());
    }

    std::vector<PinnableSlice> values(kNumKeys);
    std::vector<Status> statuses(kNumKeys);
    db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys,
                  key_slices.data(), values.data(), statuses.data());
    for (const Status& s : statuses) {
      ASSERT_TRUE(s.IsNotSupported());
    }
  }
  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
|
|
|
|
// The old `vector<Status>`-returning `MultiGet()` APIs use `Read()`, which
// supports rate limiting. The test expects every lookup to succeed and one
// IO_USER request to be registered per key.
TEST_P(DBRateLimiterTest, OldMultiGet) {
  // Nothing to test when direct I/O is requested but unavailable.
  if (use_direct_io_ && !IsDirectIOSupported()) {
    return;
  }
  Init();

  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  const int kNumKeys = kNumFiles * kNumKeysPerFile;
  {
    // `key_storage` owns the key bytes; `key_slices` refers into it. Capacity
    // is reserved up front so the strings are not relocated while slices
    // point at them.
    std::vector<std::string> key_storage;
    std::vector<Slice> key_slices;
    key_storage.reserve(kNumKeys);
    key_slices.reserve(kNumKeys);
    for (int k = 0; k < kNumKeys; ++k) {
      key_storage.emplace_back(Key(k));
      key_slices.emplace_back(key_storage.back());
    }

    std::vector<std::string> values;
    std::vector<Status> statuses =
        db_->MultiGet(GetReadOptions(), key_slices, &values);
    for (int k = 0; k < kNumKeys; ++k) {
      ASSERT_OK(statuses[k]);
    }
  }
  const int expected_requests = kNumKeys;
  ASSERT_EQ(expected_requests,
            options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
|
|
|
|
// Checks the number of IO_USER rate limiter requests registered while scanning
// the DB with an iterator: each forward step is expected to add one request,
// and the reverse scan is expected to add one request per entry only when the
// block cache is disabled.
TEST_P(DBRateLimiterTest, Iterator) {
  // Nothing to test when direct I/O is requested but unavailable.
  if (use_direct_io_ && !IsDirectIOSupported()) {
    return;
  }
  Init();

  std::unique_ptr<Iterator> iter(db_->NewIterator(GetReadOptions()));
  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  int expected = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ++expected;
    ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
  }
  // `Valid()` turning false ends the loop both at end-of-data and on error, so
  // confirm the forward scan finished cleanly.
  ASSERT_OK(iter->status());

  for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
    // When `use_block_cache_ == true`, the reverse scan will access the blocks
    // loaded to cache during the above forward scan, in which case no further
    // file reads are expected.
    if (!use_block_cache_) {
      ++expected;
    }
  }
  // Without this check a reverse scan that failed early could still satisfy
  // the request count below (e.g. with the block cache enabled, where no new
  // requests are expected either way).
  ASSERT_OK(iter->status());
  // Reverse scan does not read evenly (one block per iteration) due to
  // descending seqno ordering, so wait until after the loop to check total.
  ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
|
|
|
|
#if !defined(ROCKSDB_LITE)
|
|
|
|
// Checks that `VerifyChecksum()` run with IO_USER priority registers the
// expected number of rate limiter requests.
TEST_P(DBRateLimiterTest, VerifyChecksum) {
  const bool direct_io_unavailable = use_direct_io_ && !IsDirectIOSupported();
  if (direct_io_unavailable) {
    // Nothing to test when direct I/O is requested but unavailable.
    return;
  }
  Init();

  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  const ReadOptions verify_opts = GetReadOptions();
  ASSERT_OK(db_->VerifyChecksum(verify_opts));

  // The files are tiny so there should have just been one read per file.
  // (Copied to a local because the assertion macro takes its operands by
  // reference.)
  const int expected_requests = kNumFiles;
  ASSERT_EQ(expected_requests,
            options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
|
|
|
|
// Checks that `VerifyFileChecksums()` run with IO_USER priority registers the
// expected number of rate limiter requests.
TEST_P(DBRateLimiterTest, VerifyFileChecksums) {
  const bool direct_io_unavailable = use_direct_io_ && !IsDirectIOSupported();
  if (direct_io_unavailable) {
    // Nothing to test when direct I/O is requested but unavailable.
    return;
  }
  Init();

  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));

  const ReadOptions verify_opts = GetReadOptions();
  ASSERT_OK(db_->VerifyFileChecksums(verify_opts));

  // The files are tiny so there should have just been one read per file.
  // (Copied to a local because the assertion macro takes its operands by
  // reference.)
  const int expected_requests = kNumFiles;
  ASSERT_EQ(expected_requests,
            options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
|
|
|
|
#endif // !defined(ROCKSDB_LITE)
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|