Trace MultiGet Keys and CF_IDs to the trace file (#8421)
Summary: Tracing the MultiGet information including timestamp, keys, and CF_IDs to the trace file for analyzing and replay. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8421 Test Plan: make check, add test to trace_analyzer_test Reviewed By: anand1976 Differential Revision: D29221195 Pulled By: zhichao-cao fbshipit-source-id: 30c677d6c39ab31ef4bbdf7e0d1fa1fd79f295ff
This commit is contained in:
parent
d96b0127a0
commit
82a70e1470
@ -1880,6 +1880,16 @@ std::vector<Status> DBImpl::MultiGet(
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
if (tracer_) {
|
||||
// TODO: This mutex should be removed later, to improve performance when
|
||||
// tracing is enabled.
|
||||
InstrumentedMutexLock lock(&trace_mutex_);
|
||||
if (tracer_) {
|
||||
// TODO: maybe handle the tracing status?
|
||||
tracer_->MultiGet(column_family, keys).PermitUncheckedError();
|
||||
}
|
||||
}
|
||||
|
||||
SequenceNumber consistent_seqnum;
|
||||
|
||||
std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
|
||||
@ -2191,6 +2201,16 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
if (tracer_) {
|
||||
// TODO: This mutex should be removed later, to improve performance when
|
||||
// tracing is enabled.
|
||||
InstrumentedMutexLock lock(&trace_mutex_);
|
||||
if (tracer_) {
|
||||
// TODO: maybe handle the tracing status?
|
||||
tracer_->MultiGet(num_keys, column_families, keys).PermitUncheckedError();
|
||||
}
|
||||
}
|
||||
|
||||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
||||
sorted_keys.resize(num_keys);
|
||||
@ -2353,6 +2373,15 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
|
||||
const Slice* keys, PinnableSlice* values,
|
||||
std::string* timestamps, Status* statuses,
|
||||
const bool sorted_input) {
|
||||
if (tracer_) {
|
||||
// TODO: This mutex should be removed later, to improve performance when
|
||||
// tracing is enabled.
|
||||
InstrumentedMutexLock lock(&trace_mutex_);
|
||||
if (tracer_) {
|
||||
// TODO: maybe handle the tracing status?
|
||||
tracer_->MultiGet(num_keys, column_family, keys).PermitUncheckedError();
|
||||
}
|
||||
}
|
||||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
||||
sorted_keys.resize(num_keys);
|
||||
|
@ -81,8 +81,26 @@ class TraceAnalyzerTest : public testing::Test {
|
||||
ASSERT_OK(batch.SingleDelete("d"));
|
||||
ASSERT_OK(batch.DeleteRange("e", "f"));
|
||||
ASSERT_OK(db_->Write(wo, &batch));
|
||||
|
||||
std::vector<Slice> keys;
|
||||
keys.push_back("a");
|
||||
keys.push_back("b");
|
||||
keys.push_back("df");
|
||||
keys.push_back("gege");
|
||||
keys.push_back("hjhjhj");
|
||||
std::vector<std::string> values;
|
||||
std::vector<Status> ss = db_->MultiGet(ro, keys, &values);
|
||||
ASSERT_GE(ss.size(), 0);
|
||||
ASSERT_OK(ss[0]);
|
||||
ASSERT_NOK(ss[2]);
|
||||
std::vector<ColumnFamilyHandle*> cfs(2, db_->DefaultColumnFamily());
|
||||
std::vector<PinnableSlice> values2(keys.size());
|
||||
db_->MultiGet(ro, 2, cfs.data(), keys.data(), values2.data(), ss.data(),
|
||||
false);
|
||||
ASSERT_OK(ss[0]);
|
||||
db_->MultiGet(ro, db_->DefaultColumnFamily(), 2, keys.data() + 3,
|
||||
values2.data(), ss.data(), false);
|
||||
ASSERT_OK(db_->Get(ro, "a", &value));
|
||||
|
||||
single_iter = db_->NewIterator(ro);
|
||||
single_iter->Seek("a");
|
||||
ASSERT_OK(single_iter->status());
|
||||
|
@ -524,6 +524,10 @@ Status TraceAnalyzer::StartProcessing() {
|
||||
fprintf(stderr, "Cannot process the iterator in the trace\n");
|
||||
return s;
|
||||
}
|
||||
} else if (trace.type == kTraceMultiGet) {
|
||||
MultiGetPayload multiget_payload;
|
||||
assert(trace_file_version_ >= 2);
|
||||
TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
|
||||
} else if (trace.type == kTraceEnd) {
|
||||
break;
|
||||
}
|
||||
|
@ -185,6 +185,47 @@ void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
|
||||
}
|
||||
}
|
||||
|
||||
void TracerHelper::DecodeMultiGetPayload(Trace* trace,
|
||||
MultiGetPayload* multiget_payload) {
|
||||
assert(multiget_payload != nullptr);
|
||||
Slice cfids_payload;
|
||||
Slice keys_payload;
|
||||
Slice buf(trace->payload);
|
||||
GetFixed64(&buf, &trace->payload_map);
|
||||
int64_t payload_map = static_cast<int64_t>(trace->payload_map);
|
||||
while (payload_map) {
|
||||
// Find the rightmost set bit.
|
||||
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
|
||||
switch (set_pos) {
|
||||
case TracePayloadType::kMultiGetSize:
|
||||
GetFixed32(&buf, &(multiget_payload->multiget_size));
|
||||
break;
|
||||
case TracePayloadType::kMultiGetCFIDs:
|
||||
GetLengthPrefixedSlice(&buf, &cfids_payload);
|
||||
break;
|
||||
case TracePayloadType::kMultiGetKeys:
|
||||
GetLengthPrefixedSlice(&buf, &keys_payload);
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
// unset the rightmost bit.
|
||||
payload_map &= (payload_map - 1);
|
||||
}
|
||||
|
||||
// Decode the cfids_payload and keys_payload
|
||||
multiget_payload->cf_ids.reserve(multiget_payload->multiget_size);
|
||||
multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size);
|
||||
for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) {
|
||||
uint32_t tmp_cfid;
|
||||
Slice tmp_key;
|
||||
GetFixed32(&cfids_payload, &tmp_cfid);
|
||||
GetLengthPrefixedSlice(&keys_payload, &tmp_key);
|
||||
multiget_payload->cf_ids.push_back(tmp_cfid);
|
||||
multiget_payload->multiget_keys.push_back(tmp_key.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
|
||||
std::unique_ptr<TraceWriter>&& trace_writer)
|
||||
: clock_(clock),
|
||||
@ -302,6 +343,77 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
|
||||
return WriteTrace(trace);
|
||||
}
|
||||
|
||||
Status Tracer::MultiGet(const size_t num_keys,
|
||||
ColumnFamilyHandle** column_families,
|
||||
const Slice* keys) {
|
||||
if (num_keys == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::vector<ColumnFamilyHandle*> v_column_families;
|
||||
std::vector<Slice> v_keys;
|
||||
v_column_families.resize(num_keys);
|
||||
v_keys.resize(num_keys);
|
||||
for (size_t i = 0; i < num_keys; i++) {
|
||||
v_column_families[i] = column_families[i];
|
||||
v_keys[i] = keys[i];
|
||||
}
|
||||
return MultiGet(v_column_families, v_keys);
|
||||
}
|
||||
|
||||
Status Tracer::MultiGet(const size_t num_keys,
|
||||
ColumnFamilyHandle* column_family, const Slice* keys) {
|
||||
if (num_keys == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
std::vector<Slice> v_keys;
|
||||
column_families.resize(num_keys);
|
||||
v_keys.resize(num_keys);
|
||||
for (size_t i = 0; i < num_keys; i++) {
|
||||
column_families[i] = column_family;
|
||||
v_keys[i] = keys[i];
|
||||
}
|
||||
return MultiGet(column_families, v_keys);
|
||||
}
|
||||
|
||||
Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
|
||||
const std::vector<Slice>& keys) {
|
||||
if (column_families.size() != keys.size()) {
|
||||
return Status::Corruption("the CFs size and keys size does not match!");
|
||||
}
|
||||
TraceType trace_type = kTraceMultiGet;
|
||||
if (ShouldSkipTrace(trace_type)) {
|
||||
return Status::OK();
|
||||
}
|
||||
uint32_t multiget_size = static_cast<uint32_t>(keys.size());
|
||||
Trace trace;
|
||||
trace.ts = clock_->NowMicros();
|
||||
trace.type = trace_type;
|
||||
// Set the payloadmap of the struct member that will be encoded in the
|
||||
// payload.
|
||||
TracerHelper::SetPayloadMap(trace.payload_map,
|
||||
TracePayloadType::kMultiGetSize);
|
||||
TracerHelper::SetPayloadMap(trace.payload_map,
|
||||
TracePayloadType::kMultiGetCFIDs);
|
||||
TracerHelper::SetPayloadMap(trace.payload_map,
|
||||
TracePayloadType::kMultiGetKeys);
|
||||
// Encode the CFIDs inorder
|
||||
std::string cfids_payload;
|
||||
std::string keys_payload;
|
||||
for (uint32_t i = 0; i < multiget_size; i++) {
|
||||
assert(i < column_families.size());
|
||||
assert(i < keys.size());
|
||||
PutFixed32(&cfids_payload, column_families[i]->GetID());
|
||||
PutLengthPrefixedSlice(&keys_payload, keys[i]);
|
||||
}
|
||||
// Encode the Get struct members into payload. Make sure add them in order.
|
||||
PutFixed64(&trace.payload, trace.payload_map);
|
||||
PutFixed32(&trace.payload, multiget_size);
|
||||
PutLengthPrefixedSlice(&trace.payload, cfids_payload);
|
||||
PutLengthPrefixedSlice(&trace.payload, keys_payload);
|
||||
return WriteTrace(trace);
|
||||
}
|
||||
|
||||
bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
|
||||
if (IsTraceFileOverMax()) {
|
||||
return true;
|
||||
|
@ -59,6 +59,8 @@ enum TraceType : char {
|
||||
kBlockTraceRangeDeletionBlock = 11,
|
||||
// For IOTracing.
|
||||
kIOTracer = 12,
|
||||
// For query tracing
|
||||
kTraceMultiGet = 13,
|
||||
// All trace types should be added before kTraceMax
|
||||
kTraceMax,
|
||||
};
|
||||
@ -98,6 +100,9 @@ enum TracePayloadType : char {
|
||||
kIterKey = 5,
|
||||
kIterLowerBound = 6,
|
||||
kIterUpperBound = 7,
|
||||
kMultiGetSize = 8,
|
||||
kMultiGetCFIDs = 9,
|
||||
kMultiGetKeys = 10,
|
||||
};
|
||||
|
||||
struct WritePayload {
|
||||
@ -116,6 +121,12 @@ struct IterPayload {
|
||||
Slice upper_bound;
|
||||
};
|
||||
|
||||
struct MultiGetPayload {
|
||||
uint32_t multiget_size;
|
||||
std::vector<uint32_t> cf_ids;
|
||||
std::vector<std::string> multiget_keys;
|
||||
};
|
||||
|
||||
class TracerHelper {
|
||||
public:
|
||||
// Parse the string with major and minor version only
|
||||
@ -143,6 +154,10 @@ class TracerHelper {
|
||||
|
||||
// Decode the iter payload and store in WrteiPayload
|
||||
static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload);
|
||||
|
||||
// Decode the multiget payload and store in MultiGetPayload
|
||||
static void DecodeMultiGetPayload(Trace* trace,
|
||||
MultiGetPayload* multiget_payload);
|
||||
};
|
||||
|
||||
// Tracer captures all RocksDB operations using a user-provided TraceWriter.
|
||||
@ -166,6 +181,17 @@ class Tracer {
|
||||
Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
|
||||
const Slice& lower_bound, const Slice upper_bound);
|
||||
|
||||
// Trace MultiGet
|
||||
|
||||
Status MultiGet(const size_t num_keys, ColumnFamilyHandle** column_families,
|
||||
const Slice* keys);
|
||||
|
||||
Status MultiGet(const size_t num_keys, ColumnFamilyHandle* column_family,
|
||||
const Slice* keys);
|
||||
|
||||
Status MultiGet(const std::vector<ColumnFamilyHandle*>& column_family,
|
||||
const std::vector<Slice>& keys);
|
||||
|
||||
// Returns true if the trace is over the configured max trace file limit.
|
||||
// False otherwise.
|
||||
bool IsTraceFileOverMax();
|
||||
|
Loading…
Reference in New Issue
Block a user