From 39e58f3eb96d4716bdc90dc64256082476e9b567 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Fri, 6 Sep 2019 18:55:19 +0300 Subject: [PATCH] TQueue: extra GitOrigin-RevId: 7f746000e546b422034c0170e068d599317764c9 --- tddb/td/db/TQueue.cpp | 36 +++++++++++++++++++++++++----------- tddb/td/db/TQueue.h | 5 ++++- test/tqueue.cpp | 6 +++--- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 25f791037..fb857172e 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -6,16 +6,16 @@ // #include "td/db/TQueue.h" -#include "td/utils/Random.h" -#include "td/utils/VectorQueue.h" -#include "td/utils/tl_parsers.h" -#include "td/utils/tl_storers.h" -#include "td/utils/tl_helpers.h" - #include "td/db/binlog/Binlog.h" #include "td/db/binlog/BinlogInterface.h" #include "td/db/binlog/BinlogHelper.h" +#include "td/utils/Random.h" +#include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" +#include "td/utils/tl_helpers.h" +#include "td/utils/VectorQueue.h" + #include namespace td { @@ -100,7 +100,8 @@ class TQueueImpl : public TQueue { q.events.push(std::move(raw_event)); } - Result push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) override { + Result push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(), + int64 extra = 0) override { auto &q = queues_[queue_id]; if (q.events.size() >= MAX_QUEUE_EVENTS) { return Status::Error("Queue is full"); @@ -129,6 +130,7 @@ class TQueueImpl : public TQueue { raw_event.event_id = event_id; raw_event.data = std::move(data); raw_event.expire_at = expire_at; + raw_event.extra = extra; do_push(queue_id, std::move(raw_event)); return event_id; } @@ -221,6 +223,7 @@ class TQueueImpl : public TQueue { to.data = from.data; to.id = from.event_id; to.expire_at = from.expire_at; + to.extra = from.extra; ready_n++; } @@ -304,6 +307,7 @@ struct TQueueLogEvent : public Storer { int32 event_id; int32 expire_at; Slice data; + int64 extra; template void store(StorerT &&storer) const { @@ -312,15 +316,23 @@ struct TQueueLogEvent : public Storer { store(event_id, storer); store(expire_at, storer); store(data, storer); + if (extra != 0) { + store(extra, storer); + } } template - void parse(ParserT &&parser) { + void parse(ParserT &&parser, int32 has_extra) { using td::parse; parse(queue_id, parser); parse(event_id, parser); parse(expire_at, parser); data = parser.template fetch_string(); + if (has_extra == 0) { + extra = 0; + } else { + parse(extra, parser); + } } size_t size() const override { @@ -347,11 +359,12 @@ int64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { log_event.event_id = event.event_id.value(); log_event.expire_at = static_cast(event.expire_at + diff_); log_event.data = event.data; + log_event.extra = event.extra; if (event.logevent_id == 0) { - auto res = binlog_->add(magic_, log_event); + auto res = binlog_->add(magic_ + (log_event.extra != 0), log_event); return res; } - binlog_->rewrite(event.logevent_id, magic_, log_event); + binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event); return event.logevent_id; } @@ -364,7 +377,7 @@ template Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) { TQueueLogEvent event; TlParser parser(binlog_event.data_); - event.parse(parser); + event.parse(parser, binlog_event.type_ - magic_); TRY_STATUS(parser.get_status()); TRY_RESULT(event_id, EventId::from_int32(event.event_id)); RawEvent raw_event; @@ -372,6 +385,7 @@ Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) raw_event.event_id = event_id; raw_event.expire_at = event.expire_at - diff_ + 1; raw_event.data = event.data.str(); + raw_event.extra = event.extra; q.do_push(event.queue_id, std::move(raw_event)); return Status::OK(); } diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index 590649ad1..88ac96026 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -38,12 +38,14 @@ class TQueue { struct Event { EventId id; Slice data; + int64 extra{0}; double expire_at; }; struct RawEvent { int64 logevent_id{0}; EventId event_id; string data; + int64 extra{0}; double expire_at{0}; }; class Callback { @@ -66,7 +68,8 @@ class TQueue { virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; - virtual Result push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0; + virtual Result push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(), + int64 extra = 0) = 0; virtual void forget(QueueId queue_id, EventId event_id) = 0; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index be1f58d72..94b60f459 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -170,13 +170,13 @@ TEST(TQueue, random) { now += 10; }; auto check_head_tail = [&] { - q.check_head_tail(next_qid()); + q.check_head_tail(next_qid(), now); }; auto restart = [&] { - q.restart(rnd); + q.restart(rnd, now); }; auto get = [&] { - q.check_get(next_qid(), rnd); + q.check_get(next_qid(), rnd, now); }; RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}}); for (int i = 0; i < 1000000; i++) {