diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index c3c593628..f15602426 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -19,9 +19,8 @@ #include "td/utils/tl_helpers.h" #include "td/utils/tl_parsers.h" #include "td/utils/tl_storers.h" -#include "td/utils/VectorQueue.h" -#include +#include #include namespace td { @@ -106,16 +105,18 @@ class TQueueImpl : public TQueue { if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) { return false; } - if (q.events.empty() || q.events.back().event_id < raw_event.event_id) { - if (raw_event.logevent_id == 0 && callback_ != nullptr) { - raw_event.logevent_id = callback_->push(queue_id, raw_event); - } - q.tail_id = raw_event.event_id.next().move_as_ok(); - q.total_event_length += raw_event.data.size(); - q.events.push(std::move(raw_event)); - return true; + auto event_id = raw_event.event_id; + if (event_id < q.tail_id) { + return false; } - return false; + + if (raw_event.logevent_id == 0 && callback_ != nullptr) { + raw_event.logevent_id = callback_->push(queue_id, raw_event); + } + q.tail_id = event_id.next().move_as_ok(); + q.total_event_length += raw_event.data.size(); + q.events.emplace(event_id, std::move(raw_event)); + return true; } Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override { @@ -147,11 +148,10 @@ class TQueueImpl : public TQueue { if (event_id.next().is_ok()) { break; } - for (auto &event : q.events.as_mutable_span()) { - pop(q, queue_id, event, {}); + for (auto it = q.events.begin(); it != q.events.end();) { + pop(q, queue_id, it, {}); } q.tail_id = EventId(); - q.events = {}; CHECK(hint_new_id.next().is_ok()); } @@ -174,7 +174,7 @@ class TQueueImpl : public TQueue { if (q.events.empty()) { return q.tail_id; } - return q.events.front().event_id; + return q.events.begin()->first; } EventId get_tail(QueueId queue_id) const override { @@ -192,13 +192,11 @@ class TQueueImpl : public TQueue { return; } auto &q = q_it->second; - auto from_events = q.events.as_mutable_span(); - auto it = std::lower_bound(from_events.begin(), from_events.end(), event_id, - [](auto &event, EventId event_id) { return event.event_id < event_id; }); - if (it == from_events.end() || !(it->event_id == event_id)) { + auto it = q.events.find(event_id); + if (it == q.events.end()) { return; } - pop(q, queue_id, *it, q.tail_id); + pop(q, queue_id, it, q.tail_id); } Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, @@ -223,21 +221,25 @@ class TQueueImpl : public TQueue { std::pair run_gc(double now) override { uint64 total_deleted_events = 0; uint64 deleted_queues = 0; - for (auto it = queues_.begin(); it != queues_.end();) { + for (auto queue_it = queues_.begin(); queue_it != queues_.end();) { size_t deleted_events = 0; - for (auto &e : it->second.events.as_mutable_span()) { + for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) { + auto &e = it->second; if (e.expires_at < now) { - pop(it->second, it->first, e, e.expires_at < now - 7 * 86400 ? EventId() : it->second.tail_id); - if (e.logevent_id == 0 && e.data.empty()) { + if (!it->second.data.empty()) { deleted_events++; } + pop(queue_it->second, queue_it->first, it, + e.expires_at < now - 7 * 86400 ? EventId() : queue_it->second.tail_id); + } else { + ++it; } } - if (callback_ != nullptr && deleted_events == it->second.events.size()) { + if (callback_ != nullptr && queue_it->second.events.empty()) { deleted_queues++; - it = queues_.erase(it); + queue_it = queues_.erase(queue_it); } else { - ++it; + ++queue_it; } total_deleted_events += deleted_events; } @@ -254,46 +256,31 @@ class TQueueImpl : public TQueue { return 0; } - MutableSpan span; - return do_get(queue_id, q, q.events.front().event_id, true, Time::now(), span); + return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0); } void close(Promise<> promise) override { - callback_->close(std::move(promise)); - callback_ = nullptr; + if (callback_ != nullptr) { + callback_->close(std::move(promise)); + callback_ = nullptr; + } } private: struct Queue { EventId tail_id; - VectorQueue events; + std::map events; size_t total_event_length = 0; }; std::unordered_map queues_; unique_ptr callback_; - static void compactify(VectorQueue &events, size_t prefix) { - if (prefix == events.size()) { - CHECK(!events.empty()); - prefix--; - } - auto processed = events.as_mutable_span().substr(0, prefix); - auto removed_n = - processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); }); - events.pop_n(removed_n); - } - - void try_pop(Queue &q, QueueId queue_id, RawEvent &event, EventId from_id, double now) { - if (event.expires_at < now || event.event_id < from_id || event.data.empty()) { - pop(q, queue_id, event, q.tail_id); - } - } - - void pop(Queue &q, QueueId queue_id, RawEvent &event, EventId tail_id) { + void pop(Queue &q, QueueId queue_id, std::map::iterator &it, EventId tail_id) { + auto &event = it->second; if (callback_ == nullptr || event.logevent_id == 0) { event.logevent_id = 0; - clear_event_data(q, event); + remove_event(q, it); return; } @@ -302,13 +289,19 @@ class TQueueImpl : public TQueue { clear_event_data(q, event); callback_->push(queue_id, event); } + ++it; } else { callback_->pop(event.logevent_id); event.logevent_id = 0; - clear_event_data(q, event); + remove_event(q, it); } } + static void remove_event(Queue &q, std::map::iterator &it) { + q.total_event_length -= it->second.data.size(); + it = q.events.erase(it); + } + static void clear_event_data(Queue &q, RawEvent &event) { q.total_event_length -= event.data.size(); event.data = {}; @@ -316,52 +309,35 @@ class TQueueImpl : public TQueue { size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now, MutableSpan &result_events) { - MutableSpan from_events; - size_t ready_n = 0; - size_t i = 0; - - while (true) { - from_events = q.events.as_mutable_span(); - ready_n = 0; - size_t first_i = 0; - if (!forget_previous) { - first_i = std::lower_bound(from_events.begin(), from_events.end(), from_id, - [](auto &event, EventId event_id) { return event.event_id < event_id; }) - - from_events.begin(); + if (forget_previous) { + for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) { + pop(q, queue_id, it, q.tail_id); } - for (i = first_i; i < from_events.size(); i++) { - auto &from = from_events[i]; - try_pop(q, queue_id, from, forget_previous ? from_id : EventId{}, now); - if (from.data.empty()) { - continue; - } + } + size_t ready_n = 0; + for (auto it = q.events.lower_bound(from_id); it != q.events.end();) { + auto &event = it->second; + if (event.expires_at < now || event.data.empty()) { + pop(q, queue_id, it, q.tail_id); + } else { + CHECK(!(event.event_id < from_id)); if (ready_n == result_events.size()) { break; } - CHECK(!(from.event_id < from_id)); - auto &to = result_events[ready_n]; - to.data = from.data; - to.id = from.event_id; - to.expires_at = from.expires_at; - to.extra = from.extra; + to.data = event.data; + to.id = event.event_id; + to.expires_at = event.expires_at; + to.extra = event.extra; ready_n++; + ++it; } - - // compactify skipped events - if ((ready_n + 1) * 2 < i + first_i) { - compactify(q.events, i); - continue; - } - - break; } result_events.truncate(ready_n); - size_t left_n = from_events.size() - i; - return ready_n + left_n; + return get_size(queue_id); } }; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index 38a6c14c7..782105dba 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -35,7 +35,7 @@ TEST(TQueue, hands) { ASSERT_EQ(head.next().ok(), tail); ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); - ASSERT_EQ(0u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok()); + ASSERT_EQ(1u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok()); ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); ASSERT_EQ(0u, tqueue->get(qid, tail, true, 0, events_span).move_as_ok()); ASSERT_EQ(0u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());