diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/tdactor/td/actor/impl/ConcurrentScheduler.cpp index 47593db9..b044fbab 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.cpp +++ b/tdactor/td/actor/impl/ConcurrentScheduler.cpp @@ -33,10 +33,25 @@ void ConcurrentScheduler::init(int32 threads_n) { #endif } - schedulers_.resize(threads_n); - for (int32 i = 0; i < threads_n; i++) { + // +1 for extra scheduler for IOCP and send_closure from unrelated threads + // It will know about other schedulers + // Other schedulers will have no idea about its existance + int extra_scheduler = 1; +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + extra_scheduler = 0; +#endif + + schedulers_.resize(threads_n + extra_scheduler); + for (int32 i = 0; i < threads_n + extra_scheduler; i++) { auto &sched = schedulers_[i]; sched = make_unique(); + + if (i >= threads_n) { + auto queue = std::make_shared>(); + queue->init(); + outbound.push_back(std::move(queue)); + } + sched->init(i, outbound, static_cast(this)); } diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.h b/tdactor/td/actor/impl/ConcurrentScheduler.h index 1e9793ea..884e6ff8 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.h +++ b/tdactor/td/actor/impl/ConcurrentScheduler.h @@ -34,6 +34,10 @@ class ConcurrentScheduler : private Scheduler::Callback { return schedulers_[0]->get_guard(); } + SchedulerGuard get_send_guard() { + return schedulers_[0]->get_const_guard(); + } + void test_one_thread_run(); bool is_finished() { diff --git a/tdactor/td/actor/impl/Scheduler-decl.h b/tdactor/td/actor/impl/Scheduler-decl.h index 4dc1a4d3..fb8dc313 100644 --- a/tdactor/td/actor/impl/Scheduler-decl.h +++ b/tdactor/td/actor/impl/Scheduler-decl.h @@ -38,7 +38,7 @@ enum class ActorSendType { Immediate, Later, LaterWeak }; class Scheduler; class SchedulerGuard { public: - explicit SchedulerGuard(Scheduler *scheduler); + explicit SchedulerGuard(Scheduler *scheduler, bool lock = true); ~SchedulerGuard(); SchedulerGuard(const SchedulerGuard &other) = delete; SchedulerGuard &operator=(const SchedulerGuard &other) = delete; @@ -47,6 +47,7 @@ class SchedulerGuard { private: MovableValue is_valid_ = true; + bool is_locked_; Scheduler *scheduler_; ActorContext *save_context_; Scheduler *save_scheduler_; @@ -137,6 +138,7 @@ class Scheduler { static void on_context_updated(); SchedulerGuard get_guard(); + SchedulerGuard get_const_guard(); private: static void set_scheduler(Scheduler *scheduler); @@ -148,7 +150,9 @@ class Scheduler { private: std::shared_ptr> inbound_; + bool subscribed_{false}; void loop() override; + void tear_down() override; }; friend class ServiceActor; @@ -205,9 +209,6 @@ class Scheduler { std::map> pending_events_; -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - EventFd event_fd_; -#endif ServiceActor service_actor_; Poll poll_; diff --git a/tdactor/td/actor/impl/Scheduler.cpp b/tdactor/td/actor/impl/Scheduler.cpp index 7049600f..cf482570 100644 --- a/tdactor/td/actor/impl/Scheduler.cpp +++ b/tdactor/td/actor/impl/Scheduler.cpp @@ -57,8 +57,8 @@ void Scheduler::ServiceActor::start_up() { return; } auto &fd = inbound_->reader_get_event_fd(); - ::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); + subscribed_ = true; yield(); #endif } @@ -72,7 +72,11 @@ void Scheduler::ServiceActor::loop() { while (ready_n-- > 0) { EventFull event = queue->reader_get_unsafe(); if (event.actor_id().empty()) { - Scheduler::instance()->register_migrated_actor(static_cast(event.data().data.ptr)); + if (event.data().empty()) { + yield_scheduler(); + } else { + Scheduler::instance()->register_migrated_actor(static_cast(event.data().data.ptr)); + } } else { VLOG(actor) << "Receive " << event.data(); finish_migrate(event.data()); @@ -83,10 +87,29 @@ void Scheduler::ServiceActor::loop() { yield(); } +void Scheduler::ServiceActor::tear_down() { + if (!subscribed_) { + return; + } +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + CHECK(!inbound_); +#else + if (!inbound_) { + return; + } + auto &fd = inbound_->reader_get_event_fd(); + ::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); + subscribed_ = false; +#endif +} + /*** SchedlerGuard ***/ -SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) { - CHECK(!scheduler_->has_guard_); - scheduler_->has_guard_ = true; +SchedulerGuard::SchedulerGuard(Scheduler *scheduler, bool lock) : scheduler_(scheduler) { + if (lock) { + CHECK(!scheduler_->has_guard_); + scheduler_->has_guard_ = true; + } + is_locked_ = lock; save_scheduler_ = Scheduler::instance(); Scheduler::set_scheduler(scheduler_); @@ -101,8 +124,10 @@ SchedulerGuard::~SchedulerGuard() { if (is_valid_.get()) { std::swap(save_context_, scheduler_->context()); Scheduler::set_scheduler(save_scheduler_); - CHECK(scheduler_->has_guard_); - scheduler_->has_guard_ = false; + if (is_locked_) { + CHECK(scheduler_->has_guard_); + scheduler_->has_guard_ = false; + } LOG_TAG = save_tag_; } } @@ -181,11 +206,6 @@ void Scheduler::init(int32 id, std::vector(timeout * 1000 + 1)); - -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - if (event_fd_.get_poll_info().get_flags().can_read()) { - std::atomic_thread_fence(std::memory_order_acquire); - event_fd_.acquire(); - } -#endif } void Scheduler::run_mailbox() { diff --git a/tdactor/td/actor/impl/Scheduler.h b/tdactor/td/actor/impl/Scheduler.h index 5e45bb22..1b70a63b 100644 --- a/tdactor/td/actor/impl/Scheduler.h +++ b/tdactor/td/actor/impl/Scheduler.h @@ -58,6 +58,9 @@ class EventGuard { inline SchedulerGuard Scheduler::get_guard() { return SchedulerGuard(this); } +inline SchedulerGuard Scheduler::get_const_guard() { + return SchedulerGuard(this, false); +} inline void Scheduler::init() { init(0, {}, nullptr); @@ -92,6 +95,7 @@ ActorOwn Scheduler::register_actor(Slice name, unique_ptr actor_ template ActorOwn Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) { + CHECK(has_guard_); if (sched_id == -1) { sched_id = sched_id_; } @@ -186,7 +190,7 @@ inline void Scheduler::inc_wait_generation() { template void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) { - CHECK(has_guard_); + //CHECK(has_guard_ || ); ActorInfo *actor_info = actor_id.get_actor_info(); if (unlikely(actor_info == nullptr || close_flag_)) { // LOG(ERROR) << "Invalid actor id"; @@ -198,6 +202,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c bool is_migrating; std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id; + CHECK(has_guard_ || !on_current_sched); if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() && !actor_info->must_wait(wait_generation_))) { // run immediately @@ -332,7 +337,7 @@ inline void Scheduler::yield() { inline void Scheduler::wakeup() { std::atomic_thread_fence(std::memory_order_release); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - event_fd_.release(); + inbound_queue_->writer_put({}); #endif } diff --git a/tdactor/test/actors_simple.cpp b/tdactor/test/actors_simple.cpp index 8f932e3a..4fc7d808 100644 --- a/tdactor/test/actors_simple.cpp +++ b/tdactor/test/actors_simple.cpp @@ -620,3 +620,39 @@ TEST(Actors, always_wait_for_mailbox) { } scheduler.finish(); } + +TEST(Actors, send_from_other_threads) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + ConcurrentScheduler scheduler; + scheduler.init(1); + int thread_n = 10; + class Listener : public Actor { + public: + Listener(int cnt) : cnt_(cnt) { + } + void dec() { + if (--cnt_ == 0) { + Scheduler::instance()->finish(); + } + } + + private: + int cnt_; + }; + + auto A = scheduler.create_actor_unsafe(1, "A", thread_n).release(); + scheduler.start(); + std::vector threads(thread_n); + for (auto &thread : threads) { + thread = td::thread([&A, &scheduler] { + auto guard = scheduler.get_send_guard(); + send_closure(A, &Listener::dec); + }); + } + while (scheduler.run_main(10)) { + } + for (auto &thread : threads) { + thread.join(); + } + scheduler.finish(); +} diff --git a/tdutils/CMakeLists.txt b/tdutils/CMakeLists.txt index 48b3b660..1318320c 100644 --- a/tdutils/CMakeLists.txt +++ b/tdutils/CMakeLists.txt @@ -64,7 +64,7 @@ set(TDUTILS_SOURCE td/utils/buffer.cpp td/utils/BufferedUdp.cpp td/utils/crypto.cpp - #td/utils/FileLog.cpp + td/utils/FileLog.cpp td/utils/filesystem.cpp td/utils/find_boundary.cpp td/utils/Gzip.cpp @@ -141,7 +141,7 @@ set(TDUTILS_SOURCE td/utils/crypto.h td/utils/Enumerator.h td/utils/Destructor.h - #td/utils/FileLog.h + td/utils/FileLog.h td/utils/filesystem.h td/utils/find_boundary.h td/utils/FloodControlFast.h diff --git a/tdutils/td/utils/FileLog.cpp b/tdutils/td/utils/FileLog.cpp index 922be637..3a45888f 100644 --- a/tdutils/td/utils/FileLog.cpp +++ b/tdutils/td/utils/FileLog.cpp @@ -31,7 +31,8 @@ bool FileLog::init(string path, int64 rotate_threshold) { fd_.close(); fd_ = r_fd.move_as_ok(); - Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore(); + // FIXME + //Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore(); path_ = std::move(path); size_ = fd_.get_size(); @@ -84,7 +85,8 @@ void FileLog::do_rotate() { process_fatal_error(r_fd.error().message()); } fd_ = r_fd.move_as_ok(); - Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore(); + // FIXME + //Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore(); size_ = 0; SET_VERBOSITY_LEVEL(current_verbosity_level); }