refactor TableCache Get/NewIterator for single exit points
Summary: These functions had early exit points scattered throughout, which made them too complicated to change safely, so this refactors each of them to funnel through a single exit point. Note: please review promptly; this is a prerequisite for fixing the 5.0 perf regression.

Closes https://github.com/facebook/rocksdb/pull/1534

Differential Revision: D4198972

Pulled By: ajkr

fbshipit-source-id: 04ebfb7
parent f39452e81f
commit 635a7bd1ad
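For review context, the shape of the change in both functions: rather than returning at each failure site, a single `Status s` is threaded through the stages, each stage is gated on `s.ok()`, and the function exits once at the end, where cleanup lives. A minimal standalone sketch of that pattern — toy types, not the RocksDB API:

    #include <cassert>

    // Toy stand-ins; Status and Handle here are illustrative, not rocksdb types.
    struct Status {
      bool ok_;
      bool ok() const { return ok_; }
      static Status OK() { return Status{true}; }
      static Status Error() { return Status{false}; }
    };
    struct Handle {};

    Status DoStagedWork(bool stage1_fails, bool stage2_fails) {
      Status s = Status::OK();
      Handle* handle = nullptr;

      // Stage 1 runs unconditionally; later stages are gated on s.ok().
      if (stage1_fails) {
        s = Status::Error();
      }
      if (s.ok()) {
        handle = new Handle();  // acquire a resource mid-function
        if (stage2_fails) {
          s = Status::Error();
        }
      }
      // Single exit point: cleanup is written once and runs on every path.
      delete handle;
      return s;
    }

    int main() {
      assert(DoStagedWork(false, false).ok());
      assert(!DoStagedWork(false, true).ok());  // resource still freed
      return 0;
    }

The payoff is exactly what the diff below shows: every new failure mode only has to set `s`, instead of remembering to repeat the cleanup before returning.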
@@ -174,77 +174,78 @@ InternalIterator* TableCache::NewIterator(
     bool skip_filters, int level) {
   PERF_TIMER_GUARD(new_table_iterator_nanos);
 
+  Status s;
   if (range_del_agg != nullptr && !options.ignore_range_deletions) {
     std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
         options, icomparator, fd, file_read_hist, skip_filters, level));
-    Status s = range_del_iter->status();
+    s = range_del_iter->status();
     if (s.ok()) {
       s = range_del_agg->AddTombstones(std::move(range_del_iter));
     }
-    if (!s.ok()) {
-      return NewErrorInternalIterator(s, arena);
-    }
   }
 
-  if (table_reader_ptr != nullptr) {
-    *table_reader_ptr = nullptr;
-  }
-
+  bool create_new_table_reader = false;
   TableReader* table_reader = nullptr;
   Cache::Handle* handle = nullptr;
-
-  size_t readahead = 0;
-  bool create_new_table_reader = false;
-  if (for_compaction) {
-    if (ioptions_.new_table_reader_for_compaction_inputs) {
-      readahead = ioptions_.compaction_readahead_size;
-      create_new_table_reader = true;
-    }
-  } else {
-    readahead = options.readahead_size;
-    create_new_table_reader = readahead > 0;
-  }
-
-  if (create_new_table_reader) {
-    unique_ptr<TableReader> table_reader_unique_ptr;
-    Status s = GetTableReader(
-        env_options, icomparator, fd, true /* sequential_mode */, readahead,
-        !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
-        false /* skip_filters */, level);
-    if (!s.ok()) {
-      return NewErrorInternalIterator(s, arena);
-    }
-    table_reader = table_reader_unique_ptr.release();
-  } else {
-    table_reader = fd.table_reader;
-    if (table_reader == nullptr) {
-      Status s = FindTable(env_options, icomparator, fd, &handle,
-                           options.read_tier == kBlockCacheTier /* no_io */,
-                           !for_compaction /* record read_stats */,
-                           file_read_hist, skip_filters, level);
-      if (!s.ok()) {
-        return NewErrorInternalIterator(s, arena);
-      }
-      table_reader = GetTableReaderFromHandle(handle);
-    }
-  }
-
-  InternalIterator* result =
-      table_reader->NewIterator(options, arena, skip_filters);
-  if (create_new_table_reader) {
-    assert(handle == nullptr);
-    result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
-  } else if (handle != nullptr) {
-    result->RegisterCleanup(&UnrefEntry, cache_, handle);
-  }
-
-  if (for_compaction) {
-    table_reader->SetupForCompaction();
-  }
-  if (table_reader_ptr != nullptr) {
-    *table_reader_ptr = table_reader;
-  }
-  return result;
+  if (s.ok()) {
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = nullptr;
+    }
+    size_t readahead = 0;
+    if (for_compaction) {
+      if (ioptions_.new_table_reader_for_compaction_inputs) {
+        readahead = ioptions_.compaction_readahead_size;
+        create_new_table_reader = true;
+      }
+    } else {
+      readahead = options.readahead_size;
+      create_new_table_reader = readahead > 0;
+    }
+
+    if (create_new_table_reader) {
+      unique_ptr<TableReader> table_reader_unique_ptr;
+      s = GetTableReader(
+          env_options, icomparator, fd, true /* sequential_mode */, readahead,
+          !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
+          false /* skip_filters */, level);
+      if (s.ok()) {
+        table_reader = table_reader_unique_ptr.release();
+      }
+    } else {
+      table_reader = fd.table_reader;
+      if (table_reader == nullptr) {
+        s = FindTable(env_options, icomparator, fd, &handle,
+                      options.read_tier == kBlockCacheTier /* no_io */,
+                      !for_compaction /* record read_stats */, file_read_hist,
+                      skip_filters, level);
+        if (s.ok()) {
+          table_reader = GetTableReaderFromHandle(handle);
+        }
+      }
+    }
+  }
+  if (s.ok()) {
+    InternalIterator* result =
+        table_reader->NewIterator(options, arena, skip_filters);
+    if (create_new_table_reader) {
+      assert(handle == nullptr);
+      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+    } else if (handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+    }
+    if (for_compaction) {
+      table_reader->SetupForCompaction();
+    }
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = table_reader;
+    }
+    return result;
+  }
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return NewErrorInternalIterator(s);
 }
 
 InternalIterator* TableCache::NewRangeDeletionIterator(
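On the success path above, ownership of the table reader (or the pinned cache handle) travels with the returned iterator via `RegisterCleanup`; on the new error path no iterator ever takes ownership, hence the added explicit `ReleaseHandle(handle)` before returning the error iterator. A self-contained sketch of that cleanup-registration idea — note the real `Cleanable::RegisterCleanup` seen in the diff takes a function pointer plus two `void*` arguments, while this toy uses `std::function` for brevity:

    #include <functional>
    #include <iostream>
    #include <vector>

    // Toy model of an iterator that runs registered cleanups when destroyed,
    // so whoever creates it can hand resource ownership over to it.
    class ToyIterator {
     public:
      void RegisterCleanup(std::function<void()> fn) {
        cleanups_.push_back(std::move(fn));
      }
      ~ToyIterator() {
        for (auto& fn : cleanups_) {
          fn();  // e.g. delete a private reader, or unref a cache handle
        }
      }
     private:
      std::vector<std::function<void()>> cleanups_;
    };

    int main() {
      {
        ToyIterator it;
        it.RegisterCleanup([] { std::cout << "release cache handle\n"; });
      }  // cleanup fires here, when the iterator goes out of scope
      return 0;
    }

This is why the error-path release must stay explicit: until `RegisterCleanup` runs, the function itself is still the handle's owner.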
@@ -281,89 +282,87 @@ Status TableCache::Get(const ReadOptions& options,
                        const FileDescriptor& fd, const Slice& k,
                        GetContext* get_context, HistogramImpl* file_read_hist,
                        bool skip_filters, int level) {
+  Status s;
   if (get_context->range_del_agg() != nullptr &&
       !options.ignore_range_deletions) {
     std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
         options, internal_comparator, fd, file_read_hist, skip_filters, level));
-    Status s = range_del_iter->status();
+    s = range_del_iter->status();
     if (s.ok()) {
       s = get_context->range_del_agg()->AddTombstones(
           std::move(range_del_iter));
     }
-    if (!s.ok()) {
-      return s;
-    }
   }
 
   TableReader* t = fd.table_reader;
-  Status s;
   Cache::Handle* handle = nullptr;
   std::string* row_cache_entry = nullptr;
-
+  bool done = false;
 #ifndef ROCKSDB_LITE
   IterKey row_cache_key;
   std::string row_cache_entry_buffer;
-
-  // Check row cache if enabled. Since row cache does not currently store
-  // sequence numbers, we cannot use it if we need to fetch the sequence.
-  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
-    uint64_t fd_number = fd.GetNumber();
-    auto user_key = ExtractUserKey(k);
-    // We use the user key as cache key instead of the internal key,
-    // otherwise the whole cache would be invalidated every time the
-    // sequence key increases. However, to support caching snapshot
-    // reads, we append the sequence number (incremented by 1 to
-    // distinguish from 0) only in this case.
-    uint64_t seq_no =
-        options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
-
-    // Compute row cache key.
-    row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
-                             row_cache_id_.size());
-    AppendVarint64(&row_cache_key, fd_number);
-    AppendVarint64(&row_cache_key, seq_no);
-    row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
-                             user_key.size());
-
-    if (auto row_handle = ioptions_.row_cache->Lookup(row_cache_key.GetKey())) {
-      auto found_row_cache_entry = static_cast<const std::string*>(
-          ioptions_.row_cache->Value(row_handle));
-      replayGetContextLog(*found_row_cache_entry, user_key, get_context);
-      ioptions_.row_cache->Release(row_handle);
-      RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
-      return Status::OK();
-    }
-
-    // Not found, setting up the replay log.
-    RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
-    row_cache_entry = &row_cache_entry_buffer;
-  }
+  if (s.ok()) {
+    // Check row cache if enabled. Since row cache does not currently store
+    // sequence numbers, we cannot use it if we need to fetch the sequence.
+    if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
+      uint64_t fd_number = fd.GetNumber();
+      auto user_key = ExtractUserKey(k);
+      // We use the user key as cache key instead of the internal key,
+      // otherwise the whole cache would be invalidated every time the
+      // sequence key increases. However, to support caching snapshot
+      // reads, we append the sequence number (incremented by 1 to
+      // distinguish from 0) only in this case.
+      uint64_t seq_no =
+          options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
+
+      // Compute row cache key.
+      row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
+                               row_cache_id_.size());
+      AppendVarint64(&row_cache_key, fd_number);
+      AppendVarint64(&row_cache_key, seq_no);
+      row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
+                               user_key.size());
+
+      if (auto row_handle =
+              ioptions_.row_cache->Lookup(row_cache_key.GetKey())) {
+        auto found_row_cache_entry = static_cast<const std::string*>(
+            ioptions_.row_cache->Value(row_handle));
+        replayGetContextLog(*found_row_cache_entry, user_key, get_context);
+        ioptions_.row_cache->Release(row_handle);
+        RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
+        done = true;
+      } else {
+        // Not found, setting up the replay log.
+        RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
+        row_cache_entry = &row_cache_entry_buffer;
+      }
+    }
+  }
 #endif  // ROCKSDB_LITE
-
-  if (!t) {
-    s = FindTable(env_options_, internal_comparator, fd, &handle,
-                  options.read_tier == kBlockCacheTier /* no_io */,
-                  true /* record_read_stats */, file_read_hist, skip_filters,
-                  level);
-    if (s.ok()) {
-      t = GetTableReaderFromHandle(handle);
-    }
-  }
-  if (s.ok()) {
-    get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
-    s = t->Get(options, k, get_context, skip_filters);
-    get_context->SetReplayLog(nullptr);
-    if (handle != nullptr) {
-      ReleaseHandle(handle);
-    }
-  } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
-    // Couldn't find Table in cache but treat as kFound if no_io set
-    get_context->MarkKeyMayExist();
-    return Status::OK();
-  }
+  if (!done && s.ok()) {
+    if (!t) {
+      s = FindTable(env_options_, internal_comparator, fd, &handle,
+                    options.read_tier == kBlockCacheTier /* no_io */,
+                    true /* record_read_stats */, file_read_hist, skip_filters,
+                    level);
+      if (s.ok()) {
+        t = GetTableReaderFromHandle(handle);
+      }
+    }
+    if (s.ok()) {
+      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
+      s = t->Get(options, k, get_context, skip_filters);
+      get_context->SetReplayLog(nullptr);
+    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+      // Couldn't find Table in cache but treat as kFound if no_io set
+      get_context->MarkKeyMayExist();
+      s = Status::OK();
+      done = true;
+    }
+  }
 
 #ifndef ROCKSDB_LITE
   // Put the replay log in row cache only if something was found.
-  if (s.ok() && row_cache_entry && !row_cache_entry->empty()) {
+  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
     size_t charge =
         row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
     void* row_ptr = new std::string(std::move(*row_cache_entry));
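The row-cache probe above builds its key as row_cache_id ++ varint(file number) ++ varint(seq_no) ++ user key, with seq_no left at 0 unless the read is against a snapshot (then seqno + 1, to distinguish from 0), per the comment in the diff. A standalone sketch of that layout, with a local LEB128-style `PutVarint64` standing in for the `AppendVarint64` helper the diff calls:

    #include <cstdint>
    #include <string>

    // Local varint encoder standing in for the coding helper in the diff:
    // standard 7-bits-per-byte, high-bit-is-continuation encoding.
    void PutVarint64(std::string* dst, uint64_t v) {
      while (v >= 128) {
        dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
        v >>= 7;
      }
      dst->push_back(static_cast<char>(v));
    }

    // Sketch of the key layout: id | varint(file#) | varint(seq) | user key.
    // MakeRowCacheKey is an illustrative name, not a RocksDB function.
    std::string MakeRowCacheKey(const std::string& row_cache_id,
                                uint64_t file_number, uint64_t seq_no,
                                const std::string& user_key) {
      std::string key = row_cache_id;  // per-instance prefix
      PutVarint64(&key, file_number);  // binds the entry to one SST file
      PutVarint64(&key, seq_no);       // 0, or snapshot seqno + 1
      key.append(user_key);            // user key, so seqno bumps don't evict
      return key;
    }

    int main() {
      std::string key = MakeRowCacheKey("cache_id", 42, 0, "user_key");
      return key.empty() ? 1 : 0;
    }

Keying on the user key (rather than the internal key) is what keeps the cache valid as sequence numbers advance, which is exactly the trade-off the diff's comment describes.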
@@ -372,6 +371,9 @@ Status TableCache::Get(const ReadOptions& options,
   }
 #endif  // ROCKSDB_LITE
 
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
   return s;
 }
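With the release consolidated at this single tail, an RAII guard would be a natural next step; a hypothetical sketch of that alternative (`HandleGuard` and these `Cache`/`Handle` types are illustrative, not something this diff introduces):

    // Hypothetical RAII wrapper over the manual ReleaseHandle() tail above.
    // Cache and Handle are toy stand-ins, not the RocksDB types.
    struct Cache {
      struct Handle {};
      void Release(Handle* h) { delete h; }
    };

    class HandleGuard {
     public:
      HandleGuard(Cache* cache, Cache::Handle* handle)
          : cache_(cache), handle_(handle) {}
      HandleGuard(const HandleGuard&) = delete;
      HandleGuard& operator=(const HandleGuard&) = delete;
      ~HandleGuard() {
        if (handle_ != nullptr) {
          cache_->Release(handle_);
        }
      }
      // Give up ownership, e.g. when a cleanup registered on an iterator
      // takes over releasing the handle.
      Cache::Handle* release() {
        Cache::Handle* h = handle_;
        handle_ = nullptr;
        return h;
      }
     private:
      Cache* cache_;
      Cache::Handle* handle_;
    };

    int main() {
      Cache cache;
      HandleGuard guard(&cache, new Cache::Handle());
      return 0;  // guard releases the handle on scope exit
    }

The diff keeps the manual release instead, plausibly because in `NewIterator` ownership of the handle is sometimes transferred to the iterator's registered cleanup, which a guard would have to `release()` around.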