Minor tqueue fixes.

GitOrigin-RevId: 827a336498dd57364aa8a2ac59d97ecacb05f5de
This commit is contained in:
levlam 2020-06-11 16:43:26 +03:00
parent 39e58f3eb9
commit 22c9927314
5 changed files with 38 additions and 25 deletions

View File

@ -7,15 +7,20 @@
#include "td/db/TQueue.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/utils/format.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/Random.h"
#include "td/utils/StorerBase.h"
#include "td/utils/Time.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/VectorQueue.h"
#include <algorithm>
#include <unordered_map>
namespace td {
@ -31,7 +36,7 @@ EventId::EventId() {
Result<EventId> EventId::from_int32(int32 id) {
if (!is_valid(id)) {
return Status::Error("Invalid id");
return Status::Error("Invalid ID");
}
return EventId(id);
}
@ -100,14 +105,14 @@ class TQueueImpl : public TQueue {
q.events.push(std::move(raw_event));
}
Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(),
Result<EventId> push(QueueId queue_id, string data, double expires_at, EventId new_id = EventId(),
int64 extra = 0) override {
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
}
if (data.empty()) {
return Status::Error("data is empty");
return Status::Error("Data is empty");
}
EventId event_id;
while (true) {
@ -129,7 +134,7 @@ class TQueueImpl : public TQueue {
RawEvent raw_event;
raw_event.event_id = event_id;
raw_event.data = std::move(data);
raw_event.expire_at = expire_at;
raw_event.expires_at = expires_at;
raw_event.extra = extra;
do_push(queue_id, std::move(raw_event));
return event_id;
@ -182,10 +187,10 @@ class TQueueImpl : public TQueue {
auto &q = it->second;
// Some sanity checks
if (from_id.value() > q.tail_id.value() + 10) {
return Status::Error("from_id is in future");
return Status::Error("Specified from_id is in the future");
}
if (from_id.value() < q.tail_id.value() - narrow_cast<int32>(MAX_QUEUE_EVENTS) * 2) {
return Status::Error("from_id is in past");
return Status::Error("Specified from_id is in the past");
}
auto from_events = q.events.as_mutable_span();
@ -222,7 +227,7 @@ class TQueueImpl : public TQueue {
auto &to = events[ready_n];
to.data = from.data;
to.id = from.event_id;
to.expire_at = from.expire_at;
to.expires_at = from.expires_at;
to.extra = from.extra;
ready_n++;
}
@ -266,9 +271,10 @@ class TQueueImpl : public TQueue {
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now, bool force = false) {
//LOG(ERROR) << event.expire_at << " < " << now << " = " << (event.expire_at < now) << " "
//LOG(ERROR) << event.expires_at << " < " << now << " = " << (event.expires_at < now) << " "
//<< (event.event_id.value() < from_id.value()) << " " << force << " " << event.data.empty();
bool should_drop = event.expire_at < now || event.event_id.value() < from_id.value() || force || event.data.empty();
bool should_drop =
event.expires_at < now || event.event_id.value() < from_id.value() || force || event.data.empty();
if (!callback_ || event.logevent_id == 0) {
if (should_drop) {
event.data = {};
@ -305,7 +311,7 @@ struct TQueueLogEvent : public Storer {
TQueueLogEvent() = default;
int64 queue_id;
int32 event_id;
int32 expire_at;
int32 expires_at;
Slice data;
int64 extra;
@ -314,7 +320,7 @@ struct TQueueLogEvent : public Storer {
using td::store;
store(queue_id, storer);
store(event_id, storer);
store(expire_at, storer);
store(expires_at, storer);
store(data, storer);
if (extra != 0) {
store(extra, storer);
@ -326,7 +332,7 @@ struct TQueueLogEvent : public Storer {
using td::parse;
parse(queue_id, parser);
parse(event_id, parser);
parse(expire_at, parser);
parse(expires_at, parser);
data = parser.template fetch_string<Slice>();
if (has_extra == 0) {
extra = 0;
@ -357,7 +363,7 @@ int64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event;
log_event.queue_id = queue_id;
log_event.event_id = event.event_id.value();
log_event.expire_at = static_cast<int32>(event.expire_at + diff_);
log_event.expires_at = static_cast<int32>(event.expires_at + diff_);
log_event.data = event.data;
log_event.extra = event.extra;
if (event.logevent_id == 0) {
@ -383,7 +389,7 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
RawEvent raw_event;
raw_event.logevent_id = binlog_event.id_;
raw_event.event_id = event_id;
raw_event.expire_at = event.expire_at - diff_ + 1;
raw_event.expires_at = event.expires_at - diff_ + 1;
raw_event.data = event.data.str();
raw_event.extra = event.extra;
q.do_push(event.queue_id, std::move(raw_event));

View File

@ -6,10 +6,15 @@
//
#pragma once
#include "td/utils/misc.h"
#include "td/utils/common.h"
#include "td/utils/Slice.h"
#include "td/utils/Span.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include <map>
#include <memory>
#include <utility>
namespace td {
@ -17,7 +22,7 @@ class TQueue {
public:
class EventId {
public:
constexpr static int32 MAX_ID = 2000000000;
static constexpr int32 MAX_ID = 2000000000;
EventId();
static Result<EventId> from_int32(int32 id);
static EventId create_random();
@ -39,14 +44,14 @@ class TQueue {
EventId id;
Slice data;
int64 extra{0};
double expire_at;
double expires_at;
};
struct RawEvent {
int64 logevent_id{0};
EventId event_id;
string data;
int64 extra{0};
double expire_at{0};
double expires_at{0};
};
class Callback {
public:
@ -68,7 +73,7 @@ class TQueue {
virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, double expire_at, EventId new_id = EventId(),
virtual Result<EventId> push(QueueId queue_id, string data, double expires_at, EventId new_id = EventId(),
int64 extra = 0) = 0;
virtual void forget(QueueId queue_id, EventId event_id) = 0;
@ -87,6 +92,7 @@ class TQueue {
StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id);
struct BinlogEvent;
template <class BinlogT>
class TQueueBinlog : public TQueue::Callback {
public:

View File

@ -13,6 +13,7 @@
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/StorerBase.h"
namespace td {

View File

@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
#SOURCE SETS
set(TD_TEST_SOURCE
${CMAKE_CURRENT_SOURCE_DIR}/tqueue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/db.cpp
${CMAKE_CURRENT_SOURCE_DIR}/http.cpp
${CMAKE_CURRENT_SOURCE_DIR}/mtproto.cpp
@ -13,6 +12,7 @@ set(TD_TEST_SOURCE
${CMAKE_CURRENT_SOURCE_DIR}/set_with_position.cpp
${CMAKE_CURRENT_SOURCE_DIR}/string_cleaning.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tdclient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tqueue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/data.cpp
${CMAKE_CURRENT_SOURCE_DIR}/data.h

View File

@ -92,11 +92,11 @@ class TestTQueue {
}
}
TQueue::EventId push(TQueue::QueueId queue_id, string data, double expire_at,
TQueue::EventId push(TQueue::QueueId queue_id, string data, double expires_at,
TQueue::EventId new_id = TQueue::EventId()) {
auto a_id = baseline_->push(queue_id, data, expire_at, new_id).move_as_ok();
auto b_id = memory_->push(queue_id, data, expire_at, new_id).move_as_ok();
auto c_id = binlog_->push(queue_id, data, expire_at, new_id).move_as_ok();
auto a_id = baseline_->push(queue_id, data, expires_at, new_id).move_as_ok();
auto b_id = memory_->push(queue_id, data, expires_at, new_id).move_as_ok();
auto c_id = binlog_->push(queue_id, data, expires_at, new_id).move_as_ok();
ASSERT_EQ(a_id, b_id);
ASSERT_EQ(a_id, c_id);
return a_id;