Use std::map instead of VectorQueue in TQueue.

GitOrigin-RevId: a419aa0c9ee5954f8bf1681e6e4097b3e632fa0c
This commit is contained in:
levlam 2020-07-25 00:37:45 +03:00
parent 47d2e8276a
commit 7d8d13606c
2 changed files with 63 additions and 87 deletions

View File

@ -19,9 +19,8 @@
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include "td/utils/VectorQueue.h"
#include <algorithm>
#include <map>
#include <unordered_map>
namespace td {
@ -106,16 +105,18 @@ class TQueueImpl : public TQueue {
if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size()) {
return false;
}
if (q.events.empty() || q.events.back().event_id < raw_event.event_id) {
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
q.tail_id = raw_event.event_id.next().move_as_ok();
q.total_event_length += raw_event.data.size();
q.events.push(std::move(raw_event));
return true;
auto event_id = raw_event.event_id;
if (event_id < q.tail_id) {
return false;
}
return false;
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
q.tail_id = event_id.next().move_as_ok();
q.total_event_length += raw_event.data.size();
q.events.emplace(event_id, std::move(raw_event));
return true;
}
Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override {
@ -147,11 +148,10 @@ class TQueueImpl : public TQueue {
if (event_id.next().is_ok()) {
break;
}
for (auto &event : q.events.as_mutable_span()) {
pop(q, queue_id, event, {});
for (auto it = q.events.begin(); it != q.events.end();) {
pop(q, queue_id, it, {});
}
q.tail_id = EventId();
q.events = {};
CHECK(hint_new_id.next().is_ok());
}
@ -174,7 +174,7 @@ class TQueueImpl : public TQueue {
if (q.events.empty()) {
return q.tail_id;
}
return q.events.front().event_id;
return q.events.begin()->first;
}
EventId get_tail(QueueId queue_id) const override {
@ -192,13 +192,11 @@ class TQueueImpl : public TQueue {
return;
}
auto &q = q_it->second;
auto from_events = q.events.as_mutable_span();
auto it = std::lower_bound(from_events.begin(), from_events.end(), event_id,
[](auto &event, EventId event_id) { return event.event_id < event_id; });
if (it == from_events.end() || !(it->event_id == event_id)) {
auto it = q.events.find(event_id);
if (it == q.events.end()) {
return;
}
pop(q, queue_id, *it, q.tail_id);
pop(q, queue_id, it, q.tail_id);
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
@ -223,21 +221,25 @@ class TQueueImpl : public TQueue {
std::pair<uint64, uint64> run_gc(double now) override {
uint64 total_deleted_events = 0;
uint64 deleted_queues = 0;
for (auto it = queues_.begin(); it != queues_.end();) {
for (auto queue_it = queues_.begin(); queue_it != queues_.end();) {
size_t deleted_events = 0;
for (auto &e : it->second.events.as_mutable_span()) {
for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) {
auto &e = it->second;
if (e.expires_at < now) {
pop(it->second, it->first, e, e.expires_at < now - 7 * 86400 ? EventId() : it->second.tail_id);
if (e.logevent_id == 0 && e.data.empty()) {
if (!it->second.data.empty()) {
deleted_events++;
}
pop(queue_it->second, queue_it->first, it,
e.expires_at < now - 7 * 86400 ? EventId() : queue_it->second.tail_id);
} else {
++it;
}
}
if (callback_ != nullptr && deleted_events == it->second.events.size()) {
if (callback_ != nullptr && queue_it->second.events.empty()) {
deleted_queues++;
it = queues_.erase(it);
queue_it = queues_.erase(queue_it);
} else {
++it;
++queue_it;
}
total_deleted_events += deleted_events;
}
@ -254,46 +256,31 @@ class TQueueImpl : public TQueue {
return 0;
}
MutableSpan<Event> span;
return do_get(queue_id, q, q.events.front().event_id, true, Time::now(), span);
return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0);
}
void close(Promise<> promise) override {
callback_->close(std::move(promise));
callback_ = nullptr;
if (callback_ != nullptr) {
callback_->close(std::move(promise));
callback_ = nullptr;
}
}
private:
struct Queue {
EventId tail_id;
VectorQueue<RawEvent> events;
std::map<EventId, RawEvent> events;
size_t total_event_length = 0;
};
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<StorageCallback> callback_;
static void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
if (prefix == events.size()) {
CHECK(!events.empty());
prefix--;
}
auto processed = events.as_mutable_span().substr(0, prefix);
auto removed_n =
processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); });
events.pop_n(removed_n);
}
void try_pop(Queue &q, QueueId queue_id, RawEvent &event, EventId from_id, double now) {
if (event.expires_at < now || event.event_id < from_id || event.data.empty()) {
pop(q, queue_id, event, q.tail_id);
}
}
void pop(Queue &q, QueueId queue_id, RawEvent &event, EventId tail_id) {
void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) {
auto &event = it->second;
if (callback_ == nullptr || event.logevent_id == 0) {
event.logevent_id = 0;
clear_event_data(q, event);
remove_event(q, it);
return;
}
@ -302,13 +289,19 @@ class TQueueImpl : public TQueue {
clear_event_data(q, event);
callback_->push(queue_id, event);
}
++it;
} else {
callback_->pop(event.logevent_id);
event.logevent_id = 0;
clear_event_data(q, event);
remove_event(q, it);
}
}
static void remove_event(Queue &q, std::map<EventId, RawEvent>::iterator &it) {
q.total_event_length -= it->second.data.size();
it = q.events.erase(it);
}
static void clear_event_data(Queue &q, RawEvent &event) {
q.total_event_length -= event.data.size();
event.data = {};
@ -316,52 +309,35 @@ class TQueueImpl : public TQueue {
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &result_events) {
MutableSpan<RawEvent> from_events;
size_t ready_n = 0;
size_t i = 0;
while (true) {
from_events = q.events.as_mutable_span();
ready_n = 0;
size_t first_i = 0;
if (!forget_previous) {
first_i = std::lower_bound(from_events.begin(), from_events.end(), from_id,
[](auto &event, EventId event_id) { return event.event_id < event_id; }) -
from_events.begin();
if (forget_previous) {
for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) {
pop(q, queue_id, it, q.tail_id);
}
for (i = first_i; i < from_events.size(); i++) {
auto &from = from_events[i];
try_pop(q, queue_id, from, forget_previous ? from_id : EventId{}, now);
if (from.data.empty()) {
continue;
}
}
size_t ready_n = 0;
for (auto it = q.events.lower_bound(from_id); it != q.events.end();) {
auto &event = it->second;
if (event.expires_at < now || event.data.empty()) {
pop(q, queue_id, it, q.tail_id);
} else {
CHECK(!(event.event_id < from_id));
if (ready_n == result_events.size()) {
break;
}
CHECK(!(from.event_id < from_id));
auto &to = result_events[ready_n];
to.data = from.data;
to.id = from.event_id;
to.expires_at = from.expires_at;
to.extra = from.extra;
to.data = event.data;
to.id = event.event_id;
to.expires_at = event.expires_at;
to.extra = event.extra;
ready_n++;
++it;
}
// compactify skipped events
if ((ready_n + 1) * 2 < i + first_i) {
compactify(q.events, i);
continue;
}
break;
}
result_events.truncate(ready_n);
size_t left_n = from_events.size() - i;
return ready_n + left_n;
return get_size(queue_id);
}
};

View File

@ -35,7 +35,7 @@ TEST(TQueue, hands) {
ASSERT_EQ(head.next().ok(), tail);
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok());
ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, tail, true, 0, events_span).move_as_ok());
ASSERT_EQ(0u, tqueue->get(qid, head, true, 0, events_span).move_as_ok());