Code cleanup for trace replayer (#8652)

Summary:
- Remove extra `;` in trace_record.h
- Remove some unnecessary `assert` in trace_record_handler.cc
- Initialize `env_` after` exec_handler_` in `ReplayerImpl` to let db be asserted in creating the handler before getting `db->GetEnv()`.
- Update history to include the new `TraceReader::Reset()`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8652

Reviewed By: ajkr

Differential Revision: D30276872

Pulled By: autopear

fbshipit-source-id: 476ee162e0f241490c6209307448343a5b326b37
This commit is contained in:
Merlin Mao 2021-08-12 09:21:40 -07:00 committed by Facebook GitHub Bot
parent f58d276764
commit 74a652a45f
5 changed files with 12 additions and 14 deletions

View File

@ -18,7 +18,7 @@
* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench. * Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench.
## Public API change ## Public API change
* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them. * Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
### Performance Improvements ### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.

View File

@ -95,7 +95,7 @@ class WriteQueryTraceRecord : public QueryTraceRecord {
virtual ~WriteQueryTraceRecord() override; virtual ~WriteQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceWrite; }; TraceType GetTraceType() const override { return kTraceWrite; }
virtual Slice GetWriteBatchRep() const; virtual Slice GetWriteBatchRep() const;
@ -116,7 +116,7 @@ class GetQueryTraceRecord : public QueryTraceRecord {
virtual ~GetQueryTraceRecord() override; virtual ~GetQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceGet; }; TraceType GetTraceType() const override { return kTraceGet; }
virtual uint32_t GetColumnFamilyID() const; virtual uint32_t GetColumnFamilyID() const;
@ -187,7 +187,7 @@ class MultiGetQueryTraceRecord : public QueryTraceRecord {
virtual ~MultiGetQueryTraceRecord() override; virtual ~MultiGetQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceMultiGet; }; TraceType GetTraceType() const override { return kTraceMultiGet; }
virtual std::vector<uint32_t> GetColumnFamilyIDs() const; virtual std::vector<uint32_t> GetColumnFamilyIDs() const;

View File

@ -38,7 +38,6 @@ Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
if (it == cf_map_.end()) { if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
} }
assert(it->second != nullptr);
std::string value; std::string value;
Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value); Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
@ -53,7 +52,6 @@ Status TraceExecutionHandler::Handle(
if (it == cf_map_.end()) { if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
} }
assert(it->second != nullptr);
Iterator* single_iter = db_->NewIterator(read_opts_, it->second); Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
@ -80,7 +78,6 @@ Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
if (it == cf_map_.end()) { if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
} }
assert(it->second != nullptr);
handles.push_back(it->second); handles.push_back(it->second);
} }

View File

@ -25,12 +25,13 @@ ReplayerImpl::ReplayerImpl(DB* db,
const std::vector<ColumnFamilyHandle*>& handles, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader) std::unique_ptr<TraceReader>&& reader)
: Replayer(), : Replayer(),
env_(db->GetEnv()),
trace_reader_(std::move(reader)), trace_reader_(std::move(reader)),
prepared_(false), prepared_(false),
trace_end_(false), trace_end_(false),
header_ts_(0), header_ts_(0),
exec_handler_(TraceRecord::NewExecutionHandler(db, handles)) {} exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
env_(db->GetEnv()),
trace_file_version_(-1) {}
ReplayerImpl::~ReplayerImpl() { ReplayerImpl::~ReplayerImpl() {
exec_handler_.reset(); exec_handler_.reset();

View File

@ -62,17 +62,17 @@ class ReplayerImpl : public Replayer {
// Generic function to execute a Trace in a thread pool. // Generic function to execute a Trace in a thread pool.
static void BackgroundWork(void* arg); static void BackgroundWork(void* arg);
Env* env_;
std::unique_ptr<TraceReader> trace_reader_; std::unique_ptr<TraceReader> trace_reader_;
// When reading the trace header, the trace file version can be parsed.
// Replayer will use different decode method to get the trace content based
// on different trace file version.
int trace_file_version_;
std::mutex mutex_; std::mutex mutex_;
std::atomic<bool> prepared_; std::atomic<bool> prepared_;
std::atomic<bool> trace_end_; std::atomic<bool> trace_end_;
uint64_t header_ts_; uint64_t header_ts_;
std::unique_ptr<TraceRecord::Handler> exec_handler_; std::unique_ptr<TraceRecord::Handler> exec_handler_;
Env* env_;
// When reading the trace header, the trace file version can be parsed.
// Replayer will use different decode method to get the trace content based
// on different trace file version.
int trace_file_version_;
}; };
// The passin arg of MultiThreadRepkay for each trace record. // The passin arg of MultiThreadRepkay for each trace record.