Improve event_id variable names.

This commit is contained in:
levlam 2023-01-16 12:47:37 +03:00
parent c508e54a49
commit 3e5f30af70
11 changed files with 127 additions and 123 deletions

View File

@ -120,7 +120,7 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
SeqNo set(string key, string value) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
uint64 old_id = 0;
uint64 old_event_id = 0;
auto it_ok = map_.emplace(key, std::make_pair(value, 0));
if (!it_ok.second) {
if (it_ok.first->second.first == value) {
@ -128,25 +128,25 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
}
VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to "
<< hex_encode(value);
old_id = it_ok.first->second.second;
old_event_id = it_ok.first->second.second;
it_ok.first->second.first = value;
} else {
VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value);
}
bool rewrite = false;
uint64 id;
auto seq_no = binlog_->next_id();
if (old_id != 0) {
uint64 event_id;
auto seq_no = binlog_->next_event_id();
if (old_event_id != 0) {
rewrite = true;
id = old_id;
event_id = old_event_id;
} else {
id = seq_no;
it_ok.first->second.second = id;
event_id = seq_no;
it_ok.first->second.second = event_id;
}
lock.reset();
add_event(seq_no,
BinlogEvent::create_raw(id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value}));
BinlogEvent::create_raw(event_id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value}));
return seq_no;
}
@ -157,11 +157,11 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
return 0;
}
VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first);
uint64 id = it->second.second;
uint64 event_id = it->second.second;
map_.erase(it);
auto seq_no = binlog_->next_id();
auto seq_no = binlog_->next_event_id();
lock.reset();
add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
return seq_no;
}
@ -215,18 +215,18 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
void erase_by_prefix(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> ids;
vector<uint64> event_ids;
table_remove_if(map_, [&](const auto &it) {
if (begins_with(it.first, prefix)) {
ids.push_back(it.second.second);
event_ids.push_back(it.second.second);
return true;
}
return false;
});
auto seq_no = binlog_->next_id(narrow_cast<int32>(ids.size()));
auto seq_no = binlog_->next_event_id(narrow_cast<int32>(event_ids.size()));
lock.reset();
for (auto id : ids) {
add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
for (auto event_id : event_ids) {
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
seq_no++;
}

View File

@ -208,8 +208,8 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o
close().ignore();
return status;
}
info_.last_id = processor_->last_id();
last_id_ = processor_->last_id();
info_.last_event_id = processor_->last_event_id();
last_event_id_ = processor_->last_event_id();
if (info_.wrong_password) {
close().ignore();
return Status::Error(static_cast<int>(Error::WrongPassword), "Wrong password");

View File

@ -31,7 +31,7 @@ extern int32 VERBOSITY_NAME(binlog);
struct BinlogInfo {
bool was_created{false};
uint64 last_id{0};
uint64 last_event_id{0};
bool is_encrypted{false};
bool wrong_password{false};
bool is_opened{false};
@ -57,16 +57,16 @@ class Binlog {
Status init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), DbKey old_db_key = DbKey::empty(),
int32 dummy = -1, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT;
uint64 next_id() {
return ++last_id_;
uint64 next_event_id() {
return ++last_event_id_;
}
uint64 next_id(int32 shift) {
auto res = last_id_ + 1;
last_id_ += shift;
uint64 next_event_id(int32 shift) {
auto res = last_event_id_ + 1;
last_event_id_ += shift;
return res;
}
uint64 peek_next_id() const {
return last_id_ + 1;
uint64 peek_next_event_id() const {
return last_event_id_ + 1;
}
bool empty() const {
@ -74,22 +74,22 @@ class Binlog {
}
uint64 add(int32 type, const Storer &storer) {
auto log_event_id = next_id();
add_raw_event(BinlogEvent::create_raw(log_event_id, type, 0, storer), {});
return log_event_id;
auto event_id = next_event_id();
add_raw_event(BinlogEvent::create_raw(event_id, type, 0, storer), {});
return event_id;
}
uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer) {
auto seq_no = next_id();
add_raw_event(BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), {});
uint64 rewrite(uint64 event_id, int32 type, const Storer &storer) {
auto seq_no = next_event_id();
add_raw_event(BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer), {});
return seq_no;
}
uint64 erase(uint64 log_event_id) {
auto seq_no = next_id();
add_raw_event(BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()),
{});
uint64 erase(uint64 event_id) {
auto seq_no = next_event_id();
add_raw_event(
BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()),
{});
return seq_no;
}
@ -148,7 +148,7 @@ class Binlog {
unique_ptr<detail::BinlogEventsProcessor> processor_;
unique_ptr<detail::BinlogEventsBuffer> events_buffer_;
bool in_flush_events_buffer_{false};
uint64 last_id_{0};
uint64 last_event_id_{0};
double need_flush_since_ = 0;
bool need_sync_{false};
enum class State { Empty, Load, Reindex, Run } state_{State::Empty};

View File

@ -31,35 +31,35 @@ class BinlogInterface {
void close_and_destroy(Promise<> promise = {}) {
close_and_destroy_impl(std::move(promise));
}
void add_raw_event(BinlogDebugInfo info, uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(id, std::move(raw_event), std::move(promise), info);
void add_raw_event(BinlogDebugInfo info, uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), info);
}
void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(id, std::move(raw_event), std::move(promise), {});
void add_raw_event(uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), {});
}
void lazy_sync(Promise<> promise = Promise<>()) {
add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {});
add_raw_event_impl(next_event_id(), BufferSlice(), std::move(promise), {});
}
uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
auto log_event_id = next_id();
add_raw_event_impl(log_event_id, BinlogEvent::create_raw(log_event_id, type, 0, storer), std::move(promise), {});
return log_event_id;
auto event_id = next_event_id();
add_raw_event_impl(event_id, BinlogEvent::create_raw(event_id, type, 0, storer), std::move(promise), {});
return event_id;
}
uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
auto seq_no = next_id();
add_raw_event_impl(seq_no, BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer),
uint64 rewrite(uint64 event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
auto seq_no = next_event_id();
add_raw_event_impl(seq_no, BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer),
std::move(promise), {});
return seq_no;
}
uint64 erase(uint64 log_event_id, Promise<> promise = Promise<>()) {
auto seq_no = next_id();
add_raw_event_impl(seq_no,
BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty,
BinlogEvent::Flags::Rewrite, EmptyStorer()),
std::move(promise), {});
uint64 erase(uint64 event_id, Promise<> promise = Promise<>()) {
auto seq_no = next_event_id();
add_raw_event_impl(
seq_no,
BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()),
std::move(promise), {});
return seq_no;
}
@ -67,13 +67,13 @@ class BinlogInterface {
virtual void force_flush() = 0;
virtual void change_key(DbKey db_key, Promise<> promise) = 0;
virtual uint64 next_id() = 0;
virtual uint64 next_id(int32 shift) = 0;
virtual uint64 next_event_id() = 0;
virtual uint64 next_event_id(int32 shift) = 0;
protected:
virtual void close_impl(Promise<> promise) = 0;
virtual void close_and_destroy_impl(Promise<> promise) = 0;
virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0;
virtual void add_raw_event_impl(uint64 seq_no, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0;
};
} // namespace td

