From a8c74f9432c8040b689fbbbfdcb40cfd49a2ac35 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Wed, 7 Aug 2019 14:01:22 +0300 Subject: [PATCH] TQueue: export from tqueue, Tqueue.{cpp,h} GitOrigin-RevId: 045ccbd0694d4906788a38bb30f16f0f9b0ef97d --- tddb/CMakeLists.txt | 2 + tddb/td/db/TQueue.cpp | 292 ++++++++++++++++++++++++++++ tddb/td/db/TQueue.h | 106 ++++++++++ test/tqueue.cpp | 440 +++++++----------------------------------- 4 files changed, 466 insertions(+), 374 deletions(-) create mode 100644 tddb/td/db/TQueue.cpp create mode 100644 tddb/td/db/TQueue.h diff --git a/tddb/CMakeLists.txt b/tddb/CMakeLists.txt index 9a80b6cf3..d633bdd98 100644 --- a/tddb/CMakeLists.txt +++ b/tddb/CMakeLists.txt @@ -17,6 +17,7 @@ set(TDDB_SOURCE td/db/SqliteKeyValue.cpp td/db/SqliteKeyValueAsync.cpp td/db/SqliteStatement.cpp + td/db/TQueue.cpp td/db/detail/RawSqliteDb.cpp @@ -39,6 +40,7 @@ set(TDDB_SOURCE td/db/SqliteKeyValueAsync.h td/db/SqliteKeyValueSafe.h td/db/SqliteStatement.h + td/db/TQueue.h td/db/TsSeqKeyValue.h td/db/detail/RawSqliteDb.h diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp new file mode 100644 index 000000000..446e279b8 --- /dev/null +++ b/tddb/td/db/TQueue.cpp @@ -0,0 +1,292 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/TQueue.h" + +#include "td/utils/Random.h" +#include "td/utils/VectorQueue.h" +#include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" +#include "td/utils/tl_helpers.h" + +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogInterface.h" +#include "td/db/binlog/BinlogHelper.h" + +#include + +namespace td { + +using EventId = TQueue::EventId; + +EventId::EventId() { +} + +Result EventId::from_int32(int32 id) { + if (!is_valid(id)) { + return Status::Error("Invalid id"); + } + return EventId(id); +} + +EventId EventId::create_random() { + return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10).move_as_ok(); +} + +int32 EventId::value() const { + return id_; +} + +Result EventId::next() const { + return from_int32(id_ + 1); +} + +Result EventId::advance(size_t offset) const { + TRY_RESULT(new_id, narrow_cast_safe(id_ + offset)); + return from_int32(new_id); +} + +bool EventId::empty() const { + return id_ == 0; +} + +bool EventId::operator==(const EventId &other) const { + return id_ == other.id_; +} + +StringBuilder &operator<<(StringBuilder &sb, const EventId id) { + return sb << "EventId{" << id.value() << "}"; +} + +EventId::EventId(int32 id) : id_(id) { + CHECK(is_valid(id)); +} + +bool EventId::is_valid(int32 id) { + return 0 <= id && id < MAX_ID; +} + +class TQueueImpl : public TQueue { + public: + void set_callback(unique_ptr callback) override { + callback_ = std::move(callback); + } + unique_ptr extract_callback() override { + return std::move(callback_); + } + + void do_push(QueueId queue_id, RawEvent &&raw_event) override { + CHECK(!raw_event.event_id.empty()); + if (callback_ && raw_event.logevent_id == 0) { + raw_event.logevent_id = callback_->push(queue_id, raw_event); + } + auto &q = queues_[queue_id]; + q.tail_id = raw_event.event_id.next().move_as_ok(); + q.events.push(std::move(raw_event)); + } + + void on_pop(int64 logevent_id) { + if (callback_) { + callback_->pop(logevent_id); + } + } + + EventId push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) override { + auto &q = queues_[queue_id]; + EventId event_id; + while (true) { + if (q.events.empty()) { + q.tail_id = new_id.empty() ? EventId::create_random() : new_id; + } + event_id = q.tail_id; + CHECK(!event_id.empty()); + if (event_id.next().is_ok()) { + break; + } + confirm_read(q, event_id); + } + + RawEvent raw_event; + raw_event.event_id = event_id; + raw_event.data = data; + raw_event.expire_at = expire_at; + do_push(queue_id, std::move(raw_event)); + return event_id; + } + + EventId get_head(QueueId queue_id) const override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + auto &q = it->second; + if (q.events.empty()) { + return EventId(); + } + return q.events.front().event_id; + } + + EventId get_tail(QueueId queue_id) const override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + auto &q = it->second; + if (q.events.empty()) { + return EventId(); + } + return q.tail_id; + } + + Result get(QueueId queue_id, EventId from_id, double now, MutableSpan events) override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return 0; + } + auto &q = it->second; + //TODO: sanity check for from_id + confirm_read(q, from_id); + if (q.events.empty()) { + return 0; + } + + auto from_events = q.events.as_span(); + size_t res_n = 0; + for (size_t i = 0; i < from_events.size(); i++) { + auto &from = from_events[i]; + if (from.expire_at < now) { + //TODO: pop this element + continue; + } + + auto &to = events[res_n]; + to.data = from.data; + to.id = from.event_id; + to.expire_at = from.expire_at; + + res_n++; + if (res_n == events.size()) { + break; + } + } + return res_n; + } + + private: + struct Queue { + EventId tail_id; + VectorQueue events; + }; + + std::unordered_map queues_; + unique_ptr callback_; + + void confirm_read(Queue &q, EventId till_id) { + while (!q.events.empty() && q.events.front().event_id.value() < till_id.value()) { + on_pop(q.events.front().logevent_id); + q.events.pop(); + } + } +}; +unique_ptr TQueue::create(unique_ptr callback) { + auto res = make_unique(); + if (callback) { + res->set_callback(std::move(callback)); + } + return res; +} + +struct TQueueLogEvent : public Storer { + TQueueLogEvent() = default; + int64 queue_id; + int32 event_id; + int32 expire_at; + Slice data; + + template + void store(StorerT &&storer) const { + using td::store; + store(queue_id, storer); + store(event_id, storer); + store(expire_at, storer); + store(data, storer); + } + + template + void parse(ParserT &&parser) { + using td::parse; + parse(queue_id, parser); + parse(event_id, parser); + parse(expire_at, parser); + data = parser.template fetch_string(); + } + + size_t size() const override { + TlStorerCalcLength storer; + store(storer); + return storer.get_length(); + } + + size_t store(uint8 *ptr) const override { + TlStorerUnsafe storer(ptr); + store(storer); + return static_cast(storer.get_buf() - ptr); + } +}; + +template +int64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { + TQueueLogEvent log_event; + log_event.queue_id = queue_id; + log_event.event_id = event.event_id.value(); + log_event.expire_at = static_cast(event.expire_at); + log_event.data = event.data; + return binlog_->add(magic_, log_event); +} + +template +void TQueueBinlog::pop(int64 logevent_id) { + binlog_->erase(logevent_id); +} + +template +Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) { + TQueueLogEvent event; + TlParser parser(binlog_event.data_); + event.parse(parser); + TRY_STATUS(parser.get_status()); + TRY_RESULT(event_id, EventId::from_int32(event.event_id)); + RawEvent raw_event; + raw_event.logevent_id = binlog_event.id_; + raw_event.event_id = event_id; + raw_event.expire_at = event.expire_at; + raw_event.data = event.data.str(); + q.do_push(event.queue_id, std::move(raw_event)); + return Status::OK(); +} + +template class TQueueBinlog; +template class TQueueBinlog; + +int64 MemoryStorage::push(QueueId queue_id, const RawEvent &event) { + auto logevent_id = next_logevent_id_++; + events_[logevent_id] = std::make_pair(queue_id, event); + + return logevent_id; +} +void MemoryStorage::pop(int64 logevent_id) { + events_.erase(logevent_id); +} + +void MemoryStorage::replay(TQueue &q) { + for (auto e : events_) { + auto x = e.second; + x.second.logevent_id = e.first; + q.do_push(x.first, std::move(x.second)); + } +} + +} // namespace td diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h new file mode 100644 index 000000000..ff78b13ac --- /dev/null +++ b/tddb/td/db/TQueue.h @@ -0,0 +1,106 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/misc.h" +#include "td/utils/Span.h" + +#include + +namespace td { + +class TQueue { + public: + class EventId { + public: + constexpr static int32 MAX_ID = 2000000000; + EventId(); + static Result from_int32(int32 id); + static EventId create_random(); + int32 value() const; + Result next() const; + Result advance(size_t offset) const; + bool empty() const; + + bool operator==(const EventId &other) const; + + private: + int32 id_{0}; + explicit EventId(int32 id); + static bool is_valid(int32 id); + }; + using QueueId = int64; + struct Event { + EventId id; + Slice data; + double expire_at; + }; + struct RawEvent { + int64 logevent_id{0}; + EventId event_id; + string data; + double expire_at{0}; + }; + class Callback { + public: + using EventId = TQueue::EventId; + using QueueId = TQueue::QueueId; + using RawEvent = TQueue::RawEvent; + virtual ~Callback() { + } + virtual int64 push(QueueId queue_id, const RawEvent &event) = 0; + virtual void pop(int64 logevent_id) = 0; + }; + + virtual ~TQueue() { + } + virtual void set_callback(unique_ptr callback) = 0; + virtual unique_ptr extract_callback() = 0; + + virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; + + virtual EventId push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0; + + virtual EventId get_head(QueueId queue_id) const = 0; + virtual EventId get_tail(QueueId queue_id) const = 0; + + virtual Result get(QueueId queue_id, EventId from_id, double now, MutableSpan events) = 0; + + static unique_ptr create(unique_ptr callback = {}); +}; + +StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id); + +struct BinlogEvent; +template +class TQueueBinlog : public TQueue::Callback { + public: + int64 push(QueueId queue_id, const RawEvent &event) override; + void pop(int64 logevent_id) override; + Status replay(const BinlogEvent &binlog_event, TQueue &q); + + void set_binlog(std::shared_ptr binlog) { + binlog_ = std::move(binlog); + } + + private: + std::shared_ptr binlog_; + int32 magic_{2314}; +}; + +class MemoryStorage : public TQueue::Callback { + public: + int64 push(QueueId queue_id, const RawEvent &event) override; + void pop(int64 logevent_id) override; + void replay(TQueue &q); + + private: + int64 next_logevent_id_{1}; + std::map> events_; +}; + +} // namespace td diff --git a/test/tqueue.cpp b/test/tqueue.cpp index cec8f8bf8..478db0d75 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -4,358 +4,37 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/int_types.h" -#include "td/utils/Status.h" -#include "td/utils/Slice.h" -#include "td/utils/Span.h" -#include "td/utils/VectorQueue.h" -#include "td/utils/HashMap.h" -#include "td/utils/misc.h" -#include "td/utils/tests.h" -#include "td/utils/tl_storers.h" -#include "td/utils/tl_parsers.h" -#include "td/utils/tl_helpers.h" -#include -#include - #include "td/db/binlog/Binlog.h" #include "td/db/binlog/BinlogEvent.h" #include "td/db/binlog/BinlogHelper.h" +#include "td/db/TQueue.h" + +#include "td/utils/int_types.h" +#include "td/utils/misc.h" +#include "td/utils/port/path.h" +#include "td/utils/Slice.h" +#include "td/utils/Span.h" +#include "td/utils/Status.h" +#include "td/utils/tests.h" +#include "td/utils/VectorQueue.h" + +#include +#include namespace td { -class EventId { - public: - constexpr static int32 MAX_ID = 2000000000; - EventId() = default; - static Result from_int32(int32 id) { - if (!is_valid(id)) { - return Status::Error("Invalid id"); - } - return EventId(id); - } - static EventId create_random() { - return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10).move_as_ok(); - } - int32 value() const { - return id_; - } - Result next() const { - return from_int32(id_ + 1); - } - Result advance(size_t offset) const { - TRY_RESULT(new_id, narrow_cast_safe(id_ + offset)); - return from_int32(new_id); - } - bool empty() const { - return id_ == 0; - } - - bool operator==(const EventId &other) const { - return id_ == other.id_; - } - friend StringBuilder &operator<<(StringBuilder &sb, const EventId id) { - return sb << "EventId{" << id.value() << "}"; - } - - private: - int32 id_{0}; - explicit EventId(int32 id) : id_(id) { - CHECK(is_valid(id)); - } - static bool is_valid(int32 id) { - return 0 <= id && id < MAX_ID; - } -}; - -using TQueueId = int64; - -class TQueue { - public: - struct Event { - EventId id; - Slice data; - double expire_at; - }; - struct RawEvent { - int64 logevent_id{0}; - EventId event_id; - string data; - double expire_at{0}; - }; - class Callback { - public: - using RawEvent = TQueue::RawEvent; - virtual ~Callback() { - } - virtual int64 push(TQueueId queue_id, const RawEvent &event) = 0; - virtual void pop(int64 logevent_id) = 0; - }; - - void set_callback(unique_ptr callback) { - callback_ = std::move(callback); - } - unique_ptr extract_callback() { - return std::move(callback_); - } - - void do_push(TQueueId queue_id, RawEvent &&raw_event) { - CHECK(!raw_event.event_id.empty()); - if (callback_ && raw_event.logevent_id == 0) { - raw_event.logevent_id = callback_->push(queue_id, raw_event); - } - auto &q = queues_[queue_id]; - q.tail_id = raw_event.event_id.next().move_as_ok(); - q.events.push(std::move(raw_event)); - } - - void on_pop(int64 logevent_id) { - if (callback_) { - callback_->pop(logevent_id); - } - } - - EventId push(TQueueId queue_id, string data, double expire_at, EventId new_id = EventId()) { - auto &q = queues_[queue_id]; - EventId event_id; - while (true) { - if (q.events.empty()) { - q.tail_id = new_id.empty() ? EventId::create_random() : new_id; - } - event_id = q.tail_id; - CHECK(!event_id.empty()); - if (event_id.next().is_ok()) { - break; - } - confirm_read(q, event_id); - } - - RawEvent raw_event; - raw_event.event_id = event_id; - raw_event.data = data; - raw_event.expire_at = expire_at; - do_push(queue_id, std::move(raw_event)); - return event_id; - } - - EventId get_head(TQueueId queue_id) const { - auto it = queues_.find(queue_id); - if (it == queues_.end()) { - return EventId(); - } - auto &q = it->second; - if (q.events.empty()) { - return EventId(); - } - return q.events.front().event_id; - } - - EventId get_tail(TQueueId queue_id) const { - auto it = queues_.find(queue_id); - if (it == queues_.end()) { - return EventId(); - } - auto &q = it->second; - if (q.events.empty()) { - return EventId(); - } - return q.tail_id; - } - - Result get(TQueueId queue_id, EventId from_id, double now, MutableSpan events) { - auto it = queues_.find(queue_id); - if (it == queues_.end()) { - return 0; - } - auto &q = it->second; - //TODO: sanity check for from_id - confirm_read(q, from_id); - if (q.events.empty()) { - return 0; - } - - auto from_events = q.events.as_span(); - size_t res_n = 0; - for (size_t i = 0; i < from_events.size(); i++) { - auto &from = from_events[i]; - if (from.expire_at < now) { - //TODO: pop this element - continue; - } - - auto &to = events[res_n]; - to.data = from.data; - to.id = from.event_id; - to.expire_at = from.expire_at; - - res_n++; - if (res_n == events.size()) { - break; - } - } - return res_n; - } - - private: - struct Queue { - EventId tail_id; - VectorQueue events; - }; - - HashMap queues_; - unique_ptr callback_; - - void confirm_read(Queue &q, EventId till_id) { - while (!q.events.empty() && q.events.front().event_id.value() < till_id.value()) { - on_pop(q.events.front().logevent_id); - q.events.pop(); - } - } -}; - -template -class TQueueBinlog : public TQueue::Callback { - public: - struct LogEvent : public Storer { - LogEvent() = default; - int64 queue_id; - int32 event_id; - int32 expire_at; - Slice data; - - template - void store(StorerT &&storer) const { - using td::store; - store(queue_id, storer); - store(event_id, storer); - store(expire_at, storer); - store(data, storer); - } - - template - void parse(ParserT &&parser) { - using td::parse; - parse(queue_id, parser); - parse(event_id, parser); - parse(expire_at, parser); - data = parser.template fetch_string(); - } - - size_t size() const override { - TlStorerCalcLength storer; - store(storer); - return storer.get_length(); - } - - size_t store(uint8 *ptr) const override { - TlStorerUnsafe storer(ptr); - store(storer); - return static_cast(storer.get_buf() - ptr); - } - }; - - int64 push(TQueueId queue_id, const RawEvent &event) override { - LogEvent log_event; - log_event.queue_id = queue_id; - log_event.event_id = event.event_id.value(); - log_event.expire_at = static_cast(event.expire_at); - log_event.data = event.data; - return binlog_->add(magic_, log_event); - } - - void pop(int64 logevent_id) override { - binlog_->erase(binlog_, logevent_id); - } - - Status replay(const BinlogEvent &binlog_event, TQueue &q) { - LogEvent event; - TlParser parser(binlog_event.data_); - event.parse(parser); - TRY_STATUS(parser.get_status()); - TRY_RESULT(event_id, EventId::from_int32(event.event_id)); - RawEvent raw_event; - raw_event.logevent_id = binlog_event.id_; - raw_event.event_id = event_id; - raw_event.expire_at = event.expire_at; - raw_event.data = event.data.str(); - q.do_push(event.queue_id, std::move(raw_event)); - return Status::OK(); - } - - void set_binlog(std::shared_ptr binlog) { - binlog_ = std::move(binlog); - } - - private: - std::shared_ptr binlog_; - int32 magic_{2314}; -}; - -class MemoryStorage : public TQueue::Callback { - public: - int64 push(TQueueId queue_id, const RawEvent &event) override { - auto logevent_id = next_logevent_id_++; - events_[logevent_id] = std::make_pair(queue_id, event); - - return logevent_id; - } - void pop(int64 logevent_id) override { - events_.erase(logevent_id); - } - - void replay(TQueue &q) { - LOG(ERROR) << events_.size(); - for (auto e : events_) { - auto x = e.second; - x.second.logevent_id = e.first; - q.do_push(x.first, std::move(x.second)); - } - } - - private: - int64 next_logevent_id_{1}; - std::map> events_; -}; - -struct Step { - std::function func; - td::uint32 weight; -}; -class RandomSteps { - public: - RandomSteps(std::vector steps) : steps_(std::move(steps)) { - for (auto &step : steps_) { - steps_sum_ += step.weight; - } - } - template - void step(Random &rnd) { - auto w = rnd() % steps_sum_; - for (auto &step : steps_) { - if (w < step.weight) { - step.func(); - break; - } - w -= step.weight; - } - } - - private: - std::vector steps_; - td::int32 steps_sum_ = 0; -}; TEST(TQueue, hands) { TQueue::Event events[100]; auto events_span = MutableSpan(events, 100); - TQueue tqueue; + auto tqueue = TQueue::create(); auto qid = 12; - ASSERT_EQ(true, tqueue.get_head(qid).empty()); - ASSERT_EQ(true, tqueue.get_tail(qid).empty()); - tqueue.push(qid, "hello", 0); - auto head = tqueue.get_head(qid); - ASSERT_EQ(head.next().ok(), tqueue.get_tail(qid)); - ASSERT_EQ(1u, tqueue.get(qid, head, 0, events_span).move_as_ok()); + ASSERT_EQ(true, tqueue->get_head(qid).empty()); + ASSERT_EQ(true, tqueue->get_tail(qid).empty()); + tqueue->push(qid, "hello", 0); + auto head = tqueue->get_head(qid); + ASSERT_EQ(head.next().ok(), tqueue->get_tail(qid)); + ASSERT_EQ(1u, tqueue->get(qid, head, 0, events_span).move_as_ok()); } class TestTQueue { @@ -364,56 +43,61 @@ class TestTQueue { return "test_binlog"; } TestTQueue() { + baseline_ = TQueue::create(); + memory_ = TQueue::create(); + auto memory_storage = td::make_unique(); memory_storage_ = memory_storage.get(); - memory_.set_callback(std::move(memory_storage)); + memory_->set_callback(std::move(memory_storage)); + binlog_ = TQueue::create(); auto tqueue_binlog = make_unique>(); Binlog::destroy(binlog_path()).ensure(); auto binlog = std::make_shared(); binlog->init(binlog_path().str(), [&](const BinlogEvent &event) { UNREACHABLE(); }).ensure(); tqueue_binlog->set_binlog(binlog); - binlog_.set_callback(std::move(tqueue_binlog)); + binlog_->set_callback(std::move(tqueue_binlog)); } void restart(Random::Xorshift128plus &rnd) { - memory_.extract_callback().release(); + memory_->extract_callback().release(); auto memory_storage = unique_ptr(memory_storage_); - memory_ = TQueue(); - memory_storage->replay(memory_); - memory_.set_callback(std::move(memory_storage)); + memory_ = TQueue::create(); + memory_storage->replay(*memory_); + memory_->set_callback(std::move(memory_storage)); if (rnd.fast(0, 100) != 0) { return; } LOG(ERROR) << "RESTART BINLOG"; - binlog_ = TQueue(); + binlog_ = TQueue::create(); auto tqueue_binlog = make_unique>(); auto binlog = std::make_shared(); - binlog->init(binlog_path().str(), [&](const BinlogEvent &event) { tqueue_binlog->replay(event, binlog_); }) + binlog->init(binlog_path().str(), [&](const BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_); }) .ensure(); tqueue_binlog->set_binlog(binlog); - binlog_.set_callback(std::move(tqueue_binlog)); + binlog_->set_callback(std::move(tqueue_binlog)); } - EventId push(TQueueId queue_id, string data, double expire_at, EventId new_id = EventId()) { - auto a_id = baseline_.push(queue_id, data, expire_at, new_id); - auto b_id = memory_.push(queue_id, data, expire_at, new_id); - auto c_id = binlog_.push(queue_id, data, expire_at, new_id); + TQueue::EventId push(TQueue::QueueId queue_id, string data, double expire_at, + TQueue::EventId new_id = TQueue::EventId()) { + auto a_id = baseline_->push(queue_id, data, expire_at, new_id); + auto b_id = memory_->push(queue_id, data, expire_at, new_id); + auto c_id = binlog_->push(queue_id, data, expire_at, new_id); ASSERT_EQ(a_id, b_id); ASSERT_EQ(a_id, c_id); return a_id; } - void check_head_tail(TQueueId qid) { - ASSERT_EQ(baseline_.get_head(qid), memory_.get_head(qid)); - ASSERT_EQ(baseline_.get_head(qid), binlog_.get_head(qid)); - ASSERT_EQ(baseline_.get_tail(qid), memory_.get_tail(qid)); - ASSERT_EQ(baseline_.get_tail(qid), binlog_.get_tail(qid)); + void check_head_tail(TQueue::QueueId qid) { + ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid)); + ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid)); + ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid)); + ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid)); } - void check_get(TQueueId qid, Random::Xorshift128plus &rnd) { + void check_get(TQueue::QueueId qid, Random::Xorshift128plus &rnd) { TQueue::Event a[10]; MutableSpan a_span(a, 10); TQueue::Event b[10]; @@ -421,9 +105,9 @@ class TestTQueue { TQueue::Event c[10]; MutableSpan c_span(b, 10); - auto a_from = baseline_.get_head(qid); - auto b_from = memory_.get_head(qid); - auto c_from = binlog_.get_head(qid); + auto a_from = baseline_->get_head(qid); + auto b_from = memory_->get_head(qid); + auto c_from = binlog_->get_head(qid); ASSERT_EQ(a_from, b_from); ASSERT_EQ(a_from, c_from); @@ -431,9 +115,9 @@ class TestTQueue { if (tmp.is_ok()) { a_from = tmp.move_as_ok(); } - auto a_size = baseline_.get(qid, a_from, 0, a_span).move_as_ok(); - auto b_size = memory_.get(qid, a_from, 0, b_span).move_as_ok(); - auto c_size = binlog_.get(qid, a_from, 0, c_span).move_as_ok(); + auto a_size = baseline_->get(qid, a_from, 0, a_span).move_as_ok(); + auto b_size = memory_->get(qid, a_from, 0, b_span).move_as_ok(); + auto c_size = binlog_->get(qid, a_from, 0, c_span).move_as_ok(); ASSERT_EQ(a_size, b_size); ASSERT_EQ(a_size, c_size); for (size_t i = 0; i < a_size; i++) { @@ -445,16 +129,18 @@ class TestTQueue { } private: - TQueue baseline_; - TQueue memory_; - TQueue binlog_; + unique_ptr baseline_; + unique_ptr memory_; + unique_ptr binlog_; MemoryStorage *memory_storage_{nullptr}; - //TQueue binlog_; }; TEST(TQueue, random) { + using EventId = TQueue::EventId; Random::Xorshift128plus rnd(123); - auto next_qid = [&] { return rnd.fast(1, 10); }; + auto next_qid = [&] { + return rnd.fast(1, 10); + }; auto next_first_id = [&] { if (rnd.fast(0, 3) == 0) { return EventId::from_int32(EventId::MAX_ID - 20).move_as_ok(); @@ -466,9 +152,15 @@ TEST(TQueue, random) { auto data = PSTRING() << rnd(); q.push(next_qid(), data, 0, next_first_id()); }; - auto check_head_tail = [&] { q.check_head_tail(next_qid()); }; - auto restart = [&] { q.restart(rnd); }; - auto get = [&] { q.check_get(next_qid(), rnd); }; + auto check_head_tail = [&] { + q.check_head_tail(next_qid()); + }; + auto restart = [&] { + q.restart(rnd); + }; + auto get = [&] { + q.check_get(next_qid(), rnd); + }; RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {restart, 1}}); for (int i = 0; i < 1000000; i++) { steps.step(rnd);