// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once #include "td/actor/actor.h" #include "td/utils/CancellationToken.h" #include "td/utils/Closure.h" #include "td/utils/common.h" #include "td/utils/invoke.h" // for tuple_for_each #include "td/utils/ScopeGuard.h" #include "td/utils/Status.h" #include #include #include namespace td { template class PromiseInterface { public: PromiseInterface() = default; PromiseInterface(const PromiseInterface &) = delete; PromiseInterface &operator=(const PromiseInterface &) = delete; PromiseInterface(PromiseInterface &&) = default; PromiseInterface &operator=(PromiseInterface &&) = default; virtual ~PromiseInterface() = default; virtual void set_value(T &&value) { set_result(std::move(value)); } virtual void set_error(Status &&error) { set_result(std::move(error)); } virtual void set_result(Result &&result) { if (result.is_ok()) { set_value(result.move_as_ok()); } else { set_error(result.move_as_error()); } } virtual bool is_cancellable() const { return false; } virtual bool is_cancelled() const { return false; } virtual void start_migrate(int32 sched_id) { } virtual void finish_migrate() { } }; template class SafePromise; template class Promise { public: void set_value(T &&value) { if (!promise_) { return; } promise_->set_value(std::move(value)); promise_.reset(); } void set_error(Status &&error) { if (!promise_) { return; } promise_->set_error(std::move(error)); promise_.reset(); } void set_result(Result &&result) { if (!promise_) { return; } promise_->set_result(std::move(result)); promise_.reset(); } void reset() { promise_.reset(); } void start_migrate(int32 sched_id) { if (!promise_) { return; } promise_->start_migrate(sched_id); } void finish_migrate() { if (!promise_) { return; } promise_->finish_migrate(); } bool is_cancellable() const { if (!promise_) { return false; } return promise_->is_cancellable(); } bool is_cancelled() const { if (!promise_) { return false; } return promise_->is_cancelled(); } unique_ptr> release() { return std::move(promise_); } Promise() = default; explicit Promise(unique_ptr> promise) : promise_(std::move(promise)) { } Promise(SafePromise &&other); Promise &operator=(SafePromise &&other); explicit operator bool() { return static_cast(promise_); } private: unique_ptr> promise_; }; template void start_migrate(Promise &promise, int32 sched_id) { // promise.start_migrate(sched_id); } template void finish_migrate(Promise &promise) { // promise.finish_migrate(); } template class SafePromise { public: SafePromise(Promise promise, Result result) : promise_(std::move(promise)), result_(std::move(result)) { } SafePromise(const SafePromise &other) = delete; SafePromise &operator=(const SafePromise &other) = delete; SafePromise(SafePromise &&other) = default; SafePromise &operator=(SafePromise &&other) = default; ~SafePromise() { if (promise_) { promise_.set_result(std::move(result_)); } } Promise release() { return std::move(promise_); } private: Promise promise_; Result result_; }; template Promise::Promise(SafePromise &&other) : Promise(other.release()) { } template Promise &Promise::operator=(SafePromise &&other) { *this = other.release(); return *this; } namespace detail { class EventPromise : public PromiseInterface { public: void set_value(Unit &&) override { ok_.try_emit(); fail_.clear(); } void set_error(Status &&) override { do_set_error(); } EventPromise(const EventPromise &other) = delete; EventPromise &operator=(const EventPromise &other) = delete; EventPromise(EventPromise &&other) = delete; EventPromise &operator=(EventPromise &&other) = delete; ~EventPromise() override { do_set_error(); } EventPromise() = default; explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) { } EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) { } private: EventFull ok_; EventFull fail_; bool use_ok_as_fail_ = false; void do_set_error() { if (use_ok_as_fail_) { ok_.try_emit(); } else { ok_.clear(); fail_.try_emit(); } } }; template struct GetArg : public GetArg {}; template class GetArg { public: using type = Arg; }; template class GetArg { public: using type = Arg; }; template using get_arg_t = std::decay_t::type>; template struct DropResult { using type = T; }; template struct DropResult> { using type = T; }; template using drop_result_t = typename DropResult::type; template class CancellablePromise : public PromiseT { public: template CancellablePromise(CancellationToken cancellation_token, ArgsT &&... args) : PromiseT(std::forward(args)...), cancellation_token_(std::move(cancellation_token)) { } virtual bool is_cancellable() const { return true; } virtual bool is_cancelled() const { return static_cast(cancellation_token_); } private: CancellationToken cancellation_token_; }; template class LambdaPromise : public PromiseInterface { enum class OnFail { None, Ok, Fail }; public: void set_value(ValueT &&value) override { ok_(std::move(value)); on_fail_ = OnFail::None; } void set_error(Status &&error) override { do_error(std::move(error)); } LambdaPromise(const LambdaPromise &other) = delete; LambdaPromise &operator=(const LambdaPromise &other) = delete; LambdaPromise(LambdaPromise &&other) = delete; LambdaPromise &operator=(LambdaPromise &&other) = delete; ~LambdaPromise() override { do_error(Status::Error("Lost promise")); } template LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail) : ok_(std::forward(ok)) , fail_(std::forward(fail)) , on_fail_(use_ok_as_fail ? OnFail::Ok : OnFail::Fail) { } private: FunctionOkT ok_; FunctionFailT fail_; OnFail on_fail_ = OnFail::None; template > std::enable_if_t::value> do_error_impl(FuncT &func, Status &&status) { func(std::move(status)); } template > std::enable_if_t::value> do_error_impl(FuncT &func, Status &&status) { func(Auto()); } void do_error(Status &&error) { switch (on_fail_) { case OnFail::None: break; case OnFail::Ok: do_error_impl(ok_, std::move(error)); break; case OnFail::Fail: fail_(std::move(error)); break; } on_fail_ = OnFail::None; } }; template class JoinPromise : public PromiseInterface { public: explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward(arg)...) { } void set_value(Unit &&) override { tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); }); } void set_error(Status &&error) override { tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); }); } private: std::tuple...> promises_; }; } // namespace detail /*** FutureActor and PromiseActor ***/ template class FutureActor; template class PromiseActor; template class ActorTraits> { public: static constexpr bool is_lite = true; }; template class PromiseActor final : public PromiseInterface { friend class FutureActor; enum State { Waiting, Hangup }; public: PromiseActor() = default; PromiseActor(const PromiseActor &other) = delete; PromiseActor &operator=(const PromiseActor &other) = delete; PromiseActor(PromiseActor &&) = default; PromiseActor &operator=(PromiseActor &&) = default; ~PromiseActor() override { close(); } void set_value(T &&value) override; void set_error(Status &&error) override; void close() { future_id_.reset(); } // NB: if true is returned no further events will be sent bool is_hangup() { if (state_ == State::Hangup) { return true; } if (!future_id_.is_alive()) { state_ = State::Hangup; future_id_.release(); event_.clear(); return true; } return false; } template friend void init_promise_future(PromiseActor *promise, FutureActor *future); bool empty_promise() { return future_id_.empty(); } bool empty() { return future_id_.empty(); } private: ActorOwn> future_id_; EventFull event_; State state_ = State::Hangup; void init() { state_ = State::Waiting; event_.clear(); } }; template class FutureActor final : public Actor { friend class PromiseActor; public: enum State { Waiting, Ready }; static constexpr int HANGUP_ERROR_CODE = 426487; FutureActor() = default; FutureActor(const FutureActor &other) = delete; FutureActor &operator=(const FutureActor &other) = delete; FutureActor(FutureActor &&other) = default; FutureActor &operator=(FutureActor &&other) = default; ~FutureActor() override = default; bool is_ok() const { return is_ready() && result_.is_ok(); } bool is_error() const { CHECK(is_ready()); return is_ready() && result_.is_error(); } T move_as_ok() { return move_as_result().move_as_ok(); } Status move_as_error() TD_WARN_UNUSED_RESULT { return move_as_result().move_as_error(); } Result move_as_result() TD_WARN_UNUSED_RESULT { CHECK(is_ready()); SCOPE_EXIT { do_stop(); }; return std::move(result_); } bool is_ready() const { return !empty() && state_ == State::Ready; } void close() { event_.clear(); result_.clear(); do_stop(); } void set_event(EventFull &&event) { CHECK(!empty()); event_ = std::move(event); if (state_ != State::Waiting) { event_.try_emit_later(); } } State get_state() const { return state_; } template friend void init_promise_future(PromiseActor *promise, FutureActor *future); private: EventFull event_; Result result_ = Status::Error(500, "Empty FutureActor"); State state_; void set_value(T &&value) { set_result(std::move(value)); } void set_error(Status &&error) { set_result(std::move(error)); } void set_result(Result &&result) { CHECK(state_ == State::Waiting); result_ = std::move(result); state_ = State::Ready; event_.try_emit_later(); } void hangup() override { set_error(Status::Error()); } void start_up() override { // empty } void init() { CHECK(empty()); state_ = State::Waiting; event_.clear(); } }; template void PromiseActor::set_value(T &&value) { if (state_ == State::Waiting && !future_id_.empty()) { send_closure(std::move(future_id_), &FutureActor::set_value, std::move(value)); } } template void PromiseActor::set_error(Status &&error) { if (state_ == State::Waiting && !future_id_.empty()) { send_closure(std::move(future_id_), &FutureActor::set_error, std::move(error)); } } template void init_promise_future(PromiseActor *promise, FutureActor *future) { promise->init(); future->init(); promise->future_id_ = register_actor("FutureActor", future); CHECK(future->get_info() != nullptr); } template class PromiseFuture { public: PromiseFuture() { init_promise_future(&promise_, &future_); } PromiseActor &promise() { return promise_; } FutureActor &future() { return future_; } PromiseActor &&move_promise() { return std::move(promise_); } FutureActor &&move_future() { return std::move(future_); } private: PromiseActor promise_; FutureActor future_; }; template FutureActor send_promise(ActorId actor_id, ResultT (ActorBT::*func)(PromiseActor &&, DestArgsT...), ArgsT &&... args) { PromiseFuture pf; Scheduler::instance()->send_closure( std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward(args)...)); return pf.move_future(); } class PromiseCreator { public: struct Ignore { void operator()(Status &&error) { error.ignore(); } }; template >> static Promise lambda(OkT &&ok) { return Promise( td::make_unique, Ignore>>(std::forward(ok), Ignore(), true)); } template > static Promise lambda(OkT &&ok, FailT &&fail) { return Promise(td::make_unique, std::decay_t>>( std::forward(ok), std::forward(fail), false)); } template >> static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) { return Promise( td::make_unique, Ignore>>>( std::move(cancellation_token), std::forward(ok), Ignore(), true)); } static Promise<> event(EventFull &&ok) { return Promise<>(td::make_unique(std::move(ok))); } static Promise<> event(EventFull ok, EventFull fail) { return Promise<>(td::make_unique(std::move(ok), std::move(fail))); } template static Promise<> join(ArgsT &&... args) { return Promise<>(td::make_unique>(std::forward(args)...)); } template static Promise from_promise_actor(PromiseActor &&from) { return Promise(td::make_unique>(std::move(from))); } }; } // namespace td