diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 450a1af74..364e51d0a 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -84,8 +84,9 @@ bool EventId::is_valid_id(int32 id) { } class TQueueImpl : public TQueue { - static constexpr size_t MAX_EVENT_LEN = 65536 * 8; + static constexpr size_t MAX_EVENT_LENGTH = 65536 * 8; static constexpr size_t MAX_QUEUE_EVENTS = 1000000; + static constexpr size_t MAX_TOTAL_EVENT_LENGTH = 1 << 30; public: void set_callback(unique_ptr callback) override { @@ -103,6 +104,7 @@ class TQueueImpl : public TQueue { raw_event.logevent_id = callback_->push(queue_id, raw_event); } q.tail_id = raw_event.event_id.next().move_as_ok(); + q.total_event_length += raw_event.data.size(); q.events.push(std::move(raw_event)); return true; } @@ -117,9 +119,12 @@ class TQueueImpl : public TQueue { if (data.empty()) { return Status::Error("Data is empty"); } - if (data.size() > MAX_EVENT_LEN) { + if (data.size() > MAX_EVENT_LENGTH) { return Status::Error("Data is too big"); } + if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) { + return Status::Error("Queue size is too big"); + } EventId event_id; while (true) { if (q.tail_id.empty()) { @@ -135,7 +140,7 @@ class TQueueImpl : public TQueue { break; } for (auto &event : q.events.as_mutable_span()) { - pop(queue_id, event, {}); + pop(q, queue_id, event, {}); } q.tail_id = EventId(); q.events = {}; @@ -185,7 +190,7 @@ class TQueueImpl : public TQueue { if (it == from_events.end() || !(it->event_id == event_id)) { return; } - pop(queue_id, *it, q.tail_id); + pop(q, queue_id, *it, q.tail_id); } Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, @@ -210,7 +215,7 @@ class TQueueImpl : public TQueue { void run_gc(double now) override { for (auto &it : queues_) { for (auto &e : it.second.events.as_mutable_span()) { - try_pop(it.first, e, EventId(), it.second.tail_id, now); + try_pop(it.second, it.first, e, EventId(), now); } } } @@ -233,6 +238,7 @@ class TQueueImpl : public TQueue { struct Queue { EventId tail_id; VectorQueue events; + size_t total_event_length = 0; }; std::unordered_map queues_; @@ -249,31 +255,36 @@ class TQueueImpl : public TQueue { events.pop_n(removed_n); } - void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) { + void try_pop(Queue &q, QueueId queue_id, RawEvent &event, EventId from_id, double now) { if (event.expires_at < now || event.event_id < from_id || event.data.empty()) { - pop(queue_id, event, tail_id); + pop(q, queue_id, event, q.tail_id); } } - void pop(QueueId queue_id, RawEvent &event, EventId tail_id) { + void pop(Queue &q, QueueId queue_id, RawEvent &event, EventId tail_id) { if (callback_ == nullptr || event.logevent_id == 0) { event.logevent_id = 0; - event.data = {}; + clear_event_data(q, event); return; } if (event.event_id.next().ok() == tail_id) { if (!event.data.empty()) { - event.data = {}; + clear_event_data(q, event); callback_->push(queue_id, event); } } else { callback_->pop(event.logevent_id); event.logevent_id = 0; - event.data = {}; + clear_event_data(q, event); } } + static void clear_event_data(Queue &q, RawEvent &event) { + q.total_event_length -= event.data.size(); + event.data = {}; + } + size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now, MutableSpan &result_events) { MutableSpan from_events; @@ -291,7 +302,7 @@ class TQueueImpl : public TQueue { } for (i = first_i; i < from_events.size(); i++) { auto &from = from_events[i]; - try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now); + try_pop(q, queue_id, from, forget_previous ? from_id : EventId{}, now); if (from.data.empty()) { continue; }