Initialize all BinlogEvent fields.
This commit is contained in:
parent
3179d35694
commit
4cb164f444
@ -329,7 +329,7 @@ void Binlog::do_event(BinlogEvent &&event) {
|
|||||||
auto validate_status = event.validate();
|
auto validate_status = event.validate();
|
||||||
if (validate_status.is_error()) {
|
if (validate_status.is_error()) {
|
||||||
LOG(FATAL) << "Failed to validate binlog event " << validate_status << " "
|
LOG(FATAL) << "Failed to validate binlog event " << validate_status << " "
|
||||||
<< format::as_hex_dump<4>(Slice(event.raw_event_.as_slice().truncate(28)));
|
<< format::as_hex_dump<4>(as_slice(event.raw_event_).truncate(28));
|
||||||
}
|
}
|
||||||
VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ")
|
VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ")
|
||||||
<< event.public_to_string();
|
<< event.public_to_string();
|
||||||
@ -338,7 +338,7 @@ void Binlog::do_event(BinlogEvent &&event) {
|
|||||||
buffer_writer_.append(event.raw_event_.clone());
|
buffer_writer_.append(event.raw_event_.clone());
|
||||||
break;
|
break;
|
||||||
case EncryptionType::AesCtr:
|
case EncryptionType::AesCtr:
|
||||||
buffer_writer_.append(event.raw_event_.as_slice());
|
buffer_writer_.append(as_slice(event.raw_event_));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
void BinlogEvent::init(BufferSlice &&raw_event) {
|
void BinlogEvent::init(BufferSlice &&raw_event) {
|
||||||
TlParser parser(raw_event.as_slice());
|
TlParser parser(as_slice(raw_event));
|
||||||
size_ = static_cast<uint32>(parser.fetch_int());
|
size_ = static_cast<uint32>(parser.fetch_int());
|
||||||
LOG_CHECK(size_ == raw_event.size()) << size_ << ' ' << raw_event.size() << debug_info_;
|
LOG_CHECK(size_ == raw_event.size()) << size_ << ' ' << raw_event.size() << debug_info_;
|
||||||
id_ = static_cast<uint64>(parser.fetch_long());
|
id_ = static_cast<uint64>(parser.fetch_long());
|
||||||
@ -30,21 +30,21 @@ void BinlogEvent::init(BufferSlice &&raw_event) {
|
|||||||
|
|
||||||
Slice BinlogEvent::get_data() const {
|
Slice BinlogEvent::get_data() const {
|
||||||
CHECK(raw_event_.size() >= MIN_SIZE);
|
CHECK(raw_event_.size() >= MIN_SIZE);
|
||||||
return Slice(raw_event_.as_slice().data() + HEADER_SIZE, size_ - MIN_SIZE);
|
return Slice(as_slice(raw_event_).data() + HEADER_SIZE, raw_event_.size() - MIN_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BinlogEvent::validate() const {
|
Status BinlogEvent::validate() const {
|
||||||
if (raw_event_.size() < MIN_SIZE) {
|
if (raw_event_.size() < MIN_SIZE) {
|
||||||
return Status::Error("Too small event");
|
return Status::Error("Too small event");
|
||||||
}
|
}
|
||||||
TlParser parser(raw_event_.as_slice());
|
TlParser parser(as_slice(raw_event_));
|
||||||
uint32 size = static_cast<uint32>(parser.fetch_int());
|
uint32 size = static_cast<uint32>(parser.fetch_int());
|
||||||
if (size_ != size || size_ != raw_event_.size()) {
|
if (size_ != size || size_ != raw_event_.size()) {
|
||||||
return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size));
|
return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size));
|
||||||
}
|
}
|
||||||
parser.fetch_string_raw<Slice>(size_ - TAIL_SIZE - sizeof(int)); // skip
|
parser.fetch_string_raw<Slice>(size_ - TAIL_SIZE - sizeof(int)); // skip
|
||||||
auto stored_crc32 = static_cast<uint32>(parser.fetch_int());
|
auto stored_crc32 = static_cast<uint32>(parser.fetch_int());
|
||||||
auto calculated_crc = crc32(Slice(raw_event_.as_slice().data(), size_ - TAIL_SIZE));
|
auto calculated_crc = crc32(Slice(as_slice(raw_event_).data(), size_ - TAIL_SIZE));
|
||||||
if (calculated_crc != crc32_ || calculated_crc != stored_crc32) {
|
if (calculated_crc != crc32_ || calculated_crc != stored_crc32) {
|
||||||
return Status::Error(PSLICE() << "CRC mismatch " << tag("actual", format::as_hex(calculated_crc))
|
return Status::Error(PSLICE() << "CRC mismatch " << tag("actual", format::as_hex(calculated_crc))
|
||||||
<< tag("expected", format::as_hex(crc32_)) << public_to_string());
|
<< tag("expected", format::as_hex(crc32_)) << public_to_string());
|
||||||
|
@ -55,12 +55,12 @@ struct BinlogEvent {
|
|||||||
|
|
||||||
int64 offset_ = -1;
|
int64 offset_ = -1;
|
||||||
|
|
||||||
uint32 size_;
|
uint32 size_ = 0;
|
||||||
uint64 id_;
|
uint64 id_ = 0;
|
||||||
int32 type_; // type can be merged with flags
|
int32 type_ = 0; // type can be merged with flags
|
||||||
int32 flags_;
|
int32 flags_ = 0;
|
||||||
uint64 extra_;
|
uint64 extra_ = 0;
|
||||||
uint32 crc32_;
|
uint32 crc32_ = 0;
|
||||||
|
|
||||||
BufferSlice raw_event_;
|
BufferSlice raw_event_;
|
||||||
|
|
||||||
@ -71,11 +71,7 @@ struct BinlogEvent {
|
|||||||
|
|
||||||
Slice get_data() const;
|
Slice get_data() const;
|
||||||
|
|
||||||
void clear() {
|
bool is_empty() const {
|
||||||
raw_event_ = BufferSlice();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool empty() const {
|
|
||||||
return raw_event_.empty();
|
return raw_event_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,7 @@ Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
|
|||||||
if (event.type_ == BinlogEvent::ServiceTypes::Empty) {
|
if (event.type_ == BinlogEvent::ServiceTypes::Empty) {
|
||||||
*it += 1;
|
*it += 1;
|
||||||
empty_events_++;
|
empty_events_++;
|
||||||
events_[pos].clear();
|
events_[pos] = {};
|
||||||
} else {
|
} else {
|
||||||
event.flags_ &= ~BinlogEvent::Flags::Rewrite;
|
event.flags_ &= ~BinlogEvent::Flags::Rewrite;
|
||||||
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
|
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
|
||||||
|
@ -405,7 +405,7 @@ class FakeBinlog final
|
|||||||
cancel_timeout();
|
cancel_timeout();
|
||||||
for (auto &pending : pending_events_) {
|
for (auto &pending : pending_events_) {
|
||||||
auto &event = pending.event;
|
auto &event = pending.event;
|
||||||
if (!event.empty()) {
|
if (!event.is_empty()) {
|
||||||
// LOG(ERROR) << "FORGET EVENT: " << event.id_ << " " << event;
|
// LOG(ERROR) << "FORGET EVENT: " << event.id_ << " " << event;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -439,7 +439,7 @@ class FakeBinlog final
|
|||||||
for (size_t i = 0; i <= pos; i++) {
|
for (size_t i = 0; i <= pos; i++) {
|
||||||
auto &pending = pending_events_[i];
|
auto &pending = pending_events_[i];
|
||||||
auto event = std::move(pending.event);
|
auto event = std::move(pending.event);
|
||||||
if (!event.empty()) {
|
if (!event.is_empty()) {
|
||||||
LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event;
|
LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event;
|
||||||
events_processor_.add_event(std::move(event)).ensure();
|
events_processor_.add_event(std::move(event)).ensure();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user