diff --git a/CMakeLists.txt b/CMakeLists.txt index 82c96a4e3..4b8b3542e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -573,6 +573,7 @@ set(SOURCES db/blob/blob_file_builder.cc db/blob/blob_file_garbage.cc db/blob/blob_file_meta.cc + db/blob/blob_file_reader.cc db/blob/blob_log_format.cc db/blob/blob_log_reader.cc db/blob/blob_log_writer.cc @@ -1044,6 +1045,7 @@ if(WITH_TESTS) db/blob/blob_file_addition_test.cc db/blob/blob_file_builder_test.cc db/blob/blob_file_garbage_test.cc + db/blob/blob_file_reader_test.cc db/blob/db_blob_index_test.cc db/column_family_test.cc db/compact_files_test.cc diff --git a/Makefile b/Makefile index cbec036c2..6894ef07a 100644 --- a/Makefile +++ b/Makefile @@ -578,6 +578,7 @@ ifdef ASSERT_STATUS_CHECKED blob_file_addition_test \ blob_file_builder_test \ blob_file_garbage_test \ + blob_file_reader_test \ bloom_test \ cassandra_format_test \ cassandra_row_merge_test \ @@ -1910,6 +1911,9 @@ blob_file_builder_test: $(OBJ_DIR)/db/blob/blob_file_builder_test.o $(TEST_LIBRA blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index ba8c885e0..28c570757 100644 --- a/TARGETS +++ b/TARGETS @@ -137,6 +137,7 @@ cpp_library( "db/blob/blob_file_builder.cc", "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", + "db/blob/blob_file_reader.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_reader.cc", "db/blob/blob_log_writer.cc", @@ -424,6 +425,7 @@ cpp_library( "db/blob/blob_file_builder.cc", "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", + "db/blob/blob_file_reader.cc", "db/blob/blob_log_format.cc", "db/blob/blob_log_reader.cc", "db/blob/blob_log_writer.cc", @@ -862,6 +864,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_file_reader_test", + "db/blob/blob_file_reader_test.cc", + "serial", + [], + [], + ], [ "block_based_filter_block_test", "table/block_based/block_based_filter_block_test.cc", diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc new file mode 100644 index 000000000..0cae4eb53 --- /dev/null +++ b/db/blob/blob_file_reader.cc @@ -0,0 +1,423 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_file_reader.h" + +#include +#include + +#include "db/blob/blob_log_format.h" +#include "file/filename.h" +#include "options/cf_options.h" +#include "rocksdb/file_system.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "test_util/sync_point.h" +#include "util/compression.h" +#include "util/crc32c.h" + +namespace ROCKSDB_NAMESPACE { + +Status BlobFileReader::Create( + const ImmutableCFOptions& immutable_cf_options, + const FileOptions& file_options, uint32_t column_family_id, + HistogramImpl* blob_file_read_hist, uint64_t blob_file_number, + std::unique_ptr* blob_file_reader) { + assert(blob_file_reader); + assert(!*blob_file_reader); + + uint64_t file_size = 0; + std::unique_ptr file_reader; + + { + const Status s = + OpenFile(immutable_cf_options, file_options, blob_file_read_hist, + blob_file_number, &file_size, &file_reader); + if (!s.ok()) { + return s; + } + } + + assert(file_reader); + + CompressionType compression_type = kNoCompression; + + { + const Status s = + ReadHeader(file_reader.get(), column_family_id, &compression_type); + if (!s.ok()) { + return s; + } + } + + { + const Status s = ReadFooter(file_size, file_reader.get()); + if (!s.ok()) { + return s; + } + } + + blob_file_reader->reset( + new BlobFileReader(std::move(file_reader), file_size, compression_type)); + + return Status::OK(); +} + +Status BlobFileReader::OpenFile( + const ImmutableCFOptions& immutable_cf_options, + const FileOptions& file_opts, HistogramImpl* blob_file_read_hist, + uint64_t blob_file_number, uint64_t* file_size, + std::unique_ptr* file_reader) { + assert(file_size); + assert(file_reader); + + const auto& cf_paths = immutable_cf_options.cf_paths; + assert(!cf_paths.empty()); + + const std::string blob_file_path = + BlobFileName(cf_paths.front().path, blob_file_number); + + FileSystem* const fs = immutable_cf_options.fs; + assert(fs); + + constexpr IODebugContext* dbg = nullptr; + + { + TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize"); + + const Status s = + fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg); + if (!s.ok()) { + return s; + } + } + + if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) { + return Status::Corruption("Malformed blob file"); + } + + std::unique_ptr file; + + { + TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile"); + + const Status s = + fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg); + if (!s.ok()) { + return s; + } + } + + assert(file); + + if (immutable_cf_options.advise_random_on_open) { + file->Hint(FSRandomAccessFile::kRandom); + } + + file_reader->reset(new RandomAccessFileReader( + std::move(file), blob_file_path, immutable_cf_options.env, + std::shared_ptr(), immutable_cf_options.statistics, + BLOB_DB_BLOB_FILE_READ_MICROS, blob_file_read_hist, + immutable_cf_options.rate_limiter, immutable_cf_options.listeners)); + + return Status::OK(); +} + +Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader, + uint32_t column_family_id, + CompressionType* compression_type) { + assert(file_reader); + assert(compression_type); + + Slice header_slice; + Buffer buf; + AlignedBuf aligned_buf; + + { + TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile"); + + constexpr uint64_t read_offset = 0; + constexpr size_t read_size = BlobLogHeader::kSize; + + const Status s = ReadFromFile(file_reader, read_offset, read_size, + &header_slice, &buf, &aligned_buf); + if (!s.ok()) { + return s; + } + + TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult", + &header_slice); + } + + BlobLogHeader header; + + { + const Status s = header.DecodeFrom(header_slice); + if (!s.ok()) { + return s; + } + } + + constexpr ExpirationRange no_expiration_range; + + if (header.has_ttl || header.expiration_range != no_expiration_range) { + return Status::Corruption("Unexpected TTL blob file"); + } + + if (header.column_family_id != column_family_id) { + return Status::Corruption("Column family ID mismatch"); + } + + *compression_type = header.compression; + + return Status::OK(); +} + +Status BlobFileReader::ReadFooter(uint64_t file_size, + const RandomAccessFileReader* file_reader) { + assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize); + assert(file_reader); + + Slice footer_slice; + Buffer buf; + AlignedBuf aligned_buf; + + { + TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile"); + + const uint64_t read_offset = file_size - BlobLogFooter::kSize; + constexpr size_t read_size = BlobLogFooter::kSize; + + const Status s = ReadFromFile(file_reader, read_offset, read_size, + &footer_slice, &buf, &aligned_buf); + if (!s.ok()) { + return s; + } + + TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult", + &footer_slice); + } + + BlobLogFooter footer; + + { + const Status s = footer.DecodeFrom(footer_slice); + if (!s.ok()) { + return s; + } + } + + constexpr ExpirationRange no_expiration_range; + + if (footer.expiration_range != no_expiration_range) { + return Status::Corruption("Unexpected TTL blob file"); + } + + return Status::OK(); +} + +Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader, + uint64_t read_offset, size_t read_size, + Slice* slice, Buffer* buf, + AlignedBuf* aligned_buf) { + assert(slice); + assert(buf); + assert(aligned_buf); + + assert(file_reader); + + Status s; + + if (file_reader->use_direct_io()) { + constexpr char* scratch = nullptr; + + s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch, + aligned_buf); + } else { + buf->reset(new char[read_size]); + constexpr AlignedBuf* aligned_scratch = nullptr; + + s = file_reader->Read(IOOptions(), read_offset, read_size, slice, + buf->get(), aligned_scratch); + } + + if (!s.ok()) { + return s; + } + + if (slice->size() != read_size) { + return Status::Corruption("Failed to read data from blob file"); + } + + return Status::OK(); +} + +BlobFileReader::BlobFileReader( + std::unique_ptr&& file_reader, uint64_t file_size, + CompressionType compression_type) + : file_reader_(std::move(file_reader)), + file_size_(file_size), + compression_type_(compression_type) { + assert(file_reader_); +} + +BlobFileReader::~BlobFileReader() = default; + +Status BlobFileReader::GetBlob(const ReadOptions& read_options, + const Slice& user_key, uint64_t offset, + uint64_t value_size, + CompressionType compression_type, + PinnableSlice* value) const { + assert(value); + + const uint64_t key_size = user_key.size(); + + if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) { + return Status::Corruption("Invalid blob offset"); + } + + if (compression_type != compression_type_) { + return Status::Corruption("Compression type mismatch when reading blob"); + } + + // Note: if verify_checksum is set, we read the entire blob record to be able + // to perform the verification; otherwise, we just read the blob itself. Since + // the offset in BlobIndex actually points to the blob value, we need to make + // an adjustment in the former case. + const uint64_t adjustment = + read_options.verify_checksums + ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + : 0; + assert(offset >= adjustment); + + Slice record_slice; + Buffer buf; + AlignedBuf aligned_buf; + + { + TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile"); + + const uint64_t record_offset = offset - adjustment; + const uint64_t record_size = value_size + adjustment; + + const Status s = ReadFromFile(file_reader_.get(), record_offset, + static_cast(record_size), + &record_slice, &buf, &aligned_buf); + if (!s.ok()) { + return s; + } + + TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult", + &record_slice); + } + + if (read_options.verify_checksums) { + const Status s = VerifyBlob(record_slice, user_key, value_size); + if (!s.ok()) { + return s; + } + } + + const Slice value_slice(record_slice.data() + adjustment, value_size); + + { + const Status s = + UncompressBlobIfNeeded(value_slice, compression_type, value); + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +Status BlobFileReader::VerifyBlob(const Slice& record_slice, + const Slice& user_key, uint64_t value_size) { + BlobLogRecord record; + + const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize); + + { + const Status s = record.DecodeHeaderFrom(header_slice); + if (!s.ok()) { + return s; + } + } + + if (record.key_size != user_key.size()) { + return Status::Corruption("Key size mismatch when reading blob"); + } + + if (record.value_size != value_size) { + return Status::Corruption("Value size mismatch when reading blob"); + } + + record.key = + Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size); + if (record.key != user_key) { + return Status::Corruption("Key mismatch when reading blob"); + } + + record.value = Slice(record.key.data() + record.key_size, value_size); + + { + TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC", + &record); + + const Status s = record.CheckBlobCRC(); + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice, + CompressionType compression_type, + PinnableSlice* value) { + assert(value); + + if (compression_type == kNoCompression) { + SaveValue(value_slice, value); + + return Status::OK(); + } + + UncompressionContext context(compression_type); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression_type); + + size_t uncompressed_size = 0; + constexpr uint32_t compression_format_version = 2; + constexpr MemoryAllocator* allocator = nullptr; + + CacheAllocationPtr output = + UncompressData(info, value_slice.data(), value_slice.size(), + &uncompressed_size, compression_format_version, allocator); + + TEST_SYNC_POINT_CALLBACK( + "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output); + + if (!output) { + return Status::Corruption("Unable to uncompress blob"); + } + + SaveValue(Slice(output.get(), uncompressed_size), value); + + return Status::OK(); +} + +void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) { + assert(dst); + + if (dst->IsPinned()) { + dst->Reset(); + } + + dst->PinSelf(src); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h new file mode 100644 index 000000000..8c7df393d --- /dev/null +++ b/db/blob/blob_file_reader.h @@ -0,0 +1,81 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "file/random_access_file_reader.h" +#include "rocksdb/compression_type.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +class Status; +struct ImmutableCFOptions; +struct FileOptions; +class HistogramImpl; +struct ReadOptions; +class Slice; +class PinnableSlice; + +class BlobFileReader { + public: + static Status Create(const ImmutableCFOptions& immutable_cf_options, + const FileOptions& file_options, + uint32_t column_family_id, + HistogramImpl* blob_file_read_hist, + uint64_t blob_file_number, + std::unique_ptr* reader); + + BlobFileReader(const BlobFileReader&) = delete; + BlobFileReader& operator=(const BlobFileReader&) = delete; + + ~BlobFileReader(); + + Status GetBlob(const ReadOptions& read_options, const Slice& user_key, + uint64_t offset, uint64_t value_size, + CompressionType compression_type, PinnableSlice* value) const; + + private: + BlobFileReader(std::unique_ptr&& file_reader, + uint64_t file_size, CompressionType compression_type); + + static Status OpenFile(const ImmutableCFOptions& immutable_cf_options, + const FileOptions& file_opts, + HistogramImpl* blob_file_read_hist, + uint64_t blob_file_number, uint64_t* file_size, + std::unique_ptr* file_reader); + + static Status ReadHeader(const RandomAccessFileReader* file_reader, + uint32_t column_family_id, + CompressionType* compression_type); + + static Status ReadFooter(uint64_t file_size, + const RandomAccessFileReader* file_reader); + + using Buffer = std::unique_ptr; + + static Status ReadFromFile(const RandomAccessFileReader* file_reader, + uint64_t read_offset, size_t read_size, + Slice* slice, Buffer* buf, + AlignedBuf* aligned_buf); + + static Status VerifyBlob(const Slice& record_slice, const Slice& user_key, + uint64_t value_size); + + static Status UncompressBlobIfNeeded(const Slice& value_slice, + CompressionType compression_type, + PinnableSlice* value); + + static void SaveValue(const Slice& src, PinnableSlice* dst); + + std::unique_ptr file_reader_; + uint64_t file_size_; + CompressionType compression_type_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc new file mode 100644 index 000000000..0830b3b8f --- /dev/null +++ b/db/blob/blob_file_reader_test.cc @@ -0,0 +1,768 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_file_reader.h" + +#include +#include + +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "env/mock_env.h" +#include "file/filename.h" +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" +#include "options/cf_options.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/options.h" +#include "test_util/testharness.h" +#include "util/compression.h" +#include "utilities/fault_injection_env.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +// Creates a test blob file with a single blob in it. Note: this method +// makes it possible to test various corner cases by allowing the caller +// to specify the contents of various blob file header/footer fields. +void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options, + uint32_t column_family_id, bool has_ttl, + const ExpirationRange& expiration_range_header, + const ExpirationRange& expiration_range_footer, + uint64_t blob_file_number, const Slice& key, + const Slice& blob, CompressionType compression_type, + uint64_t* blob_offset, uint64_t* blob_size) { + assert(!immutable_cf_options.cf_paths.empty()); + assert(blob_offset); + assert(blob_size); + + const std::string blob_file_path = BlobFileName( + immutable_cf_options.cf_paths.front().path, blob_file_number); + + std::unique_ptr file; + ASSERT_OK(NewWritableFile(immutable_cf_options.fs, blob_file_path, &file, + FileOptions())); + + std::unique_ptr file_writer( + new WritableFileWriter(std::move(file), blob_file_path, FileOptions(), + immutable_cf_options.env)); + + constexpr Statistics* statistics = nullptr; + constexpr bool use_fsync = false; + + BlobLogWriter blob_log_writer(std::move(file_writer), + immutable_cf_options.env, statistics, + blob_file_number, use_fsync); + + BlobLogHeader header(column_family_id, compression_type, has_ttl, + expiration_range_header); + + ASSERT_OK(blob_log_writer.WriteHeader(header)); + + std::string compressed_blob; + Slice blob_to_write; + + if (compression_type == kNoCompression) { + blob_to_write = blob; + *blob_size = blob.size(); + } else { + CompressionOptions opts; + CompressionContext context(compression_type); + constexpr uint64_t sample_for_compression = 0; + + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + compression_type, sample_for_compression); + + constexpr uint32_t compression_format_version = 2; + + ASSERT_TRUE( + CompressData(blob, info, compression_format_version, &compressed_blob)); + + blob_to_write = compressed_blob; + *blob_size = compressed_blob.size(); + } + + uint64_t key_offset = 0; + + ASSERT_OK( + blob_log_writer.AddRecord(key, blob_to_write, &key_offset, blob_offset)); + + BlobLogFooter footer; + footer.blob_count = 1; + footer.expiration_range = expiration_range_footer; + + std::string checksum_method; + std::string checksum_value; + + ASSERT_OK( + blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value)); +} + +} // anonymous namespace + +class BlobFileReaderTest : public testing::Test { + protected: + BlobFileReaderTest() : mock_env_(Env::Default()) {} + + MockEnv mock_env_; +}; + +TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderTest_CreateReaderAndGetBlob"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader)); + + // Make sure the blob can be retrieved with and without checksum verification + ReadOptions read_options; + read_options.verify_checksums = false; + + { + PinnableSlice value; + + ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, + kNoCompression, &value)); + ASSERT_EQ(value, blob); + } + + read_options.verify_checksums = true; + + { + PinnableSlice value; + + ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, + kNoCompression, &value)); + ASSERT_EQ(value, blob); + } + + // Invalid offset (too close to start of file) + { + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(read_options, key, blob_offset - 1, blob_size, + kNoCompression, &value) + .IsCorruption()); + } + + // Invalid offset (too close to end of file) + { + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(read_options, key, blob_offset + 1, blob_size, + kNoCompression, &value) + .IsCorruption()); + } + + // Incorrect compression type + { + PinnableSlice value; + + ASSERT_TRUE( + reader + ->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, &value) + .IsCorruption()); + } + + // Incorrect key size + { + constexpr char shorter_key[] = "k"; + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(read_options, shorter_key, + blob_offset - (sizeof(key) - sizeof(shorter_key)), + blob_size, kNoCompression, &value) + .IsCorruption()); + } + + // Incorrect key + { + constexpr char incorrect_key[] = "foo"; + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(read_options, incorrect_key, blob_offset, + blob_size, kNoCompression, &value) + .IsCorruption()); + } + + // Incorrect value size + { + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(read_options, key, blob_offset, blob_size + 1, + kNoCompression, &value) + .IsCorruption()); + } +} + +TEST_F(BlobFileReaderTest, Malformed) { + // Write a blob file consisting of nothing but a header, and make sure we + // detect the error when we open it for reading + + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_Malformed"), 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr uint64_t blob_file_number = 1; + + { + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + + const std::string blob_file_path = BlobFileName( + immutable_cf_options.cf_paths.front().path, blob_file_number); + + std::unique_ptr file; + ASSERT_OK(NewWritableFile(immutable_cf_options.fs, blob_file_path, &file, + FileOptions())); + + std::unique_ptr file_writer( + new WritableFileWriter(std::move(file), blob_file_path, FileOptions(), + immutable_cf_options.env)); + + constexpr Statistics* statistics = nullptr; + constexpr bool use_fsync = false; + + BlobLogWriter blob_log_writer(std::move(file_writer), + immutable_cf_options.env, statistics, + blob_file_number, use_fsync); + + BlobLogHeader header(column_family_id, kNoCompression, has_ttl, + expiration_range); + + ASSERT_OK(blob_log_writer.WriteHeader(header)); + } + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader) + .IsCorruption()); +} + +TEST_F(BlobFileReaderTest, TTL) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_TTL"), 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = true; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader) + .IsCorruption()); +} + +TEST_F(BlobFileReaderTest, ExpirationRangeInHeader) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderTest_ExpirationRangeInHeader"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range_header(1, 2); + constexpr ExpirationRange expiration_range_footer; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range_header, expiration_range_footer, + blob_file_number, key, blob, kNoCompression, &blob_offset, + &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader) + .IsCorruption()); +} + +TEST_F(BlobFileReaderTest, ExpirationRangeInFooter) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderTest_ExpirationRangeInFooter"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range_header; + constexpr ExpirationRange expiration_range_footer(1, 2); + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range_header, expiration_range_footer, + blob_file_number, key, blob, kNoCompression, &blob_offset, + &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader) + .IsCorruption()); +} + +TEST_F(BlobFileReaderTest, IncorrectColumnFamily) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderTest_IncorrectColumnFamily"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + constexpr uint32_t incorrect_column_family_id = 2; + + ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(), + incorrect_column_family_id, + blob_file_read_hist, blob_file_number, + &reader) + .IsCorruption()); +} + +TEST_F(BlobFileReaderTest, BlobCRCError) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_BlobCRCError"), 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader)); + + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg) { + BlobLogRecord* const record = static_cast(arg); + assert(record); + + record->blob_crc = 0xfaceb00c; + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(ReadOptions(), key, blob_offset, blob_size, + kNoCompression, &value) + .IsCorruption()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(BlobFileReaderTest, Compression) { + if (!Snappy_Supported()) { + return; + } + + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_Compression"), 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kSnappyCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader)); + + // Make sure the blob can be retrieved with and without checksum verification + ReadOptions read_options; + read_options.verify_checksums = false; + + { + PinnableSlice value; + + ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, + kSnappyCompression, &value)); + ASSERT_EQ(value, blob); + } + + read_options.verify_checksums = true; + + { + PinnableSlice value; + + ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, + kSnappyCompression, &value)); + ASSERT_EQ(value, blob); + } +} + +TEST_F(BlobFileReaderTest, UncompressionError) { + if (!Snappy_Supported()) { + return; + } + + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderTest_UncompressionError"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kSnappyCompression, &blob_offset, &blob_size); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader)); + + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg) { + CacheAllocationPtr* const output = + static_cast(arg); + assert(output); + + output->reset(); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(ReadOptions(), key, blob_offset, blob_size, + kSnappyCompression, &value) + .IsCorruption()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +class BlobFileReaderIOErrorTest + : public testing::Test, + public testing::WithParamInterface { + protected: + BlobFileReaderIOErrorTest() + : mock_env_(Env::Default()), + fault_injection_env_(&mock_env_), + sync_point_(GetParam()) {} + + MockEnv mock_env_; + FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderIOErrorTest, + ::testing::ValuesIn(std::vector{ + "BlobFileReader::OpenFile:GetFileSize", + "BlobFileReader::OpenFile:NewRandomAccessFile", + "BlobFileReader::ReadHeader:ReadFromFile", + "BlobFileReader::ReadFooter:ReadFromFile", + "BlobFileReader::GetBlob:ReadFromFile"})); + +TEST_P(BlobFileReaderIOErrorTest, IOError) { + // Simulates an I/O error during the specified step + + Options options; + options.env = &fault_injection_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&fault_injection_env_, + "BlobFileReaderIOErrorTest_IOError"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + const Status s = BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader); + + const bool fail_during_create = + (sync_point_ != "BlobFileReader::GetBlob:ReadFromFile"); + + if (fail_during_create) { + ASSERT_TRUE(s.IsIOError()); + } else { + ASSERT_OK(s); + + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(ReadOptions(), key, blob_offset, blob_size, + kNoCompression, &value) + .IsIOError()); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +class BlobFileReaderDecodingErrorTest + : public testing::Test, + public testing::WithParamInterface { + protected: + BlobFileReaderDecodingErrorTest() + : mock_env_(Env::Default()), sync_point_(GetParam()) {} + + MockEnv mock_env_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderDecodingErrorTest, + ::testing::ValuesIn(std::vector{ + "BlobFileReader::ReadHeader:TamperWithResult", + "BlobFileReader::ReadFooter:TamperWithResult", + "BlobFileReader::GetBlob:TamperWithResult"})); + +TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) { + Options options; + options.env = &mock_env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileReaderDecodingErrorTest_DecodingError"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + uint64_t blob_offset = 0; + uint64_t blob_size = 0; + + WriteBlobFile(immutable_cf_options, column_family_id, has_ttl, + expiration_range, expiration_range, blob_file_number, key, blob, + kNoCompression, &blob_offset, &blob_size); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [](void* arg) { + Slice* const slice = static_cast(arg); + assert(slice); + assert(!slice->empty()); + + slice->remove_prefix(1); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr reader; + + const Status s = BlobFileReader::Create(immutable_cf_options, FileOptions(), + column_family_id, blob_file_read_hist, + blob_file_number, &reader); + + const bool fail_during_create = + sync_point_ != "BlobFileReader::GetBlob:TamperWithResult"; + + if (fail_during_create) { + ASSERT_TRUE(s.IsCorruption()); + } else { + ASSERT_OK(s); + + PinnableSlice value; + + ASSERT_TRUE(reader + ->GetBlob(ReadOptions(), key, blob_offset, blob_size, + kNoCompression, &value) + .IsCorruption()); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/blob/blob_index.h b/db/blob/blob_index.h index ec757f1ec..27927cb38 100644 --- a/db/blob/blob_index.h +++ b/db/blob/blob_index.h @@ -7,8 +7,9 @@ #include #include -#include "rocksdb/options.h" +#include "rocksdb/compression_type.h" #include "util/coding.h" +#include "util/compression.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { @@ -82,6 +83,11 @@ class BlobIndex { return size_; } + CompressionType compression() const { + assert(!IsInlined()); + return compression_; + } + Status DecodeFrom(Slice slice) { static const std::string kErrorMessage = "Error while decoding blob index"; assert(slice.size() > 0); @@ -117,7 +123,8 @@ class BlobIndex { oss << "[inlined blob] value:" << value_.ToString(output_hex); } else { oss << "[blob ref] file:" << file_number_ << " offset:" << offset_ - << " size:" << size_; + << " size:" << size_ + << " compression: " << CompressionTypeToString(compression_); } if (HasTTL()) { diff --git a/db/blob/blob_log_format.cc b/db/blob/blob_log_format.cc index 482bd078e..b5cd0bdcc 100644 --- a/db/blob/blob_log_format.cc +++ b/db/blob/blob_log_format.cc @@ -95,6 +95,10 @@ Status BlobLogFooter::DecodeFrom(Slice src) { return Status::OK(); } +uint64_t BlobLogRecord::CalculateAdjustmentForRecordHeader(uint64_t key_size) { + return key_size + kHeaderSize; +} + void BlobLogRecord::EncodeHeaderTo(std::string* dst) { assert(dst != nullptr); dst->clear(); diff --git a/db/blob/blob_log_format.h b/db/blob/blob_log_format.h index 9cd86ed18..afeb8d370 100644 --- a/db/blob/blob_log_format.h +++ b/db/blob/blob_log_format.h @@ -104,6 +104,11 @@ struct BlobLogRecord { // header include fields up to blob CRC static constexpr size_t kHeaderSize = 32; + // Note that the offset field of BlobIndex actually points to the blob value + // as opposed to the start of the blob record. The following method can + // be used to calculate the adjustment needed to read the blob record header. + static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size); + uint64_t key_size = 0; uint64_t value_size = 0; uint64_t expiration = 0; @@ -123,4 +128,19 @@ struct BlobLogRecord { Status CheckBlobCRC() const; }; +// Checks whether a blob offset is potentially valid or not. +inline bool IsValidBlobOffset(uint64_t value_offset, uint64_t key_size, + uint64_t value_size, uint64_t file_size) { + if (value_offset < + BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key_size) { + return false; + } + + if (value_offset + value_size + BlobLogFooter::kSize > file_size) { + return false; + } + + return true; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 57abdcb58..afe3f7141 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -1386,21 +1387,26 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { } void InternalStats::DumpCFFileHistogram(std::string* value) { - char buf[2000]; - snprintf(buf, sizeof(buf), - "\n** File Read Latency Histogram By Level [%s] **\n", - cfd_->GetName().c_str()); - value->append(buf); + assert(value); + assert(cfd_); + + std::ostringstream oss; + oss << "\n** File Read Latency Histogram By Level [" << cfd_->GetName() + << "] **\n"; for (int level = 0; level < number_levels_; level++) { if (!file_read_latency_[level].Empty()) { - char buf2[5000]; - snprintf(buf2, sizeof(buf2), - "** Level %d read latency histogram (micros):\n%s\n", level, - file_read_latency_[level].ToString().c_str()); - value->append(buf2); + oss << "** Level " << level << " read latency histogram (micros):\n" + << file_read_latency_[level].ToString() << '\n'; } } + + if (!blob_file_read_latency_.Empty()) { + oss << "** Blob file read latency histogram (micros):\n" + << blob_file_read_latency_.ToString() << '\n'; + } + + *value = oss.str(); } #else diff --git a/db/internal_stats.h b/db/internal_stats.h index 1e2de502a..edb2c0582 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -335,6 +335,7 @@ class InternalStats { for (auto& h : file_read_latency_) { h.Clear(); } + blob_file_read_latency_.Clear(); cf_stats_snapshot_.Clear(); db_stats_snapshot_.Clear(); bg_error_count_ = 0; @@ -375,6 +376,8 @@ class InternalStats { return &file_read_latency_[level]; } + HistogramImpl* GetBlobFileReadHist() { return &blob_file_read_latency_; } + uint64_t GetBackgroundErrorCount() const { return bg_error_count_; } uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; } @@ -426,6 +429,7 @@ class InternalStats { std::vector comp_stats_; std::vector comp_stats_by_pri_; std::vector file_read_latency_; + HistogramImpl blob_file_read_latency_; // Used to compute per-interval statistics struct CFStatsSnapshot { diff --git a/src.mk b/src.mk index c6e1defeb..aedc75ceb 100644 --- a/src.mk +++ b/src.mk @@ -9,6 +9,7 @@ LIB_SOURCES = \ db/blob/blob_file_builder.cc \ db/blob/blob_file_garbage.cc \ db/blob/blob_file_meta.cc \ + db/blob/blob_file_reader.cc \ db/blob/blob_log_format.cc \ db/blob/blob_log_reader.cc \ db/blob/blob_log_writer.cc \ @@ -356,6 +357,7 @@ TEST_MAIN_SOURCES = \ db/blob/blob_file_addition_test.cc \ db/blob/blob_file_builder_test.cc \ db/blob/blob_file_garbage_test.cc \ + db/blob/blob_file_reader_test.cc \ db/blob/db_blob_index_test.cc \ db/column_family_test.cc \ db/compact_files_test.cc \