HttpConnection: migrate connections to a dedicated scheduler
GitOrigin-RevId: 91ece928204ee732e81eaedd5e869810c5bea3b8
This commit is contained in:
parent
8132c4dfd3
commit
04667e86a7
@ -17,13 +17,14 @@ namespace td {
|
|||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
HttpConnectionBase::HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size,
|
HttpConnectionBase::HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size,
|
||||||
size_t max_files, int32 idle_timeout)
|
size_t max_files, int32 idle_timeout, int32 slow_scheduler_id)
|
||||||
: state_(state)
|
: state_(state)
|
||||||
, fd_(std::move(fd))
|
, fd_(std::move(fd))
|
||||||
, ssl_stream_(std::move(ssl_stream))
|
, ssl_stream_(std::move(ssl_stream))
|
||||||
, max_post_size_(max_post_size)
|
, max_post_size_(max_post_size)
|
||||||
, max_files_(max_files)
|
, max_files_(max_files)
|
||||||
, idle_timeout_(idle_timeout) {
|
, idle_timeout_(idle_timeout)
|
||||||
|
, slow_scheduler_id_(slow_scheduler_id) {
|
||||||
CHECK(state_ != State::Close);
|
CHECK(state_ != State::Close);
|
||||||
|
|
||||||
if (ssl_stream_) {
|
if (ssl_stream_) {
|
||||||
@ -109,9 +110,18 @@ void HttpConnectionBase::loop() {
|
|||||||
// TODO: read_next even when state_ == State::Write
|
// TODO: read_next even when state_ == State::Write
|
||||||
|
|
||||||
bool want_read = false;
|
bool want_read = false;
|
||||||
|
bool can_be_slow = slow_scheduler_id_ == -1;
|
||||||
if (state_ == State::Read) {
|
if (state_ == State::Read) {
|
||||||
auto res = reader_.read_next(current_query_.get());
|
auto res = reader_.read_next(current_query_.get(), can_be_slow);
|
||||||
if (res.is_error()) {
|
if (res.is_error()) {
|
||||||
|
if (res.error().message() == "SLOW") {
|
||||||
|
LOG(INFO) << "Slow HTTP connection: migrate to " << slow_scheduler_id_;
|
||||||
|
CHECK(!can_be_slow);
|
||||||
|
yield();
|
||||||
|
migrate(slow_scheduler_id_);
|
||||||
|
slow_scheduler_id_ = -1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
live_event();
|
live_event();
|
||||||
state_ = State::Write;
|
state_ = State::Write;
|
||||||
LOG(INFO) << res.error();
|
LOG(INFO) << res.error();
|
||||||
|
@ -33,7 +33,7 @@ class HttpConnectionBase : public Actor {
|
|||||||
protected:
|
protected:
|
||||||
enum class State { Read, Write, Close };
|
enum class State { Read, Write, Close };
|
||||||
HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
|
HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files,
|
||||||
int32 idle_timeout);
|
int32 idle_timeout, int32 slow_scheduler_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
State state_;
|
State state_;
|
||||||
@ -57,6 +57,8 @@ class HttpConnectionBase : public Actor {
|
|||||||
unique_ptr<HttpQuery> current_query_;
|
unique_ptr<HttpQuery> current_query_;
|
||||||
bool close_after_write_ = false;
|
bool close_after_write_ = false;
|
||||||
|
|
||||||
|
int32 slow_scheduler_id_{-1};
|
||||||
|
|
||||||
void live_event();
|
void live_event();
|
||||||
|
|
||||||
void start_up() override;
|
void start_up() override;
|
||||||
|
@ -13,8 +13,9 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
||||||
ActorShared<Callback> callback)
|
ActorShared<Callback> callback, int32 slow_scheduler_id)
|
||||||
: HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout)
|
: HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout,
|
||||||
|
slow_scheduler_id)
|
||||||
, callback_(std::move(callback)) {
|
, callback_(std::move(callback)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase {
|
|||||||
// void write_error(Status error);
|
// void write_error(Status error);
|
||||||
|
|
||||||
HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
||||||
ActorShared<Callback> callback);
|
ActorShared<Callback> callback, int32 slow_scheduler_id = -1);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void on_query(unique_ptr<HttpQuery> query) override;
|
void on_query(unique_ptr<HttpQuery> query) override;
|
||||||
|
@ -25,9 +25,9 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase {
|
|||||||
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
virtual void on_connection_error(Status error) = 0; // TODO rename to on_error
|
||||||
};
|
};
|
||||||
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout,
|
||||||
ActorShared<Callback> callback)
|
ActorShared<Callback> callback, int32 slow_scheduler_id = -1)
|
||||||
: HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size,
|
: HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size,
|
||||||
max_files, idle_timeout)
|
max_files, idle_timeout, slow_scheduler_id)
|
||||||
, callback_(std::move(callback)) {
|
, callback_(std::move(callback)) {
|
||||||
}
|
}
|
||||||
// Inherited interface
|
// Inherited interface
|
||||||
|
@ -60,7 +60,7 @@ void HttpReader::init(ChainBufferReader *input, size_t max_post_size, size_t max
|
|||||||
total_headers_length_ = 0;
|
total_headers_length_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<size_t> HttpReader::read_next(HttpQuery *query) {
|
Result<size_t> HttpReader::read_next(HttpQuery *query, bool can_be_slow) {
|
||||||
if (query_ != query) {
|
if (query_ != query) {
|
||||||
CHECK(query_ == nullptr);
|
CHECK(query_ == nullptr);
|
||||||
query_ = query;
|
query_ = query;
|
||||||
@ -191,6 +191,9 @@ Result<size_t> HttpReader::read_next(HttpQuery *query) {
|
|||||||
return need_size;
|
return need_size;
|
||||||
}
|
}
|
||||||
case State::ReadContentToFile: {
|
case State::ReadContentToFile: {
|
||||||
|
if (!can_be_slow) {
|
||||||
|
return Status::Error("SLOW");
|
||||||
|
}
|
||||||
// save content to a file
|
// save content to a file
|
||||||
if (temp_file_.empty()) {
|
if (temp_file_.empty()) {
|
||||||
auto file = open_temp_file("file");
|
auto file = open_temp_file("file");
|
||||||
|
@ -27,7 +27,7 @@ class HttpReader {
|
|||||||
public:
|
public:
|
||||||
void init(ChainBufferReader *input, size_t max_post_size = std::numeric_limits<size_t>::max(),
|
void init(ChainBufferReader *input, size_t max_post_size = std::numeric_limits<size_t>::max(),
|
||||||
size_t max_files = 100);
|
size_t max_files = 100);
|
||||||
Result<size_t> read_next(HttpQuery *query) TD_WARN_UNUSED_RESULT; // TODO move query to init
|
Result<size_t> read_next(HttpQuery *query, bool can_be_slow = true) TD_WARN_UNUSED_RESULT; // TODO move query to init
|
||||||
|
|
||||||
HttpReader() = default;
|
HttpReader() = default;
|
||||||
HttpReader(const HttpReader &other) = delete;
|
HttpReader(const HttpReader &other) = delete;
|
||||||
|
Loading…
Reference in New Issue
Block a user