Scheduler: send from other thread

GitOrigin-RevId: 3448a047001c257bcfb4792135d2e332410c85e8
Arseny Smirnov 2018-08-16 16:56:16 +03:00
parent 8a28e4b461
commit 44be8d2ea3
8 changed files with 107 additions and 37 deletions

View File

@@ -33,10 +33,25 @@ void ConcurrentScheduler::init(int32 threads_n) {
#endif
}
schedulers_.resize(threads_n);
for (int32 i = 0; i < threads_n; i++) {
// +1 for extra scheduler for IOCP and send_closure from unrelated threads
// It will know about other schedulers
// Other schedulers will have no idea about its existence
int extra_scheduler = 1;
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
extra_scheduler = 0;
#endif
schedulers_.resize(threads_n + extra_scheduler);
for (int32 i = 0; i < threads_n + extra_scheduler; i++) {
auto &sched = schedulers_[i];
sched = make_unique<Scheduler>();
if (i >= threads_n) {
auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
queue->init();
outbound.push_back(std::move(queue));
}
sched->init(i, outbound, static_cast<Scheduler::Callback *>(this));
}
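
The net effect of this hunk, assuming threads and event fds are available: init(threads_n) now creates threads_n + 1 schedulers, and only the last, hidden one gets a freshly created queue appended to outbound. Because that queue is pushed only after every worker scheduler has already been initialized, the workers are handed an outbound vector that does not contain it. A toy model of the resulting layout (Queue is an illustrative stand-in for MpscPollableQueue<EventFull>, not TDLib code):

#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct Queue {};  // stand-in for MpscPollableQueue<EventFull>

int main() {
  const int threads_n = 2;
  const int extra_scheduler = 1;  // 0 when TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED

  // One queue per worker scheduler is assumed to be created earlier in init().
  std::vector<std::shared_ptr<Queue>> outbound;
  for (int i = 0; i < threads_n; i++) {
    outbound.push_back(std::make_shared<Queue>());
  }

  std::vector<std::size_t> queues_visible_at_init;
  for (int i = 0; i < threads_n + extra_scheduler; i++) {
    if (i >= threads_n) {
      outbound.push_back(std::make_shared<Queue>());  // the hidden scheduler's own inbound queue
    }
    queues_visible_at_init.push_back(outbound.size());  // size of outbound when sched->init(i, ...) runs
  }

  assert(queues_visible_at_init[0] == 2);  // workers never see the extra queue
  assert(queues_visible_at_init[1] == 2);
  assert(queues_visible_at_init[2] == 3);  // only the hidden scheduler knows about all three
}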

View File

@@ -34,6 +34,10 @@ class ConcurrentScheduler : private Scheduler::Callback {
return schedulers_[0]->get_guard();
}
SchedulerGuard get_send_guard() {
return schedulers_[0]->get_const_guard();
}
void test_one_thread_run();
bool is_finished() {
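
A hedged usage sketch for get_send_guard() (MyActor and its ping method are hypothetical; the pattern is the one exercised by the new send_from_other_threads test further down). The const guard only binds the calling thread to a scheduler so that send_closure() has a context to queue through; it takes no ownership, so any number of foreign threads may hold one at the same time:

ConcurrentScheduler scheduler;
scheduler.init(/*threads_n=*/1);
// The actor lives on scheduler 1, away from the guard's scheduler, as in the test below.
auto actor = scheduler.create_actor_unsafe<MyActor>(1, "MyActor").release();
scheduler.start();

td::thread worker([&] {
  auto guard = scheduler.get_send_guard();  // const guard: no CHECK(!has_guard_), no lock taken
  send_closure(actor, &MyActor::ping);      // queued to the actor's scheduler, not run inline
});

// MyActor::ping() is assumed to eventually call Scheduler::instance()->finish(),
// the way Listener::dec() does in the test, so that run_main() returns false.
while (scheduler.run_main(10)) {
}
worker.join();
scheduler.finish();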

View File

@@ -38,7 +38,7 @@ enum class ActorSendType { Immediate, Later, LaterWeak };
class Scheduler;
class SchedulerGuard {
public:
explicit SchedulerGuard(Scheduler *scheduler);
explicit SchedulerGuard(Scheduler *scheduler, bool lock = true);
~SchedulerGuard();
SchedulerGuard(const SchedulerGuard &other) = delete;
SchedulerGuard &operator=(const SchedulerGuard &other) = delete;
@@ -47,6 +47,7 @@ class SchedulerGuard {
private:
MovableValue<bool> is_valid_ = true;
bool is_locked_;
Scheduler *scheduler_;
ActorContext *save_context_;
Scheduler *save_scheduler_;
@@ -137,6 +138,7 @@ class Scheduler {
static void on_context_updated();
SchedulerGuard get_guard();
SchedulerGuard get_const_guard();
private:
static void set_scheduler(Scheduler *scheduler);
@@ -148,7 +150,9 @@ class Scheduler {
private:
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_;
bool subscribed_{false};
void loop() override;
void tear_down() override;
};
friend class ServiceActor;
@@ -205,9 +209,6 @@ class Scheduler {
std::map<ActorInfo *, std::vector<Event>> pending_events_;
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
EventFd event_fd_;
#endif
ServiceActor service_actor_;
Poll poll_;

View File

@@ -57,8 +57,8 @@ void Scheduler::ServiceActor::start_up() {
return;
}
auto &fd = inbound_->reader_get_event_fd();
::td::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
subscribed_ = true;
yield();
#endif
}
@@ -72,7 +72,11 @@ void Scheduler::ServiceActor::loop() {
while (ready_n-- > 0) {
EventFull event = queue->reader_get_unsafe();
if (event.actor_id().empty()) {
Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
if (event.data().empty()) {
yield_scheduler();
} else {
Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
}
} else {
VLOG(actor) << "Receive " << event.data();
finish_migrate(event.data());
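
The dispatch above now distinguishes three cases instead of two: an event with no actor id and no data is the wakeup signal pushed by Scheduler::wakeup(), an event with no actor id but with data is an actor migration, and everything else is an ordinary event for a migrated actor. A minimal, self-contained model of that decision (FakeEvent is an illustrative stand-in, not TDLib's EventFull):

#include <cstdio>

struct FakeEvent {
  bool has_actor_id;
  bool has_data;
};

void dispatch(const FakeEvent &event) {
  if (!event.has_actor_id) {
    if (!event.has_data) {
      std::puts("empty event: pure wakeup, yield_scheduler()");
    } else {
      std::puts("register migrated actor");
    }
  } else {
    std::puts("deliver event to a migrated actor");
  }
}

int main() {
  dispatch({false, false});  // what Scheduler::wakeup() now writes into the inbound queue
  dispatch({false, true});   // actor migration between schedulers
  dispatch({true, true});    // regular cross-scheduler send
}
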
@@ -83,10 +87,29 @@ void Scheduler::ServiceActor::loop() {
yield();
}
void Scheduler::ServiceActor::tear_down() {
if (!subscribed_) {
return;
}
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
CHECK(!inbound_);
#else
if (!inbound_) {
return;
}
auto &fd = inbound_->reader_get_event_fd();
::td::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
subscribed_ = false;
#endif
}
/*** SchedulerGuard ***/
SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) {
CHECK(!scheduler_->has_guard_);
scheduler_->has_guard_ = true;
SchedulerGuard::SchedulerGuard(Scheduler *scheduler, bool lock) : scheduler_(scheduler) {
if (lock) {
CHECK(!scheduler_->has_guard_);
scheduler_->has_guard_ = true;
}
is_locked_ = lock;
save_scheduler_ = Scheduler::instance();
Scheduler::set_scheduler(scheduler_);
@@ -101,8 +124,10 @@ SchedulerGuard::~SchedulerGuard() {
if (is_valid_.get()) {
std::swap(save_context_, scheduler_->context());
Scheduler::set_scheduler(save_scheduler_);
CHECK(scheduler_->has_guard_);
scheduler_->has_guard_ = false;
if (is_locked_) {
CHECK(scheduler_->has_guard_);
scheduler_->has_guard_ = false;
}
LOG_TAG = save_tag_;
}
}
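
The new lock flag is what makes the const guard shareable: a locked guard still asserts and takes exclusive ownership through has_guard_, while a const guard only swaps the thread-local scheduler and context and skips the ownership check entirely. A toy illustration of the two modes (ToyScheduler and ToyGuard are illustrative, not TDLib types):

#include <cassert>

struct ToyScheduler {
  bool has_guard = false;
};

struct ToyGuard {
  ToyScheduler *sched;
  bool locked;
  explicit ToyGuard(ToyScheduler *s, bool lock = true) : sched(s), locked(lock) {
    if (locked) {
      assert(!sched->has_guard);  // mirrors CHECK(!scheduler_->has_guard_)
      sched->has_guard = true;
    }
    // the real guard also saves and sets the thread-local Scheduler pointer and context here
  }
  ~ToyGuard() {
    if (locked) {
      assert(sched->has_guard);
      sched->has_guard = false;
    }
  }
};

int main() {
  ToyScheduler s;
  ToyGuard a(&s, /*lock=*/false);  // several const guards may coexist on one scheduler
  ToyGuard b(&s, /*lock=*/false);
  ToyGuard owner(&s);              // but only one locked guard at a time
}
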
@@ -181,11 +206,6 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve
poll_.init();
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
event_fd_.init();
subscribe(event_fd_.get_poll_info().extract_pollable_fd(nullptr), PollFlags::Read());
#endif
if (!outbound.empty()) {
inbound_queue_ = std::move(outbound[id]);
}
@@ -219,12 +239,6 @@ void Scheduler::clear() {
CHECK(ready_actors_list_.empty());
poll_.clear();
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
if (!event_fd_.empty()) {
event_fd_.close();
}
#endif
if (callback_) {
// can't move lambda with unique_ptr inside into std::function
auto ptr = actor_info_pool_.release();
@@ -423,13 +437,6 @@ void Scheduler::run_poll(double timeout) {
// LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
// we can't wait for less than 1ms
poll_.run(static_cast<int32>(timeout * 1000 + 1));
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
if (event_fd_.get_poll_info().get_flags().can_read()) {
std::atomic_thread_fence(std::memory_order_acquire);
event_fd_.acquire();
}
#endif
}
void Scheduler::run_mailbox() {

View File

@@ -58,6 +58,9 @@ class EventGuard {
inline SchedulerGuard Scheduler::get_guard() {
return SchedulerGuard(this);
}
inline SchedulerGuard Scheduler::get_const_guard() {
return SchedulerGuard(this, false);
}
inline void Scheduler::init() {
init(0, {}, nullptr);
@@ -92,6 +95,7 @@ ActorOwn<ActorT> Scheduler::register_actor(Slice name, unique_ptr<ActorT> actor_
template <class ActorT>
ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) {
CHECK(has_guard_);
if (sched_id == -1) {
sched_id = sched_id_;
}
@@ -186,7 +190,7 @@ inline void Scheduler::inc_wait_generation() {
template <ActorSendType send_type, class RunFuncT, class EventFuncT>
void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) {
CHECK(has_guard_);
//CHECK(has_guard_ || );
ActorInfo *actor_info = actor_id.get_actor_info();
if (unlikely(actor_info == nullptr || close_flag_)) {
// LOG(ERROR) << "Invalid actor id";
@@ -198,6 +202,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c
bool is_migrating;
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
CHECK(has_guard_ || !on_current_sched);
if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() &&
!actor_info->must_wait(wait_generation_))) { // run immediately
@@ -332,7 +337,7 @@ inline void Scheduler::yield() {
inline void Scheduler::wakeup() {
std::atomic_thread_fence(std::memory_order_release);
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
event_fd_.release();
inbound_queue_->writer_put({});
#endif
}
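
With event_fd_ gone, waking a scheduler from another thread means pushing an empty EventFull into its inbound queue; the ServiceActor is subscribed to that queue's event fd and treats the empty event as a pure wakeup (see loop() above). A standard-library toy model of why one pollable queue can double as both mailbox and wakeup signal (the real MpscPollableQueue signals through an event fd, not a condition variable; ToyQueue is illustrative only):

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

struct ToyQueue {
  std::mutex m;
  std::condition_variable cv;
  std::queue<std::optional<int>> q;  // nullopt plays the role of the empty EventFull

  void writer_put(std::optional<int> v) {
    { std::lock_guard<std::mutex> lock(m); q.push(v); }
    cv.notify_one();  // stands in for the queue's event fd becoming readable
  }
  std::optional<int> reader_get() {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] { return !q.empty(); });
    auto v = q.front();
    q.pop();
    return v;
  }
};

int main() {
  ToyQueue q;
  std::thread consumer([&] {
    auto v = q.reader_get();
    std::puts(v ? "real event delivered" : "empty event: wakeup only");
  });
  q.writer_put(std::nullopt);  // analogous to Scheduler::wakeup() writing to inbound_queue_
  consumer.join();
}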

View File

@@ -620,3 +620,39 @@ TEST(Actors, always_wait_for_mailbox) {
}
scheduler.finish();
}
TEST(Actors, send_from_other_threads) {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
ConcurrentScheduler scheduler;
scheduler.init(1);
int thread_n = 10;
class Listener : public Actor {
public:
Listener(int cnt) : cnt_(cnt) {
}
void dec() {
if (--cnt_ == 0) {
Scheduler::instance()->finish();
}
}
private:
int cnt_;
};
auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release();
scheduler.start();
std::vector<td::thread> threads(thread_n);
for (auto &thread : threads) {
thread = td::thread([&A, &scheduler] {
auto guard = scheduler.get_send_guard();
send_closure(A, &Listener::dec);
});
}
while (scheduler.run_main(10)) {
}
for (auto &thread : threads) {
thread.join();
}
scheduler.finish();
}

View File

@@ -64,7 +64,7 @@ set(TDUTILS_SOURCE
td/utils/buffer.cpp
td/utils/BufferedUdp.cpp
td/utils/crypto.cpp
#td/utils/FileLog.cpp
td/utils/FileLog.cpp
td/utils/filesystem.cpp
td/utils/find_boundary.cpp
td/utils/Gzip.cpp
@@ -141,7 +141,7 @@ set(TDUTILS_SOURCE
td/utils/crypto.h
td/utils/Enumerator.h
td/utils/Destructor.h
#td/utils/FileLog.h
td/utils/FileLog.h
td/utils/filesystem.h
td/utils/find_boundary.h
td/utils/FloodControlFast.h

View File

@@ -31,7 +31,8 @@ bool FileLog::init(string path, int64 rotate_threshold) {
fd_.close();
fd_ = r_fd.move_as_ok();
Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore();
// FIXME
//Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore();
path_ = std::move(path);
size_ = fd_.get_size();
@@ -84,7 +85,8 @@ void FileLog::do_rotate() {
process_fatal_error(r_fd.error().message());
}
fd_ = r_fd.move_as_ok();
Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore();
// FIXME
//Fd::duplicate(fd_.get_fd(), Fd::Stderr()).ignore();
size_ = 0;
SET_VERBOSITY_LEVEL(current_verbosity_level);
}