From ff8953380f356768d5ea7bf26b2734d86484d228 Mon Sep 17 00:00:00 2001 From: Merlin Mao Date: Thu, 19 Aug 2021 17:26:11 -0700 Subject: [PATCH 1/5] Add iterator's lower and upper bounds to `TraceRecord` (#8677) Summary: Trace file V2 added lower/upper bounds to `Iterator::Seek()` and `Iterator::SeekForPrev()`. They were not used anywhere during the execution of a `TraceRecord`. Now they are added to be used by `ReadOptions` during `Iterator::Seek()` and `Iterator::SeekForPrev()` if they are set. Added test cases in `DBTest2.TraceAndManualReplay`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8677 Reviewed By: zhichao-cao Differential Revision: D30438255 Pulled By: autopear fbshipit-source-id: 82563006be0b69155990e506a74951c18af8d288 --- db/db_test2.cc | 52 ++++++++++++++++++++++++++-- include/rocksdb/trace_record.h | 18 ++++++++++ trace_replay/trace_record.cc | 31 +++++++++++++++++ trace_replay/trace_record_handler.cc | 11 +++++- trace_replay/trace_replay.cc | 13 ++++--- 5 files changed, 117 insertions(+), 8 deletions(-) diff --git a/db/db_test2.cc b/db/db_test2.cc index c09ea4c71..b7e626df6 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -4538,6 +4538,37 @@ TEST_F(DBTest2, TraceAndManualReplay) { single_iter = db_->NewIterator(ro); single_iter->Seek("f"); single_iter->SeekForPrev("g"); + ASSERT_OK(single_iter->status()); + delete single_iter; + + // Write some sequenced keys for testing lower/upper bounds of iterator. + batch.Clear(); + ASSERT_OK(batch.Put("iter-0", "iter-0")); + ASSERT_OK(batch.Put("iter-1", "iter-1")); + ASSERT_OK(batch.Put("iter-2", "iter-2")); + ASSERT_OK(batch.Put("iter-3", "iter-3")); + ASSERT_OK(batch.Put("iter-4", "iter-4")); + ASSERT_OK(db_->Write(wo, &batch)); + + ReadOptions bounded_ro = ro; + Slice lower_bound("iter-1"); + Slice upper_bound("iter-3"); + bounded_ro.iterate_lower_bound = &lower_bound; + bounded_ro.iterate_upper_bound = &upper_bound; + single_iter = db_->NewIterator(bounded_ro); + single_iter->Seek("iter-0"); + ASSERT_EQ(single_iter->key().ToString(), "iter-1"); + single_iter->Seek("iter-2"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + single_iter->Seek("iter-4"); + ASSERT_FALSE(single_iter->Valid()); + single_iter->SeekForPrev("iter-0"); + ASSERT_FALSE(single_iter->Valid()); + single_iter->SeekForPrev("iter-2"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + single_iter->SeekForPrev("iter-4"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + ASSERT_OK(single_iter->status()); delete single_iter; ASSERT_EQ("1", Get(0, "a")); @@ -4548,6 +4579,9 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2. + // Plus 1 WriteBatch for iterator with lower/upper bounds, and 6 + // Seek(ForPrev)s. + // Total Write x 9, Get x 3, Seek x 8 ASSERT_OK(db_->EndTrace()); // These should not get into the trace file as it is after EndTrace. ASSERT_OK(Put("hello", "world")); @@ -4610,6 +4644,20 @@ TEST_F(DBTest2, TraceAndManualReplay) { continue; } if (s.ok()) { + if (record->GetTraceType() == kTraceIteratorSeek || + record->GetTraceType() == kTraceIteratorSeekForPrev) { + IteratorSeekQueryTraceRecord* iter_r = + dynamic_cast(record.get()); + // Check if lower/upper bounds are correctly saved and decoded. + lower_bound = iter_r->GetLowerBound(); + if (!lower_bound.empty()) { + ASSERT_EQ(lower_bound.ToString(), "iter-1"); + } + upper_bound = iter_r->GetUpperBound(); + if (!upper_bound.empty()) { + ASSERT_EQ(upper_bound.ToString(), "iter-3"); + } + } ASSERT_OK(replayer->Execute(record, &result)); if (result != nullptr) { ASSERT_OK(result->Accept(&res_handler)); @@ -4622,9 +4670,9 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); ASSERT_GT(res_handler.GetAvgLatency(), 0.0); - ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumWrites(), 9); ASSERT_EQ(res_handler.GetNumGets(), 3); - ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 8); ASSERT_EQ(res_handler.GetNumMultiGets(), 0); res_handler.Reset(); } diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h index 3f591d3d1..defa89934 100644 --- a/include/rocksdb/trace_record.h +++ b/include/rocksdb/trace_record.h @@ -169,6 +169,16 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, const std::string& key, uint64_t timestamp); + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + PinnableSlice&& key, PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, uint64_t timestamp); + + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + const std::string& key, + const std::string& lower_bound, + const std::string& upper_bound, + uint64_t timestamp); + virtual ~IteratorSeekQueryTraceRecord() override; // Trace type matches the seek type. @@ -183,6 +193,12 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { // Key to seek to. virtual Slice GetKey() const; + // Iterate lower bound. + virtual Slice GetLowerBound() const; + + // Iterate upper bound. + virtual Slice GetUpperBound() const; + Status Accept(Handler* handler, std::unique_ptr* result) override; @@ -190,6 +206,8 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { SeekType type_; uint32_t cf_id_; PinnableSlice key_; + PinnableSlice lower_; + PinnableSlice upper_; }; // Trace record for DB::MultiGet() operation. diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc index e0ce02090..b377a18d2 100644 --- a/trace_replay/trace_record.cc +++ b/trace_replay/trace_record.cc @@ -100,6 +100,29 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( key_.PinSelf(key); } +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, + PinnableSlice&& lower_bound, PinnableSlice&& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id), + key_(std::move(key)), + lower_(std::move(lower_bound)), + upper_(std::move(upper_bound)) {} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, const std::string& key, + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id) { + key_.PinSelf(key); + lower_.PinSelf(lower_bound); + upper_.PinSelf(upper_bound); +} + IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { @@ -117,6 +140,14 @@ uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const { Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } +Slice IteratorSeekQueryTraceRecord::GetLowerBound() const { + return Slice(lower_); +} + +Slice IteratorSeekQueryTraceRecord::GetUpperBound() const { + return Slice(upper_); +} + Status IteratorSeekQueryTraceRecord::Accept( Handler* handler, std::unique_ptr* result) { assert(handler != nullptr); diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc index 6343d2ed3..e0b5fb202 100644 --- a/trace_replay/trace_record_handler.cc +++ b/trace_replay/trace_record_handler.cc @@ -93,7 +93,16 @@ Status TraceExecutionHandler::Handle( return Status::Corruption("Invalid Column Family ID."); } - Iterator* single_iter = db_->NewIterator(read_opts_, it->second); + ReadOptions r_opts = read_opts_; + Slice lower = record.GetLowerBound(); + if (!lower.empty()) { + r_opts.iterate_lower_bound = &lower; + } + Slice upper = record.GetUpperBound(); + if (!upper.empty()) { + r_opts.iterate_upper_bound = &upper; + } + Iterator* single_iter = db_->NewIterator(r_opts, it->second); uint64_t start = clock_->NowMicros(); diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index af2e76500..01867b9f4 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -225,14 +225,12 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, uint32_t cf_id = 0; Slice iter_key; + Slice lower_bound; + Slice upper_bound; if (trace_file_version < 2) { DecodeCFAndKey(trace->payload, &cf_id, &iter_key); } else { - // Are these two used anywhere? - Slice lower_bound; - Slice upper_bound; - Slice buf(trace->payload); GetFixed64(&buf, &trace->payload_map); int64_t payload_map = static_cast(trace->payload_map); @@ -264,9 +262,14 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, if (record != nullptr) { PinnableSlice ps_key; ps_key.PinSelf(iter_key); + PinnableSlice ps_lower; + ps_lower.PinSelf(lower_bound); + PinnableSlice ps_upper; + ps_upper.PinSelf(upper_bound); record->reset(new IteratorSeekQueryTraceRecord( static_cast(trace->type), cf_id, - std::move(ps_key), trace->ts)); + std::move(ps_key), std::move(ps_lower), std::move(ps_upper), + trace->ts)); } return Status::OK(); From 5efec84c606d715090521f00916f2099fa743674 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Fri, 20 Aug 2021 11:37:53 -0700 Subject: [PATCH 2/5] Fix blob callback in compaction and atomic flush (#8681) Summary: Pass BlobFileCompletionCallback in case of atomic flush and compaction job which is currently nullptr(default parameter). BlobFileCompletionCallback is used in case of IntegratedBlobDB to report new blob files to SstFileManager. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8681 Test Plan: CircleCI jobs Reviewed By: ltamasi Differential Revision: D30445998 Pulled By: akankshamahajan15 fbshipit-source-id: ba48093843864faec57f1f365cce7b5a569c4021 --- HISTORY.md | 1 + db/db_impl/db_impl_compaction_flush.cc | 7 +- db/db_sst_test.cc | 90 ++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index aa728f412..a424099e3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. +* Fixed passing of BlobFileCompletionCallback to Compaction job and Atomic flush job which was default paramter (nullptr). BlobFileCompletitionCallback is internal callback that manages addition of blob files to SSTFileManager. ### New Features * Made the EventListener extend the Customizable class. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index df849d519..b1679d756 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, thread_pri, io_tracer_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow())); + cfd->GetFullHistoryTsLow(), &blob_callback_)); } std::vector file_meta(num_cfs); @@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow()); + c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? manual_compaction->canceled : nullptr, db_id_, - db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); + db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + &blob_callback_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 77984758f..cba6fb0b7 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +// This test if blob files are recorded by SST File Manager when Compaction job +// creates/delete them and in case of AtomicFlush. +TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.disable_auto_compactions = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + options.atomic_flush = true; + + int files_added = 0; + int files_deleted = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_deleted++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + Random rnd(301); + + ASSERT_OK(Put("key_1", "value_1")); + ASSERT_OK(Put("key_2", "value_2")); + ASSERT_OK(Put("key_3", "value_3")); + ASSERT_OK(Put("key_4", "value_4")); + ASSERT_OK(Flush()); + + // Overwrite will create the garbage data. + ASSERT_OK(Put("key_3", "new_value_3")); + ASSERT_OK(Put("key_4", "new_value_4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_EQ(files_added, 3); + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + files_added = 0; + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + // Compaction job will create a new file and delete the older files. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(files_added, 1); + ASSERT_EQ(files_deleted, 1); + ASSERT_EQ(files_scheduled_to_delete, 1); + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + sfm->WaitForEmptyTrash(); + ASSERT_EQ(files_deleted, 4); + ASSERT_EQ(files_scheduled_to_delete, 4); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE From 22f2936b35ba1e20c96ba31d29d082dabb930d5a Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 20 Aug 2021 11:49:53 -0700 Subject: [PATCH 3/5] Update the block_read_count/block_read_byte counters in MultiGet (#8676) Summary: MultiGet in block based table reader doesn't use BlockFetcher. As a result, the block_read_count and block_read_byte PerfContext counters were not being updated. This fixes that by updating them in MultiRead. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8676 Reviewed By: zhichao-cao Differential Revision: D30428680 Pulled By: anand1976 fbshipit-source-id: 21846efe92588fc17123665dd06733693a40126d --- table/block_based/block_based_table_reader.cc | 3 +++ table/block_based/block_based_table_reader_test.cc | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index c9ee60aeb..ee6ccf830 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1713,6 +1713,9 @@ void BlockBasedTable::RetrieveMultipleBlocks( req_offset_for_block.emplace_back(0); } req_idx_for_block.emplace_back(read_reqs.size()); + + PERF_COUNTER_ADD(block_read_count, 1); + PERF_COUNTER_ADD(block_read_byte, block_size(handle)); } // Handle the last block and process the pending last request if (prev_len != 0) { diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 07136dbf8..775f2f505 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -228,8 +228,16 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) { // Execute MultiGet. MultiGetContext::Range range = ctx.GetMultiGetRange(); + PerfContext* perf_ctx = get_perf_context(); + perf_ctx->Reset(); table->MultiGet(ReadOptions(), &range, nullptr); + ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count - + perf_ctx->filter_block_read_count - + perf_ctx->compression_dict_block_read_count, + 1); + ASSERT_GE(perf_ctx->block_read_byte, 1); + for (const Status& status : statuses) { ASSERT_OK(status); } From f35042ca40f84b9e33cc6876738392c6eff12898 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 20 Aug 2021 15:16:33 -0700 Subject: [PATCH 4/5] Add a PerfContext counter for secondary cache hits (#8685) Summary: Add a PerfContext counter. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8685 Reviewed By: zhichao-cao Differential Revision: D30453957 Pulled By: anand1976 fbshipit-source-id: 42888a3ced240e1c44446d52d3b04adfb01f5665 --- HISTORY.md | 2 ++ cache/lru_cache.cc | 3 +++ cache/lru_cache_test.cc | 3 +++ include/rocksdb/perf_context.h | 3 +++ monitoring/perf_context.cc | 5 +++++ 5 files changed, 16 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index a424099e3..b1c9ffb7d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. * Fixed passing of BlobFileCompletionCallback to Compaction job and Atomic flush job which was default paramter (nullptr). BlobFileCompletitionCallback is internal callback that manages addition of blob files to SSTFileManager. +* Fixed MultiGet not updating the block_read_count and block_read_byte PerfContext counters ### New Features * Made the EventListener extend the Customizable class. @@ -20,6 +21,7 @@ * Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench. * Add property `LiveSstFilesSizeAtTemperature` to retrieve sst file size at different temperature. * Added a stat rocksdb.secondary.cache.hits +* Added a PerfContext counter secondary_cache_hit_count * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`. ## Public API change diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 710a155c4..689242ca9 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -13,6 +13,7 @@ #include #include +#include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "util/mutexlock.h" @@ -473,6 +474,7 @@ Cache::Handle* LRUCacheShard::Lookup( e->Free(); e = nullptr; } else { + PERF_COUNTER_ADD(secondary_cache_hit_count, 1); RecordTick(stats, SECONDARY_CACHE_HITS); } } else { @@ -481,6 +483,7 @@ Cache::Handle* LRUCacheShard::Lookup( e->SetIncomplete(true); // This may be slightly inaccurate, if the lookup eventually fails. // But the probability is very low. + PERF_COUNTER_ADD(secondary_cache_hit_count, 1); RecordTick(stats, SECONDARY_CACHE_HITS); } } diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 74409421c..e840273bd 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -481,6 +481,7 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) { ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, str2.length())); + get_perf_context()->Reset(); Cache::Handle* handle; handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator, @@ -497,6 +498,8 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) { ASSERT_EQ(secondary_cache->num_lookups(), 1u); ASSERT_EQ(stats->getTickerCount(SECONDARY_CACHE_HITS), secondary_cache->num_lookups()); + PerfContext perf_ctx = *get_perf_context(); + ASSERT_EQ(perf_ctx.secondary_cache_hit_count, secondary_cache->num_lookups()); cache.reset(); secondary_cache.reset(); diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 699f57344..f3058416e 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -74,6 +74,9 @@ struct PerfContext { uint64_t filter_block_read_count; // total number of filter block reads uint64_t compression_dict_block_read_count; // total number of compression // dictionary block reads + + uint64_t secondary_cache_hit_count; // total number of secondary cache hits + uint64_t block_checksum_time; // total nanos spent on block checksum uint64_t block_decompress_time; // total nanos spent on block decompression diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index d45d84fb6..9e56f1018 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -47,6 +47,7 @@ PerfContext::PerfContext(const PerfContext& other) { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -144,6 +145,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -243,6 +245,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -339,6 +342,7 @@ void PerfContext::Reset() { block_cache_filter_hit_count = 0; filter_block_read_count = 0; compression_dict_block_read_count = 0; + secondary_cache_hit_count = 0; block_checksum_time = 0; block_decompress_time = 0; get_read_bytes = 0; @@ -459,6 +463,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const { PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count); PERF_CONTEXT_OUTPUT(filter_block_read_count); PERF_CONTEXT_OUTPUT(compression_dict_block_read_count); + PERF_CONTEXT_OUTPUT(secondary_cache_hit_count); PERF_CONTEXT_OUTPUT(block_checksum_time); PERF_CONTEXT_OUTPUT(block_decompress_time); PERF_CONTEXT_OUTPUT(get_read_bytes); From baf22b4ee6eb27d7833dfbf5c59a32e15eef31a4 Mon Sep 17 00:00:00 2001 From: Merlin Mao Date: Fri, 20 Aug 2021 15:32:55 -0700 Subject: [PATCH 5/5] Add `IteratorTraceExecutionResult` for iterator related trace records. (#8687) Summary: - Allow to get `Valid()`, `status()`, `key()` and `value()` of an iterator from `IteratorTraceExecutionResult`. - Move lower bound and upper bound from `IteratorSeekQueryTraceRecord` to `IteratorQueryTraceRecord`. Added test in `DBTest2.TraceAndReplay`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8687 Reviewed By: zhichao-cao Differential Revision: D30457630 Pulled By: autopear fbshipit-source-id: be433099a25895b3aa6f0c00f95ad7b1d7489c1d --- db/db_test2.cc | 70 +++++++++++++++++------- include/rocksdb/trace_record.h | 29 ++++++---- include/rocksdb/trace_record_result.h | 79 +++++++++++++++------------ trace_replay/trace_record.cc | 40 +++++++++----- trace_replay/trace_record_handler.cc | 17 ++++-- trace_replay/trace_record_result.cc | 40 ++++++++++++++ 6 files changed, 188 insertions(+), 87 deletions(-) diff --git a/db/db_test2.cc b/db/db_test2.cc index b7e626df6..f3b8dfbfd 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -4256,13 +4256,6 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler { writes_++; break; } - case kTraceIteratorSeek: - case kTraceIteratorSeekForPrev: { - total_latency_ += result.GetLatency(); - cnt_++; - seeks_++; - break; - } default: return Status::Corruption("Type mismatch."); } @@ -4309,6 +4302,25 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler { return Status::OK(); } + virtual Status Handle(const IteratorTraceExecutionResult& result) override { + if (result.GetStartTimestamp() > result.GetEndTimestamp()) { + return Status::InvalidArgument("Invalid timestamps."); + } + result.GetStatus().PermitUncheckedError(); + switch (result.GetTraceType()) { + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + total_latency_ += result.GetLatency(); + cnt_++; + seeks_++; + break; + } + default: + return Status::Corruption("Type mismatch."); + } + return Status::OK(); + } + void Reset() { total_latency_ = 0; cnt_ = 0; @@ -4644,23 +4656,39 @@ TEST_F(DBTest2, TraceAndManualReplay) { continue; } if (s.ok()) { - if (record->GetTraceType() == kTraceIteratorSeek || - record->GetTraceType() == kTraceIteratorSeekForPrev) { - IteratorSeekQueryTraceRecord* iter_r = - dynamic_cast(record.get()); - // Check if lower/upper bounds are correctly saved and decoded. - lower_bound = iter_r->GetLowerBound(); - if (!lower_bound.empty()) { - ASSERT_EQ(lower_bound.ToString(), "iter-1"); - } - upper_bound = iter_r->GetUpperBound(); - if (!upper_bound.empty()) { - ASSERT_EQ(upper_bound.ToString(), "iter-3"); - } - } ASSERT_OK(replayer->Execute(record, &result)); if (result != nullptr) { ASSERT_OK(result->Accept(&res_handler)); + if (record->GetTraceType() == kTraceIteratorSeek || + record->GetTraceType() == kTraceIteratorSeekForPrev) { + IteratorSeekQueryTraceRecord* iter_rec = + dynamic_cast(record.get()); + IteratorTraceExecutionResult* iter_res = + dynamic_cast(result.get()); + // Check if lower/upper bounds are correctly saved and decoded. + std::string lower_str = iter_rec->GetLowerBound().ToString(); + std::string upper_str = iter_rec->GetUpperBound().ToString(); + std::string iter_key = iter_res->GetKey().ToString(); + std::string iter_value = iter_res->GetValue().ToString(); + if (!lower_str.empty() && !upper_str.empty()) { + ASSERT_EQ(lower_str, "iter-1"); + ASSERT_EQ(upper_str, "iter-3"); + if (iter_res->GetValid()) { + // If iterator is valid, then lower_bound <= key < upper_bound. + ASSERT_GE(iter_key, lower_str); + ASSERT_LT(iter_key, upper_str); + } else { + // If iterator is invalid, then + // key < lower_bound or key >= upper_bound. + ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str); + } + } + // If iterator is invalid, the key and value should be empty. + if (!iter_res->GetValid()) { + ASSERT_TRUE(iter_key.empty()); + ASSERT_TRUE(iter_value.empty()); + } + } result.reset(); } } diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h index defa89934..3d0c0acbf 100644 --- a/include/rocksdb/trace_record.h +++ b/include/rocksdb/trace_record.h @@ -65,19 +65,15 @@ class TraceRecord { public: virtual ~Handler() = default; - // Handle WriteQueryTraceRecord virtual Status Handle(const WriteQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle GetQueryTraceRecord virtual Status Handle(const GetQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle IteratorSeekQueryTraceRecord virtual Status Handle(const IteratorSeekQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle MultiGetQueryTraceRecord virtual Status Handle(const MultiGetQueryTraceRecord& record, std::unique_ptr* result) = 0; }; @@ -152,6 +148,23 @@ class GetQueryTraceRecord : public QueryTraceRecord { class IteratorQueryTraceRecord : public QueryTraceRecord { public: explicit IteratorQueryTraceRecord(uint64_t timestamp); + + IteratorQueryTraceRecord(PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, uint64_t timestamp); + + IteratorQueryTraceRecord(const std::string& lower_bound, + const std::string& upper_bound, uint64_t timestamp); + + virtual ~IteratorQueryTraceRecord() override; + + // Get the iterator's lower/upper bound. They may be used in ReadOptions to + // create an Iterator instance. + virtual Slice GetLowerBound() const; + virtual Slice GetUpperBound() const; + + private: + PinnableSlice lower_; + PinnableSlice upper_; }; // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. @@ -193,12 +206,6 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { // Key to seek to. virtual Slice GetKey() const; - // Iterate lower bound. - virtual Slice GetLowerBound() const; - - // Iterate upper bound. - virtual Slice GetUpperBound() const; - Status Accept(Handler* handler, std::unique_ptr* result) override; @@ -206,8 +213,6 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { SeekType type_; uint32_t cf_id_; PinnableSlice key_; - PinnableSlice lower_; - PinnableSlice upper_; }; // Trace record for DB::MultiGet() operation. diff --git a/include/rocksdb/trace_record_result.h b/include/rocksdb/trace_record_result.h index 7924c6de3..0cd0004a6 100644 --- a/include/rocksdb/trace_record_result.h +++ b/include/rocksdb/trace_record_result.h @@ -9,11 +9,13 @@ #include #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { +class IteratorTraceExecutionResult; class MultiValuesTraceExecutionResult; class SingleValueTraceExecutionResult; class StatusOnlyTraceExecutionResult; @@ -34,42 +36,14 @@ class TraceRecordResult { public: virtual ~Handler() = default; - // Handle StatusOnlyTraceExecutionResult virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0; - // Handle SingleValueTraceExecutionResult virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0; - // Handle MultiValuesTraceExecutionResult virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0; - }; - /* - * Example handler to just print the trace record execution results. - * - * class ResultPrintHandler : public TraceRecordResult::Handler { - * public: - * ResultPrintHandler(); - * ~ResultPrintHandler() override {} - * - * Status Handle(const StatusOnlyTraceExecutionResult& result) override { - * std::cout << "Status: " << result.GetStatus().ToString() << std::endl; - * } - * - * Status Handle(const SingleValueTraceExecutionResult& result) override { - * std::cout << "Status: " << result.GetStatus().ToString() - * << ", value: " << result.GetValue() << std::endl; - * } - * - * Status Handle(const MultiValuesTraceExecutionResult& result) override { - * size_t size = result.GetMultiStatus().size(); - * for (size_t i = 0; i < size; i++) { - * std::cout << "Status: " << result.GetMultiStatus()[i].ToString() - * << ", value: " << result.GetValues()[i] << std::endl; - * } - * } - * }; - * */ + virtual Status Handle(const IteratorTraceExecutionResult& result) = 0; + }; // Accept the handler. virtual Status Accept(Handler* handler) = 0; @@ -106,8 +80,7 @@ class TraceExecutionResult : public TraceRecordResult { }; // Result for operations that only return a single Status. -// Example operations: DB::Write(), Iterator::Seek() and -// Iterator::SeekForPrev(). +// Example operation: DB::Write() class StatusOnlyTraceExecutionResult : public TraceExecutionResult { public: StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp, @@ -138,7 +111,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult { virtual ~SingleValueTraceExecutionResult() override; - // Return status of DB::Get(), etc. + // Return status of DB::Get(). virtual const Status& GetStatus() const; // Value for the searched key. @@ -151,7 +124,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult { std::string value_; }; -// Result for operations that return multiple Status(es) and values. +// Result for operations that return multiple Status(es) and values as vectors. // Example operation: DB::MultiGet() class MultiValuesTraceExecutionResult : public TraceExecutionResult { public: @@ -162,7 +135,7 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult { virtual ~MultiValuesTraceExecutionResult() override; - // Returned Status(es) of DB::MultiGet(), etc. + // Returned Status(es) of DB::MultiGet(). virtual const std::vector& GetMultiStatus() const; // Returned values for the searched keys. @@ -175,4 +148,40 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult { std::vector values_; }; +// Result for Iterator operations. +// Example operations: Iterator::Seek(), Iterator::SeekForPrev() +class IteratorTraceExecutionResult : public TraceExecutionResult { + public: + IteratorTraceExecutionResult(bool valid, Status status, PinnableSlice&& key, + PinnableSlice&& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + IteratorTraceExecutionResult(bool valid, Status status, + const std::string& key, const std::string& value, + uint64_t start_timestamp, uint64_t end_timestamp, + TraceType trace_type); + + virtual ~IteratorTraceExecutionResult() override; + + // Return if the Iterator is valid. + virtual bool GetValid() const; + + // Return the status of the Iterator. + virtual const Status& GetStatus() const; + + // Key of the current iterating entry, empty if GetValid() is false. + virtual Slice GetKey() const; + + // Value of the current iterating entry, empty if GetValid() is false. + virtual Slice GetValue() const; + + virtual Status Accept(Handler* handler) override; + + private: + bool valid_; + Status status_; + PinnableSlice key_; + PinnableSlice value_; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc index b377a18d2..21df0275d 100644 --- a/trace_replay/trace_record.cc +++ b/trace_replay/trace_record.cc @@ -82,6 +82,27 @@ Status GetQueryTraceRecord::Accept(Handler* handler, IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) : QueryTraceRecord(timestamp) {} +IteratorQueryTraceRecord::IteratorQueryTraceRecord(PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + lower_(std::move(lower_bound)), + upper_(std::move(upper_bound)) {} + +IteratorQueryTraceRecord::IteratorQueryTraceRecord( + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp) { + lower_.PinSelf(lower_bound); + upper_.PinSelf(upper_bound); +} + +IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {} + +Slice IteratorQueryTraceRecord::GetLowerBound() const { return Slice(lower_); } + +Slice IteratorQueryTraceRecord::GetUpperBound() const { return Slice(upper_); } + // IteratorSeekQueryTraceRecord IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, @@ -104,23 +125,20 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, PinnableSlice&& lower_bound, PinnableSlice&& upper_bound, uint64_t timestamp) - : IteratorQueryTraceRecord(timestamp), + : IteratorQueryTraceRecord(std::move(lower_bound), std::move(upper_bound), + timestamp), type_(seek_type), cf_id_(column_family_id), - key_(std::move(key)), - lower_(std::move(lower_bound)), - upper_(std::move(upper_bound)) {} + key_(std::move(key)) {} IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( SeekType seek_type, uint32_t column_family_id, const std::string& key, const std::string& lower_bound, const std::string& upper_bound, uint64_t timestamp) - : IteratorQueryTraceRecord(timestamp), + : IteratorQueryTraceRecord(lower_bound, upper_bound, timestamp), type_(seek_type), cf_id_(column_family_id) { key_.PinSelf(key); - lower_.PinSelf(lower_bound); - upper_.PinSelf(upper_bound); } IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } @@ -140,14 +158,6 @@ uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const { Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } -Slice IteratorSeekQueryTraceRecord::GetLowerBound() const { - return Slice(lower_); -} - -Slice IteratorSeekQueryTraceRecord::GetUpperBound() const { - return Slice(upper_); -} - Status IteratorSeekQueryTraceRecord::Accept( Handler* handler, std::unique_ptr* result) { assert(handler != nullptr); diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc index e0b5fb202..8f5371622 100644 --- a/trace_replay/trace_record_handler.cc +++ b/trace_replay/trace_record_handler.cc @@ -120,12 +120,21 @@ Status TraceExecutionHandler::Handle( uint64_t end = clock_->NowMicros(); Status s = single_iter->status(); - delete single_iter; - if (s.ok() && result != nullptr) { - result->reset(new StatusOnlyTraceExecutionResult(s, start, end, - record.GetTraceType())); + if (single_iter->Valid()) { + PinnableSlice ps_key; + ps_key.PinSelf(single_iter->key()); + PinnableSlice ps_value; + ps_value.PinSelf(single_iter->value()); + result->reset(new IteratorTraceExecutionResult( + true, s, std::move(ps_key), std::move(ps_value), start, end, + record.GetTraceType())); + } else { + result->reset(new IteratorTraceExecutionResult( + false, s, "", "", start, end, record.GetTraceType())); + } } + delete single_iter; return s; } diff --git a/trace_replay/trace_record_result.cc b/trace_replay/trace_record_result.cc index b22b57e43..9c0cb43ad 100644 --- a/trace_replay/trace_record_result.cc +++ b/trace_replay/trace_record_result.cc @@ -103,4 +103,44 @@ Status MultiValuesTraceExecutionResult::Accept(Handler* handler) { return handler->Handle(*this); } +// IteratorTraceExecutionResult +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, PinnableSlice&& key, PinnableSlice&& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)), + key_(std::move(key)), + value_(std::move(value)) {} + +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, const std::string& key, const std::string& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)) { + key_.PinSelf(key); + value_.PinSelf(value); +} + +IteratorTraceExecutionResult::~IteratorTraceExecutionResult() { + key_.clear(); + value_.clear(); +} + +bool IteratorTraceExecutionResult::GetValid() const { return valid_; } + +const Status& IteratorTraceExecutionResult::GetStatus() const { + return status_; +} + +Slice IteratorTraceExecutionResult::GetKey() const { return Slice(key_); } + +Slice IteratorTraceExecutionResult::GetValue() const { return Slice(value_); } + +Status IteratorTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + } // namespace ROCKSDB_NAMESPACE