diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index c6f00d479..9aa316282 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -305,9 +305,6 @@ Status Binlog::destroy(Slice path) { } void Binlog::do_event(BinlogEvent &&event) { - fd_events_++; - fd_size_ += event.raw_event_.size(); - if (state_ == State::Run || state_ == State::Reindex) { VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] "); auto validate_status = event.validate(); @@ -371,8 +368,18 @@ void Binlog::do_event(BinlogEvent &&event) { } if (state_ != State::Reindex) { - processor_->add_event(std::move(event)); + auto status = processor_->add_event(std::move(event)); + if (status.is_error()) { + if (state_ == State::Load) { + fd_.seek(fd_size_).ensure(); + fd_.truncate_to_current_position(fd_size_).ensure(); + } + LOG(FATAL) << status << " " << tag("state", static_cast(state_)); + } } + + fd_events_++; + fd_size_ += event.raw_event_.size(); } void Binlog::sync() { diff --git a/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp b/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp index 88cb848a6..84a1fee66 100644 --- a/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp +++ b/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp @@ -13,14 +13,13 @@ namespace td { namespace detail { -void BinlogEventsProcessor::do_event(BinlogEvent &&event) { +Status BinlogEventsProcessor::do_event(BinlogEvent &&event) { offset_ = event.offset_; auto fixed_id = event.id_ * 2; if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) { auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id); if (it == ids_.end() || *it != fixed_id) { - LOG(FATAL) << "Ignore rewrite logevent " << event.public_to_string(); - return; + return Status::Error(PSLICE() << "Ignore rewrite logevent " << event.public_to_string()); } auto pos = it - ids_.begin(); total_raw_events_size_ -= static_cast(events_[pos].raw_event_.size()); @@ -36,9 +35,11 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) { } else if (event.type_ < 0) { // just skip service events } else { - CHECK(ids_.empty() || ids_.back() < fixed_id) - << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " " << event.public_to_string() - << " " << total_events_ << " " << total_raw_events_size_; + if (!(ids_.empty() || ids_.back() < fixed_id)) { + return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " " + << event.public_to_string() << " " << total_events_ << " " + << total_raw_events_size_); + } last_id_ = event.id_; total_raw_events_size_ += static_cast(event.raw_event_.size()); total_events_++; @@ -49,6 +50,7 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) { if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) { compactify(); } + return Status::OK(); } void BinlogEventsProcessor::compactify() { diff --git a/tddb/td/db/binlog/detail/BinlogEventsProcessor.h b/tddb/td/db/binlog/detail/BinlogEventsProcessor.h index 84ec28c1c..b7cf97bec 100644 --- a/tddb/td/db/binlog/detail/BinlogEventsProcessor.h +++ b/tddb/td/db/binlog/detail/BinlogEventsProcessor.h @@ -15,8 +15,8 @@ namespace td { namespace detail { class BinlogEventsProcessor { public: - void add_event(BinlogEvent &&event) { - do_event(std::move(event)); + Status add_event(BinlogEvent &&event) TD_WARN_UNUSED_RESULT { + return do_event(std::move(event)); } template @@ -50,7 +50,7 @@ class BinlogEventsProcessor { int64 offset_{0}; int64 total_raw_events_size_{0}; - void do_event(BinlogEvent &&event); + Status do_event(BinlogEvent &&event); void compactify(); }; } // namespace detail diff --git a/test/secret.cpp b/test/secret.cpp index c7d65de79..f9763d87d 100644 --- a/test/secret.cpp +++ b/test/secret.cpp @@ -448,7 +448,7 @@ class FakeBinlog auto event = std::move(pending.event); if (!event.empty()) { LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event; - events_processor_.add_event(std::move(event)); + events_processor_.add_event(std::move(event)).ensure(); } append(promises, std::move(pending.promises_)); }