Use CancellationToken for ConnectionCreator::request_connection promises

GitOrigin-RevId: 14157bd3677f4720d37ef70f64500522e3674173
This commit is contained in:
Arseny Smirnov 2018-06-13 19:20:42 +03:00
parent 3c1698dddf
commit d4cc127f17
4 changed files with 93 additions and 5 deletions

View File

@ -800,9 +800,11 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
auto begin = client.queries.begin(); auto begin = client.queries.begin();
auto it = begin; auto it = begin;
while (it != client.queries.end() && !client.ready_connections.empty()) { while (it != client.queries.end() && !client.ready_connections.empty()) {
VLOG(connections) << "Send to promise " << tag("connection", client.ready_connections.back().first.get()); if (!it->is_cancelled()) {
it->set_value(std::move(client.ready_connections.back().first)); VLOG(connections) << "Send to promise " << tag("connection", client.ready_connections.back().first.get());
client.ready_connections.pop_back(); it->set_value(std::move(client.ready_connections.back().first));
client.ready_connections.pop_back();
}
++it; ++it;
} }
client.queries.erase(begin, it); client.queries.erase(begin, it);

View File

@ -65,6 +65,7 @@ class GenAuthKeyActor : public Actor {
Promise<std::unique_ptr<mtproto::RawConnection>> connection_promise_; Promise<std::unique_ptr<mtproto::RawConnection>> connection_promise_;
Promise<std::unique_ptr<mtproto::AuthKeyHandshake>> handshake_promise_; Promise<std::unique_ptr<mtproto::AuthKeyHandshake>> handshake_promise_;
std::shared_ptr<Session::Callback> callback_; std::shared_ptr<Session::Callback> callback_;
CancellationToken cancellation_token_{true};
ActorOwn<mtproto::HandshakeActor> child_; ActorOwn<mtproto::HandshakeActor> child_;
@ -72,7 +73,8 @@ class GenAuthKeyActor : public Actor {
// Bug in Android clang and MSVC // Bug in Android clang and MSVC
// std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>())); // std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>()));
callback_->request_raw_connection(PromiseCreator::lambda( callback_->request_raw_connection(PromiseCreator::cancellable_lambda(
cancellation_token_,
[actor_id = actor_id(this)](Result<std::unique_ptr<mtproto::RawConnection>> r_raw_connection) { [actor_id = actor_id(this)](Result<std::unique_ptr<mtproto::RawConnection>> r_raw_connection) {
send_closure(actor_id, &GenAuthKeyActor::on_connection, std::move(r_raw_connection), false); send_closure(actor_id, &GenAuthKeyActor::on_connection, std::move(r_raw_connection), false);
})); }));
@ -857,8 +859,10 @@ void Session::connection_open(ConnectionInfo *info, bool ask_info) {
info->ask_info = ask_info; info->ask_info = ask_info;
info->state = ConnectionInfo::State::Connecting; info->state = ConnectionInfo::State::Connecting;
info->cancellation_token_ = CancellationToken{true};
// NB: rely on constant location of info // NB: rely on constant location of info
auto promise = PromiseCreator::lambda( auto promise = PromiseCreator::cancellable_lambda(
info->cancellation_token_,
[actor_id = actor_id(this), info = info](Result<std::unique_ptr<mtproto::RawConnection>> res) { [actor_id = actor_id(this), info = info](Result<std::unique_ptr<mtproto::RawConnection>> res) {
send_closure(actor_id, &Session::connection_open_finish, info, std::move(res)); send_closure(actor_id, &Session::connection_open_finish, info, std::move(res));
}); });

View File

@ -125,6 +125,7 @@ class Session final
int8 connection_id; int8 connection_id;
Mode mode; Mode mode;
enum class State : int8 { Empty, Connecting, Ready } state = State::Empty; enum class State : int8 { Empty, Connecting, Ready } state = State::Empty;
CancellationToken cancellation_token_;
unique_ptr<mtproto::SessionConnection> connection; unique_ptr<mtproto::SessionConnection> connection;
bool ask_info; bool ask_info;
double wakeup_at = 0; double wakeup_at = 0;

View File

@ -42,6 +42,13 @@ class PromiseInterface {
set_error(result.move_as_error()); set_error(result.move_as_error());
} }
} }
virtual bool is_cancellable() const {
return false;
}
virtual bool is_cancelled() const {
return false;
}
virtual void start_migrate(int32 sched_id) { virtual void start_migrate(int32 sched_id) {
} }
virtual void finish_migrate() { virtual void finish_migrate() {
@ -110,6 +117,18 @@ class Promise {
} }
promise_->finish_migrate(); promise_->finish_migrate();
} }
bool is_cancellable() const {
if (!promise_) {
return false;
}
return promise_->is_cancellable();
}
bool is_cancelled() const {
if (!promise_) {
return false;
}
return promise_->is_cancelled();
}
std::unique_ptr<PromiseInterface<T>> release() { std::unique_ptr<PromiseInterface<T>> release() {
return std::move(promise_); return std::move(promise_);
} }
@ -169,6 +188,42 @@ Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) {
return *this; return *this;
} }
class CancellationToken {
public:
explicit CancellationToken(bool init = false) {
if (init) {
ptr_ = std::make_shared<std::atomic<bool>>(false);
}
}
CancellationToken(const CancellationToken &other) = default;
CancellationToken &operator=(const CancellationToken &other) {
cancel();
ptr_ = other.ptr_;
return *this;
}
CancellationToken(CancellationToken &&other) = default;
CancellationToken &operator=(CancellationToken &&other) {
cancel();
ptr_ = std::move(other.ptr_);
return *this;
}
~CancellationToken() {
cancel();
}
bool is_canceled() const {
return !ptr_ || *ptr_;
}
void cancel() {
if (ptr_) {
ptr_->store(true, std::memory_order_relaxed);
ptr_.reset();
}
}
private:
std::shared_ptr<std::atomic<bool>> ptr_;
};
namespace detail { namespace detail {
class EventPromise : public PromiseInterface<Unit> { class EventPromise : public PromiseInterface<Unit> {
@ -239,6 +294,25 @@ struct DropResult<Result<T>> {
template <class T> template <class T>
using drop_result_t = typename DropResult<T>::type; using drop_result_t = typename DropResult<T>::type;
template <class PromiseT>
class CancellablePromise : public PromiseT {
public:
template <class... ArgsT>
CancellablePromise(CancellationToken cancellation_token, ArgsT &&... args)
: PromiseT(std::forward<ArgsT>(args)...), cancellation_token_(std::move(cancellation_token)) {
}
virtual bool is_cancellable() const {
return true;
;
}
virtual bool is_cancelled() const {
return cancellation_token_.is_canceled();
}
private:
CancellationToken cancellation_token_;
};
template <class ValueT, class FunctionOkT, class FunctionFailT> template <class ValueT, class FunctionOkT, class FunctionFailT>
class LambdaPromise : public PromiseInterface<ValueT> { class LambdaPromise : public PromiseInterface<ValueT> {
enum OnFail { None, Ok, Fail }; enum OnFail { None, Ok, Fail };
@ -549,6 +623,13 @@ class PromiseCreator {
std::forward<OkT>(ok), std::forward<FailT>(fail), false)); std::forward<OkT>(ok), std::forward<FailT>(fail), false));
} }
template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>>
static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) {
return Promise<ArgT>(
std::make_unique<detail::CancellablePromise<detail::LambdaPromise<ArgT, std::decay_t<OkT>, Ignore>>>(
std::move(cancellation_token), std::forward<OkT>(ok), Ignore(), true));
}
static Promise<> event(EventFull &&ok) { static Promise<> event(EventFull &&ok) {
return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok))); return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok)));
} }