rocksdb/db_stress_tool/cf_consistency_stress.cc
Andrew Kryczka babe56ddba Add rate limiter priority to ReadOptions (#9424)
Summary:
Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working.

`RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`.

There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases where I believe it would be good to let the user control the priority some day (e.g., file footer reads), and no TODO in cases where I believe it doesn't matter (e.g., trace file reads).

The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424

Test Plan:
- new unit tests
- new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart.
  - setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true`
  - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true`
- crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10`

Reviewed By: hx235

Differential Revision: D33747386

Pulled By: ajkr

fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
2022-02-16 23:18:14 -08:00

527 lines
19 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "file/file_util.h"
namespace ROCKSDB_NAMESPACE {
class CfConsistencyStressTest : public StressTest {
public:
// Starts the batch counter at zero; TestPut() draws one id from it per write.
CfConsistencyStressTest() : batch_id_(0) {}

// Nothing is released explicitly here.
~CfConsistencyStressTest() override = default;

// Always reports false.
// NOTE(review): presumably this tells the framework that no per-key expected
// state is kept for this test -- confirm against StressTest.
bool IsStateTracked() const override {
  return false;
}
// Writes one key with one freshly generated value to every column family in
// `rand_column_families`, all inside a single WriteBatch.
//
// The key is Key(rand_keys[0]). The value is produced by GenerateValue() from
// the next batch id and is written into `value`. Each entry is a Merge when
// FLAGS_use_merge is set and a Put otherwise.
//
// Returns the Status of DB::Write(). On failure the error is printed to
// stderr and counted in thread->stats; on success the write is accounted via
// AddBytesForWrites(). `read_opts` and `lock` are unused.
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
               const ReadOptions& /* read_opts */,
               const std::vector<int>& rand_column_families,
               const std::vector<int64_t>& rand_keys, char (&value)[100],
               std::unique_ptr<MutexLock>& /* lock */) override {
  const std::string encoded_key = Key(rand_keys[0]);
  const Slice user_key(encoded_key);

  // Every call consumes exactly one batch id, whether or not the write
  // succeeds later on.
  const uint64_t batch_seq = batch_id_.fetch_add(1);
  const size_t value_len =
      GenerateValue(static_cast<uint32_t>(batch_seq), value, sizeof(value));
  const Slice payload(value, value_len);

  WriteBatch updates;
  for (const int cf_index : rand_column_families) {
    ColumnFamilyHandle* const handle = column_families_[cf_index];
    if (!FLAGS_use_merge) {
      updates.Put(handle, user_key, payload);
    } else {
      updates.Merge(handle, user_key, payload);
    }
  }

  Status s = db_->Write(write_opts, &updates);
  if (s.ok()) {
    const auto cf_count = static_cast<long>(rand_column_families.size());
    thread->stats.AddBytesForWrites(cf_count, (value_len + 1) * cf_count);
  } else {
    fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Deletes Key(rand_keys[0]) from every column family listed in
// `rand_column_families`, using a single WriteBatch.
//
// Returns the Status of DB::Write(). A failure is printed to stderr and
// counted as an error; a success is counted as one delete per column family.
// `lock` is unused.
Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                  const std::vector<int>& rand_column_families,
                  const std::vector<int64_t>& rand_keys,
                  std::unique_ptr<MutexLock>& /* lock */) override {
  const std::string encoded_key = Key(rand_keys[0]);
  const Slice target(encoded_key);

  WriteBatch deletions;
  for (const int cf_index : rand_column_families) {
    deletions.Delete(column_families_[cf_index], target);
  }

  Status s = db_->Write(write_opts, &deletions);
  if (s.ok()) {
    thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
  } else {
    fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Issues a DeleteRange from Key(k) to Key(k + FLAGS_range_deletion_width) on
// every column family in `rand_column_families`, using a single WriteBatch.
//
// k starts as rand_keys[0]. If the range would run past the maximum key, k is
// re-drawn from [0, max_key - FLAGS_range_deletion_width].
//
// Returns the Status of DB::Write(). A failure is printed to stderr and
// counted as an error; a success is counted as one range deletion per column
// family. `lock` is unused.
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
                       const std::vector<int>& rand_column_families,
                       const std::vector<int64_t>& rand_keys,
                       std::unique_ptr<MutexLock>& /* lock */) override {
  int64_t rand_key = rand_keys[0];
  auto shared = thread->shared;
  int64_t max_key = shared->GetMaxKey();
  if (rand_key > max_key - FLAGS_range_deletion_width) {
    rand_key =
        thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
  }
  std::string key_str = Key(rand_key);
  Slice key = key_str;
  std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
  Slice end_key = end_key_str;

  WriteBatch batch;
  for (auto cf : rand_column_families) {
    // `cf` is already a column family index (an element of
    // rand_column_families), exactly as in TestPut/TestDelete. It must not be
    // used to index rand_column_families a second time.
    ColumnFamilyHandle* cfh = column_families_[cf];
    batch.DeleteRange(cfh, key, end_key);
  }

  Status s = db_->Write(write_opts, &batch);
  if (!s.ok()) {
    fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  } else {
    thread->stats.AddRangeDeletions(
        static_cast<long>(rand_column_families.size()));
  }
  return s;
}
// External file ingestion is not supported by this test. Reaching this
// override is treated as fatal and every parameter is ignored.
void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */,
std::unique_ptr<MutexLock>& /* lock */) override {
// Trips immediately when assertions are compiled in.
assert(false);
// Otherwise explain why and terminate the process.
fprintf(stderr,
"CfConsistencyStressTest does not support TestIngestExternalFile "
"because it's not possible to verify the result\n");
std::terminate();
}
// Reads Key(rand_keys[0]) in one of two modes, chosen with equal probability:
//
//  * Plain read: a single Get() against one randomly picked column family
//    from `rand_column_families`.
//  * Consistency check: under one snapshot, Get() the key from every column
//    family in `rand_column_families` and compare each result against the
//    first one. A found/not-found disagreement or a value difference is
//    printed to stderr, counted as an error, and flagged through
//    SetVerificationFailure().
//
// Returns the Status of the last Get() that was issued. Note that this can be
// OK or NotFound even when an inconsistency was detected.
Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
               const std::vector<int>& rand_column_families,
               const std::vector<int64_t>& rand_keys) override {
  std::string key_str = Key(rand_keys[0]);
  Slice key = key_str;
  Status s;
  bool is_consistent = true;

  if (thread->rand.OneIn(2)) {
    // 1/2 chance, does a random read from random CF
    auto cfh =
        column_families_[rand_column_families[thread->rand.Next() %
                                              rand_column_families.size()]];
    std::string from_db;
    s = db_->Get(readoptions, cfh, key, &from_db);
  } else {
    // 1/2 chance, comparing one key is the same across all CFs
    const Snapshot* snapshot = db_->GetSnapshot();
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = snapshot;

    // Prints what a single column family returned for the key. `cf` is the
    // column family index that was actually read, so the reported name always
    // matches the handle used for the Get().
    const auto report_cf = [this](int cf, bool present,
                                  const std::string& value) {
      if (present) {
        fprintf(stderr, "CF %s returns value %s\n",
                column_family_names_[cf].c_str(),
                Slice(value).ToString(true).c_str());
      } else {
        fprintf(stderr, "CF %s is not found\n",
                column_family_names_[cf].c_str());
      }
    };

    const int base_cf = rand_column_families[0];
    std::string value0;
    s = db_->Get(readoptionscopy, column_families_[base_cf], key, &value0);
    if (s.ok() || s.IsNotFound()) {
      const bool found = s.ok();
      for (size_t i = 1; i < rand_column_families.size(); i++) {
        const int other_cf = rand_column_families[i];
        std::string value1;
        s = db_->Get(readoptionscopy, column_families_[other_cf], key,
                     &value1);
        if (!s.ok() && !s.IsNotFound()) {
          // A real read error: stop comparing and report it below.
          break;
        }
        const bool other_found = s.ok();
        // Mismatch when exactly one side has the key, or when both have it
        // with different values.
        if (found != other_found || (found && value0 != value1)) {
          fprintf(stderr, "Get() return different results with key %s\n",
                  Slice(key_str).ToString(true).c_str());
          report_cf(base_cf, found, value0);
          report_cf(other_cf, other_found, value1);
          is_consistent = false;
          break;
        }
      }
    }
    db_->ReleaseSnapshot(snapshot);
  }

  if (!is_consistent) {
    fprintf(stderr, "TestGet error: is_consistent is false\n");
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  } else if (s.ok()) {
    thread->stats.AddGets(1, 1);
  } else if (s.IsNotFound()) {
    thread->stats.AddGets(1, 0);
  } else {
    fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Looks up all of `rand_keys` with one batched MultiGet() against a single
// column family: the first entry of `rand_column_families`.
//
// Returns one Status per key, in the same order as `rand_keys`. Each status
// is accounted in thread->stats as a found get, a not-found get, or an error;
// errors are also printed to stderr.
std::vector<Status> TestMultiGet(
    ThreadState* thread, const ReadOptions& read_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) override {
  size_t num_keys = rand_keys.size();
  std::vector<std::string> key_str;
  std::vector<Slice> keys;
  // Reserve up front: `keys` holds Slices that point into the strings stored
  // in `key_str`, so `key_str` must not reallocate while it is being filled.
  keys.reserve(num_keys);
  key_str.reserve(num_keys);
  std::vector<PinnableSlice> values(num_keys);
  std::vector<Status> statuses(num_keys);
  ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];

  for (size_t i = 0; i < num_keys; ++i) {
    key_str.emplace_back(Key(rand_keys[i]));
    keys.emplace_back(key_str.back());
  }
  db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
                statuses.data());

  // Iterate by reference; copying every Status just to inspect it is wasted
  // work.
  for (const auto& s : statuses) {
    if (s.ok()) {
      // found case
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      // not found case
      thread->stats.AddGets(1, 0);
    } else {
      // errors case
      fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
  }
  return statuses;
}
// Counts the keys that share a prefix with Key(rand_keys[0]) in one randomly
// picked column family from `rand_column_families`.
//
// The prefix length is FLAGS_prefix_size, or 7 when that flag is negative.
// When a "next prefix" exists, it is used as the iterator's upper bound with
// probability 1/2.
// NOTE(review): assumes the encoded key is at least `prefix_to_use` bytes
// long -- confirm against Key().
//
// Returns the iterator's final Status. On success the scan and its key count
// are recorded via AddPrefixes(); on failure the error is printed to stderr
// and counted.
Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
                      const std::vector<int>& rand_column_families,
                      const std::vector<int64_t>& rand_keys) override {
  size_t prefix_to_use =
      (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);

  std::string key_str = Key(rand_keys[0]);
  Slice key = key_str;
  Slice prefix = Slice(key.data(), prefix_to_use);

  std::string upper_bound;
  Slice ub_slice;

  ReadOptions ro_copy = readoptions;
  // Get the next prefix first and then see if we want to set upper bound.
  // We'll use the next prefix in an assertion later on
  if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
    ub_slice = Slice(upper_bound);
    ro_copy.iterate_upper_bound = &ub_slice;
  }
  auto cfh =
      column_families_[rand_column_families[thread->rand.Next() %
                                            rand_column_families.size()]];
  // Owned by a unique_ptr so the iterator is released on every exit path,
  // matching how VerifyDb()/ContinuouslyVerifyDb() hold their iterators.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
  unsigned long count = 0;
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
       iter->Next()) {
    ++count;
  }
  assert(prefix_to_use == 0 ||
         count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
  Status s = iter->status();
  if (s.ok()) {
    thread->stats.AddPrefixes(1, count);
  } else {
    fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Picks the column family to use as the control. Since every column family
// is expected to hold the same data, any of them will do, so one is chosen at
// random. The requested column family id is ignored.
ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
                                  int /*column_family_id*/
                                  ) override {
  const auto pick = thread->rand.Next() % column_families_.size();
  return column_families_[pick];
}
// Checks that every column family holds the same sequence of keys.
//
// One iterator per column family is opened on a shared snapshot, all of them
// are positioned with SeekToFirst(), and then they are advanced together one
// entry at a time. Problems are printed to stderr and recorded through
// shared->SetVerificationFailure(); the scan ends once a failure has been
// recorded or every iterator is exhausted.
//
// Only keys are compared across column families. Values are printed for
// diagnosis but are not compared here.
void VerifyDb(ThreadState* thread) const override {
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions options(FLAGS_verify_checksum, true);
// We must set total_order_seek to true because we are doing a SeekToFirst
// on a column family whose memtables may support (by default) prefix-based
// iterator. In this case, NewIterator with options.total_order_seek being
// false returns a prefix-based iterator. Calling SeekToFirst using this
// iterator causes the iterator to become invalid. That means we cannot
// iterate the memtable using this iterator any more, although the memtable
// contains the most up-to-date key-values.
options.total_order_seek = true;
// Release the snapshot automatically on every exit path.
const auto ss_deleter = [this](const Snapshot* ss) {
db_->ReleaseSnapshot(ss);
};
std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
db_->GetSnapshot(), ss_deleter);
options.snapshot = snapshot_guard.get();
assert(thread != nullptr);
auto shared = thread->shared;
// iters[i] scans column_families_[i]; all iterators read the same snapshot.
std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
for (size_t i = 0; i != column_families_.size(); ++i) {
iters[i].reset(db_->NewIterator(options, column_families_[i]));
}
for (auto& iter : iters) {
iter->SeekToFirst();
}
size_t num = column_families_.size();
assert(num == iters.size());
// statuses[i] is refreshed whenever iters[i] is found to be invalid.
std::vector<Status> statuses(num, Status::OK());
do {
if (shared->HasVerificationFailedYet()) {
break;
}
// Count how many iterators still point at an entry.
size_t valid_cnt = 0;
size_t idx = 0;
for (auto& iter : iters) {
if (iter->Valid()) {
++valid_cnt;
} else {
statuses[idx] = iter->status();
}
++idx;
}
if (valid_cnt == 0) {
// Every iterator is exhausted: the scan is over. It is only a failure if
// some iterator stopped because of an error.
// NOTE(review): `status` is assigned below but never read afterwards.
Status status;
for (size_t i = 0; i != num; ++i) {
const auto& s = statuses[i];
if (!s.ok()) {
status = s;
fprintf(stderr, "Iterator on cf %s has error: %s\n",
column_families_[i]->GetName().c_str(),
s.ToString().c_str());
shared->SetVerificationFailure();
}
}
break;
} else if (valid_cnt != iters.size()) {
// Some column families ran out of entries before others: always a
// failure. Print the state of each column family to help diagnosis.
shared->SetVerificationFailure();
for (size_t i = 0; i != num; ++i) {
if (!iters[i]->Valid()) {
if (statuses[i].ok()) {
fprintf(stderr, "Finished scanning cf %s\n",
column_families_[i]->GetName().c_str());
} else {
fprintf(stderr, "Iterator on cf %s has error: %s\n",
column_families_[i]->GetName().c_str(),
statuses[i].ToString().c_str());
}
} else {
fprintf(stderr, "cf %s has remaining data to scan\n",
column_families_[i]->GetName().c_str());
}
}
break;
}
if (shared->HasVerificationFailedYet()) {
break;
}
// If the program reaches here, then all column families' iterators are
// still valid.
// NOTE(review): presumably PrintingVerificationResults() returns true while
// another thread is already printing -- confirm. `continue` then re-runs the
// checks above without advancing any iterator.
if (shared->PrintingVerificationResults()) {
continue;
}
// Compare the current key of every column family against column family 0.
Slice key;
Slice value;
int num_mismatched_cfs = 0;
for (size_t i = 0; i != num; ++i) {
if (i == 0) {
key = iters[i]->key();
value = iters[i]->value();
} else {
int cmp = key.compare(iters[i]->key());
if (cmp != 0) {
++num_mismatched_cfs;
// Print the reference entry (column family 0) once, on the first mismatch.
if (1 == num_mismatched_cfs) {
fprintf(stderr, "Verification failed\n");
fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
db_->GetLatestSequenceNumber());
fprintf(stderr, "[%s] %s => %s\n",
column_families_[0]->GetName().c_str(),
key.ToString(true /* hex */).c_str(),
value.ToString(true /* hex */).c_str());
}
fprintf(stderr, "[%s] %s => %s\n",
column_families_[i]->GetName().c_str(),
iters[i]->key().ToString(true /* hex */).c_str(),
iters[i]->value().ToString(true /* hex */).c_str());
#ifndef ROCKSDB_LITE
// Dump up to kMaxNumIKeys key versions between the two differing keys
// (smaller key first) for the column families involved.
Slice begin_key;
Slice end_key;
if (cmp < 0) {
begin_key = key;
end_key = iters[i]->key();
} else {
begin_key = iters[i]->key();
end_key = key;
}
std::vector<KeyVersion> versions;
const size_t kMaxNumIKeys = 8;
const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
kMaxNumIKeys, &versions);
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
}
assert(nullptr != cfh);
fprintf(stderr,
"Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
")\n",
cfh->GetName().c_str(),
begin_key.ToString(true /* hex */).c_str(),
end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
for (const KeyVersion& kv : versions) {
fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
kv.type);
}
};
// Column family 0 is dumped only once, alongside the first mismatch.
if (1 == num_mismatched_cfs) {
print_key_versions(column_families_[0]);
}
print_key_versions(column_families_[i]);
#endif // ROCKSDB_LITE
shared->SetVerificationFailure();
}
}
}
shared->FinishPrintingVerificationResults();
// Advance all iterators together to the next entry.
for (auto& iter : iters) {
iter->Next();
}
} while (true);
}
#ifndef ROCKSDB_LITE
// Background consistency check: computes a CRC32C over all key/value pairs of
// the default column family and requires every other column family to
// produce the same checksum.
//
// The check runs against `cmp_db_` (with `cmp_cfhs_`) when it is set, after
// calling TryCatchUpWithPrimary() on it; otherwise it runs against `db_`
// (with `column_families_`). Any iterator or catch-up error, or any checksum
// mismatch, asks the test to stop through shared->SetShouldStopTest().
void ContinuouslyVerifyDb(ThreadState* thread) const override {
  assert(thread);
  Status status;

  DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
  const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
  // Release the snapshot automatically on every exit path. The snapshot is
  // taken before the catch-up call below.
  const auto ss_deleter = [&](const Snapshot* ss) {
    db_ptr->ReleaseSnapshot(ss);
  };
  std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
      db_ptr->GetSnapshot(), ss_deleter);
  if (cmp_db_) {
    status = cmp_db_->TryCatchUpWithPrimary();
  }
  SharedState* shared = thread->shared;
  assert(shared);
  if (!status.ok()) {
    shared->SetShouldStopTest();
    return;
  }
  // A null snapshot is tolerated only when reading from `cmp_db_`.
  assert(cmp_db_ || snapshot_guard.get());

  // Folds every key and value visited by `iter` into a CRC32C. The checksum
  // is written out even if iteration ended with an error; the returned Status
  // tells the caller whether it can be trusted.
  const auto checksum_column_family = [](Iterator* iter,
                                         uint32_t* checksum) -> Status {
    assert(nullptr != checksum);
    uint32_t ret = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
      ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
    }
    *checksum = ret;
    return iter->status();
  };

  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropts;
  ropts.total_order_seek = true;
  ropts.snapshot = snapshot_guard.get();
  uint32_t crc = 0;
  {
    // Compute crc for all key-values of default column family.
    std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
    status = checksum_column_family(it.get(), &crc);
  }

  // Track a mismatch explicitly instead of comparing a checksum that may
  // never have been computed. With no non-default column family to compare
  // against, there is nothing to disagree with and the test must not stop.
  bool crc_mismatch = false;
  if (status.ok()) {
    for (ColumnFamilyHandle* cfh : cfhs) {
      if (cfh == db_ptr->DefaultColumnFamily()) {
        continue;
      }
      std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
      uint32_t tmp_crc = 0;
      status = checksum_column_family(it.get(), &tmp_crc);
      if (!status.ok()) {
        break;
      }
      if (tmp_crc != crc) {
        crc_mismatch = true;
        break;
      }
    }
  }
  if (!status.ok() || crc_mismatch) {
    shared->SetShouldStopTest();
  }
}
#endif // !ROCKSDB_LITE
// Returns the indices of all column families, i.e. 0, 1, ..., N-1 where N is
// column_families_.size(). Both parameters are ignored: this test always
// operates on every column family.
std::vector<int> GenerateColumnFamilies(
    const int /* num_column_families */,
    int /* rand_column_family */) const override {
  const int total = static_cast<int>(column_families_.size());
  std::vector<int> all_cfs;
  for (int cf_index = 0; cf_index < total; ++cf_index) {
    all_cfs.push_back(cf_index);
  }
  return all_cfs;
}
private:
// Counter initialised to 0 by the constructor. TestPut() takes the next value
// with fetch_add(1) and passes it to GenerateValue() as the value base.
std::atomic<int64_t> batch_id_;
};
// Factory for the column-family consistency stress test. The object is
// heap-allocated and handed back through the StressTest base pointer.
// NOTE(review): presumably the caller takes ownership and deletes it --
// confirm at the call site.
StressTest* CreateCfConsistencyStressTest() {
  StressTest* const instance = new CfConsistencyStressTest();
  return instance;
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS