From 42e3aefc02a3150b52c339dedc270dc333624499 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 11 Jun 2020 18:12:16 +0300 Subject: [PATCH] TQueue improvements. GitOrigin-RevId: 753aba147aed58ccba167a83dd798987ce6da177 --- tddb/td/db/TQueue.cpp | 34 ++++++++++++++--------------- tddb/td/db/TQueue.h | 50 ++++++++++++++++++++++++++++++++----------- test/tqueue.cpp | 6 +++--- 3 files changed, 58 insertions(+), 32 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 675618c5d..1c874dc56 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -66,6 +66,10 @@ bool EventId::operator==(const EventId &other) const { return id_ == other.id_; } +bool EventId::operator!=(const EventId &other) const { + return !(*this == other); +} + bool EventId::operator<(const EventId &other) const { return id_ < other.id_; } @@ -177,11 +181,11 @@ class TQueueImpl : public TQueue { } Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, - MutableSpan &events) override { + MutableSpan &result_events) override { //LOG(ERROR) << "Get " << queue_id << " " << from_id; auto it = queues_.find(queue_id); if (it == queues_.end()) { - events.truncate(0); + result_events.truncate(0); return 0; } auto &q = it->second; @@ -214,7 +218,7 @@ class TQueueImpl : public TQueue { continue; } - if (ready_n == events.size()) { + if (ready_n == result_events.size()) { break; } @@ -224,7 +228,7 @@ class TQueueImpl : public TQueue { continue; } - auto &to = events[ready_n]; + auto &to = result_events[ready_n]; to.data = from.data; to.id = from.event_id; to.expires_at = from.expires_at; @@ -241,7 +245,7 @@ class TQueueImpl : public TQueue { break; } - events.truncate(ready_n); + result_events.truncate(ready_n); size_t left_n = from_events.size() - i; return ready_n + left_n; } @@ -299,12 +303,9 @@ class TQueueImpl : public TQueue { } } }; -unique_ptr TQueue::create(unique_ptr callback) { - auto res = make_unique(); - if (callback) { - res->set_callback(std::move(callback)); - } - return res; + +unique_ptr TQueue::create() { + return make_unique(); } struct TQueueLogEvent : public Storer { @@ -358,6 +359,7 @@ template TQueueBinlog::TQueueBinlog() { diff_ = Clocks::system() - Time::now(); } + template uint64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { TQueueLogEvent log_event; @@ -367,8 +369,7 @@ uint64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { log_event.data = event.data; log_event.extra = event.extra; if (event.logevent_id == 0) { - auto res = binlog_->add(magic_ + (log_event.extra != 0), log_event); - return res; + return binlog_->add(magic_ + (log_event.extra != 0), log_event); } binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event); return event.logevent_id; @@ -399,17 +400,16 @@ Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) template class TQueueBinlog; template class TQueueBinlog; -uint64 MemoryStorage::push(QueueId queue_id, const RawEvent &event) { +uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) { auto logevent_id = event.logevent_id == 0 ? next_logevent_id_++ : event.logevent_id; events_[logevent_id] = std::make_pair(queue_id, event); - return logevent_id; } -void MemoryStorage::pop(uint64 logevent_id) { +void TQueueMemoryStorage::pop(uint64 logevent_id) { events_.erase(logevent_id); } -void MemoryStorage::replay(TQueue &q) { +void TQueueMemoryStorage::replay(TQueue &q) { for (auto e : events_) { auto x = e.second; x.second.logevent_id = e.first; diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index 409835f35..6a2221251 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -23,29 +23,40 @@ class TQueue { class EventId { public: static constexpr int32 MAX_ID = 2000000000; + EventId(); + static Result from_int32(int32 id); + static EventId create_random(); + int32 value() const; + Result next() const; + Result advance(size_t offset) const; + bool empty() const; bool operator==(const EventId &other) const; + bool operator!=(const EventId &other) const; bool operator<(const EventId &other) const; private: int32 id_{0}; + explicit EventId(int32 id); + static bool is_valid(int32 id); }; - using QueueId = int64; + struct Event { EventId id; Slice data; int64 extra{0}; - double expires_at; + double expires_at{0}; }; + struct RawEvent { uint64 logevent_id{0}; EventId event_id; @@ -53,23 +64,39 @@ class TQueue { int64 extra{0}; double expires_at{0}; }; + + using QueueId = int64; + class Callback { public: - using EventId = TQueue::EventId; using QueueId = TQueue::QueueId; using RawEvent = TQueue::RawEvent; - virtual ~Callback() { - } + + Callback() = default; + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + Callback(Callback &&) = delete; + Callback &operator=(Callback &&) = delete; + virtual ~Callback() = default; + virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; virtual void pop(uint64 logevent_id) = 0; }; - virtual ~TQueue() { - } + static unique_ptr create(); + + TQueue() = default; + TQueue(const TQueue &) = delete; + TQueue &operator=(const TQueue &) = delete; + TQueue(TQueue &&) = delete; + TQueue &operator=(TQueue &&) = delete; + + virtual ~TQueue() = default; + virtual void set_callback(unique_ptr callback) = 0; virtual unique_ptr extract_callback() = 0; - virtual void emulate_restart() = 0; + virtual void emulate_restart() = 0; // for testing only virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; @@ -82,11 +109,9 @@ class TQueue { virtual EventId get_tail(QueueId queue_id) const = 0; virtual Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, - MutableSpan &events) = 0; + MutableSpan &result_events) = 0; virtual void run_gc(double now) = 0; - - static unique_ptr create(unique_ptr callback = {}); }; StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id); @@ -97,6 +122,7 @@ template class TQueueBinlog : public TQueue::Callback { public: TQueueBinlog(); + uint64 push(QueueId queue_id, const RawEvent &event) override; void pop(uint64 logevent_id) override; Status replay(const BinlogEvent &binlog_event, TQueue &q); @@ -111,7 +137,7 @@ class TQueueBinlog : public TQueue::Callback { double diff_{0}; }; -class MemoryStorage : public TQueue::Callback { +class TQueueMemoryStorage : public TQueue::Callback { public: uint64 push(QueueId queue_id, const RawEvent &event) override; void pop(uint64 logevent_id) override; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index ecda6fdfa..7e1e178f3 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -46,7 +46,7 @@ class TestTQueue { baseline_ = TQueue::create(); memory_ = TQueue::create(); - auto memory_storage = td::make_unique(); + auto memory_storage = td::make_unique(); memory_storage_ = memory_storage.get(); memory_->set_callback(std::move(memory_storage)); @@ -66,7 +66,7 @@ class TestTQueue { } memory_->extract_callback().release(); - auto memory_storage = unique_ptr(memory_storage_); + auto memory_storage = unique_ptr(memory_storage_); memory_ = TQueue::create(); memory_storage->replay(*memory_); memory_->set_callback(std::move(memory_storage)); @@ -144,7 +144,7 @@ class TestTQueue { unique_ptr baseline_; unique_ptr memory_; unique_ptr binlog_; - MemoryStorage *memory_storage_{nullptr}; + TQueueMemoryStorage *memory_storage_{nullptr}; }; TEST(TQueue, random) {