Merge remote-tracking branch 'origin/master' into ribbon_bloom_hybrid

This commit is contained in:
Peter Dillinger 2021-08-20 16:00:50 -07:00
commit 46c4a5a5da
16 changed files with 390 additions and 61 deletions

View File

@ -8,6 +8,8 @@
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
* Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
* Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction.
* Fixed passing of BlobFileCompletionCallback to the Compaction job and the Atomic flush job, which was previously left as the default parameter (nullptr). BlobFileCompletionCallback is an internal callback that manages the addition of blob files to SSTFileManager.
* Fixed MultiGet not updating the block_read_count and block_read_byte PerfContext counters
### New Features ### New Features
* Made the EventListener extend the Customizable class. * Made the EventListener extend the Customizable class.
@ -19,6 +21,7 @@
* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by setting the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench. * Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by setting the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench.
* Add property `LiveSstFilesSizeAtTemperature` to retrieve sst file size at different temperature. * Add property `LiveSstFilesSizeAtTemperature` to retrieve sst file size at different temperature.
* Added a stat rocksdb.secondary.cache.hits * Added a stat rocksdb.secondary.cache.hits
* Added a PerfContext counter secondary_cache_hit_count
* The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`. * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`.
* Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewRibbonFilterPolicy. This also changes the default behavior of NewRibbonFilterPolicy to use Bloom for flushes under Leveled and Universal compaction and Ribbon otherwise. * Added hybrid configuration of Ribbon filter and Bloom filter where some LSM levels use Ribbon for memory space efficiency and some use Bloom for speed. See NewRibbonFilterPolicy. This also changes the default behavior of NewRibbonFilterPolicy to use Bloom for flushes under Leveled and Universal compaction and Ribbon otherwise.

3
cache/lru_cache.cc vendored
View File

@ -13,6 +13,7 @@
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -473,6 +474,7 @@ Cache::Handle* LRUCacheShard::Lookup(
e->Free(); e->Free();
e = nullptr; e = nullptr;
} else { } else {
PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
RecordTick(stats, SECONDARY_CACHE_HITS); RecordTick(stats, SECONDARY_CACHE_HITS);
} }
} else { } else {
@ -481,6 +483,7 @@ Cache::Handle* LRUCacheShard::Lookup(
e->SetIncomplete(true); e->SetIncomplete(true);
// This may be slightly inaccurate, if the lookup eventually fails. // This may be slightly inaccurate, if the lookup eventually fails.
// But the probability is very low. // But the probability is very low.
PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
RecordTick(stats, SECONDARY_CACHE_HITS); RecordTick(stats, SECONDARY_CACHE_HITS);
} }
} }

View File

@ -481,6 +481,7 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) {
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length())); str2.length()));
get_perf_context()->Reset();
Cache::Handle* handle; Cache::Handle* handle;
handle = handle =
cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator, cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator,
@ -497,6 +498,8 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) {
ASSERT_EQ(secondary_cache->num_lookups(), 1u); ASSERT_EQ(secondary_cache->num_lookups(), 1u);
ASSERT_EQ(stats->getTickerCount(SECONDARY_CACHE_HITS), ASSERT_EQ(stats->getTickerCount(SECONDARY_CACHE_HITS),
secondary_cache->num_lookups()); secondary_cache->num_lookups());
PerfContext perf_ctx = *get_perf_context();
ASSERT_EQ(perf_ctx.secondary_cache_hit_count, secondary_cache->num_lookups());
cache.reset(); cache.reset();
secondary_cache.reset(); secondary_cache.reset();

View File

@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */, false /* sync_output_directory */, false /* write_manifest */,
thread_pri, io_tracer_, db_id_, db_session_id_, thread_pri, io_tracer_, db_id_, db_session_id_,
cfd->GetFullHistoryTsLow())); cfd->GetFullHistoryTsLow(), &blob_callback_));
} }
std::vector<FileMetaData> file_meta(num_cfs); std::vector<FileMetaData> file_meta(num_cfs);
@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_, &compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, nullptr, db_id_, db_session_id_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow()); c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_);
// Creating a compaction influences the compaction score because the score // Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already // takes running compactions into account (by skipping files that are already
@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&compaction_job_stats, thread_pri, io_tracer_, &compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_, is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
&blob_callback_);
compaction_job.Prepare(); compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

View File

