Add TQueue::get_size.

GitOrigin-RevId: 20b842bfb145a890e149f80bff56f0fa80785337
This commit is contained in:
levlam 2020-06-24 15:11:39 +03:00
parent f6b4ced7bb
commit a9e95b7f4b
2 changed files with 74 additions and 53 deletions

View File

@ -205,6 +205,78 @@ class TQueueImpl : public TQueue {
return Status::Error("Specified from_id is in the past");
}
return do_get(queue_id, q, from_id, forget_previous, now, result_events);
}
void run_gc(double now) override {
for (auto &it : queues_) {
for (auto &e : it.second.events.as_mutable_span()) {
try_pop(it.first, e, EventId(), it.second.tail_id, now);
}
}
}
size_t get_size(QueueId queue_id) override {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
return 0;
}
auto &q = it->second;
if (q.events.empty()) {
return 0;
}
td::MutableSpan<td::TQueue::Event> span;
return do_get(queue_id, q, q.events.front().event_id, true, Time::now(), span);
}
private:
struct Queue {
EventId tail_id;
VectorQueue<RawEvent> events;
};
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<StorageCallback> callback_;
static void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
if (prefix == events.size()) {
CHECK(!events.empty());
prefix--;
}
auto processed = events.as_mutable_span().substr(0, prefix);
auto removed_n =
processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); });
events.pop_n(removed_n);
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) {
if (event.expires_at < now || event.event_id < from_id || event.data.empty()) {
pop(queue_id, event, tail_id);
}
}
void pop(QueueId queue_id, RawEvent &event, EventId tail_id) {
if (callback_ == nullptr || event.logevent_id == 0) {
event.logevent_id = 0;
event.data = {};
return;
}
if (event.event_id.next().ok() == tail_id) {
if (!event.data.empty()) {
event.data = {};
callback_->push(queue_id, event);
}
} else {
callback_->pop(event.logevent_id);
event.logevent_id = 0;
event.data = {};
}
}
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &result_events) {
MutableSpan<RawEvent> from_events;
size_t ready_n = 0;
size_t i = 0;
@ -252,59 +324,6 @@ class TQueueImpl : public TQueue {
size_t left_n = from_events.size() - i;
return ready_n + left_n;
}
void run_gc(double now) override {
for (auto &it : queues_) {
for (auto &e : it.second.events.as_mutable_span()) {
try_pop(it.first, e, EventId(), it.second.tail_id, now);
}
}
}
private:
struct Queue {
EventId tail_id;
VectorQueue<RawEvent> events;
};
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<StorageCallback> callback_;
static void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
if (prefix == events.size()) {
CHECK(!events.empty());
prefix--;
}
auto processed = events.as_mutable_span().substr(0, prefix);
auto removed_n =
processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); });
events.pop_n(removed_n);
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) {
if (event.expires_at < now || event.event_id < from_id || event.data.empty()) {
pop(queue_id, event, tail_id);
}
}
void pop(QueueId queue_id, RawEvent &event, EventId tail_id) {
if (callback_ == nullptr || event.logevent_id == 0) {
event.logevent_id = 0;
event.data = {};
return;
}
if (event.event_id.next().ok() == tail_id) {
if (!event.data.empty()) {
event.data = {};
callback_->push(queue_id, event);
}
} else {
callback_->pop(event.logevent_id);
event.logevent_id = 0;
event.data = {};
}
}
};
unique_ptr<TQueue> TQueue::create() {

View File

@ -108,6 +108,8 @@ class TQueue {
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &result_events) = 0;
virtual size_t get_size(QueueId queue_id) = 0;
virtual void run_gc(double now) = 0;
};