Binlog: truncate on read error
GitOrigin-RevId: 061fbfbe92470dca2f7282842993b1bea132872b
This commit is contained in:
parent
ec2f70cb19
commit
7e71d12423
@ -305,9 +305,6 @@ Status Binlog::destroy(Slice path) {
|
||||
}
|
||||
|
||||
void Binlog::do_event(BinlogEvent &&event) {
|
||||
fd_events_++;
|
||||
fd_size_ += event.raw_event_.size();
|
||||
|
||||
if (state_ == State::Run || state_ == State::Reindex) {
|
||||
VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ");
|
||||
auto validate_status = event.validate();
|
||||
@ -371,8 +368,18 @@ void Binlog::do_event(BinlogEvent &&event) {
|
||||
}
|
||||
|
||||
if (state_ != State::Reindex) {
|
||||
processor_->add_event(std::move(event));
|
||||
auto status = processor_->add_event(std::move(event));
|
||||
if (status.is_error()) {
|
||||
if (state_ == State::Load) {
|
||||
fd_.seek(fd_size_).ensure();
|
||||
fd_.truncate_to_current_position(fd_size_).ensure();
|
||||
}
|
||||
LOG(FATAL) << status << " " << tag("state", static_cast<int32>(state_));
|
||||
}
|
||||
}
|
||||
|
||||
fd_events_++;
|
||||
fd_size_ += event.raw_event_.size();
|
||||
}
|
||||
|
||||
void Binlog::sync() {
|
||||
|
@ -13,14 +13,13 @@
|
||||
namespace td {
|
||||
namespace detail {
|
||||
|
||||
void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
|
||||
Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
|
||||
offset_ = event.offset_;
|
||||
auto fixed_id = event.id_ * 2;
|
||||
if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) {
|
||||
auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id);
|
||||
if (it == ids_.end() || *it != fixed_id) {
|
||||
LOG(FATAL) << "Ignore rewrite logevent " << event.public_to_string();
|
||||
return;
|
||||
return Status::Error(PSLICE() << "Ignore rewrite logevent " << event.public_to_string());
|
||||
}
|
||||
auto pos = it - ids_.begin();
|
||||
total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size());
|
||||
@ -36,9 +35,11 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
|
||||
} else if (event.type_ < 0) {
|
||||
// just skip service events
|
||||
} else {
|
||||
CHECK(ids_.empty() || ids_.back() < fixed_id)
|
||||
<< offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " " << event.public_to_string()
|
||||
<< " " << total_events_ << " " << total_raw_events_size_;
|
||||
if (!(ids_.empty() || ids_.back() < fixed_id)) {
|
||||
return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " "
|
||||
<< event.public_to_string() << " " << total_events_ << " "
|
||||
<< total_raw_events_size_);
|
||||
}
|
||||
last_id_ = event.id_;
|
||||
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
|
||||
total_events_++;
|
||||
@ -49,6 +50,7 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
|
||||
if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) {
|
||||
compactify();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BinlogEventsProcessor::compactify() {
|
||||
|
@ -15,8 +15,8 @@ namespace td {
|
||||
namespace detail {
|
||||
class BinlogEventsProcessor {
|
||||
public:
|
||||
void add_event(BinlogEvent &&event) {
|
||||
do_event(std::move(event));
|
||||
Status add_event(BinlogEvent &&event) TD_WARN_UNUSED_RESULT {
|
||||
return do_event(std::move(event));
|
||||
}
|
||||
|
||||
template <class CallbackT>
|
||||
@ -50,7 +50,7 @@ class BinlogEventsProcessor {
|
||||
int64 offset_{0};
|
||||
int64 total_raw_events_size_{0};
|
||||
|
||||
void do_event(BinlogEvent &&event);
|
||||
Status do_event(BinlogEvent &&event);
|
||||
void compactify();
|
||||
};
|
||||
} // namespace detail
|
||||
|
@ -448,7 +448,7 @@ class FakeBinlog
|
||||
auto event = std::move(pending.event);
|
||||
if (!event.empty()) {
|
||||
LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event;
|
||||
events_processor_.add_event(std::move(event));
|
||||
events_processor_.add_event(std::move(event)).ensure();
|
||||
}
|
||||
append(promises, std::move(pending.promises_));
|
||||
}
|
||||
|
Reference in New Issue
Block a user