Make subscribe a static method of Scheduler.
GitOrigin-RevId: e80024cfa63a37fb9b64f514ac3407d4e97c7302
This commit is contained in:
parent
a165b42575
commit
9971b52b81
@ -39,7 +39,7 @@ class HelloWorld : public Actor {
|
|||||||
size_t write_pos_{0};
|
size_t write_pos_{0};
|
||||||
|
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this));
|
||||||
HttpHeaderCreator hc;
|
HttpHeaderCreator hc;
|
||||||
Slice content = "hello world";
|
Slice content = "hello world";
|
||||||
//auto content = BufferSlice("hello world");
|
//auto content = BufferSlice("hello world");
|
||||||
@ -55,7 +55,7 @@ class HelloWorld : public Actor {
|
|||||||
void loop() override {
|
void loop() override {
|
||||||
auto status = do_loop();
|
auto status = do_loop();
|
||||||
if (status.is_error()) {
|
if (status.is_error()) {
|
||||||
unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref());
|
||||||
stop();
|
stop();
|
||||||
LOG(ERROR) << "CLOSE: " << status;
|
LOG(ERROR) << "CLOSE: " << status;
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ class HttpEchoConnection : public Actor {
|
|||||||
HttpReader reader_;
|
HttpReader reader_;
|
||||||
HttpQuery query_;
|
HttpQuery query_;
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||||
reader_.init(&fd_.input_buffer(), 1024 * 1024, 0);
|
reader_.init(&fd_.input_buffer(), 1024 * 1024, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ void HandshakeActor::close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void HandshakeActor::start_up() {
|
void HandshakeActor::start_up() {
|
||||||
subscribe(connection_->get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(connection_->get_poll_info().extract_pollable_fd(this));
|
||||||
set_timeout_in(timeout_);
|
set_timeout_in(timeout_);
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
@ -58,7 +58,7 @@ void HandshakeActor::return_connection(Status status) {
|
|||||||
if (status.is_error()) {
|
if (status.is_error()) {
|
||||||
status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_);
|
status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_);
|
||||||
}
|
}
|
||||||
unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
||||||
if (raw_connection_promise_) {
|
if (raw_connection_promise_) {
|
||||||
if (status.is_error()) {
|
if (status.is_error()) {
|
||||||
if (raw_connection->stats_callback()) {
|
if (raw_connection->stats_callback()) {
|
||||||
|
@ -127,7 +127,7 @@ class TdProxy : public Actor {
|
|||||||
|
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
auto &fd = input_queue_->reader_get_event_fd();
|
auto &fd = input_queue_->reader_get_event_fd();
|
||||||
::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
||||||
|
|
||||||
class Callback : public TdCallback {
|
class Callback : public TdCallback {
|
||||||
public:
|
public:
|
||||||
@ -189,7 +189,7 @@ class TdProxy : public Actor {
|
|||||||
|
|
||||||
void tear_down() override {
|
void tear_down() override {
|
||||||
auto &fd = input_queue_->reader_get_event_fd();
|
auto &fd = input_queue_->reader_get_event_fd();
|
||||||
::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -634,7 +634,7 @@ class CliClient final : public Actor {
|
|||||||
close_flag_ = true;
|
close_flag_ = true;
|
||||||
dump_memory_usage();
|
dump_memory_usage();
|
||||||
td_.reset();
|
td_.reset();
|
||||||
unsubscribe(stdin_.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(stdin_.get_poll_info().get_pollable_fd_ref());
|
||||||
is_stdin_reader_stopped_ = true;
|
is_stdin_reader_stopped_ = true;
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
@ -748,7 +748,7 @@ class CliClient final : public Actor {
|
|||||||
rl_attempted_completion_function = tg_cli_completion;
|
rl_attempted_completion_function = tg_cli_completion;
|
||||||
reactivate_readline();
|
reactivate_readline();
|
||||||
#endif
|
#endif
|
||||||
subscribe(stdin_.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
Scheduler::subscribe(stdin_.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
||||||
|
|
||||||
if (get_chat_list_) {
|
if (get_chat_list_) {
|
||||||
send_request(make_tl_object<td_api::getChats>(std::numeric_limits<int64>::max(), 0, 100));
|
send_request(make_tl_object<td_api::getChats>(std::numeric_limits<int64>::max(), 0, 100));
|
||||||
|
@ -100,7 +100,7 @@ class PingActor : public Actor {
|
|||||||
ActorShared<> parent_;
|
ActorShared<> parent_;
|
||||||
|
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
||||||
set_timeout_in(10);
|
set_timeout_in(10);
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
@ -137,7 +137,7 @@ class PingActor : public Actor {
|
|||||||
CHECK(!promise_);
|
CHECK(!promise_);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
||||||
if (promise_) {
|
if (promise_) {
|
||||||
if (status.is_error()) {
|
if (status.is_error()) {
|
||||||
if (raw_connection->stats_callback()) {
|
if (raw_connection->stats_callback()) {
|
||||||
|
@ -392,7 +392,7 @@ void Session::on_server_time_difference_updated() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Session::on_before_close() {
|
void Session::on_before_close() {
|
||||||
unsubscribe_before_close(current_info_->connection->get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe_before_close(current_info_->connection->get_poll_info().get_pollable_fd_ref());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Session::on_closed(Status status) {
|
void Session::on_closed(Status status) {
|
||||||
@ -955,7 +955,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
|
|||||||
info->connection->set_online(connection_online_flag_);
|
info->connection->set_online(connection_online_flag_);
|
||||||
}
|
}
|
||||||
info->connection->set_name(name);
|
info->connection->set_name(name);
|
||||||
subscribe(info->connection->get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this));
|
||||||
info->mode = mode_;
|
info->mode = mode_;
|
||||||
info->state = ConnectionInfo::State::Ready;
|
info->state = ConnectionInfo::State::Ready;
|
||||||
info->created_at = Time::now_cached();
|
info->created_at = Time::now_cached();
|
||||||
|
@ -108,9 +108,9 @@ class Scheduler {
|
|||||||
}
|
}
|
||||||
void before_tail_send(const ActorId<> &actor_id);
|
void before_tail_send(const ActorId<> &actor_id);
|
||||||
|
|
||||||
void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
|
static void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
|
||||||
void unsubscribe(PollableFdRef fd);
|
static void unsubscribe(PollableFdRef fd);
|
||||||
void unsubscribe_before_close(PollableFdRef fd);
|
static void unsubscribe_before_close(PollableFdRef fd);
|
||||||
|
|
||||||
void yield_actor(Actor *actor);
|
void yield_actor(Actor *actor);
|
||||||
void stop_actor(Actor *actor);
|
void stop_actor(Actor *actor);
|
||||||
@ -240,10 +240,6 @@ class Scheduler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/*** Interface to current scheduler ***/
|
/*** Interface to current scheduler ***/
|
||||||
void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
|
|
||||||
void unsubscribe(PollableFdRef fd);
|
|
||||||
void unsubscribe_before_close(PollableFdRef fd);
|
|
||||||
|
|
||||||
template <class ActorT, class... Args>
|
template <class ActorT, class... Args>
|
||||||
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
|
||||||
template <class ActorT, class... Args>
|
template <class ActorT, class... Args>
|
||||||
|
@ -58,7 +58,7 @@ void Scheduler::ServiceActor::start_up() {
|
|||||||
}
|
}
|
||||||
#if !TD_PORT_WINDOWS
|
#if !TD_PORT_WINDOWS
|
||||||
auto &fd = inbound_->reader_get_event_fd();
|
auto &fd = inbound_->reader_get_event_fd();
|
||||||
::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
||||||
subscribed_ = true;
|
subscribed_ = true;
|
||||||
#endif
|
#endif
|
||||||
yield();
|
yield();
|
||||||
@ -100,7 +100,7 @@ void Scheduler::ServiceActor::tear_down() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto &fd = inbound_->reader_get_event_fd();
|
auto &fd = inbound_->reader_get_event_fd();
|
||||||
::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
||||||
subscribed_ = false;
|
subscribed_ = false;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -259,15 +259,15 @@ void Scheduler::send(ActorRef actor_ref, Event &&event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
|
inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
|
||||||
poll_.subscribe(std::move(fd), flags);
|
instance()->poll_.subscribe(std::move(fd), flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::unsubscribe(PollableFdRef fd) {
|
inline void Scheduler::unsubscribe(PollableFdRef fd) {
|
||||||
poll_.unsubscribe(std::move(fd));
|
instance()->poll_.unsubscribe(std::move(fd));
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) {
|
inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) {
|
||||||
poll_.unsubscribe_before_close(std::move(fd));
|
instance()->poll_.unsubscribe_before_close(std::move(fd));
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::yield_actor(Actor *actor) {
|
inline void Scheduler::yield_actor(Actor *actor) {
|
||||||
@ -357,18 +357,6 @@ inline void Scheduler::run(double timeout) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*** Interface to current scheduler ***/
|
/*** Interface to current scheduler ***/
|
||||||
inline void subscribe(PollableFd fd, PollFlags flags) {
|
|
||||||
Scheduler::instance()->subscribe(std::move(fd), flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void unsubscribe(PollableFdRef fd) {
|
|
||||||
Scheduler::instance()->unsubscribe(std::move(fd));
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void unsubscribe_before_close(PollableFdRef fd) {
|
|
||||||
Scheduler::instance()->unsubscribe_before_close(std::move(fd));
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class ActorT, class... Args>
|
template <class ActorT, class... Args>
|
||||||
ActorOwn<ActorT> create_actor(Slice name, Args &&... args) {
|
ActorOwn<ActorT> create_actor(Slice name, Args &&... args) {
|
||||||
return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...);
|
return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...);
|
||||||
|
@ -41,7 +41,7 @@ void HttpConnectionBase::live_event() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void HttpConnectionBase::start_up() {
|
void HttpConnectionBase::start_up() {
|
||||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||||
reader_.init(read_sink_.get_output(), max_post_size_, max_files_);
|
reader_.init(read_sink_.get_output(), max_post_size_, max_files_);
|
||||||
if (state_ == State::Read) {
|
if (state_ == State::Read) {
|
||||||
current_query_ = make_unique<HttpQuery>();
|
current_query_ = make_unique<HttpQuery>();
|
||||||
@ -50,7 +50,7 @@ void HttpConnectionBase::start_up() {
|
|||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
void HttpConnectionBase::tear_down() {
|
void HttpConnectionBase::tear_down() {
|
||||||
unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
|
||||||
fd_.close();
|
fd_.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@
|
|||||||
#include "td/utils/port/detail/PollableFd.h"
|
#include "td/utils/port/detail/PollableFd.h"
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
// TcpListener implementation
|
|
||||||
TcpListener::TcpListener(int port, ActorShared<Callback> callback) : port_(port), callback_(std::move(callback)) {
|
TcpListener::TcpListener(int port, ActorShared<Callback> callback) : port_(port), callback_(std::move(callback)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -26,13 +26,13 @@ void TcpListener::start_up() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
server_fd_ = r_socket.move_as_ok();
|
server_fd_ = r_socket.move_as_ok();
|
||||||
subscribe(server_fd_.get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(server_fd_.get_poll_info().extract_pollable_fd(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpListener::tear_down() {
|
void TcpListener::tear_down() {
|
||||||
LOG(ERROR) << "TcpListener closed";
|
LOG(ERROR) << "TcpListener closed";
|
||||||
if (!server_fd_.empty()) {
|
if (!server_fd_.empty()) {
|
||||||
unsubscribe_before_close(server_fd_.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe_before_close(server_fd_.get_poll_info().get_pollable_fd_ref());
|
||||||
server_fd_.close();
|
server_fd_.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ void TransparentProxy::on_error(Status status) {
|
|||||||
|
|
||||||
void TransparentProxy::tear_down() {
|
void TransparentProxy::tear_down() {
|
||||||
VLOG(proxy) << "Finish to connect to proxy";
|
VLOG(proxy) << "Finish to connect to proxy";
|
||||||
unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
|
||||||
if (callback_) {
|
if (callback_) {
|
||||||
if (!fd_.input_buffer().empty()) {
|
if (!fd_.input_buffer().empty()) {
|
||||||
LOG(ERROR) << "Have " << fd_.input_buffer().size() << " unread bytes";
|
LOG(ERROR) << "Have " << fd_.input_buffer().size() << " unread bytes";
|
||||||
@ -53,7 +53,7 @@ void TransparentProxy::hangup() {
|
|||||||
|
|
||||||
void TransparentProxy::start_up() {
|
void TransparentProxy::start_up() {
|
||||||
VLOG(proxy) << "Begin to connect to proxy";
|
VLOG(proxy) << "Begin to connect to proxy";
|
||||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||||
set_timeout_in(10);
|
set_timeout_in(10);
|
||||||
if (can_write(fd_)) {
|
if (can_write(fd_)) {
|
||||||
loop();
|
loop();
|
||||||
|
@ -95,12 +95,12 @@ class TestPingActor : public Actor {
|
|||||||
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr),
|
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr),
|
||||||
3);
|
3);
|
||||||
|
|
||||||
subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
||||||
set_timeout_in(10);
|
set_timeout_in(10);
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
void tear_down() override {
|
void tear_down() override {
|
||||||
unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref());
|
||||||
ping_connection_->close();
|
ping_connection_->close();
|
||||||
Scheduler::instance()->finish();
|
Scheduler::instance()->finish();
|
||||||
}
|
}
|
||||||
@ -226,8 +226,8 @@ class HandshakeTestActor : public Actor {
|
|||||||
|
|
||||||
wait_for_result_ = true;
|
wait_for_result_ = true;
|
||||||
create_actor<HandshakeActor>(
|
create_actor<HandshakeActor>(
|
||||||
"HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique<HandshakeContext>(), 10.0,
|
"HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique<HandshakeContext>(),
|
||||||
PromiseCreator::lambda([self = actor_id(this)](Result<std::unique_ptr<RawConnection>> raw_connection) {
|
10.0, PromiseCreator::lambda([self = actor_id(this)](Result<std::unique_ptr<RawConnection>> raw_connection) {
|
||||||
send_closure(self, &HandshakeTestActor::got_connection, std::move(raw_connection), 1);
|
send_closure(self, &HandshakeTestActor::got_connection, std::move(raw_connection), 1);
|
||||||
}),
|
}),
|
||||||
PromiseCreator::lambda([self = actor_id(this)](Result<std::unique_ptr<AuthKeyHandshake>> handshake) {
|
PromiseCreator::lambda([self = actor_id(this)](Result<std::unique_ptr<AuthKeyHandshake>> handshake) {
|
||||||
|
Reference in New Issue
Block a user