diff --git a/HISTORY.md b/HISTORY.md
index 45f03f04e..fd9ef0354 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,7 @@
 ### Bug Fixes
 * Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
+* Fixed a bug where compressed blocks read by MultiGet were not inserted into the compressed block cache when `use_direct_reads = true`.
 
 ### Bug Fixes
 * Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store `read_amp_bytes_per_bit` (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 37ae61cd8..691871663 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -2693,6 +2693,7 @@ class DBBasicTestMultiGet : public DBTestBase {
     } else {
       options.compression_opts.parallel_threads = compression_parallel_threads;
     }
+    options_ = options;
     Reopen(options);
     if (num_cfs > 1) {
@@ -2762,6 +2763,7 @@ class DBBasicTestMultiGet : public DBTestBase {
   bool compression_enabled() { return compression_enabled_; }
   bool has_compressed_cache() { return compressed_cache_ != nullptr; }
   bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
+  Options get_options() { return options_; }
   static void SetUpTestCase() {}
   static void TearDownTestCase() {}
@@ -2847,6 +2849,7 @@ class DBBasicTestMultiGet : public DBTestBase {
   std::shared_ptr<Cache> compressed_cache_;
   std::shared_ptr<Cache> uncompressed_cache_;
+  Options options_;
   bool compression_enabled_;
   std::vector<std::string> values_;
   std::vector<std::string> uncompressable_values_;
@@ -2989,6 +2992,123 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
   }
 }
+#ifndef ROCKSDB_LITE
+TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
+  class FakeDirectIOEnv : public EnvWrapper {
+    class FakeDirectIOSequentialFile;
+    class FakeDirectIORandomAccessFile;
+
+   public:
+    FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
+
+    Status NewRandomAccessFile(const std::string& fname,
+                               std::unique_ptr<RandomAccessFile>* result,
+                               const EnvOptions& options) override {
+      std::unique_ptr<RandomAccessFile> file;
+      assert(options.use_direct_reads);
+      EnvOptions opts = options;
+      opts.use_direct_reads = false;
+      Status s = target()->NewRandomAccessFile(fname, &file, opts);
+      if (!s.ok()) {
+        return s;
+      }
+      result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
+      return s;
+    }
+
+   private:
+    class FakeDirectIOSequentialFile : public SequentialFileWrapper {
+     public:
+      FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
+          : SequentialFileWrapper(file.get()), file_(std::move(file)) {}
+      ~FakeDirectIOSequentialFile() {}
+
+      bool use_direct_io() const override { return true; }
+      size_t GetRequiredBufferAlignment() const override { return 1; }
+
+     private:
+      std::unique_ptr<SequentialFile> file_;
+    };
+
+    class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
+     public:
+      FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
+          : RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
+      ~FakeDirectIORandomAccessFile() {}
+
+      bool use_direct_io() const override { return true; }
+      size_t GetRequiredBufferAlignment() const override { return 1; }
+
+     private:
+      std::unique_ptr<RandomAccessFile> file_;
+    };
+  };
+
+  std::unique_ptr<Env> env(new FakeDirectIOEnv(env_));
+  Options opts = get_options();
+  opts.env = env.get();
+  opts.use_direct_reads = true;
+  Reopen(opts);
+
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set initial size to
+  // largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  ReadOptions ro;
+  ro.fill_cache = fill_cache();
+
+  // Warm up the cache first
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
+  ASSERT_TRUE(CheckValue(50, values[1].ToString()));
+
+  int random_reads = env_->random_read_counter_.Read();
+  key_data[0] = Key(1);
+  key_data[1] = Key(51);
+  keys[0] = Slice(key_data[0]);
+  keys[1] = Slice(key_data[1]);
+  values[0].Reset();
+  values[1].Reset();
+  if (uncompressed_cache_) {
+    uncompressed_cache_->SetCapacity(0);
+    uncompressed_cache_->SetCapacity(1048576);
+  }
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
+  ASSERT_TRUE(CheckValue(51, values[1].ToString()));
+
+  bool read_from_cache = false;
+  if (fill_cache()) {
+    if (has_uncompressed_cache()) {
+      read_from_cache = true;
+    } else if (has_compressed_cache() && compression_enabled()) {
+      read_from_cache = true;
+    }
+  }
+
+  int expected_reads = random_reads;
+  if (!compression_enabled() || !has_compressed_cache()) {
+    expected_reads += 2;
+  } else {
+    expected_reads += (read_from_cache ? 0 : 2);
+  }
+  if (env_->random_read_counter_.Read() != expected_reads) {
+    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
+  }
+  Close();
+}
+#endif  // ROCKSDB_LITE
+
 TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
   std::vector<std::string> key_data(10);
   std::vector<Slice> keys;
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 58e7b91de..628f1026d 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -1821,13 +1821,21 @@ void BlockBasedTable::RetrieveMultipleBlocks(
     if (s.ok()) {
       // When the blocks share the same underlying buffer (scratch or direct io
-      // buffer), if the block is compressed, the shared buffer will be
-      // uncompressed into heap during uncompressing; otherwise, we need to
-      // manually copy the block into heap before inserting the block to block
-      // cache.
+      // buffer), we may need to manually copy the block into heap if the raw
+      // block has to be inserted into a cache. That falls into the following
+      // cases -
+      // 1. The raw block is not compressed; it needs to be inserted into the
+      //    uncompressed block cache if there is one.
+      // 2. The raw block is compressed; it needs to be inserted into the
+      //    compressed block cache if there is one.
+      //
+      // In all other cases, the raw block is either uncompressed into a heap
+      // buffer or there is no cache at all.
       CompressionType compression_type =
           raw_block_contents.get_compression_type();
-      if (use_shared_buffer && compression_type == kNoCompression) {
+      if (use_shared_buffer && (compression_type == kNoCompression ||
+                                (compression_type != kNoCompression &&
+                                 rep_->table_options.block_cache_compressed))) {
         Slice raw = Slice(req.result.data() + req_offset, block_size(handle));
         raw_block_contents = BlockContents(
             CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
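
Reviewer note, not part of the patch: the sketch below shows the configuration this fix targets, i.e. a DB opened with use_direct_reads = true and a compressed block cache, so that blocks fetched by MultiGet can be inserted into block_cache_compressed. The DB path, cache size, keys, and the choice of Snappy compression are illustrative assumptions, not values taken from the patch.

#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Direct I/O for reads: the mode in which the bug occurred.
  options.use_direct_reads = true;
  // Blocks must actually be compressed to land in the compressed cache;
  // Snappy is assumed to be available here.
  options.compression = rocksdb::kSnappyCompression;

  rocksdb::BlockBasedTableOptions table_options;
  // Compressed block cache; with the fix, compressed blocks read by MultiGet
  // under direct I/O are inserted here as well.
  table_options.block_cache_compressed = rocksdb::NewLRUCache(32 << 20);
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db = nullptr;
  // Illustrative path; use_direct_reads needs a filesystem with O_DIRECT.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/multiget_demo", &db);
  assert(s.ok());

  // Write a couple of keys and flush them into an SST file so MultiGet
  // actually reads data blocks from storage.
  db->Put(rocksdb::WriteOptions(), "key1", "value1");
  db->Put(rocksdb::WriteOptions(), "key2", "value2");
  db->Flush(rocksdb::FlushOptions());

  // The first MultiGet reads the blocks via direct I/O and, with the fix,
  // populates the compressed block cache; a repeated call can then be served
  // from that cache instead of re-reading the file.
  std::vector<rocksdb::Slice> keys{"key1", "key2"};
  std::vector<std::string> values;
  std::vector<rocksdb::Status> statuses =
      db->MultiGet(rocksdb::ReadOptions(), keys, &values);
  statuses = db->MultiGet(rocksdb::ReadOptions(), keys, &values);
  assert(statuses[0].ok() && statuses[1].ok());

  delete db;
  return 0;
}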