TQueue: some fixes.

GitOrigin-RevId: f0521fd9c323e05ffaf4877b92ad42a17ee71dcd
This commit is contained in:
Arseny Smirnov 2019-08-07 18:13:10 +03:00
parent a8c74f9432
commit 791d4f3172
3 changed files with 19 additions and 13 deletions

View File

@ -71,6 +71,9 @@ bool EventId::is_valid(int32 id) {
class TQueueImpl : public TQueue {
public:
static constexpr int32 MAX_DELAY = 7 * 86400;
static constexpr size_t MAX_EVENT_LEN = 65536 * 8;
void set_callback(unique_ptr<Callback> callback) override {
callback_ = std::move(callback);
}
@ -141,7 +144,7 @@ class TQueueImpl : public TQueue {
return q.tail_id;
}
Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> events) override {
Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> &events) override {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
return 0;
@ -154,7 +157,8 @@ class TQueueImpl : public TQueue {
}
auto from_events = q.events.as_span();
size_t res_n = 0;
size_t ready_n = 0;
size_t left_n = 0;
for (size_t i = 0; i < from_events.size(); i++) {
auto &from = from_events[i];
if (from.expire_at < now) {
@ -162,17 +166,19 @@ class TQueueImpl : public TQueue {
continue;
}
auto &to = events[res_n];
auto &to = events[ready_n];
to.data = from.data;
to.id = from.event_id;
to.expire_at = from.expire_at;
res_n++;
if (res_n == events.size()) {
ready_n++;
if (ready_n == events.size()) {
left_n += from_events.size() - i - 1;
break;
}
}
return res_n;
events.truncate(ready_n);
return ready_n + left_n;
}
private:

View File

@ -68,7 +68,7 @@ class TQueue {
virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> events) = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, double now, MutableSpan<Event> &events) = 0;
static unique_ptr<TQueue> create(unique_ptr<Callback> callback = {});
};

View File

@ -115,12 +115,12 @@ class TestTQueue {
if (tmp.is_ok()) {
a_from = tmp.move_as_ok();
}
auto a_size = baseline_->get(qid, a_from, 0, a_span).move_as_ok();
auto b_size = memory_->get(qid, a_from, 0, b_span).move_as_ok();
auto c_size = binlog_->get(qid, a_from, 0, c_span).move_as_ok();
ASSERT_EQ(a_size, b_size);
ASSERT_EQ(a_size, c_size);
for (size_t i = 0; i < a_size; i++) {
baseline_->get(qid, a_from, 0, a_span).move_as_ok();
memory_->get(qid, a_from, 0, b_span).move_as_ok();
binlog_->get(qid, a_from, 0, c_span).move_as_ok();
ASSERT_EQ(a_span.size(), b_span.size());
ASSERT_EQ(a_span.size(), c_span.size());
for (size_t i = 0; i < a_span.size(); i++) {
ASSERT_EQ(a_span[i].id, b_span[i].id);
ASSERT_EQ(a_span[i].id, c_span[i].id);
ASSERT_EQ(a_span[i].data, b_span[i].data);