Improve TQueue.

GitOrigin-RevId: 3146441d9035be886a616d8de504024df1602116
This commit is contained in:
levlam 2020-06-11 23:54:56 +03:00
parent 44155da2d8
commit bee9b67e3c
3 changed files with 55 additions and 56 deletions

View File

@ -11,6 +11,7 @@
#include "td/db/binlog/BinlogInterface.h"
#include "td/utils/format.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/Random.h"
#include "td/utils/StorerBase.h"
@ -35,14 +36,18 @@ EventId::EventId() {
}
Result<EventId> EventId::from_int32(int32 id) {
if (!is_valid(id)) {
if (!is_valid_id(id)) {
return Status::Error("Invalid ID");
}
return EventId(id);
}
EventId EventId::create_random() {
return from_int32(Random::fast_uint32() % (MAX_ID / 2) + 10 + MAX_QUEUE_EVENTS).move_as_ok();
return from_int32(Random::fast(MAX_QUEUE_EVENTS + 1, MAX_ID / 2)).move_as_ok();
}
bool EventId::is_valid() const {
return !empty() && is_valid_id(id_);
}
int32 EventId::value() const {
@ -74,34 +79,31 @@ bool EventId::operator<(const EventId &other) const {
return id_ < other.id_;
}
StringBuilder &operator<<(StringBuilder &sb, const EventId id) {
return sb << "EventId{" << id.value() << "}";
StringBuilder &operator<<(StringBuilder &string_builder, const EventId id) {
return string_builder << "EventId{" << id.value() << "}";
}
EventId::EventId(int32 id) : id_(id) {
CHECK(is_valid(id));
CHECK(is_valid_id(id));
}
bool EventId::is_valid(int32 id) {
bool EventId::is_valid_id(int32 id) {
return 0 <= id && id < MAX_ID;
}
class TQueueImpl : public TQueue {
public:
void set_callback(unique_ptr<Callback> callback) override {
void set_callback(unique_ptr<StorageCallback> callback) override {
callback_ = std::move(callback);
}
unique_ptr<Callback> extract_callback() override {
unique_ptr<StorageCallback> extract_callback() override {
return std::move(callback_);
}
void emulate_restart() override {
}
void do_push(QueueId queue_id, RawEvent &&raw_event) override {
//LOG(ERROR) << "Push " << queue_id << " " << raw_event.event_id;
CHECK(!raw_event.event_id.empty());
if (raw_event.logevent_id == 0 && callback_) {
// LOG(ERROR) << "Push to queue " << queue_id << " " << raw_event.event_id;
CHECK(raw_event.event_id.is_valid());
if (raw_event.logevent_id == 0 && callback_ != nullptr) {
raw_event.logevent_id = callback_->push(queue_id, raw_event);
}
auto &q = queues_[queue_id];
@ -128,11 +130,12 @@ class TQueueImpl : public TQueue {
if (event_id.next().is_ok()) {
break;
}
for (auto &e : q.events.as_mutable_span()) {
try_pop(queue_id, e, EventId{}, EventId{}, 0, true);
for (auto &event : q.events.as_mutable_span()) {
pop(queue_id, event, {});
}
q.tail_id = {};
q.tail_id = EventId();
q.events = {};
CHECK(new_id.next().is_ok());
}
RawEvent raw_event;
@ -177,12 +180,12 @@ class TQueueImpl : public TQueue {
if (it == from_events.end() || !(it->event_id == event_id)) {
return;
}
try_pop(queue_id, *it, {}, q.tail_id, 0, true /*force*/);
pop(queue_id, *it, q.tail_id);
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &result_events) override {
//LOG(ERROR) << "Get " << queue_id << " " << from_id;
// LOG(ERROR) << "Get " << queue_id << " " << from_id;
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
result_events.truncate(0);
@ -210,7 +213,7 @@ class TQueueImpl : public TQueue {
[](auto &event, EventId event_id) { return event.event_id < event_id; }) -
from_events.begin();
}
//LOG(ERROR) << tag("first_i", first_i) << tag("size", from_events.size());
// LOG(ERROR) << tag("first_i", first_i) << tag("size", from_events.size());
for (i = first_i; i < from_events.size(); i++) {
auto &from = from_events[i];
try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now);
@ -265,7 +268,7 @@ class TQueueImpl : public TQueue {
};
std::unordered_map<QueueId, Queue> queues_;
unique_ptr<Callback> callback_;
unique_ptr<StorageCallback> callback_;
void compactify(VectorQueue<RawEvent> &events, size_t prefix) {
auto processed = events.as_mutable_span().substr(0, prefix);
@ -274,24 +277,23 @@ class TQueueImpl : public TQueue {
events.pop_n(removed_n);
}
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now, bool force = false) {
//LOG(ERROR) << event.expires_at << " < " << now << " = " << (event.expires_at < now) << " "
//<< (event.event_id.value() < from_id.value()) << " " << force << " " << event.data.empty();
bool should_drop =
event.expires_at < now || event.event_id.value() < from_id.value() || force || event.data.empty();
if (!callback_ || event.logevent_id == 0) {
if (should_drop) {
void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) {
// LOG(ERROR) << event.expires_at << " < " << now << " = " << (event.expires_at < now) << " "
// << (event.event_id < from_id) << " " << event.data.empty();
if (event.expires_at < now || event.event_id < from_id || event.data.empty()) {
pop(queue_id, event, tail_id);
}
}
void pop(QueueId queue_id, RawEvent &event, EventId tail_id) {
if (callback_ == nullptr || event.logevent_id == 0) {
event.logevent_id = 0;
event.data = {};
}
return;
}
if (!should_drop) {
return;
}
//LOG(ERROR) << "Drop " << queue_id << " " << event.event_id;
if (event.event_id.value() + 1 == tail_id.value()) {
// LOG(ERROR) << "Drop " << queue_id << " " << event.event_id;
if (event.event_id.next().ok() == tail_id) {
if (!event.data.empty()) {
event.data = {};
callback_->push(queue_id, event);
@ -309,7 +311,6 @@ unique_ptr<TQueue> TQueue::create() {
}
struct TQueueLogEvent : public Storer {
TQueueLogEvent() = default;
int64 queue_id;
int32 event_id;
int32 expires_at;

View File

@ -30,6 +30,8 @@ class TQueue {
static EventId create_random();
bool is_valid() const;
int32 value() const;
Result<EventId> next() const;
@ -47,7 +49,7 @@ class TQueue {
explicit EventId(int32 id);
static bool is_valid(int32 id);
static bool is_valid_id(int32 id);
};
struct Event {
@ -67,17 +69,17 @@ class TQueue {
using QueueId = int64;
class Callback {
class StorageCallback {
public:
using QueueId = TQueue::QueueId;
using RawEvent = TQueue::RawEvent;
Callback() = default;
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
virtual ~Callback() = default;
StorageCallback() = default;
StorageCallback(const StorageCallback &) = delete;
StorageCallback &operator=(const StorageCallback &) = delete;
StorageCallback(StorageCallback &&) = delete;
StorageCallback &operator=(StorageCallback &&) = delete;
virtual ~StorageCallback() = default;
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
virtual void pop(uint64 logevent_id) = 0;
@ -93,10 +95,8 @@ class TQueue {
virtual ~TQueue() = default;
virtual void set_callback(unique_ptr<Callback> callback) = 0;
virtual unique_ptr<Callback> extract_callback() = 0;
virtual void emulate_restart() = 0; // for testing only
virtual void set_callback(unique_ptr<StorageCallback> callback) = 0;
virtual unique_ptr<StorageCallback> extract_callback() = 0;
virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
@ -114,12 +114,12 @@ class TQueue {
virtual void run_gc(double now) = 0;
};
StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id);
StringBuilder &operator<<(StringBuilder &string_builder, const TQueue::EventId id);
struct BinlogEvent;
template <class BinlogT>
class TQueueBinlog : public TQueue::Callback {
class TQueueBinlog : public TQueue::StorageCallback {
public:
TQueueBinlog();
@ -137,7 +137,7 @@ class TQueueBinlog : public TQueue::Callback {
double diff_{0};
};
class TQueueMemoryStorage : public TQueue::Callback {
class TQueueMemoryStorage : public TQueue::StorageCallback {
public:
uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override;

View File

@ -60,7 +60,6 @@ class TestTQueue {
}
void restart(Random::Xorshift128plus &rnd, double now) {
baseline_->emulate_restart();
if (rnd.fast(0, 10) == 0) {
baseline_->run_gc(now);
}
@ -75,11 +74,10 @@ class TestTQueue {
}
if (rnd.fast(0, 100) != 0) {
binlog_->emulate_restart();
return;
}
LOG(ERROR) << "RESTART BINLOG";
LOG(INFO) << "Restart binlog";
binlog_ = TQueue::create();
auto tqueue_binlog = make_unique<TQueueBinlog<Binlog>>();
auto binlog = std::make_shared<Binlog>();
@ -179,7 +177,7 @@ TEST(TQueue, random) {
q.check_get(next_qid(), rnd, now);
};
RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}});
for (int i = 0; i < 1000000; i++) {
for (int i = 0; i < 100000; i++) {
steps.step(rnd);
}
}