Explicitly use unique_ptr<HttpQuery>.
GitOrigin-RevId: 1d729c6b0a3d1deaf3423672414f155492b7a0e8
This commit is contained in:
parent
e3b9772cd2
commit
a5413cf50d
@ -22,6 +22,7 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
std::atomic<int> counter;
|
std::atomic<int> counter;
|
||||||
|
|
||||||
class HttpClient : public HttpOutboundConnection::Callback {
|
class HttpClient : public HttpOutboundConnection::Callback {
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
IPAddress addr;
|
IPAddress addr;
|
||||||
@ -48,7 +49,7 @@ class HttpClient : public HttpOutboundConnection::Callback {
|
|||||||
send_closure(connection_, &HttpOutboundConnection::write_ok);
|
send_closure(connection_, &HttpOutboundConnection::write_ok);
|
||||||
LOG(INFO) << "SEND";
|
LOG(INFO) << "SEND";
|
||||||
}
|
}
|
||||||
void handle(HttpQueryPtr result) override {
|
void handle(unique_ptr<HttpQuery> result) override {
|
||||||
loop();
|
loop();
|
||||||
}
|
}
|
||||||
void on_connection_error(Status error) override {
|
void on_connection_error(Status error) override {
|
||||||
|
@ -19,9 +19,10 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
static int cnt = 0;
|
static int cnt = 0;
|
||||||
|
|
||||||
class HelloWorld : public HttpInboundConnection::Callback {
|
class HelloWorld : public HttpInboundConnection::Callback {
|
||||||
public:
|
public:
|
||||||
void handle(HttpQueryPtr query, ActorOwn<HttpInboundConnection> connection) override {
|
void handle(unique_ptr<HttpQuery> query, ActorOwn<HttpInboundConnection> connection) override {
|
||||||
// LOG(ERROR) << *query;
|
// LOG(ERROR) << *query;
|
||||||
HttpHeaderCreator hc;
|
HttpHeaderCreator hc;
|
||||||
Slice content = "hello world";
|
Slice content = "hello world";
|
||||||
|
@ -27,7 +27,8 @@ int main(int argc, char *argv[]) {
|
|||||||
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
|
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
|
||||||
scheduler->init(0);
|
scheduler->init(0);
|
||||||
scheduler
|
scheduler
|
||||||
->create_actor_unsafe<td::Wget>(0, "Client", td::PromiseCreator::lambda([](td::Result<td::HttpQueryPtr> res) {
|
->create_actor_unsafe<td::Wget>(0, "Client",
|
||||||
|
td::PromiseCreator::lambda([](td::Result<td::unique_ptr<td::HttpQuery>> res) {
|
||||||
LOG(ERROR) << *res.ok();
|
LOG(ERROR) << *res.ok();
|
||||||
td::Scheduler::instance()->finish();
|
td::Scheduler::instance()->finish();
|
||||||
}),
|
}),
|
||||||
|
@ -126,7 +126,7 @@ static ActorOwn<> get_simple_config_impl(Promise<SimpleConfig> promise, int32 sc
|
|||||||
const int ttl = 3;
|
const int ttl = 3;
|
||||||
return ActorOwn<>(create_actor_on_scheduler<Wget>(
|
return ActorOwn<>(create_actor_on_scheduler<Wget>(
|
||||||
"Wget", scheduler_id,
|
"Wget", scheduler_id,
|
||||||
PromiseCreator::lambda([promise = std::move(promise)](Result<HttpQueryPtr> r_query) mutable {
|
PromiseCreator::lambda([promise = std::move(promise)](Result<unique_ptr<HttpQuery>> r_query) mutable {
|
||||||
promise.set_result([&]() -> Result<SimpleConfig> {
|
promise.set_result([&]() -> Result<SimpleConfig> {
|
||||||
TRY_RESULT(http_query, std::move(r_query));
|
TRY_RESULT(http_query, std::move(r_query));
|
||||||
return decode_config(http_query->content_);
|
return decode_config(http_query->content_);
|
||||||
@ -160,7 +160,7 @@ ActorOwn<> get_simple_config_google_dns(Promise<SimpleConfig> promise, const Con
|
|||||||
}
|
}
|
||||||
return ActorOwn<>(create_actor_on_scheduler<Wget>(
|
return ActorOwn<>(create_actor_on_scheduler<Wget>(
|
||||||
"Wget", scheduler_id,
|
"Wget", scheduler_id,
|
||||||
PromiseCreator::lambda([promise = std::move(promise)](Result<HttpQueryPtr> r_query) mutable {
|
PromiseCreator::lambda([promise = std::move(promise)](Result<unique_ptr<HttpQuery>> r_query) mutable {
|
||||||
promise.set_result([&]() -> Result<SimpleConfig> {
|
promise.set_result([&]() -> Result<SimpleConfig> {
|
||||||
TRY_RESULT(http_query, std::move(r_query));
|
TRY_RESULT(http_query, std::move(r_query));
|
||||||
TRY_RESULT(json, json_decode(http_query->content_));
|
TRY_RESULT(json, json_decode(http_query->content_));
|
||||||
|
@ -48,7 +48,7 @@ class GoogleDnsResolver : public Actor {
|
|||||||
const int timeout = 10;
|
const int timeout = 10;
|
||||||
const int ttl = 3;
|
const int ttl = 3;
|
||||||
begin_time_ = Time::now();
|
begin_time_ = Time::now();
|
||||||
auto wget_promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result<HttpQueryPtr> r_http_query) {
|
auto wget_promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result<unique_ptr<HttpQuery>> r_http_query) {
|
||||||
send_closure(actor_id, &GoogleDnsResolver::on_result, std::move(r_http_query));
|
send_closure(actor_id, &GoogleDnsResolver::on_result, std::move(r_http_query));
|
||||||
});
|
});
|
||||||
wget_ = create_actor<Wget>(
|
wget_ = create_actor<Wget>(
|
||||||
@ -58,7 +58,7 @@ class GoogleDnsResolver : public Actor {
|
|||||||
SslStream::VerifyPeer::Off);
|
SslStream::VerifyPeer::Off);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Result<IPAddress> get_ip_address(Result<HttpQueryPtr> r_http_query) {
|
static Result<IPAddress> get_ip_address(Result<unique_ptr<HttpQuery>> r_http_query) {
|
||||||
TRY_RESULT(http_query, std::move(r_http_query));
|
TRY_RESULT(http_query, std::move(r_http_query));
|
||||||
TRY_RESULT(json_value, json_decode(http_query->content_));
|
TRY_RESULT(json_value, json_decode(http_query->content_));
|
||||||
if (json_value.type() != JsonValue::Type::Object) {
|
if (json_value.type() != JsonValue::Type::Object) {
|
||||||
@ -79,7 +79,7 @@ class GoogleDnsResolver : public Actor {
|
|||||||
return ip;
|
return ip;
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_result(Result<HttpQueryPtr> r_http_query) {
|
void on_result(Result<unique_ptr<HttpQuery>> r_http_query) {
|
||||||
auto end_time = Time::now();
|
auto end_time = Time::now();
|
||||||
auto result = get_ip_address(std::move(r_http_query));
|
auto result = get_ip_address(std::move(r_http_query));
|
||||||
VLOG(dns_resolver) << "Init IPv" << (prefer_ipv6_ ? "6" : "4") << " host = " << host_ << " in "
|
VLOG(dns_resolver) << "Init IPv" << (prefer_ipv6_ ? "6" : "4") << " host = " << host_ << " in "
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
class HttpConnectionBase : public Actor {
|
class HttpConnectionBase : public Actor {
|
||||||
public:
|
public:
|
||||||
void write_next(BufferSlice buffer);
|
void write_next(BufferSlice buffer);
|
||||||
@ -50,7 +51,7 @@ class HttpConnectionBase : public Actor {
|
|||||||
size_t max_files_;
|
size_t max_files_;
|
||||||
int32 idle_timeout_;
|
int32 idle_timeout_;
|
||||||
HttpReader reader_;
|
HttpReader reader_;
|
||||||
HttpQueryPtr current_query_;
|
unique_ptr<HttpQuery> current_query_;
|
||||||
bool close_after_write_ = false;
|
bool close_after_write_ = false;
|
||||||
|
|
||||||
void live_event();
|
void live_event();
|
||||||
@ -60,8 +61,9 @@ class HttpConnectionBase : public Actor {
|
|||||||
void timeout_expired() override;
|
void timeout_expired() override;
|
||||||
void loop() override;
|
void loop() override;
|
||||||
|
|
||||||
virtual void on_query(HttpQueryPtr) = 0;
|
virtual void on_query(unique_ptr<HttpQuery> query) = 0;
|
||||||
virtual void on_error(Status error) = 0;
|
virtual void on_error(Status error) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -18,7 +18,7 @@ HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size,
|
|||||||
, callback_(std::move(callback)) {
|
, callback_(std::move(callback)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpInboundConnection::on_query(HttpQueryPtr query) {
|
void HttpInboundConnection::on_query(unique_ptr<HttpQuery> query) {
|
||||||
CHECK(!callback_.empty());
|
CHECK(!callback_.empty());
|
||||||
send_closure(callback_, &Callback::handle, std::move(query), ActorOwn<HttpInboundConnection>(actor_id(this)));
|
send_closure(callback_, &Callback::handle, std::move(query), ActorOwn<HttpInboundConnection>(actor_id(this)));
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase {
|
|||||||
public:
|
public:
|
||||||
class Callback : public Actor {
|
class Callback : public Actor {
|
||||||
public:
|
public:
|
||||||
virtual void handle(HttpQueryPtr query, ActorOwn<HttpInboundConnection> connection) = 0;
|
virtual void handle(unique_ptr<HttpQuery> query, ActorOwn<HttpInboundConnection> connection) = 0;
|
||||||
};
|
};
|
||||||
// Inherited interface
|
// Inherited interface
|
||||||
// void write_next(BufferSlice buffer);
|
// void write_next(BufferSlice buffer);
|
||||||
@ -31,7 +31,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase {
|
|||||||
ActorShared<Callback> callback);
|
ActorShared<Callback> callback);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void on_query(HttpQueryPtr query) override;
|
void on_query(unique_ptr<HttpQuery> query) override;
|
||||||
void on_error(Status error) override;
|
void on_error(Status error) override;
|
||||||
void hangup() override {
|
void hangup() override {
|
||||||
callback_.release();
|
callback_.release();
|
||||||
|
@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
void HttpOutboundConnection::on_query(HttpQueryPtr query) {
|
void HttpOutboundConnection::on_query(unique_ptr<HttpQuery> query) {
|
||||||
CHECK(!callback_.empty());
|
CHECK(!callback_.empty());
|
||||||
send_closure(callback_, &Callback::handle, std::move(query));
|
send_closure(callback_, &Callback::handle, std::move(query));
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase {
|
|||||||
public:
|
public:
|
||||||
class Callback : public Actor {
|
class Callback : public Actor {
|
||||||
public:
|
public:
|
||||||
virtual void handle(HttpQueryPtr query) = 0;
|
virtual void handle(unique_ptr<HttpQuery> query) = 0;
|
||||||
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
||||||
};
|
};
|
||||||
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
||||||
@ -36,7 +36,7 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase {
|
|||||||
// void write_error(Status error);
|
// void write_error(Status error);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void on_query(HttpQueryPtr query) override;
|
void on_query(unique_ptr<HttpQuery> query) override;
|
||||||
void on_error(Status error) override;
|
void on_error(Status error) override;
|
||||||
void hangup() override {
|
void hangup() override {
|
||||||
callback_.release();
|
callback_.release();
|
||||||
|
@ -42,8 +42,6 @@ class HttpQuery {
|
|||||||
int get_retry_after() const;
|
int get_retry_after() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
using HttpQueryPtr = unique_ptr<HttpQuery>;
|
|
||||||
|
|
||||||
StringBuilder &operator<<(StringBuilder &sb, const HttpQuery &q);
|
StringBuilder &operator<<(StringBuilder &sb, const HttpQuery &q);
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -22,8 +22,8 @@
|
|||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
Wget::Wget(Promise<HttpQueryPtr> promise, string url, std::vector<std::pair<string, string>> headers, int32 timeout_in,
|
Wget::Wget(Promise<unique_ptr<HttpQuery>> promise, string url, std::vector<std::pair<string, string>> headers,
|
||||||
int32 ttl, bool prefer_ipv6, SslStream::VerifyPeer verify_peer)
|
int32 timeout_in, int32 ttl, bool prefer_ipv6, SslStream::VerifyPeer verify_peer)
|
||||||
: promise_(std::move(promise))
|
: promise_(std::move(promise))
|
||||||
, input_url_(std::move(url))
|
, input_url_(std::move(url))
|
||||||
, headers_(std::move(headers))
|
, headers_(std::move(headers))
|
||||||
@ -90,7 +90,7 @@ void Wget::loop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Wget::handle(HttpQueryPtr result) {
|
void Wget::handle(unique_ptr<HttpQuery> result) {
|
||||||
on_ok(std::move(result));
|
on_ok(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,8 +98,9 @@ void Wget::on_connection_error(Status error) {
|
|||||||
on_error(std::move(error));
|
on_error(std::move(error));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Wget::on_ok(HttpQueryPtr http_query_ptr) {
|
void Wget::on_ok(unique_ptr<HttpQuery> http_query_ptr) {
|
||||||
CHECK(promise_);
|
CHECK(promise_);
|
||||||
|
CHECK(http_query_ptr);
|
||||||
if ((http_query_ptr->code_ == 301 || http_query_ptr->code_ == 302 || http_query_ptr->code_ == 307 ||
|
if ((http_query_ptr->code_ == 301 || http_query_ptr->code_ == 302 || http_query_ptr->code_ == 307 ||
|
||||||
http_query_ptr->code_ == 308) &&
|
http_query_ptr->code_ == 308) &&
|
||||||
ttl_ > 0) {
|
ttl_ > 0) {
|
||||||
|
@ -22,23 +22,23 @@ namespace td {
|
|||||||
|
|
||||||
class Wget : public HttpOutboundConnection::Callback {
|
class Wget : public HttpOutboundConnection::Callback {
|
||||||
public:
|
public:
|
||||||
explicit Wget(Promise<HttpQueryPtr> promise, string url, std::vector<std::pair<string, string>> headers = {},
|
explicit Wget(Promise<unique_ptr<HttpQuery>> promise, string url, std::vector<std::pair<string, string>> headers = {},
|
||||||
int32 timeout_in = 10, int32 ttl = 3, bool prefer_ipv6 = false,
|
int32 timeout_in = 10, int32 ttl = 3, bool prefer_ipv6 = false,
|
||||||
SslStream::VerifyPeer verify_peer = SslStream::VerifyPeer::On);
|
SslStream::VerifyPeer verify_peer = SslStream::VerifyPeer::On);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Status try_init();
|
Status try_init();
|
||||||
void loop() override;
|
void loop() override;
|
||||||
void handle(HttpQueryPtr result) override;
|
void handle(unique_ptr<HttpQuery> result) override;
|
||||||
void on_connection_error(Status error) override;
|
void on_connection_error(Status error) override;
|
||||||
void on_ok(HttpQueryPtr http_query_ptr);
|
void on_ok(unique_ptr<HttpQuery> http_query_ptr);
|
||||||
void on_error(Status error);
|
void on_error(Status error);
|
||||||
|
|
||||||
void tear_down() override;
|
void tear_down() override;
|
||||||
void start_up() override;
|
void start_up() override;
|
||||||
void timeout_expired() override;
|
void timeout_expired() override;
|
||||||
|
|
||||||
Promise<HttpQueryPtr> promise_;
|
Promise<unique_ptr<HttpQuery>> promise_;
|
||||||
ActorOwn<HttpOutboundConnection> connection_;
|
ActorOwn<HttpOutboundConnection> connection_;
|
||||||
string input_url_;
|
string input_url_;
|
||||||
std::vector<std::pair<string, string>> headers_;
|
std::vector<std::pair<string, string>> headers_;
|
||||||
|
Loading…
Reference in New Issue
Block a user