TQueue: run_gc returns count of deleted events

GitOrigin-RevId: 7cddd57c1c2842eade83aa253209462e33629dc9
This commit is contained in:
Arseny Smirnov 2020-08-07 19:36:23 +03:00
parent c48ef93e1e
commit 80d98def74
2 changed files with 17 additions and 8 deletions

View File

@ -234,7 +234,8 @@ class TQueueImpl : public TQueue {
return get_size(queue_id);
}
void run_gc(int32 unix_time_now) override {
int64 run_gc(int32 unix_time_now) override {
int64 deleted_events = 0;
while (!queue_gc_at_.empty()) {
auto it = queue_gc_at_.begin();
if (it->first >= unix_time_now) {
@ -249,7 +250,11 @@ class TQueueImpl : public TQueue {
auto head_id = q.events.begin()->first;
Event event;
MutableSpan<Event> span{&event, 1};
size_t size_before = get_size(q);
do_get(queue_id, q, head_id, false, unix_time_now, span);
size_t size_after = get_size(q);
CHECK(size_after <= size_before);
deleted_events += size_before - size_after;
if (!span.empty()) {
CHECK(!event.data.empty());
new_gc_at = event.expires_at;
@ -258,6 +263,7 @@ class TQueueImpl : public TQueue {
}
schedule_queue_gc(queue_id, q, new_gc_at);
}
return deleted_events;
}
size_t get_size(QueueId queue_id) override {
@ -265,12 +271,7 @@ class TQueueImpl : public TQueue {
if (it == queues_.end()) {
return 0;
}
auto &q = it->second;
if (q.events.empty()) {
return 0;
}
return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0);
return get_size(it->second);
}
void close(Promise<> promise) override {
@ -292,6 +293,14 @@ class TQueueImpl : public TQueue {
std::set<std::pair<int32, QueueId>> queue_gc_at_;
unique_ptr<StorageCallback> callback_;
size_t get_size(Queue &q) {
if (q.events.empty()) {
return 0;
}
return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0);
}
void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) {
auto &event = it->second;
if (callback_ == nullptr || event.logevent_id == 0) {

View File

@ -113,7 +113,7 @@ class TQueue {
virtual size_t get_size(QueueId queue_id) = 0;
virtual void run_gc(int32 unix_time_now) = 0;
virtual int64 run_gc(int32 unix_time_now) = 0;
virtual void close(Promise<> promise) = 0;
};