diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/tdactor/td/actor/impl/ConcurrentScheduler.cpp index 0be47cd8f..98fd36c1e 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.cpp +++ b/tdactor/td/actor/impl/ConcurrentScheduler.cpp @@ -56,7 +56,7 @@ void ConcurrentScheduler::init(int32 threads_n) { } #if TD_PORT_WINDOWS - iocp_ = std::make_unique(); + iocp_ = std::make_unique(); iocp_->init(); #endif @@ -90,7 +90,10 @@ void ConcurrentScheduler::start() { } #endif #if TD_PORT_WINDOWS - iocp_thread_ = td::thread([&iocp_] { iocp_->loop(); }); + iocp_thread_ = td::thread([this] { + auto guard = this->get_send_guard(); + this->iocp_->loop(); + }); #endif state_ = State::Run; diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.h b/tdactor/td/actor/impl/ConcurrentScheduler.h index c8e186956..52c6c5ab4 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.h +++ b/tdactor/td/actor/impl/ConcurrentScheduler.h @@ -41,7 +41,7 @@ class ConcurrentScheduler : private Scheduler::Callback { } SchedulerGuard get_send_guard() { - return schedulers_[0]->get_const_guard(); + return schedulers_.back()->get_const_guard(); } void test_one_thread_run(); @@ -87,7 +87,7 @@ class ConcurrentScheduler : private Scheduler::Callback { std::vector threads_; #endif #if TD_PORT_WINDOWS - std::unique_ptr iocp_; + std::unique_ptr iocp_; td::thread iocp_thread_; #endif diff --git a/tdactor/td/actor/impl/Scheduler.cpp b/tdactor/td/actor/impl/Scheduler.cpp index 5df2fca7f..c04a46f1b 100644 --- a/tdactor/td/actor/impl/Scheduler.cpp +++ b/tdactor/td/actor/impl/Scheduler.cpp @@ -56,9 +56,11 @@ void Scheduler::ServiceActor::start_up() { if (!inbound_) { return; } +#if !TD_PORT_WINDOWS auto &fd = inbound_->reader_get_event_fd(); ::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); subscribed_ = true; +#endif yield(); #endif } @@ -440,6 +442,7 @@ void Scheduler::run_poll(double timeout) { #if TD_PORT_WINDOWS CHECK(inbound_queue_); inbound_queue_->reader_get_event_fd().wait(timeout_ms); + service_actor_.notify(); #elif TD_PORT_POSIX poll_.run(timeout_ms); #endif diff --git a/tdactor/test/actors_simple.cpp b/tdactor/test/actors_simple.cpp index 4fc7d8084..121df540c 100644 --- a/tdactor/test/actors_simple.cpp +++ b/tdactor/test/actors_simple.cpp @@ -36,7 +36,7 @@ TEST(Actors, SendLater) { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); Scheduler scheduler; - scheduler.init(); + scheduler.init(0, {std::make_shared>()}, nullptr); auto guard = scheduler.get_guard(); class Worker : public Actor { @@ -93,7 +93,7 @@ class XReceiver final : public Actor { TEST(Actors, simple_pass_event_arguments) { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); Scheduler scheduler; - scheduler.init(); + scheduler.init(0, {std::make_shared>()}, nullptr); auto guard = scheduler.get_guard(); auto id = create_actor("XR").release(); @@ -200,7 +200,7 @@ class PrintChar final : public Actor { TEST(Actors, simple_hand_yield) { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); Scheduler scheduler; - scheduler.init(); + scheduler.init(0, {std::make_shared>()}, nullptr); sb.clear(); int cnt = 1000; { @@ -354,7 +354,7 @@ class MasterActor : public MsgActor { TEST(Actors, call_after_destruct) { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); Scheduler scheduler; - scheduler.init(); + scheduler.init(0, {std::make_shared>()}, nullptr); { auto guard = scheduler.get_guard(); create_actor("Master").release(); diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index 0b6cdc2e1..5c5b8fdc4 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -313,6 +313,8 @@ Status Binlog::destroy(Slice path) { } void Binlog::do_event(BinlogEvent &&event) { + auto event_size = event.raw_event_.size(); + if (state_ == State::Run || state_ == State::Reindex) { VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] "); auto validate_status = event.validate(); @@ -389,7 +391,7 @@ void Binlog::do_event(BinlogEvent &&event) { } fd_events_++; - fd_size_ += event.raw_event_.size(); + fd_size_ += event_size; } void Binlog::sync() { diff --git a/tdutils/td/utils/logging.cpp b/tdutils/td/utils/logging.cpp index b2f5f266f..1ee6664e4 100644 --- a/tdutils/td/utils/logging.cpp +++ b/tdutils/td/utils/logging.cpp @@ -121,7 +121,13 @@ TsCerr::~TsCerr() { } namespace { FileFd &Stderr() { - static FileFd res = FileFd::from_native_fd(NativeFd(2, true)).move_as_ok(); + static FileFd res = FileFd::from_native_fd(NativeFd( +#if TD_PORT_POSIX + 2 +#elif TD_PORT_WINDOWS + GetStdHandle(STD_ERROR_HANDLE) +#endif + , true)).move_as_ok(); return res; } } // namespace diff --git a/test/mtproto.cpp b/test/mtproto.cpp index 1ace74a29..61e023f8f 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -166,7 +166,7 @@ class Mtproto_ping : public td::Test { }; Mtproto_ping mtproto_ping("Mtproto_ping"); -class Context : public AuthKeyHandshakeContext { +class HandshakeContext : public AuthKeyHandshakeContext { public: DhCallback *get_dh_callback() override { return nullptr; @@ -226,7 +226,7 @@ class HandshakeTestActor : public Actor { wait_for_result_ = true; create_actor( - "HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique(), 10.0, + "HandshakeActor", std::move(handshake_), std::move(raw_connection_), std::make_unique(), 10.0, PromiseCreator::lambda([self = actor_id(this)](Result> raw_connection) { send_closure(self, &HandshakeTestActor::got_connection, std::move(raw_connection), 1); }),