From 04667e86a7405843a306dce53943f9e4c67f4022 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Thu, 23 Jul 2020 19:47:12 +0300 Subject: [PATCH] HttpConnection: migrate connections to a dedicated scheduler GitOrigin-RevId: 91ece928204ee732e81eaedd5e869810c5bea3b8 --- tdnet/td/net/HttpConnectionBase.cpp | 16 +++++++++++++--- tdnet/td/net/HttpConnectionBase.h | 4 +++- tdnet/td/net/HttpInboundConnection.cpp | 5 +++-- tdnet/td/net/HttpInboundConnection.h | 2 +- tdnet/td/net/HttpOutboundConnection.h | 4 ++-- tdnet/td/net/HttpReader.cpp | 5 ++++- tdnet/td/net/HttpReader.h | 2 +- 7 files changed, 27 insertions(+), 11 deletions(-) diff --git a/tdnet/td/net/HttpConnectionBase.cpp b/tdnet/td/net/HttpConnectionBase.cpp index 7ef6a83c4..f302a2ff3 100644 --- a/tdnet/td/net/HttpConnectionBase.cpp +++ b/tdnet/td/net/HttpConnectionBase.cpp @@ -17,13 +17,14 @@ namespace td { namespace detail { HttpConnectionBase::HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, - size_t max_files, int32 idle_timeout) + size_t max_files, int32 idle_timeout, int32 slow_scheduler_id) : state_(state) , fd_(std::move(fd)) , ssl_stream_(std::move(ssl_stream)) , max_post_size_(max_post_size) , max_files_(max_files) - , idle_timeout_(idle_timeout) { + , idle_timeout_(idle_timeout) + , slow_scheduler_id_(slow_scheduler_id) { CHECK(state_ != State::Close); if (ssl_stream_) { @@ -109,9 +110,18 @@ void HttpConnectionBase::loop() { // TODO: read_next even when state_ == State::Write bool want_read = false; + bool can_be_slow = slow_scheduler_id_ == -1; if (state_ == State::Read) { - auto res = reader_.read_next(current_query_.get()); + auto res = reader_.read_next(current_query_.get(), can_be_slow); if (res.is_error()) { + if (res.error().message() == "SLOW") { + LOG(INFO) << "Slow HTTP connection: migrate to " << slow_scheduler_id_; + CHECK(!can_be_slow); + yield(); + migrate(slow_scheduler_id_); + slow_scheduler_id_ = -1; + return; + } live_event(); state_ = State::Write; LOG(INFO) << res.error(); diff --git a/tdnet/td/net/HttpConnectionBase.h b/tdnet/td/net/HttpConnectionBase.h index c965e863e..868aeac53 100644 --- a/tdnet/td/net/HttpConnectionBase.h +++ b/tdnet/td/net/HttpConnectionBase.h @@ -33,7 +33,7 @@ class HttpConnectionBase : public Actor { protected: enum class State { Read, Write, Close }; HttpConnectionBase(State state, SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, - int32 idle_timeout); + int32 idle_timeout, int32 slow_scheduler_id); private: State state_; @@ -57,6 +57,8 @@ class HttpConnectionBase : public Actor { unique_ptr current_query_; bool close_after_write_ = false; + int32 slow_scheduler_id_{-1}; + void live_event(); void start_up() override; diff --git a/tdnet/td/net/HttpInboundConnection.cpp b/tdnet/td/net/HttpInboundConnection.cpp index f457e2608..3af624b31 100644 --- a/tdnet/td/net/HttpInboundConnection.cpp +++ b/tdnet/td/net/HttpInboundConnection.cpp @@ -13,8 +13,9 @@ namespace td { HttpInboundConnection::HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout, - ActorShared callback) - : HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout) + ActorShared callback, int32 slow_scheduler_id) + : HttpConnectionBase(State::Read, std::move(fd), SslStream(), max_post_size, max_files, idle_timeout, + slow_scheduler_id) , callback_(std::move(callback)) { } diff --git a/tdnet/td/net/HttpInboundConnection.h b/tdnet/td/net/HttpInboundConnection.h index d9ccf224d..b701227da 100644 --- a/tdnet/td/net/HttpInboundConnection.h +++ b/tdnet/td/net/HttpInboundConnection.h @@ -28,7 +28,7 @@ class HttpInboundConnection final : public detail::HttpConnectionBase { // void write_error(Status error); HttpInboundConnection(SocketFd fd, size_t max_post_size, size_t max_files, int32 idle_timeout, - ActorShared callback); + ActorShared callback, int32 slow_scheduler_id = -1); private: void on_query(unique_ptr query) override; diff --git a/tdnet/td/net/HttpOutboundConnection.h b/tdnet/td/net/HttpOutboundConnection.h index 732235c28..4e6766417 100644 --- a/tdnet/td/net/HttpOutboundConnection.h +++ b/tdnet/td/net/HttpOutboundConnection.h @@ -25,9 +25,9 @@ class HttpOutboundConnection final : public detail::HttpConnectionBase { virtual void on_connection_error(Status error) = 0; // TODO rename to on_error }; HttpOutboundConnection(SocketFd fd, SslStream ssl_stream, size_t max_post_size, size_t max_files, int32 idle_timeout, - ActorShared callback) + ActorShared callback, int32 slow_scheduler_id = -1) : HttpConnectionBase(HttpConnectionBase::State::Write, std::move(fd), std::move(ssl_stream), max_post_size, - max_files, idle_timeout) + max_files, idle_timeout, slow_scheduler_id) , callback_(std::move(callback)) { } // Inherited interface diff --git a/tdnet/td/net/HttpReader.cpp b/tdnet/td/net/HttpReader.cpp index f1fa19446..b22d54a70 100644 --- a/tdnet/td/net/HttpReader.cpp +++ b/tdnet/td/net/HttpReader.cpp @@ -60,7 +60,7 @@ void HttpReader::init(ChainBufferReader *input, size_t max_post_size, size_t max total_headers_length_ = 0; } -Result HttpReader::read_next(HttpQuery *query) { +Result HttpReader::read_next(HttpQuery *query, bool can_be_slow) { if (query_ != query) { CHECK(query_ == nullptr); query_ = query; @@ -191,6 +191,9 @@ Result HttpReader::read_next(HttpQuery *query) { return need_size; } case State::ReadContentToFile: { + if (!can_be_slow) { + return Status::Error("SLOW"); + } // save content to a file if (temp_file_.empty()) { auto file = open_temp_file("file"); diff --git a/tdnet/td/net/HttpReader.h b/tdnet/td/net/HttpReader.h index 5142a57e9..c3ebf2b2a 100644 --- a/tdnet/td/net/HttpReader.h +++ b/tdnet/td/net/HttpReader.h @@ -27,7 +27,7 @@ class HttpReader { public: void init(ChainBufferReader *input, size_t max_post_size = std::numeric_limits::max(), size_t max_files = 100); - Result read_next(HttpQuery *query) TD_WARN_UNUSED_RESULT; // TODO move query to init + Result read_next(HttpQuery *query, bool can_be_slow = true) TD_WARN_UNUSED_RESULT; // TODO move query to init HttpReader() = default; HttpReader(const HttpReader &other) = delete;