diff --git a/benchmark/bench_http_server_cheat.cpp b/benchmark/bench_http_server_cheat.cpp index 0c9824b3e..b27002356 100644 --- a/benchmark/bench_http_server_cheat.cpp +++ b/benchmark/bench_http_server_cheat.cpp @@ -39,7 +39,7 @@ class HelloWorld : public Actor { size_t write_pos_{0}; void start_up() override { - subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this)); HttpHeaderCreator hc; Slice content = "hello world"; //auto content = BufferSlice("hello world"); @@ -55,7 +55,7 @@ class HelloWorld : public Actor { void loop() override { auto status = do_loop(); if (status.is_error()) { - unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref()); stop(); LOG(ERROR) << "CLOSE: " << status; } diff --git a/benchmark/bench_http_server_fast.cpp b/benchmark/bench_http_server_fast.cpp index c97d978ef..b9efff3ce 100644 --- a/benchmark/bench_http_server_fast.cpp +++ b/benchmark/bench_http_server_fast.cpp @@ -31,7 +31,7 @@ class HttpEchoConnection : public Actor { HttpReader reader_; HttpQuery query_; void start_up() override { - subscribe(fd_.get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this)); reader_.init(&fd_.input_buffer(), 1024 * 1024, 0); } diff --git a/td/mtproto/HandshakeActor.cpp b/td/mtproto/HandshakeActor.cpp index de9b60422..a3ea97e4d 100644 --- a/td/mtproto/HandshakeActor.cpp +++ b/td/mtproto/HandshakeActor.cpp @@ -32,7 +32,7 @@ void HandshakeActor::close() { } void HandshakeActor::start_up() { - subscribe(connection_->get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(connection_->get_poll_info().extract_pollable_fd(this)); set_timeout_in(timeout_); yield(); } @@ -58,7 +58,7 @@ void HandshakeActor::return_connection(Status status) { if (status.is_error()) { status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_); } - unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); if (raw_connection_promise_) { if (status.is_error()) { if (raw_connection->stats_callback()) { diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index 38cb1b1b9..6b45d8b8c 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -127,7 +127,7 @@ class TdProxy : public Actor { void start_up() override { auto &fd = input_queue_->reader_get_event_fd(); - ::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); + Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); class Callback : public TdCallback { public: @@ -189,7 +189,7 @@ class TdProxy : public Actor { void tear_down() override { auto &fd = input_queue_->reader_get_event_fd(); - ::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); } }; diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index 14b668bfb..cb3eea3eb 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -634,7 +634,7 @@ class CliClient final : public Actor { close_flag_ = true; dump_memory_usage(); td_.reset(); - unsubscribe(stdin_.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(stdin_.get_poll_info().get_pollable_fd_ref()); is_stdin_reader_stopped_ = true; yield(); } @@ -748,7 +748,7 @@ class CliClient final : public Actor { rl_attempted_completion_function = tg_cli_completion; reactivate_readline(); #endif - subscribe(stdin_.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); + Scheduler::subscribe(stdin_.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); if (get_chat_list_) { send_request(make_tl_object(std::numeric_limits::max(), 0, 100)); diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 5e9047e1d..eddd55976 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -100,7 +100,7 @@ class PingActor : public Actor { ActorShared<> parent_; void start_up() override { - subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); set_timeout_in(10); yield(); } @@ -137,7 +137,7 @@ class PingActor : public Actor { CHECK(!promise_); return; } - unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); if (promise_) { if (status.is_error()) { if (raw_connection->stats_callback()) { diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index f62363887..c1b28536f 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -392,7 +392,7 @@ void Session::on_server_time_difference_updated() { } void Session::on_before_close() { - unsubscribe_before_close(current_info_->connection->get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe_before_close(current_info_->connection->get_poll_info().get_pollable_fd_ref()); } void Session::on_closed(Status status) { @@ -955,7 +955,7 @@ void Session::connection_open_finish(ConnectionInfo *info, info->connection->set_online(connection_online_flag_); } info->connection->set_name(name); - subscribe(info->connection->get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this)); info->mode = mode_; info->state = ConnectionInfo::State::Ready; info->created_at = Time::now_cached(); diff --git a/tdactor/td/actor/impl/Scheduler-decl.h b/tdactor/td/actor/impl/Scheduler-decl.h index fff953258..16dfaa383 100644 --- a/tdactor/td/actor/impl/Scheduler-decl.h +++ b/tdactor/td/actor/impl/Scheduler-decl.h @@ -108,9 +108,9 @@ class Scheduler { } void before_tail_send(const ActorId<> &actor_id); - void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite()); - void unsubscribe(PollableFdRef fd); - void unsubscribe_before_close(PollableFdRef fd); + static void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite()); + static void unsubscribe(PollableFdRef fd); + static void unsubscribe_before_close(PollableFdRef fd); void yield_actor(Actor *actor); void stop_actor(Actor *actor); @@ -240,10 +240,6 @@ class Scheduler { }; /*** Interface to current scheduler ***/ -void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite()); -void unsubscribe(PollableFdRef fd); -void unsubscribe_before_close(PollableFdRef fd); - template TD_WARN_UNUSED_RESULT ActorOwn create_actor(Slice name, Args &&... args); template diff --git a/tdactor/td/actor/impl/Scheduler.cpp b/tdactor/td/actor/impl/Scheduler.cpp index c04a46f1b..9164fe4d2 100644 --- a/tdactor/td/actor/impl/Scheduler.cpp +++ b/tdactor/td/actor/impl/Scheduler.cpp @@ -58,7 +58,7 @@ void Scheduler::ServiceActor::start_up() { } #if !TD_PORT_WINDOWS auto &fd = inbound_->reader_get_event_fd(); - ::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); + Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); subscribed_ = true; #endif yield(); @@ -100,7 +100,7 @@ void Scheduler::ServiceActor::tear_down() { return; } auto &fd = inbound_->reader_get_event_fd(); - ::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); subscribed_ = false; #endif } diff --git a/tdactor/td/actor/impl/Scheduler.h b/tdactor/td/actor/impl/Scheduler.h index eda84250b..4226593d4 100644 --- a/tdactor/td/actor/impl/Scheduler.h +++ b/tdactor/td/actor/impl/Scheduler.h @@ -259,15 +259,15 @@ void Scheduler::send(ActorRef actor_ref, Event &&event) { } inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) { - poll_.subscribe(std::move(fd), flags); + instance()->poll_.subscribe(std::move(fd), flags); } inline void Scheduler::unsubscribe(PollableFdRef fd) { - poll_.unsubscribe(std::move(fd)); + instance()->poll_.unsubscribe(std::move(fd)); } inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) { - poll_.unsubscribe_before_close(std::move(fd)); + instance()->poll_.unsubscribe_before_close(std::move(fd)); } inline void Scheduler::yield_actor(Actor *actor) { @@ -357,18 +357,6 @@ inline void Scheduler::run(double timeout) { } /*** Interface to current scheduler ***/ -inline void subscribe(PollableFd fd, PollFlags flags) { - Scheduler::instance()->subscribe(std::move(fd), flags); -} - -inline void unsubscribe(PollableFdRef fd) { - Scheduler::instance()->unsubscribe(std::move(fd)); -} - -inline void unsubscribe_before_close(PollableFdRef fd) { - Scheduler::instance()->unsubscribe_before_close(std::move(fd)); -} - template ActorOwn create_actor(Slice name, Args &&... args) { return Scheduler::instance()->create_actor(name, std::forward(args)...); diff --git a/tdnet/td/net/HttpConnectionBase.cpp b/tdnet/td/net/HttpConnectionBase.cpp index 7975f2009..a4c501836 100644 --- a/tdnet/td/net/HttpConnectionBase.cpp +++ b/tdnet/td/net/HttpConnectionBase.cpp @@ -41,7 +41,7 @@ void HttpConnectionBase::live_event() { } void HttpConnectionBase::start_up() { - subscribe(fd_.get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this)); reader_.init(read_sink_.get_output(), max_post_size_, max_files_); if (state_ == State::Read) { current_query_ = make_unique(); @@ -50,7 +50,7 @@ void HttpConnectionBase::start_up() { yield(); } void HttpConnectionBase::tear_down() { - unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref()); fd_.close(); } diff --git a/tdnet/td/net/TcpListener.cpp b/tdnet/td/net/TcpListener.cpp index b98b4c727..11e6282dd 100644 --- a/tdnet/td/net/TcpListener.cpp +++ b/tdnet/td/net/TcpListener.cpp @@ -10,7 +10,7 @@ #include "td/utils/port/detail/PollableFd.h" namespace td { -// TcpListener implementation + TcpListener::TcpListener(int port, ActorShared callback) : port_(port), callback_(std::move(callback)) { } @@ -26,13 +26,13 @@ void TcpListener::start_up() { return; } server_fd_ = r_socket.move_as_ok(); - subscribe(server_fd_.get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(server_fd_.get_poll_info().extract_pollable_fd(this)); } void TcpListener::tear_down() { LOG(ERROR) << "TcpListener closed"; if (!server_fd_.empty()) { - unsubscribe_before_close(server_fd_.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe_before_close(server_fd_.get_poll_info().get_pollable_fd_ref()); server_fd_.close(); } } diff --git a/tdnet/td/net/TransparentProxy.cpp b/tdnet/td/net/TransparentProxy.cpp index b9881e985..0685cb076 100644 --- a/tdnet/td/net/TransparentProxy.cpp +++ b/tdnet/td/net/TransparentProxy.cpp @@ -35,7 +35,7 @@ void TransparentProxy::on_error(Status status) { void TransparentProxy::tear_down() { VLOG(proxy) << "Finish to connect to proxy"; - unsubscribe(fd_.get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe(fd_.get_poll_info().get_pollable_fd_ref()); if (callback_) { if (!fd_.input_buffer().empty()) { LOG(ERROR) << "Have " << fd_.input_buffer().size() << " unread bytes"; @@ -53,7 +53,7 @@ void TransparentProxy::hangup() { void TransparentProxy::start_up() { VLOG(proxy) << "Begin to connect to proxy"; - subscribe(fd_.get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this)); set_timeout_in(10); if (can_write(fd_)) { loop(); diff --git a/test/mtproto.cpp b/test/mtproto.cpp index 61e023f8f..95b43beea 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -95,12 +95,12 @@ class TestPingActor : public Actor { mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr), 3); - subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); + Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); set_timeout_in(10); yield(); } void tear_down() override { - unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref()); + Scheduler::unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref()); ping_connection_->close(); Scheduler::instance()->finish(); } @@ -226,8 +226,8 @@ class HandshakeTestActor : public Actor { wait_for_result_ = true; create_actor( - "HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique(), 10.0, - PromiseCreator::lambda([self = actor_id(this)](Result> raw_connection) { + "HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique(), + 10.0, PromiseCreator::lambda([self = actor_id(this)](Result> raw_connection) { send_closure(self, &HandshakeTestActor::got_connection, std::move(raw_connection), 1); }), PromiseCreator::lambda([self = actor_id(this)](Result> handshake) {