tdactors: IOCP draft

GitOrigin-RevId: eec85d677d808db336340d2667dca298493dd4a8
This commit is contained in:
Arseny Smirnov 2018-08-16 17:29:13 +03:00
parent 44be8d2ea3
commit 9d1a1a1155
3 changed files with 45 additions and 1 deletions

View File

@ -55,6 +55,11 @@ void ConcurrentScheduler::init(int32 threads_n) {
sched->init(i, outbound, static_cast<Scheduler::Callback *>(this)); sched->init(i, outbound, static_cast<Scheduler::Callback *>(this));
} }
#if TD_PORT_WINDOWS
iocp_ = std::make_unique<IOCP>();
iocp_->init();
#endif
state_ = State::Start; state_ = State::Start;
} }
@ -75,12 +80,19 @@ void ConcurrentScheduler::start() {
auto &sched = schedulers_[i]; auto &sched = schedulers_[i];
threads_.push_back(td::thread([&, tid = i]() { threads_.push_back(td::thread([&, tid = i]() {
set_thread_id(static_cast<int32>(tid)); set_thread_id(static_cast<int32>(tid));
#if TD_PORT_WINDOWS
td::detail::IOCP::Guard iocp_guard(iocp_.get());
#endif
while (!is_finished()) { while (!is_finished()) {
sched->run(10); sched->run(10);
} }
})); }));
} }
#endif #endif
#if TD_PORT_WINDOWS
iocp_thread_ = td::thread([&iocp_] { iocp_->loop(); });
#endif
state_ = State::Run; state_ = State::Run;
} }
@ -89,6 +101,9 @@ bool ConcurrentScheduler::run_main(double timeout) {
// run main scheduler in same thread // run main scheduler in same thread
auto &main_sched = schedulers_[0]; auto &main_sched = schedulers_[0];
if (!is_finished()) { if (!is_finished()) {
#if TD_PORT_WINDOWS
td::detail::IOCP::Guard iocp_guard(iocp_.get());
#endif
main_sched->run(timeout); main_sched->run(timeout);
} }
return !is_finished(); return !is_finished();
@ -99,12 +114,25 @@ void ConcurrentScheduler::finish() {
if (!is_finished()) { if (!is_finished()) {
on_finish(); on_finish();
} }
#if TD_PORT_WINDOWS
SCOPE_EXIT {
iocp_->clear();
};
td::detail::IOCP::Guard iocp_guard(iocp_.get());
#endif
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
for (auto &thread : threads_) { for (auto &thread : threads_) {
thread.join(); thread.join();
} }
threads_.clear(); threads_.clear();
#endif #endif
#if TD_PORT_WINDOWS
iocp_->interrupt_loop();
iocp_thread_.join();
#endif
schedulers_.clear(); schedulers_.clear();
for (auto &f : at_finish_) { for (auto &f : at_finish_) {
f(); f();

View File

@ -20,6 +20,12 @@
namespace td { namespace td {
#if TD_PORT_WINDOWS
namespace detail {
class IOCP;
}
#endif
class ConcurrentScheduler : private Scheduler::Callback { class ConcurrentScheduler : private Scheduler::Callback {
public: public:
void init(int32 threads_n); void init(int32 threads_n);
@ -80,6 +86,10 @@ class ConcurrentScheduler : private Scheduler::Callback {
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
std::vector<thread> threads_; std::vector<thread> threads_;
#endif #endif
#if TD_PORT_WINDOWS
std::unique_ptr<IOCP> iocp_;
td::thread iocp_thread_;
#endif
void on_finish() override { void on_finish() override {
is_finished_.store(true, std::memory_order_relaxed); is_finished_.store(true, std::memory_order_relaxed);

View File

@ -436,7 +436,13 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
void Scheduler::run_poll(double timeout) { void Scheduler::run_poll(double timeout) {
// LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]"; // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
// we can't wait for less than 1ms // we can't wait for less than 1ms
poll_.run(static_cast<int32>(timeout * 1000 + 1)); int timeout_ms = static_cast<int32>(timeout * 1000 + 1);
#if TD_PORT_WINDOWS
CHECK(inbound_queue_);
inbound_queue_->reader_get_event_fd().wait(timeout_ms);
#elif TD_PORT_POSIX
poll_.run(timeout_ms);
#endif
} }
void Scheduler::run_mailbox() { void Scheduler::run_mailbox() {