From ebe93d7e4faf52d5099b562c8a257235fa5b9fac Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Fri, 10 Feb 2023 15:14:14 +0100 Subject: [PATCH] Binlog::erase_batch, TQueue:pop_batch --- tddb/td/db/TQueue.cpp | 14 +++++++++++--- tddb/td/db/TQueue.h | 2 ++ tddb/td/db/binlog/Binlog.h | 8 ++++++++ tddb/td/db/binlog/BinlogInterface.h | 8 ++++++++ tddb/td/db/binlog/ConcurrentBinlog.cpp | 16 ++++++++++++++++ tddb/td/db/binlog/ConcurrentBinlog.h | 1 + 6 files changed, 46 insertions(+), 3 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index bcc9d3f22..186be7f8f 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -258,9 +258,7 @@ class TQueueImpl final : public TQueue { } } collect_deleted_event_ids_time = Time::now() - start_time; - for (auto log_event_id : deleted_log_event_ids) { - callback_->pop(log_event_id); - } + callback_->pop_batch(std::move(deleted_log_event_ids)); } auto callback_clear_time = Time::now() - start_time; @@ -547,6 +545,11 @@ void TQueueBinlog::pop(uint64 log_event_id) { binlog_->erase(log_event_id); } +template +void TQueueBinlog::pop_batch(std::vector log_event_ids) { + binlog_->erase_batch(std::move(log_event_ids)); +} + template Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) const { TQueueLogEvent event; @@ -602,4 +605,9 @@ void TQueueMemoryStorage::close(Promise<> promise) { promise.set_value({}); } +void TQueue::StorageCallback::pop_batch(std::vector log_event_ids) { + for (auto id : log_event_ids) { + pop(id); + } +} } // namespace td diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index 694e82831..c47d7a725 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -83,6 +83,7 @@ class TQueue { virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; virtual void pop(uint64 log_event_id) = 0; virtual void close(Promise<> promise) = 0; + virtual void pop_batch(std::vector log_event_ids); }; static unique_ptr create(); @@ -128,6 +129,7 @@ class TQueueBinlog final : public TQueue::StorageCallback { public: uint64 push(QueueId queue_id, const RawEvent &event) final; void pop(uint64 log_event_id) final; + void pop_batch(std::vector log_event_ids) final; Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT; void set_binlog(std::shared_ptr binlog) { diff --git a/tddb/td/db/binlog/Binlog.h b/tddb/td/db/binlog/Binlog.h index 70029129a..762a4cfcc 100644 --- a/tddb/td/db/binlog/Binlog.h +++ b/tddb/td/db/binlog/Binlog.h @@ -93,6 +93,14 @@ class Binlog { return seq_no; } + uint64 erase_batch(std::vector event_ids) { + auto seq_no = next_event_id(0); + for (auto event_id : event_ids) { + erase(event_id); + } + return seq_no; + } + void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) { add_event(BinlogEvent(std::move(raw_event), info)); } diff --git a/tddb/td/db/binlog/BinlogInterface.h b/tddb/td/db/binlog/BinlogInterface.h index 31b1b1c1f..f27e351bd 100644 --- a/tddb/td/db/binlog/BinlogInterface.h +++ b/tddb/td/db/binlog/BinlogInterface.h @@ -63,6 +63,14 @@ class BinlogInterface { return seq_no; } + virtual uint64 erase_batch(std::vector event_ids) { + uint64 seq_no = next_event_id(0); + for (auto id : event_ids) { + erase(id); + } + return seq_no; + } + virtual void force_sync(Promise<> promise) = 0; virtual void force_flush() = 0; virtual void change_key(DbKey db_key, Promise<> promise) = 0; diff --git a/tddb/td/db/binlog/ConcurrentBinlog.cpp b/tddb/td/db/binlog/ConcurrentBinlog.cpp index 1dfe10b67..cd6a95707 100644 --- a/tddb/td/db/binlog/ConcurrentBinlog.cpp +++ b/tddb/td/db/binlog/ConcurrentBinlog.cpp @@ -39,6 +39,15 @@ class BinlogActor final : public Actor { Promise<> sync_promise; BinlogDebugInfo debug_info; }; + + void erase_batch(uint64 seq_no, std::vector event_ids) { + for (auto event_id : event_ids) { + auto event = BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()); + add_raw_event(seq_no, std::move(event), {}, {}); + seq_no++; + } + } + void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) { processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 event_id, Event &&event) { if (!event.raw_event.empty()) { @@ -203,4 +212,11 @@ void ConcurrentBinlog::force_flush() { void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) { send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise)); } + +uint64 ConcurrentBinlog::erase_batch(std::vector event_ids) { + auto shift = td::narrow_cast(event_ids.size()); + auto seq_no = next_event_id(shift); + send_closure(binlog_actor_, &detail::BinlogActor::erase_batch, seq_no, std::move(event_ids)); + return seq_no; +} } // namespace td diff --git a/tddb/td/db/binlog/ConcurrentBinlog.h b/tddb/td/db/binlog/ConcurrentBinlog.h index 51eeff280..33e91b11b 100644 --- a/tddb/td/db/binlog/ConcurrentBinlog.h +++ b/tddb/td/db/binlog/ConcurrentBinlog.h @@ -55,6 +55,7 @@ class ConcurrentBinlog final : public BinlogInterface { CSlice get_path() const { return path_; } + uint64 erase_batch(std::vector event_ids) final; private: void init_impl(unique_ptr binlog, int scheduler_id);