diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ac79153a..894c39987 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -652,6 +652,7 @@ set(SOURCES db/blob/blob_log_format.cc db/blob/blob_log_sequential_reader.cc db/blob/blob_log_writer.cc + db/blob/prefetch_buffer_collection.cc db/builder.cc db/c.cc db/column_family.cc diff --git a/TARGETS b/TARGETS index a842673a8..903d7e5f3 100644 --- a/TARGETS +++ b/TARGETS @@ -159,6 +159,7 @@ cpp_library( "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", @@ -487,6 +488,7 @@ cpp_library( "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", "db/column_family.cc", diff --git a/db/blob/blob_fetcher.cc b/db/blob/blob_fetcher.cc index a42a4be5f..124429f93 100644 --- a/db/blob/blob_fetcher.cc +++ b/db/blob/blob_fetcher.cc @@ -9,14 +9,26 @@ namespace ROCKSDB_NAMESPACE { -Status BlobFetcher::FetchBlob(const Slice& user_key, const Slice& blob_index, - PinnableSlice* blob_value) { - Status s; +Status BlobFetcher::FetchBlob(const Slice& user_key, + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, + uint64_t* bytes_read) const { assert(version_); - constexpr uint64_t* bytes_read = nullptr; - s = version_->GetBlob(read_options_, user_key, blob_index, blob_value, - bytes_read); - return s; + + return version_->GetBlob(read_options_, user_key, blob_index_slice, + prefetch_buffer, blob_value, bytes_read); } -} // namespace ROCKSDB_NAMESPACE \ No newline at end of file +Status BlobFetcher::FetchBlob(const Slice& user_key, + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, + uint64_t* bytes_read) const { + assert(version_); + + return version_->GetBlob(read_options_, user_key, 
blob_index, prefetch_buffer, + blob_value, bytes_read); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_fetcher.h b/db/blob/blob_fetcher.h index 747057f09..8aeaf965d 100644 --- a/db/blob/blob_fetcher.h +++ b/db/blob/blob_fetcher.h @@ -9,18 +9,29 @@ #include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { -class Version; +class Version; +class Slice; +class FilePrefetchBuffer; +class PinnableSlice; +class BlobIndex; + +// A thin wrapper around the blob retrieval functionality of Version. class BlobFetcher { public: - BlobFetcher(Version* version, const ReadOptions& read_options) + BlobFetcher(const Version* version, const ReadOptions& read_options) : version_(version), read_options_(read_options) {} - Status FetchBlob(const Slice& user_key, const Slice& blob_index, - PinnableSlice* blob_value); + Status FetchBlob(const Slice& user_key, const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, uint64_t* bytes_read) const; + + Status FetchBlob(const Slice& user_key, const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* blob_value, uint64_t* bytes_read) const; private: - Version* version_; + const Version* version_; ReadOptions read_options_; }; -} // namespace ROCKSDB_NAMESPACE \ No newline at end of file +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 0cb49ae62..981261001 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -9,6 +9,7 @@ #include #include "db/blob/blob_log_format.h" +#include "file/file_prefetch_buffer.h" #include "file/filename.h" #include "monitoring/statistics.h" #include "options/cf_options.h" @@ -282,6 +283,7 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const { assert(value); 
@@ -313,7 +315,21 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, Buffer buf; AlignedBuf aligned_buf; - { + bool prefetched = false; + + if (prefetch_buffer) { + Status s; + constexpr bool for_compaction = true; + + prefetched = prefetch_buffer->TryReadFromCache( + IOOptions(), file_reader_.get(), record_offset, + static_cast(record_size), &record_slice, &s, for_compaction); + if (!s.ok()) { + return s; + } + } + + if (!prefetched) { TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile"); const Status s = ReadFromFile(file_reader_.get(), record_offset, @@ -322,11 +338,11 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, if (!s.ok()) { return s; } - - TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult", - &record_slice); } + TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult", + &record_slice); + if (read_options.verify_checksums) { const Status s = VerifyBlob(record_slice, user_key, value_size); if (!s.ok()) { diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index b7da393f3..ffd1d11d5 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -21,6 +21,7 @@ struct FileOptions; class HistogramImpl; struct ReadOptions; class Slice; +class FilePrefetchBuffer; class PinnableSlice; class Statistics; @@ -41,7 +42,8 @@ class BlobFileReader { Status GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, - CompressionType compression_type, PinnableSlice* value, + CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; // offsets must be sorted in ascending order by caller. 
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index ebafef834..8cbb982a8 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -179,13 +179,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ReadOptions read_options; read_options.verify_checksums = false; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + { PinnableSlice value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0], - blob_sizes[0], kNoCompression, &value, - &bytes_read)); + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read)); ASSERT_EQ(value, blobs[0]); ASSERT_EQ(bytes_read, blob_sizes[0]); @@ -222,8 +224,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1], - blob_sizes[1], kNoCompression, &value, - &bytes_read)); + blob_sizes[1], kNoCompression, prefetch_buffer, + &value, &bytes_read)); ASSERT_EQ(value, blobs[1]); const uint64_t key_size = keys[1].size(); @@ -239,8 +241,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0] - 1, - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -252,8 +254,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[2], blob_offsets[2] + 1, - blob_sizes[2], kNoCompression, &value, - &bytes_read) + blob_sizes[2], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -265,7 +267,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0], - blob_sizes[0], kZSTD, &value, &bytes_read) + blob_sizes[0], kZSTD, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); 
ASSERT_EQ(bytes_read, 0); } @@ -280,8 +283,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ->GetBlob(read_options, shorter_key, blob_offsets[0] - (keys[0].size() - sizeof(shorter_key) + 1), - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -323,8 +326,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, incorrect_key, blob_offsets[0], - blob_sizes[0], kNoCompression, &value, - &bytes_read) + blob_sizes[0], kNoCompression, prefetch_buffer, + &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -363,8 +366,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, keys[1], blob_offsets[1], - blob_sizes[1] + 1, kNoCompression, &value, - &bytes_read) + blob_sizes[1] + 1, kNoCompression, + prefetch_buffer, &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -642,12 +645,14 @@ TEST_F(BlobFileReaderTest, BlobCRCError) { SyncPoint::GetInstance()->EnableProcessing(); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -695,12 +700,15 @@ TEST_F(BlobFileReaderTest, Compression) { ReadOptions read_options; read_options.verify_checksums = false; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + { PinnableSlice value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read)); + kSnappyCompression, prefetch_buffer, &value, + &bytes_read)); ASSERT_EQ(value, blob); ASSERT_EQ(bytes_read, blob_size); } @@ -712,7 +720,8 @@ TEST_F(BlobFileReaderTest, Compression) { uint64_t 
bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read)); + kSnappyCompression, prefetch_buffer, &value, + &bytes_read)); ASSERT_EQ(value, blob); constexpr uint64_t key_size = sizeof(key) - 1; @@ -770,12 +779,14 @@ TEST_F(BlobFileReaderTest, UncompressionError) { SyncPoint::GetInstance()->EnableProcessing(); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kSnappyCompression, &value, &bytes_read) + kSnappyCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); @@ -854,12 +865,14 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) { } else { ASSERT_OK(s); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsIOError()); ASSERT_EQ(bytes_read, 0); } @@ -937,12 +950,14 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) { } else { ASSERT_OK(s); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read) + kNoCompression, prefetch_buffer, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index ee4acc4bf..aff47896d 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -590,6 +590,124 @@ TEST_F(DBBlobCompactionTest, MergeBlobWithBase) { Close(); } +TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; 
+ options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 1.0; + options.blob_compaction_readahead_size = 1 << 10; + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("key", "pie")); + ASSERT_OK(Put("foo", "baz")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "pie"); + ASSERT_EQ(Get("foo"), "baz"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) { + Options options = GetDefaultOptions(); + + std::unique_ptr compaction_filter_guard( + new ValueMutationFilter("pie")); + + options.compaction_filter = compaction_filter_guard.get(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.blob_compaction_readahead_size = 1 << 10; + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + 
SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "limepie"); + ASSERT_EQ(Get("foo"), "barpie"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.blob_compaction_readahead_size = 1 << 10; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.disable_auto_compactions = true; + + Reopen(options); + + ASSERT_OK(Put("key", "lime")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_OK(Merge("key", "pie")); + ASSERT_OK(Merge("foo", "baz")); + ASSERT_OK(Flush()); + + size_t num_non_prefetch_reads = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:ReadFromFile", + [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(Get("key"), "lime,pie"); + ASSERT_EQ(Get("foo"), "bar,baz"); + ASSERT_EQ(num_non_prefetch_reads, 0); + + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/blob/prefetch_buffer_collection.cc b/db/blob/prefetch_buffer_collection.cc new file mode 100644 index 000000000..079576f51 --- /dev/null +++ b/db/blob/prefetch_buffer_collection.cc @@ -0,0 +1,21 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "db/blob/prefetch_buffer_collection.h" + +namespace ROCKSDB_NAMESPACE { + +FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer( + uint64_t file_number) { + auto& prefetch_buffer = prefetch_buffers_[file_number]; + if (!prefetch_buffer) { + prefetch_buffer.reset( + new FilePrefetchBuffer(readahead_size_, readahead_size_)); + } + + return prefetch_buffer.get(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/prefetch_buffer_collection.h b/db/blob/prefetch_buffer_collection.h new file mode 100644 index 000000000..b973eddc0 --- /dev/null +++ b/db/blob/prefetch_buffer_collection.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include +#include + +#include "file/file_prefetch_buffer.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +// A class that owns a collection of FilePrefetchBuffers using the file number +// as key. Used for implementing compaction readahead for blob files. Designed +// to be accessed by a single thread only: every (sub)compaction needs its own +// buffers since they are guaranteed to read different blobs from different +// positions even when reading the same file. 
+class PrefetchBufferCollection { + public: + explicit PrefetchBufferCollection(uint64_t readahead_size) + : readahead_size_(readahead_size) { + assert(readahead_size_ > 0); + } + + FilePrefetchBuffer* GetOrCreatePrefetchBuffer(uint64_t file_number); + + private: + uint64_t readahead_size_; + std::unordered_map> + prefetch_buffers_; // maps file number to prefetch buffer +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/c.cc b/db/c.cc index 89dfa5303..dda5f5560 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2760,6 +2760,16 @@ double rocksdb_options_get_blob_gc_force_threshold(rocksdb_options_t* opt) { return opt->rep.blob_garbage_collection_force_threshold; } +void rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt, + uint64_t val) { + opt->rep.blob_compaction_readahead_size = val; +} + +uint64_t rocksdb_options_get_blob_compaction_readahead_size( + rocksdb_options_t* opt) { + return opt->rep.blob_compaction_readahead_size; +} + void rocksdb_options_set_num_levels(rocksdb_options_t* opt, int n) { opt->rep.num_levels = n; } diff --git a/db/c_test.c b/db/c_test.c index fb8f65635..8f7e05ca2 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -1799,6 +1799,10 @@ int main(int argc, char** argv) { rocksdb_options_set_blob_gc_force_threshold(o, 0.75); CheckCondition(0.75 == rocksdb_options_get_blob_gc_force_threshold(o)); + rocksdb_options_set_blob_compaction_readahead_size(o, 262144); + CheckCondition(262144 == + rocksdb_options_get_blob_compaction_readahead_size(o)); + // Create a copy that should be equal to the original. 
rocksdb_options_t* copy; copy = rocksdb_options_create_copy(o); diff --git a/db/compaction/compaction_iteration_stats.h b/db/compaction/compaction_iteration_stats.h index 910c4469a..1b1c28b57 100644 --- a/db/compaction/compaction_iteration_stats.h +++ b/db/compaction/compaction_iteration_stats.h @@ -9,6 +9,8 @@ #include "rocksdb/rocksdb_namespace.h" +namespace ROCKSDB_NAMESPACE { + struct CompactionIterationStats { // Compaction statistics @@ -43,3 +45,5 @@ struct CompactionIterationStats { uint64_t num_blobs_relocated = 0; uint64_t total_blob_bytes_relocated = 0; }; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index cd6a28d8f..5ccd1a922 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -8,8 +8,10 @@ #include #include +#include "db/blob/blob_fetcher.h" #include "db/blob/blob_file_builder.h" #include "db/blob/blob_index.h" +#include "db/blob/prefetch_buffer_collection.h" #include "db/snapshot_checker.h" #include "logging/logging.h" #include "port/likely.h" @@ -88,6 +90,9 @@ CompactionIterator::CompactionIterator( merge_out_iter_(merge_helper_), blob_garbage_collection_cutoff_file_number_( ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())), + blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())), + prefetch_buffers_( + CreatePrefetchBufferCollectionIfNeeded(compaction_.get())), current_key_committed_(false), cmp_with_history_ts_low_(0), level_(compaction_ == nullptr ? 
0 : compaction_->level()) { @@ -225,6 +230,13 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, compaction_filter_skip_until_.rep()); if (CompactionFilter::Decision::kUndetermined == filter && !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { + if (compaction_ == nullptr) { + status_ = + Status::Corruption("Unexpected blob index outside of compaction"); + valid_ = false; + return false; + } + // For integrated BlobDB impl, CompactionIterator reads blob value. // For Stacked BlobDB impl, the corresponding CompactionFilter's // FilterV2 method should read the blob value. @@ -235,23 +247,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, valid_ = false; return false; } - if (blob_index.HasTTL() || blob_index.IsInlined()) { - status_ = Status::Corruption("Unexpected TTL/inlined blob index"); - valid_ = false; - return false; - } - if (compaction_ == nullptr) { - status_ = - Status::Corruption("Unexpected blob index outside of compaction"); - valid_ = false; - return false; - } - const Version* const version = compaction_->input_version(); - assert(version); + + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; uint64_t bytes_read = 0; - s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index, - &blob_value_, &bytes_read); + + assert(blob_fetcher_); + + s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index, + prefetch_buffer, &blob_value_, + &bytes_read); if (!s.ok()) { status_ = s; valid_ = false; @@ -831,15 +839,15 @@ void CompactionIterator::NextFromInput() { } pinned_iters_mgr_.StartPinning(); - Version* version = compaction_ ? compaction_->input_version() : nullptr; // We know the merge type entry is not hidden, otherwise we would // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. 
- Status s = merge_helper_->MergeUntil(&input_, range_del_agg_, - prev_snapshot, bottommost_level_, - allow_data_in_errors_, version); + Status s = merge_helper_->MergeUntil( + &input_, range_del_agg_, prev_snapshot, bottommost_level_, + allow_data_in_errors_, blob_fetcher_.get(), prefetch_buffers_.get(), + &iter_stats_); merge_out_iter_.SeekToFirst(); if (!s.ok() && !s.IsMergeInProgress()) { @@ -959,26 +967,23 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } } - if (blob_index.IsInlined() || blob_index.HasTTL()) { - status_ = Status::Corruption("Unexpected TTL/inlined blob index"); - valid_ = false; - - return; - } - if (blob_index.file_number() >= blob_garbage_collection_cutoff_file_number_) { return; } - const Version* const version = compaction_->input_version(); - assert(version); + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; uint64_t bytes_read = 0; { - const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index, - &blob_value_, &bytes_read); + assert(blob_fetcher_); + + const Status s = blob_fetcher_->FetchBlob( + user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read); if (!s.ok()) { status_ = s; @@ -1151,7 +1156,7 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( return 0; } - Version* const version = compaction->input_version(); + const Version* const version = compaction->input_version(); assert(version); const VersionStorageInfo* const storage_info = version->storage_info(); @@ -1167,4 +1172,42 @@ uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( : std::numeric_limits::max(); } +std::unique_ptr CompactionIterator::CreateBlobFetcherIfNeeded( + const CompactionProxy* compaction) { + if (!compaction) { + return nullptr; + } + + const Version* const version = compaction->input_version(); + if (!version) { + return nullptr; + } + + return std::unique_ptr(new 
BlobFetcher(version, ReadOptions())); +} + +std::unique_ptr +CompactionIterator::CreatePrefetchBufferCollectionIfNeeded( + const CompactionProxy* compaction) { + if (!compaction) { + return nullptr; + } + + if (!compaction->input_version()) { + return nullptr; + } + + if (compaction->allow_mmap_reads()) { + return nullptr; + } + + const uint64_t readahead_size = compaction->blob_compaction_readahead_size(); + if (!readahead_size) { + return nullptr; + } + + return std::unique_ptr( + new PrefetchBufferCollection(readahead_size)); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c3785a893..ed74ba061 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -23,6 +23,8 @@ namespace ROCKSDB_NAMESPACE { class BlobFileBuilder; +class BlobFetcher; +class PrefetchBufferCollection; // A wrapper of internal iterator whose purpose is to count how // many entries there are in the iterator. 
@@ -92,11 +94,15 @@ class CompactionIterator { virtual bool preserve_deletes() const = 0; + virtual bool allow_mmap_reads() const = 0; + virtual bool enable_blob_garbage_collection() const = 0; virtual double blob_garbage_collection_age_cutoff() const = 0; - virtual Version* input_version() const = 0; + virtual uint64_t blob_compaction_readahead_size() const = 0; + + virtual const Version* input_version() const = 0; virtual bool DoesInputReferenceBlobFiles() const = 0; @@ -137,6 +143,10 @@ class CompactionIterator { return compaction_->immutable_options()->preserve_deletes; } + bool allow_mmap_reads() const override { + return compaction_->immutable_options()->allow_mmap_reads; + } + bool enable_blob_garbage_collection() const override { return compaction_->mutable_cf_options()->enable_blob_garbage_collection; } @@ -146,7 +156,11 @@ class CompactionIterator { ->blob_garbage_collection_age_cutoff; } - Version* input_version() const override { + uint64_t blob_compaction_readahead_size() const override { + return compaction_->mutable_cf_options()->blob_compaction_readahead_size; + } + + const Version* input_version() const override { return compaction_->input_version(); } @@ -291,6 +305,10 @@ class CompactionIterator { static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber( const CompactionProxy* compaction); + static std::unique_ptr CreateBlobFetcherIfNeeded( + const CompactionProxy* compaction); + static std::unique_ptr + CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction); SequenceIterWrapper input_; const Comparator* cmp_; @@ -379,6 +397,9 @@ class CompactionIterator { uint64_t blob_garbage_collection_cutoff_file_number_; + std::unique_ptr blob_fetcher_; + std::unique_ptr prefetch_buffers_; + std::string blob_index_; PinnableSlice blob_value_; std::string compaction_filter_value_; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index d6cc899f5..7dd50bf0e 100644 --- 
a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -168,11 +168,15 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { bool preserve_deletes() const override { return false; } + bool allow_mmap_reads() const override { return false; } + bool enable_blob_garbage_collection() const override { return false; } double blob_garbage_collection_age_cutoff() const override { return 0.0; } - Version* input_version() const override { return nullptr; } + uint64_t blob_compaction_readahead_size() const override { return 0; } + + const Version* input_version() const override { return nullptr; } bool DoesInputReferenceBlobFiles() const override { return false; } diff --git a/db/db_iter.cc b/db/db_iter.cc index 75a196e4d..fc8ddfc04 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -192,10 +192,11 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, read_options.read_tier = read_tier_; read_options.verify_checksums = verify_checksums_; + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; const Status s = version_->GetBlob(read_options, user_key, blob_index, - &blob_value_, bytes_read); + prefetch_buffer, &blob_value_, bytes_read); if (!s.ok()) { status_ = s; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 388f4f1f2..217876849 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -8,6 +8,9 @@ #include #include "db/blob/blob_fetcher.h" +#include "db/blob/blob_index.h" +#include "db/blob/prefetch_buffer_collection.h" +#include "db/compaction/compaction_iteration_stats.h" #include "db/dbformat.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -121,7 +124,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, const SequenceNumber stop_before, const bool at_bottom, const bool allow_data_in_errors, - Version* version) { + const BlobFetcher* blob_fetcher, + PrefetchBufferCollection* prefetch_buffers, + 
CompactionIterationStats* c_iter_stats) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -212,13 +217,35 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, !range_del_agg->ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal))) { if (ikey.type == kTypeBlobIndex) { - assert(version); - BlobFetcher blob_fetcher(version, ReadOptions()); - s = blob_fetcher.FetchBlob(ikey.user_key, val, &blob_value); + BlobIndex blob_index; + + s = blob_index.DecodeFrom(val); if (!s.ok()) { return s; } + + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; + + uint64_t bytes_read = 0; + + assert(blob_fetcher); + + s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, + prefetch_buffer, &blob_value, + &bytes_read); + if (!s.ok()) { + return s; + } + val_ptr = &blob_value; + + if (c_iter_stats) { + ++c_iter_stats->num_blobs_read; + c_iter_stats->total_blob_bytes_read += bytes_read; + } } else { val_ptr = &val; } diff --git a/db/merge_helper.h b/db/merge_helper.h index 392934bfb..ae4262806 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -26,7 +26,9 @@ class Logger; class MergeOperator; class Statistics; class SystemClock; -class Version; +class BlobFetcher; +class PrefetchBufferCollection; +struct CompactionIterationStats; class MergeHelper { public: @@ -70,6 +72,10 @@ class MergeHelper { // we could reach the start of the history of this user key. // allow_data_in_errors: (IN) if true, data details will be displayed in // error/log messages. + // blob_fetcher: (IN) blob fetcher object for the compaction's input version. + // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers + // used for compaction readahead. + // c_iter_stats: (OUT) compaction iteration statistics. 
// // Returns one of the following statuses: // - OK: Entries were successfully merged. @@ -82,11 +88,12 @@ class MergeHelper { // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(InternalIterator* iter, - CompactionRangeDelAggregator* range_del_agg = nullptr, - const SequenceNumber stop_before = 0, - const bool at_bottom = false, - const bool allow_data_in_errors = false, - Version* version = nullptr); + CompactionRangeDelAggregator* range_del_agg, + const SequenceNumber stop_before, const bool at_bottom, + const bool allow_data_in_errors, + const BlobFetcher* blob_fetcher, + PrefetchBufferCollection* prefetch_buffers, + CompactionIterationStats* c_iter_stats); // Filters a merge operand using the compaction filter specified // in the constructor. Returns the decision that the filter made. diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 597eb5931..f458a1af7 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -32,8 +32,10 @@ class MergeHelperTest : public testing::Test { merge_helper_.reset(new MergeHelper(env_, icmp_.user_comparator(), merge_op_.get(), filter_.get(), nullptr, false, latest_snapshot)); - return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */, - stop_before, at_bottom); + return merge_helper_->MergeUntil( + iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom, + false /* allow_data_in_errors */, nullptr /* blob_fetcher */, + nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, diff --git a/db/version_set.cc b/db/version_set.cc index 557698e9f..0da8c8751 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1790,12 +1790,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, io_tracer_(io_tracer) {} Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, PinnableSlice* value, - uint64_t* 
bytes_read) const { - if (read_options.read_tier == kBlockCacheTier) { - return Status::Incomplete("Cannot read blob: no disk I/O allowed"); - } - + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* value, uint64_t* bytes_read) const { BlobIndex blob_index; { @@ -1805,14 +1802,20 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, } } - return GetBlob(read_options, user_key, blob_index, value, bytes_read); + return GetBlob(read_options, user_key, blob_index, prefetch_buffer, value, + bytes_read); } Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, PinnableSlice* value, - uint64_t* bytes_read) const { + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* value, uint64_t* bytes_read) const { assert(value); + if (read_options.read_tier == kBlockCacheTier) { + return Status::Incomplete("Cannot read blob: no disk I/O allowed"); + } + if (blob_index.HasTTL() || blob_index.IsInlined()) { return Status::Corruption("Unexpected TTL/inlined blob index"); } @@ -1840,7 +1843,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, assert(blob_file_reader.GetValue()); const Status s = blob_file_reader.GetValue()->GetBlob( read_options, user_key, blob_index.offset(), blob_index.size(), - blob_index.compression(), value, bytes_read); + blob_index.compression(), prefetch_buffer, value, bytes_read); return s; } @@ -2067,10 +2070,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, if (is_blob_index) { if (do_merge && value) { + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; - *status = - GetBlob(read_options, user_key, *value, value, bytes_read); + *status = GetBlob(read_options, user_key, *value, prefetch_buffer, + value, bytes_read); if (!status->ok()) { if (status->IsIncomplete()) { get_context.MarkKeyMayExist(); diff 
--git a/db/version_set.h b/db/version_set.h index 63874b2f9..45efd6f37 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -723,13 +723,15 @@ class Version { // saves it in *value. // REQUIRES: blob_index_slice stores an encoded blob reference Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, PinnableSlice* value, + const Slice& blob_index_slice, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; // Retrieves a blob using a blob reference and saves it in *value, // assuming the corresponding blob file is part of this Version. Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, PinnableSlice* value, + const BlobIndex& blob_index, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; using BlobReadRequest = @@ -804,6 +806,7 @@ class Version { int TEST_refs() const { return refs_; } VersionStorageInfo* storage_info() { return &storage_info_; } + const VersionStorageInfo* storage_info() const { return &storage_info_; } VersionSet* version_set() { return vset_; } diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 1c415785f..387aa0204 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -253,6 +253,7 @@ DECLARE_string(blob_compression_type); DECLARE_bool(enable_blob_garbage_collection); DECLARE_double(blob_garbage_collection_age_cutoff); DECLARE_double(blob_garbage_collection_force_threshold); +DECLARE_uint64(blob_compaction_readahead_size); DECLARE_int32(approximate_size_one_in); DECLARE_bool(sync_fault_injection); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index a9bc883f5..d0db73eb3 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -405,6 +405,11 @@ DEFINE_double(blob_garbage_collection_force_threshold, "[Integrated BlobDB] The 
threshold for the ratio of garbage in " "the oldest blob files for forcing garbage collection."); +DEFINE_uint64(blob_compaction_readahead_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .blob_compaction_readahead_size, + "[Integrated BlobDB] Compaction readahead for blob files."); + static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index d16fcd326..0dbc7c3e1 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -269,6 +269,8 @@ bool StressTest::BuildOptionsTable() { std::vector{"0.0", "0.25", "0.5", "0.75", "1.0"}); options_tbl.emplace("blob_garbage_collection_force_threshold", std::vector{"0.5", "0.75", "1.0"}); + options_tbl.emplace("blob_compaction_readahead_size", + std::vector{"0", "1M", "4M"}); } options_table_ = std::move(options_tbl); @@ -2323,6 +2325,8 @@ void StressTest::Open() { FLAGS_blob_garbage_collection_age_cutoff; options_.blob_garbage_collection_force_threshold = FLAGS_blob_garbage_collection_force_threshold; + options_.blob_compaction_readahead_size = + FLAGS_blob_compaction_readahead_size; } else { #ifdef ROCKSDB_LITE fprintf(stderr, "--options_file not supported in lite mode\n"); @@ -2422,21 +2426,18 @@ void StressTest::Open() { exit(1); } - if (options_.enable_blob_files) { - fprintf(stdout, - "Integrated BlobDB: blob files enabled, min blob size %" PRIu64 - ", blob file size %" PRIu64 ", blob compression type %s\n", - options_.min_blob_size, options_.blob_file_size, - CompressionTypeToString(options_.blob_compression_type).c_str()); - } - - if (options_.enable_blob_garbage_collection) { - fprintf( - stdout, - "Integrated BlobDB: blob GC enabled, cutoff %f, force threshold %f\n", - options_.blob_garbage_collection_age_cutoff, - options_.blob_garbage_collection_force_threshold); - } + fprintf(stdout, + 
"Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64 + ", blob file size %" PRIu64 + ", blob compression type %s, blob GC enabled %d, cutoff %f, force " + "threshold %f, blob compaction readahead size %" PRIu64 "\n", + options_.enable_blob_files, options_.min_blob_size, + options_.blob_file_size, + CompressionTypeToString(options_.blob_compression_type).c_str(), + options_.enable_blob_garbage_collection, + options_.blob_garbage_collection_age_cutoff, + options_.blob_garbage_collection_force_threshold, + options_.blob_compaction_readahead_size); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index a04d66a2a..a3d6e0727 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -108,6 +108,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, } bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, bool for_compaction) { @@ -124,11 +125,11 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, // If readahead is not enabled: return false. 
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { if (readahead_size_ > 0) { - assert(file_reader_ != nullptr); + assert(reader != nullptr); assert(max_readahead_size_ >= readahead_size_); Status s; if (for_compaction) { - s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_), + s = Prefetch(opts, reader, offset, std::max(n, readahead_size_), for_compaction); } else { if (implicit_auto_readahead_) { @@ -149,8 +150,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, return false; } } - s = Prefetch(opts, file_reader_, offset, n + readahead_size_, for_compaction); + s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction); } if (!s.ok()) { if (status) { diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index e741a2cba..e91ee41ce 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -13,7 +13,6 @@ #include #include -#include "file/random_access_file_reader.h" #include "file/readahead_file_info.h" #include "port/port.h" #include "rocksdb/env.h" @@ -24,6 +23,9 @@ namespace ROCKSDB_NAMESPACE { #define DEAFULT_DECREMENT 8 * 1024 +struct IOOptions; +class RandomAccessFileReader; + // FilePrefetchBuffer is a smart buffer to store and read data from a file. class FilePrefetchBuffer { public: @@ -31,7 +33,6 @@ class FilePrefetchBuffer { // Constructor. // // All arguments are optional. - // file_reader : the file reader to use. Can be a nullptr. // readahead_size : the initial readahead size. // max_readahead_size : the maximum readahead size. // If max_readahead_size > readahead_size, the readahead size will be @@ -46,18 +47,14 @@ class FilePrefetchBuffer { // implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after // doing sequential scans for two times. // - // Automatic readhead is enabled for a file if file_reader, readahead_size, + // Automatic readahead is enabled for a file if readahead_size // and max_readahead_size are passed in.
- // If file_reader is a nullptr, setting readahead_size and max_readahead_size - // does not make any sense. So it does nothing. // A user can construct a FilePrefetchBuffer without any arguments, but use // `Prefetch` to load data into the buffer. - FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr, - size_t readahead_size = 0, size_t max_readahead_size = 0, + FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, bool implicit_auto_readahead = false) : buffer_offset_(0), - file_reader_(file_reader), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), initial_readahead_size_(readahead_size), @@ -77,18 +74,22 @@ class FilePrefetchBuffer { Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, bool for_compaction = false); - // Tries returning the data for a file raed from this buffer, if that data is + // Tries returning the data for a file read from this buffer if that data is // in the buffer. // It handles tracking the minimum read offset if track_min_offset = true. // It also does the exponential readahead when readahead_size is set as part // of the constructor. // - // offset : the file offset. - // n : the number of bytes. - // result : output buffer to put the data into. - // for_compaction : if cache read is done for compaction read. - bool TryReadFromCache(const IOOptions& opts, uint64_t offset, size_t n, - Slice* result, Status* s, bool for_compaction = false); + // opts : the IO options to use. + // reader : the file reader. + // offset : the file offset. + // n : the number of bytes. + // result : output buffer to put the data into. + // s : output status. + // for_compaction : true if cache read is done for compaction read. 
+ bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, + uint64_t offset, size_t n, Slice* result, Status* s, + bool for_compaction = false); // The minimum `offset` ever passed to TryReadFromCache(). This will only be tracked if track_min_offset = true. @@ -145,7 +146,6 @@ private: AlignedBuffer buffer_; uint64_t buffer_offset_; - RandomAccessFileReader* file_reader_; size_t readahead_size_; // FilePrefetchBuffer object won't be created from Iterator flow if // max_readahead_size_ = 0. diff --git a/file/file_util.cc b/file/file_util.cc index 547cf80ed..87e343a53 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -183,9 +183,9 @@ IOStatus GenerateOneFileChecksum( ? verify_checksums_readahead_size : default_max_read_ahead_size; - FilePrefetchBuffer prefetch_buffer( - reader.get(), readahead_size /* readahead_size */, - readahead_size /* max_readahead_size */, !allow_mmap_reads /* enable */); + FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */, + readahead_size /* max_readahead_size */, + !allow_mmap_reads /* enable */); Slice slice; uint64_t offset = 0; @@ -193,8 +193,9 @@ IOStatus GenerateOneFileChecksum( while (size > 0) { size_t bytes_to_read = static_cast(std::min(uint64_t{readahead_size}, size)); - if (!prefetch_buffer.TryReadFromCache(opts, offset, bytes_to_read, &slice, - nullptr, false)) { + if (!prefetch_buffer.TryReadFromCache( + opts, reader.get(), offset, bytes_to_read, &slice, + nullptr /* status */, false /* for_compaction */)) { return IOStatus::Corruption("file read failed"); } if (slice.size() == 0) { diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 75236472c..1ee09bb4b 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -819,8 +819,9 @@ struct AdvancedColumnFamilyOptions { // amplification for large-value use cases at the cost of introducing a level // of indirection for reads.
See also the options min_blob_size, // blob_file_size, blob_compression_type, enable_blob_garbage_collection, - // blob_garbage_collection_age_cutoff, and - // blob_garbage_collection_force_threshold below. + // blob_garbage_collection_age_cutoff, + // blob_garbage_collection_force_threshold, and blob_compaction_readahead_size + // below. // // Default: false // @@ -893,6 +894,13 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through the SetOptions() API double blob_garbage_collection_force_threshold = 1.0; + // Compaction readahead for blob files. + // + // Default: 0 + // + // Dynamically changeable through the SetOptions() API + uint64_t blob_compaction_readahead_size = 0; + // Create ColumnFamilyOptions with default values for all fields AdvancedColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 730bef9f2..37107e250 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1122,6 +1122,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_blob_gc_force_threshold( extern ROCKSDB_LIBRARY_API double rocksdb_options_get_blob_gc_force_threshold( rocksdb_options_t* opt); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_blob_compaction_readahead_size(rocksdb_options_t* opt, + uint64_t val); +extern ROCKSDB_LIBRARY_API uint64_t +rocksdb_options_get_blob_compaction_readahead_size(rocksdb_options_t* opt); + /* returns a pointer to a malloc()-ed, null terminated string */ extern ROCKSDB_LIBRARY_API char* rocksdb_options_statistics_get_string( rocksdb_options_t* opt); diff --git a/options/cf_options.cc b/options/cf_options.cc index 57c965cc3..aef949d11 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -434,6 +434,10 @@ static std::unordered_map blob_garbage_collection_force_threshold), OptionType::kDouble, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"blob_compaction_readahead_size", + {offsetof(struct MutableCFOptions, 
blob_compaction_readahead_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"sample_for_compression", {offsetof(struct MutableCFOptions, sample_for_compression), OptionType::kUInt64T, OptionVerificationType::kNormal, @@ -1055,6 +1059,8 @@ void MutableCFOptions::Dump(Logger* log) const { blob_garbage_collection_age_cutoff); ROCKS_LOG_INFO(log, " blob_garbage_collection_force_threshold: %f", blob_garbage_collection_force_threshold); + ROCKS_LOG_INFO(log, " blob_compaction_readahead_size: %" PRIu64, + blob_compaction_readahead_size); } MutableCFOptions::MutableCFOptions(const Options& options) diff --git a/options/cf_options.h b/options/cf_options.h index d08096da1..ca2b59fca 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -143,6 +143,7 @@ struct MutableCFOptions { options.blob_garbage_collection_age_cutoff), blob_garbage_collection_force_threshold( options.blob_garbage_collection_force_threshold), + blob_compaction_readahead_size(options.blob_compaction_readahead_size), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), check_flush_compaction_key_order( @@ -190,6 +191,7 @@ struct MutableCFOptions { enable_blob_garbage_collection(false), blob_garbage_collection_age_cutoff(0.0), blob_garbage_collection_force_threshold(0.0), + blob_compaction_readahead_size(0), max_sequential_skip_in_iterations(0), check_flush_compaction_key_order(true), paranoid_file_checks(false), @@ -255,6 +257,7 @@ struct MutableCFOptions { bool enable_blob_garbage_collection; double blob_garbage_collection_age_cutoff; double blob_garbage_collection_force_threshold; + uint64_t blob_compaction_readahead_size; // Misc options uint64_t max_sequential_skip_in_iterations; diff --git a/options/options.cc b/options/options.cc index aa16663f9..969bc31a8 100644 --- a/options/options.cc +++ b/options/options.cc @@ -99,7 +99,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) 
blob_garbage_collection_age_cutoff( options.blob_garbage_collection_age_cutoff), blob_garbage_collection_force_threshold( - options.blob_garbage_collection_force_threshold) { + options.blob_garbage_collection_force_threshold), + blob_compaction_readahead_size(options.blob_compaction_readahead_size) { assert(memtable_factory.get() != nullptr); if (max_bytes_for_level_multiplier_additional.size() < static_cast(num_levels)) { @@ -405,6 +406,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { blob_garbage_collection_age_cutoff); ROCKS_LOG_HEADER(log, "Options.blob_garbage_collection_force_threshold: %f", blob_garbage_collection_force_threshold); + ROCKS_LOG_HEADER( + log, " Options.blob_compaction_readahead_size: %" PRIu64, + blob_compaction_readahead_size); } // ColumnFamilyOptions::Dump void Options::Dump(Logger* log) const { diff --git a/options/options_helper.cc b/options/options_helper.cc index be70463f7..2df838de9 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -254,6 +254,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, moptions.blob_garbage_collection_age_cutoff; cf_opts->blob_garbage_collection_force_threshold = moptions.blob_garbage_collection_force_threshold; + cf_opts->blob_compaction_readahead_size = + moptions.blob_compaction_readahead_size; // Misc options cf_opts->max_sequential_skip_in_iterations = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index a95936967..b0ba1a540 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -518,6 +518,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "enable_blob_garbage_collection=true;" "blob_garbage_collection_age_cutoff=0.5;" "blob_garbage_collection_force_threshold=0.75;" + "blob_compaction_readahead_size=262144;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;age_for_warm=1;};", new_options)); diff --git a/options/options_test.cc 
b/options/options_test.cc index 3514420a3..0392320a8 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -109,6 +109,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"enable_blob_garbage_collection", "true"}, {"blob_garbage_collection_age_cutoff", "0.5"}, {"blob_garbage_collection_force_threshold", "0.75"}, + {"blob_compaction_readahead_size", "256K"}, }; std::unordered_map db_options_map = { @@ -241,6 +242,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true); ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5); ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75); + ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144); cf_options_map["write_buffer_size"] = "hello"; ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map, @@ -2269,6 +2271,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"enable_blob_garbage_collection", "true"}, {"blob_garbage_collection_age_cutoff", "0.5"}, {"blob_garbage_collection_force_threshold", "0.75"}, + {"blob_compaction_readahead_size", "256K"}, }; std::unordered_map db_options_map = { @@ -2393,6 +2396,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.enable_blob_garbage_collection, true); ASSERT_EQ(new_cf_opt.blob_garbage_collection_age_cutoff, 0.5); ASSERT_EQ(new_cf_opt.blob_garbage_collection_force_threshold, 0.75); + ASSERT_EQ(new_cf_opt.blob_compaction_readahead_size, 262144); cf_options_map["write_buffer_size"] = "hello"; ASSERT_NOK(GetColumnFamilyOptionsFromMap( diff --git a/src.mk b/src.mk index 46b1afcd9..12718c66b 100644 --- a/src.mk +++ b/src.mk @@ -18,6 +18,7 @@ LIB_SOURCES = \ db/blob/blob_log_format.cc \ db/blob/blob_log_sequential_reader.cc \ db/blob/blob_log_writer.cc \ + db/blob/prefetch_buffer_collection.cc \ db/builder.cc \ db/c.cc \ db/column_family.cc \ diff --git a/table/block_based/block_based_table_reader.cc 
b/table/block_based/block_based_table_reader.cc index 348fd3009..1012b02a7 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -579,7 +579,8 @@ Status BlockBasedTable::Open( } else { // Should not prefetch for mmap mode. prefetch_buffer.reset(new FilePrefetchBuffer( - nullptr, 0, 0, false /* enable */, true /* track_min_offset */)); + 0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */, + true /* track_min_offset */)); } // Read in the following order: @@ -732,14 +733,17 @@ Status BlockBasedTable::PrefetchTail( // Try file system prefetch if (!file->use_direct_io() && !force_direct_prefetch) { if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) { - prefetch_buffer->reset( - new FilePrefetchBuffer(nullptr, 0, 0, false, true)); + prefetch_buffer->reset(new FilePrefetchBuffer( + 0 /* readahead_size */, 0 /* max_readahead_size */, + false /* enable */, true /* track_min_offset */)); return Status::OK(); } } // Use `FilePrefetchBuffer` - prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); + prefetch_buffer->reset( + new FilePrefetchBuffer(0 /* readahead_size */, 0 /* max_readahead_size */, + true /* enable */, true /* track_min_offset */)); IOOptions opts; Status s = file->PrepareIOOptions(ro, opts); if (s.ok()) { @@ -2966,7 +2970,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks( // FilePrefetchBuffer doesn't work in mmap mode and readahead is not // needed there. 
FilePrefetchBuffer prefetch_buffer( - rep_->file.get(), readahead_size /* readahead_size */, + readahead_size /* readahead_size */, readahead_size /* max_readahead_size */, !rep_->ioptions.allow_mmap_reads /* enable */); diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 45c8a7e73..8efcd7e09 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -683,10 +683,10 @@ struct BlockBasedTable::Rep { size_t max_readahead_size, std::unique_ptr* fpb, bool implicit_auto_readahead) const { - fpb->reset(new FilePrefetchBuffer( - file.get(), readahead_size, max_readahead_size, - !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset*/, - implicit_auto_readahead)); + fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size, + !ioptions.allow_mmap_reads /* enable */, + false /* track_min_offset */, + implicit_auto_readahead)); } void CreateFilePrefetchBufferIfNotExists( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 03ca5ce47..5283b1aa5 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -69,9 +69,10 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (prefetch_buffer_ != nullptr) { IOOptions opts; IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); - if (io_s.ok() && prefetch_buffer_->TryReadFromCache( - opts, handle_.offset(), block_size_with_trailer_, - &slice_, &io_s, for_compaction_)) { + if (io_s.ok() && + prefetch_buffer_->TryReadFromCache(opts, file_, handle_.offset(), + block_size_with_trailer_, &slice_, + &io_s, for_compaction_)) { ProcessTrailerIfPresent(); if (!io_status_.ok()) { return true; diff --git a/table/format.cc b/table/format.cc index c68fbb132..10dbd3f14 100644 --- a/table/format.cc +++ b/table/format.cc @@ -330,7 +330,7 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, // for iterator, TryReadFromCache might do a readahead. 
Revisit to see if we // need to pass a timeout at that point if (prefetch_buffer == nullptr || - !prefetch_buffer->TryReadFromCache(IOOptions(), read_offset, + !prefetch_buffer->TryReadFromCache(IOOptions(), file, read_offset, Footer::kMaxEncodedLength, &footer_input, nullptr)) { if (file->use_direct_io()) { diff --git a/table/get_context.cc b/table/get_context.cc index 80906f795..c61f82b1a 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -382,7 +382,11 @@ void GetContext::Merge(const Slice* value) { bool GetContext::GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value) { - Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value); + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + constexpr uint64_t* bytes_read = nullptr; + + Status status = blob_fetcher_->FetchBlob( + user_key_, blob_index, prefetch_buffer, blob_value, bytes_read); if (!status.ok()) { if (status.IsIncomplete()) { MarkKeyMayExist(); diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index ff9c42f6c..8a2be95a6 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -96,8 +96,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { file_.reset(new RandomAccessFileReader(std::move(file), file_path)); - FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, - false /* track_min_offset */); + FilePrefetchBuffer prefetch_buffer( + 0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */, + false /* track_min_offset */); if (s.ok()) { const uint64_t kSstDumpTailPrefetchSize = 512 * 1024; uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize) diff --git a/table/table_test.cc b/table/table_test.cc index b845b976f..c0b254137 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -4937,11 +4937,18 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) { TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) { TailPrefetchStats tpstats; - FilePrefetchBuffer 
buffer(nullptr, 0, 0, false, true); + FilePrefetchBuffer buffer(0 /* readahead_size */, 0 /* max_readahead_size */, + false /* enable */, true /* track_min_offset */); IOOptions opts; - buffer.TryReadFromCache(opts, 500, 10, nullptr, nullptr); - buffer.TryReadFromCache(opts, 480, 10, nullptr, nullptr); - buffer.TryReadFromCache(opts, 490, 10, nullptr, nullptr); + buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); + buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); + buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */, + 10 /* n */, nullptr /* result */, + nullptr /* status */); ASSERT_EQ(480, buffer.min_offset_read()); } diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 986b8d184..78d09ee0f 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -446,6 +446,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, uint_max + rnd->Uniform(10000); cf_opt->min_blob_size = uint_max + rnd->Uniform(10000); cf_opt->blob_file_size = uint_max + rnd->Uniform(10000); + cf_opt->blob_compaction_readahead_size = uint_max + rnd->Uniform(10000); // unsigned int options cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 8e6795e94..16019dd29 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1002,6 +1002,11 @@ DEFINE_double(blob_garbage_collection_force_threshold, "[Integrated BlobDB] The threshold for the ratio of garbage in " "the oldest blob files for forcing garbage collection."); +DEFINE_uint64(blob_compaction_readahead_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .blob_compaction_readahead_size, + "[Integrated BlobDB] Compaction readahead for blob files."); + #ifndef ROCKSDB_LITE // Secondary DB instance Options @@ -4365,6 +4370,8 @@ class 
Benchmark { FLAGS_blob_garbage_collection_age_cutoff; options.blob_garbage_collection_force_threshold = FLAGS_blob_garbage_collection_force_threshold; + options.blob_compaction_readahead_size = + FLAGS_blob_compaction_readahead_size; #ifndef ROCKSDB_LITE if (FLAGS_readonly && FLAGS_transaction_db) { diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc index bad4ae5c0..4b4284934 100644 --- a/tools/db_bench_tool_test.cc +++ b/tools/db_bench_tool_test.cc @@ -278,6 +278,7 @@ const std::string options_file_content = R"OPTIONS_FILE( enable_blob_garbage_collection=true blob_garbage_collection_age_cutoff=0.5 blob_garbage_collection_force_threshold=0.75 + blob_compaction_readahead_size=262144 [TableOptions/BlockBasedTable "default"] format_version=0 diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 71f44923b..4498fac76 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -294,6 +294,7 @@ blob_params = { "enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3), "blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]), "blob_garbage_collection_force_threshold": lambda: random.choice([0.5, 0.75, 1.0]), + "blob_compaction_readahead_size": lambda: random.choice([0, 1048576, 4194304]), } ts_params = {