diff --git a/HISTORY.md b/HISTORY.md index 6e7a45fc8..fa21fa585 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -23,6 +23,7 @@ * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp. * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. +* Batch blob read requests for `DB::MultiGet` using `MultiRead`. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 6f64da48b..9ce0b5b89 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -351,6 +351,117 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, return Status::OK(); } +void BlobFileReader::MultiGetBlob( + const ReadOptions& read_options, + const autovector>& user_keys, + const autovector& offsets, + const autovector& value_sizes, autovector& statuses, + autovector& values, uint64_t* bytes_read) const { + const size_t num_blobs = user_keys.size(); + assert(num_blobs == offsets.size()); + assert(num_blobs == value_sizes.size()); + assert(num_blobs == statuses.size()); + assert(num_blobs == values.size()); + + std::vector read_reqs(num_blobs); + autovector adjustments; + uint64_t total_len = 0; + for (size_t i = 0; i < num_blobs; ++i) { + const size_t key_size = user_keys[i].get().size(); + assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_)); + const uint64_t adjustment = + read_options.verify_checksums + ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + : 0; + assert(offsets[i] >= adjustment); + adjustments.push_back(adjustment); + read_reqs[i].offset = offsets[i] - adjustment; + read_reqs[i].len = value_sizes[i] + adjustment; + total_len += read_reqs[i].len; + } + + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len); + + Buffer buf; + AlignedBuf aligned_buf; + + Status s; + bool direct_io = file_reader_->use_direct_io(); + if (direct_io) { + for (size_t i = 0; i < read_reqs.size(); ++i) { + read_reqs[i].scratch = nullptr; + } + } else { + buf.reset(new char[total_len]); + std::ptrdiff_t pos = 0; + for (size_t i = 0; i < read_reqs.size(); ++i) { + read_reqs[i].scratch = buf.get() + pos; + pos += read_reqs[i].len; + } + } + TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile"); + s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(), + direct_io ? &aligned_buf : nullptr); + if (!s.ok()) { + for (auto& req : read_reqs) { + req.status.PermitUncheckedError(); + } + for (size_t i = 0; i < num_blobs; ++i) { + assert(statuses[i]); + *statuses[i] = s; + } + return; + } + + assert(s.ok()); + for (size_t i = 0; i < num_blobs; ++i) { + auto& req = read_reqs[i]; + assert(statuses[i]); + if (req.status.ok() && req.result.size() != req.len) { + req.status = IOStatus::Corruption("Failed to read data from blob file"); + } + *statuses[i] = req.status; + } + + if (read_options.verify_checksums) { + for (size_t i = 0; i < num_blobs; ++i) { + assert(statuses[i]); + if (!statuses[i]->ok()) { + continue; + } + const Slice& record_slice = read_reqs[i].result; + s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]); + if (!s.ok()) { + assert(statuses[i]); + *statuses[i] = s; + } + } + } + + for (size_t i = 0; i < num_blobs; ++i) { + assert(statuses[i]); + if (!statuses[i]->ok()) { + continue; + } + const Slice& record_slice = read_reqs[i].result; + const Slice value_slice(record_slice.data() + adjustments[i], + value_sizes[i]); + s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_, + statistics_, values[i]); + if (!s.ok()) { + *statuses[i] = s; + } + } + + if (bytes_read) { + uint64_t total_bytes = 0; + for (const auto& req : read_reqs) { + total_bytes += req.result.size(); + } + *bytes_read = total_bytes; + } +} + Status BlobFileReader::VerifyBlob(const Slice& record_slice, const Slice& user_key, uint64_t value_size) { BlobLogRecord record; diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index 3ab0d52c2..06087c3a0 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -11,6 +11,7 @@ #include "file/random_access_file_reader.h" #include "rocksdb/compression_type.h" #include "rocksdb/rocksdb_namespace.h" +#include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -43,6 +44,17 @@ class BlobFileReader { CompressionType compression_type, PinnableSlice* value, uint64_t* bytes_read) const; + void MultiGetBlob( + const ReadOptions& read_options, + const autovector>& user_keys, + const autovector& offsets, + const autovector& value_sizes, autovector& statuses, + autovector& values, uint64_t* bytes_read) const; + + CompressionType GetCompressionType() const { return compression_type_; } + + uint64_t GetFileSize() const { return file_size_; } + private: BlobFileReader(std::unique_ptr&& file_reader, uint64_t file_size, CompressionType compression_type, diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index e08a4bab8..8544b53d4 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -27,23 +27,23 @@ namespace ROCKSDB_NAMESPACE { namespace { -// Creates a test blob file with a single blob in it. Note: this method -// makes it possible to test various corner cases by allowing the caller -// to specify the contents of various blob file header/footer fields. +// Creates a test blob file with `num` blobs in it. void WriteBlobFile(const ImmutableOptions& immutable_options, uint32_t column_family_id, bool has_ttl, const ExpirationRange& expiration_range_header, const ExpirationRange& expiration_range_footer, - uint64_t blob_file_number, const Slice& key, - const Slice& blob, CompressionType compression_type, - uint64_t* blob_offset, uint64_t* blob_size) { + uint64_t blob_file_number, const std::vector& keys, + const std::vector& blobs, CompressionType compression, + std::vector& blob_offsets, + std::vector& blob_sizes) { assert(!immutable_options.cf_paths.empty()); - assert(blob_offset); - assert(blob_size); + size_t num = keys.size(); + assert(num == blobs.size()); + assert(num == blob_offsets.size()); + assert(num == blob_sizes.size()); const std::string blob_file_path = BlobFileName(immutable_options.cf_paths.front().path, blob_file_number); - std::unique_ptr file; ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file, FileOptions())); @@ -59,50 +59,77 @@ void WriteBlobFile(const ImmutableOptions& immutable_options, statistics, blob_file_number, use_fsync, do_flush); - BlobLogHeader header(column_family_id, compression_type, has_ttl, + BlobLogHeader header(column_family_id, compression, has_ttl, expiration_range_header); ASSERT_OK(blob_log_writer.WriteHeader(header)); - std::string compressed_blob; - Slice blob_to_write; - - if (compression_type == kNoCompression) { - blob_to_write = blob; - *blob_size = blob.size(); + std::vector compressed_blobs(num); + std::vector blobs_to_write(num); + if (kNoCompression == compression) { + for (size_t i = 0; i < num; ++i) { + blobs_to_write[i] = blobs[i]; + blob_sizes[i] = blobs[i].size(); + } } else { CompressionOptions opts; - CompressionContext context(compression_type); + CompressionContext context(compression); constexpr uint64_t sample_for_compression = 0; - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), - compression_type, sample_for_compression); + compression, sample_for_compression); constexpr uint32_t compression_format_version = 2; - ASSERT_TRUE( - CompressData(blob, info, compression_format_version, &compressed_blob)); - - blob_to_write = compressed_blob; - *blob_size = compressed_blob.size(); + for (size_t i = 0; i < num; ++i) { + ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version, + &compressed_blobs[i])); + blobs_to_write[i] = compressed_blobs[i]; + blob_sizes[i] = compressed_blobs[i].size(); + } } - uint64_t key_offset = 0; - - ASSERT_OK( - blob_log_writer.AddRecord(key, blob_to_write, &key_offset, blob_offset)); + for (size_t i = 0; i < num; ++i) { + uint64_t key_offset = 0; + ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset, + &blob_offsets[i])); + } BlobLogFooter footer; - footer.blob_count = 1; + footer.blob_count = num; footer.expiration_range = expiration_range_footer; std::string checksum_method; std::string checksum_value; - ASSERT_OK( blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value)); } +// Creates a test blob file with a single blob in it. Note: this method +// makes it possible to test various corner cases by allowing the caller +// to specify the contents of various blob file header/footer fields. +void WriteBlobFile(const ImmutableOptions& immutable_options, + uint32_t column_family_id, bool has_ttl, + const ExpirationRange& expiration_range_header, + const ExpirationRange& expiration_range_footer, + uint64_t blob_file_number, const Slice& key, + const Slice& blob, CompressionType compression, + uint64_t* blob_offset, uint64_t* blob_size) { + std::vector keys{key}; + std::vector blobs{blob}; + std::vector blob_offsets{0}; + std::vector blob_sizes{0}; + WriteBlobFile(immutable_options, column_family_id, has_ttl, + expiration_range_header, expiration_range_footer, + blob_file_number, keys, blobs, compression, blob_offsets, + blob_sizes); + if (blob_offset) { + *blob_offset = blob_offsets[0]; + } + if (blob_size) { + *blob_size = blob_sizes[0]; + } +} + } // anonymous namespace class BlobFileReaderTest : public testing::Test { @@ -127,15 +154,19 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; constexpr uint64_t blob_file_number = 1; - constexpr char key[] = "key"; - constexpr char blob[] = "blob"; + constexpr size_t num_blobs = 3; + const std::vector key_strs = {"key1", "key2", "key3"}; + const std::vector blob_strs = {"blob1", "blob2", "blob3"}; - uint64_t blob_offset = 0; - uint64_t blob_size = 0; + const std::vector keys = {key_strs[0], key_strs[1], key_strs[2]}; + const std::vector blobs = {blob_strs[0], blob_strs[1], blob_strs[2]}; + + std::vector blob_offsets(keys.size()); + std::vector blob_sizes(keys.size()); WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range, - expiration_range, blob_file_number, key, blob, kNoCompression, - &blob_offset, &blob_size); + expiration_range, blob_file_number, keys, blobs, kNoCompression, + blob_offsets, blob_sizes); constexpr HistogramImpl* blob_file_read_hist = nullptr; @@ -153,10 +184,36 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { PinnableSlice value; uint64_t bytes_read = 0; - ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read)); - ASSERT_EQ(value, blob); - ASSERT_EQ(bytes_read, blob_size); + ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0], + blob_sizes[0], kNoCompression, &value, + &bytes_read)); + ASSERT_EQ(value, blobs[0]); + ASSERT_EQ(bytes_read, blob_sizes[0]); + + // MultiGetBlob + bytes_read = 0; + size_t total_size = 0; + autovector> key_refs; + for (const auto& key_ref : keys) { + key_refs.emplace_back(std::cref(key_ref)); + } + autovector offsets{blob_offsets[0], blob_offsets[1], + blob_offsets[2]}; + autovector sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]}; + std::array statuses_buf; + autovector statuses{&statuses_buf[0], &statuses_buf[1], + &statuses_buf[2]}; + std::array value_buf; + autovector values{&value_buf[0], &value_buf[1], + &value_buf[2]}; + reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses, + values, &bytes_read); + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_OK(statuses_buf[i]); + ASSERT_EQ(value_buf[i], blobs[i]); + total_size += blob_sizes[i]; + } + ASSERT_EQ(bytes_read, total_size); } read_options.verify_checksums = true; @@ -165,14 +222,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { PinnableSlice value; uint64_t bytes_read = 0; - ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kNoCompression, &value, &bytes_read)); - ASSERT_EQ(value, blob); + ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1], + blob_sizes[1], kNoCompression, &value, + &bytes_read)); + ASSERT_EQ(value, blobs[1]); - constexpr uint64_t key_size = sizeof(key) - 1; + const uint64_t key_size = keys[1].size(); ASSERT_EQ(bytes_read, BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + - blob_size); + blob_sizes[1]); } // Invalid offset (too close to start of file) @@ -181,8 +239,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_TRUE(reader - ->GetBlob(read_options, key, blob_offset - 1, blob_size, - kNoCompression, &value, &bytes_read) + ->GetBlob(read_options, keys[0], blob_offsets[0] - 1, + blob_sizes[0], kNoCompression, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -193,8 +252,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_TRUE(reader - ->GetBlob(read_options, key, blob_offset + 1, blob_size, - kNoCompression, &value, &bytes_read) + ->GetBlob(read_options, keys[2], blob_offsets[2] + 1, + blob_sizes[2], kNoCompression, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -205,8 +265,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_TRUE(reader - ->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, - &value, &bytes_read) + ->GetBlob(read_options, keys[0], blob_offsets[0], + blob_sizes[0], kZSTD, &value, &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); } @@ -219,23 +279,82 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { ASSERT_TRUE(reader ->GetBlob(read_options, shorter_key, - blob_offset - (sizeof(key) - sizeof(shorter_key)), - blob_size, kNoCompression, &value, &bytes_read) + blob_offsets[0] - + (keys[0].size() - sizeof(shorter_key) + 1), + blob_sizes[0], kNoCompression, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); + + // MultiGetBlob + autovector> key_refs; + for (const auto& key_ref : keys) { + key_refs.emplace_back(std::cref(key_ref)); + } + Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1); + key_refs[1] = std::cref(shorter_key_slice); + + autovector offsets{ + blob_offsets[0], + blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()), + blob_offsets[2]}; + autovector sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]}; + std::array statuses_buf; + autovector statuses{&statuses_buf[0], &statuses_buf[1], + &statuses_buf[2]}; + std::array value_buf; + autovector values{&value_buf[0], &value_buf[1], + &value_buf[2]}; + reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses, + values, &bytes_read); + for (size_t i = 0; i < num_blobs; ++i) { + if (i == 1) { + ASSERT_TRUE(statuses_buf[i].IsCorruption()); + } else { + ASSERT_OK(statuses_buf[i]); + } + } } // Incorrect key { - constexpr char incorrect_key[] = "foo"; + constexpr char incorrect_key[] = "foo1"; PinnableSlice value; uint64_t bytes_read = 0; ASSERT_TRUE(reader - ->GetBlob(read_options, incorrect_key, blob_offset, - blob_size, kNoCompression, &value, &bytes_read) + ->GetBlob(read_options, incorrect_key, blob_offsets[0], + blob_sizes[0], kNoCompression, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); + + // MultiGetBlob + autovector> key_refs; + for (const auto& key_ref : keys) { + key_refs.emplace_back(std::cref(key_ref)); + } + Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1); + key_refs[2] = std::cref(wrong_key_slice); + + autovector offsets{blob_offsets[0], blob_offsets[1], + blob_offsets[2]}; + autovector sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]}; + std::array statuses_buf; + autovector statuses{&statuses_buf[0], &statuses_buf[1], + &statuses_buf[2]}; + std::array value_buf; + autovector values{&value_buf[0], &value_buf[1], + &value_buf[2]}; + reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses, + values, &bytes_read); + for (size_t i = 0; i < num_blobs; ++i) { + if (i == num_blobs - 1) { + ASSERT_TRUE(statuses_buf[i].IsCorruption()); + } else { + ASSERT_OK(statuses_buf[i]); + } + } } // Incorrect value size @@ -244,10 +363,35 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { uint64_t bytes_read = 0; ASSERT_TRUE(reader - ->GetBlob(read_options, key, blob_offset, blob_size + 1, - kNoCompression, &value, &bytes_read) + ->GetBlob(read_options, keys[1], blob_offsets[1], + blob_sizes[1] + 1, kNoCompression, &value, + &bytes_read) .IsCorruption()); ASSERT_EQ(bytes_read, 0); + + // MultiGetBlob + autovector> key_refs; + for (const auto& key_ref : keys) { + key_refs.emplace_back(std::cref(key_ref)); + } + autovector offsets{blob_offsets[0], blob_offsets[1], + blob_offsets[2]}; + autovector sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]}; + std::array statuses_buf; + autovector statuses{&statuses_buf[0], &statuses_buf[1], + &statuses_buf[2]}; + std::array value_buf; + autovector values{&value_buf[0], &value_buf[1], + &value_buf[2]}; + reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses, + values, &bytes_read); + for (size_t i = 0; i < num_blobs; ++i) { + if (i != 1) { + ASSERT_OK(statuses_buf[i]); + } else { + ASSERT_TRUE(statuses_buf[i].IsCorruption()); + } + } } } diff --git a/db/blob/blob_index.h b/db/blob/blob_index.h index 27927cb38..5bac36627 100644 --- a/db/blob/blob_index.h +++ b/db/blob/blob_index.h @@ -52,6 +52,9 @@ class BlobIndex { BlobIndex() : type_(Type::kUnknown) {} + BlobIndex(const BlobIndex&) = default; + BlobIndex& operator=(const BlobIndex&) = default; + bool IsInlined() const { return type_ == Type::kInlinedTTL; } bool HasTTL() const { diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 23a54cad3..9a909f67d 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -127,6 +127,46 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) { } } +TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + + Reopen(options); + + constexpr size_t kNumBlobFiles = 3; + constexpr size_t kNumBlobsPerFile = 3; + constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles; + + std::vector key_strs; + std::vector value_strs; + for (size_t i = 0; i < kNumBlobFiles; ++i) { + for (size_t j = 0; j < kNumBlobsPerFile; ++j) { + std::string key = "key" + std::to_string(i) + "_" + std::to_string(j); + std::string value = + "value_as_blob" + std::to_string(i) + "_" + std::to_string(j); + ASSERT_OK(Put(key, value)); + key_strs.push_back(key); + value_strs.push_back(value); + } + ASSERT_OK(Flush()); + } + assert(key_strs.size() == kNumKeys); + std::array keys; + for (size_t i = 0; i < keys.size(); ++i) { + keys[i] = key_strs[i]; + } + std::array values; + std::array statuses; + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0], + &values[0], &statuses[0]); + + for (size_t i = 0; i < kNumKeys; ++i) { + ASSERT_OK(statuses[i]); + ASSERT_EQ(value_strs[i], values[i]); + } +} + TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) { Options options = GetDefaultOptions(); options.enable_blob_files = true; @@ -150,6 +190,83 @@ TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) { .IsCorruption()); } +TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + + DestroyAndReopen(options); + + constexpr size_t kNumOfKeys = 3; + std::array key_strs; + std::array value_strs; + std::array keys; + for (size_t i = 0; i < kNumOfKeys; ++i) { + key_strs[i] = "foo" + std::to_string(i); + value_strs[i] = "blob_value" + std::to_string(i); + ASSERT_OK(Put(key_strs[i], value_strs[i])); + keys[i] = key_strs[i]; + } + + constexpr char key[] = "key"; + { + // Fake a corrupt blob index. + const std::string blob_index("foobar"); + WriteBatch batch; + ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + keys[kNumOfKeys] = Slice(static_cast(key), sizeof(key) - 1); + } + + ASSERT_OK(Flush()); + + std::array values; + std::array statuses; + db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1, + keys.data(), values.data(), statuses.data(), + /*sorted_input=*/false); + for (size_t i = 0; i < kNumOfKeys + 1; ++i) { + if (i != kNumOfKeys) { + ASSERT_OK(statuses[i]); + ASSERT_EQ("blob_value" + std::to_string(i), values[i]); + } else { + ASSERT_TRUE(statuses[i].IsCorruption()); + } + } +} + +TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + + Reopen(options); + + constexpr size_t kNumOfKeys = 3; + std::array key_bufs; + std::array value_bufs; + std::array keys; + for (size_t i = 0; i < kNumOfKeys; ++i) { + key_bufs[i] = "foo" + std::to_string(i); + value_bufs[i] = "blob_value" + std::to_string(i); + ASSERT_OK(Put(key_bufs[i], value_bufs[i])); + keys[i] = key_bufs[i]; + } + ASSERT_OK(Flush()); + + std::array values; + std::array statuses; + ReadOptions read_opts; + read_opts.value_size_soft_limit = 1; + db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys, + keys.data(), values.data(), statuses.data(), + /*sorted_input=*/true); + for (const auto& s : statuses) { + ASSERT_TRUE(s.IsAborted()); + } +} + TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) { constexpr uint64_t min_blob_size = 10; @@ -522,11 +639,21 @@ class DBBlobBasicIOErrorTest : public DBBlobBasicTest, std::string sync_point_; }; +class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest { + public: + DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {} +}; + INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest, ::testing::ValuesIn(std::vector{ "BlobFileReader::OpenFile:NewRandomAccessFile", "BlobFileReader::GetBlob:ReadFromFile"})); +INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest, + ::testing::ValuesIn(std::vector{ + "BlobFileReader::OpenFile:NewRandomAccessFile", + "BlobFileReader::MultiGetBlob:ReadFromFile"})); + TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) { Options options; options.env = fault_injection_env_.get(); @@ -556,7 +683,7 @@ TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) { SyncPoint::GetInstance()->ClearAllCallBacks(); } -TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) { +TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; @@ -598,6 +725,53 @@ TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) { ASSERT_TRUE(statuses[1].IsIOError()); } +TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) { + Options options = GetDefaultOptions(); + options.env = fault_injection_env_.get(); + options.enable_blob_files = true; + options.min_blob_size = 0; + + Reopen(options); + + constexpr size_t num_keys = 2; + + constexpr char key1[] = "key1"; + constexpr char value1[] = "blob1"; + + ASSERT_OK(Put(key1, value1)); + ASSERT_OK(Flush()); + + constexpr char key2[] = "key2"; + constexpr char value2[] = "blob2"; + + ASSERT_OK(Put(key2, value2)); + ASSERT_OK(Flush()); + + std::array keys{{key1, key2}}; + std::array values; + std::array statuses; + + bool first_blob_file = true; + SyncPoint::GetInstance()->SetCallBack( + sync_point_, [&first_blob_file, this](void* /* arg */) { + if (first_blob_file) { + first_blob_file = false; + return; + } + fault_injection_env_->SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + keys.data(), values.data(), statuses.data()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(statuses[0]); + ASSERT_EQ(value1, values[0]); + ASSERT_TRUE(statuses[1].IsIOError()); +} + namespace { class ReadBlobCompactionFilter : public CompactionFilter { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a3d5bb19b..bac5e6c7f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2162,7 +2162,7 @@ bool DBImpl::MultiCFSnapshot( // consecutive retries, it means the write rate is very high. In that case // its probably ok to take the mutex on the 3rd try so we can succeed for // sure - static const int num_retries = 3; + constexpr int num_retries = 3; for (int i = 0; i < num_retries; ++i) { last_try = (i == num_retries - 1); bool retry = false; @@ -2192,8 +2192,9 @@ bool DBImpl::MultiCFSnapshot( *snapshot = versions_->LastPublishedSequence(); } } else { - *snapshot = reinterpret_cast(read_options.snapshot) - ->number_; + *snapshot = + static_cast_with_check(read_options.snapshot) + ->number_; } for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end(); ++cf_iter) { @@ -2394,17 +2395,9 @@ void DBImpl::PrepareMultiGetKeys( autovector* sorted_keys) { if (sorted_input) { #ifndef NDEBUG - CompareKeyContext key_context_less; - - for (size_t index = 1; index < sorted_keys->size(); ++index) { - const KeyContext* const lhs = (*sorted_keys)[index - 1]; - const KeyContext* const rhs = (*sorted_keys)[index]; - - // lhs should be <= rhs, or in other words, rhs should NOT be < lhs - assert(!key_context_less(rhs, lhs)); - } + assert(std::is_sorted(sorted_keys->begin(), sorted_keys->end(), + CompareKeyContext())); #endif - return; } diff --git a/db/version_set.cc b/db/version_set.cc index 63f3082aa..d8d34fd48 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -25,6 +25,7 @@ #include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_reader.h" #include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" #include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -1863,6 +1864,112 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, return s; } +void Version::MultiGetBlob( + const ReadOptions& read_options, MultiGetRange& range, + const std::unordered_map& blob_rqs) { + if (read_options.read_tier == kBlockCacheTier) { + Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); + for (const auto& elem : blob_rqs) { + for (const auto& blob_rq : elem.second) { + const KeyContext& key_context = blob_rq.second; + assert(key_context.s); + assert(key_context.s->ok()); + *(key_context.s) = s; + assert(key_context.get_context); + auto& get_context = *(key_context.get_context); + get_context.MarkKeyMayExist(); + } + } + return; + } + + assert(!blob_rqs.empty()); + Status status; + const auto& blob_files = storage_info_.GetBlobFiles(); + for (auto& elem : blob_rqs) { + uint64_t blob_file_number = elem.first; + if (blob_files.find(blob_file_number) == blob_files.end()) { + auto& blobs_in_file = elem.second; + for (const auto& blob : blobs_in_file) { + const KeyContext& key_context = blob.second; + *(key_context.s) = Status::Corruption("Invalid blob file number"); + } + continue; + } + CacheHandleGuard blob_file_reader; + assert(blob_file_cache_); + status = blob_file_cache_->GetBlobFileReader(blob_file_number, + &blob_file_reader); + assert(!status.ok() || blob_file_reader.GetValue()); + + auto& blobs_in_file = elem.second; + if (!status.ok()) { + for (const auto& blob : blobs_in_file) { + const KeyContext& key_context = blob.second; + *(key_context.s) = status; + } + continue; + } + + assert(blob_file_reader.GetValue()); + const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize(); + const CompressionType compression = + blob_file_reader.GetValue()->GetCompressionType(); + + // TODO: sort blobs_in_file by file offset. + autovector> blob_read_key_contexts; + autovector> user_keys; + autovector offsets; + autovector value_sizes; + autovector statuses; + autovector values; + for (const auto& blob : blobs_in_file) { + const auto& blob_index = blob.first; + const KeyContext& key_context = blob.second; + if (blob_index.HasTTL() || blob_index.IsInlined()) { + *(key_context.s) = + Status::Corruption("Unexpected TTL/inlined blob index"); + continue; + } + const uint64_t key_size = key_context.ukey_with_ts.size(); + const uint64_t offset = blob_index.offset(); + const uint64_t value_size = blob_index.size(); + if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) { + *(key_context.s) = Status::Corruption("Invalid blob offset"); + continue; + } + if (blob_index.compression() != compression) { + *(key_context.s) = + Status::Corruption("Compression type mismatch when reading a blob"); + continue; + } + blob_read_key_contexts.emplace_back(std::cref(key_context)); + user_keys.emplace_back(std::cref(key_context.ukey_with_ts)); + offsets.push_back(blob_index.offset()); + value_sizes.push_back(blob_index.size()); + statuses.push_back(key_context.s); + values.push_back(key_context.value); + } + blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets, + value_sizes, statuses, values, + /*bytes_read=*/nullptr); + size_t num = blob_read_key_contexts.size(); + assert(num == user_keys.size()); + assert(num == offsets.size()); + assert(num == value_sizes.size()); + assert(num == statuses.size()); + assert(num == values.size()); + for (size_t i = 0; i < num; ++i) { + if (statuses[i]->ok()) { + range.AddValueSize(blob_read_key_contexts[i].get().value->size()); + if (range.GetValueSize() > read_options.value_size_soft_limit) { + *(blob_read_key_contexts[i].get().s) = Status::Aborted(); + } + } + } + } +} + void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, std::string* timestamp, Status* status, MergeContext* merge_context, @@ -2085,6 +2192,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, uint64_t num_data_read = 0; uint64_t num_sst_read = 0; + MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end()); + // blob_file => [[blob_idx, it], ...] + std::unordered_map blob_rqs; + while (f != nullptr) { MultiGetRange file_range = fp.CurrentFileRange(); bool timer_enabled = @@ -2170,24 +2281,24 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, if (iter->is_blob_index) { if (iter->value) { - constexpr uint64_t* bytes_read = nullptr; - - *status = GetBlob(read_options, iter->ukey_with_ts, *iter->value, - iter->value, bytes_read); - if (!status->ok()) { - if (status->IsIncomplete()) { - get_context.MarkKeyMayExist(); - } - - continue; + const Slice& blob_index_slice = *(iter->value); + BlobIndex blob_index; + Status tmp_s = blob_index.DecodeFrom(blob_index_slice); + if (tmp_s.ok()) { + const uint64_t blob_file_num = blob_index.file_number(); + blob_rqs[blob_file_num].emplace_back( + std::make_pair(blob_index, std::cref(*iter))); + } else { + *(iter->s) = tmp_s; } } - } - - file_range.AddValueSize(iter->value->size()); - if (file_range.GetValueSize() > read_options.value_size_soft_limit) { - s = Status::Aborted(); - break; + } else { + file_range.AddValueSize(iter->value->size()); + if (file_range.GetValueSize() > + read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } } continue; case GetContext::kDeleted: @@ -2233,6 +2344,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, f = fp.GetNextFile(); } + if (s.ok() && !blob_rqs.empty()) { + MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs); + } + // Process any left over keys for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) { GetContext& get_context = *iter->get_context; diff --git a/db/version_set.h b/db/version_set.h index c15bbd8f3..86948a042 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -713,6 +713,12 @@ class Version { const BlobIndex& blob_index, PinnableSlice* value, uint64_t* bytes_read) const; + using BlobReadRequests = std::vector< + std::pair>>; + void MultiGetBlob( + const ReadOptions& read_options, MultiGetRange& range, + const std::unordered_map& blob_rqs); + // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. void PrepareApply(const MutableCFOptions& mutable_cf_options, diff --git a/table/multiget_context.h b/table/multiget_context.h index 3872353d2..3d1ce72bc 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -97,6 +97,8 @@ class MultiGetContext { // that need to be performed static const int MAX_BATCH_SIZE = 32; + static_assert(MAX_BATCH_SIZE < 64, "MAX_BATCH_SIZE cannot exceed 63"); + MultiGetContext(autovector* sorted_keys, size_t begin, size_t num_keys, SequenceNumber snapshot, const ReadOptions& read_opts) @@ -104,6 +106,7 @@ class MultiGetContext { value_mask_(0), value_size_(0), lookup_key_ptr_(reinterpret_cast(lookup_key_stack_buf)) { + assert(num_keys <= MAX_BATCH_SIZE); if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) { lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]); lookup_key_ptr_ = reinterpret_cast( @@ -236,6 +239,10 @@ class MultiGetContext { skip_mask_ |= uint64_t{1} << iter.index_; } + bool IsKeySkipped(const Iterator& iter) const { + return skip_mask_ & (uint64_t{1} << iter.index_); + } + // Update the value_mask_ in MultiGetContext so its // immediately reflected in all the Range Iterators void MarkKeyDone(Iterator& iter) {