2db6a4a1d6
Summary: The RocksDB iterator is a hierarchy of iterators. MergingIterator maintains a heap of LevelIterators, one for each L0 file and one for each non-zero level. The Seek() operation naturally lends itself to parallelization, as it involves positioning every LevelIterator on the correct data block in the correct SST file. Seek() looks up a level for a target key to find the first key that is >= the target key. This typically involves reading one data block that is likely to contain the target key and scanning forward to find the first valid key. The forward scan may read more data blocks. In order to find the right data block, the iterator may read some metadata blocks (required for opening a file and searching the index). This flow can be parallelized.

Design: With the async_io option, Seek is called twice. The first seek sends asynchronous requests to prefetch the data blocks at each level. The second seek follows the normal flow: in FilePrefetchBuffer::TryReadFromCacheAsync it waits for Poll() to return the results and then adds the iterator to min_heap.
- Status::TryAgain is passed down from FilePrefetchBuffer::PrefetchAsync to block_iter_.Status, indicating that an asynchronous request has been submitted.
- If submitting the asynchronous request returns an error for some reason, Seek falls back to reading the blocks sequentially in one pass.
- If the data already exists in prefetch_buffer, it is returned without prefetching further, and the seek is treated as a single pass.
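The two-pass flow described in the Design section can be sketched roughly as follows. This is a toy model under stated assumptions, not RocksDB code: `ToyLevel`, `SeekStatus`, `SeekAsync`, `SeekFinish` and `TwoPassSeek` are hypothetical names, the asynchronous read is simulated with a flag, and `kTryAgain` only stands in for Status::TryAgain.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for Status::OK / Status::TryAgain.
enum class SeekStatus { kOk, kTryAgain };

// Toy model of one level; not a RocksDB type.
struct ToyLevel {
  std::vector<std::string> keys;  // sorted keys stored in this level
  bool block_ready = false;       // data already sits in the "prefetch buffer"
  bool request_pending = false;   // a simulated async read is in flight
  std::size_t pos = 0;

  void Position(const std::string& target) {
    pos = static_cast<std::size_t>(
        std::lower_bound(keys.begin(), keys.end(), target) - keys.begin());
  }
  // Pass 1: submit the read. If the data is already buffered, position
  // right away and report kOk (single-pass case).
  SeekStatus SeekAsync(const std::string& target) {
    if (!block_ready) {
      request_pending = true;
      return SeekStatus::kTryAgain;
    }
    Position(target);
    return SeekStatus::kOk;
  }
  // Pass 2: a real implementation would block on a poll here.
  void SeekFinish(const std::string& target) {
    if (request_pending) {
      request_pending = false;
      block_ready = true;
    }
    Position(target);
  }
  bool Valid() const { return pos < keys.size(); }
  const std::string& key() const {
    assert(Valid());
    return keys[pos];
  }
};

struct KeyGreater {
  bool operator()(const ToyLevel* a, const ToyLevel* b) const {
    return a->key() > b->key();
  }
};

// Returns the smallest key >= target across all levels, or "" if none.
std::string TwoPassSeek(std::vector<ToyLevel>& levels,
                        const std::string& target) {
  std::priority_queue<ToyLevel*, std::vector<ToyLevel*>, KeyGreater> min_heap;
  std::vector<ToyLevel*> pending;
  // First pass: submit a request at every level without waiting.
  for (ToyLevel& level : levels) {
    if (level.SeekAsync(target) == SeekStatus::kTryAgain) {
      pending.push_back(&level);
    } else if (level.Valid()) {
      min_heap.push(&level);
    }
  }
  // Second pass: collect the results and add each level to the heap.
  for (ToyLevel* level : pending) {
    level->SeekFinish(target);
    if (level->Valid()) min_heap.push(level);
  }
  return min_heap.empty() ? std::string() : min_heap.top()->key();
}
```

The point of the sketch is only the ordering: all requests are issued before any result is awaited, so in a real system the reads for different levels could overlap instead of running one after another.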
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9994

Test Plan:
- **Run Regressions.**
```
./db_bench -db=/tmp/prefix_scan_prefetch_main -benchmarks="fillseq" -key_size=32 -value_size=512 -num=5000000 -use_direct_io_for_flush_and_compaction=true -target_file_size_base=16777216
```
i) Previous release 7.0 run for normal prefetching with async_io disabled:
```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB: version 7.0
Date: Thu Mar 17 13:11:34 2022
CPU: 24 * Intel Core Processor (Broadwell)
CPUCache: 16384 KB
Keys: 32 bytes each (+ 0 bytes user-defined timestamp)
Values: 512 bytes each (256 bytes after compression)
Entries: 5000000
Prefix: 0 bytes
Keys per prefix: 0
RawSize: 2594.0 MB (estimated)
FileSize: 1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom : 483618.390 micros/op 2 ops/sec; 338.9 MB/s (249 of 249 found)
```
ii) Normal prefetching after changes with async_io disabled:
```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Set seed to 1652922591315307 because --seed was 0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB: version 7.3
Date: Wed May 18 18:09:51 2022
CPU: 32 * Intel Xeon Processor (Skylake)
CPUCache: 16384 KB
Keys: 32 bytes each (+ 0 bytes user-defined timestamp)
Values: 512 bytes each (256 bytes after compression)
Entries: 5000000
Prefix: 0 bytes
Keys per prefix: 0
RawSize: 2594.0 MB (estimated)
FileSize: 1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom : 483080.466 micros/op 2 ops/sec 120.287 seconds 249 operations; 340.8 MB/s (249 of 249 found)
```
iii) db_bench with async_io enabled completed successfully:
```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 -async_io=1 -adaptive_readahead=1
Set seed to 1652924062021732 because --seed was 0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB: version 7.3
Date: Wed May 18 18:34:22 2022
CPU: 32 * Intel Xeon Processor (Skylake)
CPUCache: 16384 KB
Keys: 32 bytes each (+ 0 bytes user-defined timestamp)
Values: 512 bytes each (256 bytes after compression)
Entries: 5000000
Prefix: 0 bytes
Keys per prefix: 0
RawSize: 2594.0 MB (estimated)
FileSize: 1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom : 553913.576 micros/op 1 ops/sec 120.199 seconds 217 operations; 293.6 MB/s (217 of 217 found)
```
- db_stress with async_io disabled completed successfully:
```
export CRASH_TEST_EXT_ARGS=" --async_io=0"
make crash_test -j
```
**In Progress**: db_stress with async_io is failing; debugging and fixing it is in progress.

Reviewed By: anand1976
Differential Revision: D36459323
Pulled By: akankshamahajan15
fbshipit-source-id: abb1cd944abe712bae3986ae5b16704b3338917c
143 lines
5.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include "memory/memory_allocator.h"
#include "table/block_based/block.h"
#include "table/block_based/block_type.h"
#include "table/format.h"
#include "table/persistent_cache_options.h"

namespace ROCKSDB_NAMESPACE {

// Retrieves a single block of a given file. Utilizes the prefetch buffer and/or
// persistent cache provided (if any) to try to avoid reading from the file
// directly. Note that both the prefetch buffer and the persistent cache are
// optional; also, note that the persistent cache may be configured to store either
// compressed or uncompressed blocks.
//
// If the retrieved block is compressed and the do_uncompress flag is set,
// BlockFetcher uncompresses the block (using the uncompression dictionary,
// if provided, to prime the compression algorithm), and returns the resulting
// uncompressed block data. Otherwise, it returns the original block.
//
// Two read options affect the behavior of BlockFetcher: if verify_checksums is
// true, the checksum of the (original) block is checked; if fill_cache is true,
// the block is added to the persistent cache if needed.
//
// Memory for uncompressed and compressed blocks is allocated as needed
// using memory_allocator and memory_allocator_compressed, respectively
// (if provided; otherwise, the default allocator is used).

class BlockFetcher {
 public:
  BlockFetcher(RandomAccessFileReader* file,
               FilePrefetchBuffer* prefetch_buffer,
               const Footer& footer /* ref retained */,
               const ReadOptions& read_options,
               const BlockHandle& handle /* ref retained */,
               BlockContents* contents,
               const ImmutableOptions& ioptions /* ref retained */,
               bool do_uncompress, bool maybe_compressed, BlockType block_type,
               const UncompressionDict& uncompression_dict /* ref retained */,
               const PersistentCacheOptions& cache_options /* ref retained */,
               MemoryAllocator* memory_allocator = nullptr,
               MemoryAllocator* memory_allocator_compressed = nullptr,
               bool for_compaction = false)
      : file_(file),
        prefetch_buffer_(prefetch_buffer),
        footer_(footer),
        read_options_(read_options),
        handle_(handle),
        contents_(contents),
        ioptions_(ioptions),
        do_uncompress_(do_uncompress),
        maybe_compressed_(maybe_compressed),
        block_type_(block_type),
        block_size_(static_cast<size_t>(handle_.size())),
        block_size_with_trailer_(block_size_ + footer.GetBlockTrailerSize()),
        uncompression_dict_(uncompression_dict),
        cache_options_(cache_options),
        memory_allocator_(memory_allocator),
        memory_allocator_compressed_(memory_allocator_compressed),
        for_compaction_(for_compaction) {
    io_status_.PermitUncheckedError();  // TODO(AR) can we improve on this?
  }

  IOStatus ReadBlockContents();
  IOStatus ReadAsyncBlockContents();

  inline CompressionType get_compression_type() const {
    return compression_type_;
  }
  inline size_t GetBlockSizeWithTrailer() const {
    return block_size_with_trailer_;
  }

#ifndef NDEBUG
  int TEST_GetNumStackBufMemcpy() const { return num_stack_buf_memcpy_; }
  int TEST_GetNumHeapBufMemcpy() const { return num_heap_buf_memcpy_; }
  int TEST_GetNumCompressedBufMemcpy() const {
    return num_compressed_buf_memcpy_;
  }

#endif
 private:
#ifndef NDEBUG
  int num_stack_buf_memcpy_ = 0;
  int num_heap_buf_memcpy_ = 0;
  int num_compressed_buf_memcpy_ = 0;

#endif
  static const uint32_t kDefaultStackBufferSize = 5000;

  RandomAccessFileReader* file_;
  FilePrefetchBuffer* prefetch_buffer_;
  const Footer& footer_;
  const ReadOptions read_options_;
  const BlockHandle& handle_;
  BlockContents* contents_;
  const ImmutableOptions& ioptions_;
  const bool do_uncompress_;
  const bool maybe_compressed_;
  const BlockType block_type_;
  const size_t block_size_;
  const size_t block_size_with_trailer_;
  const UncompressionDict& uncompression_dict_;
  const PersistentCacheOptions& cache_options_;
  MemoryAllocator* memory_allocator_;
  MemoryAllocator* memory_allocator_compressed_;
  IOStatus io_status_;
  Slice slice_;
  char* used_buf_ = nullptr;
  AlignedBuf direct_io_buf_;
  CacheAllocationPtr heap_buf_;
  CacheAllocationPtr compressed_buf_;
  char stack_buf_[kDefaultStackBufferSize];
  bool got_from_prefetch_buffer_ = false;
  CompressionType compression_type_;
  bool for_compaction_ = false;

  // return true if found
  bool TryGetUncompressBlockFromPersistentCache();
  // return true if found
  bool TryGetFromPrefetchBuffer();
  bool TryGetCompressedBlockFromPersistentCache();
  void PrepareBufferForBlockFromFile();
  // Copy content from used_buf_ to new heap_buf_.
  void CopyBufferToHeapBuf();
  // Copy content from used_buf_ to new compressed_buf_.
  void CopyBufferToCompressedBuf();
  void GetBlockContents();
  void InsertCompressedBlockToPersistentCacheIfNeeded();
  void InsertUncompressedBlockToPersistentCacheIfNeeded();
  void ProcessTrailerIfPresent();
};
}  // namespace ROCKSDB_NAMESPACE