diff --git a/HISTORY.md b/HISTORY.md index 0f5c3a3de..a00290102 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,7 @@ # Rocksdb Change Log ## Unreleased ### Bug Fixes +* Fixes a bug in directed IO mode when calling MultiGet() for blobs in the same blob file. The bug is caused by not sorting the blob read requests by file offsets. ### New Features ### Public API change * Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 9a909f67d..56b76ce4c 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -127,6 +127,197 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) { } } +#ifndef ROCKSDB_LITE +TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) { + Options options = GetDefaultOptions(); + + // First, create an external SST file ["b"]. + const std::string file_path = dbname_ + "/test.sst"; + { + SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions()); + Status s = sst_file_writer.Open(file_path); + ASSERT_OK(s); + ASSERT_OK(sst_file_writer.Put("b", "b_value")); + ASSERT_OK(sst_file_writer.Finish()); + } + + options.enable_blob_files = true; + options.min_blob_size = 1000; + options.use_direct_reads = true; + options.allow_ingest_behind = true; + + // Open DB with fixed-prefix sst-partitioner so that compaction will cut + // new table file when encountering a new key whose 1-byte prefix changes. + constexpr size_t key_len = 1; + options.sst_partitioner_factory = + NewSstPartitionerFixedPrefixFactory(key_len); + + Status s = TryReopen(options); + if (s.IsInvalidArgument()) { + ROCKSDB_GTEST_SKIP("This test requires direct IO support"); + return; + } + ASSERT_OK(s); + + constexpr size_t num_keys = 3; + constexpr size_t blob_size = 3000; + + constexpr char first_key[] = "a"; + const std::string first_blob(blob_size, 'a'); + ASSERT_OK(Put(first_key, first_blob)); + + constexpr char second_key[] = "b"; + const std::string second_blob(2 * blob_size, 'b'); + ASSERT_OK(Put(second_key, second_blob)); + + constexpr char third_key[] = "d"; + const std::string third_blob(blob_size, 'd'); + ASSERT_OK(Put(third_key, third_blob)); + + // first_blob, second_blob and third_blob in the same blob file. + // SST Blob file + // L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| + // | | | ^ ^ ^ + // | | | | | | + // | | +---------|-------|--------+ + // | +-----------------|-------+ + // +-------------------------+ + ASSERT_OK(Flush()); + + constexpr char fourth_key[] = "c"; + const std::string fourth_blob(blob_size, 'c'); + ASSERT_OK(Put(fourth_key, fourth_blob)); + // fourth_blob in another blob file. + // SST Blob file SST Blob file + // L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| ["c"] |'cccc'| + // | | | ^ ^ ^ | ^ + // | | | | | | | | + // | | +---------|-------|--------+ +-------+ + // | +-----------------|-------+ + // +-------------------------+ + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + + // Due to the above sst partitioner, we get 4 L1 files. The blob files are + // unchanged. + // |'aaaa', 'bbbb', 'dddd'| |'cccc'| + // ^ ^ ^ ^ + // | | | | + // L0 | | | | + // L1 ["a"] ["b"] ["c"] | | ["d"] | + // | | | | | | + // | | +---------|-------|---------------+ + // | +-----------------|-------+ + // +-------------------------+ + ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1)); + + { + // Ingest the external SST file into bottommost level. + std::vector ext_files{file_path}; + IngestExternalFileOptions opts; + opts.ingest_behind = true; + ASSERT_OK( + db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts)); + } + + // Now the database becomes as follows. + // |'aaaa', 'bbbb', 'dddd'| |'cccc'| + // ^ ^ ^ ^ + // | | | | + // L0 | | | | + // L1 ["a"] ["b"] ["c"] | | ["d"] | + // | | | | | | + // | | +---------|-------|---------------+ + // | +-----------------|-------+ + // +-------------------------+ + // + // L6 ["b"] + + { + // Compact ["b"] to bottommost level. + Slice begin = Slice(second_key); + Slice end = Slice(second_key); + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, &begin, &end)); + } + + // |'aaaa', 'bbbb', 'dddd'| |'cccc'| + // ^ ^ ^ ^ + // | | | | + // L0 | | | | + // L1 ["a"] ["c"] | | ["d"] | + // | | | | | + // | +---------|-------|---------------+ + // | +-----------------|-------+ + // +-------|-----------------+ + // | + // L6 ["b"] + ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1)); + ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6)); + + bool called = false; + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) { + auto* aligned_reqs = static_cast*>(arg); + assert(aligned_reqs); + ASSERT_EQ(1, aligned_reqs->size()); + called = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::array keys{{first_key, third_key, second_key}}; + + { + std::array values; + std::array statuses; + + // The MultiGet(), when constructing the KeyContexts, will process the keys + // in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1, + // while ["b"] resides in L6. + // Consequently, the original FSReadRequest list prepared by + // Version::MultiGetblob() will be for "a", "d" and "b". It is unsorted as + // follows: + // + // ["a", offset=30, len=3033], + // ["d", offset=9096, len=3033], + // ["b", offset=3063, len=6033] + // + // If we do not sort them before calling MultiRead() in DirectIO, then the + // underlying IO merging logic will yield two requests. + // + // [offset=0, len=4096] (for "a") + // [offset=0, len=12288] (result of merging the request for "d" and "b") + // + // We need to sort them in Version::MultiGetBlob() so that the underlying + // IO merging logic in DirectIO mode works as expected. The correct + // behavior will be one aligned request: + // + // [offset=0, len=12288] + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0], + &values[0], &statuses[0]); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_TRUE(called); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], first_blob); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(values[1], third_blob); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], second_blob); + } +} +#endif // !ROCKSDB_LITE + TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) { Options options = GetDefaultOptions(); options.enable_blob_files = true; diff --git a/db/version_set.cc b/db/version_set.cc index d8d34fd48..7d0a5702d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1866,7 +1866,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, void Version::MultiGetBlob( const ReadOptions& read_options, MultiGetRange& range, - const std::unordered_map& blob_rqs) { + std::unordered_map& blob_rqs) { if (read_options.read_tier == kBlockCacheTier) { Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); for (const auto& elem : blob_rqs) { @@ -1916,7 +1916,14 @@ void Version::MultiGetBlob( const CompressionType compression = blob_file_reader.GetValue()->GetCompressionType(); - // TODO: sort blobs_in_file by file offset. + // sort blobs_in_file by file offset. + std::sort( + blobs_in_file.begin(), blobs_in_file.end(), + [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool { + assert(lhs.first.file_number() == rhs.first.file_number()); + return lhs.first.offset() < rhs.first.offset(); + }); + autovector> blob_read_key_contexts; autovector> user_keys; autovector offsets; diff --git a/db/version_set.h b/db/version_set.h index 86948a042..4abb96f49 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -713,11 +713,11 @@ class Version { const BlobIndex& blob_index, PinnableSlice* value, uint64_t* bytes_read) const; - using BlobReadRequests = std::vector< - std::pair>>; - void MultiGetBlob( - const ReadOptions& read_options, MultiGetRange& range, - const std::unordered_map& blob_rqs); + using BlobReadRequest = + std::pair>; + using BlobReadRequests = std::vector; + void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range, + std::unordered_map& blob_rqs); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set.