Introduce a new trace file format (v 0.2) for better extension (#7977)

Summary:
The trace file record and payload encode is fixed, which requires complex backward compatibility resolving. This PR introduce a new trace file format, which makes it easier to add new entries to the payload and does not have backward compatible issues. V 0.1 is still supported in this PR. Added the tracing for lower_bound and upper_bound for iterator.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7977

Test Plan: make check. tested with old trace file in replay and analyzing.

Reviewed By: anand1976

Differential Revision: D26529948

Pulled By: zhichao-cao

fbshipit-source-id: ebb75a127ce3c07c25a1ccc194c551f917896a76
This commit is contained in:
Zhichao Cao 2021-02-18 23:03:49 -08:00 committed by Facebook GitHub Bot
parent c9878baa87
commit b0fd1cc45a
9 changed files with 481 additions and 94 deletions

View File

@ -2,6 +2,7 @@
## Unreleased
### Behavior Changes
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
* Introduce a new trace file format for query tracing and replay and trace file version is bump up to 0.2. A payload map is added as the first portion of the payload. We will not have backward compatible issues when adding new entries to trace records. Added the iterator_upper_bound and iterator_lower_bound in Seek and SeekForPrev tracing function. Added them as the new payload member for iterator tracing.
### New Features
* Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`.

View File

@ -4975,24 +4975,27 @@ Status DBImpl::EndBlockCacheTrace() {
return Status::OK();
}
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound,
const Slice upper_bound) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
s = tracer_->IteratorSeek(cf_id, key);
s = tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound);
}
}
return s;
}
Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
const Slice& key) {
Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound,
const Slice upper_bound) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
s = tracer_->IteratorSeekForPrev(cf_id, key);
s = tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound);
}
}
return s;

View File

@ -592,8 +592,11 @@ class DBImpl : public DB {
bool* found_record_for_key,
bool* is_blob_index = nullptr);
Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound);
Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound,
const Slice upper_bound);
#endif // ROCKSDB_LITE
// Similar to GetSnapshot(), but also lets the db know that this snapshot

View File

@ -1257,7 +1257,19 @@ void DBIter::Seek(const Slice& target) {
#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
// TODO: What do we do if this returns an error?
db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
Slice lower_bound, upper_bound;
if (iterate_lower_bound_ != nullptr) {
lower_bound = *iterate_lower_bound_;
} else {
lower_bound = Slice("");
}
if (iterate_upper_bound_ != nullptr) {
upper_bound = *iterate_upper_bound_;
} else {
upper_bound = Slice("");
}
db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
.PermitUncheckedError();
}
#endif // ROCKSDB_LITE
@ -1319,7 +1331,20 @@ void DBIter::SeekForPrev(const Slice& target) {
#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
// TODO: What do we do if this returns an error?
db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
Slice lower_bound, upper_bound;
if (iterate_lower_bound_ != nullptr) {
lower_bound = *iterate_lower_bound_;
} else {
lower_bound = Slice("");
}
if (iterate_upper_bound_ != nullptr) {
upper_bound = *iterate_upper_bound_;
} else {
upper_bound = Slice("");
}
db_impl_
->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
upper_bound)
.PermitUncheckedError();
}
#endif // ROCKSDB_LITE

View File

@ -57,7 +57,11 @@ class TraceAnalyzerTest : public testing::Test {
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator();
Slice upper_bound("a");
Slice lower_bound("abce");
ReadOptions ro;
ro.iterate_upper_bound = &upper_bound;
ro.iterate_lower_bound = &lower_bound;
WriteOptions wo;
TraceOptions trace_opt;
DB* db_ = nullptr;

View File

@ -283,6 +283,8 @@ TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
end_time_ = 0;
time_series_start_ = 0;
cur_time_sec_ = 0;
// Set the default trace file version as version 0.2
trace_file_version_ = 2;
if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
sample_max_ = 1;
} else {
@ -389,10 +391,15 @@ Status TraceAnalyzer::PrepareProcessing() {
Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
assert(header != nullptr);
Status s = ReadTraceRecord(header);
std::string encoded_trace;
// Read the trace head
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
s = TracerHelper::DecodeTrace(encoded_trace, header);
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
@ -422,13 +429,7 @@ Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
if (!s.ok()) {
return s;
}
Slice enc_slice = Slice(encoded_trace);
GetFixed64(&enc_slice, &trace->ts);
trace->type = static_cast<TraceType>(enc_slice[0]);
enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
trace->payload = enc_slice.ToString();
return s;
return TracerHelper::DecodeTrace(encoded_trace, trace);
}
// process the trace itself and redirect the trace content
@ -442,6 +443,11 @@ Status TraceAnalyzer::StartProcessing() {
fprintf(stderr, "Cannot read the header\n");
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_,
&db_version_);
if (!s.ok()) {
return s;
}
trace_create_time_ = header.ts;
if (FLAGS_output_time_series) {
time_series_start_ = header.ts;
@ -460,14 +466,22 @@ Status TraceAnalyzer::StartProcessing() {
if (trace.type == kTraceWrite) {
total_writes_++;
c_time_ = trace.ts;
WriteBatch batch(trace.payload);
Slice batch_data;
if (trace_file_version_ < 2) {
Slice tmp_data(trace.payload);
batch_data = tmp_data;
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&trace, &w_payload);
batch_data = w_payload.write_batch_data;
}
// Note that, if the write happens in a transaction,
// 'Write' will be called twice, one for Prepare, one for
// Commit. Thus, in the trace, for the same WriteBatch, there
// will be two reords if it is in a transaction. Here, we only
// process the reord that is committed. If write is non-transaction,
// HasBeginPrepare()==false, so we process it normally.
WriteBatch batch(batch_data.ToString());
if (batch.HasBeginPrepare() && !batch.HasCommit()) {
continue;
}
@ -478,22 +492,34 @@ Status TraceAnalyzer::StartProcessing() {
return s;
}
} else if (trace.type == kTraceGet) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
GetPayload get_payload;
get_payload.get_key = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id,
&get_payload.get_key);
} else {
TracerHelper::DecodeGetPayload(&trace, &get_payload);
}
total_gets_++;
s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
s = HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts,
1);
if (!s.ok()) {
fprintf(stderr, "Cannot process the get in the trace\n");
return s;
}
} else if (trace.type == kTraceIteratorSeek ||
trace.type == kTraceIteratorSeekForPrev) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
}
s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(),
trace.ts, trace.type);
if (!s.ok()) {
fprintf(stderr, "Cannot process the iterator in the trace\n");
return s;

View File

@ -249,6 +249,9 @@ class TraceAnalyzer {
Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
Status MakeStatisticQPS();
// Set the default trace file version as version 0.2
int trace_file_version_;
int db_version_;
};
// write bach handler to be used for WriteBache iterator

View File

@ -25,11 +25,6 @@ namespace ROCKSDB_NAMESPACE {
const std::string kTraceMagic = "feedcafedeadbeef";
namespace {
void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
PutFixed32(dst, cf_id);
PutLengthPrefixedSlice(dst, key);
}
void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
@ -37,6 +32,54 @@ void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
}
} // namespace
Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
if (v_string.find_first_of('.') == std::string::npos ||
v_string.find_first_of('.') != v_string.find_last_of('.')) {
return Status::Corruption(
"Corrupted trace file. Incorrect version format.");
}
int tmp_num = 0;
for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
if (v_string[i] == '.') {
continue;
} else if (isdigit(v_string[i])) {
tmp_num = tmp_num * 10 + (v_string[i] - '0');
} else {
return Status::Corruption(
"Corrupted trace file. Incorrect version format");
}
}
*v_num = tmp_num;
return Status::OK();
}
Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
int* db_version) {
std::vector<std::string> s_vec;
int begin = 0, end;
for (int i = 0; i < 3; i++) {
assert(header.payload.find("\t", begin) != std::string::npos);
end = static_cast<int>(header.payload.find("\t", begin));
s_vec.push_back(header.payload.substr(begin, end - begin));
begin = end + 1;
}
std::string t_v_str, db_v_str;
assert(s_vec.size() == 3);
assert(s_vec[1].find("Trace Version: ") != std::string::npos);
t_v_str = s_vec[1].substr(15);
assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
db_v_str = s_vec[2].substr(17);
Status s;
s = ParseVersionStr(t_v_str, trace_version);
if (s != Status::OK()) {
return s;
}
s = ParseVersionStr(db_v_str, db_version);
return s;
}
void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
assert(encoded_trace);
PutFixed64(encoded_trace, trace.ts);
@ -61,6 +104,87 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
return Status::OK();
}
bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type) {
uint64_t old_state = payload_map;
uint64_t tmp = 1;
payload_map |= (tmp << payload_type);
return old_state != payload_map;
}
void TracerHelper::DecodeWritePayload(Trace* trace,
WritePayload* write_payload) {
assert(write_payload != nullptr);
Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) {
// Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) {
case TracePayloadType::kWriteBatchData:
GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data));
break;
default:
assert(false);
}
// unset the rightmost bit.
payload_map &= (payload_map - 1);
}
}
void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
assert(get_payload != nullptr);
Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) {
// Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) {
case TracePayloadType::kGetCFID:
GetFixed32(&buf, &(get_payload->cf_id));
break;
case TracePayloadType::kGetKey:
GetLengthPrefixedSlice(&buf, &(get_payload->get_key));
break;
default:
assert(false);
}
// unset the rightmost bit.
payload_map &= (payload_map - 1);
}
}
void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
assert(iter_payload != nullptr);
Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) {
// Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) {
case TracePayloadType::kIterCFID:
GetFixed32(&buf, &(iter_payload->cf_id));
break;
case TracePayloadType::kIterKey:
GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key));
break;
case TracePayloadType::kIterLowerBound:
GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound));
break;
case TracePayloadType::kIterUpperBound:
GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound));
break;
default:
assert(false);
}
// unset the rightmost bit.
payload_map &= (payload_map - 1);
}
}
Tracer::Tracer(const std::shared_ptr<SystemClock>& clock,
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
@ -82,7 +206,10 @@ Status Tracer::Write(WriteBatch* write_batch) {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
trace.payload = write_batch->Data();
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kWriteBatchData);
PutFixed64(&trace.payload, trace.payload_map);
PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
return WriteTrace(trace);
}
@ -94,11 +221,19 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
// Set the payloadmap of the struct member that will be encoded in the
// payload.
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
// Encode the Get struct members into payload. Make sure add them in order.
PutFixed64(&trace.payload, trace.payload_map);
PutFixed32(&trace.payload, column_family->GetID());
PutLengthPrefixedSlice(&trace.payload, key);
return WriteTrace(trace);
}
Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound) {
TraceType trace_type = kTraceIteratorSeek;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
@ -106,11 +241,35 @@ Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, cf_id, key);
// Set the payloadmap of the struct member that will be encoded in the
// payload.
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
if (lower_bound.size() > 0) {
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kIterLowerBound);
}
if (upper_bound.size() > 0) {
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kIterUpperBound);
}
// Encode the Iterator struct members into payload. Make sure add them in
// order.
PutFixed64(&trace.payload, trace.payload_map);
PutFixed32(&trace.payload, cf_id);
PutLengthPrefixedSlice(&trace.payload, key);
if (lower_bound.size() > 0) {
PutLengthPrefixedSlice(&trace.payload, lower_bound);
}
if (upper_bound.size() > 0) {
PutLengthPrefixedSlice(&trace.payload, upper_bound);
}
return WriteTrace(trace);
}
Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound,
const Slice upper_bound) {
TraceType trace_type = kTraceIteratorSeekForPrev;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
@ -118,7 +277,29 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, cf_id, key);
// Set the payloadmap of the struct member that will be encoded in the
// payload.
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
if (lower_bound.size() > 0) {
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kIterLowerBound);
}
if (upper_bound.size() > 0) {
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kIterUpperBound);
}
// Encode the Iterator struct members into payload. Make sure add them in
// order.
PutFixed64(&trace.payload, trace.payload_map);
PutFixed32(&trace.payload, cf_id);
PutLengthPrefixedSlice(&trace.payload, key);
if (lower_bound.size() > 0) {
PutLengthPrefixedSlice(&trace.payload, lower_bound);
}
if (upper_bound.size() > 0) {
PutLengthPrefixedSlice(&trace.payload, upper_bound);
}
return WriteTrace(trace);
}
@ -148,7 +329,8 @@ bool Tracer::IsTraceFileOverMax() {
Status Tracer::WriteHeader() {
std::ostringstream s;
s << kTraceMagic << "\t"
<< "Trace Version: 0.1\t"
<< "Trace Version: " << kTraceFileMajorVersion << "."
<< kTraceFileMinorVersion << "\t"
<< "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
<< "Format: Timestamp OpType Payload\n";
std::string header(s.str());
@ -164,6 +346,8 @@ Status Tracer::WriteFooter() {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = kTraceEnd;
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kEmptyPayload);
trace.payload = "";
return WriteTrace(trace);
}
@ -204,10 +388,15 @@ Status Replayer::SetFastForward(uint32_t fast_forward) {
Status Replayer::Replay() {
Status s;
Trace header;
int db_version;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
if (!s.ok()) {
return s;
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
@ -227,55 +416,83 @@ Status Replayer::Replay() {
replay_epoch +
std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
if (trace.type == kTraceWrite) {
if (trace_file_version_ < 2) {
WriteBatch batch(trace.payload);
db_->Write(woptions, &batch);
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&trace, &w_payload);
WriteBatch batch(w_payload.write_batch_data.ToString());
db_->Write(woptions, &batch);
}
ops++;
} else if (trace.type == kTraceGet) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
GetPayload get_payload;
get_payload.get_key = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
} else {
TracerHelper::DecodeGetPayload(&trace, &get_payload);
}
if (get_payload.cf_id > 0 &&
cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
std::string value;
if (cf_id == 0) {
db_->Get(roptions, key, &value);
if (get_payload.cf_id == 0) {
db_->Get(roptions, get_payload.get_key, &value);
} else {
db_->Get(roptions, cf_map_[cf_id], key, &value);
db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
&value);
}
ops++;
} else if (trace.type == kTraceIteratorSeek) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
// Currently, we only support to call Seek. The Next() and Prev() is not
// supported.
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (cf_id == 0) {
if (iter_payload.cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
}
single_iter->Seek(key);
single_iter->Seek(iter_payload.iter_key);
ops++;
delete single_iter;
} else if (trace.type == kTraceIteratorSeekForPrev) {
// Currently, only support to call the Seek()
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
// Currently, we only support to call SeekForPrev. The Next() and Prev()
// is not supported.
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (cf_id == 0) {
if (iter_payload.cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
}
single_iter->SeekForPrev(key);
single_iter->SeekForPrev(iter_payload.iter_key);
ops++;
delete single_iter;
} else if (trace.type == kTraceEnd) {
@ -302,11 +519,15 @@ Status Replayer::Replay() {
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
Status s;
Trace header;
int db_version;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
if (!s.ok()) {
return s;
}
ThreadPoolImpl thread_pool;
thread_pool.SetHostEnv(env_);
@ -331,6 +552,7 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) {
ra->cf_map = &cf_map_;
ra->woptions = woptions;
ra->roptions = roptions;
ra->trace_file_version = trace_file_version_;
std::this_thread::sleep_until(
replay_epoch + std::chrono::microseconds(
@ -374,10 +596,15 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) {
Status Replayer::ReadHeader(Trace* header) {
assert(header != nullptr);
Status s = ReadTrace(header);
std::string encoded_trace;
// Read the trace head
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
s = TracerHelper::DecodeTrace(encoded_trace, header);
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
@ -418,20 +645,26 @@ void Replayer::BGWorkGet(void* arg) {
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
GetPayload get_payload;
get_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
&get_payload.get_key);
} else {
TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
}
if (get_payload.cf_id > 0 &&
cf_map->find(get_payload.cf_id) == cf_map->end()) {
return;
}
std::string value;
if (cf_id == 0) {
ra->db->Get(ra->roptions, key, &value);
if (get_payload.cf_id == 0) {
ra->db->Get(ra->roptions, get_payload.get_key, &value);
} else {
ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
&value);
}
return;
}
@ -439,8 +672,16 @@ void Replayer::BGWorkWriteBatch(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
if (ra->trace_file_version < 2) {
WriteBatch batch(ra->trace_entry.payload);
ra->db->Write(ra->woptions, &batch);
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
WriteBatch batch(w_payload.write_batch_data.ToString());
ra->db->Write(ra->woptions, &batch);
}
return;
}
@ -450,21 +691,28 @@ void Replayer::BGWorkIterSeek(void* arg) {
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map->find(iter_payload.cf_id) == cf_map->end()) {
return;
}
std::string value;
Iterator* single_iter = nullptr;
if (cf_id == 0) {
if (iter_payload.cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
single_iter =
ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
}
single_iter->Seek(key);
single_iter->Seek(iter_payload.iter_key);
delete single_iter;
return;
}
@ -475,21 +723,28 @@ void Replayer::BGWorkIterSeekForPrev(void* arg) {
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map->find(iter_payload.cf_id) == cf_map->end()) {
return;
}
std::string value;
Iterator* single_iter = nullptr;
if (cf_id == 0) {
if (iter_payload.cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
single_iter =
ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
}
single_iter->SeekForPrev(key);
single_iter->SeekForPrev(iter_payload.iter_key);
delete single_iter;
return;
}

View File

@ -40,6 +40,9 @@ const unsigned int kTracePayloadLengthSize = 4;
const unsigned int kTraceMetadataSize =
kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize;
static const int kTraceFileMajorVersion = 0;
static const int kTraceFileMinorVersion = 2;
// Supported Trace types.
enum TraceType : char {
kTraceBegin = 1,
@ -67,22 +70,79 @@ enum TraceType : char {
struct Trace {
uint64_t ts; // timestamp
TraceType type;
// Each bit in payload_map stores which corresponding struct member added in
// the payload. Each TraceType has its corresponding payload struct. For
// example, if bit at position 0 is set in write payload, then the write batch
// will be addedd.
uint64_t payload_map = 0;
// Each trace type has its own payload_struct, which will be serilized in the
// payload.
std::string payload;
void reset() {
ts = 0;
type = kTraceMax;
payload_map = 0;
payload.clear();
}
};
enum TracePayloadType : char {
// Each member of all query payload structs should have a corresponding flag
// here. Make sure to add them sequentially in the order of it is added.
kEmptyPayload = 0,
kWriteBatchData = 1,
kGetCFID = 2,
kGetKey = 3,
kIterCFID = 4,
kIterKey = 5,
kIterLowerBound = 6,
kIterUpperBound = 7,
};
struct WritePayload {
Slice write_batch_data;
};
struct GetPayload {
uint32_t cf_id;
Slice get_key;
};
struct IterPayload {
uint32_t cf_id;
Slice iter_key;
Slice lower_bound;
Slice upper_bound;
};
class TracerHelper {
public:
// Encode a trace object into the given string.
// Parse the string with major and minor version only
static Status ParseVersionStr(std::string& v_string, int* v_num);
// Parse the trace file version and db version in trace header
static Status ParseTraceHeader(const Trace& header, int* trace_version,
int* db_version);
// Encode a version 0.1 trace object into the given string.
static void EncodeTrace(const Trace& trace, std::string* encoded_trace);
// Decode a string into the given trace object.
static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
// Set the payload map based on the payload type
static bool SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type);
// Decode the write payload and store in WrteiPayload
static void DecodeWritePayload(Trace* trace, WritePayload* write_payload);
// Decode the get payload and store in WrteiPayload
static void DecodeGetPayload(Trace* trace, GetPayload* get_payload);
// Decode the iter payload and store in WrteiPayload
static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload);
};
// Tracer captures all RocksDB operations using a user-provided TraceWriter.
@ -102,8 +162,10 @@ class Tracer {
Status Get(ColumnFamilyHandle* cfname, const Slice& key);
// Trace Iterators.
Status IteratorSeek(const uint32_t& cf_id, const Slice& key);
Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
Status IteratorSeek(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound);
Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound);
// Returns true if the trace is over the configured max trace file limit.
// False otherwise.
@ -186,6 +248,10 @@ class Replayer {
std::unique_ptr<TraceReader> trace_reader_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
uint32_t fast_forward_;
// When reading the trace header, the trace file version can be parsed.
// Replayer will use different decode method to get the trace content based
// on different trace file version.
int trace_file_version_;
};
// The passin arg of MultiThreadRepkay for each trace record.
@ -195,6 +261,7 @@ struct ReplayerWorkerArg {
std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
WriteOptions woptions;
ReadOptions roptions;
int trace_file_version;
};
} // namespace ROCKSDB_NAMESPACE