Binlog::erase_batch, TQueue:pop_batch

This commit is contained in:
Arseny Smirnov 2023-02-10 15:14:14 +01:00
parent 1c9efb4283
commit ebe93d7e4f
6 changed files with 46 additions and 3 deletions

View File

@ -258,9 +258,7 @@ class TQueueImpl final : public TQueue {
}
}
collect_deleted_event_ids_time = Time::now() - start_time;
for (auto log_event_id : deleted_log_event_ids) {
callback_->pop(log_event_id);
}
callback_->pop_batch(std::move(deleted_log_event_ids));
}
auto callback_clear_time = Time::now() - start_time;
@ -547,6 +545,11 @@ void TQueueBinlog<BinlogT>::pop(uint64 log_event_id) {
binlog_->erase(log_event_id);
}
template <class BinlogT>
void TQueueBinlog<BinlogT>::pop_batch(std::vector<uint64> log_event_ids) {
binlog_->erase_batch(std::move(log_event_ids));
}
template <class BinlogT>
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const {
TQueueLogEvent event;
@ -602,4 +605,9 @@ void TQueueMemoryStorage::close(Promise<> promise) {
promise.set_value({});
}
void TQueue::StorageCallback::pop_batch(std::vector<uint64> log_event_ids) {
for (auto id : log_event_ids) {
pop(id);
}
}
} // namespace td

View File

@ -83,6 +83,7 @@ class TQueue {
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
virtual void pop(uint64 log_event_id) = 0;
virtual void close(Promise<> promise) = 0;
virtual void pop_batch(std::vector<uint64> log_event_ids);
};
static unique_ptr<TQueue> create();
@ -128,6 +129,7 @@ class TQueueBinlog final : public TQueue::StorageCallback {
public:
uint64 push(QueueId queue_id, const RawEvent &event) final;
void pop(uint64 log_event_id) final;
void pop_batch(std::vector<uint64> log_event_ids) final;
Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT;
void set_binlog(std::shared_ptr<BinlogT> binlog) {

View File

@ -93,6 +93,14 @@ class Binlog {
return seq_no;
}
uint64 erase_batch(std::vector<uint64> event_ids) {
auto seq_no = next_event_id(0);
for (auto event_id : event_ids) {
erase(event_id);
}
return seq_no;
}
void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
add_event(BinlogEvent(std::move(raw_event), info));
}

View File

@ -63,6 +63,14 @@ class BinlogInterface {
return seq_no;
}
virtual uint64 erase_batch(std::vector<uint64> event_ids) {
uint64 seq_no = next_event_id(0);
for (auto id : event_ids) {
erase(id);
}
return seq_no;
}
virtual void force_sync(Promise<> promise) = 0;
virtual void force_flush() = 0;
virtual void change_key(DbKey db_key, Promise<> promise) = 0;

View File

@ -39,6 +39,15 @@ class BinlogActor final : public Actor {
Promise<> sync_promise;
BinlogDebugInfo debug_info;
};
void erase_batch(uint64 seq_no, std::vector<uint64> event_ids) {
for (auto event_id : event_ids) {
auto event = BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer());
add_raw_event(seq_no, std::move(event), {}, {});
seq_no++;
}
}
void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) {
processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 event_id, Event &&event) {
if (!event.raw_event.empty()) {
@ -203,4 +212,11 @@ void ConcurrentBinlog::force_flush() {
void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise));
}
uint64 ConcurrentBinlog::erase_batch(std::vector<uint64> event_ids) {
auto shift = td::narrow_cast<int32>(event_ids.size());
auto seq_no = next_event_id(shift);
send_closure(binlog_actor_, &detail::BinlogActor::erase_batch, seq_no, std::move(event_ids));
return seq_no;
}
} // namespace td

View File

@ -55,6 +55,7 @@ class ConcurrentBinlog final : public BinlogInterface {
CSlice get_path() const {
return path_;
}
uint64 erase_batch(std::vector<uint64> event_ids) final;
private:
void init_impl(unique_ptr<Binlog> binlog, int scheduler_id);