diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index 57ac2119c..f7fa43148 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -111,8 +111,7 @@ class TQueueImpl : public TQueue { q.events.push(std::move(raw_event)); } - Result push(QueueId queue_id, string data, double expires_at, EventId new_id = EventId(), - int64 extra = 0) override { + Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override { auto &q = queues_[queue_id]; if (q.events.size() >= MAX_QUEUE_EVENTS) { return Status::Error("Queue is full"); @@ -123,10 +122,10 @@ class TQueueImpl : public TQueue { EventId event_id; while (true) { if (q.tail_id.empty()) { - q.tail_id = new_id.empty() ? EventId::create_random() : new_id; + q.tail_id = hint_new_id.empty() ? EventId::create_random() : hint_new_id; } event_id = q.tail_id; - CHECK(!event_id.empty()); + CHECK(event_id.is_valid()); if (event_id.next().is_ok()) { break; } @@ -135,7 +134,7 @@ class TQueueImpl : public TQueue { } q.tail_id = EventId(); q.events = {}; - CHECK(new_id.next().is_ok()); + CHECK(hint_new_id.next().is_ok()); } RawEvent raw_event; diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index e2e530661..ce7d04749 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -100,8 +100,7 @@ class TQueue { virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; - virtual Result push(QueueId queue_id, string data, double expires_at, EventId new_id = EventId(), - int64 extra = 0) = 0; + virtual Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) = 0; virtual void forget(QueueId queue_id, EventId event_id) = 0; diff --git a/test/tqueue.cpp b/test/tqueue.cpp index bdb2ab9e2..08c7248bc 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -21,17 +21,15 @@ #include #include -namespace td { - TEST(TQueue, hands) { - TQueue::Event events[100]; - auto events_span = MutableSpan(events, 100); + td::TQueue::Event events[100]; + auto events_span = td::MutableSpan(events, 100); - auto tqueue = TQueue::create(); + auto tqueue = td::TQueue::create(); auto qid = 12; ASSERT_EQ(true, tqueue->get_head(qid).empty()); ASSERT_EQ(true, tqueue->get_tail(qid).empty()); - tqueue->push(qid, "hello", 0); + tqueue->push(qid, "hello", 0, 0, td::Auto()); auto head = tqueue->get_head(qid); ASSERT_EQ(head.next().ok(), tqueue->get_tail(qid)); ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); @@ -39,34 +37,36 @@ TEST(TQueue, hands) { class TestTQueue { public: - CSlice binlog_path() { + using EventId = td::TQueue::EventId; + + td::CSlice binlog_path() { return "test_binlog"; } TestTQueue() { - baseline_ = TQueue::create(); - memory_ = TQueue::create(); + baseline_ = td::TQueue::create(); + memory_ = td::TQueue::create(); - auto memory_storage = td::make_unique(); + auto memory_storage = td::make_unique(); memory_storage_ = memory_storage.get(); memory_->set_callback(std::move(memory_storage)); - binlog_ = TQueue::create(); - auto tqueue_binlog = make_unique>(); - Binlog::destroy(binlog_path()).ensure(); - auto binlog = std::make_shared(); - binlog->init(binlog_path().str(), [&](const BinlogEvent &event) { UNREACHABLE(); }).ensure(); + binlog_ = td::TQueue::create(); + auto tqueue_binlog = td::make_unique>(); + td::Binlog::destroy(binlog_path()).ensure(); + auto binlog = std::make_shared(); + binlog->init(binlog_path().str(), [&](const td::BinlogEvent &event) { UNREACHABLE(); }).ensure(); tqueue_binlog->set_binlog(binlog); binlog_->set_callback(std::move(tqueue_binlog)); } - void restart(Random::Xorshift128plus &rnd, double now) { + void restart(td::Random::Xorshift128plus &rnd, double now) { if (rnd.fast(0, 10) == 0) { baseline_->run_gc(now); } memory_->extract_callback().release(); - auto memory_storage = unique_ptr(memory_storage_); - memory_ = TQueue::create(); + auto memory_storage = td::unique_ptr(memory_storage_); + memory_ = td::TQueue::create(); memory_storage->replay(*memory_); memory_->set_callback(std::move(memory_storage)); if (rnd.fast(0, 10) == 0) { @@ -78,10 +78,10 @@ class TestTQueue { } LOG(INFO) << "Restart binlog"; - binlog_ = TQueue::create(); - auto tqueue_binlog = make_unique>(); - auto binlog = std::make_shared(); - binlog->init(binlog_path().str(), [&](const BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_); }) + binlog_ = td::TQueue::create(); + auto tqueue_binlog = td::make_unique>(); + auto binlog = std::make_shared(); + binlog->init(binlog_path().str(), [&](const td::BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_); }) .ensure(); tqueue_binlog->set_binlog(binlog); binlog_->set_callback(std::move(tqueue_binlog)); @@ -90,30 +90,29 @@ class TestTQueue { } } - TQueue::EventId push(TQueue::QueueId queue_id, string data, double expires_at, - TQueue::EventId new_id = TQueue::EventId()) { - auto a_id = baseline_->push(queue_id, data, expires_at, new_id).move_as_ok(); - auto b_id = memory_->push(queue_id, data, expires_at, new_id).move_as_ok(); - auto c_id = binlog_->push(queue_id, data, expires_at, new_id).move_as_ok(); + EventId push(td::TQueue::QueueId queue_id, td::string data, double expires_at, EventId new_id = EventId()) { + auto a_id = baseline_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); + auto b_id = memory_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); + auto c_id = binlog_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); ASSERT_EQ(a_id, b_id); ASSERT_EQ(a_id, c_id); return a_id; } - void check_head_tail(TQueue::QueueId qid, double now) { + void check_head_tail(td::TQueue::QueueId qid, double now) { //ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid)); //ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid)); ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid)); ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid)); } - void check_get(TQueue::QueueId qid, Random::Xorshift128plus &rnd, double now) { - TQueue::Event a[10]; - MutableSpan a_span(a, 10); - TQueue::Event b[10]; - MutableSpan b_span(b, 10); - TQueue::Event c[10]; - MutableSpan c_span(c, 10); + void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, double now) { + td::TQueue::Event a[10]; + td::MutableSpan a_span(a, 10); + td::TQueue::Event b[10]; + td::MutableSpan b_span(b, 10); + td::TQueue::Event c[10]; + td::MutableSpan c_span(c, 10); auto a_from = baseline_->get_head(qid); //auto b_from = memory_->get_head(qid); @@ -139,15 +138,15 @@ class TestTQueue { } private: - unique_ptr baseline_; - unique_ptr memory_; - unique_ptr binlog_; - TQueueMemoryStorage *memory_storage_{nullptr}; + td::unique_ptr baseline_; + td::unique_ptr memory_; + td::unique_ptr binlog_; + td::TQueueMemoryStorage *memory_storage_{nullptr}; }; TEST(TQueue, random) { - using EventId = TQueue::EventId; - Random::Xorshift128plus rnd(123); + using EventId = td::TQueue::EventId; + td::Random::Xorshift128plus rnd(123); auto next_qid = [&] { return rnd.fast(1, 10); }; @@ -176,10 +175,8 @@ TEST(TQueue, random) { auto get = [&] { q.check_get(next_qid(), rnd, now); }; - RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}}); + td::RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}}); for (int i = 0; i < 100000; i++) { steps.step(rnd); } } - -} // namespace td