@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
ASSERT_EQ(total_sst_files_size, 0); ASSERT_EQ(total_sst_files_size, 0);
} }
// Tests that blob files are reported to the SST File Manager when they are
// created by flushes (with atomic_flush enabled) and when a compaction job
// creates/deletes them.
TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  options.disable_auto_compactions = true;
  options.enable_blob_garbage_collection = true;
  options.blob_garbage_collection_age_cutoff = 0.5;
  options.atomic_flush = true;

  int files_added = 0;
  int files_deleted = 0;
  int files_scheduled_to_delete = 0;

  // Every sync point used below hands the callback a `const std::string*`
  // holding the file path; only paths ending in ".blob" are counted.
  const auto is_blob_file = [](void* arg) {
    assert(arg);
    const std::string* const file_path = static_cast<const std::string*>(arg);
    return EndsWith(*file_path, ".blob");
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::OnAddFile", [&](void* arg) {
        if (is_blob_file(arg)) {
          files_added++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
        if (is_blob_file(arg)) {
          files_deleted++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
        if (is_blob_file(arg)) {
          ++files_scheduled_to_delete;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  DestroyAndReopen(options);

  ASSERT_OK(Put("key_1", "value_1"));
  ASSERT_OK(Put("key_2", "value_2"));
  ASSERT_OK(Put("key_3", "value_3"));
  ASSERT_OK(Put("key_4", "value_4"));
  ASSERT_OK(Flush());

  // Overwrite will create the garbage data.
  ASSERT_OK(Put("key_3", "new_value_3"));
  ASSERT_OK(Put("key_4", "new_value_4"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("Key5", "blob_value5"));
  ASSERT_OK(Put("Key6", "blob_value6"));
  ASSERT_OK(Flush());

  // Three flushes so far: three blob files added, none removed yet.
  ASSERT_EQ(files_added, 3);
  ASSERT_EQ(files_deleted, 0);
  ASSERT_EQ(files_scheduled_to_delete, 0);
  // Restart the "added" count so the compaction's output is checked alone.
  files_added = 0;

  constexpr Slice* begin = nullptr;
  constexpr Slice* end = nullptr;
  // Compaction job will create a new file and delete the older files.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
  ASSERT_EQ(files_added, 1);
  ASSERT_EQ(files_deleted, 1);
  ASSERT_EQ(files_scheduled_to_delete, 1);

  Close();
  ASSERT_OK(DestroyDB(dbname_, options));
  sfm->WaitForEmptyTrash();
  // The deletion counters are cumulative: the one blob file removed by the
  // compaction plus the three removed while destroying the DB.
  ASSERT_EQ(files_deleted, 4);
  ASSERT_EQ(files_scheduled_to_delete, 4);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -4256,13 +4256,6 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler {
writes_++; writes_++;
break; break;
} }
case kTraceIteratorSeek:
case kTraceIteratorSeekForPrev: {
total_latency_ += result.GetLatency();
cnt_++;
seeks_++;
break;
}
default: default:
return Status::Corruption("Type mismatch."); return Status::Corruption("Type mismatch.");
} }
@ -4309,6 +4302,25 @@ class TraceExecutionResultHandler : public TraceRecordResult::Handler {
return Status::OK(); return Status::OK();
} }
// Accounts for one iterator Seek()/SeekForPrev() execution result: adds its
// latency to the running total and bumps the overall and seek counters.
// Returns InvalidArgument if the result's start timestamp is after its end
// timestamp, and Corruption for any trace type other than the two seek types.
virtual Status Handle(const IteratorTraceExecutionResult& result) override {
  if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
    return Status::InvalidArgument("Invalid timestamps.");
  }
  // The replayed iterator status is not inspected by this handler.
  result.GetStatus().PermitUncheckedError();

  const auto trace_type = result.GetTraceType();
  const bool is_seek = trace_type == kTraceIteratorSeek ||
                       trace_type == kTraceIteratorSeekForPrev;
  if (!is_seek) {
    return Status::Corruption("Type mismatch.");
  }

  total_latency_ += result.GetLatency();
  cnt_++;
  seeks_++;
  return Status::OK();
}
void Reset() { void Reset() {
total_latency_ = 0; total_latency_ = 0;
cnt_ = 0; cnt_ = 0;
@ -4538,6 +4550,37 @@ TEST_F(DBTest2, TraceAndManualReplay) {
single_iter = db_->NewIterator(ro); single_iter = db_->NewIterator(ro);
single_iter->Seek("f"); single_iter->Seek("f");
single_iter->SeekForPrev("g"); single_iter->SeekForPrev("g");
ASSERT_OK(single_iter->status());
delete single_iter;
// Write some sequenced keys for testing lower/upper bounds of iterator.
batch.Clear();
ASSERT_OK(batch.Put("iter-0", "iter-0"));
ASSERT_OK(batch.Put("iter-1", "iter-1"));
ASSERT_OK(batch.Put("iter-2", "iter-2"));
ASSERT_OK(batch.Put("iter-3", "iter-3"));
ASSERT_OK(batch.Put("iter-4", "iter-4"));
ASSERT_OK(db_->Write(wo, &batch));
ReadOptions bounded_ro = ro;
Slice lower_bound("iter-1");
Slice upper_bound("iter-3");
bounded_ro.iterate_lower_bound = &lower_bound;
bounded_ro.iterate_upper_bound = &upper_bound;
single_iter = db_->NewIterator(bounded_ro);
single_iter->Seek("iter-0");
ASSERT_EQ(single_iter->key().ToString(), "iter-1");
single_iter->Seek("iter-2");
ASSERT_EQ(single_iter->key().ToString(), "iter-2");
single_iter->Seek("iter-4");
ASSERT_FALSE(single_iter->Valid());
single_iter->SeekForPrev("iter-0");
ASSERT_FALSE(single_iter->Valid());
single_iter->SeekForPrev("iter-2");
ASSERT_EQ(single_iter->key().ToString(), "iter-2");
single_iter->SeekForPrev("iter-4");
ASSERT_EQ(single_iter->key().ToString(), "iter-2");
ASSERT_OK(single_iter->status());
delete single_iter; delete single_iter;
ASSERT_EQ("1", Get(0, "a")); ASSERT_EQ("1", Get(0, "a"));
@ -4548,6 +4591,9 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
// Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2. // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
// Plus 1 WriteBatch for iterator with lower/upper bounds, and 6
// Seek(ForPrev)s.
// Total Write x 9, Get x 3, Seek x 8
ASSERT_OK(db_->EndTrace()); ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace. // These should not get into the trace file as it is after EndTrace.
ASSERT_OK(Put("hello", "world")); ASSERT_OK(Put("hello", "world"));
@ -4613,6 +4659,36 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_OK(replayer->Execute(record, &result)); ASSERT_OK(replayer->Execute(record, &result));
if (result != nullptr) { if (result != nullptr) {
ASSERT_OK(result->Accept(&res_handler)); ASSERT_OK(result->Accept(&res_handler));
if (record->GetTraceType() == kTraceIteratorSeek ||
record->GetTraceType() == kTraceIteratorSeekForPrev) {
IteratorSeekQueryTraceRecord* iter_rec =
dynamic_cast<IteratorSeekQueryTraceRecord*>(record.get());
IteratorTraceExecutionResult* iter_res =
dynamic_cast<IteratorTraceExecutionResult*>(result.get());
// Check if lower/upper bounds are correctly saved and decoded.
std::string lower_str = iter_rec->GetLowerBound().ToString();
std::string upper_str = iter_rec->GetUpperBound().ToString();
std::string iter_key = iter_res->GetKey().ToString();
std::string iter_value = iter_res->GetValue().ToString();
if (!lower_str.empty() && !upper_str.empty()) {
ASSERT_EQ(lower_str, "iter-1");
ASSERT_EQ(upper_str, "iter-3");
if (iter_res->GetValid()) {
// If iterator is valid, then lower_bound <= key < upper_bound.
ASSERT_GE(iter_key, lower_str);
ASSERT_LT(iter_key, upper_str);
} else {
// If iterator is invalid, then
// key < lower_bound or key >= upper_bound.
ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str);
}
}
// If iterator is invalid, the key and value should be empty.
if (!iter_res->GetValid()) {
ASSERT_TRUE(iter_key.empty());
ASSERT_TRUE(iter_value.empty());
}
}
result.reset(); result.reset();
} }
} }
@ -4622,9 +4698,9 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
ASSERT_GT(res_handler.GetAvgLatency(), 0.0); ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 8); ASSERT_EQ(res_handler.GetNumWrites(), 9);
ASSERT_EQ(res_handler.GetNumGets(), 3); ASSERT_EQ(res_handler.GetNumGets(), 3);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 2); ASSERT_EQ(res_handler.GetNumIterSeeks(), 8);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0); ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset(); res_handler.Reset();
} }