View File

@ -40,7 +40,7 @@ class BinlogActor final : public Actor {
BinlogDebugInfo debug_info;
};
void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) {
processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 id, Event &&event) {
processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 event_id, Event &&event) {
if (!event.raw_event.empty()) {
do_add_raw_event(std::move(event.raw_event), event.debug_info);
}
@ -178,9 +178,9 @@ Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback,
void ConcurrentBinlog::init_impl(unique_ptr<Binlog> binlog, int32 scheduler_id) {
path_ = binlog->get_path().str();
last_id_ = binlog->peek_next_id();
last_event_id_ = binlog->peek_next_event_id();
binlog_actor_ = create_actor_on_scheduler<detail::BinlogActor>(PSLICE() << "Binlog " << path_, scheduler_id,
std::move(binlog), last_id_);
std::move(binlog), last_event_id_);
}
void ConcurrentBinlog::close_impl(Promise<> promise) {
@ -189,8 +189,10 @@ void ConcurrentBinlog::close_impl(Promise<> promise) {
void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) {
send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise));
}
void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) {
send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise), info);
void ConcurrentBinlog::add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise,
BinlogDebugInfo info) {
send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, event_id, std::move(raw_event), std::move(promise),
info);
}
void ConcurrentBinlog::force_sync(Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise));

View File

