TQueue improvements.

GitOrigin-RevId: 753aba147aed58ccba167a83dd798987ce6da177
This commit is contained in:
levlam 2020-06-11 18:12:16 +03:00
parent 2a6eebec24
commit 42e3aefc02
3 changed files with 58 additions and 32 deletions

View File

@ -66,6 +66,10 @@ bool EventId::operator==(const EventId &other) const {
return id_ == other.id_; return id_ == other.id_;
} }
bool EventId::operator!=(const EventId &other) const {
return !(*this == other);
}
bool EventId::operator<(const EventId &other) const { bool EventId::operator<(const EventId &other) const {
return id_ < other.id_; return id_ < other.id_;
} }
@ -177,11 +181,11 @@ class TQueueImpl : public TQueue {
} }
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now, Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &events) override { MutableSpan<Event> &result_events) override {
//LOG(ERROR) << "Get " << queue_id << " " << from_id; //LOG(ERROR) << "Get " << queue_id << " " << from_id;
auto it = queues_.find(queue_id); auto it = queues_.find(queue_id);
if (it == queues_.end()) { if (it == queues_.end()) {
events.truncate(0); result_events.truncate(0);
return 0; return 0;
} }
auto &q = it->second; auto &q = it->second;
@ -214,7 +218,7 @@ class TQueueImpl : public TQueue {
continue; continue;
} }
if (ready_n == events.size()) { if (ready_n == result_events.size()) {
break; break;
} }
@ -224,7 +228,7 @@ class TQueueImpl : public TQueue {
continue; continue;
} }
auto &to = events[ready_n]; auto &to = result_events[ready_n];
to.data = from.data; to.data = from.data;
to.id = from.event_id; to.id = from.event_id;
to.expires_at = from.expires_at; to.expires_at = from.expires_at;
@ -241,7 +245,7 @@ class TQueueImpl : public TQueue {
break; break;
} }
events.truncate(ready_n); result_events.truncate(ready_n);
size_t left_n = from_events.size() - i; size_t left_n = from_events.size() - i;
return ready_n + left_n; return ready_n + left_n;
} }
@ -299,12 +303,9 @@ class TQueueImpl : public TQueue {
} }
} }
}; };
unique_ptr<TQueue> TQueue::create(unique_ptr<Callback> callback) {
auto res = make_unique<TQueueImpl>(); unique_ptr<TQueue> TQueue::create() {
if (callback) { return make_unique<TQueueImpl>();
res->set_callback(std::move(callback));
}
return res;
} }
struct TQueueLogEvent : public Storer { struct TQueueLogEvent : public Storer {
@ -358,6 +359,7 @@ template <class BinlogT>
TQueueBinlog<BinlogT>::TQueueBinlog() { TQueueBinlog<BinlogT>::TQueueBinlog() {
diff_ = Clocks::system() - Time::now(); diff_ = Clocks::system() - Time::now();
} }
template <class BinlogT> template <class BinlogT>
uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) { uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event; TQueueLogEvent log_event;
@ -367,8 +369,7 @@ uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
log_event.data = event.data; log_event.data = event.data;
log_event.extra = event.extra; log_event.extra = event.extra;
if (event.logevent_id == 0) { if (event.logevent_id == 0) {
auto res = binlog_->add(magic_ + (log_event.extra != 0), log_event); return binlog_->add(magic_ + (log_event.extra != 0), log_event);
return res;
} }
binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event); binlog_->rewrite(event.logevent_id, magic_ + (log_event.extra != 0), log_event);
return event.logevent_id; return event.logevent_id;
@ -399,17 +400,16 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
template class TQueueBinlog<BinlogInterface>; template class TQueueBinlog<BinlogInterface>;
template class TQueueBinlog<Binlog>; template class TQueueBinlog<Binlog>;
uint64 MemoryStorage::push(QueueId queue_id, const RawEvent &event) { uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) {
auto logevent_id = event.logevent_id == 0 ? next_logevent_id_++ : event.logevent_id; auto logevent_id = event.logevent_id == 0 ? next_logevent_id_++ : event.logevent_id;
events_[logevent_id] = std::make_pair(queue_id, event); events_[logevent_id] = std::make_pair(queue_id, event);
return logevent_id; return logevent_id;
} }
void MemoryStorage::pop(uint64 logevent_id) { void TQueueMemoryStorage::pop(uint64 logevent_id) {
events_.erase(logevent_id); events_.erase(logevent_id);
} }
void MemoryStorage::replay(TQueue &q) { void TQueueMemoryStorage::replay(TQueue &q) {
for (auto e : events_) { for (auto e : events_) {
auto x = e.second; auto x = e.second;
x.second.logevent_id = e.first; x.second.logevent_id = e.first;

View File

@ -23,29 +23,40 @@ class TQueue {
class EventId { class EventId {
public: public:
static constexpr int32 MAX_ID = 2000000000; static constexpr int32 MAX_ID = 2000000000;
EventId(); EventId();
static Result<EventId> from_int32(int32 id); static Result<EventId> from_int32(int32 id);
static EventId create_random(); static EventId create_random();
int32 value() const; int32 value() const;
Result<EventId> next() const; Result<EventId> next() const;
Result<EventId> advance(size_t offset) const; Result<EventId> advance(size_t offset) const;
bool empty() const; bool empty() const;
bool operator==(const EventId &other) const; bool operator==(const EventId &other) const;
bool operator!=(const EventId &other) const;
bool operator<(const EventId &other) const; bool operator<(const EventId &other) const;
private: private:
int32 id_{0}; int32 id_{0};
explicit EventId(int32 id); explicit EventId(int32 id);
static bool is_valid(int32 id); static bool is_valid(int32 id);
}; };
using QueueId = int64;
struct Event { struct Event {
EventId id; EventId id;
Slice data; Slice data;
int64 extra{0}; int64 extra{0};
double expires_at; double expires_at{0};
}; };
struct RawEvent { struct RawEvent {
uint64 logevent_id{0}; uint64 logevent_id{0};
EventId event_id; EventId event_id;
@ -53,23 +64,39 @@ class TQueue {
int64 extra{0}; int64 extra{0};
double expires_at{0}; double expires_at{0};
}; };
using QueueId = int64;
class Callback { class Callback {
public: public:
using EventId = TQueue::EventId;
using QueueId = TQueue::QueueId; using QueueId = TQueue::QueueId;
using RawEvent = TQueue::RawEvent; using RawEvent = TQueue::RawEvent;
virtual ~Callback() {
} Callback() = default;
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
virtual ~Callback() = default;
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
virtual void pop(uint64 logevent_id) = 0; virtual void pop(uint64 logevent_id) = 0;
}; };
virtual ~TQueue() { static unique_ptr<TQueue> create();
}
TQueue() = default;
TQueue(const TQueue &) = delete;
TQueue &operator=(const TQueue &) = delete;
TQueue(TQueue &&) = delete;
TQueue &operator=(TQueue &&) = delete;
virtual ~TQueue() = default;
virtual void set_callback(unique_ptr<Callback> callback) = 0; virtual void set_callback(unique_ptr<Callback> callback) = 0;
virtual unique_ptr<Callback> extract_callback() = 0; virtual unique_ptr<Callback> extract_callback() = 0;
virtual void emulate_restart() = 0; virtual void emulate_restart() = 0; // for testing only
virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0; virtual void do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
@ -82,11 +109,9 @@ class TQueue {
virtual EventId get_tail(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now, virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, double now,
MutableSpan<Event> &events) = 0; MutableSpan<Event> &result_events) = 0;
virtual void run_gc(double now) = 0; virtual void run_gc(double now) = 0;
static unique_ptr<TQueue> create(unique_ptr<Callback> callback = {});
}; };
StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id); StringBuilder &operator<<(StringBuilder &sb, const TQueue::EventId id);
@ -97,6 +122,7 @@ template <class BinlogT>
class TQueueBinlog : public TQueue::Callback { class TQueueBinlog : public TQueue::Callback {
public: public:
TQueueBinlog(); TQueueBinlog();
uint64 push(QueueId queue_id, const RawEvent &event) override; uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override; void pop(uint64 logevent_id) override;
Status replay(const BinlogEvent &binlog_event, TQueue &q); Status replay(const BinlogEvent &binlog_event, TQueue &q);
@ -111,7 +137,7 @@ class TQueueBinlog : public TQueue::Callback {
double diff_{0}; double diff_{0};
}; };
class MemoryStorage : public TQueue::Callback { class TQueueMemoryStorage : public TQueue::Callback {
public: public:
uint64 push(QueueId queue_id, const RawEvent &event) override; uint64 push(QueueId queue_id, const RawEvent &event) override;
void pop(uint64 logevent_id) override; void pop(uint64 logevent_id) override;

View File

@ -46,7 +46,7 @@ class TestTQueue {
baseline_ = TQueue::create(); baseline_ = TQueue::create();
memory_ = TQueue::create(); memory_ = TQueue::create();
auto memory_storage = td::make_unique<MemoryStorage>(); auto memory_storage = td::make_unique<TQueueMemoryStorage>();
memory_storage_ = memory_storage.get(); memory_storage_ = memory_storage.get();
memory_->set_callback(std::move(memory_storage)); memory_->set_callback(std::move(memory_storage));
@ -66,7 +66,7 @@ class TestTQueue {
} }
memory_->extract_callback().release(); memory_->extract_callback().release();
auto memory_storage = unique_ptr<MemoryStorage>(memory_storage_); auto memory_storage = unique_ptr<TQueueMemoryStorage>(memory_storage_);
memory_ = TQueue::create(); memory_ = TQueue::create();
memory_storage->replay(*memory_); memory_storage->replay(*memory_);
memory_->set_callback(std::move(memory_storage)); memory_->set_callback(std::move(memory_storage));
@ -144,7 +144,7 @@ class TestTQueue {
unique_ptr<TQueue> baseline_; unique_ptr<TQueue> baseline_;
unique_ptr<TQueue> memory_; unique_ptr<TQueue> memory_;
unique_ptr<TQueue> binlog_; unique_ptr<TQueue> binlog_;
MemoryStorage *memory_storage_{nullptr}; TQueueMemoryStorage *memory_storage_{nullptr};
}; };
TEST(TQueue, random) { TEST(TQueue, random) {