From 5109f43435c917a0b242eb6eec2c7783b3c23ff0 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 6 Aug 2020 04:02:54 +0300 Subject: [PATCH] Use unix_time in TQueue. GitOrigin-RevId: d49c0871c7cb1a4315e38a66c198f56cb2b89428 --- tddb/td/db/TQueue.cpp | 30 +++++++++++++----------------- tddb/td/db/TQueue.h | 15 ++++++--------- test/tqueue.cpp | 16 ++++++++-------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp index dab0e9145..5e0335492 100644 --- a/tddb/td/db/TQueue.cpp +++ b/tddb/td/db/TQueue.cpp @@ -130,7 +130,7 @@ class TQueueImpl : public TQueue { return true; } - Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override { + Result push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) override { if (data.empty()) { return Status::Error("Data is empty"); } @@ -210,7 +210,7 @@ class TQueueImpl : public TQueue { pop(q, queue_id, it, q.tail_id); } - Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + Result get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, MutableSpan &result_events) override { auto it = queues_.find(queue_id); if (it == queues_.end()) { @@ -226,22 +226,22 @@ class TQueueImpl : public TQueue { return Status::Error("Specified from_id is in the past"); } - return do_get(queue_id, q, from_id, forget_previous, now, result_events); + return do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events); } - std::pair run_gc(double now) override { + std::pair run_gc(int32 unix_time_now) override { uint64 total_deleted_events = 0; uint64 deleted_queues = 0; for (auto queue_it = queues_.begin(); queue_it != queues_.end();) { size_t deleted_events = 0; for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) { auto &e = it->second; - if (e.expires_at < now) { + if (e.expires_at < unix_time_now) { if (!it->second.data.empty()) { deleted_events++; } pop(queue_it->second, queue_it->first, it, - e.expires_at < now - 7 * 86400 ? EventId() : queue_it->second.tail_id); + e.expires_at < unix_time_now - 7 * 86400 ? EventId() : queue_it->second.tail_id); } else { ++it; } @@ -316,7 +316,7 @@ class TQueueImpl : public TQueue { event.data = {}; } - size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now, + size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now, MutableSpan &result_events) { if (forget_previous) { for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) { @@ -327,7 +327,7 @@ class TQueueImpl : public TQueue { size_t ready_n = 0; for (auto it = q.events.lower_bound(from_id); it != q.events.end();) { auto &event = it->second; - if (event.expires_at < now || event.data.empty()) { + if (event.expires_at < unix_time_now || event.data.empty()) { pop(q, queue_id, it, q.tail_id); } else { CHECK(!(event.event_id < from_id)); @@ -400,20 +400,15 @@ struct TQueueLogEvent : public Storer { } }; -template -TQueueBinlog::TQueueBinlog() { - diff_ = Clocks::system() - Time::now(); -} - template uint64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { TQueueLogEvent log_event; log_event.queue_id = queue_id; log_event.event_id = event.event_id.value(); - log_event.expires_at = static_cast(event.expires_at + diff_ + 1); + log_event.expires_at = event.expires_at; log_event.data = event.data; log_event.extra = event.extra; - auto magic = magic_ + (log_event.extra != 0); + auto magic = BINLOG_EVENT_TYPE + (log_event.extra != 0); if (event.logevent_id == 0) { return binlog_->add(magic, log_event); } @@ -430,7 +425,7 @@ template Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) const { TQueueLogEvent event; TlParser parser(binlog_event.data_); - int32 has_extra = binlog_event.type_ - magic_; + int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE; if (has_extra != 0 && has_extra != 1) { return Status::Error("Wrong magic"); } @@ -441,7 +436,7 @@ Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) RawEvent raw_event; raw_event.logevent_id = binlog_event.id_; raw_event.event_id = event_id; - raw_event.expires_at = event.expires_at - diff_; + raw_event.expires_at = event.expires_at; raw_event.data = event.data.str(); raw_event.extra = event.extra; if (!q.do_push(event.queue_id, std::move(raw_event))) { @@ -477,6 +472,7 @@ void TQueueMemoryStorage::replay(TQueue &q) const { } } void TQueueMemoryStorage::close(Promise<> promise) { + events_.clear(); promise.set_value({}); } diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h index 2c6843ad0..f97597d6d 100644 --- a/tddb/td/db/TQueue.h +++ b/tddb/td/db/TQueue.h @@ -56,7 +56,7 @@ class TQueue { EventId id; Slice data; int64 extra{0}; - double expires_at{0}; + int32 expires_at{0}; }; struct RawEvent { @@ -64,7 +64,7 @@ class TQueue { EventId event_id; string data; int64 extra{0}; - double expires_at{0}; + int32 expires_at{0}; }; using QueueId = int64; @@ -101,19 +101,19 @@ class TQueue { virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0; - virtual Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) = 0; + virtual Result push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) = 0; virtual void forget(QueueId queue_id, EventId event_id) = 0; virtual EventId get_head(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0; - virtual Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + virtual Result get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, MutableSpan &result_events) = 0; virtual size_t get_size(QueueId queue_id) = 0; - virtual std::pair run_gc(double now) = 0; + virtual std::pair run_gc(int32 unix_time_now) = 0; virtual void close(Promise<> promise) = 0; }; @@ -124,8 +124,6 @@ struct BinlogEvent; template class TQueueBinlog : public TQueue::StorageCallback { public: - TQueueBinlog(); - uint64 push(QueueId queue_id, const RawEvent &event) override; void pop(uint64 logevent_id) override; Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT; @@ -137,8 +135,7 @@ class TQueueBinlog : public TQueue::StorageCallback { private: std::shared_ptr binlog_; - int32 magic_{2314}; - double diff_{0}; + static constexpr int32 BINLOG_EVENT_TYPE = 2314; }; class TQueueMemoryStorage : public TQueue::StorageCallback { diff --git a/test/tqueue.cpp b/test/tqueue.cpp index 6122daf55..2b37ca4f8 100644 --- a/test/tqueue.cpp +++ b/test/tqueue.cpp @@ -68,7 +68,7 @@ class TestTQueue { binlog_->set_callback(std::move(tqueue_binlog)); } - void restart(td::Random::Xorshift128plus &rnd, double now) { + void restart(td::Random::Xorshift128plus &rnd, td::int32 now) { if (rnd.fast(0, 10) == 0) { baseline_->run_gc(now); } @@ -101,7 +101,7 @@ class TestTQueue { } } - EventId push(td::TQueue::QueueId queue_id, td::string data, double expires_at, EventId new_id = EventId()) { + EventId push(td::TQueue::QueueId queue_id, td::string data, td::int32 expires_at, EventId new_id = EventId()) { auto a_id = baseline_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); auto b_id = memory_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); auto c_id = binlog_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); @@ -110,14 +110,14 @@ class TestTQueue { return a_id; } - void check_head_tail(td::TQueue::QueueId qid, double now) { + void check_head_tail(td::TQueue::QueueId qid) { //ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid)); //ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid)); ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid)); ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid)); } - void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, double now) { + void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, td::int32 now) { td::TQueue::Event a[10]; td::MutableSpan a_span(a, 10); td::TQueue::Event b[10]; @@ -169,7 +169,7 @@ TEST(TQueue, random) { }; TestTQueue q; - double now = 0; + td::int32 now = 0; auto push_event = [&] { auto data = PSTRING() << rnd(); if (rnd.fast(0, 10000) == 0) { @@ -181,7 +181,7 @@ TEST(TQueue, random) { now += 10; }; auto check_head_tail = [&] { - q.check_head_tail(next_queue_id(), now); + q.check_head_tail(next_queue_id()); }; auto restart = [&] { q.restart(rnd, now); @@ -206,7 +206,7 @@ TEST(TQueue, memory_leak) { tqueue_binlog->set_binlog(std::move(binlog)); tqueue->set_callback(std::move(tqueue_binlog)); - double now = 0; + td::int32 now = 0; std::vector ids; td::Random::Xorshift128plus rnd(123); int i = 0; @@ -219,7 +219,7 @@ TEST(TQueue, memory_leak) { tqueue->forget(1, ids.back()); ids.pop_back(); } - now += 1; + now++; if (i++ % 100000 == 0) { LOG(ERROR) << td::BufferAllocator::get_buffer_mem() << " " << tqueue->get_size(1) << " " << td::BufferAllocator::get_buffer_slice_size();