TQueue: extra

GitOrigin-RevId: 7f746000e546b422034c0170e068d599317764c9
This commit is contained in:
Arseny Smirnov 2019-09-06 18:55:19 +03:00
parent 7a48b9bfc0
commit 39e58f3eb9
3 changed files with 32 additions and 15 deletions

View File

@ -6,16 +6,16 @@
//
#include "td/db/TQueue.h"
#include "td/utils/Random.h"
#include "td/utils/VectorQueue.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include "td/utils/tl_helpers.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/utils/Random.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/VectorQueue.h"
#include <unordered_map>
namespace td {
@ -100,7 +100,8 @@ class TQueueImpl : public TQueue {
q.events.push(std::move(raw_event));
}
Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) override {
Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(),
int64 extra = 0) override {
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
@ -129,6 +130,7 @@ class TQueueImpl : public TQueue {
raw_event.event_id = event_id;
raw_event.data = std::move(data);
raw_event.expire_at = expire_at;
raw_event.extra = extra;
do_push(queue_id, std::move(raw_event));
return event_id;
}
@ -221,6 +223,7 @@ class TQueueImpl : public TQueue {
to.data = from.data;
to.id = from.event_id;
to.expire_at = from.expire_at;
to.extra = from.extra;
ready_n++;
}
@ -304,6 +307,7 @@ struct TQueueLogEvent : public Storer {
int32 event_id;
int32 expire_at;
Slice data;
int64 extra;
template <class StorerT>
void store(StorerT &&storer) const {
@ -312,15 +316,23 @@ struct TQueueLogEvent : public Storer {
store(event_id, storer);
store(expire_at, storer);
store(data, storer);
if (extra != 0) {
store(extra, storer);
}
}
template <class ParserT>
void parse(ParserT &&parser) {
void parse(ParserT &&parser, int32 has_extra) {
using td::parse;
parse(queue_id, parser);
parse(event_id, parser);
parse(expire_at, parser);
data = parser.template fetch_string<Slice>();
if (has_extra == 0) {
extra = 0;
} else {
parse(extra, parser);
}
}
size_t size() const override {
@ -347,11 +359,12 @@ int64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
log_event.event_id = event.event_id.value();
log_event.expire_at = static_cast<int32>(event.expire_at + diff_);
log_event.data = event.data;
log_event.extra = event.extra;
if (event.logevent_id == 0) {
auto res = binlog_->add(magic_, log_event);
auto res = binlog_->add(magic_ + (log_event.extra != 0), log_event);
return res;
}
binlog_->rewrite(event.logevent_id, magic_, log_event);
binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event);
return event.logevent_id;
}
@ -364,7 +377,7 @@ template <class BinlogT>
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) {
TQueueLogEvent event;
TlParser parser(binlog_event.data_);
event.parse(parser);
event.parse(parser, binlog_event.type_ - magic_);
TRY_STATUS(parser.get_status());
TRY_RESULT(event_id, EventId::from_int32(event.event_id));
RawEvent raw_event;
@ -372,6 +385,7 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
raw_event.event_id = event_id;
raw_event.expire_at = event.expire_at - diff_ + 1;
raw_event.data = event.data.str();
raw_event.extra = event.extra;
q.do_push(event.queue_id, std::move(raw_event));
return Status::OK();
}

View File

@ -38,12 +38,14 @@ class TQueue {
struct Event {
EventId id;
Slice data;
int64 extra{0};
double expire_at;
};
struct RawEvent {
int64 logevent_id{0};
EventId event_id;
string data;
int64 extra{0};
double expire_at{0};
};
class Callback {
@ -66,7 +68,8 @@ class TQueue {
virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId()) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(),
int64 extra = 0) = 0;
virtual void forget(QueueId queue_id, EventId event_id) = 0;

View File

@ -170,13 +170,13 @@ TEST(TQueue, random) {
now += 10;
};
auto check_head_tail = [&] {
q.check_head_tail(next_qid());
q.check_head_tail(next_qid(), now);
};
auto restart = [&] {
q.restart(rnd);
q.restart(rnd, now);
};
auto get = [&] {
q.check_get(next_qid(), rnd);
q.check_get(next_qid(), rnd, now);
};
RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}});
for (int i = 0; i < 1000000; i++) {