// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once #include "td/actor/actor.h" #include "td/utils/CancellationToken.h" #include "td/utils/Closure.h" #include "td/utils/common.h" #include "td/utils/invoke.h" #include "td/utils/MovableValue.h" #include "td/utils/ScopeGuard.h" #include "td/utils/Status.h" #include #include #include namespace td { template class PromiseInterface { public: PromiseInterface() = default; PromiseInterface(const PromiseInterface &) = delete; PromiseInterface &operator=(const PromiseInterface &) = delete; PromiseInterface(PromiseInterface &&) = default; PromiseInterface &operator=(PromiseInterface &&) = default; virtual ~PromiseInterface() = default; virtual void set_value(T &&value) { set_result(std::move(value)); } virtual void set_error(Status &&error) { set_result(std::move(error)); } virtual void set_result(Result &&result) { if (result.is_ok()) { set_value(result.move_as_ok()); } else { set_error(result.move_as_error()); } } void operator()(T &&value) { set_value(std::move(value)); } void operator()(Status &&error) { set_error(std::move(error)); } void operator()(Result &&result) { set_result(std::move(result)); } virtual bool is_cancellable() const { return false; } virtual bool is_canceled() const { return false; } virtual void start_migrate(int32 sched_id) { } virtual void finish_migrate() { } }; namespace detail { template struct GetArg final : public GetArg {}; template class GetArg { public: using type = Arg; }; template class GetArg { public: using type = Arg; }; template using get_arg_t = std::decay_t::type>; template struct DropResult { using type = T; }; template struct DropResult> { using type = T; }; template using drop_result_t = typename DropResult::type; struct Ignore { void operator()(Status &&error) { error.ignore(); } }; template class LambdaPromise : public PromiseInterface { enum class OnFail { None, Ok, Fail }; public: void set_value(ValueT &&value) override { CHECK(has_lambda_.get()); do_ok(ok_, std::move(value)); on_fail_ = OnFail::None; } void set_error(Status &&error) override { CHECK(has_lambda_.get()); do_error(std::move(error)); } LambdaPromise(const LambdaPromise &other) = delete; LambdaPromise &operator=(const LambdaPromise &other) = delete; LambdaPromise(LambdaPromise &&other) = default; LambdaPromise &operator=(LambdaPromise &&other) = default; ~LambdaPromise() override { if (has_lambda_.get()) { do_error(Status::Error("Lost promise")); } } template LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail) : ok_(std::forward(ok)) , fail_(std::forward(fail)) , on_fail_(use_ok_as_fail ? OnFail::Ok : OnFail::Fail) , has_lambda_(true) { } template , LambdaPromise>::value, int> = 0> LambdaPromise(FromOkT &&ok) : LambdaPromise(std::forward(ok), Ignore(), true) { } private: FunctionOkT ok_; FunctionFailT fail_; OnFail on_fail_ = OnFail::None; MovableValue has_lambda_{false}; void do_error(Status &&error) { switch (on_fail_) { case OnFail::None: break; case OnFail::Ok: do_error(ok_, std::move(error)); break; case OnFail::Fail: do_error(fail_, std::move(error)); break; } on_fail_ = OnFail::None; } template std::enable_if_t>::value, void> do_error(F &&f, Status &&status) { f(Result(std::move(status))); } template std::enable_if_t>::value, void> do_error(F &&f, Y &&status) { f(Auto()); } template std::enable_if_t>::value, void> do_ok(F &&f, ValueT &&result) { f(Result(std::move(result))); } template std::enable_if_t>::value, void> do_ok(F &&f, ValueT &&result) { f(std::move(result)); } }; } // namespace detail template class SafePromise; template class Promise; constexpr std::false_type is_promise_interface(...); template constexpr std::true_type is_promise_interface(const PromiseInterface &promise); template constexpr std::true_type is_promise_interface(const Promise &promise); template constexpr bool is_promise_interface() { return decltype(is_promise_interface(std::declval()))::value; } constexpr std::false_type is_promise_interface_ptr(...); template constexpr std::true_type is_promise_interface_ptr(const unique_ptr &promise); template constexpr bool is_promise_interface_ptr() { return decltype(is_promise_interface_ptr(std::declval()))::value; } template ::value, bool> has_t = false> auto lambda_promise(F &&f) { return detail::LambdaPromise>>, std::decay_t>( std::forward(f)); } template ::value, bool> has_t = true> auto lambda_promise(F &&f) { return detail::LambdaPromise>(std::forward(f)); } template (), bool> from_promise_interface = true> auto &&promise_interface(F &&f) { return std::forward(f); } template (), bool> from_promise_interface = false> auto promise_interface(F &&f) { return lambda_promise(std::forward(f)); } template (), bool> from_promise_interface = true> auto promise_interface_ptr(F &&f) { return std::forward(f); } template (), bool> from_promise_interface = false> auto promise_interface_ptr(F &&f) { return td::make_unique(std::forward(f)))>>( promise_interface(std::forward(f))); } template class Promise { public: void set_value(T &&value) { if (!promise_) { return; } promise_->set_value(std::move(value)); promise_.reset(); } void set_error(Status &&error) { if (!promise_) { return; } promise_->set_error(std::move(error)); promise_.reset(); } void set_result(Result &&result) { if (!promise_) { return; } promise_->set_result(std::move(result)); promise_.reset(); } template void operator()(S &&result) { if (!promise_) { return; } promise_->operator()(std::forward(result)); promise_.reset(); } void reset() { promise_.reset(); } void start_migrate(int32 sched_id) { if (!promise_) { return; } promise_->start_migrate(sched_id); } void finish_migrate() { if (!promise_) { return; } promise_->finish_migrate(); } bool is_cancellable() const { if (!promise_) { return false; } return promise_->is_cancellable(); } bool is_canceled() const { if (!promise_) { return false; } return promise_->is_canceled(); } unique_ptr> release() { return std::move(promise_); } Promise() = default; explicit Promise(unique_ptr> promise) : promise_(std::move(promise)) { } Promise(Auto) { } Promise(SafePromise &&other); Promise &operator=(SafePromise &&other); template , Promise>::value, int> = 0> Promise(F &&f) : promise_(promise_interface_ptr(std::forward(f))) { } explicit operator bool() { return static_cast(promise_); } private: unique_ptr> promise_; }; template void start_migrate(Promise &promise, int32 sched_id) { // promise.start_migrate(sched_id); } template void finish_migrate(Promise &promise) { // promise.finish_migrate(); } template class SafePromise { public: SafePromise(Promise promise, Result result) : promise_(std::move(promise)), result_(std::move(result)) { } SafePromise(const SafePromise &other) = delete; SafePromise &operator=(const SafePromise &other) = delete; SafePromise(SafePromise &&other) = default; SafePromise &operator=(SafePromise &&other) = default; ~SafePromise() { if (promise_) { promise_.set_result(std::move(result_)); } } Promise release() { return std::move(promise_); } private: Promise promise_; Result result_; }; template Promise::Promise(SafePromise &&other) : Promise(other.release()) { } template Promise &Promise::operator=(SafePromise &&other) { *this = other.release(); return *this; } namespace detail { class EventPromise final : public PromiseInterface { public: void set_value(Unit &&) final { ok_.try_emit(); fail_.clear(); } void set_error(Status &&) final { do_set_error(); } EventPromise(const EventPromise &other) = delete; EventPromise &operator=(const EventPromise &other) = delete; EventPromise(EventPromise &&other) = delete; EventPromise &operator=(EventPromise &&other) = delete; ~EventPromise() final { do_set_error(); } EventPromise() = default; explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) { } EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) { } private: EventFull ok_; EventFull fail_; bool use_ok_as_fail_ = false; void do_set_error() { if (use_ok_as_fail_) { ok_.try_emit(); } else { ok_.clear(); fail_.try_emit(); } } }; template class CancellablePromise final : public PromiseT { public: template CancellablePromise(CancellationToken cancellation_token, ArgsT &&...args) : PromiseT(std::forward(args)...), cancellation_token_(std::move(cancellation_token)) { } bool is_cancellable() const final { return true; } bool is_canceled() const final { return static_cast(cancellation_token_); } private: CancellationToken cancellation_token_; }; template class JoinPromise final : public PromiseInterface { public: explicit JoinPromise(ArgsT &&...arg) : promises_(std::forward(arg)...) { } void set_value(Unit &&) final { tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); }); } void set_error(Status &&error) final { tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); }); } private: std::tuple...> promises_; }; } // namespace detail class SendClosure { public: template void operator()(ArgsT &&...args) const { send_closure(std::forward(args)...); } }; //template //template //auto Promise::send_closure(ArgsT &&... args) { // return [promise = std::move(*this), t = std::make_tuple(std::forward(args)...)](auto &&r_res) mutable { // TRY_RESULT_PROMISE(promise, res, std::move(r_res)); // td2::call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::move(res), std::move(promise)))); // }; //} template auto promise_send_closure(ArgsT &&...args) { return [t = std::make_tuple(std::forward(args)...)](auto &&res) mutable { call_tuple(SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::forward(res)))); }; } /*** FutureActor and PromiseActor ***/ template class FutureActor; template class PromiseActor; template class ActorTraits> { public: static constexpr bool need_context = false; static constexpr bool need_start_up = false; }; template class PromiseActor final : public PromiseInterface { friend class FutureActor; enum State { Waiting, Hangup }; public: PromiseActor() = default; PromiseActor(const PromiseActor &other) = delete; PromiseActor &operator=(const PromiseActor &other) = delete; PromiseActor(PromiseActor &&) = default; PromiseActor &operator=(PromiseActor &&) = default; ~PromiseActor() final { close(); } void set_value(T &&value) final; void set_error(Status &&error) final; void close() { future_id_.reset(); } // NB: if true is returned no further events will be sent bool is_hangup() { if (state_ == State::Hangup) { return true; } if (!future_id_.is_alive()) { state_ = State::Hangup; future_id_.release(); event_.clear(); return true; } return false; } template friend void init_promise_future(PromiseActor *promise, FutureActor *future); bool empty_promise() { return future_id_.empty(); } bool empty() { return future_id_.empty(); } private: ActorOwn> future_id_; EventFull event_; State state_ = State::Hangup; void init() { state_ = State::Waiting; event_.clear(); } }; template class FutureActor final : public Actor { friend class PromiseActor; public: enum State { Waiting, Ready }; static constexpr int HANGUP_ERROR_CODE = 426487; FutureActor() = default; FutureActor(const FutureActor &other) = delete; FutureActor &operator=(const FutureActor &other) = delete; FutureActor(FutureActor &&other) = default; FutureActor &operator=(FutureActor &&other) = default; ~FutureActor() final = default; bool is_ok() const { return is_ready() && result_.is_ok(); } bool is_error() const { CHECK(is_ready()); return is_ready() && result_.is_error(); } T move_as_ok() { return move_as_result().move_as_ok(); } Status move_as_error() TD_WARN_UNUSED_RESULT { return move_as_result().move_as_error(); } Result move_as_result() TD_WARN_UNUSED_RESULT { CHECK(is_ready()); SCOPE_EXIT { do_stop(); }; return std::move(result_); } bool is_ready() const { return !empty() && state_ == State::Ready; } void close() { event_.clear(); result_.clear(); do_stop(); } void set_event(EventFull &&event) { CHECK(!empty()); event_ = std::move(event); if (state_ != State::Waiting) { event_.try_emit_later(); } } State get_state() const { return state_; } template friend void init_promise_future(PromiseActor *promise, FutureActor *future); private: EventFull event_; Result result_ = Status::Error(500, "Empty FutureActor"); State state_ = State::Waiting; void set_value(T &&value) { set_result(std::move(value)); } void set_error(Status &&error) { set_result(std::move(error)); } void set_result(Result &&result) { CHECK(state_ == State::Waiting); result_ = std::move(result); state_ = State::Ready; event_.try_emit_later(); } void hangup() final { set_error(Status::Error()); } void start_up() final { // empty } void init() { CHECK(empty()); state_ = State::Waiting; event_.clear(); } }; template void PromiseActor::set_value(T &&value) { if (state_ == State::Waiting && !future_id_.empty()) { send_closure(std::move(future_id_), &FutureActor::set_value, std::move(value)); } } template void PromiseActor::set_error(Status &&error) { if (state_ == State::Waiting && !future_id_.empty()) { send_closure(std::move(future_id_), &FutureActor::set_error, std::move(error)); } } template void init_promise_future(PromiseActor *promise, FutureActor *future) { promise->init(); future->init(); promise->future_id_ = register_actor("FutureActor", future); CHECK(future->get_info() != nullptr); } template class PromiseFuture { public: PromiseFuture() { init_promise_future(&promise_, &future_); } PromiseActor &promise() { return promise_; } FutureActor &future() { return future_; } PromiseActor &&move_promise() { return std::move(promise_); } FutureActor &&move_future() { return std::move(future_); } private: PromiseActor promise_; FutureActor future_; }; template FutureActor send_promise(ActorId actor_id, ResultT (ActorBT::*func)(PromiseActor &&, DestArgsT...), ArgsT &&...args) { PromiseFuture pf; Scheduler::instance()->send_closure( std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward(args)...)); return pf.move_future(); } class PromiseCreator { public: using Ignore = detail::Ignore; template >> static Promise lambda(OkT &&ok) { return Promise(td::make_unique>>(std::forward(ok))); } template > static Promise lambda(OkT &&ok, FailT &&fail) { return Promise(td::make_unique, std::decay_t>>( std::forward(ok), std::forward(fail), false)); } template >> static auto cancellable_lambda(CancellationToken cancellation_token, OkT &&ok) { return Promise(td::make_unique>>>( std::move(cancellation_token), std::forward(ok))); } static Promise<> event(EventFull &&ok) { return Promise<>(td::make_unique(std::move(ok))); } static Promise<> event(EventFull ok, EventFull fail) { return Promise<>(td::make_unique(std::move(ok), std::move(fail))); } template static Promise<> join(ArgsT &&...args) { return Promise<>(td::make_unique>(std::forward(args)...)); } template static Promise from_promise_actor(PromiseActor &&from) { return Promise(td::make_unique>(std::move(from))); } }; } // namespace td