Almost compiles
GitOrigin-RevId: aad536022caddba0446a761e7ab1f3b4ac64f53b
This commit is contained in:
parent
093651fb22
commit
8a28e4b461
@ -39,8 +39,7 @@ class HelloWorld : public Actor {
|
||||
size_t write_pos_{0};
|
||||
|
||||
void start_up() override {
|
||||
socket_fd_.get_fd().set_observer(this);
|
||||
subscribe(socket_fd_.get_fd());
|
||||
subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this));
|
||||
HttpHeaderCreator hc;
|
||||
Slice content = "hello world";
|
||||
//auto content = BufferSlice("hello world");
|
||||
@ -56,7 +55,7 @@ class HelloWorld : public Actor {
|
||||
void loop() override {
|
||||
auto status = do_loop();
|
||||
if (status.is_error()) {
|
||||
unsubscribe(socket_fd_.get_fd());
|
||||
unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref());
|
||||
stop();
|
||||
LOG(ERROR) << "CLOSE: " << status;
|
||||
}
|
||||
|
@ -31,8 +31,7 @@ class HttpEchoConnection : public Actor {
|
||||
HttpReader reader_;
|
||||
HttpQuery query_;
|
||||
void start_up() override {
|
||||
fd_.get_fd().set_observer(this);
|
||||
subscribe(fd_.get_fd());
|
||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||
reader_.init(&fd_.input_buffer(), 1024 * 1024, 0);
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,7 @@ void HandshakeActor::close() {
|
||||
}
|
||||
|
||||
void HandshakeActor::start_up() {
|
||||
connection_->get_pollable().set_observer(this);
|
||||
subscribe(connection_->get_pollable());
|
||||
subscribe(connection_->get_poll_info().extract_pollable_fd(this));
|
||||
set_timeout_in(timeout_);
|
||||
yield();
|
||||
}
|
||||
@ -59,8 +58,7 @@ void HandshakeActor::return_connection(Status status) {
|
||||
if (status.is_error()) {
|
||||
status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_);
|
||||
}
|
||||
unsubscribe(raw_connection->get_pollable());
|
||||
raw_connection->get_pollable().set_observer(nullptr);
|
||||
unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
||||
if (raw_connection_promise_) {
|
||||
if (status.is_error()) {
|
||||
if (raw_connection->stats_callback()) {
|
||||
|
@ -32,8 +32,8 @@ class HandshakeConnection
|
||||
handshake_->resume(this);
|
||||
}
|
||||
|
||||
Fd &get_pollable() {
|
||||
return raw_connection_->get_pollable();
|
||||
PollableFdInfo &get_poll_info() {
|
||||
return raw_connection_->get_poll_info();
|
||||
}
|
||||
|
||||
std::unique_ptr<RawConnection> move_as_raw_connection() {
|
||||
|
@ -28,8 +28,8 @@ class PingConnection : private RawConnection::Callback {
|
||||
: raw_connection_(std::move(raw_connection)), ping_count_(ping_count) {
|
||||
}
|
||||
|
||||
Fd &get_pollable() {
|
||||
return raw_connection_->get_pollable();
|
||||
PollableFdInfo &get_poll_info() {
|
||||
return raw_connection_->get_poll_info();
|
||||
}
|
||||
|
||||
std::unique_ptr<RawConnection> move_as_raw_connection() {
|
||||
|
@ -63,8 +63,8 @@ class RawConnection {
|
||||
uint64 quick_ack_token = 0);
|
||||
uint64 send_no_crypto(const Storer &storer);
|
||||
|
||||
Fd &get_pollable() {
|
||||
return socket_fd_.get_fd();
|
||||
PollableFdInfo &get_poll_info() {
|
||||
return socket_fd_.get_poll_info();
|
||||
}
|
||||
StatsCallback *stats_callback() {
|
||||
return stats_callback_.get();
|
||||
|
@ -667,8 +667,8 @@ SessionConnection::SessionConnection(Mode mode, std::unique_ptr<RawConnection> r
|
||||
created_at_ = Time::now();
|
||||
}
|
||||
|
||||
Fd &SessionConnection::get_pollable() {
|
||||
return raw_connection_->get_pollable();
|
||||
PollableFdInfo &SessionConnection::get_poll_info() {
|
||||
return raw_connection_->get_poll_info();
|
||||
}
|
||||
|
||||
Status SessionConnection::init() {
|
||||
|
@ -69,7 +69,7 @@ class SessionConnection
|
||||
SessionConnection(Mode mode, std::unique_ptr<RawConnection> raw_connection, AuthData *auth_data,
|
||||
DhCallback *dh_callback);
|
||||
|
||||
Fd &get_pollable();
|
||||
PollableFdInfo &get_poll_info();
|
||||
|
||||
// Interface
|
||||
Result<uint64> TD_WARN_UNUSED_RESULT send_query(BufferSlice buffer, bool gzip_flag, int64 message_id = 0,
|
||||
|
@ -127,8 +127,7 @@ class TdProxy : public Actor {
|
||||
|
||||
void start_up() override {
|
||||
auto &fd = input_queue_->reader_get_event_fd();
|
||||
fd.get_fd().set_observer(this);
|
||||
::td::subscribe(fd.get_fd(), Fd::Read);
|
||||
::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
||||
|
||||
class Callback : public TdCallback {
|
||||
public:
|
||||
@ -190,8 +189,7 @@ class TdProxy : public Actor {
|
||||
|
||||
void tear_down() override {
|
||||
auto &fd = input_queue_->reader_get_event_fd();
|
||||
::td::unsubscribe(fd.get_fd());
|
||||
fd.get_fd().set_observer(nullptr);
|
||||
::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
||||
}
|
||||
};
|
||||
|
||||
@ -227,6 +225,8 @@ class Client::Impl final {
|
||||
~Impl() {
|
||||
input_queue_->writer_put({0, nullptr});
|
||||
scheduler_thread_.join();
|
||||
auto &event_fd = output_queue_->reader_get_event_fd();
|
||||
poll_.unsubscribe(event_fd.get_poll_info().get_pollable_fd_ref());
|
||||
}
|
||||
|
||||
private:
|
||||
@ -256,7 +256,7 @@ class Client::Impl final {
|
||||
|
||||
poll_.init();
|
||||
auto &event_fd = output_queue_->reader_get_event_fd();
|
||||
poll_.subscribe(event_fd.get_fd(), Fd::Read);
|
||||
poll_.subscribe(event_fd.get_poll_info().extract_pollable_fd(nullptr), PollFlags::Read());
|
||||
}
|
||||
|
||||
Response receive_unlocked(double timeout) {
|
||||
|
@ -264,7 +264,7 @@ class CliClient final : public Actor {
|
||||
}
|
||||
|
||||
void update_users(const td_api::users &users) {
|
||||
Logger log{*log_interface, VERBOSITY_NAME(PLAIN)};
|
||||
Logger log{*log_interface, LogOptions::plain(), VERBOSITY_NAME(PLAIN)};
|
||||
for (auto &user_id : users.user_ids_) {
|
||||
if (user_id == 0) {
|
||||
continue;
|
||||
|
@ -85,7 +85,7 @@ Status FileHashUploader::loop_sha() {
|
||||
}
|
||||
resource_state_.start_use(limit);
|
||||
|
||||
fd_.update_flags(Fd::Flag::Read);
|
||||
fd_.get_poll_info().add_flags(PollFlags::Read());
|
||||
TRY_RESULT(read_size, fd_.flush_read(static_cast<size_t>(limit)));
|
||||
if (read_size != static_cast<size_t>(limit)) {
|
||||
return Status::Error("unexpected end of file");
|
||||
|
@ -100,8 +100,7 @@ class PingActor : public Actor {
|
||||
ActorShared<> parent_;
|
||||
|
||||
void start_up() override {
|
||||
ping_connection_->get_pollable().set_observer(this);
|
||||
subscribe(ping_connection_->get_pollable());
|
||||
subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
||||
set_timeout_in(10);
|
||||
yield();
|
||||
}
|
||||
@ -138,8 +137,7 @@ class PingActor : public Actor {
|
||||
CHECK(!promise_);
|
||||
return;
|
||||
}
|
||||
unsubscribe(raw_connection->get_pollable());
|
||||
raw_connection->get_pollable().set_observer(nullptr);
|
||||
unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
||||
if (promise_) {
|
||||
if (status.is_error()) {
|
||||
if (raw_connection->stats_callback()) {
|
||||
|
@ -392,7 +392,7 @@ void Session::on_server_time_difference_updated() {
|
||||
}
|
||||
|
||||
void Session::on_before_close() {
|
||||
unsubscribe_before_close(current_info_->connection->get_pollable());
|
||||
unsubscribe_before_close(current_info_->connection->get_poll_info().get_pollable_fd_ref());
|
||||
}
|
||||
|
||||
void Session::on_closed(Status status) {
|
||||
@ -955,8 +955,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
|
||||
info->connection->set_online(connection_online_flag_);
|
||||
}
|
||||
info->connection->set_name(name);
|
||||
info->connection->get_pollable().set_observer(this);
|
||||
subscribe(info->connection->get_pollable());
|
||||
subscribe(info->connection->get_poll_info().extract_pollable_fd(this));
|
||||
info->mode = mode_;
|
||||
info->state = ConnectionInfo::State::Ready;
|
||||
info->created_at = Time::now_cached();
|
||||
|
@ -107,9 +107,9 @@ class Scheduler {
|
||||
}
|
||||
void before_tail_send(const ActorId<> &actor_id);
|
||||
|
||||
void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
|
||||
void unsubscribe(const Fd &fd);
|
||||
void unsubscribe_before_close(const Fd &fd);
|
||||
void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
|
||||
void unsubscribe(PollableFdRef fd);
|
||||
void unsubscribe_before_close(PollableFdRef fd);
|
||||
|
||||
void yield_actor(Actor *actor);
|
||||
void stop_actor(Actor *actor);
|
||||
@ -239,9 +239,9 @@ class Scheduler {
|
||||
};
|
||||
|
||||
/*** Interface to current scheduler ***/
|
||||
void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
|
||||
void unsubscribe(const Fd &fd);
|
||||
void unsubscribe_before_close(const Fd &fd);
|
||||
void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
|
||||
void unsubscribe(PollableFdRef fd);
|
||||
void unsubscribe_before_close(PollableFdRef fd);
|
||||
|
||||
template <class ActorT, class... Args>
|
||||
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
|
||||
|
@ -58,8 +58,7 @@ void Scheduler::ServiceActor::start_up() {
|
||||
}
|
||||
auto &fd = inbound_->reader_get_event_fd();
|
||||
|
||||
fd.get_fd().set_observer(this);
|
||||
::td::subscribe(fd.get_fd(), Fd::Read);
|
||||
::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
||||
yield();
|
||||
#endif
|
||||
}
|
||||
@ -184,7 +183,7 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve
|
||||
|
||||
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
|
||||
event_fd_.init();
|
||||
subscribe(event_fd_.get_fd(), Fd::Read);
|
||||
subscribe(event_fd_.get_poll_info().extract_pollable_fd(nullptr), PollFlags::Read());
|
||||
#endif
|
||||
|
||||
if (!outbound.empty()) {
|
||||
@ -426,7 +425,7 @@ void Scheduler::run_poll(double timeout) {
|
||||
poll_.run(static_cast<int32>(timeout * 1000 + 1));
|
||||
|
||||
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
|
||||
if (can_read(event_fd_.get_fd())) {
|
||||
if (event_fd_.get_poll_info().get_flags().can_read()) {
|
||||
std::atomic_thread_fence(std::memory_order_acquire);
|
||||
event_fd_.acquire();
|
||||
}
|
||||
|
@ -254,16 +254,16 @@ void Scheduler::send(ActorRef actor_ref, Event &&event) {
|
||||
[&]() { return std::move(event); });
|
||||
}
|
||||
|
||||
inline void Scheduler::subscribe(const Fd &fd, Fd::Flags flags) {
|
||||
poll_.subscribe(fd, flags);
|
||||
inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
|
||||
poll_.subscribe(std::move(fd), flags);
|
||||
}
|
||||
|
||||
inline void Scheduler::unsubscribe(const Fd &fd) {
|
||||
poll_.unsubscribe(fd);
|
||||
inline void Scheduler::unsubscribe(PollableFdRef fd) {
|
||||
poll_.unsubscribe(std::move(fd));
|
||||
}
|
||||
|
||||
inline void Scheduler::unsubscribe_before_close(const Fd &fd) {
|
||||
poll_.unsubscribe_before_close(fd);
|
||||
inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) {
|
||||
poll_.unsubscribe_before_close(std::move(fd));
|
||||
}
|
||||
|
||||
inline void Scheduler::yield_actor(Actor *actor) {
|
||||
@ -353,16 +353,16 @@ inline void Scheduler::run(double timeout) {
|
||||
}
|
||||
|
||||
/*** Interface to current scheduler ***/
|
||||
inline void subscribe(const Fd &fd, Fd::Flags flags) {
|
||||
Scheduler::instance()->subscribe(fd, flags);
|
||||
inline void subscribe(PollableFd fd, PollFlags flags) {
|
||||
Scheduler::instance()->subscribe(std::move(fd), flags);
|
||||
}
|
||||
|
||||
inline void unsubscribe(const Fd &fd) {
|
||||
Scheduler::instance()->unsubscribe(fd);
|
||||
inline void unsubscribe(PollableFdRef fd) {
|
||||
Scheduler::instance()->unsubscribe(std::move(fd));
|
||||
}
|
||||
|
||||
inline void unsubscribe_before_close(const Fd &fd) {
|
||||
Scheduler::instance()->unsubscribe_before_close(fd);
|
||||
inline void unsubscribe_before_close(PollableFdRef fd) {
|
||||
Scheduler::instance()->unsubscribe_before_close(std::move(fd));
|
||||
}
|
||||
|
||||
template <class ActorT, class... Args>
|
||||
|
@ -281,7 +281,7 @@ class OpenClose final : public Actor {
|
||||
CHECK(r_file_fd.is_ok()) << r_file_fd.error();
|
||||
auto file_fd = r_file_fd.move_as_ok();
|
||||
// LOG(ERROR) << file_fd.get_native_fd();
|
||||
file_fd.get_fd().set_observer(observer);
|
||||
file_fd.get_poll_info().extract_pollable_fd(observer);
|
||||
file_fd.close();
|
||||
cnt_--;
|
||||
yield();
|
||||
|
@ -483,7 +483,7 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb
|
||||
|
||||
update_read_encryption();
|
||||
|
||||
fd_.update_flags(Fd::Flag::Read);
|
||||
fd_.get_poll_info().add_flags(PollFlags::Read());
|
||||
info_.wrong_password = false;
|
||||
while (true) {
|
||||
BinlogEvent event;
|
||||
|
@ -41,8 +41,7 @@ void HttpConnectionBase::live_event() {
|
||||
}
|
||||
|
||||
void HttpConnectionBase::start_up() {
|
||||
fd_.get_fd().set_observer(this);
|
||||
subscribe(fd_.get_fd());
|
||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||
reader_.init(read_sink_.get_output(), max_post_size_, max_files_);
|
||||
if (state_ == State::Read) {
|
||||
current_query_ = make_unique<HttpQuery>();
|
||||
@ -51,7 +50,7 @@ void HttpConnectionBase::start_up() {
|
||||
yield();
|
||||
}
|
||||
void HttpConnectionBase::tear_down() {
|
||||
unsubscribe_before_close(fd_.get_fd());
|
||||
unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
|
||||
fd_.close();
|
||||
}
|
||||
|
||||
@ -141,7 +140,7 @@ void HttpConnectionBase::loop() {
|
||||
}
|
||||
|
||||
Status pending_error;
|
||||
if (fd_.get_fd().has_pending_error()) {
|
||||
if (fd_.get_poll_info().get_flags().has_pending_error()) {
|
||||
pending_error = fd_.get_pending_error();
|
||||
}
|
||||
if (pending_error.is_ok() && write_sink_.status().is_error()) {
|
||||
|
@ -26,14 +26,13 @@ void TcpListener::start_up() {
|
||||
return;
|
||||
}
|
||||
server_fd_ = r_socket.move_as_ok();
|
||||
server_fd_.get_fd().set_observer(this);
|
||||
subscribe(server_fd_.get_fd());
|
||||
subscribe(server_fd_.get_poll_info().extract_pollable_fd(this));
|
||||
}
|
||||
|
||||
void TcpListener::tear_down() {
|
||||
LOG(ERROR) << "TcpListener closed";
|
||||
if (!server_fd_.empty()) {
|
||||
unsubscribe_before_close(server_fd_.get_fd());
|
||||
unsubscribe_before_close(server_fd_.get_poll_info().get_pollable_fd_ref());
|
||||
server_fd_.close();
|
||||
}
|
||||
}
|
||||
|
@ -35,8 +35,7 @@ void TransparentProxy::on_error(Status status) {
|
||||
|
||||
void TransparentProxy::tear_down() {
|
||||
VLOG(proxy) << "Finish to connect to proxy";
|
||||
unsubscribe(fd_.get_fd());
|
||||
fd_.get_fd().set_observer(nullptr);
|
||||
unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
|
||||
if (callback_) {
|
||||
if (!fd_.input_buffer().empty()) {
|
||||
LOG(ERROR) << "Have " << fd_.input_buffer().size() << " unread bytes";
|
||||
@ -54,8 +53,7 @@ void TransparentProxy::hangup() {
|
||||
|
||||
void TransparentProxy::start_up() {
|
||||
VLOG(proxy) << "Begin to connect to proxy";
|
||||
fd_.get_fd().set_observer(this);
|
||||
subscribe(fd_.get_fd());
|
||||
subscribe(fd_.get_poll_info().extract_pollable_fd(this));
|
||||
set_timeout_in(10);
|
||||
if (can_write(fd_)) {
|
||||
loop();
|
||||
|
@ -9,7 +9,7 @@
|
||||
#include "td/utils/buffer.h"
|
||||
#include "td/utils/format.h"
|
||||
#include "td/utils/logging.h"
|
||||
#include "td/utils/port/Fd.h"
|
||||
#include "td/utils/port/detail/PollableFd.h"
|
||||
#include "td/utils/Slice.h"
|
||||
#include "td/utils/Status.h"
|
||||
|
||||
|
@ -107,10 +107,10 @@ class DelayedClosure {
|
||||
explicit DelayedClosure(FunctionT func, ArgsT... args) : args(func, std::forward<ArgsT>(args)...) {
|
||||
}
|
||||
|
||||
//template <class F>
|
||||
//void for_each(const F &f) {
|
||||
//tuple_for_each(args, f);
|
||||
//}
|
||||
template <class F>
|
||||
void for_each(const F &f) {
|
||||
tuple_for_each(args, f);
|
||||
}
|
||||
|
||||
private:
|
||||
using ArgsStorageT = std::tuple<FunctionT, typename std::decay<ArgsT>::type...>;
|
||||
|
@ -86,7 +86,7 @@ class MpscPollableQueue {
|
||||
while ((res = reader_wait_nonblock()) == 0) {
|
||||
// TODO: reader_flush?
|
||||
pollfd fd;
|
||||
fd.fd = reader_get_event_fd().get_fd().get_native_fd();
|
||||
fd.fd = reader_get_event_fd().get_poll_info().native_fd().fd();
|
||||
fd.events = POLLIN;
|
||||
poll(&fd, 1, -1);
|
||||
}
|
||||
|
@ -401,7 +401,7 @@ class PollQueue : public QueueT {
|
||||
while ((res = reader_wait_nonblock()) == 0) {
|
||||
// TODO: reader_flush?
|
||||
pollfd fd;
|
||||
fd.fd = reader_get_event_fd().get_fd().get_native_fd();
|
||||
fd.fd = reader_get_event_fd().get_poll_info().native_fd().fd();
|
||||
fd.events = POLLIN;
|
||||
poll(&fd, 1, -1);
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ TEST(Http, aes_file_encryption) {
|
||||
source >> aes_encode >> sink;
|
||||
fd.set_input_writer(&input_writer);
|
||||
|
||||
fd.update_flags(Fd::Flag::Read);
|
||||
fd.get_poll_info().add_flags(PollFlags::Read());
|
||||
while (can_read(fd)) {
|
||||
fd.flush_read(4096).ensure();
|
||||
source.wakeup();
|
||||
|
@ -95,13 +95,12 @@ class TestPingActor : public Actor {
|
||||
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr),
|
||||
3);
|
||||
|
||||
ping_connection_->get_pollable().set_observer(this);
|
||||
subscribe(ping_connection_->get_pollable());
|
||||
subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
||||
set_timeout_in(10);
|
||||
yield();
|
||||
}
|
||||
void tear_down() override {
|
||||
unsubscribe_before_close(ping_connection_->get_pollable());
|
||||
unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref());
|
||||
ping_connection_->close();
|
||||
Scheduler::instance()->finish();
|
||||
}
|
||||
|
@ -41,16 +41,6 @@ static void check_td_error(T &result) {
|
||||
CHECK(result->get_id() != td_api::error::ID) << to_string(result);
|
||||
}
|
||||
|
||||
static void rmrf(CSlice path) {
|
||||
td::walk_path(path, [](CSlice path, bool is_dir) {
|
||||
if (is_dir) {
|
||||
td::rmdir(path).ignore();
|
||||
} else {
|
||||
td::unlink(path).ignore();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class TestClient : public Actor {
|
||||
public:
|
||||
explicit TestClient(string name) : name_(std::move(name)) {
|
||||
@ -143,7 +133,7 @@ class TestClient : public Actor {
|
||||
}
|
||||
|
||||
void start_up() override {
|
||||
rmrf(name_);
|
||||
rmrf(name_).ignore();
|
||||
set_context(std::make_shared<td::ActorContext>());
|
||||
set_tag(name_);
|
||||
LOG(INFO) << "START UP!";
|
||||
|
Reference in New Issue
Block a user