diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 5e0335492..b4f198f41 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -20,7 +20,7 @@ #include "td/utils/tl_parsers.h" #include "td/utils/tl_storers.h" -#include +#include #include namespace td { @@ -102,7 +102,8 @@ class TQueueImpl : public TQueue { return false; } auto &q = queues_[queue_id]; - if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) { + if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size() || + raw_event.expires_at <= 0) { return false; } auto event_id = raw_event.event_id; @@ -120,6 +121,13 @@ class TQueueImpl : public TQueue { q.events.erase(it); } } + if (q.events.empty() && !raw_event.data.empty()) { + if (q.gc_at != 0) { + bool is_deleted = queue_gc_at_.erase({q.gc_at, queue_id}); + CHECK(is_deleted); + } + schedule_queue_gc(queue_id, q, raw_event.expires_at); + } if (raw_event.logevent_id == 0 && callback_ != nullptr) { raw_event.logevent_id = callback_->push(queue_id, raw_event); @@ -226,35 +234,34 @@ class TQueueImpl : public TQueue { return Status::Error("Specified from_id is in the past"); } - return do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events); + do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events); + return get_size(queue_id); } - std::pair run_gc(int32 unix_time_now) override { - uint64 total_deleted_events = 0; - uint64 deleted_queues = 0; - for (auto queue_it = queues_.begin(); queue_it != queues_.end();) { - size_t deleted_events = 0; - for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) { - auto &e = it->second; - if (e.expires_at < unix_time_now) { - if (!it->second.data.empty()) { - deleted_events++; - } - pop(queue_it->second, queue_it->first, it, - e.expires_at < unix_time_now - 7 * 86400 ? EventId() : queue_it->second.tail_id); - } else { - ++it; + void run_gc(int32 unix_time_now) override { + while (!queue_gc_at_.empty()) { + auto it = queue_gc_at_.begin(); + if (it->first >= unix_time_now) { + break; + } + auto queue_id = it->second; + auto &q = queues_[queue_id]; + CHECK(q.gc_at == it->first); + q.gc_at = 0; + queue_gc_at_.erase(it); + + if (!q.events.empty()) { + auto head_id = q.events.begin()->first; + Event event; + MutableSpan span{&event, 1}; + do_get(queue_id, q, head_id, false, unix_time_now, span); + if (!span.empty()) { + CHECK(!event.data.empty()); + CHECK(event.expires_at >= unix_time_now); + schedule_queue_gc(queue_id, q, event.expires_at); } } - if (callback_ != nullptr && queue_it->second.events.empty()) { - deleted_queues++; - queue_it = queues_.erase(queue_it); - } else { - ++queue_it; - } - total_deleted_events += deleted_events; } - return {deleted_queues, total_deleted_events}; } size_t get_size(QueueId queue_id) override { @@ -282,9 +289,11 @@ class TQueueImpl : public TQueue { EventId tail_id; std::map events; size_t total_event_length = 0; + int32 gc_at = 0; }; std::unordered_map queues_; + std::set> queue_gc_at_; unique_ptr callback_; void pop(Queue &q, QueueId queue_id, std::map::iterator &it, EventId tail_id) { @@ -316,8 +325,8 @@ class TQueueImpl : public TQueue { event.data = {}; } - size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now, - MutableSpan &result_events) { + void do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now, + MutableSpan &result_events) { if (forget_previous) { for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) { pop(q, queue_id, it, q.tail_id); @@ -346,7 +355,12 @@ class TQueueImpl : public TQueue { } result_events.truncate(ready_n); - return get_size(queue_id); + } + + void schedule_queue_gc(QueueId queue_id, Queue &q, int32 gc_at) { + q.gc_at = gc_at; + bool is_inserted = queue_gc_at_.emplace(gc_at, queue_id).second; + CHECK(is_inserted); } }; diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index f97597d6d..590a5c9db 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -113,7 +113,7 @@ class TQueue { virtual size_t get_size(QueueId queue_id) = 0; - virtual std::pair run_gc(int32 unix_time_now) = 0; + virtual void run_gc(int32 unix_time_now) = 0; virtual void close(Promise<> promise) = 0; }; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index 2b37ca4f8..828491a2e 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -31,7 +31,7 @@ TEST(TQueue, hands) { auto qid = 12; ASSERT_EQ(true, tqueue->get_head(qid).empty()); ASSERT_EQ(true, tqueue->get_tail(qid).empty()); - tqueue->push(qid, "hello", 0, 0, td::TQueue::EventId()); + tqueue->push(qid, "hello", 1, 0, td::TQueue::EventId()); auto head = tqueue->get_head(qid); auto tail = tqueue->get_tail(qid); ASSERT_EQ(head.next().ok(), tail); @@ -169,7 +169,7 @@ TEST(TQueue, random) { }; TestTQueue q; - td::int32 now = 0; + td::int32 now = 1000; auto push_event = [&] { auto data = PSTRING() << rnd(); if (rnd.fast(0, 10000) == 0) {