Pass BufferedFd to HttpConnection.
This commit is contained in:
parent
a68d8e77ef
commit
377bd18909
@ -30,7 +30,7 @@ class HttpClient final : public HttpOutboundConnection::Callback {
|
|||||||
addr.init_ipv4_port("127.0.0.1", 8082).ensure();
|
addr.init_ipv4_port("127.0.0.1", 8082).ensure();
|
||||||
auto fd = SocketFd::open(addr);
|
auto fd = SocketFd::open(addr);
|
||||||
LOG_CHECK(fd.is_ok()) << fd.error();
|
LOG_CHECK(fd.is_ok()) << fd.error();
|
||||||
connection_ = create_actor<HttpOutboundConnection>("Connect", fd.move_as_ok(), SslStream{},
|
connection_ = create_actor<HttpOutboundConnection>("Connect", BufferedFd<SocketFd>(fd.move_as_ok()), SslStream{},
|
||||||
std::numeric_limits<size_t>::max(), 0, 0,
|
std::numeric_limits<size_t>::max(), 0, 0,
|
||||||
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
||||||
yield();
|
yield();
|
||||||
|
@ -56,8 +56,8 @@ class Server final : public TcpListener::Callback {
|
|||||||
LOG(ERROR) << "ACCEPT " << cnt++;
|
LOG(ERROR) << "ACCEPT " << cnt++;
|
||||||
pos_++;
|
pos_++;
|
||||||
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
||||||
create_actor_on_scheduler<HttpInboundConnection>("HttpInboundConnection", scheduler_id, std::move(fd), 1024 * 1024,
|
create_actor_on_scheduler<HttpInboundConnection>("HttpInboundConnection", scheduler_id,
|
||||||
0, 0,
|
BufferedFd<SocketFd>(std::move(fd)), 1024 * 1024, 0, 0,
|
||||||
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id))
|
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id))
|
||||||
.release();
|
.release();
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@
|
|||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
//
|
//
|
||||||
#include "td/net/HttpHeaderCreator.h"
|
#include "td/net/HttpHeaderCreator.h"
|
||||||
#include "td/net/HttpInboundConnection.h"
|
|
||||||
#include "td/net/TcpListener.h"
|
#include "td/net/TcpListener.h"
|
||||||
|
|
||||||
#include "td/actor/actor.h"
|
#include "td/actor/actor.h"
|
||||||
@ -22,7 +21,6 @@
|
|||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
// HttpInboundConnection header
|
|
||||||
static int cnt = 0;
|
static int cnt = 0;
|
||||||
class HelloWorld final : public Actor {
|
class HelloWorld final : public Actor {
|
||||||
public:
|
public:
|
||||||
@ -107,7 +105,7 @@ class Server final : public TcpListener::Callback {
|
|||||||
LOG(ERROR) << "ACCEPT " << cnt++;
|
LOG(ERROR) << "ACCEPT " << cnt++;
|
||||||
pos_++;
|
pos_++;
|
||||||
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
||||||
create_actor_on_scheduler<HelloWorld>("HttpInboundConnection", scheduler_id, std::move(fd)).release();
|
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id, std::move(fd)).release();
|
||||||
}
|
}
|
||||||
void hangup() final {
|
void hangup() final {
|
||||||
// may be it should be default?..
|
// may be it should be default?..
|
||||||
|
@ -93,7 +93,7 @@ class Server final : public TcpListener::Callback {
|
|||||||
void accept(SocketFd fd) final {
|
void accept(SocketFd fd) final {
|
||||||
pos_++;
|
pos_++;
|
||||||
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
|
||||||
create_actor_on_scheduler<HttpEchoConnection>("HttpInboundConnection", scheduler_id, std::move(fd)).release();
|
create_actor_on_scheduler<HttpEchoConnection>("HttpEchoConnection", scheduler_id, std::move(fd)).release();
|
||||||
}
|
}
|
||||||
void hangup() final {
|
void hangup() final {
|
||||||
LOG(ERROR) << "Hanging up..";
|
LOG(ERROR) << "Hanging up..";
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
HttpConnectionBase::HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size,
|
HttpConnectionBase::HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size,
|
||||||
size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
|
size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
|
||||||
: state_(state)
|
: state_(state)
|
||||||
, fd_(std::move(fd))
|
, fd_(std::move(fd))
|
||||||
|
@ -32,7 +32,7 @@ class HttpConnectionBase : public Actor {
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
enum class State { Read, Write, Close };
|
enum class State { Read, Write, Close };
|
||||||
HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
|
HttpConnectionBase(State state, BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
|
||||||
int32 idle_timeout, int32 slow_scheduler_id);
|
int32 idle_timeout, int32 slow_scheduler_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -12,8 +12,9 @@
|
|||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpInboundConnection::HttpInboundConnection(BufferedFd<SocketFd> fd, size_t max_post_size, size_t max_files,
|
||||||
ActorShared<Callback> callback, int32 slow_scheduler_id)
|
int32 idle_timeout, ActorShared<Callback> callback,
|
||||||
|
int32 slow_scheduler_id)
|
||||||
: HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout,
|
: HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout,
|
||||||
slow_scheduler_id)
|
slow_scheduler_id)
|
||||||
, callback_(std::move(callback)) {
|
, callback_(std::move(callback)) {
|
||||||
|
@ -27,7 +27,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase {
|
|||||||
// void write_ok();
|
// void write_ok();
|
||||||
// void write_error(Status error);
|
// void write_error(Status error);
|
||||||
|
|
||||||
HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpInboundConnection(BufferedFd<SocketFd> fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
||||||
ActorShared<Callback> callback, int32 slow_scheduler_id = -1);
|
ActorShared<Callback> callback, int32 slow_scheduler_id = -1);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -24,8 +24,8 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase {
|
|||||||
virtual void handle(unique_ptr<HttpQuery> query) = 0;
|
virtual void handle(unique_ptr<HttpQuery> query) = 0;
|
||||||
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
||||||
};
|
};
|
||||||
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpOutboundConnection(BufferedFd<SocketFd> fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
|
||||||
ActorShared<Callback> callback, int32 slow_scheduler_id = -1)
|
int32 idle_timeout, ActorShared<Callback> callback, int32 slow_scheduler_id = -1)
|
||||||
: HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size,
|
: HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size,
|
||||||
max_files, idle_timeout, slow_scheduler_id)
|
max_files, idle_timeout, slow_scheduler_id)
|
||||||
, callback_(std::move(callback)) {
|
, callback_(std::move(callback)) {
|
||||||
|
@ -79,14 +79,14 @@ Status Wget::try_init() {
|
|||||||
return Status::Error("Sockets are not supported");
|
return Status::Error("Sockets are not supported");
|
||||||
}
|
}
|
||||||
if (url.protocol_ == HttpUrl::Protocol::Http) {
|
if (url.protocol_ == HttpUrl::Protocol::Http) {
|
||||||
connection_ = create_actor<HttpOutboundConnection>("Connect", std::move(fd), SslStream{},
|
connection_ = create_actor<HttpOutboundConnection>("Connect", BufferedFd<SocketFd>(std::move(fd)), SslStream{},
|
||||||
std::numeric_limits<std::size_t>::max(), 0, 0,
|
std::numeric_limits<std::size_t>::max(), 0, 0,
|
||||||
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
||||||
} else {
|
} else {
|
||||||
TRY_RESULT(ssl_stream, SslStream::create(url.host_, CSlice() /* certificate */, verify_peer_));
|
TRY_RESULT(ssl_stream, SslStream::create(url.host_, CSlice() /* certificate */, verify_peer_));
|
||||||
connection_ = create_actor<HttpOutboundConnection>("Connect", std::move(fd), std::move(ssl_stream),
|
connection_ = create_actor<HttpOutboundConnection>(
|
||||||
std::numeric_limits<std::size_t>::max(), 0, 0,
|
"Connect", BufferedFd<SocketFd>(std::move(fd)), std::move(ssl_stream), std::numeric_limits<std::size_t>::max(),
|
||||||
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
0, 0, ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
|
||||||
}
|
}
|
||||||
|
|
||||||
send_closure(connection_, &HttpOutboundConnection::write_next, BufferSlice(header));
|
send_closure(connection_, &HttpOutboundConnection::write_next, BufferSlice(header));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user