From 8226c9ab490619ca6760cf91260297a91159ede0 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Wed, 7 Aug 2019 18:29:47 +0300 Subject: [PATCH] TQueue: do not drop tail_id whithout restart GitOrigin-RevId: 54df1acc7b36303e128cfe1f6ac3e7ed4837eacd --- tddb/td/db/TQueue.cpp | 25 ++++++++++++++++--------- tddb/td/db/TQueue.h | 2 ++ test/tqueue.cpp | 3 +++ 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index d9865d7e7..e3e376ac9 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -22,6 +22,10 @@ namespace td { using EventId = TQueue::EventId; +static constexpr int32 MAX_DELAY = 7 * 86400; +static constexpr size_t MAX_EVENT_LEN = 65536 * 8; +static constexpr size_t MAX_QUEUE_EVENTS = 1000000; + EventId::EventId() { } @@ -33,7 +37,7 @@ Result EventId::from_int32(int32 id) { } EventId EventId::create_random() { - return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10).move_as_ok(); + return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10 + MAX_QUEUE_EVENTS).move_as_ok(); } int32 EventId::value() const { @@ -71,9 +75,6 @@ bool EventId::is_valid(int32 id) { class TQueueImpl : public TQueue { public: - static constexpr int32 MAX_DELAY = 7 * 86400; - static constexpr size_t MAX_EVENT_LEN = 65536 * 8; - void set_callback(unique_ptr callback) override { callback_ = std::move(callback); } @@ -81,6 +82,14 @@ class TQueueImpl : public TQueue { return std::move(callback_); } + void emulate_restart() override { + for (auto &it : queues_) { + if (it.second.events.empty()) { + it.second.tail_id = {}; + } + } + } + void do_push(QueueId queue_id, RawEvent &&raw_event) override { CHECK(!raw_event.event_id.empty()); if (callback_ && raw_event.logevent_id == 0) { @@ -95,7 +104,7 @@ class TQueueImpl : public TQueue { auto &q = queues_[queue_id]; EventId event_id; while (true) { - if (q.events.empty()) { + if (q.tail_id.empty()) { q.tail_id = new_id.empty() ? EventId::create_random() : new_id; } event_id = q.tail_id; @@ -104,6 +113,7 @@ class TQueueImpl : public TQueue { break; } confirm_read(q, event_id); + q.tail_id = {}; } RawEvent raw_event; @@ -121,7 +131,7 @@ class TQueueImpl : public TQueue { } auto &q = it->second; if (q.events.empty()) { - return EventId(); + return q.tail_id; } return q.events.front().event_id; } @@ -132,9 +142,6 @@ class TQueueImpl : public TQueue { return EventId(); } auto &q = it->second; - if (q.events.empty()) { - return EventId(); - } return q.tail_id; } diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index 18cb30729..0a107c04e 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -61,6 +61,8 @@ class TQueue { virtual void set_callback(unique_ptr callback) = 0; virtual unique_ptr extract_callback() = 0; + virtual void emulate_restart() = 0; + virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; virtual EventId push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index a0373479f..12391a3c3 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -60,6 +60,8 @@ class TestTQueue { } void restart(Random::Xorshift128plus &rnd) { + baseline_->emulate_restart(); + memory_->extract_callback().release(); auto memory_storage = unique_ptr(memory_storage_); memory_ = TQueue::create(); @@ -67,6 +69,7 @@ class TestTQueue { memory_->set_callback(std::move(memory_storage)); if (rnd.fast(0, 100) != 0) { + binlog_->emulate_restart(); return; }