View File

@ -74,6 +74,9 @@ struct PerfContext {
uint64_t filter_block_read_count; // total number of filter block reads uint64_t filter_block_read_count; // total number of filter block reads
uint64_t compression_dict_block_read_count; // total number of compression uint64_t compression_dict_block_read_count; // total number of compression
// dictionary block reads // dictionary block reads
uint64_t secondary_cache_hit_count; // total number of secondary cache hits
uint64_t block_checksum_time; // total nanos spent on block checksum uint64_t block_checksum_time; // total nanos spent on block checksum
uint64_t block_decompress_time; // total nanos spent on block decompression uint64_t block_decompress_time; // total nanos spent on block decompression

View File

@ -65,19 +65,15 @@ class TraceRecord {
public: public:
virtual ~Handler() = default; virtual ~Handler() = default;
// Handle WriteQueryTraceRecord
virtual Status Handle(const WriteQueryTraceRecord& record, virtual Status Handle(const WriteQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0; std::unique_ptr<TraceRecordResult>* result) = 0;
// Handle GetQueryTraceRecord
virtual Status Handle(const GetQueryTraceRecord& record, virtual Status Handle(const GetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0; std::unique_ptr<TraceRecordResult>* result) = 0;
// Handle IteratorSeekQueryTraceRecord
virtual Status Handle(const IteratorSeekQueryTraceRecord& record, virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0; std::unique_ptr<TraceRecordResult>* result) = 0;
// Handle MultiGetQueryTraceRecord
virtual Status Handle(const MultiGetQueryTraceRecord& record, virtual Status Handle(const MultiGetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0; std::unique_ptr<TraceRecordResult>* result) = 0;
}; };
@ -152,6 +148,23 @@ class GetQueryTraceRecord : public QueryTraceRecord {
class IteratorQueryTraceRecord : public QueryTraceRecord { class IteratorQueryTraceRecord : public QueryTraceRecord {
public: public:
explicit IteratorQueryTraceRecord(uint64_t timestamp); explicit IteratorQueryTraceRecord(uint64_t timestamp);
IteratorQueryTraceRecord(PinnableSlice&& lower_bound,
PinnableSlice&& upper_bound, uint64_t timestamp);
IteratorQueryTraceRecord(const std::string& lower_bound,
const std::string& upper_bound, uint64_t timestamp);
virtual ~IteratorQueryTraceRecord() override;
// Get the iterator's lower/upper bound. They may be used in ReadOptions to
// create an Iterator instance.
virtual Slice GetLowerBound() const;
virtual Slice GetUpperBound() const;
private:
PinnableSlice lower_;
PinnableSlice upper_;
}; };
// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
@ -169,6 +182,16 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
const std::string& key, uint64_t timestamp); const std::string& key, uint64_t timestamp);
IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
PinnableSlice&& key, PinnableSlice&& lower_bound,
PinnableSlice&& upper_bound, uint64_t timestamp);
IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
const std::string& key,
const std::string& lower_bound,
const std::string& upper_bound,
uint64_t timestamp);
virtual ~IteratorSeekQueryTraceRecord() override; virtual ~IteratorSeekQueryTraceRecord() override;
// Trace type matches the seek type. // Trace type matches the seek type.

