From 9d1a1a11553272a29581c27eb34d131f983bd97c Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Thu, 16 Aug 2018 17:29:13 +0300 Subject: [PATCH] tdactors: IOCP draft GitOrigin-RevId: eec85d677d808db336340d2667dca298493dd4a8 --- tdactor/td/actor/impl/ConcurrentScheduler.cpp | 28 +++++++++++++++++++ tdactor/td/actor/impl/ConcurrentScheduler.h | 10 +++++++ tdactor/td/actor/impl/Scheduler.cpp | 8 +++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/tdactor/td/actor/impl/ConcurrentScheduler.cpp index b044fbab..0be47cd8 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.cpp +++ b/tdactor/td/actor/impl/ConcurrentScheduler.cpp @@ -55,6 +55,11 @@ void ConcurrentScheduler::init(int32 threads_n) { sched->init(i, outbound, static_cast(this)); } +#if TD_PORT_WINDOWS + iocp_ = std::make_unique(); + iocp_->init(); +#endif + state_ = State::Start; } @@ -75,12 +80,19 @@ void ConcurrentScheduler::start() { auto &sched = schedulers_[i]; threads_.push_back(td::thread([&, tid = i]() { set_thread_id(static_cast(tid)); +#if TD_PORT_WINDOWS + td::detail::IOCP::Guard iocp_guard(iocp_.get()); +#endif while (!is_finished()) { sched->run(10); } })); } #endif +#if TD_PORT_WINDOWS + iocp_thread_ = td::thread([&iocp_] { iocp_->loop(); }); +#endif + state_ = State::Run; } @@ -89,6 +101,9 @@ bool ConcurrentScheduler::run_main(double timeout) { // run main scheduler in same thread auto &main_sched = schedulers_[0]; if (!is_finished()) { +#if TD_PORT_WINDOWS + td::detail::IOCP::Guard iocp_guard(iocp_.get()); +#endif main_sched->run(timeout); } return !is_finished(); @@ -99,12 +114,25 @@ void ConcurrentScheduler::finish() { if (!is_finished()) { on_finish(); } +#if TD_PORT_WINDOWS + SCOPE_EXIT { + iocp_->clear(); + }; + td::detail::IOCP::Guard iocp_guard(iocp_.get()); +#endif + #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED for (auto &thread : threads_) { thread.join(); } threads_.clear(); #endif + +#if TD_PORT_WINDOWS + iocp_->interrupt_loop(); + iocp_thread_.join(); +#endif + schedulers_.clear(); for (auto &f : at_finish_) { f(); diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.h b/tdactor/td/actor/impl/ConcurrentScheduler.h index 884e6ff8..c8e18695 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.h +++ b/tdactor/td/actor/impl/ConcurrentScheduler.h @@ -20,6 +20,12 @@ namespace td { +#if TD_PORT_WINDOWS +namespace detail { +class IOCP; +} +#endif + class ConcurrentScheduler : private Scheduler::Callback { public: void init(int32 threads_n); @@ -80,6 +86,10 @@ class ConcurrentScheduler : private Scheduler::Callback { #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED std::vector threads_; #endif +#if TD_PORT_WINDOWS + std::unique_ptr iocp_; + td::thread iocp_thread_; +#endif void on_finish() override { is_finished_.store(true, std::memory_order_relaxed); diff --git a/tdactor/td/actor/impl/Scheduler.cpp b/tdactor/td/actor/impl/Scheduler.cpp index cf482570..5df2fca7 100644 --- a/tdactor/td/actor/impl/Scheduler.cpp +++ b/tdactor/td/actor/impl/Scheduler.cpp @@ -436,7 +436,13 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) { void Scheduler::run_poll(double timeout) { // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]"; // we can't wait for less than 1ms - poll_.run(static_cast(timeout * 1000 + 1)); + int timeout_ms = static_cast(timeout * 1000 + 1); +#if TD_PORT_WINDOWS + CHECK(inbound_queue_); + inbound_queue_->reader_get_event_fd().wait(timeout_ms); +#elif TD_PORT_POSIX + poll_.run(timeout_ms); +#endif } void Scheduler::run_mailbox() {