rocksdb/table/plain/plain_table_key_coding.cc
Andrew Kryczka babe56ddba Add rate limiter priority to ReadOptions (#9424)
Summary:
Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working.

`RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`.

There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases I believe it would be good to let user control the priority some day (e.g., file footer reads), and no TODO in cases I believe it doesn't matter (e.g., trace file reads).

The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424

Test Plan:
- new unit tests
- new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart.
  - setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true`
  - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true`
- crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10`

Reviewed By: hx235

Differential Revision: D33747386

Pulled By: ajkr

fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
2022-02-16 23:18:14 -08:00

509 lines
18 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "table/plain/plain_table_key_coding.h"
#include <algorithm>
#include <string>
#include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "table/plain/plain_table_factory.h"
#include "table/plain/plain_table_reader.h"
namespace ROCKSDB_NAMESPACE {
enum PlainTableEntryType : unsigned char {
kFullKey = 0,
kPrefixFromPreviousKey = 1,
kKeySuffix = 2,
};
namespace {
// Control byte:
// First two bits indicate type of entry
// Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
// are used. key_size-0x3F will be encoded as a variint32 after this bytes.
const unsigned char kSizeInlineLimit = 0x3F;
// Return 0 for error
size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
char* out_buffer) {
out_buffer[0] = type << 6;
if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
// size inlined
out_buffer[0] |= static_cast<char>(key_size);
return 1;
} else {
out_buffer[0] |= kSizeInlineLimit;
char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
return ptr - out_buffer;
}
}
} // namespace
// Fill bytes_read with number of bytes read.
inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
PlainTableEntryType* entry_type,
uint32_t* key_size,
uint32_t* bytes_read) {
Slice next_byte_slice;
bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
if (!success) {
return file_reader_.status();
}
*entry_type = static_cast<PlainTableEntryType>(
(static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
6);
char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
if (inline_key_size < kSizeInlineLimit) {
*key_size = inline_key_size;
*bytes_read = 1;
return Status::OK();
} else {
uint32_t extra_size;
uint32_t tmp_bytes_read;
success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
&tmp_bytes_read);
if (!success) {
return file_reader_.status();
}
assert(tmp_bytes_read > 0);
*key_size = kSizeInlineLimit + extra_size;
*bytes_read = tmp_bytes_read + 1;
return Status::OK();
}
}
IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
WritableFileWriter* file,
uint64_t* offset, char* meta_bytes_buf,
size_t* meta_bytes_buf_size) {
ParsedInternalKey parsed_key;
Status pik_status =
ParseInternalKey(key, &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
return IOStatus::Corruption(pik_status.getState());
}
Slice key_to_write = key; // Portion of internal key to write out.
uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
if (encoding_type_ == kPlain) {
if (fixed_user_key_len_ == kPlainTableVariableLength) {
// Write key length
char key_size_buf[5]; // tmp buffer for key size as varint32
char* ptr = EncodeVarint32(key_size_buf, user_key_size);
assert(ptr <= key_size_buf + sizeof(key_size_buf));
auto len = ptr - key_size_buf;
IOStatus io_s = file->Append(Slice(key_size_buf, len));
if (!io_s.ok()) {
return io_s;
}
*offset += len;
}
} else {
assert(encoding_type_ == kPrefix);
char size_bytes[12];
size_t size_bytes_pos = 0;
Slice prefix =
prefix_extractor_->Transform(Slice(key.data(), user_key_size));
if (key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
key_count_for_prefix_ % index_sparseness_ == 0) {
key_count_for_prefix_ = 1;
pre_prefix_.SetUserKey(prefix);
size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!io_s.ok()) {
return io_s;
}
*offset += size_bytes_pos;
} else {
key_count_for_prefix_++;
if (key_count_for_prefix_ == 2) {
// For second key within a prefix, need to encode prefix length
size_bytes_pos +=
EncodeSize(kPrefixFromPreviousKey,
static_cast<uint32_t>(pre_prefix_.GetUserKey().size()),
size_bytes + size_bytes_pos);
}
uint32_t prefix_len =
static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
size_bytes + size_bytes_pos);
IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!io_s.ok()) {
return io_s;
}
*offset += size_bytes_pos;
key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
}
}
// Encode full key
// For value size as varint32 (up to 5 bytes).
// If the row is of value type with seqId 0, flush the special flag together
// in this buffer to safe one file append call, which takes 1 byte.
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
IOStatus io_s =
file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
if (!io_s.ok()) {
return io_s;
}
*offset += key_to_write.size() - 8;
meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
*meta_bytes_buf_size += 1;
} else {
IOStatus io_s = file->Append(key_to_write);
if (!io_s.ok()) {
return io_s;
}
*offset += key_to_write.size();
}
return IOStatus::OK();
}
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
uint32_t len) {
assert(file_offset + len <= file_info_->data_end_offset);
return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
len);
}
bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
Slice* out) {
const uint32_t kPrefetchSize = 256u;
// Try to read from buffers.
for (uint32_t i = 0; i < num_buf_; i++) {
Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
if (file_offset >= buffer->buf_start_offset &&
file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
*out = GetFromBuffer(buffer, file_offset, len);
return true;
}
}
Buffer* new_buffer;
// Data needed is not in any of the buffer. Allocate a new buffer.
if (num_buf_ < buffers_.size()) {
// Add a new buffer
new_buffer = new Buffer();
buffers_[num_buf_++].reset(new_buffer);
} else {
// Now simply replace the last buffer. Can improve the placement policy
// if needed.
new_buffer = buffers_[num_buf_ - 1].get();
}
assert(file_offset + len <= file_info_->data_end_offset);
uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
std::max(kPrefetchSize, len));
if (size_to_read > new_buffer->buf_capacity) {
new_buffer->buf.reset(new char[size_to_read]);
new_buffer->buf_capacity = size_to_read;
new_buffer->buf_len = 0;
}
Slice read_result;
// TODO: rate limit plain table reads.
Status s =
file_info_->file->Read(IOOptions(), file_offset, size_to_read,
&read_result, new_buffer->buf.get(), nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
if (!s.ok()) {
status_ = s;
return false;
}
new_buffer->buf_start_offset = file_offset;
new_buffer->buf_len = size_to_read;
*out = GetFromBuffer(new_buffer, file_offset, len);
return true;
}
inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
uint32_t* bytes_read) {
if (file_info_->is_mmap_mode) {
const char* start = file_info_->file_data.data() + offset;
const char* limit =
file_info_->file_data.data() + file_info_->data_end_offset;
const char* key_ptr = GetVarint32Ptr(start, limit, out);
assert(key_ptr != nullptr);
*bytes_read = static_cast<uint32_t>(key_ptr - start);
return true;
} else {
return ReadVarint32NonMmap(offset, out, bytes_read);
}
}
bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
uint32_t* bytes_read) {
const char* start;
const char* limit;
const uint32_t kMaxVarInt32Size = 6u;
uint32_t bytes_to_read =
std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
Slice bytes;
if (!Read(offset, bytes_to_read, &bytes)) {
return false;
}
start = bytes.data();
limit = bytes.data() + bytes.size();
const char* key_ptr = GetVarint32Ptr(start, limit, out);
*bytes_read =
(key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
return true;
}
Status PlainTableKeyDecoder::ReadInternalKey(
uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
Slice tmp_slice;
bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
if (!success) {
return file_reader_.status();
}
if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
// Special encoding for the row with seqID=0
parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
parsed_key->sequence = 0;
parsed_key->type = kTypeValue;
*bytes_read += user_key_size + 1;
*internal_key_valid = false;
} else {
success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
if (!success) {
return file_reader_.status();
}
*internal_key_valid = true;
Status pik_status = ParseInternalKey(*internal_key, parsed_key,
false /* log_err_key */); // TODO
if (!pik_status.ok()) {
return Status::Corruption(
Slice("Corrupted key found during next key read. "),
pik_status.getState());
}
*bytes_read += user_key_size + 8;
}
return Status::OK();
}
Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
ParsedInternalKey* parsed_key,
Slice* internal_key,
uint32_t* bytes_read,
bool* /*seekable*/) {
uint32_t user_key_size = 0;
Status s;
if (fixed_user_key_len_ != kPlainTableVariableLength) {
user_key_size = fixed_user_key_len_;
} else {
uint32_t tmp_size = 0;
uint32_t tmp_read;
bool success =
file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
if (!success) {
return file_reader_.status();
}
assert(tmp_read > 0);
user_key_size = tmp_size;
*bytes_read = tmp_read;
}
// dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true;
Slice decoded_internal_key;
s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
bytes_read, &decoded_internal_key_valid,
&decoded_internal_key);
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info()->is_mmap_mode) {
cur_key_.SetInternalKey(*parsed_key);
parsed_key->user_key =
Slice(cur_key_.GetInternalKey().data(), user_key_size);
if (internal_key != nullptr) {
*internal_key = cur_key_.GetInternalKey();
}
} else if (internal_key != nullptr) {
if (decoded_internal_key_valid) {
*internal_key = decoded_internal_key;
} else {
// Need to copy out the internal key
cur_key_.SetInternalKey(*parsed_key);
*internal_key = cur_key_.GetInternalKey();
}
}
return Status::OK();
}
Status PlainTableKeyDecoder::NextPrefixEncodingKey(
uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
uint32_t* bytes_read, bool* seekable) {
PlainTableEntryType entry_type;
bool expect_suffix = false;
Status s;
do {
uint32_t size = 0;
// dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true;
uint32_t my_bytes_read = 0;
s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
&my_bytes_read);
if (!s.ok()) {
return s;
}
if (my_bytes_read == 0) {
return Status::Corruption("Unexpected EOF when reading size of the key");
}
*bytes_read += my_bytes_read;
switch (entry_type) {
case kFullKey: {
expect_suffix = false;
Slice decoded_internal_key;
s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
bytes_read, &decoded_internal_key_valid,
&decoded_internal_key);
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info()->is_mmap_mode ||
(internal_key != nullptr && !decoded_internal_key_valid)) {
// In non-mmap mode, always need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
cur_key_.SetInternalKey(*parsed_key);
saved_user_key_ = cur_key_.GetUserKey();
if (!file_reader_.file_info()->is_mmap_mode) {
parsed_key->user_key =
Slice(cur_key_.GetInternalKey().data(), size);
}
if (internal_key != nullptr) {
*internal_key = cur_key_.GetInternalKey();
}
} else {
if (internal_key != nullptr) {
*internal_key = decoded_internal_key;
}
saved_user_key_ = parsed_key->user_key;
}
break;
}
case kPrefixFromPreviousKey: {
if (seekable != nullptr) {
*seekable = false;
}
prefix_len_ = size;
assert(prefix_extractor_ == nullptr ||
prefix_extractor_->Transform(saved_user_key_).size() ==
prefix_len_);
// Need read another size flag for suffix
expect_suffix = true;
break;
}
case kKeySuffix: {
expect_suffix = false;
if (seekable != nullptr) {
*seekable = false;
}
Slice tmp_slice;
s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
bytes_read, &decoded_internal_key_valid,
&tmp_slice);
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info()->is_mmap_mode) {
// In non-mmap mode, we need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
// saved_user_key_ points to cur_key_. We are making a copy of
// the prefix part to another string, and construct the current
// key from the prefix part and the suffix part back to cur_key_.
std::string tmp =
Slice(saved_user_key_.data(), prefix_len_).ToString();
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(tmp, *parsed_key);
parsed_key->user_key =
Slice(cur_key_.GetInternalKey().data(), prefix_len_ + size);
saved_user_key_ = cur_key_.GetUserKey();
} else {
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
*parsed_key);
}
parsed_key->user_key = cur_key_.GetUserKey();
if (internal_key != nullptr) {
*internal_key = cur_key_.GetInternalKey();
}
break;
}
default:
return Status::Corruption("Un-identified size flag.");
}
} while (expect_suffix); // Another round if suffix is expected.
return Status::OK();
}
Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
ParsedInternalKey* parsed_key,
Slice* internal_key, Slice* value,
uint32_t* bytes_read, bool* seekable) {
assert(value != nullptr);
Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
seekable);
if (s.ok()) {
assert(bytes_read != nullptr);
uint32_t value_size;
uint32_t value_size_bytes;
bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
&value_size, &value_size_bytes);
if (!success) {
return file_reader_.status();
}
if (value_size_bytes == 0) {
return Status::Corruption(
"Unexpected EOF when reading the next value's size.");
}
*bytes_read += value_size_bytes;
success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
if (!success) {
return file_reader_.status();
}
*bytes_read += value_size;
}
return s;
}
Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
ParsedInternalKey* parsed_key,
Slice* internal_key,
uint32_t* bytes_read,
bool* seekable) {
*bytes_read = 0;
if (seekable != nullptr) {
*seekable = true;
}
if (encoding_type_ == kPlain) {
return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
bytes_read, seekable);
} else {
assert(encoding_type_ == kPrefix);
return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
bytes_read, seekable);
}
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LIT