View File

@ -9,11 +9,13 @@
#include <vector> #include <vector>
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_record.h" #include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class IteratorTraceExecutionResult;
class MultiValuesTraceExecutionResult; class MultiValuesTraceExecutionResult;
class SingleValueTraceExecutionResult; class SingleValueTraceExecutionResult;
class StatusOnlyTraceExecutionResult; class StatusOnlyTraceExecutionResult;
@ -34,42 +36,14 @@ class TraceRecordResult {
public: public:
virtual ~Handler() = default; virtual ~Handler() = default;
// Handle StatusOnlyTraceExecutionResult
virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0; virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0;
// Handle SingleValueTraceExecutionResult
virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0; virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0;
// Handle MultiValuesTraceExecutionResult
virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0; virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0;
};
/* virtual Status Handle(const IteratorTraceExecutionResult& result) = 0;
* Example handler to just print the trace record execution results. };
*
* class ResultPrintHandler : public TraceRecordResult::Handler {
* public:
* ResultPrintHandler();
* ~ResultPrintHandler() override {}
*
* Status Handle(const StatusOnlyTraceExecutionResult& result) override {
* std::cout << "Status: " << result.GetStatus().ToString() << std::endl;
* }
*
* Status Handle(const SingleValueTraceExecutionResult& result) override {
* std::cout << "Status: " << result.GetStatus().ToString()
* << ", value: " << result.GetValue() << std::endl;
* }
*
* Status Handle(const MultiValuesTraceExecutionResult& result) override {
* size_t size = result.GetMultiStatus().size();
* for (size_t i = 0; i < size; i++) {
* std::cout << "Status: " << result.GetMultiStatus()[i].ToString()
* << ", value: " << result.GetValues()[i] << std::endl;
* }
* }
* };
* */
// Accept the handler. // Accept the handler.
virtual Status Accept(Handler* handler) = 0; virtual Status Accept(Handler* handler) = 0;
@ -106,8 +80,7 @@ class TraceExecutionResult : public TraceRecordResult {
}; };
// Result for operations that only return a single Status. // Result for operations that only return a single Status.
// Example operations: DB::Write(), Iterator::Seek() and // Example operation: DB::Write()
// Iterator::SeekForPrev().
class StatusOnlyTraceExecutionResult : public TraceExecutionResult { class StatusOnlyTraceExecutionResult : public TraceExecutionResult {
public: public:
StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp, StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp,
@ -138,7 +111,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult {
virtual ~SingleValueTraceExecutionResult() override; virtual ~SingleValueTraceExecutionResult() override;
// Return status of DB::Get(), etc. // Return status of DB::Get().
virtual const Status& GetStatus() const; virtual const Status& GetStatus() const;
// Value for the searched key. // Value for the searched key.
@ -151,7 +124,7 @@ class SingleValueTraceExecutionResult : public TraceExecutionResult {
std::string value_; std::string value_;
}; };
// Result for operations that return multiple Status(es) and values. // Result for operations that return multiple Status(es) and values as vectors.
// Example operation: DB::MultiGet() // Example operation: DB::MultiGet()
class MultiValuesTraceExecutionResult : public TraceExecutionResult { class MultiValuesTraceExecutionResult : public TraceExecutionResult {
public: public:
@ -162,7 +135,7 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult {
virtual ~MultiValuesTraceExecutionResult() override; virtual ~MultiValuesTraceExecutionResult() override;
// Returned Status(es) of DB::MultiGet(), etc. // Returned Status(es) of DB::MultiGet().
virtual const std::vector<Status>& GetMultiStatus() const; virtual const std::vector<Status>& GetMultiStatus() const;
// Returned values for the searched keys. // Returned values for the searched keys.
@ -175,4 +148,40 @@ class MultiValuesTraceExecutionResult : public TraceExecutionResult {
std::vector<std::string> values_; std::vector<std::string> values_;
}; };
// Result for Iterator operations.
// Example operations: Iterator::Seek(), Iterator::SeekForPrev()
// In addition to what TraceExecutionResult provides, it exposes whether the
// iterator was valid, the iterator's status, and the key/value of the entry
// the iterator was positioned on.
class IteratorTraceExecutionResult : public TraceExecutionResult {
public:
// Overload that receives the key and value as rvalue PinnableSlices.
IteratorTraceExecutionResult(bool valid, Status status, PinnableSlice&& key,
PinnableSlice&& value, uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type);
// Overload that receives the key and value as strings.
IteratorTraceExecutionResult(bool valid, Status status,
const std::string& key, const std::string& value,
uint64_t start_timestamp, uint64_t end_timestamp,
TraceType trace_type);
virtual ~IteratorTraceExecutionResult() override;
// Return if the Iterator is valid.
virtual bool GetValid() const;
// Return the status of the Iterator.
virtual const Status& GetStatus() const;
// Key of the current iterating entry, empty if GetValid() is false.
virtual Slice GetKey() const;
// Value of the current iterating entry, empty if GetValid() is false.
virtual Slice GetValue() const;
// Accept the handler (overrides the Accept() declared by the base class).
virtual Status Accept(Handler* handler) override;
private:
// NOTE(review): presumably the values returned by GetValid(), GetStatus(),
// GetKey() and GetValue() respectively -- the definitions are not visible
// here, confirm against the .cc file.
bool valid_;
Status status_;
PinnableSlice key_;
PinnableSlice value_;
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -47,6 +47,7 @@ PerfContext::PerfContext(const PerfContext& other) {
block_cache_filter_hit_count = other.block_cache_filter_hit_count; block_cache_filter_hit_count = other.block_cache_filter_hit_count;
filter_block_read_count = other.filter_block_read_count; filter_block_read_count = other.filter_block_read_count;
compression_dict_block_read_count = other.compression_dict_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count;
secondary_cache_hit_count = other.secondary_cache_hit_count;
block_checksum_time = other.block_checksum_time; block_checksum_time = other.block_checksum_time;
block_decompress_time = other.block_decompress_time; block_decompress_time = other.block_decompress_time;
get_read_bytes = other.get_read_bytes; get_read_bytes = other.get_read_bytes;
@ -144,6 +145,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept {
block_cache_filter_hit_count = other.block_cache_filter_hit_count; block_cache_filter_hit_count = other.block_cache_filter_hit_count;
filter_block_read_count = other.filter_block_read_count; filter_block_read_count = other.filter_block_read_count;
compression_dict_block_read_count = other.compression_dict_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count;
secondary_cache_hit_count = other.secondary_cache_hit_count;
block_checksum_time = other.block_checksum_time; block_checksum_time = other.block_checksum_time;
block_decompress_time = other.block_decompress_time; block_decompress_time = other.block_decompress_time;
get_read_bytes = other.get_read_bytes; get_read_bytes = other.get_read_bytes;
@ -243,6 +245,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) {
block_cache_filter_hit_count = other.block_cache_filter_hit_count; block_cache_filter_hit_count = other.block_cache_filter_hit_count;
filter_block_read_count = other.filter_block_read_count; filter_block_read_count = other.filter_block_read_count;
compression_dict_block_read_count = other.compression_dict_block_read_count; compression_dict_block_read_count = other.compression_dict_block_read_count;
secondary_cache_hit_count = other.secondary_cache_hit_count;
block_checksum_time = other.block_checksum_time; block_checksum_time = other.block_checksum_time;
block_decompress_time = other.block_decompress_time; block_decompress_time = other.block_decompress_time;
get_read_bytes = other.get_read_bytes; get_read_bytes = other.get_read_bytes;
@ -339,6 +342,7 @@ void PerfContext::Reset() {
block_cache_filter_hit_count = 0; block_cache_filter_hit_count = 0;
filter_block_read_count = 0; filter_block_read_count = 0;
compression_dict_block_read_count = 0; compression_dict_block_read_count = 0;
secondary_cache_hit_count = 0;
block_checksum_time = 0; block_checksum_time = 0;
block_decompress_time = 0; block_decompress_time = 0;
get_read_bytes = 0; get_read_bytes = 0;
@ -459,6 +463,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count); PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count);
PERF_CONTEXT_OUTPUT(filter_block_read_count); PERF_CONTEXT_OUTPUT(filter_block_read_count);
PERF_CONTEXT_OUTPUT(compression_dict_block_read_count); PERF_CONTEXT_OUTPUT(compression_dict_block_read_count);
PERF_CONTEXT_OUTPUT(secondary_cache_hit_count);
PERF_CONTEXT_OUTPUT(block_checksum_time); PERF_CONTEXT_OUTPUT(block_checksum_time);
PERF_CONTEXT_OUTPUT(block_decompress_time); PERF_CONTEXT_OUTPUT(block_decompress_time);
PERF_CONTEXT_OUTPUT(get_read_bytes); PERF_CONTEXT_OUTPUT(get_read_bytes);

View File

@ -1713,6 +1713,9 @@ void BlockBasedTable::RetrieveMultipleBlocks(
req_offset_for_block.emplace_back(0); req_offset_for_block.emplace_back(0);
} }
req_idx_for_block.emplace_back(read_reqs.size()); req_idx_for_block.emplace_back(read_reqs.size());
PERF_COUNTER_ADD(block_read_count, 1);
PERF_COUNTER_ADD(block_read_byte, block_size(handle));
} }
// Handle the last block and process the pending last request // Handle the last block and process the pending last request
if (prev_len != 0) { if (prev_len != 0) {

View File

@ -228,8 +228,16 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
// Execute MultiGet. // Execute MultiGet.
MultiGetContext::Range range = ctx.GetMultiGetRange(); MultiGetContext::Range range = ctx.GetMultiGetRange();
PerfContext* perf_ctx = get_perf_context();
perf_ctx->Reset();
table->MultiGet(ReadOptions(), &range, nullptr); table->MultiGet(ReadOptions(), &range, nullptr);
ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
perf_ctx->filter_block_read_count -
perf_ctx->compression_dict_block_read_count,
1);
ASSERT_GE(perf_ctx->block_read_byte, 1);
for (const Status& status : statuses) { for (const Status& status : statuses) {
ASSERT_OK(status); ASSERT_OK(status);
} }

View File

@ -82,6 +82,27 @@ Status GetQueryTraceRecord::Accept(Handler* handler,
IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
: QueryTraceRecord(timestamp) {} : QueryTraceRecord(timestamp) {}
// Constructs a record for `timestamp` whose iterator bounds are taken over
// from the caller: `lower_bound` and `upper_bound` are moved into lower_ and
// upper_.
IteratorQueryTraceRecord::IteratorQueryTraceRecord(PinnableSlice&& lower_bound,
PinnableSlice&& upper_bound,
uint64_t timestamp)
: QueryTraceRecord(timestamp),
lower_(std::move(lower_bound)),
upper_(std::move(upper_bound)) {}
// Constructs a record for `timestamp` from string bounds. Each bound is
// stored in the record's own PinnableSlice through PinSelf().
IteratorQueryTraceRecord::IteratorQueryTraceRecord(
const std::string& lower_bound, const std::string& upper_bound,
uint64_t timestamp)
: QueryTraceRecord(timestamp) {
lower_.PinSelf(lower_bound);
upper_.PinSelf(upper_bound);
}
// Nothing to release explicitly; the members clean up after themselves.
IteratorQueryTraceRecord::~IteratorQueryTraceRecord() = default;
// Returns a Slice over the stored lower bound.
Slice IteratorQueryTraceRecord::GetLowerBound() const {
  return Slice(lower_.data(), lower_.size());
}
// Returns a Slice over the stored upper bound.
Slice IteratorQueryTraceRecord::GetUpperBound() const {
  return Slice(upper_.data(), upper_.size());
}
// IteratorSeekQueryTraceRecord // IteratorSeekQueryTraceRecord
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
@ -100,6 +121,26 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
key_.PinSelf(key); key_.PinSelf(key);
} }
// Constructs a seek record with iterator bounds. The bounds and timestamp are
// forwarded (by move) to the IteratorQueryTraceRecord base; the seek type,
// column family ID and the moved-in key are stored in this record.
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
PinnableSlice&& lower_bound, PinnableSlice&& upper_bound,
uint64_t timestamp)
: IteratorQueryTraceRecord(std::move(lower_bound), std::move(upper_bound),
timestamp),
type_(seek_type),
cf_id_(column_family_id),
key_(std::move(key)) {}
// Constructs a seek record with iterator bounds given as strings. The bounds
// and timestamp are passed to the IteratorQueryTraceRecord base; the key is
// stored in key_ through PinSelf().
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, const std::string& key,
const std::string& lower_bound, const std::string& upper_bound,
uint64_t timestamp)
: IteratorQueryTraceRecord(lower_bound, upper_bound, timestamp),
type_(seek_type),
cf_id_(column_family_id) {
key_.PinSelf(key);
}
IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); }
TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {

View File

@ -93,7 +93,16 @@ Status TraceExecutionHandler::Handle(
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
} }
Iterator* single_iter = db_->NewIterator(read_opts_, it->second); ReadOptions r_opts = read_opts_;
Slice lower = record.GetLowerBound();
if (!lower.empty()) {
r_opts.iterate_lower_bound = &lower;
}
Slice upper = record.GetUpperBound();
if (!upper.empty()) {
r_opts.iterate_upper_bound = &upper;
}
Iterator* single_iter = db_->NewIterator(r_opts, it->second);
uint64_t start = clock_->NowMicros(); uint64_t start = clock_->NowMicros();
@ -111,12 +120,21 @@ Status TraceExecutionHandler::Handle(
uint64_t end = clock_->NowMicros(); uint64_t end = clock_->NowMicros();
Status s = single_iter->status(); Status s = single_iter->status();
delete single_iter;
if (s.ok() && result != nullptr) { if (s.ok() && result != nullptr) {
result->reset(new StatusOnlyTraceExecutionResult(s, start, end, if (single_iter->Valid()) {
PinnableSlice ps_key;
ps_key.PinSelf(single_iter->key());
PinnableSlice ps_value;
ps_value.PinSelf(single_iter->value());
result->reset(new IteratorTraceExecutionResult(
true, s, std::move(ps_key), std::move(ps_value), start, end,
record.GetTraceType())); record.GetTraceType()));
} else {
result->reset(new IteratorTraceExecutionResult(
false, s, "", "", start, end, record.GetTraceType()));
} }
}
delete single_iter;
return s; return s;
} }

