rocksdb/db/blob/blob_file_reader.cc
mrambacher 12f1137355 Add a SystemClock class to capture the time functions of an Env (#7858)
Summary:
Introduces and uses a SystemClock class to RocksDB.  This class contains the time-related functions of an Env and these functions can be redirected from the Env to the SystemClock.

Many of the places that used an Env (Timer, PerfStepTimer, RepeatableThread, RateLimiter, WriteController) for time-related functions have been changed to use SystemClock instead.  There are likely more places that can be changed, but this is a start to show what can/should be done.  Over time it would be nice to migrate most (if not all) of the uses of the time functions from the Env to the SystemClock.

There are several Env classes that implement these functions.  Most of these have not been converted yet to SystemClock implementations; that will come in a subsequent PR.  It would be good to unify many of the Mock Timer implementations, so that they behave similarly and be tested similarly (some override Sleep, some use a MockSleep, etc).

Additionally, this change will allow new methods to be introduced to the SystemClock (like https://github.com/facebook/rocksdb/issues/7101 WaitFor) in a consistent manner across a smaller number of classes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7858

Reviewed By: pdillinger

Differential Revision: D26006406

Pulled By: mrambacher

fbshipit-source-id: ed10a8abbdab7ff2e23d69d85bd25b3e7e899e90
2021-01-25 22:09:11 -08:00

425 lines
11 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_reader.h"
#include <cassert>
#include <string>
#include "db/blob/blob_log_format.h"
#include "file/filename.h"
#include "options/cf_options.h"
#include "rocksdb/file_system.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
Status BlobFileReader::Create(
const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_options, uint32_t column_family_id,
HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader) {
assert(blob_file_reader);
assert(!*blob_file_reader);
uint64_t file_size = 0;
std::unique_ptr<RandomAccessFileReader> file_reader;
{
const Status s =
OpenFile(immutable_cf_options, file_options, blob_file_read_hist,
blob_file_number, &file_size, &file_reader);
if (!s.ok()) {
return s;
}
}
assert(file_reader);
CompressionType compression_type = kNoCompression;
{
const Status s =
ReadHeader(file_reader.get(), column_family_id, &compression_type);
if (!s.ok()) {
return s;
}
}
{
const Status s = ReadFooter(file_size, file_reader.get());
if (!s.ok()) {
return s;
}
}
blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type));
return Status::OK();
}
Status BlobFileReader::OpenFile(
const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_opts, HistogramImpl* blob_file_read_hist,
uint64_t blob_file_number, uint64_t* file_size,
std::unique_ptr<RandomAccessFileReader>* file_reader) {
assert(file_size);
assert(file_reader);
const auto& cf_paths = immutable_cf_options.cf_paths;
assert(!cf_paths.empty());
const std::string blob_file_path =
BlobFileName(cf_paths.front().path, blob_file_number);
FileSystem* const fs = immutable_cf_options.fs;
assert(fs);
constexpr IODebugContext* dbg = nullptr;
{
TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
const Status s =
fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
if (!s.ok()) {
return s;
}
}
if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
return Status::Corruption("Malformed blob file");
}
std::unique_ptr<FSRandomAccessFile> file;
{
TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
const Status s =
fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
if (!s.ok()) {
return s;
}
}
assert(file);
if (immutable_cf_options.advise_random_on_open) {
file->Hint(FSRandomAccessFile::kRandom);
}
file_reader->reset(new RandomAccessFileReader(
std::move(file), blob_file_path,
immutable_cf_options.env->GetSystemClock(), std::shared_ptr<IOTracer>(),
immutable_cf_options.statistics, BLOB_DB_BLOB_FILE_READ_MICROS,
blob_file_read_hist, immutable_cf_options.rate_limiter,
immutable_cf_options.listeners));
return Status::OK();
}
Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
uint32_t column_family_id,
CompressionType* compression_type) {
assert(file_reader);
assert(compression_type);
Slice header_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
constexpr uint64_t read_offset = 0;
constexpr size_t read_size = BlobLogHeader::kSize;
const Status s = ReadFromFile(file_reader, read_offset, read_size,
&header_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
&header_slice);
}
BlobLogHeader header;
{
const Status s = header.DecodeFrom(header_slice);
if (!s.ok()) {
return s;
}
}
constexpr ExpirationRange no_expiration_range;
if (header.has_ttl || header.expiration_range != no_expiration_range) {
return Status::Corruption("Unexpected TTL blob file");
}
if (header.column_family_id != column_family_id) {
return Status::Corruption("Column family ID mismatch");
}
*compression_type = header.compression;
return Status::OK();
}
Status BlobFileReader::ReadFooter(uint64_t file_size,
const RandomAccessFileReader* file_reader) {
assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
assert(file_reader);
Slice footer_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
const uint64_t read_offset = file_size - BlobLogFooter::kSize;
constexpr size_t read_size = BlobLogFooter::kSize;
const Status s = ReadFromFile(file_reader, read_offset, read_size,
&footer_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
&footer_slice);
}
BlobLogFooter footer;
{
const Status s = footer.DecodeFrom(footer_slice);
if (!s.ok()) {
return s;
}
}
constexpr ExpirationRange no_expiration_range;
if (footer.expiration_range != no_expiration_range) {
return Status::Corruption("Unexpected TTL blob file");
}
return Status::OK();
}
Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
uint64_t read_offset, size_t read_size,
Slice* slice, Buffer* buf,
AlignedBuf* aligned_buf) {
assert(slice);
assert(buf);
assert(aligned_buf);
assert(file_reader);
Status s;
if (file_reader->use_direct_io()) {
constexpr char* scratch = nullptr;
s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
aligned_buf);
} else {
buf->reset(new char[read_size]);
constexpr AlignedBuf* aligned_scratch = nullptr;
s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
buf->get(), aligned_scratch);
}
if (!s.ok()) {
return s;
}
if (slice->size() != read_size) {
return Status::Corruption("Failed to read data from blob file");
}
return Status::OK();
}
BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type) {
assert(file_reader_);
}
BlobFileReader::~BlobFileReader() = default;
Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t offset,
uint64_t value_size,
CompressionType compression_type,
PinnableSlice* value) const {
assert(value);
const uint64_t key_size = user_key.size();
if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
return Status::Corruption("Invalid blob offset");
}
if (compression_type != compression_type_) {
return Status::Corruption("Compression type mismatch when reading blob");
}
// Note: if verify_checksum is set, we read the entire blob record to be able
// to perform the verification; otherwise, we just read the blob itself. Since
// the offset in BlobIndex actually points to the blob value, we need to make
// an adjustment in the former case.
const uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
: 0;
assert(offset >= adjustment);
Slice record_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
const uint64_t record_offset = offset - adjustment;
const uint64_t record_size = value_size + adjustment;
const Status s = ReadFromFile(file_reader_.get(), record_offset,
static_cast<size_t>(record_size),
&record_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
&record_slice);
}
if (read_options.verify_checksums) {
const Status s = VerifyBlob(record_slice, user_key, value_size);
if (!s.ok()) {
return s;
}
}
const Slice value_slice(record_slice.data() + adjustment, value_size);
{
const Status s =
UncompressBlobIfNeeded(value_slice, compression_type, value);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BlobFileReader::VerifyBlob(const Slice& record_slice,
const Slice& user_key, uint64_t value_size) {
BlobLogRecord record;
const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
{
const Status s = record.DecodeHeaderFrom(header_slice);
if (!s.ok()) {
return s;
}
}
if (record.key_size != user_key.size()) {
return Status::Corruption("Key size mismatch when reading blob");
}
if (record.value_size != value_size) {
return Status::Corruption("Value size mismatch when reading blob");
}
record.key =
Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
if (record.key != user_key) {
return Status::Corruption("Key mismatch when reading blob");
}
record.value = Slice(record.key.data() + record.key_size, value_size);
{
TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
&record);
const Status s = record.CheckBlobCRC();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
CompressionType compression_type,
PinnableSlice* value) {
assert(value);
if (compression_type == kNoCompression) {
SaveValue(value_slice, value);
return Status::OK();
}
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;
constexpr MemoryAllocator* allocator = nullptr;
CacheAllocationPtr output =
UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version, allocator);
TEST_SYNC_POINT_CALLBACK(
"BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
if (!output) {
return Status::Corruption("Unable to uncompress blob");
}
SaveValue(Slice(output.get(), uncompressed_size), value);
return Status::OK();
}
void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
assert(dst);
if (dst->IsPinned()) {
dst->Reset();
}
dst->PinSelf(src);
}
} // namespace ROCKSDB_NAMESPACE