3dff28cf9b
Summary: For performance purposes, the lower level routines were changed to use a SystemClock* instead of a std::shared_ptr<SystemClock>. The shared ptr has some performance degradation on certain hardware classes. For most of the system, there is no risk of the pointer being deleted/invalid because the shared_ptr will be stored elsewhere. For example, the ImmutableDBOptions stores the Env which has a std::shared_ptr<SystemClock> in it. The SystemClock* within the ImmutableDBOptions is essentially a "short cut" to gain access to this constant resource. There were a few classes (PeriodicWorkScheduler?) where the "short cut" property did not hold. In those cases, the shared pointer was preserved. Using db_bench readrandom perf_level=3 on my EC2 box, this change performed as well or better than 6.17: 6.17: readrandom : 28.046 micros/op 854902 ops/sec; 61.3 MB/s (355999 of 355999 found) 6.18: readrandom : 32.615 micros/op 735306 ops/sec; 52.7 MB/s (290999 of 290999 found) PR: readrandom : 27.500 micros/op 871909 ops/sec; 62.5 MB/s (367999 of 367999 found) (Note that the times for 6.18 are prior to revert of the SystemClock). Pull Request resolved: https://github.com/facebook/rocksdb/pull/8033 Reviewed By: pdillinger Differential Revision: D27014563 Pulled By: mrambacher fbshipit-source-id: ad0459eba03182e454391b5926bf5cdd45657b67
411 lines
15 KiB
C++
411 lines
15 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "table/get_context.h"
|
|
|
|
#include "db/merge_helper.h"
|
|
#include "db/pinned_iterators_manager.h"
|
|
#include "db/read_callback.h"
|
|
#include "monitoring/file_read_sample.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/statistics.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/system_clock.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
namespace {
|
|
|
|
void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
|
|
#ifndef ROCKSDB_LITE
|
|
if (replay_log) {
|
|
if (replay_log->empty()) {
|
|
// Optimization: in the common case of only one operation in the
|
|
// log, we allocate the exact amount of space needed.
|
|
replay_log->reserve(1 + VarintLength(value.size()) + value.size());
|
|
}
|
|
replay_log->push_back(type);
|
|
PutLengthPrefixedSlice(replay_log, value);
|
|
}
|
|
#else
|
|
(void)replay_log;
|
|
(void)type;
|
|
(void)value;
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// Primary constructor. Every argument is stored in the member of the same
// name; pointers are kept as passed in, nothing is copied or owned here.
// The replay log starts out unset (nullptr).
//
// Side effects at construction time:
//   - when `seq` is non-null, `*seq` is reset to kMaxSequenceNumber so that
//     SaveValue() can later tell that no sequence number was recorded yet;
//   - `sample_` is initialised from should_sample_file_read().
GetContext::GetContext(
    const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
    Statistics* statistics, GetState init_state, const Slice& user_key,
    PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
    MergeContext* merge_context, bool do_merge,
    SequenceNumber* _max_covering_tombstone_seq, SystemClock* clock,
    SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
    ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
    : ucmp_(ucmp),
      merge_operator_(merge_operator),
      logger_(logger),
      statistics_(statistics),
      state_(init_state),
      user_key_(user_key),
      pinnable_val_(pinnable_val),
      timestamp_(timestamp),
      value_found_(value_found),
      merge_context_(merge_context),
      max_covering_tombstone_seq_(_max_covering_tombstone_seq),
      clock_(clock),
      seq_(seq),
      replay_log_(nullptr),
      pinned_iters_mgr_(_pinned_iters_mgr),
      callback_(callback),
      do_merge_(do_merge),
      is_blob_index_(is_blob_index),
      tracing_get_id_(tracing_get_id) {
  sample_ = should_sample_file_read();
  if (seq_ != nullptr) {
    // Marker for "sequence number not recorded yet".
    *seq_ = kMaxSequenceNumber;
  }
}
|
|
|
|
// Convenience constructor for callers that do not need the timestamp of the
// found entry: forwards everything to the primary constructor and passes
// nullptr for its `timestamp` output argument.
GetContext::GetContext(
    const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
    Statistics* statistics, GetState init_state, const Slice& user_key,
    PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
    bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
    SystemClock* clock, SequenceNumber* seq,
    PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
    bool* is_blob_index, uint64_t tracing_get_id)
    : GetContext(ucmp, merge_operator, logger, statistics, init_state,
                 user_key, pinnable_val, /*timestamp=*/nullptr, value_found,
                 merge_context, do_merge, _max_covering_tombstone_seq, clock,
                 seq, _pinned_iters_mgr, callback, is_blob_index,
                 tracing_get_id) {}
|
|
|
|
// Called from TableCache::Get and Table::Get when file/block in which
|
|
// key may exist are not there in TableCache/BlockCache respectively. In this
|
|
// case we can't guarantee that key does not exist and are not permitted to do
|
|
// IO to be certain.Set the status=kFound and value_found=false to let the
|
|
// caller know that key may exist but is not there in memory
|
|
void GetContext::MarkKeyMayExist() {
|
|
state_ = kFound;
|
|
if (value_found_ != nullptr) {
|
|
*value_found_ = false;
|
|
}
|
|
}
|
|
|
|
void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
|
|
assert(state_ == kNotFound);
|
|
appendToReplayLog(replay_log_, kTypeValue, value);
|
|
|
|
state_ = kFound;
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
pinnable_val_->PinSelf(value);
|
|
}
|
|
}
|
|
|
|
void GetContext::ReportCounters() {
|
|
if (get_context_stats_.num_cache_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_index_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
|
|
get_context_stats_.num_cache_index_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_data_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
|
|
get_context_stats_.num_cache_data_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
|
|
get_context_stats_.num_cache_filter_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
|
|
get_context_stats_.num_cache_compression_dict_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_index_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
|
|
get_context_stats_.num_cache_index_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
|
|
get_context_stats_.num_cache_filter_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_data_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
|
|
get_context_stats_.num_cache_data_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
|
|
get_context_stats_.num_cache_compression_dict_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_bytes_read > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
|
|
get_context_stats_.num_cache_bytes_read);
|
|
}
|
|
if (get_context_stats_.num_cache_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_MISS,
|
|
get_context_stats_.num_cache_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
|
|
}
|
|
if (get_context_stats_.num_cache_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_bytes_write > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
|
|
get_context_stats_.num_cache_bytes_write);
|
|
}
|
|
if (get_context_stats_.num_cache_index_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
|
|
get_context_stats_.num_cache_index_add);
|
|
}
|
|
if (get_context_stats_.num_cache_index_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_index_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_index_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
|
get_context_stats_.num_cache_index_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_data_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
|
|
get_context_stats_.num_cache_data_add);
|
|
}
|
|
if (get_context_stats_.num_cache_data_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_data_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_data_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
|
|
get_context_stats_.num_cache_data_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
|
|
get_context_stats_.num_cache_filter_add);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_filter_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
|
|
get_context_stats_.num_cache_filter_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
|
|
get_context_stats_.num_cache_compression_dict_add);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_compression_dict_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
|
|
get_context_stats_.num_cache_compression_dict_bytes_insert);
|
|
}
|
|
}
|
|
|
|
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
|
const Slice& value, bool* matched,
|
|
Cleanable* value_pinner) {
|
|
assert(matched);
|
|
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
|
|
merge_context_ != nullptr);
|
|
if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
|
|
*matched = true;
|
|
// If the value is not in the snapshot, skip it
|
|
if (!CheckCallback(parsed_key.sequence)) {
|
|
return true; // to continue to the next seq
|
|
}
|
|
|
|
appendToReplayLog(replay_log_, parsed_key.type, value);
|
|
|
|
if (seq_ != nullptr) {
|
|
// Set the sequence number if it is uninitialized
|
|
if (*seq_ == kMaxSequenceNumber) {
|
|
*seq_ = parsed_key.sequence;
|
|
}
|
|
}
|
|
|
|
auto type = parsed_key.type;
|
|
// Key matches. Process it
|
|
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
|
|
max_covering_tombstone_seq_ != nullptr &&
|
|
*max_covering_tombstone_seq_ > parsed_key.sequence) {
|
|
type = kTypeRangeDeletion;
|
|
}
|
|
switch (type) {
|
|
case kTypeValue:
|
|
case kTypeBlobIndex:
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
if (type == kTypeBlobIndex && is_blob_index_ == nullptr) {
|
|
// Blob value not supported. Stop.
|
|
state_ = kUnexpectedBlobIndex;
|
|
return false;
|
|
}
|
|
if (kNotFound == state_) {
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
if (LIKELY(value_pinner != nullptr)) {
|
|
// If the backing resources for the value are provided, pin them
|
|
pinnable_val_->PinSlice(value, value_pinner);
|
|
} else {
|
|
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
|
|
this);
|
|
|
|
// Otherwise copy the value
|
|
pinnable_val_->PinSelf(value);
|
|
}
|
|
}
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
push_operand(value, value_pinner);
|
|
}
|
|
} else if (kMerge == state_) {
|
|
assert(merge_operator_ != nullptr);
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
Status merge_status = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, &value,
|
|
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
|
|
logger_, statistics_, clock_);
|
|
pinnable_val_->PinSelf();
|
|
if (!merge_status.ok()) {
|
|
state_ = kCorrupt;
|
|
}
|
|
}
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
push_operand(value, value_pinner);
|
|
}
|
|
}
|
|
if (state_ == kFound) {
|
|
size_t ts_sz = ucmp_->timestamp_size();
|
|
if (ts_sz > 0 && timestamp_ != nullptr) {
|
|
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
|
|
timestamp_->assign(ts.data(), ts.size());
|
|
}
|
|
}
|
|
if (is_blob_index_ != nullptr) {
|
|
*is_blob_index_ = (type == kTypeBlobIndex);
|
|
}
|
|
return false;
|
|
|
|
case kTypeDeletion:
|
|
case kTypeDeletionWithTimestamp:
|
|
case kTypeSingleDeletion:
|
|
case kTypeRangeDeletion:
|
|
// TODO(noetzli): Verify correctness once merge of single-deletes
|
|
// is supported
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
if (kNotFound == state_) {
|
|
state_ = kDeleted;
|
|
} else if (kMerge == state_) {
|
|
state_ = kFound;
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
if (do_merge_) {
|
|
Status merge_status = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, nullptr,
|
|
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
|
|
logger_, statistics_, clock_);
|
|
pinnable_val_->PinSelf();
|
|
if (!merge_status.ok()) {
|
|
state_ = kCorrupt;
|
|
}
|
|
}
|
|
// If do_merge_ = false then the current value shouldn't be part of
|
|
// merge_context_->operand_list
|
|
}
|
|
}
|
|
return false;
|
|
|
|
case kTypeMerge:
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
state_ = kMerge;
|
|
// value_pinner is not set from plain_table_reader.cc for example.
|
|
push_operand(value, value_pinner);
|
|
if (do_merge_ && merge_operator_ != nullptr &&
|
|
merge_operator_->ShouldMerge(
|
|
merge_context_->GetOperandsDirectionBackward())) {
|
|
state_ = kFound;
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
// do_merge_ = true this is the case where this function is called
|
|
// as part of DB Get API hence merge operators should be merged.
|
|
if (do_merge_) {
|
|
Status merge_status = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, nullptr,
|
|
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
|
|
logger_, statistics_, clock_);
|
|
pinnable_val_->PinSelf();
|
|
if (!merge_status.ok()) {
|
|
state_ = kCorrupt;
|
|
}
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
return true;
|
|
|
|
default:
|
|
assert(false);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// state_ could be Corrupt, merge or notfound
|
|
return false;
|
|
}
|
|
|
|
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
|
|
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
|
|
value_pinner != nullptr) {
|
|
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
|
|
merge_context_->PushOperand(value, true /*value_pinned*/);
|
|
} else {
|
|
merge_context_->PushOperand(value, false);
|
|
}
|
|
}
|
|
|
|
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
|
|
GetContext* get_context, Cleanable* value_pinner) {
|
|
#ifndef ROCKSDB_LITE
|
|
Slice s = replay_log;
|
|
while (s.size()) {
|
|
auto type = static_cast<ValueType>(*s.data());
|
|
s.remove_prefix(1);
|
|
Slice value;
|
|
bool ret = GetLengthPrefixedSlice(&s, &value);
|
|
assert(ret);
|
|
(void)ret;
|
|
|
|
bool dont_care __attribute__((__unused__));
|
|
// Since SequenceNumber is not stored and unknown, we will use
|
|
// kMaxSequenceNumber.
|
|
get_context->SaveValue(
|
|
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
|
|
&dont_care, value_pinner);
|
|
}
|
|
#else // ROCKSDB_LITE
|
|
(void)replay_log;
|
|
(void)user_key;
|
|
(void)get_context;
|
|
(void)value_pinner;
|
|
assert(false);
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|