Optimize TQueue::run_gc, so it can be run arbitrary often.

GitOrigin-RevId: f7fc488cf3682683c0164c557992dbbc4eed9d4d
This commit is contained in:
levlam 2020-08-07 06:06:29 +03:00
parent bea0ab1282
commit 2097934da5
3 changed files with 46 additions and 32 deletions

View File

@ -20,7 +20,7 @@
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include <map>
#include <set>
#include <unordered_map>
namespace td {
@ -102,7 +102,8 @@ class TQueueImpl : public TQueue {
return false;
}
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) {
if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size() ||
raw_event.expires_at <= 0) {
return false;
}
auto event_id = raw_event.event_id;
@ -120,6 +121,13 @@ class TQueueImpl : public TQueue {
q.events.erase(it);
}
}
if (q.events.empty() && !raw_event.data.empty()) {
if (q.gc_at != 0) {
bool is_deleted = queue_gc_at_.erase({q.gc_at, queue_id});
CHECK(is_deleted);
}
schedule_queue_gc(queue_id, q, raw_event.expires_at);
}
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
@ -226,35 +234,34 @@ class TQueueImpl : public TQueue {
return Status::Error("Specified from_id is in the past");
}
return do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events);
do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events);
return get_size(queue_id);
}
std::pair<uint64, uint64> run_gc(int32 unix_time_now) override {
uint64 total_deleted_events = 0;
uint64 deleted_queues = 0;
for (auto queue_it = queues_.begin(); queue_it != queues_.end();) {
size_t deleted_events = 0;
for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) {
auto &e = it->second;
if (e.expires_at < unix_time_now) {
if (!it->second.data.empty()) {
deleted_events++;
}
pop(queue_it->second, queue_it->first, it,
e.expires_at < unix_time_now - 7 * 86400 ? EventId() : queue_it->second.tail_id);
} else {
++it;
void run_gc(int32 unix_time_now) override {
while (!queue_gc_at_.empty()) {
auto it = queue_gc_at_.begin();
if (it->first >= unix_time_now) {
break;
}
auto queue_id = it->second;
auto &q = queues_[queue_id];
CHECK(q.gc_at == it->first);
q.gc_at = 0;
queue_gc_at_.erase(it);
if (!q.events.empty()) {
auto head_id = q.events.begin()->first;
Event event;
MutableSpan<Event> span{&event, 1};
do_get(queue_id, q, head_id, false, unix_time_now, span);
if (!span.empty()) {
CHECK(!event.data.empty());
CHECK(event.expires_at >= unix_time_now);
schedule_queue_gc(queue_id, q, event.expires_at);
}
}
if (callback_ != nullptr && queue_it->second.events.empty()) {
deleted_queues++;
queue_it = queues_.erase(queue_it);
} else {
++queue_it;
}
total_deleted_events += deleted_events;
}
return {deleted_queues, total_deleted_events};
}
size_t get_size(QueueId queue_id) override {
@ -282,9 +289,11 @@ class TQueueImpl : public TQueue {
EventId tail_id;
std::map<EventId, RawEvent> events;
size_t total_event_length = 0;
int32 gc_at = 0;
};
std::unordered_map<QueueId, Queue> queues_;
std::set<std::pair<int32, QueueId>> queue_gc_at_;
unique_ptr<StorageCallback> callback_;
void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) {
@ -316,8 +325,8 @@ class TQueueImpl : public TQueue {
event.data = {};
}
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) {
void do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) {
if (forget_previous) {
for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) {
pop(q, queue_id, it, q.tail_id);
@ -346,7 +355,12 @@ class TQueueImpl : public TQueue {
}
result_events.truncate(ready_n);
return get_size(queue_id);
}
void schedule_queue_gc(QueueId queue_id, Queue &q, int32 gc_at) {
q.gc_at = gc_at;
bool is_inserted = queue_gc_at_.emplace(gc_at, queue_id).second;
CHECK(is_inserted);
}
};

View File

@ -113,7 +113,7 @@ class TQueue {
virtual size_t get_size(QueueId queue_id) = 0;
virtual std::pair<uint64, uint64> run_gc(int32 unix_time_now) = 0;
virtual void run_gc(int32 unix_time_now) = 0;
virtual void close(Promise<> promise) = 0;
};

View File

@ -31,7 +31,7 @@ TEST(TQueue, hands) {
auto qid = 12;
ASSERT_EQ(true, tqueue->get_head(qid).empty());
ASSERT_EQ(true, tqueue->get_tail(qid).empty());
tqueue->push(qid, "hello", 0, 0, td::TQueue::EventId());
tqueue->push(qid, "hello", 1, 0, td::TQueue::EventId());
auto head = tqueue->get_head(qid);
auto tail = tqueue->get_tail(qid);
ASSERT_EQ(head.next().ok(), tail);
@ -169,7 +169,7 @@ TEST(TQueue, random) {
};
TestTQueue q;
td::int32 now = 0;
td::int32 now = 1000;
auto push_event = [&] {
auto data = PSTRING() << rnd();
if (rnd.fast(0, 10000) == 0) {