8ea0a2c1bd
Summary: Implement the ```WaitAll()``` interface in ```LRUCache``` to allow callers to issue multiple lookups in parallel and wait for all of them to complete. Modify ```MultiGet``` to use this to parallelize the secondary cache lookups in order to reduce the overall latency. A call to ```cache->Lookup()``` returns a handle that has an incomplete value (nullptr), and the caller can call ```cache->IsReady()``` to check whether the lookup is complete, and pass a vector of handles to ```WaitAll``` to wait for completion. If any of the lookups fail, ```MultiGet``` will read the block from the SST file. Another change in this PR is to rename ```SecondaryCacheHandle``` to ```SecondaryCacheResultHandle``` as it more accurately describes the return result of the secondary cache lookup, which is more like a future. Tests: 1. Add unit tests in lru_cache_test 2. Benchmark results with no secondary cache configured Master - ``` readrandom : 41.175 micros/op 388562 ops/sec; 106.7 MB/s (7277999 of 7277999 found) readrandom : 41.217 micros/op 388160 ops/sec; 106.6 MB/s (7274999 of 7274999 found) multireadrandom : 10.309 micros/op 1552082 ops/sec; (28908992 of 28908992 found) multireadrandom : 10.321 micros/op 1550218 ops/sec; (29081984 of 29081984 found) ``` This PR - ``` readrandom : 41.158 micros/op 388723 ops/sec; 106.8 MB/s (7290999 of 7290999 found) readrandom : 41.185 micros/op 388463 ops/sec; 106.7 MB/s (7287999 of 7287999 found) multireadrandom : 10.277 micros/op 1556801 ops/sec; (29346944 of 29346944 found) multireadrandom : 10.253 micros/op 1560539 ops/sec; (29274944 of 29274944 found) ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/8405 Reviewed By: zhichao-cao Differential Revision: D29190509 Pulled By: anand1976 fbshipit-source-id: 6f8eff6246712af8a297cfe22ea0d1c3b2a01bb0
191 lines
8.4 KiB
C++
191 lines
8.4 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#pragma once
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
|
|
#include "table/block_based/reader_common.h"
|
|
|
|
// The file contains some member functions of BlockBasedTable that
|
|
// cannot be implemented in block_based_table_reader.cc because
|
|
// they are called by other files (e.g. block_based_iterator.h) and
|
|
// are templates.
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Convert an index iterator value (i.e., an encoded BlockHandle)
|
|
// into an iterator over the contents of the corresponding block.
|
|
// If input_iter is null, new a iterator
|
|
// If input_iter is not null, update this iter and return it
|
|
template <typename TBlockIter>
|
|
TBlockIter* BlockBasedTable::NewDataBlockIterator(
|
|
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
|
|
BlockType block_type, GetContext* get_context,
|
|
BlockCacheLookupContext* lookup_context, Status s,
|
|
FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
|
|
PERF_TIMER_GUARD(new_table_block_iter_nanos);
|
|
|
|
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
|
|
if (!s.ok()) {
|
|
iter->Invalidate(s);
|
|
return iter;
|
|
}
|
|
|
|
CachableEntry<UncompressionDict> uncompression_dict;
|
|
if (rep_->uncompression_dict_reader) {
|
|
const bool no_io = (ro.read_tier == kBlockCacheTier);
|
|
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
|
|
prefetch_buffer, no_io, get_context, lookup_context,
|
|
&uncompression_dict);
|
|
if (!s.ok()) {
|
|
iter->Invalidate(s);
|
|
return iter;
|
|
}
|
|
}
|
|
|
|
const UncompressionDict& dict = uncompression_dict.GetValue()
|
|
? *uncompression_dict.GetValue()
|
|
: UncompressionDict::GetEmptyDict();
|
|
|
|
CachableEntry<Block> block;
|
|
s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
|
|
get_context, lookup_context, for_compaction,
|
|
/* use_cache */ true, /* wait_for_cache */ true);
|
|
|
|
if (!s.ok()) {
|
|
assert(block.IsEmpty());
|
|
iter->Invalidate(s);
|
|
return iter;
|
|
}
|
|
|
|
assert(block.GetValue() != nullptr);
|
|
|
|
// Block contents are pinned and it is still pinned after the iterator
|
|
// is destroyed as long as cleanup functions are moved to another object,
|
|
// when:
|
|
// 1. block cache handle is set to be released in cleanup function, or
|
|
// 2. it's pointing to immortal source. If own_bytes is true then we are
|
|
// not reading data from the original source, whether immortal or not.
|
|
// Otherwise, the block is pinned iff the source is immortal.
|
|
const bool block_contents_pinned =
|
|
block.IsCached() ||
|
|
(!block.GetValue()->own_bytes() && rep_->immortal_table);
|
|
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
|
|
block_contents_pinned);
|
|
|
|
if (!block.IsCached()) {
|
|
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
|
|
// insert a dummy record to block cache to track the memory usage
|
|
Cache* const block_cache = rep_->table_options.block_cache.get();
|
|
Cache::Handle* cache_handle = nullptr;
|
|
// There are two other types of cache keys: 1) SST cache key added in
|
|
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
|
|
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
|
|
// from SST cache key(31 bytes), and use non-zero prefix to
|
|
// differentiate from `write_buffer_manager`
|
|
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
|
|
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
|
|
// Prefix: use rep_->cache_key_prefix padded by 0s
|
|
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
|
|
assert(rep_->cache_key_prefix_size != 0);
|
|
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
|
|
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
|
|
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
|
|
next_cache_key_id_++);
|
|
assert(end - cache_key <=
|
|
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
|
|
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
|
|
s = block_cache->Insert(unique_key, nullptr,
|
|
block.GetValue()->ApproximateMemoryUsage(),
|
|
nullptr, &cache_handle);
|
|
|
|
if (s.ok()) {
|
|
assert(cache_handle != nullptr);
|
|
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
|
|
cache_handle);
|
|
}
|
|
}
|
|
} else {
|
|
iter->SetCacheHandle(block.GetCacheHandle());
|
|
}
|
|
|
|
block.TransferTo(iter);
|
|
|
|
return iter;
|
|
}
|
|
|
|
// Convert an uncompressed data block (i.e CachableEntry<Block>)
|
|
// into an iterator over the contents of the corresponding block.
|
|
// If input_iter is null, new a iterator
|
|
// If input_iter is not null, update this iter and return it
|
|
template <typename TBlockIter>
|
|
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
|
|
CachableEntry<Block>& block,
|
|
TBlockIter* input_iter,
|
|
Status s) const {
|
|
PERF_TIMER_GUARD(new_table_block_iter_nanos);
|
|
|
|
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
|
|
if (!s.ok()) {
|
|
iter->Invalidate(s);
|
|
return iter;
|
|
}
|
|
|
|
assert(block.GetValue() != nullptr);
|
|
// Block contents are pinned and it is still pinned after the iterator
|
|
// is destroyed as long as cleanup functions are moved to another object,
|
|
// when:
|
|
// 1. block cache handle is set to be released in cleanup function, or
|
|
// 2. it's pointing to immortal source. If own_bytes is true then we are
|
|
// not reading data from the original source, whether immortal or not.
|
|
// Otherwise, the block is pinned iff the source is immortal.
|
|
const bool block_contents_pinned =
|
|
block.IsCached() ||
|
|
(!block.GetValue()->own_bytes() && rep_->immortal_table);
|
|
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
|
|
iter, block_contents_pinned);
|
|
|
|
if (!block.IsCached()) {
|
|
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
|
|
// insert a dummy record to block cache to track the memory usage
|
|
Cache* const block_cache = rep_->table_options.block_cache.get();
|
|
Cache::Handle* cache_handle = nullptr;
|
|
// There are two other types of cache keys: 1) SST cache key added in
|
|
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
|
|
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
|
|
// from SST cache key(31 bytes), and use non-zero prefix to
|
|
// differentiate from `write_buffer_manager`
|
|
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
|
|
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
|
|
// Prefix: use rep_->cache_key_prefix padded by 0s
|
|
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
|
|
assert(rep_->cache_key_prefix_size != 0);
|
|
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
|
|
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
|
|
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
|
|
next_cache_key_id_++);
|
|
assert(end - cache_key <=
|
|
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
|
|
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
|
|
s = block_cache->Insert(unique_key, nullptr,
|
|
block.GetValue()->ApproximateMemoryUsage(),
|
|
nullptr, &cache_handle);
|
|
if (s.ok()) {
|
|
assert(cache_handle != nullptr);
|
|
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
|
|
cache_handle);
|
|
}
|
|
}
|
|
} else {
|
|
iter->SetCacheHandle(block.GetCacheHandle());
|
|
}
|
|
|
|
block.TransferTo(iter);
|
|
return iter;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|