diff --git a/HISTORY.md b/HISTORY.md index 53a5bbbc6..fa680a9ad 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,8 @@ * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. +* Fixed passing of BlobFileCompletionCallback to the compaction job and the atomic flush job, where it previously fell back to the default parameter (nullptr). BlobFileCompletionCallback is an internal callback that manages the addition of blob files to SstFileManager. +* Fixed MultiGet not updating the block_read_count and block_read_byte PerfContext counters. ### New Features * Made the EventListener extend the Customizable class. @@ -19,6 +21,7 @@ * Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench. * Add property `LiveSstFilesSizeAtTemperature` to retrieve sst file size at different temperature. * Added a stat rocksdb.secondary.cache.hits +* Added a PerfContext counter secondary_cache_hit_count. * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`. * Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewRibbonFilterPolicy. This also changes the default behavior of NewRibbonFilterPolicy to use Bloom for flushes under Leveled and Universal compaction and Ribbon otherwise. diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 710a155c4..689242ca9 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -13,6 +13,7 @@ #include #include +#include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "util/mutexlock.h" @@ -473,6 +474,7 @@ Cache::Handle* LRUCacheShard::Lookup( e->Free(); e = nullptr; } else { + PERF_COUNTER_ADD(secondary_cache_hit_count, 1); RecordTick(stats, SECONDARY_CACHE_HITS); } } else { @@ -481,6 +483,7 @@ Cache::Handle* LRUCacheShard::Lookup( e->SetIncomplete(true); // This may be slightly inaccurate, if the lookup eventually fails. // But the probability is very low.
+ PERF_COUNTER_ADD(secondary_cache_hit_count, 1); RecordTick(stats, SECONDARY_CACHE_HITS); } } diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 74409421c..e840273bd 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -481,6 +481,7 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) { ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, str2.length())); + get_perf_context()->Reset(); Cache::Handle* handle; handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator, @@ -497,6 +498,8 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) { ASSERT_EQ(secondary_cache->num_lookups(), 1u); ASSERT_EQ(stats->getTickerCount(SECONDARY_CACHE_HITS), secondary_cache->num_lookups()); + PerfContext perf_ctx = *get_perf_context(); + ASSERT_EQ(perf_ctx.secondary_cache_hit_count, secondary_cache->num_lookups()); cache.reset(); secondary_cache.reset(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index df849d519..b1679d756 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, thread_pri, io_tracer_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow())); + cfd->GetFullHistoryTsLow(), &blob_callback_)); } std::vector file_meta(num_cfs); @@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow()); + c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? manual_compaction->canceled : nullptr, db_id_, - db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); + db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + &blob_callback_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 77984758f..cba6fb0b7 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +// This test checks if blob files are recorded by SST File Manager when the +// compaction job creates/deletes them, and in the case of atomic flush.
+TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.disable_auto_compactions = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + options.atomic_flush = true; + + int files_added = 0; + int files_deleted = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_deleted++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + Random rnd(301); + + ASSERT_OK(Put("key_1", "value_1")); + ASSERT_OK(Put("key_2", "value_2")); + ASSERT_OK(Put("key_3", "value_3")); + ASSERT_OK(Put("key_4", "value_4")); + ASSERT_OK(Flush()); + + // Overwrite will create the garbage data. + ASSERT_OK(Put("key_3", "new_value_3")); + ASSERT_OK(Put("key_4", "new_value_4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_EQ(files_added, 3); + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + files_added = 0; + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + // Compaction job will create a new file and delete the older files. 
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(files_added, 1); + ASSERT_EQ(files_deleted, 1); + ASSERT_EQ(files_scheduled_to_delete, 1); + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + sfm->WaitForEmptyTrash(); + ASSERT_EQ(files_deleted, 4); + ASSERT_EQ(files_scheduled_to_delete, 4); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_test2.cc b/db/db_test2.cc index c09ea4c71..f3b8dfbfd 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -4256,13 +4256,6 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler { writes_++; break; } - case kTraceIteratorSeek: - case kTraceIteratorSeekForPrev: { - total_latency_ += result.GetLatency(); - cnt_++; - seeks_++; - break; - } default: return Status::Corruption("Type mismatch."); } @@ -4309,6 +4302,25 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler { return Status::OK(); } + virtual Status Handle(const IteratorTraceExecutionResult& result) override { + if (result.GetStartTimestamp() > result.GetEndTimestamp()) { + return Status::InvalidArgument("Invalid timestamps."); + } + result.GetStatus().PermitUncheckedError(); + switch (result.GetTraceType()) { + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + total_latency_ += result.GetLatency(); + cnt_++; + seeks_++; + break; + } + default: + return Status::Corruption("Type mismatch."); + } + return Status::OK(); + } + void Reset() { total_latency_ = 0; cnt_ = 0; @@ -4538,6 +4550,37 @@ TEST_F(DBTest2, TraceAndManualReplay) { single_iter = db_->NewIterator(ro); single_iter->Seek("f"); single_iter->SeekForPrev("g"); + ASSERT_OK(single_iter->status()); + delete single_iter; + + // Write some sequenced keys for testing lower/upper bounds of iterator. + batch.Clear(); + ASSERT_OK(batch.Put("iter-0", "iter-0")); + ASSERT_OK(batch.Put("iter-1", "iter-1")); + ASSERT_OK(batch.Put("iter-2", "iter-2")); + ASSERT_OK(batch.Put("iter-3", "iter-3")); + ASSERT_OK(batch.Put("iter-4", "iter-4")); + ASSERT_OK(db_->Write(wo, &batch)); + + ReadOptions bounded_ro = ro; + Slice lower_bound("iter-1"); + Slice upper_bound("iter-3"); + bounded_ro.iterate_lower_bound = &lower_bound; + bounded_ro.iterate_upper_bound = &upper_bound; + single_iter = db_->NewIterator(bounded_ro); + single_iter->Seek("iter-0"); + ASSERT_EQ(single_iter->key().ToString(), "iter-1"); + single_iter->Seek("iter-2"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + single_iter->Seek("iter-4"); + ASSERT_FALSE(single_iter->Valid()); + single_iter->SeekForPrev("iter-0"); + ASSERT_FALSE(single_iter->Valid()); + single_iter->SeekForPrev("iter-2"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + single_iter->SeekForPrev("iter-4"); + ASSERT_EQ(single_iter->key().ToString(), "iter-2"); + ASSERT_OK(single_iter->status()); delete single_iter; ASSERT_EQ("1", Get(0, "a")); @@ -4548,6 +4591,9 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2. + // Plus 1 WriteBatch for iterator with lower/upper bounds, and 6 + // Seek(ForPrev)s. + // Total Write x 9, Get x 3, Seek x 8 ASSERT_OK(db_->EndTrace()); // These should not get into the trace file as it is after EndTrace. 
ASSERT_OK(Put("hello", "world")); @@ -4613,6 +4659,36 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_OK(replayer->Execute(record, &result)); if (result != nullptr) { ASSERT_OK(result->Accept(&res_handler)); + if (record->GetTraceType() == kTraceIteratorSeek || + record->GetTraceType() == kTraceIteratorSeekForPrev) { + IteratorSeekQueryTraceRecord* iter_rec = + dynamic_cast(record.get()); + IteratorTraceExecutionResult* iter_res = + dynamic_cast(result.get()); + // Check if lower/upper bounds are correctly saved and decoded. + std::string lower_str = iter_rec->GetLowerBound().ToString(); + std::string upper_str = iter_rec->GetUpperBound().ToString(); + std::string iter_key = iter_res->GetKey().ToString(); + std::string iter_value = iter_res->GetValue().ToString(); + if (!lower_str.empty() && !upper_str.empty()) { + ASSERT_EQ(lower_str, "iter-1"); + ASSERT_EQ(upper_str, "iter-3"); + if (iter_res->GetValid()) { + // If iterator is valid, then lower_bound <= key < upper_bound. + ASSERT_GE(iter_key, lower_str); + ASSERT_LT(iter_key, upper_str); + } else { + // If iterator is invalid, then + // key < lower_bound or key >= upper_bound. + ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str); + } + } + // If iterator is invalid, the key and value should be empty. + if (!iter_res->GetValid()) { + ASSERT_TRUE(iter_key.empty()); + ASSERT_TRUE(iter_value.empty()); + } + } result.reset(); } } @@ -4622,9 +4698,9 @@ TEST_F(DBTest2, TraceAndManualReplay) { ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); ASSERT_GT(res_handler.GetAvgLatency(), 0.0); - ASSERT_EQ(res_handler.GetNumWrites(), 8); + ASSERT_EQ(res_handler.GetNumWrites(), 9); ASSERT_EQ(res_handler.GetNumGets(), 3); - ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); + ASSERT_EQ(res_handler.GetNumIterSeeks(), 8); ASSERT_EQ(res_handler.GetNumMultiGets(), 0); res_handler.Reset(); } diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 699f57344..f3058416e 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -74,6 +74,9 @@ struct PerfContext { uint64_t filter_block_read_count; // total number of filter block reads uint64_t compression_dict_block_read_count; // total number of compression // dictionary block reads + + uint64_t secondary_cache_hit_count; // total number of secondary cache hits + uint64_t block_checksum_time; // total nanos spent on block checksum uint64_t block_decompress_time; // total nanos spent on block decompression diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h index 3f591d3d1..3d0c0acbf 100644 --- a/include/rocksdb/trace_record.h +++ b/include/rocksdb/trace_record.h @@ -65,19 +65,15 @@ class TraceRecord { public: virtual ~Handler() = default; - // Handle WriteQueryTraceRecord virtual Status Handle(const WriteQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle GetQueryTraceRecord virtual Status Handle(const GetQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle IteratorSeekQueryTraceRecord virtual Status Handle(const IteratorSeekQueryTraceRecord& record, std::unique_ptr* result) = 0; - // Handle MultiGetQueryTraceRecord virtual Status Handle(const MultiGetQueryTraceRecord& record, std::unique_ptr* result) = 0; }; @@ -152,6 +148,23 @@ class GetQueryTraceRecord : public QueryTraceRecord { class IteratorQueryTraceRecord : public QueryTraceRecord { public: explicit IteratorQueryTraceRecord(uint64_t timestamp); + + IteratorQueryTraceRecord(PinnableSlice&& lower_bound, + 
PinnableSlice&& upper_bound, uint64_t timestamp); + + IteratorQueryTraceRecord(const std::string& lower_bound, + const std::string& upper_bound, uint64_t timestamp); + + virtual ~IteratorQueryTraceRecord() override; + + // Get the iterator's lower/upper bound. They may be used in ReadOptions to + // create an Iterator instance. + virtual Slice GetLowerBound() const; + virtual Slice GetUpperBound() const; + + private: + PinnableSlice lower_; + PinnableSlice upper_; }; // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. @@ -169,6 +182,16 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, const std::string& key, uint64_t timestamp); + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + PinnableSlice&& key, PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, uint64_t timestamp); + + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + const std::string& key, + const std::string& lower_bound, + const std::string& upper_bound, + uint64_t timestamp); + virtual ~IteratorSeekQueryTraceRecord() override; // Trace type matches the seek type. diff --git a/include/rocksdb/trace_record_result.h b/include/rocksdb/trace_record_result.h index 7924c6de3..0cd0004a6 100644 --- a/include/rocksdb/trace_record_result.h +++ b/include/rocksdb/trace_record_result.h @@ -9,11 +9,13 @@ #include #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { +class IteratorTraceExecutionResult; class MultiValuesTraceExecutionResult; class SingleValueTraceExecutionResult; class StatusOnlyTraceExecutionResult; @@ -34,42 +36,14 @@ class TraceRecordResult { public: virtual ~Handler() = default; - // Handle StatusOnlyTraceExecutionResult virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0; - // Handle SingleValueTraceExecutionResult virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0; - // Handle MultiValuesTraceExecutionResult virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0; - }; - /* - * Example handler to just print the trace record execution results. - * - * class ResultPrintHandler : public TraceRecordResult::Handler { - * public: - * ResultPrintHandler(); - * ~ResultPrintHandler() override {} - * - * Status Handle(const StatusOnlyTraceExecutionResult& result) override { - * std::cout << "Status: " << result.GetStatus().ToString() << std::endl; - * } - * - * Status Handle(const SingleValueTraceExecutionResult& result) override { - * std::cout << "Status: " << result.GetStatus().ToString() - * << ", value: " << result.GetValue() << std::endl; - * } - * - * Status Handle(const MultiValuesTraceExecutionResult& result) override { - * size_t size = result.GetMultiStatus().size(); - * for (size_t i = 0; i < size; i++) { - * std::cout << "Status: " << result.GetMultiStatus()[i].ToString() - * << ", value: " << result.GetValues()[i] << std::endl; - * } - * } - * }; - * */ + virtual Status Handle(const IteratorTraceExecutionResult& result) = 0; + }; // Accept the handler. virtual Status Accept(Handler* handler) = 0; @@ -106,8 +80,7 @@ class TraceExecutionResult : public TraceRecordResult { }; // Result for operations that only return a single Status. -// Example operations: DB::Write(), Iterator::Seek() and -// Iterator::SeekForPrev(). 
+// Example operation: DB::Write() class StatusOnlyTraceExecutionResult : public TraceExecutionResult { public: StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp, @@ -138,7 +111,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult { virtual ~SingleValueTraceExecutionResult() override; - // Return status of DB::Get(), etc. + // Return status of DB::Get(). virtual const Status& GetStatus() const; // Value for the searched key. @@ -151,7 +124,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult { std::string value_; }; -// Result for operations that return multiple Status(es) and values. +// Result for operations that return multiple Status(es) and values as vectors. // Example operation: DB::MultiGet() class MultiValuesTraceExecutionResult : public TraceExecutionResult { public: @@ -162,7 +135,7 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult { virtual ~MultiValuesTraceExecutionResult() override; - // Returned Status(es) of DB::MultiGet(), etc. + // Returned Status(es) of DB::MultiGet(). virtual const std::vector& GetMultiStatus() const; // Returned values for the searched keys. @@ -175,4 +148,40 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult { std::vector values_; }; +// Result for Iterator operations. +// Example operations: Iterator::Seek(), Iterator::SeekForPrev() +class IteratorTraceExecutionResult : public TraceExecutionResult { + public: + IteratorTraceExecutionResult(bool valid, Status status, PinnableSlice&& key, + PinnableSlice&& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type); + + IteratorTraceExecutionResult(bool valid, Status status, + const std::string& key, const std::string& value, + uint64_t start_timestamp, uint64_t end_timestamp, + TraceType trace_type); + + virtual ~IteratorTraceExecutionResult() override; + + // Return if the Iterator is valid. + virtual bool GetValid() const; + + // Return the status of the Iterator. + virtual const Status& GetStatus() const; + + // Key of the current iterating entry, empty if GetValid() is false. + virtual Slice GetKey() const; + + // Value of the current iterating entry, empty if GetValid() is false. 
+ virtual Slice GetValue() const; + + virtual Status Accept(Handler* handler) override; + + private: + bool valid_; + Status status_; + PinnableSlice key_; + PinnableSlice value_; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index d45d84fb6..9e56f1018 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -47,6 +47,7 @@ PerfContext::PerfContext(const PerfContext& other) { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -144,6 +145,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -243,6 +245,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) { block_cache_filter_hit_count = other.block_cache_filter_hit_count; filter_block_read_count = other.filter_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count; + secondary_cache_hit_count = other.secondary_cache_hit_count; block_checksum_time = other.block_checksum_time; block_decompress_time = other.block_decompress_time; get_read_bytes = other.get_read_bytes; @@ -339,6 +342,7 @@ void PerfContext::Reset() { block_cache_filter_hit_count = 0; filter_block_read_count = 0; compression_dict_block_read_count = 0; + secondary_cache_hit_count = 0; block_checksum_time = 0; block_decompress_time = 0; get_read_bytes = 0; @@ -459,6 +463,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const { PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count); PERF_CONTEXT_OUTPUT(filter_block_read_count); PERF_CONTEXT_OUTPUT(compression_dict_block_read_count); + PERF_CONTEXT_OUTPUT(secondary_cache_hit_count); PERF_CONTEXT_OUTPUT(block_checksum_time); PERF_CONTEXT_OUTPUT(block_decompress_time); PERF_CONTEXT_OUTPUT(get_read_bytes); diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index c9ee60aeb..ee6ccf830 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1713,6 +1713,9 @@ void BlockBasedTable::RetrieveMultipleBlocks( req_offset_for_block.emplace_back(0); } req_idx_for_block.emplace_back(read_reqs.size()); + + PERF_COUNTER_ADD(block_read_count, 1); + PERF_COUNTER_ADD(block_read_byte, block_size(handle)); } // Handle the last block and process the pending last request if (prev_len != 0) { diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 07136dbf8..775f2f505 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -228,8 +228,16 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) { // Execute MultiGet. 
MultiGetContext::Range range = ctx.GetMultiGetRange(); + PerfContext* perf_ctx = get_perf_context(); + perf_ctx->Reset(); table->MultiGet(ReadOptions(), &range, nullptr); + ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count - + perf_ctx->filter_block_read_count - + perf_ctx->compression_dict_block_read_count, + 1); + ASSERT_GE(perf_ctx->block_read_byte, 1); + for (const Status& status : statuses) { ASSERT_OK(status); } diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc index e0ce02090..21df0275d 100644 --- a/trace_replay/trace_record.cc +++ b/trace_replay/trace_record.cc @@ -82,6 +82,27 @@ Status GetQueryTraceRecord::Accept(Handler* handler, IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) : QueryTraceRecord(timestamp) {} +IteratorQueryTraceRecord::IteratorQueryTraceRecord(PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + lower_(std::move(lower_bound)), + upper_(std::move(upper_bound)) {} + +IteratorQueryTraceRecord::IteratorQueryTraceRecord( + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp) { + lower_.PinSelf(lower_bound); + upper_.PinSelf(upper_bound); +} + +IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {} + +Slice IteratorQueryTraceRecord::GetLowerBound() const { return Slice(lower_); } + +Slice IteratorQueryTraceRecord::GetUpperBound() const { return Slice(upper_); } + // IteratorSeekQueryTraceRecord IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, @@ -100,6 +121,26 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( key_.PinSelf(key); } +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, + PinnableSlice&& lower_bound, PinnableSlice&& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(std::move(lower_bound), std::move(upper_bound), + timestamp), + type_(seek_type), + cf_id_(column_family_id), + key_(std::move(key)) {} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, const std::string& key, + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(lower_bound, upper_bound, timestamp), + type_(seek_type), + cf_id_(column_family_id) { + key_.PinSelf(key); +} + IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc index 6343d2ed3..8f5371622 100644 --- a/trace_replay/trace_record_handler.cc +++ b/trace_replay/trace_record_handler.cc @@ -93,7 +93,16 @@ Status TraceExecutionHandler::Handle( return Status::Corruption("Invalid Column Family ID."); } - Iterator* single_iter = db_->NewIterator(read_opts_, it->second); + ReadOptions r_opts = read_opts_; + Slice lower = record.GetLowerBound(); + if (!lower.empty()) { + r_opts.iterate_lower_bound = &lower; + } + Slice upper = record.GetUpperBound(); + if (!upper.empty()) { + r_opts.iterate_upper_bound = &upper; + } + Iterator* single_iter = db_->NewIterator(r_opts, it->second); uint64_t start = clock_->NowMicros(); @@ -111,12 +120,21 @@ Status TraceExecutionHandler::Handle( uint64_t end = clock_->NowMicros(); Status s = single_iter->status(); - delete single_iter; - if 
(s.ok() && result != nullptr) { - result->reset(new StatusOnlyTraceExecutionResult(s, start, end, - record.GetTraceType())); + if (single_iter->Valid()) { + PinnableSlice ps_key; + ps_key.PinSelf(single_iter->key()); + PinnableSlice ps_value; + ps_value.PinSelf(single_iter->value()); + result->reset(new IteratorTraceExecutionResult( + true, s, std::move(ps_key), std::move(ps_value), start, end, + record.GetTraceType())); + } else { + result->reset(new IteratorTraceExecutionResult( + false, s, "", "", start, end, record.GetTraceType())); + } } + delete single_iter; return s; } diff --git a/trace_replay/trace_record_result.cc b/trace_replay/trace_record_result.cc index b22b57e43..9c0cb43ad 100644 --- a/trace_replay/trace_record_result.cc +++ b/trace_replay/trace_record_result.cc @@ -103,4 +103,44 @@ Status MultiValuesTraceExecutionResult::Accept(Handler* handler) { return handler->Handle(*this); } +// IteratorTraceExecutionResult +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, PinnableSlice&& key, PinnableSlice&& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)), + key_(std::move(key)), + value_(std::move(value)) {} + +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, const std::string& key, const std::string& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)) { + key_.PinSelf(key); + value_.PinSelf(value); +} + +IteratorTraceExecutionResult::~IteratorTraceExecutionResult() { + key_.clear(); + value_.clear(); +} + +bool IteratorTraceExecutionResult::GetValid() const { return valid_; } + +const Status& IteratorTraceExecutionResult::GetStatus() const { + return status_; +} + +Slice IteratorTraceExecutionResult::GetKey() const { return Slice(key_); } + +Slice IteratorTraceExecutionResult::GetValue() const { return Slice(value_); } + +Status IteratorTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index af2e76500..01867b9f4 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -225,14 +225,12 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, uint32_t cf_id = 0; Slice iter_key; + Slice lower_bound; + Slice upper_bound; if (trace_file_version < 2) { DecodeCFAndKey(trace->payload, &cf_id, &iter_key); } else { - // Are these two used anywhere? - Slice lower_bound; - Slice upper_bound; - Slice buf(trace->payload); GetFixed64(&buf, &trace->payload_map); int64_t payload_map = static_cast(trace->payload_map); @@ -264,9 +262,14 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, if (record != nullptr) { PinnableSlice ps_key; ps_key.PinSelf(iter_key); + PinnableSlice ps_lower; + ps_lower.PinSelf(lower_bound); + PinnableSlice ps_upper; + ps_upper.PinSelf(upper_bound); record->reset(new IteratorSeekQueryTraceRecord( static_cast(trace->type), cf_id, - std::move(ps_key), trace->ts)); + std::move(ps_key), std::move(ps_lower), std::move(ps_upper), + trace->ts)); } return Status::OK();
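Note: because this change adds a pure virtual Handle(const IteratorTraceExecutionResult&) to TraceRecordResult::Handler, existing handler implementations must add that overload. Below is a minimal sketch of a result-printing handler, adapted from the example comment this change removes from trace_record_result.h; the class name ResultPrintHandler and the output format are illustrative only. A replayed result would be dispatched to it via result->Accept(&handler), as in the updated TraceAndManualReplay test.

#include <iostream>

#include "rocksdb/status.h"
#include "rocksdb/trace_record_result.h"

using namespace ROCKSDB_NAMESPACE;  // "rocksdb" by default

// Illustrative handler: prints every kind of trace execution result,
// including the new IteratorTraceExecutionResult.
class ResultPrintHandler : public TraceRecordResult::Handler {
 public:
  ~ResultPrintHandler() override {}

  Status Handle(const StatusOnlyTraceExecutionResult& result) override {
    std::cout << "Status: " << result.GetStatus().ToString() << std::endl;
    return Status::OK();
  }

  Status Handle(const SingleValueTraceExecutionResult& result) override {
    std::cout << "Status: " << result.GetStatus().ToString()
              << ", value: " << result.GetValue() << std::endl;
    return Status::OK();
  }

  Status Handle(const MultiValuesTraceExecutionResult& result) override {
    for (size_t i = 0; i < result.GetMultiStatus().size(); i++) {
      std::cout << "Status: " << result.GetMultiStatus()[i].ToString()
                << ", value: " << result.GetValues()[i] << std::endl;
    }
    return Status::OK();
  }

  Status Handle(const IteratorTraceExecutionResult& result) override {
    // Key and value are empty slices when the replayed iterator was invalid.
    std::cout << "Status: " << result.GetStatus().ToString()
              << ", valid: " << (result.GetValid() ? "true" : "false")
              << ", key: " << result.GetKey().ToString()
              << ", value: " << result.GetValue().ToString() << std::endl;
    return Status::OK();
  }
};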