From 635a7bd1adb9e9d943bebb34dc9e42789a06e25f Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 17 Nov 2016 14:30:11 -0800 Subject: [PATCH] refactor TableCache Get/NewIterator for single exit points Summary: these functions were too complicated to change with exit points everywhere, so refactored them. btw, please review urgently, this is a prereq to fix the 5.0 perf regression Closes https://github.com/facebook/rocksdb/pull/1534 Differential Revision: D4198972 Pulled By: ajkr fbshipit-source-id: 04ebfb7 --- db/table_cache.cc | 226 +++++++++++++++++++++++----------------------- 1 file changed, 114 insertions(+), 112 deletions(-) diff --git a/db/table_cache.cc b/db/table_cache.cc index 7de8681ff..a49a95787 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -174,77 +174,78 @@ InternalIterator* TableCache::NewIterator( bool skip_filters, int level) { PERF_TIMER_GUARD(new_table_iterator_nanos); + Status s; if (range_del_agg != nullptr && !options.ignore_range_deletions) { std::unique_ptr range_del_iter(NewRangeDeletionIterator( options, icomparator, fd, file_read_hist, skip_filters, level)); - Status s = range_del_iter->status(); + s = range_del_iter->status(); if (s.ok()) { s = range_del_agg->AddTombstones(std::move(range_del_iter)); } - if (!s.ok()) { - return NewErrorInternalIterator(s, arena); - } - } - - if (table_reader_ptr != nullptr) { - *table_reader_ptr = nullptr; } + bool create_new_table_reader = false; TableReader* table_reader = nullptr; Cache::Handle* handle = nullptr; - - size_t readahead = 0; - bool create_new_table_reader = false; - if (for_compaction) { - if (ioptions_.new_table_reader_for_compaction_inputs) { - readahead = ioptions_.compaction_readahead_size; - create_new_table_reader = true; + if (s.ok()) { + if (table_reader_ptr != nullptr) { + *table_reader_ptr = nullptr; } - } else { - readahead = options.readahead_size; - create_new_table_reader = readahead > 0; - } - - if (create_new_table_reader) { - unique_ptr table_reader_unique_ptr; - Status s = GetTableReader( - env_options, icomparator, fd, true /* sequential_mode */, readahead, - !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr, - false /* skip_filters */, level); - if (!s.ok()) { - return NewErrorInternalIterator(s, arena); - } - table_reader = table_reader_unique_ptr.release(); - } else { - table_reader = fd.table_reader; - if (table_reader == nullptr) { - Status s = FindTable(env_options, icomparator, fd, &handle, - options.read_tier == kBlockCacheTier /* no_io */, - !for_compaction /* record read_stats */, - file_read_hist, skip_filters, level); - if (!s.ok()) { - return NewErrorInternalIterator(s, arena); + size_t readahead = 0; + if (for_compaction) { + if (ioptions_.new_table_reader_for_compaction_inputs) { + readahead = ioptions_.compaction_readahead_size; + create_new_table_reader = true; + } + } else { + readahead = options.readahead_size; + create_new_table_reader = readahead > 0; + } + + if (create_new_table_reader) { + unique_ptr table_reader_unique_ptr; + s = GetTableReader( + env_options, icomparator, fd, true /* sequential_mode */, readahead, + !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr, + false /* skip_filters */, level); + if (s.ok()) { + table_reader = table_reader_unique_ptr.release(); + } + } else { + table_reader = fd.table_reader; + if (table_reader == nullptr) { + s = FindTable(env_options, icomparator, fd, &handle, + options.read_tier == kBlockCacheTier /* no_io */, + !for_compaction /* record read_stats */, file_read_hist, + skip_filters, level); + if (s.ok()) { + table_reader = GetTableReaderFromHandle(handle); + } } - table_reader = GetTableReaderFromHandle(handle); } } + if (s.ok()) { + InternalIterator* result = + table_reader->NewIterator(options, arena, skip_filters); + if (create_new_table_reader) { + assert(handle == nullptr); + result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr); + } else if (handle != nullptr) { + result->RegisterCleanup(&UnrefEntry, cache_, handle); + } - InternalIterator* result = - table_reader->NewIterator(options, arena, skip_filters); - if (create_new_table_reader) { - assert(handle == nullptr); - result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr); - } else if (handle != nullptr) { - result->RegisterCleanup(&UnrefEntry, cache_, handle); + if (for_compaction) { + table_reader->SetupForCompaction(); + } + if (table_reader_ptr != nullptr) { + *table_reader_ptr = table_reader; + } + return result; } - - if (for_compaction) { - table_reader->SetupForCompaction(); + if (handle != nullptr) { + ReleaseHandle(handle); } - if (table_reader_ptr != nullptr) { - *table_reader_ptr = table_reader; - } - return result; + return NewErrorInternalIterator(s); } InternalIterator* TableCache::NewRangeDeletionIterator( @@ -281,89 +282,87 @@ Status TableCache::Get(const ReadOptions& options, const FileDescriptor& fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist, bool skip_filters, int level) { + Status s; if (get_context->range_del_agg() != nullptr && !options.ignore_range_deletions) { std::unique_ptr range_del_iter(NewRangeDeletionIterator( options, internal_comparator, fd, file_read_hist, skip_filters, level)); - Status s = range_del_iter->status(); + s = range_del_iter->status(); if (s.ok()) { s = get_context->range_del_agg()->AddTombstones( std::move(range_del_iter)); } - if (!s.ok()) { - return s; - } } + TableReader* t = fd.table_reader; - Status s; Cache::Handle* handle = nullptr; std::string* row_cache_entry = nullptr; - + bool done = false; #ifndef ROCKSDB_LITE IterKey row_cache_key; std::string row_cache_entry_buffer; + if (s.ok()) { + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. + if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { + uint64_t fd_number = fd.GetNumber(); + auto user_key = ExtractUserKey(k); + // We use the user key as cache key instead of the internal key, + // otherwise the whole cache would be invalidated every time the + // sequence key increases. However, to support caching snapshot + // reads, we append the sequence number (incremented by 1 to + // distinguish from 0) only in this case. + uint64_t seq_no = + options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k); - // Check row cache if enabled. Since row cache does not currently store - // sequence numbers, we cannot use it if we need to fetch the sequence. - if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { - uint64_t fd_number = fd.GetNumber(); - auto user_key = ExtractUserKey(k); - // We use the user key as cache key instead of the internal key, - // otherwise the whole cache would be invalidated every time the - // sequence key increases. However, to support caching snapshot - // reads, we append the sequence number (incremented by 1 to - // distinguish from 0) only in this case. - uint64_t seq_no = - options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k); + // Compute row cache key. + row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(), + row_cache_id_.size()); + AppendVarint64(&row_cache_key, fd_number); + AppendVarint64(&row_cache_key, seq_no); + row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(), + user_key.size()); - // Compute row cache key. - row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(), - row_cache_id_.size()); - AppendVarint64(&row_cache_key, fd_number); - AppendVarint64(&row_cache_key, seq_no); - row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(), - user_key.size()); - - if (auto row_handle = ioptions_.row_cache->Lookup(row_cache_key.GetKey())) { - auto found_row_cache_entry = static_cast( - ioptions_.row_cache->Value(row_handle)); - replayGetContextLog(*found_row_cache_entry, user_key, get_context); - ioptions_.row_cache->Release(row_handle); - RecordTick(ioptions_.statistics, ROW_CACHE_HIT); - return Status::OK(); + if (auto row_handle = + ioptions_.row_cache->Lookup(row_cache_key.GetKey())) { + auto found_row_cache_entry = static_cast( + ioptions_.row_cache->Value(row_handle)); + replayGetContextLog(*found_row_cache_entry, user_key, get_context); + ioptions_.row_cache->Release(row_handle); + RecordTick(ioptions_.statistics, ROW_CACHE_HIT); + done = true; + } else { + // Not found, setting up the replay log. + RecordTick(ioptions_.statistics, ROW_CACHE_MISS); + row_cache_entry = &row_cache_entry_buffer; + } } - - // Not found, setting up the replay log. - RecordTick(ioptions_.statistics, ROW_CACHE_MISS); - row_cache_entry = &row_cache_entry_buffer; } #endif // ROCKSDB_LITE - - if (!t) { - s = FindTable(env_options_, internal_comparator, fd, &handle, - options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters, - level); + if (!done && s.ok()) { + if (!t) { + s = FindTable(env_options_, internal_comparator, fd, &handle, + options.read_tier == kBlockCacheTier /* no_io */, + true /* record_read_stats */, file_read_hist, skip_filters, + level); + if (s.ok()) { + t = GetTableReaderFromHandle(handle); + } + } if (s.ok()) { - t = GetTableReaderFromHandle(handle); + get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. + s = t->Get(options, k, get_context, skip_filters); + get_context->SetReplayLog(nullptr); + } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { + // Couldn't find Table in cache but treat as kFound if no_io set + get_context->MarkKeyMayExist(); + s = Status::OK(); + done = true; } } - if (s.ok()) { - get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. - s = t->Get(options, k, get_context, skip_filters); - get_context->SetReplayLog(nullptr); - if (handle != nullptr) { - ReleaseHandle(handle); - } - } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { - // Couldn't find Table in cache but treat as kFound if no_io set - get_context->MarkKeyMayExist(); - return Status::OK(); - } - #ifndef ROCKSDB_LITE // Put the replay log in row cache only if something was found. - if (s.ok() && row_cache_entry && !row_cache_entry->empty()) { + if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) { size_t charge = row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string); void* row_ptr = new std::string(std::move(*row_cache_entry)); @@ -372,6 +371,9 @@ Status TableCache::Get(const ReadOptions& options, } #endif // ROCKSDB_LITE + if (handle != nullptr) { + ReleaseHandle(handle); + } return s; }