@ -45,11 +45,11 @@ class ConcurrentBinlog final : public BinlogInterface {
void force_flush() final;
void change_key(DbKey db_key, Promise<> promise) final;
uint64 next_id() final {
return last_id_.fetch_add(1, std::memory_order_relaxed);
uint64 next_event_id() final {
return last_event_id_.fetch_add(1, std::memory_order_relaxed);
}
uint64 next_id(int32 shift) final {
return last_id_.fetch_add(shift, std::memory_order_relaxed);
uint64 next_event_id(int32 shift) final {
return last_event_id_.fetch_add(shift, std::memory_order_relaxed);
}
CSlice get_path() const {
@ -60,11 +60,11 @@ class ConcurrentBinlog final : public BinlogInterface {
void init_impl(unique_ptr<Binlog> binlog, int scheduler_id);
void close_impl(Promise<> promise) final;
void close_and_destroy_impl(Promise<> promise) final;
void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final;
void add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final;
ActorOwn<detail::BinlogActor> binlog_actor_;
string path_;
std::atomic<uint64> last_id_{0};
std::atomic<uint64> last_event_id_{0};
};
} // namespace td

View File

@ -44,20 +44,20 @@ struct Trie {
};
td::vector<FullNode> nodes_;
void do_add(int id, td::Slice value) {
nodes_[id].sum++;
void do_add(int event_id, td::Slice value) {
nodes_[event_id].sum++;
if (value.empty()) {
return;
}
auto c = static_cast<td::uint8>(value[0]);
auto next_id = nodes_[id].next[c];
if (next_id == 0) {
next_id = static_cast<int>(nodes_.size());
auto next_event_id = nodes_[event_id].next[c];
if (next_event_id == 0) {
next_event_id = static_cast<int>(nodes_.size());
nodes_.emplace_back();
nodes_[id].next[c] = next_id;
nodes_[event_id].next[c] = next_event_id;
}
do_add(next_id, value.substr(1));
do_add(next_event_id, value.substr(1));
}
void do_dump(td::string path, int v) {
@ -84,11 +84,11 @@ struct Trie {
return;
}
for (int c = 0; c < 256; c++) {
auto next_id = nodes_[v].next[c];
if (next_id == 0) {
auto next_event_id = nodes_[v].next[c];
if (next_event_id == 0) {
continue;
}
do_dump(path + static_cast<char>(c), next_id);
do_dump(path + static_cast<char>(c), next_event_id);
}
}
};
@ -137,9 +137,10 @@ int main(int argc, char *argv[]) {
auto key = td::TlParser(event.data_).fetch_string<td::Slice>();
info[event.type_].trie.add(key);
}
LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_)
<< td::tag("flags", event.flags_) << td::tag("size", event.data_.size())
<< td::tag("data", td::format::escaped(event.data_)) << "]\n";
LOG(PLAIN) << "LogEvent[" << td::tag("event_id", td::format::as_hex(event.id_))
<< td::tag("type", event.type_) << td::tag("flags", event.flags_)
<< td::tag("size", event.data_.size()) << td::tag("data", td::format::escaped(event.data_))
<< "]\n";
})
.ensure();

View File

@ -16,13 +16,13 @@ namespace detail {
Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
offset_ = event.offset_;
auto fixed_id = event.id_ * 2;
if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) {
auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id);
if (it == ids_.end() || *it != fixed_id) {
auto fixed_event_id = event.id_ * 2;
if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !event_ids_.empty() && event_ids_.back() >= fixed_event_id) {
auto it = std::lower_bound(event_ids_.begin(), event_ids_.end(), fixed_event_id);
if (it == event_ids_.end() || *it != fixed_event_id) {
return Status::Error(PSLICE() << "Ignore rewrite log event " << event.public_to_string());
}
auto pos = it - ids_.begin();
auto pos = it - event_ids_.begin();
total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size());
if (event.type_ == BinlogEvent::ServiceTypes::Empty) {
*it += 1;
@ -36,15 +36,15 @@ Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
} else if (event.type_ < 0) {
// just skip service events
} else {
if (!(ids_.empty() || ids_.back() < fixed_id)) {
return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " "
<< event.public_to_string() << " " << total_events_ << " "
if (!(event_ids_.empty() || event_ids_.back() < fixed_event_id)) {
return Status::Error(PSLICE() << offset_ << " " << event_ids_.size() << " " << event_ids_.back() << " "
<< fixed_event_id << " " << event.public_to_string() << " " << total_events_ << " "
<< total_raw_events_size_);
}
last_id_ = event.id_;
last_event_id_ = event.id_;
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
total_events_++;
ids_.push_back(fixed_id);
event_ids_.push_back(fixed_event_id);
events_.emplace_back(std::move(event));
}
@ -55,22 +55,22 @@ Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
}
void BinlogEventsProcessor::compactify() {
CHECK(ids_.size() == events_.size());
auto ids_from = ids_.begin();
auto ids_to = ids_from;
CHECK(event_ids_.size() == events_.size());
auto event_ids_from = event_ids_.begin();
auto event_ids_to = event_ids_from;
auto events_from = events_.begin();
auto events_to = events_from;
for (; ids_from != ids_.end(); ids_from++, events_from++) {
if ((*ids_from & 1) == 0) {
*ids_to++ = *ids_from;
for (; event_ids_from != event_ids_.end(); event_ids_from++, events_from++) {
if ((*event_ids_from & 1) == 0) {
*event_ids_to++ = *event_ids_from;
*events_to++ = std::move(*events_from);
}
}
ids_.erase(ids_to, ids_.end());
event_ids_.erase(event_ids_to, event_ids_.end());
events_.erase(events_to, events_.end());
total_events_ = ids_.size();
total_events_ = event_ids_.size();
empty_events_ = 0;
CHECK(ids_.size() == events_.size());
CHECK(event_ids_.size() == events_.size());
}
} // namespace detail

View File

@ -23,17 +23,18 @@ class BinlogEventsProcessor {
template <class CallbackT>
void for_each(CallbackT &&callback) {
for (size_t i = 0; i < ids_.size(); i++) {
LOG_CHECK(i == 0 || ids_[i - 1] < ids_[i]) << ids_[i - 1] << " " << events_[i - 1].public_to_string() << " "
<< ids_[i] << " " << events_[i].public_to_string();
if ((ids_[i] & 1) == 0) {
for (size_t i = 0; i < event_ids_.size(); i++) {
LOG_CHECK(i == 0 || event_ids_[i - 1] < event_ids_[i])
<< event_ids_[i - 1] << " " << events_[i - 1].public_to_string() << " " << event_ids_[i] << " "
<< events_[i].public_to_string();
if ((event_ids_[i] & 1) == 0) {
callback(events_[i]);
}
}
}
uint64 last_id() const {
return last_id_;
uint64 last_event_id() const {
return last_event_id_;
}
int64 offset() const {
return offset_;
@ -43,12 +44,12 @@ class BinlogEventsProcessor {
}
private:
// holds (id * 2 + was_deleted)
std::vector<uint64> ids_;
// holds (event_id * 2 + was_deleted)
std::vector<uint64> event_ids_;
std::vector<BinlogEvent> events_;
size_t total_events_{0};
size_t empty_events_{0};
uint64 last_id_{0};
uint64 last_event_id_{0};
int64 offset_{0};
int64 total_raw_events_size_{0};

View File

@ -75,17 +75,17 @@ TEST(DB, binlog_encryption) {
{
td::Binlog binlog;
binlog.init(binlog_name.str(), [](const td::BinlogEvent &x) {}).ensure();
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_id(), 1, 0, td::create_storer("AAAA")),
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_event_id(), 1, 0, td::create_storer("AAAA")),
td::BinlogDebugInfo{__FILE__, __LINE__});
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_id(), 1, 0, td::create_storer("BBBB")),
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_event_id(), 1, 0, td::create_storer("BBBB")),
td::BinlogDebugInfo{__FILE__, __LINE__});
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_id(), 1, 0, td::create_storer(long_data)),
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_event_id(), 1, 0, td::create_storer(long_data)),
td::BinlogDebugInfo{__FILE__, __LINE__});
LOG(INFO) << "SET PASSWORD";
binlog.change_key(cucumber);
binlog.change_key(hello);
LOG(INFO) << "OK";
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_id(), 1, 0, td::create_storer("CCCC")),
binlog.add_raw_event(td::BinlogEvent::create_raw(binlog.next_event_id(), 1, 0, td::create_storer("CCCC")),
td::BinlogDebugInfo{__FILE__, __LINE__});
binlog.close().ensure();
}

View File

@ -382,14 +382,14 @@ class FakeBinlog final
void force_flush() final {
}
uint64 next_id() final {
auto res = last_id_;
last_id_++;
uint64 next_event_id() final {
auto res = last_event_id_;
last_event_id_++;
return res;
}
uint64 next_id(int32 shift) final {
auto res = last_id_;
last_id_ += shift;
uint64 next_event_id(int32 shift) final {
auto res = last_event_id_;
last_event_id_ += shift;
return res;
}
template <class F>
@ -420,7 +420,7 @@ class FakeBinlog final
}
void close_and_destroy_impl(Promise<> promise) final {
}
void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final {
void add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final {
auto event = BinlogEvent(std::move(raw_event), info);
LOG(INFO) << "ADD EVENT: " << event.id_ << " " << event;
pending_events_.emplace_back();
@ -464,7 +464,7 @@ class FakeBinlog final
}
}
bool has_request_sync = false;
uint64 last_id_ = 1;
uint64 last_event_id_ = 1;
detail::BinlogEventsProcessor events_processor_;
struct PendingEvent {