TQueue: do not drop tail_id whithout restart

GitOrigin-RevId: 54df1acc7b36303e128cfe1f6ac3e7ed4837eacd
This commit is contained in:
Arseny Smirnov 2019-08-07 18:29:47 +03:00
parent 62f6318edf
commit 8226c9ab49
3 changed files with 21 additions and 9 deletions

View File

@ -22,6 +22,10 @@ namespace td {
using EventId = TQueue::EventId; using EventId = TQueue::EventId;
static constexpr int32 MAX_DELAY = 7 * 86400;
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
static constexpr size_t MAX_QUEUE_EVENTS = 1000000;
EventId::EventId() { EventId::EventId() {
} }
@ -33,7 +37,7 @@ Result<EventId> EventId::from_int32(int32 id) {
} }
EventId EventId::create_random() { EventId EventId::create_random() {
return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10).move_as_ok(); return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10 + MAX_QUEUE_EVENTS).move_as_ok();
} }
int32 EventId::value() const { int32 EventId::value() const {
@ -71,9 +75,6 @@ bool EventId::is_valid(int32 id) {
class TQueueImpl : public TQueue { class TQueueImpl : public TQueue {
public: public:
static constexpr int32 MAX_DELAY = 7 * 86400;
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
void set_callback(unique_ptr<Callback> callback) override { void set_callback(unique_ptr<Callback> callback) override {
callback_ = std::move(callback); callback_ = std::move(callback);
} }
@ -81,6 +82,14 @@ class TQueueImpl : public TQueue {
return std::move(callback_); return std::move(callback_);
} }
void emulate_restart() override {
for (auto &it : queues_) {
if (it.second.events.empty()) {
it.second.tail_id = {};
}
}
}
void do_push(QueueId queue_id, RawEvent &&raw_event) override { void do_push(QueueId queue_id, RawEvent &&raw_event) override {
CHECK(!raw_event.event_id.empty()); CHECK(!raw_event.event_id.empty());
if (callback_ && raw_event.logevent_id == 0) { if (callback_ && raw_event.logevent_id == 0) {
@ -95,7 +104,7 @@ class TQueueImpl : public TQueue {
auto &q = queues_[queue_id]; auto &q = queues_[queue_id];
EventId event_id; EventId event_id;
while (true) { while (true) {
if (q.events.empty()) { if (q.tail_id.empty()) {
q.tail_id = new_id.empty() ? EventId::create_random() : new_id; q.tail_id = new_id.empty() ? EventId::create_random() : new_id;
} }
event_id = q.tail_id; event_id = q.tail_id;
@ -104,6 +113,7 @@ class TQueueImpl : public TQueue {
break; break;
} }
confirm_read(q, event_id); confirm_read(q, event_id);
q.tail_id = {};
} }
RawEvent raw_event; RawEvent raw_event;
@ -121,7 +131,7 @@ class TQueueImpl : public TQueue {
} }
auto &q = it->second; auto &q = it->second;
if (q.events.empty()) { if (q.events.empty()) {
return EventId(); return q.tail_id;
} }
return q.events.front().event_id; return q.events.front().event_id;
} }
@ -132,9 +142,6 @@ class TQueueImpl : public TQueue {
return EventId(); return EventId();
} }
auto &q = it->second; auto &q = it->second;
if (q.events.empty()) {
return EventId();
}
return q.tail_id; return q.tail_id;
} }

View File

@ -61,6 +61,8 @@ class TQueue {
virtual void set_callback(unique_ptr<Callback> callback) = 0; virtual void set_callback(unique_ptr<Callback> callback) = 0;
virtual unique_ptr<Callback> extract_callback() = 0; virtual unique_ptr<Callback> extract_callback() = 0;
virtual void emulate_restart() = 0;
virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual EventId push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0; virtual EventId push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0;

View File

@ -60,6 +60,8 @@ class TestTQueue {
} }
void restart(Random::Xorshift128plus &rnd) { void restart(Random::Xorshift128plus &rnd) {
baseline_->emulate_restart();
memory_->extract_callback().release(); memory_->extract_callback().release();
auto memory_storage = unique_ptr<MemoryStorage>(memory_storage_); auto memory_storage = unique_ptr<MemoryStorage>(memory_storage_);
memory_ = TQueue::create(); memory_ = TQueue::create();
@ -67,6 +69,7 @@ class TestTQueue {
memory_->set_callback(std::move(memory_storage)); memory_->set_callback(std::move(memory_storage));
if (rnd.fast(0, 100) != 0) { if (rnd.fast(0, 100) != 0) {
binlog_->emulate_restart();
return; return;
} }