rocksdb/table/block_fetcher.cc
Mike Kolupaev b4d7209428 Add an option to put first key of each sst block in the index (#5289)
Summary:
The first key is used to defer reading the data block until this file gets to the top of merging iterator's heap. For short range scans, most files never make it to the top of the heap, so this change can reduce read amplification by a lot sometimes.

Consider the following workload. There are a few data streams (we'll be calling them "logs"), each stream consisting of a sequence of blobs (we'll be calling them "records"). Each record is identified by log ID and a sequence number within the log. RocksDB key is concatenation of log ID and sequence number (big endian). Reads are mostly relatively short range scans, each within a single log. Writes are mostly sequential for each log, but writes to different logs are randomly interleaved. Compactions are disabled; instead, when we accumulate a few tens of sst files, we create a new column family and start writing to it.

So, a typical sst file consists of a few ranges of blocks, each range corresponding to one log ID (we use FlushBlockPolicy to cut blocks at log boundaries). A typical read would go like this. First, iterator Seek() reads one block from each sst file. Then a series of Next()s moves through one sst file (since writes to each log are mostly sequential) until the subiterator reaches the end of this log in this sst file; then Next() switches to the next sst file and reads sequentially from that, and so on. Often a range scan will only return records from a small number of blocks in a small number of sst files; in this case, the cost of the initial Seek() reading one block from each file may be bigger than the cost of reading the actually useful blocks.

Neither iterate_upper_bound nor bloom filters can prevent reading one block from each file in Seek(). But this PR can: if the index contains first key from each block, we don't have to read the block until this block actually makes it to the top of merging iterator's heap, so for short range scans we won't read any blocks from most of the sst files.

This PR does the deferred block loading inside value() call. This is not ideal: there's no good way to report an IO error from inside value(). As discussed with siying offline, it would probably be better to change InternalIterator's interface to explicitly fetch deferred value and get status. I'll do it in a separate PR.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5289

Differential Revision: D15256423

Pulled By: al13n321

fbshipit-source-id: 750e4c39ce88e8d41662f701cf6275d9388ba46a
2019-06-24 20:54:04 -07:00

286 lines
9.8 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_fetcher.h"
#include <cinttypes>
#include <string>
#include "logging/logging.h"
#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/env.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/xxhash.h"
namespace rocksdb {
inline void BlockFetcher::CheckBlockChecksum() {
// Check the crc of the type and the block contents
if (read_options_.verify_checksums) {
const char* data = slice_.data(); // Pointer to where Read put the data
PERF_TIMER_GUARD(block_checksum_time);
uint32_t value = DecodeFixed32(data + block_size_ + 1);
uint32_t actual = 0;
switch (footer_.checksum()) {
case kNoChecksum:
break;
case kCRC32c:
value = crc32c::Unmask(value);
actual = crc32c::Value(data, block_size_ + 1);
break;
case kxxHash:
actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
break;
case kxxHash64:
actual = static_cast<uint32_t>(
XXH64(data, static_cast<int>(block_size_) + 1, 0) &
uint64_t{0xffffffff});
break;
default:
status_ = Status::Corruption(
"unknown checksum type " + ToString(footer_.checksum()) + " in " +
file_->file_name() + " offset " + ToString(handle_.offset()) +
" size " + ToString(block_size_));
}
if (status_.ok() && actual != value) {
status_ = Status::Corruption(
"block checksum mismatch: expected " + ToString(actual) + ", got " +
ToString(value) + " in " + file_->file_name() + " offset " +
ToString(handle_.offset()) + " size " + ToString(block_size_));
}
}
}
inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
if (cache_options_.persistent_cache &&
!cache_options_.persistent_cache->IsCompressed()) {
Status status = PersistentCacheHelper::LookupUncompressedPage(
cache_options_, handle_, contents_);
if (status.ok()) {
// uncompressed page is found for the block handle
return true;
} else {
// uncompressed page is not found
if (ioptions_.info_log && !status.IsNotFound()) {
assert(!status.ok());
ROCKS_LOG_INFO(ioptions_.info_log,
"Error reading from persistent cache. %s",
status.ToString().c_str());
}
}
}
return false;
}
// Tries to serve the block (plus its trailer) from prefetch_buffer_. On a
// buffer hit the checksum check runs and true is returned; callers must then
// inspect status_, because a failed check also returns true. When the check
// passes, used_buf_ is pointed at the buffer's data and
// got_from_prefetch_buffer_ is set.
inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  if (prefetch_buffer_ == nullptr) {
    return got_from_prefetch_buffer_;
  }

  const size_t bytes_wanted =
      static_cast<size_t>(handle_.size()) + kBlockTrailerSize;
  const bool buffer_hit = prefetch_buffer_->TryReadFromCache(
      handle_.offset(), bytes_wanted, &slice_, for_compaction_);
  if (!buffer_hit) {
    return got_from_prefetch_buffer_;
  }

  block_size_ = static_cast<size_t>(handle_.size());
  CheckBlockChecksum();
  if (status_.ok()) {
    got_from_prefetch_buffer_ = true;
    used_buf_ = const_cast<char*>(slice_.data());
  }
  // The buffer had the bytes, so report "handled" even when the checksum
  // check failed; the failure is carried in status_.
  return true;
}
// Looks up the raw page (block plus trailer) in the persistent cache when
// that cache is configured in compressed mode. The lookup result is stored in
// status_. On success the returned buffer is adopted as heap_buf_, used_buf_
// and slice_ are pointed at it, and true is returned. Otherwise returns false;
// failures other than NotFound are logged at INFO level if a logger exists.
inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
  // Not applicable without a persistent cache, or with an uncompressed one.
  if (!cache_options_.persistent_cache ||
      !cache_options_.persistent_cache->IsCompressed()) {
    return false;
  }

  // Raw-page lookup in the compressed-mode persistent cache.
  std::unique_ptr<char[]> raw_page;
  status_ = PersistentCacheHelper::LookupRawPage(
      cache_options_, handle_, &raw_page, block_size_ + kBlockTrailerSize);
  if (status_.ok()) {
    // Hand ownership of the page over to heap_buf_.
    heap_buf_ = CacheAllocationPtr(raw_page.release());
    used_buf_ = heap_buf_.get();
    slice_ = Slice(heap_buf_.get(), block_size_);
    return true;
  }

  // A plain miss (NotFound) is expected and not worth logging.
  if (!status_.IsNotFound() && ioptions_.info_log) {
    assert(!status_.ok());
    ROCKS_LOG_INFO(ioptions_.info_log,
                   "Error reading from persistent cache. %s",
                   status_.ToString().c_str());
  }
  return false;
}
// Chooses the destination buffer for a read from the file and points
// used_buf_ at it:
//   - the in-object stack buffer when the block will be uncompressed and
//     block + trailer is smaller than kDefaultStackBufferSize;
//   - compressed_buf_ when the block may be compressed and will not be
//     uncompressed here;
//   - heap_buf_ otherwise.
inline void BlockFetcher::PrepareBufferForBlockFromFile() {
  const size_t bytes_needed = block_size_ + kBlockTrailerSize;

  // Small reads that get uncompressed anyway avoid an allocation by going
  // into the trivially available stack buffer.
  if (do_uncompress_ && bytes_needed < kDefaultStackBufferSize) {
    used_buf_ = &stack_buf_[0];
    return;
  }

  if (maybe_compressed_ && !do_uncompress_) {
    compressed_buf_ = AllocateBlock(bytes_needed, memory_allocator_compressed_);
    used_buf_ = compressed_buf_.get();
    return;
  }

  heap_buf_ = AllocateBlock(bytes_needed, memory_allocator_);
  used_buf_ = heap_buf_.get();
}
// Inserts the raw bytes in used_buf_ (block plus trailer) into the persistent
// cache. Does so only when status_ is OK, read_options_.fill_cache is set,
// and a persistent cache in compressed mode is configured.
inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  if (!status_.ok() || !read_options_.fill_cache) {
    return;
  }
  if (!cache_options_.persistent_cache ||
      !cache_options_.persistent_cache->IsCompressed()) {
    return;
  }
  // Insert into the raw-page cache.
  PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
                                       block_size_ + kBlockTrailerSize);
}
// Inserts *contents_ into the persistent cache. Does so only when status_ is
// OK, the block did not come from the prefetch buffer,
// read_options_.fill_cache is set, and a persistent cache that is not in
// compressed mode is configured.
inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  if (!status_.ok() || got_from_prefetch_buffer_ ||
      !read_options_.fill_cache) {
    return;
  }
  if (!cache_options_.persistent_cache ||
      cache_options_.persistent_cache->IsCompressed()) {
    return;
  }
  // Insert into the uncompressed-page cache.
  PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
                                                *contents_);
}
// Allocates a fresh heap_buf_ through memory_allocator_ and copies the block
// plus its trailer from used_buf_ into it. used_buf_ must not already be
// heap_buf_, since heap_buf_ is replaced before the copy.
inline void BlockFetcher::CopyBufferToHeap() {
  assert(used_buf_ != heap_buf_.get());
  const size_t bytes_to_copy = block_size_ + kBlockTrailerSize;
  heap_buf_ = AllocateBlock(bytes_to_copy, memory_allocator_);
  memcpy(heap_buf_.get(), used_buf_, bytes_to_copy);
}
inline void BlockFetcher::GetBlockContents() {
if (slice_.data() != used_buf_) {
// the slice content is not the buffer provided
*contents_ = BlockContents(Slice(slice_.data(), block_size_));
} else {
// page can be either uncompressed or compressed, the buffer either stack
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
CopyBufferToHeap();
} else if (used_buf_ == compressed_buf_.get()) {
if (compression_type_ == kNoCompression &&
memory_allocator_ != memory_allocator_compressed_) {
CopyBufferToHeap();
} else {
heap_buf_ = std::move(compressed_buf_);
}
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_);
}
#ifndef NDEBUG
contents_->is_raw_block = true;
#endif
}
// Fetches the block identified by handle_ into *contents_ and returns the
// resulting status. Sources are tried in this order:
//   1. the persistent cache of uncompressed pages (returns immediately on a
//      hit, with compression_type_ set to kNoCompression);
//   2. the prefetch buffer (a hit runs the checksum check; a failed check is
//      returned to the caller);
//   3. the persistent cache of raw pages;
//   4. the file itself, followed by a size check, the checksum check and an
//      optional insert into the raw-page persistent cache.
// For sources 2-4 the block is then uncompressed when do_uncompress_ is set
// and the block is compressed; otherwise it is passed through as-is. Finally
// the result may be inserted into the uncompressed persistent cache.
Status BlockFetcher::ReadBlockContents() {
  block_size_ = static_cast<size_t>(handle_.size());

  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type_ = kNoCompression;
#ifndef NDEBUG
    contents_->is_raw_block = true;
#endif  // NDEBUG
    return Status::OK();
  }
  if (TryGetFromPrefetchBuffer()) {
    // A prefetch-buffer hit can still carry a checksum failure in status_.
    if (!status_.ok()) {
      return status_;
    }
  } else if (!TryGetCompressedBlockFromPersistentCache()) {
    // Every cache missed: read the block and its trailer from the file.
    PrepareBufferForBlockFromFile();
    {
      PERF_TIMER_GUARD(block_read_time);
      // Actual file read
      status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
                            &slice_, used_buf_, for_compaction_);
    }
    PERF_COUNTER_ADD(block_read_count, 1);

    // TODO: introduce dedicated perf counter for range tombstones
    switch (block_type_) {
      case BlockType::kFilter:
        PERF_COUNTER_ADD(filter_block_read_count, 1);
        break;

      case BlockType::kCompressionDictionary:
        PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
        break;

      case BlockType::kIndex:
        PERF_COUNTER_ADD(index_block_read_count, 1);
        break;

      // Nothing to do here as we don't have counters for the other types.
      default:
        break;
    }

    PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
    if (!status_.ok()) {
      return status_;
    }

    // A short read means the file does not hold the whole block + trailer.
    if (slice_.size() != block_size_ + kBlockTrailerSize) {
      return Status::Corruption("truncated block read from " +
                                file_->file_name() + " offset " +
                                ToString(handle_.offset()) + ", expected " +
                                ToString(block_size_ + kBlockTrailerSize) +
                                " bytes, got " + ToString(slice_.size()));
    }

    CheckBlockChecksum();
    if (status_.ok()) {
      InsertCompressedBlockToPersistentCacheIfNeeded();
    } else {
      return status_;
    }
  }

  PERF_TIMER_GUARD(block_decompress_time);

  compression_type_ = get_block_compression_type(slice_.data(), block_size_);

  if (do_uncompress_ && compression_type_ != kNoCompression) {
    // compressed page, uncompress, update cache
    UncompressionContext context(compression_type_);
    UncompressionInfo info(context, uncompression_dict_, compression_type_);
    status_ = UncompressBlockContents(info, slice_.data(), block_size_,
                                      contents_, footer_.version(), ioptions_,
                                      memory_allocator_);
    // The contents handed back to the caller are no longer compressed.
    compression_type_ = kNoCompression;
  } else {
    GetBlockContents();
  }

  InsertUncompressedBlockToPersistentCacheIfNeeded();

  return status_;
}
} // namespace rocksdb