Add limit on total TQueue events size.

GitOrigin-RevId: cd0d77e4b0e2d66d7c5253a6d36bd543bca95bd1
This commit is contained in:
levlam 2020-07-13 23:48:15 +03:00
parent e16952121b
commit 32f9e4924f

View File

@ -84,8 +84,9 @@ bool EventId::is_valid_id(int32 id) {
}
class TQueueImpl : public TQueue {
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
static constexpr size_t MAX_EVENT_LENGTH = 65536 * 8;
static constexpr size_t MAX_QUEUE_EVENTS = 1000000;
static constexpr size_t MAX_TOTAL_EVENT_LENGTH = 1 << 30;
public:
void set_callback(unique_ptr<StorageCallback> callback) override {
@ -103,6 +104,7 @@ class TQueueImpl : public TQueue {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
q.tail_id = raw_event.event_id.next().move_as_ok();
q.total_event_length += raw_event.data.size();
q.events.push(std::move(raw_event));
return true;
}
@ -117,9 +119,12 @@ class TQueueImpl : public TQueue {
if (data.empty()) {
return Status::Error("Data is empty");
}
if (data.size() > MAX_EVENT_LEN) {
if (data.size() > MAX_EVENT_LENGTH) {
return Status::Error("Data is too big");
}
if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) {
return Status::Error("Queue size is too big");
}
EventId event_id;
while (true) {
if (q.tail_id.empty()) {
@ -135,7 +140,7 @@ class TQueueImpl : public TQueue {
break;
}
for (auto &event : q.events.as_mutable_span()) {
pop(queue_id, event, {});
pop(q, queue_id, event, {});
}
q.tail_id = EventId();
q.events = {};
@ -185,7 +190,7 @@ class TQueueImpl : public TQueue {
if (it == from_events.end() || !(it->event_id == event_id)) {
return;
}
pop(queue_id, *it, q.tail_id);
pop(q, queue_id, *it, q.tail_id);
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
@ -210,7 +215,7 @@ class TQueueImpl : public TQueue {
void run_gc(double now) override {
for (auto &it : queues_) {
for (auto &e : it.second.events.as_mutable_span()) {
try_pop(it.first, e, EventId(), it.second.tail_id, now);
try_pop(it.second, it.first, e, EventId(), now);
}
}
}
@ -233,6 +238,7 @@ class TQueueImpl : public TQueue {
struct Queue {
EventId tail_id;
VectorQueue<RawEvent> events;
size_t total_event_length = 0;
};
std::unordered_map<QueueId, Queue> queues_;
@ -249,31 +255,36 @@ class TQueueImpl : public TQueue {
events.pop_n(removed_n);
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) {
void try_pop(Queue &q, QueueId queue_id, RawEvent &event, EventId from_id, double now) {
if (event.expires_at < now || event.event_id < from_id || event.data.empty()) {
pop(queue_id, event, tail_id);
pop(q, queue_id, event, q.tail_id);
}
}
void pop(QueueId queue_id, RawEvent &event, EventId tail_id) {
void pop(Queue &q, QueueId queue_id, RawEvent &event, EventId tail_id) {
if (callback_ == nullptr || event.logevent_id == 0) {
event.logevent_id = 0;
event.data = {};
clear_event_data(q, event);
return;
}
if (event.event_id.next().ok() == tail_id) {
if (!event.data.empty()) {
event.data = {};
clear_event_data(q, event);
callback_->push(queue_id, event);
}
} else {
callback_->pop(event.logevent_id);
event.logevent_id = 0;
event.data = {};
clear_event_data(q, event);
}
}
static void clear_event_data(Queue &q, RawEvent &event) {
q.total_event_length -= event.data.size();
event.data = {};
}
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &result_events) {
MutableSpan<RawEvent> from_events;
@ -291,7 +302,7 @@ class TQueueImpl : public TQueue {
}
for (i = first_i; i < from_events.size(); i++) {
auto &from = from_events[i];
try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now);
try_pop(q, queue_id, from, forget_previous ? from_id : EventId{}, now);
if (from.data.empty()) {
continue;
}