close(Promise<>) interface for TQueue and BinlogKeyValue
GitOrigin-RevId: d2c524407888b99c9e6207f74828eb2da29a7720
This commit is contained in:
parent
9b98451c43
commit
d701e17cc6
@ -114,6 +114,9 @@ class BinlogKeyValue : public KeyValueSyncInterface {
|
|||||||
void close() {
|
void close() {
|
||||||
*this = BinlogKeyValue();
|
*this = BinlogKeyValue();
|
||||||
}
|
}
|
||||||
|
void close(td::Promise<> promise) override {
|
||||||
|
binlog_->close(std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
SeqNo set(string key, string value) override {
|
SeqNo set(string key, string value) override {
|
||||||
auto lock = rw_mutex_.lock_write().move_as_ok();
|
auto lock = rw_mutex_.lock_write().move_as_ok();
|
||||||
|
@ -43,6 +43,8 @@ class KeyValueSyncInterface {
|
|||||||
virtual void erase_by_prefix(Slice prefix) = 0;
|
virtual void erase_by_prefix(Slice prefix) = 0;
|
||||||
|
|
||||||
virtual void force_sync(Promise<> &&promise) = 0;
|
virtual void force_sync(Promise<> &&promise) = 0;
|
||||||
|
|
||||||
|
virtual void close(Promise<> promise) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -234,6 +234,11 @@ class TQueueImpl : public TQueue {
|
|||||||
return do_get(queue_id, q, q.events.front().event_id, true, Time::now(), span);
|
return do_get(queue_id, q, q.events.front().event_id, true, Time::now(), span);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void close(td::Promise<> promise) override {
|
||||||
|
callback_->close(std::move(promise));
|
||||||
|
callback_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Queue {
|
struct Queue {
|
||||||
EventId tail_id;
|
EventId tail_id;
|
||||||
@ -436,6 +441,11 @@ Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q)
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <class BinlogT>
|
||||||
|
void TQueueBinlog<BinlogT>::close(td::Promise<> promise) {
|
||||||
|
binlog_->close(std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
template class TQueueBinlog<BinlogInterface>;
|
template class TQueueBinlog<BinlogInterface>;
|
||||||
template class TQueueBinlog<Binlog>;
|
template class TQueueBinlog<Binlog>;
|
||||||
|
|
||||||
@ -457,5 +467,8 @@ void TQueueMemoryStorage::replay(TQueue &q) const {
|
|||||||
CHECK(is_added);
|
CHECK(is_added);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void TQueueMemoryStorage::close(td::Promise<> promise) {
|
||||||
|
promise.set_value({});
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -12,6 +12,8 @@
|
|||||||
#include "td/utils/Status.h"
|
#include "td/utils/Status.h"
|
||||||
#include "td/utils/StringBuilder.h"
|
#include "td/utils/StringBuilder.h"
|
||||||
|
|
||||||
|
#include "td/actor/PromiseFuture.h"
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
@ -81,6 +83,7 @@ class TQueue {
|
|||||||
|
|
||||||
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
|
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
|
||||||
virtual void pop(uint64 logevent_id) = 0;
|
virtual void pop(uint64 logevent_id) = 0;
|
||||||
|
virtual void close(td::Promise<> promise) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
static unique_ptr<TQueue> create();
|
static unique_ptr<TQueue> create();
|
||||||
@ -111,6 +114,7 @@ class TQueue {
|
|||||||
virtual size_t get_size(QueueId queue_id) = 0;
|
virtual size_t get_size(QueueId queue_id) = 0;
|
||||||
|
|
||||||
virtual void run_gc(double now) = 0;
|
virtual void run_gc(double now) = 0;
|
||||||
|
virtual void close(td::Promise<> promise) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
StringBuilder &operator<<(StringBuilder &string_builder, const TQueue::EventId id);
|
StringBuilder &operator<<(StringBuilder &string_builder, const TQueue::EventId id);
|
||||||
@ -129,6 +133,7 @@ class TQueueBinlog : public TQueue::StorageCallback {
|
|||||||
void set_binlog(std::shared_ptr<BinlogT> binlog) {
|
void set_binlog(std::shared_ptr<BinlogT> binlog) {
|
||||||
binlog_ = std::move(binlog);
|
binlog_ = std::move(binlog);
|
||||||
}
|
}
|
||||||
|
virtual void close(td::Promise<> promise) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<BinlogT> binlog_;
|
std::shared_ptr<BinlogT> binlog_;
|
||||||
@ -141,6 +146,7 @@ class TQueueMemoryStorage : public TQueue::StorageCallback {
|
|||||||
uint64 push(QueueId queue_id, const RawEvent &event) override;
|
uint64 push(QueueId queue_id, const RawEvent &event) override;
|
||||||
void pop(uint64 logevent_id) override;
|
void pop(uint64 logevent_id) override;
|
||||||
void replay(TQueue &q) const;
|
void replay(TQueue &q) const;
|
||||||
|
virtual void close(td::Promise<> promise) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64 next_logevent_id_{1};
|
uint64 next_logevent_id_{1};
|
||||||
|
@ -293,6 +293,11 @@ Status Binlog::close(bool need_sync) {
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Binlog::close(td::Promise<> promise) {
|
||||||
|
TRY_STATUS_PROMISE(promise, close());
|
||||||
|
promise.set_value({});
|
||||||
|
}
|
||||||
|
|
||||||
void Binlog::change_key(DbKey new_db_key) {
|
void Binlog::change_key(DbKey new_db_key) {
|
||||||
db_key_ = std::move(new_db_key);
|
db_key_ = std::move(new_db_key);
|
||||||
aes_ctr_key_salt_ = BufferSlice();
|
aes_ctr_key_salt_ = BufferSlice();
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
#include "td/utils/StorerBase.h"
|
#include "td/utils/StorerBase.h"
|
||||||
#include "td/utils/UInt.h"
|
#include "td/utils/UInt.h"
|
||||||
|
|
||||||
|
#include "td/actor/PromiseFuture.h"
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
@ -103,6 +105,7 @@ class Binlog {
|
|||||||
void change_key(DbKey new_db_key);
|
void change_key(DbKey new_db_key);
|
||||||
|
|
||||||
Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT;
|
Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT;
|
||||||
|
void close(td::Promise<>);
|
||||||
Status close_and_destroy() TD_WARN_UNUSED_RESULT;
|
Status close_and_destroy() TD_WARN_UNUSED_RESULT;
|
||||||
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
|
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user