Binlog debug

GitOrigin-RevId: 20ea6ad4fcb17af41d08ec9e00156d9267bbec15
This commit is contained in:
Arseny Smirnov 2018-06-28 18:12:20 +03:00
parent 80d4292493
commit d22fba5603
7 changed files with 48 additions and 16 deletions

View File

@ -71,6 +71,10 @@ class Binlog {
add_event(BinlogEvent(std::move(raw_event))); add_event(BinlogEvent(std::move(raw_event)));
} }
void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
add_event(BinlogEvent(std::move(raw_event), info));
}
void add_event(BinlogEvent &&event); void add_event(BinlogEvent &&event);
void sync(); void sync();
void flush(); void flush();

View File

@ -40,6 +40,20 @@ static constexpr size_t MIN_EVENT_SIZE = EVENT_HEADER_SIZE + EVENT_TAIL_SIZE;
extern int32 VERBOSITY_NAME(binlog); extern int32 VERBOSITY_NAME(binlog);
struct BinlogDebugInfo {
BinlogDebugInfo() = default;
BinlogDebugInfo(const char *file, int line) : file(file), line(line) {
}
const char *file{""};
int line{0};
};
inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) {
if (info.line == 0) {
return sb;
}
return sb << "[" << info.file << ":" << info.line << "]";
}
// TODO: smaller BinlogEvent // TODO: smaller BinlogEvent
struct BinlogEvent { struct BinlogEvent {
int64 offset_; int64 offset_;
@ -54,6 +68,8 @@ struct BinlogEvent {
BufferSlice raw_event_; BufferSlice raw_event_;
BinlogDebugInfo debug_info_;
enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 }; enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 };
enum Flags { Rewrite = 1, Partial = 2 }; enum Flags { Rewrite = 1, Partial = 2 };
@ -77,18 +93,23 @@ struct BinlogEvent {
explicit BinlogEvent(BufferSlice &&raw_event) { explicit BinlogEvent(BufferSlice &&raw_event) {
init(std::move(raw_event), false).ensure(); init(std::move(raw_event), false).ensure();
} }
explicit BinlogEvent(BufferSlice &&raw_event, BinlogDebugInfo info) {
init(std::move(raw_event), false).ensure();
debug_info_ = info;
}
Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT; Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT;
static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer); static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer);
std::string public_to_string() const { std::string public_to_string() const {
return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_) return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_)
<< tag("data", data_.size()) << "]"; << tag("data", data_.size()) << "]" << debug_info_;
} }
}; };
inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) { inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) {
return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_) return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_)
<< tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]"; << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]"
<< event.debug_info_;
} }
// Implementation // Implementation

View File

@ -31,10 +31,13 @@ static uint64 binlog_rewrite(const BinlogT &binlog_ptr, uint64 logevent_id, int3
return seq_no; return seq_no;
} }
#define binlog_erase(...) binlog_erase_impl({__FILE__, __LINE__}, __VA_ARGS__)
template <class BinlogT> template <class BinlogT>
static uint64 binlog_erase(const BinlogT &binlog_ptr, uint64 logevent_id, Promise<> promise = Promise<>()) { static uint64 binlog_erase_impl(BinlogDebugInfo info, const BinlogT &binlog_ptr, uint64 logevent_id,
Promise<> promise = Promise<>()) {
auto seq_no = binlog_ptr->next_id(); auto seq_no = binlog_ptr->next_id();
binlog_ptr->add_raw_event(seq_no, binlog_ptr->add_raw_event(info, seq_no,
BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty,
BinlogEvent::Flags::Rewrite, EmptyStorer()), BinlogEvent::Flags::Rewrite, EmptyStorer()),
std::move(promise)); std::move(promise));

View File

@ -30,11 +30,14 @@ class BinlogInterface {
void close_and_destroy(Promise<> promise = {}) { void close_and_destroy(Promise<> promise = {}) {
close_and_destroy_impl(std::move(promise)); close_and_destroy_impl(std::move(promise));
} }
void add_raw_event(BinlogDebugInfo info, uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(id, std::move(raw_event), std::move(promise), info);
}
void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(id, std::move(raw_event), std::move(promise)); add_raw_event_impl(id, std::move(raw_event), std::move(promise), {});
} }
void lazy_sync(Promise<> promise = Promise<>()) { void lazy_sync(Promise<> promise = Promise<>()) {
add_raw_event_impl(next_id(), BufferSlice(), std::move(promise)); add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {});
} }
virtual void force_sync(Promise<> promise) = 0; virtual void force_sync(Promise<> promise) = 0;
virtual void force_flush() = 0; virtual void force_flush() = 0;
@ -46,6 +49,6 @@ class BinlogInterface {
protected: protected:
virtual void close_impl(Promise<> promise) = 0; virtual void close_impl(Promise<> promise) = 0;
virtual void close_and_destroy_impl(Promise<> promise) = 0; virtual void close_and_destroy_impl(Promise<> promise) = 0;
virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) = 0; virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0;
}; };
} // namespace td } // namespace td

View File

@ -34,11 +34,12 @@ class BinlogActor : public Actor {
struct Event { struct Event {
BufferSlice raw_event; BufferSlice raw_event;
Promise<> sync_promise; Promise<> sync_promise;
BinlogDebugInfo debug_info;
}; };
void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise) { void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) {
processor_.add(seq_no, Event{std::move(raw_event), std::move(promise)}, [&](uint64 id, Event &&event) { processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 id, Event &&event) {
if (!event.raw_event.empty()) { if (!event.raw_event.empty()) {
do_add_raw_event(std::move(event.raw_event)); do_add_raw_event(std::move(event.raw_event), event.debug_info);
} }
do_lazy_sync(std::move(event.sync_promise)); do_lazy_sync(std::move(event.sync_promise));
}); });
@ -92,8 +93,8 @@ class BinlogActor : public Actor {
} }
} }
void do_add_raw_event(BufferSlice &&raw_event) { void do_add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
binlog_->add_raw_event(std::move(raw_event)); binlog_->add_raw_event(std::move(raw_event), info);
} }
void try_flush() { void try_flush() {
@ -188,8 +189,8 @@ void ConcurrentBinlog::close_impl(Promise<> promise) {
void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) { void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) {
send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise)); send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise));
} }
void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) { void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) {
send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise)); send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise), info);
} }
void ConcurrentBinlog::force_sync(Promise<> promise) { void ConcurrentBinlog::force_sync(Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise)); send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise));

View File

@ -59,7 +59,7 @@ class ConcurrentBinlog : public BinlogInterface {
void init_impl(std::unique_ptr<Binlog> binlog, int scheduler_id); void init_impl(std::unique_ptr<Binlog> binlog, int scheduler_id);
void close_impl(Promise<> promise) override; void close_impl(Promise<> promise) override;
void close_and_destroy_impl(Promise<> promise) override; void close_and_destroy_impl(Promise<> promise) override;
void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) override; void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) override;
ActorOwn<detail::BinlogActor> binlog_actor_; ActorOwn<detail::BinlogActor> binlog_actor_;
string path_; string path_;

View File

@ -426,7 +426,7 @@ class FakeBinlog
} }
void close_and_destroy_impl(Promise<> promise) override { void close_and_destroy_impl(Promise<> promise) override {
} }
void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) override { void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) override {
auto event = BinlogEvent(std::move(raw_event)); auto event = BinlogEvent(std::move(raw_event));
LOG(INFO) << "ADD EVENT: " << event.id_ << " " << event; LOG(INFO) << "ADD EVENT: " << event.id_ << " " << event;
pending_events_.emplace_back(); pending_events_.emplace_back();