Use unix_time in TQueue.

GitOrigin-RevId: d49c0871c7cb1a4315e38a66c198f56cb2b89428
This commit is contained in:
levlam 2020-08-06 04:02:54 +03:00
parent 95ac56215b
commit 5109f43435
3 changed files with 27 additions and 34 deletions

View File

@ -130,7 +130,7 @@ class TQueueImpl : public TQueue {
return true;
}
Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override {
Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) override {
if (data.empty()) {
return Status::Error("Data is empty");
}
@ -210,7 +210,7 @@ class TQueueImpl : public TQueue {
pop(q, queue_id, it, q.tail_id);
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) override {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
@ -226,22 +226,22 @@ class TQueueImpl : public TQueue {
return Status::Error("Specified from_id is in the past");
}
return do_get(queue_id, q, from_id, forget_previous, now, result_events);
return do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events);
}
std::pair<uint64, uint64> run_gc(double now) override {
std::pair<uint64, uint64> run_gc(int32 unix_time_now) override {
uint64 total_deleted_events = 0;
uint64 deleted_queues = 0;
for (auto queue_it = queues_.begin(); queue_it != queues_.end();) {
size_t deleted_events = 0;
for (auto it = queue_it->second.events.begin(); it != queue_it->second.events.end();) {
auto &e = it->second;
if (e.expires_at < now) {
if (e.expires_at < unix_time_now) {
if (!it->second.data.empty()) {
deleted_events++;
}
pop(queue_it->second, queue_it->first, it,
e.expires_at < now - 7 * 86400 ? EventId() : queue_it->second.tail_id);
e.expires_at < unix_time_now - 7 * 86400 ? EventId() : queue_it->second.tail_id);
} else {
++it;
}
@ -316,7 +316,7 @@ class TQueueImpl : public TQueue {
event.data = {};
}
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, double now,
size_t do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) {
if (forget_previous) {
for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) {
@ -327,7 +327,7 @@ class TQueueImpl : public TQueue {
size_t ready_n = 0;
for (auto it = q.events.lower_bound(from_id); it != q.events.end();) {
auto &event = it->second;
if (event.expires_at < now || event.data.empty()) {
if (event.expires_at < unix_time_now || event.data.empty()) {
pop(q, queue_id, it, q.tail_id);
} else {
CHECK(!(event.event_id < from_id));
@ -400,20 +400,15 @@ struct TQueueLogEvent : public Storer {
}
};
template <class BinlogT>
TQueueBinlog<BinlogT>::TQueueBinlog() {
diff_ = Clocks::system() - Time::now();
}
template <class BinlogT>
uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event;
log_event.queue_id = queue_id;
log_event.event_id = event.event_id.value();
log_event.expires_at = static_cast<int32>(event.expires_at + diff_ + 1);
log_event.expires_at = event.expires_at;
log_event.data = event.data;
log_event.extra = event.extra;
auto magic = magic_ + (log_event.extra != 0);
auto magic = BINLOG_EVENT_TYPE + (log_event.extra != 0);
if (event.logevent_id == 0) {
return binlog_->add(magic, log_event);
}
@ -430,7 +425,7 @@ template <class BinlogT>
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const {
TQueueLogEvent event;
TlParser parser(binlog_event.data_);
int32 has_extra = binlog_event.type_ - magic_;
int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE;
if (has_extra != 0 && has_extra != 1) {
return Status::Error("Wrong magic");
}
@ -441,7 +436,7 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
RawEvent raw_event;
raw_event.logevent_id = binlog_event.id_;
raw_event.event_id = event_id;
raw_event.expires_at = event.expires_at - diff_;
raw_event.expires_at = event.expires_at;
raw_event.data = event.data.str();
raw_event.extra = event.extra;
if (!q.do_push(event.queue_id, std::move(raw_event))) {
@ -477,6 +472,7 @@ void TQueueMemoryStorage::replay(TQueue &q) const {
}
}
void TQueueMemoryStorage::close(Promise<> promise) {
events_.clear();
promise.set_value({});
}

View File

@ -56,7 +56,7 @@ class TQueue {
EventId id;
Slice data;
int64 extra{0};
double expires_at{0};
int32 expires_at{0};
};
struct RawEvent {
@ -64,7 +64,7 @@ class TQueue {
EventId event_id;
string data;
int64 extra{0};
double expires_at{0};
int32 expires_at{0};
};
using QueueId = int64;
@ -101,19 +101,19 @@ class TQueue {
virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) = 0;
virtual void forget(QueueId queue_id, EventId event_id) = 0;
virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) = 0;
virtual size_t get_size(QueueId queue_id) = 0;
virtual std::pair<uint64, uint64> run_gc(double now) = 0;
virtual std::pair<uint64, uint64> run_gc(int32 unix_time_now) = 0;
virtual void close(Promise<> promise) = 0;
};
@ -124,8 +124,6 @@ struct BinlogEvent;
template <class BinlogT>
class TQueueBinlog : public TQueue::StorageCallback {
public:
TQueueBinlog();
uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override;
Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT;
@ -137,8 +135,7 @@ class TQueueBinlog : public TQueue::StorageCallback {
private:
std::shared_ptr<BinlogT> binlog_;
int32 magic_{2314};
double diff_{0};
static constexpr int32 BINLOG_EVENT_TYPE = 2314;
};
class TQueueMemoryStorage : public TQueue::StorageCallback {

View File

@ -68,7 +68,7 @@ class TestTQueue {
binlog_->set_callback(std::move(tqueue_binlog));
}
void restart(td::Random::Xorshift128plus &rnd, double now) {
void restart(td::Random::Xorshift128plus &rnd, td::int32 now) {
if (rnd.fast(0, 10) == 0) {
baseline_->run_gc(now);
}
@ -101,7 +101,7 @@ class TestTQueue {
}
}
EventId push(td::TQueue::QueueId queue_id, td::string data, double expires_at, EventId new_id = EventId()) {
EventId push(td::TQueue::QueueId queue_id, td::string data, td::int32 expires_at, EventId new_id = EventId()) {
auto a_id = baseline_->push(queue_id, data, expires_at, 0, new_id).move_as_ok();
auto b_id = memory_->push(queue_id, data, expires_at, 0, new_id).move_as_ok();
auto c_id = binlog_->push(queue_id, data, expires_at, 0, new_id).move_as_ok();
@ -110,14 +110,14 @@ class TestTQueue {
return a_id;
}
void check_head_tail(td::TQueue::QueueId qid, double now) {
void check_head_tail(td::TQueue::QueueId qid) {
//ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid));
//ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid));
ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid));
ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid));
}
void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, double now) {
void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, td::int32 now) {
td::TQueue::Event a[10];
td::MutableSpan<td::TQueue::Event> a_span(a, 10);
td::TQueue::Event b[10];
@ -169,7 +169,7 @@ TEST(TQueue, random) {
};
TestTQueue q;
double now = 0;
td::int32 now = 0;
auto push_event = [&] {
auto data = PSTRING() << rnd();
if (rnd.fast(0, 10000) == 0) {
@ -181,7 +181,7 @@ TEST(TQueue, random) {
now += 10;
};
auto check_head_tail = [&] {
q.check_head_tail(next_queue_id(), now);
q.check_head_tail(next_queue_id());
};
auto restart = [&] {
q.restart(rnd, now);
@ -206,7 +206,7 @@ TEST(TQueue, memory_leak) {
tqueue_binlog->set_binlog(std::move(binlog));
tqueue->set_callback(std::move(tqueue_binlog));
double now = 0;
td::int32 now = 0;
std::vector<td::TQueue::EventId> ids;
td::Random::Xorshift128plus rnd(123);
int i = 0;
@ -219,7 +219,7 @@ TEST(TQueue, memory_leak) {
tqueue->forget(1, ids.back());
ids.pop_back();
}
now += 1;
now++;
if (i++ % 100000 == 0) {
LOG(ERROR) << td::BufferAllocator::get_buffer_mem() << " " << tqueue->get_size(1) << " "
<< td::BufferAllocator::get_buffer_slice_size();