diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 041c45237..8b9cb0c12 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -800,9 +800,11 @@ void ConnectionCreator::client_loop(ClientInfo &client) { auto begin = client.queries.begin(); auto it = begin; while (it != client.queries.end() && !client.ready_connections.empty()) { - VLOG(connections) << "Send to promise " << tag("connection", client.ready_connections.back().first.get()); - it->set_value(std::move(client.ready_connections.back().first)); - client.ready_connections.pop_back(); + if (!it->is_cancelled()) { + VLOG(connections) << "Send to promise " << tag("connection", client.ready_connections.back().first.get()); + it->set_value(std::move(client.ready_connections.back().first)); + client.ready_connections.pop_back(); + } ++it; } client.queries.erase(begin, it); diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 1d425748c..2d1bd8f95 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -65,6 +65,7 @@ class GenAuthKeyActor : public Actor { Promise> connection_promise_; Promise> handshake_promise_; std::shared_ptr callback_; + CancellationToken cancellation_token_{true}; ActorOwn child_; @@ -72,7 +73,8 @@ class GenAuthKeyActor : public Actor { // Bug in Android clang and MSVC // std::tuple> b(std::forward_as_tuple(Result())); - callback_->request_raw_connection(PromiseCreator::lambda( + callback_->request_raw_connection(PromiseCreator::cancellable_lambda( + cancellation_token_, [actor_id = actor_id(this)](Result> r_raw_connection) { send_closure(actor_id, &GenAuthKeyActor::on_connection, std::move(r_raw_connection), false); })); @@ -857,8 +859,10 @@ void Session::connection_open(ConnectionInfo *info, bool ask_info) { info->ask_info = ask_info; info->state = ConnectionInfo::State::Connecting; + info->cancellation_token_ = CancellationToken{true}; // NB: rely on constant location of info - auto promise = PromiseCreator::lambda( + auto promise = PromiseCreator::cancellable_lambda( + info->cancellation_token_, [actor_id = actor_id(this), info = info](Result> res) { send_closure(actor_id, &Session::connection_open_finish, info, std::move(res)); }); diff --git a/td/telegram/net/Session.h b/td/telegram/net/Session.h index 28eab863f..fe08bea88 100644 --- a/td/telegram/net/Session.h +++ b/td/telegram/net/Session.h @@ -125,6 +125,7 @@ class Session final int8 connection_id; Mode mode; enum class State : int8 { Empty, Connecting, Ready } state = State::Empty; + CancellationToken cancellation_token_; unique_ptr connection; bool ask_info; double wakeup_at = 0; diff --git a/tdactor/td/actor/PromiseFuture.h b/tdactor/td/actor/PromiseFuture.h index 63156c383..026bceda7 100644 --- a/tdactor/td/actor/PromiseFuture.h +++ b/tdactor/td/actor/PromiseFuture.h @@ -42,6 +42,13 @@ class PromiseInterface { set_error(result.move_as_error()); } } + virtual bool is_cancellable() const { + return false; + } + virtual bool is_cancelled() const { + return false; + } + virtual void start_migrate(int32 sched_id) { } virtual void finish_migrate() { @@ -110,6 +117,18 @@ class Promise { } promise_->finish_migrate(); } + bool is_cancellable() const { + if (!promise_) { + return false; + } + return promise_->is_cancellable(); + } + bool is_cancelled() const { + if (!promise_) { + return false; + } + return promise_->is_cancelled(); + } std::unique_ptr> release() { return std::move(promise_); } @@ -169,6 +188,42 @@ Promise &Promise::operator=(SafePromise &&other) { return *this; } +class CancellationToken { + public: + explicit CancellationToken(bool init = false) { + if (init) { + ptr_ = std::make_shared>(false); + } + } + CancellationToken(const CancellationToken &other) = default; + CancellationToken &operator=(const CancellationToken &other) { + cancel(); + ptr_ = other.ptr_; + return *this; + } + CancellationToken(CancellationToken &&other) = default; + CancellationToken &operator=(CancellationToken &&other) { + cancel(); + ptr_ = std::move(other.ptr_); + return *this; + } + ~CancellationToken() { + cancel(); + } + bool is_canceled() const { + return !ptr_ || *ptr_; + } + void cancel() { + if (ptr_) { + ptr_->store(true, std::memory_order_relaxed); + ptr_.reset(); + } + } + + private: + std::shared_ptr> ptr_; +}; + namespace detail { class EventPromise : public PromiseInterface { @@ -239,6 +294,25 @@ struct DropResult> { template using drop_result_t = typename DropResult::type; +template +class CancellablePromise : public PromiseT { + public: + template + CancellablePromise(CancellationToken cancellation_token, ArgsT &&... args) + : PromiseT(std::forward(args)...), cancellation_token_(std::move(cancellation_token)) { + } + virtual bool is_cancellable() const { + return true; + ; + } + virtual bool is_cancelled() const { + return cancellation_token_.is_canceled(); + } + + private: + CancellationToken cancellation_token_; +}; + template class LambdaPromise : public PromiseInterface { enum OnFail { None, Ok, Fail }; @@ -549,6 +623,13 @@ class PromiseCreator { std::forward(ok), std::forward(fail), false)); } + template >> + static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) { + return Promise( + std::make_unique, Ignore>>>( + std::move(cancellation_token), std::forward(ok), Ignore(), true)); + } + static Promise<> event(EventFull &&ok) { return Promise<>(std::make_unique(std::move(ok))); }