Use LambdaPromise instead of PromiseActor.

This commit is contained in:
levlam 2022-06-30 19:59:30 +03:00
parent 4f3105f4a4
commit 3de310c951
10 changed files with 65 additions and 74 deletions

2
td

@ -1 +1 @@
Subproject commit b393215d6671863b6baf2a589d343cff9474f6ba
Subproject commit 9a061c30c1f1928f12bdb3feb2108a8052b102be

View File

@ -145,7 +145,7 @@ void ClientManager::send(PromisedQueryPtr query) {
std::move(query)); // will send 429 if the client is already closed
}
void ClientManager::get_stats(td::PromiseActor<td::BufferSlice> promise,
void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
td::vector<std::pair<td::string, td::string>> args) {
if (close_flag_) {
promise.set_value(td::BufferSlice("Closing"));
@ -394,7 +394,7 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, td::S
auto query = td::make_unique<Query>(std::move(containers), token, is_test_dc, method, std::move(args),
td::vector<std::pair<td::MutableSlice, td::MutableSlice>>(),
td::vector<td::HttpFile>(), std::move(shared_data), td::IPAddress(), true);
return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>()));
return PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise<td::unique_ptr<Query>>()));
}
void ClientManager::raw_event(const td::Event::Raw &event) {

View File

@ -43,7 +43,7 @@ class ClientManager final : public td::Actor {
void send(PromisedQueryPtr query);
void get_stats(td::PromiseActor<td::BufferSlice> promise, td::vector<std::pair<td::string, td::string>> args);
void get_stats(td::Promise<td::BufferSlice> promise, td::vector<std::pair<td::string, td::string>> args);
void close(td::Promise<td::Unit> &&promise);

View File

@ -48,22 +48,17 @@ void HttpConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
std::move(http_query->args_), std::move(http_query->headers_),
std::move(http_query->files_), shared_data_, http_query->peer_address_, false);
td::PromiseActor<td::unique_ptr<Query>> promise;
td::FutureActor<td::unique_ptr<Query>> future;
td::init_promise_future(&promise, &future);
future.set_event(td::EventCreator::yield(actor_id()));
auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result<td::unique_ptr<Query>> r_query) {
send_closure(actor_id, &HttpConnection::on_query_finished, std::move(r_query));
});
auto promised_query = PromisedQueryPtr(query.release(), PromiseDeleter(std::move(promise)));
send_closure(client_manager_, &ClientManager::send, std::move(promised_query));
result_ = std::move(future);
}
void HttpConnection::wakeup() {
if (result_.empty()) {
return;
}
LOG_CHECK(result_.is_ok()) << result_.move_as_error();
void HttpConnection::on_query_finished(td::Result<td::unique_ptr<Query>> r_query) {
LOG_CHECK(r_query.is_ok()) << r_query.error();
auto query = result_.move_as_ok();
auto query = r_query.move_as_ok();
send_response(query->http_status_code(), std::move(query->answer()), query->retry_after());
}

View File

@ -32,10 +32,7 @@ class HttpConnection final : public td::HttpInboundConnection::Callback {
void handle(td::unique_ptr<td::HttpQuery> http_query, td::ActorOwn<td::HttpInboundConnection> connection) final;
void wakeup() final;
private:
td::FutureActor<td::unique_ptr<Query>> result_;
td::ActorId<ClientManager> client_manager_;
td::ActorOwn<td::HttpInboundConnection> connection_;
std::shared_ptr<SharedData> shared_data_;
@ -45,6 +42,8 @@ class HttpConnection final : public td::HttpInboundConnection::Callback {
stop();
}
void on_query_finished(td::Result<td::unique_ptr<Query>> r_query);
void send_response(int http_status_code, td::BufferSlice &&content, int retry_after);
void send_http_error(int http_status_code, td::Slice description);

View File

@ -18,22 +18,20 @@ void HttpStatConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
CHECK(connection_->empty());
connection_ = std::move(connection);
td::PromiseActor<td::BufferSlice> promise;
td::FutureActor<td::BufferSlice> future;
init_promise_future(&promise, &future);
future.set_event(td::EventCreator::yield(actor_id()));
LOG(DEBUG) << "SEND";
auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result<td::BufferSlice> result) {
send_closure(actor_id, &HttpStatConnection::on_result, std::move(result));
});
send_closure(client_manager_, &ClientManager::get_stats, std::move(promise), http_query->get_args());
result_ = std::move(future);
}
void HttpStatConnection::wakeup() {
if (result_.empty()) {
void HttpStatConnection::on_result(td::Result<td::BufferSlice> result) {
if (result.is_error()) {
send_closure(connection_.release(), &td::HttpInboundConnection::write_error,
td::Status::Error(500, "Internal Server Error: closing"));
return;
}
LOG_CHECK(result_.is_ok()) << result_.move_as_error();
auto content = result_.move_as_ok();
auto content = result.move_as_ok();
td::HttpHeaderCreator hc;
hc.init_status_line(200);
hc.set_keep_alive();

View File

@ -12,9 +12,9 @@
#include "td/net/HttpQuery.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/buffer.h"
#include "td/utils/Status.h"
namespace telegram_bot_api {
@ -22,15 +22,15 @@ class HttpStatConnection final : public td::HttpInboundConnection::Callback {
public:
explicit HttpStatConnection(td::ActorId<ClientManager> client_manager) : client_manager_(client_manager) {
}
void handle(td::unique_ptr<td::HttpQuery> http_query, td::ActorOwn<td::HttpInboundConnection> connection) final;
void wakeup() final;
private:
td::FutureActor<td::BufferSlice> result_;
td::ActorId<ClientManager> client_manager_;
td::ActorOwn<td::HttpInboundConnection> connection_;
void on_result(td::Result<td::BufferSlice> result);
void hangup() final {
connection_.release();
stop();

View File

@ -227,7 +227,7 @@ class JsonQueryError final : public td::Jsonable {
class PromiseDeleter {
public:
explicit PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>> &&promise) : promise_(std::move(promise)) {
explicit PromiseDeleter(td::Promise<td::unique_ptr<Query>> &&promise) : promise_(std::move(promise)) {
}
PromiseDeleter() = default;
PromiseDeleter(const PromiseDeleter &) = delete;
@ -236,7 +236,7 @@ class PromiseDeleter {
PromiseDeleter &operator=(PromiseDeleter &&) = default;
void operator()(Query *raw_ptr) {
td::unique_ptr<Query> query(raw_ptr); // now I cannot forget to delete this pointer
if (!promise_.empty_promise()) {
if (promise_) {
if (!query->is_ready()) {
query->set_retry_after_error(5);
}
@ -245,11 +245,11 @@ class PromiseDeleter {
}
}
~PromiseDeleter() {
CHECK(promise_.empty());
CHECK(!promise_);
}
private:
td::PromiseActor<td::unique_ptr<Query>> promise_;
td::Promise<td::unique_ptr<Query>> promise_;
};
using PromisedQueryPtr = std::unique_ptr<Query, PromiseDeleter>;

View File

@ -82,7 +82,7 @@ void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) {
}
void WebhookActor::resolve_ip_address() {
if (fix_ip_address_) {
if (fix_ip_address_ || is_ip_address_being_resolved_) {
return;
}
if (td::Time::now() < next_ip_address_resolve_time_) {
@ -90,43 +90,42 @@ void WebhookActor::resolve_ip_address() {
return;
}
bool future_created = false;
if (future_ip_address_.empty()) {
td::PromiseActor<td::IPAddress> promise;
init_promise_future(&promise, &future_ip_address_);
future_created = true;
send_closure(parameters_->get_host_by_name_actor_id_, &td::GetHostByNameActor::run, url_.host_, url_.port_, false,
td::PromiseCreator::from_promise_actor(std::move(promise)));
is_ip_address_being_resolved_ = true;
auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result<td::IPAddress> r_ip_address) {
send_closure(actor_id, &WebhookActor::on_resolved_ip_address, std::move(r_ip_address));
});
send_closure(parameters_->get_host_by_name_actor_id_, &td::GetHostByNameActor::run, url_.host_, url_.port_, false,
std::move(promise));
}
void WebhookActor::on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address) {
CHECK(is_ip_address_being_resolved_);
is_ip_address_being_resolved_ = false;
next_ip_address_resolve_time_ =
td::Time::now() + IP_ADDRESS_CACHE_TIME + td::Random::fast(0, IP_ADDRESS_CACHE_TIME / 10);
relax_wakeup_at(next_ip_address_resolve_time_, "on_resolved_ip_address");
SCOPE_EXIT {
loop();
};
if (r_ip_address.is_error()) {
return on_error(r_ip_address.move_as_error());
}
if (future_ip_address_.is_ready()) {
next_ip_address_resolve_time_ =
td::Time::now() + IP_ADDRESS_CACHE_TIME + td::Random::fast(0, IP_ADDRESS_CACHE_TIME / 10);
relax_wakeup_at(next_ip_address_resolve_time_, "resolve_ip_address");
auto r_ip_address = future_ip_address_.move_as_result();
if (r_ip_address.is_error()) {
CHECK(!(r_ip_address.error() == td::Status::Error<td::FutureActor<td::IPAddress>::HANGUP_ERROR_CODE>()));
return on_error(r_ip_address.move_as_error());
}
auto new_ip_address = r_ip_address.move_as_ok();
if (!check_ip_address(new_ip_address)) {
return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved"));
}
if (!(ip_address_ == new_ip_address)) {
VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address;
ip_address_ = new_ip_address;
ip_generation_++;
if (was_checked_) {
on_webhook_verified();
}
}
VLOG(webhook) << "IP address was verified";
} else {
if (future_created) {
future_ip_address_.set_event(td::EventCreator::yield(actor_id()));
auto new_ip_address = r_ip_address.move_as_ok();
if (!check_ip_address(new_ip_address)) {
return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved"));
}
if (!(ip_address_ == new_ip_address)) {
VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address;
ip_address_ = new_ip_address;
ip_generation_++;
if (was_checked_) {
on_webhook_verified();
}
}
VLOG(webhook) << "IP address was verified";
}
td::Status WebhookActor::create_connection() {
@ -605,8 +604,7 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
td::MutableSlice(), std::move(response->args_),
std::move(response->headers_), std::move(response->files_),
parameters_->shared_data_, response->peer_address_, false);
auto promised_query =
PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>()));
auto promised_query = PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise<td::unique_ptr<Query>>()));
send_closure(callback_, &Callback::send, std::move(promised_query));
}
first_error_410_time_ = 0;

View File

@ -137,7 +137,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
td::IPAddress ip_address_;
td::int32 ip_generation_ = 0;
double next_ip_address_resolve_time_ = 0;
td::FutureActor<td::IPAddress> future_ip_address_;
bool is_ip_address_being_resolved_ = false;
class Connection final : public td::ListNode {
public:
@ -175,6 +175,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
void relax_wakeup_at(double wakeup_at, const char *source);
void resolve_ip_address();
void on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address);
td::Result<td::SslStream> create_ssl_stream();
td::Status create_connection() TD_WARN_UNUSED_RESULT;