From 7a48b9bfc06e4a5a374bfa9a9e3f9825fccb1518 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Tue, 27 Aug 2019 18:06:00 +0300 Subject: [PATCH] TQueue: interface for webhooks GitOrigin-RevId: 521f25f183d76bcbb9143270660dd48b3bf8fc83 --- tddb/td/db/TQueue.cpp | 47 ++++++++++++++++++++++++++++++++++++++----- tddb/td/db/TQueue.h | 6 +++++- test/tqueue.cpp | 8 ++++---- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 8201f321c..25f791037 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -61,6 +61,10 @@ bool EventId::operator==(const EventId &other) const { return id_ == other.id_; } +bool EventId::operator<(const EventId &other) const { + return id_ < other.id_; +} + StringBuilder &operator<<(StringBuilder &sb, const EventId id) { return sb << "EventId{" << id.value() << "}"; } @@ -86,6 +90,7 @@ class TQueueImpl : public TQueue { } void do_push(QueueId queue_id, RawEvent &&raw_event) override { + //LOG(ERROR) << "Push " << queue_id << " " << raw_event.event_id; CHECK(!raw_event.event_id.empty()); if (raw_event.logevent_id == 0 && callback_) { raw_event.logevent_id = callback_->push(queue_id, raw_event); @@ -149,7 +154,24 @@ class TQueueImpl : public TQueue { return q.tail_id; } - Result get(QueueId queue_id, EventId from_id, double now, MutableSpan &events) override { + void forget(QueueId queue_id, EventId event_id) override { + auto q_it = queues_.find(queue_id); + if (q_it == queues_.end()) { + return; + } + auto &q = q_it->second; + auto from_events = q.events.as_mutable_span(); + auto it = std::lower_bound(from_events.begin(), from_events.end(), event_id, + [](auto &event, EventId event_id) { return event.event_id < event_id; }); + if (it == from_events.end() || !(it->event_id == event_id)) { + return; + } + try_pop(queue_id, *it, {}, q.tail_id, 0, true /*force*/); + } + + Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + MutableSpan &events) override { + //LOG(ERROR) << "Get " << queue_id << " " << from_id; auto it = queues_.find(queue_id); if (it == queues_.end()) { events.truncate(0); @@ -171,10 +193,16 @@ class TQueueImpl : public TQueue { while (true) { from_events = q.events.as_mutable_span(); ready_n = 0; - i = 0; - for (; i < from_events.size(); i++) { + size_t first_i = 0; + if (!forget_previous) { + first_i = std::lower_bound(from_events.begin(), from_events.end(), from_id, + [](auto &event, EventId event_id) { return event.event_id < event_id; }) - + from_events.begin(); + } + //LOG(ERROR) << tag("first_i", first_i) << tag("size", from_events.size()); + for (i = first_i; i < from_events.size(); i++) { auto &from = from_events[i]; - try_pop(queue_id, from, from_id, q.tail_id, now); + try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now); if (from.data.empty()) { continue; } @@ -183,6 +211,12 @@ class TQueueImpl : public TQueue { break; } + if (from.event_id < from_id) { + // should not happend + UNREACHABLE(); + continue; + } + auto &to = events[ready_n]; to.data = from.data; to.id = from.event_id; @@ -191,7 +225,7 @@ class TQueueImpl : public TQueue { } // compactify skipped events - if (ready_n * 2 < i) { + if ((first_i + ready_n) * 2 < i) { compactify(q.events, i); continue; } @@ -229,6 +263,8 @@ class TQueueImpl : public TQueue { } void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now, bool force = false) { + //LOG(ERROR) << event.expire_at << " < " << now << " = " << (event.expire_at < now) << " " + //<< (event.event_id.value() < from_id.value()) << " " << force << " " << event.data.empty(); bool should_drop = event.expire_at < now || event.event_id.value() < from_id.value() || force || event.data.empty(); if (!callback_ || event.logevent_id == 0) { if (should_drop) { @@ -241,6 +277,7 @@ class TQueueImpl : public TQueue { return; } + //LOG(ERROR) << "Drop " << queue_id << " " << event.event_id; if (event.event_id.value() + 1 == tail_id.value()) { if (!event.data.empty()) { event.data = {}; diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index dcb222dee..590649ad1 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -27,6 +27,7 @@ class TQueue { bool empty() const; bool operator==(const EventId &other) const; + bool operator<(const EventId &other) const; private: int32 id_{0}; @@ -67,10 +68,13 @@ class TQueue { virtual Result push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0; + virtual void forget(QueueId queue_id, EventId event_id) = 0; + virtual EventId get_head(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0; - virtual Result get(QueueId queue_id, EventId from_id, double now, MutableSpan &events) = 0; + virtual Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + MutableSpan &events) = 0; virtual void run_gc(double now) = 0; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index e71319b68..be1f58d72 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -34,7 +34,7 @@ TEST(TQueue, hands) { tqueue->push(qid, "hello", 0); auto head = tqueue->get_head(qid); ASSERT_EQ(head.next().ok(), tqueue->get_tail(qid)); - ASSERT_EQ(1u, tqueue->get(qid, head, 0, events_span).move_as_ok()); + ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); } class TestTQueue { @@ -127,9 +127,9 @@ class TestTQueue { if (tmp.is_ok()) { a_from = tmp.move_as_ok(); } - baseline_->get(qid, a_from, now, a_span).move_as_ok(); - memory_->get(qid, a_from, now, b_span).move_as_ok(); - binlog_->get(qid, a_from, now, c_span).move_as_ok(); + baseline_->get(qid, a_from, true, now, a_span).move_as_ok(); + memory_->get(qid, a_from, true, now, b_span).move_as_ok(); + binlog_->get(qid, a_from, true, now, c_span).move_as_ok(); ASSERT_EQ(a_span.size(), b_span.size()); ASSERT_EQ(a_span.size(), c_span.size()); for (size_t i = 0; i < a_span.size(); i++) {