Return all deleted events fron TQueue::clear.

This commit is contained in:
levlam 2023-01-05 23:17:06 +03:00
parent 651f49fc90
commit c4afb9283c
3 changed files with 16 additions and 8 deletions

View File

@ -218,15 +218,15 @@ class TQueueImpl final : public TQueue {
pop(q, queue_id, it, q.tail_id); pop(q, queue_id, it, q.tail_id);
} }
void clear(QueueId queue_id, size_t keep_count) final { std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) final {
auto queue_it = queues_.find(queue_id); auto queue_it = queues_.find(queue_id);
if (queue_it == queues_.end()) { if (queue_it == queues_.end()) {
return; return {};
} }
auto &q = queue_it->second; auto &q = queue_it->second;
auto size = get_size(q); auto size = get_size(q);
if (size <= keep_count) { if (size <= keep_count) {
return; return {};
} }
auto start_time = Time::now(); auto start_time = Time::now();
@ -261,16 +261,23 @@ class TQueueImpl final : public TQueue {
} }
auto callback_clear_time = Time::now() - start_time; auto callback_clear_time = Time::now() - start_time;
for (auto it = q.events.begin(); it != end_it;) { q.total_event_length = 0;
remove_event(q, it); std::map<EventId, RawEvent> new_events;
for (auto it = end_it; it != q.events.end();) {
q.total_event_length += it->second.data.size();
bool is_inserted = new_events.emplace(it->first, std::move(it->second)).second;
CHECK(is_inserted);
it = q.events.erase(it);
} }
std::swap(new_events, q.events);
auto clear_time = Time::now() - start_time; auto clear_time = Time::now() - start_time;
if (clear_time > 0.1) { if (clear_time > 0.01) {
LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size " LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size "
<< (total_event_length - q.total_event_length) << " in " << clear_time - callback_clear_time << (total_event_length - q.total_event_length) << " in " << clear_time - callback_clear_time
<< " seconds and deleted them from callback in " << callback_clear_time << " seconds"; << " seconds and deleted them from callback in " << callback_clear_time << " seconds";
} }
return new_events;
} }
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,

View File

@ -104,7 +104,7 @@ class TQueue {
virtual void forget(QueueId queue_id, EventId event_id) = 0; virtual void forget(QueueId queue_id, EventId event_id) = 0;
virtual void clear(QueueId queue_id, size_t keep_count) = 0; virtual std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) = 0;
virtual EventId get_head(QueueId queue_id) const = 0; virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0;

View File

@ -240,11 +240,12 @@ TEST(TQueue, clear) {
auto tail_id = tqueue->get_tail(1); auto tail_id = tqueue->get_tail(1);
auto clear_start_time = td::Time::now(); auto clear_start_time = td::Time::now();
size_t keep_count = td::Random::fast(0, 2); size_t keep_count = td::Random::fast(0, 2);
tqueue->clear(1, keep_count); auto deleted_events = tqueue->clear(1, keep_count);
auto finish_time = td::Time::now(); auto finish_time = td::Time::now();
LOG(INFO) << "Added TQueue events in " << clear_start_time - start_time << " seconds and cleared them in " LOG(INFO) << "Added TQueue events in " << clear_start_time - start_time << " seconds and cleared them in "
<< finish_time - clear_start_time << " seconds"; << finish_time - clear_start_time << " seconds";
CHECK(tqueue->get_size(1) == keep_count); CHECK(tqueue->get_size(1) == keep_count);
CHECK(tqueue->get_head(1).advance(keep_count).ok() == tail_id); CHECK(tqueue->get_head(1).advance(keep_count).ok() == tail_id);
CHECK(tqueue->get_tail(1) == tail_id); CHECK(tqueue->get_tail(1) == tail_id);
CHECK(deleted_events.size() == 100000 - keep_count);
} }