View File

@ -103,4 +103,44 @@ Status MultiValuesTraceExecutionResult::Accept(Handler* handler) {
return handler->Handle(*this); return handler->Handle(*this);
} }
// IteratorTraceExecutionResult
// Constructs an iterator execution result from rvalue key/value slices.
// The two timestamps and `trace_type` are forwarded to the
// TraceExecutionResult base; `valid` is stored in valid_; `status` (taken by
// value) is moved into status_; key_ and value_ are move-constructed from
// `key` and `value`, leaving the caller's slices moved from.
IteratorTraceExecutionResult::IteratorTraceExecutionResult(
bool valid, Status status, PinnableSlice&& key, PinnableSlice&& value,
uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
: TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
valid_(valid),
status_(std::move(status)),
key_(std::move(key)),
value_(std::move(value)) {}
// Constructs an iterator execution result from a key and value supplied as
// strings. The two timestamps and `trace_type` are forwarded to the
// TraceExecutionResult base; `valid` is stored in valid_; `status` (taken by
// value) is moved into status_; the key and value are stored into key_ and
// value_ through PinSelf().
// NOTE(review): PinSelf() presumably copies the bytes into the members' own
// storage -- confirm against PinnableSlice.
IteratorTraceExecutionResult::IteratorTraceExecutionResult(
    bool valid, Status status, const std::string& key, const std::string& value,
    uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
    : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
      valid_(valid),
      status_(std::move(status)) {
  const Slice key_view(key);
  const Slice value_view(value);
  key_.PinSelf(key_view);
  value_.PinSelf(value_view);
}
// Destructor: explicitly clears key_ and value_ before the members are
// destroyed. The same pattern is used by
// IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() in this change.
IteratorTraceExecutionResult::~IteratorTraceExecutionResult() {
key_.clear();
value_.clear();
}
// Returns the `valid` flag this result was constructed with.
// NOTE(review): the replay handler passes the iterator's Valid() state here;
// confirm no other producers use a different meaning.
bool IteratorTraceExecutionResult::GetValid() const { return valid_; }
// Returns a const reference to the stored status_ member; no copy is made,
// so the reference is only usable while this result object is alive.
const Status& IteratorTraceExecutionResult::GetStatus() const {
return status_;
}
// Returns the stored key as a Slice built from key_.
// NOTE(review): the returned Slice presumably refers to memory held by this
// result and should not outlive it -- confirm against Slice/PinnableSlice.
Slice IteratorTraceExecutionResult::GetKey() const {
  Slice key_view(key_);
  return key_view;
}
// Returns the stored value as a Slice built from value_.
// NOTE(review): the returned Slice presumably refers to memory held by this
// result and should not outlive it -- confirm against Slice/PinnableSlice.
Slice IteratorTraceExecutionResult::GetValue() const {
  Slice value_view(value_);
  return value_view;
}
// Visitor entry point: passes this result to `handler` and returns whatever
// Status the handler's Handle() overload produces.
// `handler` must be non-null; this is only enforced by an assert.
Status IteratorTraceExecutionResult::Accept(Handler* handler) {
  assert(handler != nullptr);
  Handler& visitor = *handler;
  return visitor.Handle(*this);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -225,14 +225,12 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
uint32_t cf_id = 0; uint32_t cf_id = 0;
Slice iter_key; Slice iter_key;
Slice lower_bound;
Slice upper_bound;
if (trace_file_version < 2) { if (trace_file_version < 2) {
DecodeCFAndKey(trace->payload, &cf_id, &iter_key); DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
} else { } else {
// Are these two used anywhere?
Slice lower_bound;
Slice upper_bound;
Slice buf(trace->payload); Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map); GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map); int64_t payload_map = static_cast<int64_t>(trace->payload_map);
@ -264,9 +262,14 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
if (record != nullptr) { if (record != nullptr) {
PinnableSlice ps_key; PinnableSlice ps_key;
ps_key.PinSelf(iter_key); ps_key.PinSelf(iter_key);
PinnableSlice ps_lower;
ps_lower.PinSelf(lower_bound);
PinnableSlice ps_upper;
ps_upper.PinSelf(upper_bound);
record->reset(new IteratorSeekQueryTraceRecord( record->reset(new IteratorSeekQueryTraceRecord(
static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id, static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
std::move(ps_key), trace->ts)); std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
trace->ts));
} }
return Status::OK(); return Status::OK();