TQueue: big rewrite

GitOrigin-RevId: dc541f0fbdbc2ecdbba70b52b494756f763e78bb
This commit is contained in:
Arseny Smirnov 2019-08-09 20:29:17 +03:00
parent db97a337fc
commit de5cc3e214
3 changed files with 118 additions and 57 deletions

View File

@ -83,16 +83,11 @@ class TQueueImpl : public TQueue {
}
void emulate_restart() override {
for (auto &it : queues_) {
if (it.second.events.empty()) {
it.second.tail_id = {};
}
}
}
void do_push(QueueId queue_id, RawEvent &&raw_event) override {
CHECK(!raw_event.event_id.empty());
if (callback_ && raw_event.logevent_id == 0) {
if (raw_event.logevent_id == 0 && callback_) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
auto &q = queues_[queue_id];
@ -105,6 +100,9 @@ class TQueueImpl : public TQueue {
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
}
if (data.empty()) {
return Status::Error("data is empty");
}
EventId event_id;
while (true) {
if (q.tail_id.empty()) {
@ -115,13 +113,16 @@ class TQueueImpl : public TQueue {
if (event_id.next().is_ok()) {
break;
}
confirm_read(q, event_id, 0);
for (auto &e : q.events.as_mutable_span()) {
try_pop(queue_id, e, EventId{}, EventId{}, 0, true);
}
q.tail_id = {};
q.events = {};
}
RawEvent raw_event;
raw_event.event_id = event_id;
raw_event.data = data;
raw_event.data = std::move(data);
raw_event.expire_at = expire_at;
do_push(queue_id, std::move(raw_event));
return event_id;
@ -162,36 +163,53 @@ class TQueueImpl : public TQueue {
if (from_id.value() < q.tail_id.value() - narrow_cast<int32>(MAX_QUEUE_EVENTS) * 2) {
return Status::Error("from_id is in past");
}
confirm_read(q, from_id, now);
if (q.events.empty()) {
events.truncate(0);
return 0;
}
auto from_events = q.events.as_span();
auto from_events = q.events.as_mutable_span();
size_t ready_n = 0;
size_t left_n = from_events.size();
for (size_t i = 0; i < from_events.size() && ready_n < events.size(); i++) {
left_n--;
auto &from = from_events[i];
if (from.expire_at < now) {
//TODO: pop this element
size_t i = 0;
while (true) {
from_events = q.events.as_mutable_span();
ready_n = 0;
i = 0;
for (; i < from_events.size(); i++) {
auto &from = from_events[i];
try_pop(queue_id, from, from_id, q.tail_id, now);
if (from.data.empty()) {
continue;
}
if (ready_n == events.size()) {
break;
}
auto &to = events[ready_n];
to.data = from.data;
to.id = from.event_id;
to.expire_at = from.expire_at;
ready_n++;
}
// compactify skipped events
if (ready_n * 2 < i) {
compactify(q.events, i);
continue;
}
auto &to = events[ready_n];
to.data = from.data;
to.id = from.event_id;
to.expire_at = from.expire_at;
break;
}
ready_n++;
if (ready_n == events.size()) {
left_n += from_events.size() - i - 1;
break;
events.truncate(ready_n);
size_t left_n = from_events.size() - i;
return ready_n + left_n;
}
void run_gc(double now) override {
for (auto &it : queues_) {
for (auto &e : it.second.events.as_mutable_span()) {
try_pop(it.first, e, EventId(), it.second.tail_id, now);
}
}
events.truncate(ready_n);
return ready_n + left_n;
}
private:
@ -203,13 +221,35 @@ class TQueueImpl : public TQueue {
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<Callback> callback_;
void confirm_read(Queue &q, EventId till_id, double now) {
while (!q.events.empty() &&
(q.events.front().event_id.value() < till_id.value() || q.events.front().expire_at < now)) {
if (callback_) {
callback_->pop(q.events.front().logevent_id);
void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
auto processed = events.as_mutable_span().substr(0, prefix);
auto removed_n =
processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); });
events.pop_n(removed_n);
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now, bool force = false) {
bool should_drop = event.expire_at < now || event.event_id.value() < from_id.value() || force || event.data.empty();
if (!callback_ || event.logevent_id == 0) {
if (should_drop) {
event.data = {};
}
q.events.pop();
return;
}
if (!should_drop) {
return;
}
if (event.event_id.value() + 1 == tail_id.value()) {
if (!event.data.empty()) {
event.data = {};
callback_->push(queue_id, event);
}
} else {
callback_->pop(event.logevent_id);
event.logevent_id = 0;
event.data = {};
}
}
};
@ -270,7 +310,12 @@ int64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
log_event.event_id = event.event_id.value();
log_event.expire_at = static_cast<int32>(event.expire_at + diff_);
log_event.data = event.data;
return binlog_->add(magic_, log_event);
if (event.logevent_id == 0) {
auto res = binlog_->add(magic_, log_event);
return res;
}
binlog_->rewrite(event.logevent_id, magic_, log_event);
return event.logevent_id;
}
template <class BinlogT>
@ -298,7 +343,7 @@ template class TQueueBinlog<BinlogInterface>;
template class TQueueBinlog<Binlog>;
int64 MemoryStorage::push(QueueId queue_id, const RawEvent &event) {
auto logevent_id = next_logevent_id_++;
auto logevent_id = event.logevent_id == 0 ? next_logevent_id_++ : event.logevent_id;
events_[logevent_id] = std::make_pair(queue_id, event);
return logevent_id;

View File

@ -72,6 +72,8 @@ class TQueue {
virtual Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> &events) = 0;
virtual void run_gc(double now) = 0;
static unique_ptr<TQueue> create(unique_ptr<Callback> callback = {});
};

View File

@ -59,14 +59,20 @@ class TestTQueue {
binlog_->set_callback(std::move(tqueue_binlog));
}
void restart(Random::Xorshift128plus &rnd) {
void restart(Random::Xorshift128plus &rnd, double now) {
baseline_->emulate_restart();
if (rnd.fast(0, 10) == 0) {
baseline_->run_gc(now);
}
memory_->extract_callback().release();
auto memory_storage = unique_ptr<MemoryStorage>(memory_storage_);
memory_ = TQueue::create();
memory_storage->replay(*memory_);
memory_->set_callback(std::move(memory_storage));
if (rnd.fast(0, 10) == 0) {
memory_->run_gc(now);
}
if (rnd.fast(0, 100) != 0) {
binlog_->emulate_restart();
@ -81,6 +87,9 @@ class TestTQueue {
.ensure();
tqueue_binlog->set_binlog(binlog);
binlog_->set_callback(std::move(tqueue_binlog));
if (rnd.fast(0, 10) == 0) {
binlog_->run_gc(now);
}
}
TQueue::EventId push(TQueue::QueueId queue_id, string data, double expire_at,
@ -93,34 +102,34 @@ class TestTQueue {
return a_id;
}
void check_head_tail(TQueue::QueueId qid) {
ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid));
ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid));
void check_head_tail(TQueue::QueueId qid, double now) {
//ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid));
//ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid));
ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid));
ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid));
}
void check_get(TQueue::QueueId qid, Random::Xorshift128plus &rnd) {
void check_get(TQueue::QueueId qid, Random::Xorshift128plus &rnd, double now) {
TQueue::Event a[10];
MutableSpan<TQueue::Event> a_span(a, 10);
TQueue::Event b[10];
MutableSpan<TQueue::Event> b_span(b, 10);
TQueue::Event c[10];
MutableSpan<TQueue::Event> c_span(b, 10);
MutableSpan<TQueue::Event> c_span(c, 10);
auto a_from = baseline_->get_head(qid);
auto b_from = memory_->get_head(qid);
auto c_from = binlog_->get_head(qid);
ASSERT_EQ(a_from, b_from);
ASSERT_EQ(a_from, c_from);
//auto b_from = memory_->get_head(qid);
//auto c_from = binlog_->get_head(qid);
//ASSERT_EQ(a_from, b_from);
//ASSERT_EQ(a_from, c_from);
auto tmp = a_from.advance(rnd.fast(-10, 10));
if (tmp.is_ok()) {
a_from = tmp.move_as_ok();
}
baseline_->get(qid, a_from, 0, a_span).move_as_ok();
memory_->get(qid, a_from, 0, b_span).move_as_ok();
binlog_->get(qid, a_from, 0, c_span).move_as_ok();
baseline_->get(qid, a_from, now, a_span).move_as_ok();
memory_->get(qid, a_from, now, b_span).move_as_ok();
binlog_->get(qid, a_from, now, c_span).move_as_ok();
ASSERT_EQ(a_span.size(), b_span.size());
ASSERT_EQ(a_span.size(), c_span.size());
for (size_t i = 0; i < a_span.size(); i++) {
@ -145,15 +154,20 @@ TEST(TQueue, random) {
return rnd.fast(1, 10);
};
auto next_first_id = [&] {
if (rnd.fast(0, 3) == 0) {
return EventId::from_int32(EventId::MAX_ID - 20).move_as_ok();
}
return EventId::from_int32(rnd.fast(1000000000, 1500000000)).move_as_ok();
return EventId::from_int32(EventId::MAX_ID - 20).move_as_ok();
//if (rnd.fast(0, 3) == 0) {
//return EventId::from_int32(EventId::MAX_ID - 20).move_as_ok();
//}
//return EventId::from_int32(rnd.fast(1000000000, 1500000000)).move_as_ok();
};
TestTQueue q;
double now = 0;
auto push_event = [&] {
auto data = PSTRING() << rnd();
q.push(next_qid(), data, 0, next_first_id());
q.push(next_qid(), data, now + rnd.fast(-10, 10) * 10 + 5, next_first_id());
};
auto inc_now = [&] {
now += 10;
};
auto check_head_tail = [&] {
q.check_head_tail(next_qid());
@ -164,7 +178,7 @@ TEST(TQueue, random) {
auto get = [&] {
q.check_get(next_qid(), rnd);
};
RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {restart, 1}});
RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}});
for (int i = 0; i < 1000000; i++) {
steps.step(rnd);
}