555 lines
16 KiB
C++
555 lines
16 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/actor/impl/Scheduler.h"
|
|
|
|
#include "td/actor/impl/Actor.h"
|
|
#include "td/actor/impl/ActorId.h"
|
|
#include "td/actor/impl/ActorInfo.h"
|
|
#include "td/actor/impl/Event.h"
|
|
#include "td/actor/impl/EventFull.h"
|
|
|
|
#include "td/utils/common.h"
|
|
#include "td/utils/ExitGuard.h"
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/List.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/MpscPollableQueue.h"
|
|
#include "td/utils/ObjectPool.h"
|
|
#include "td/utils/port/thread_local.h"
|
|
#include "td/utils/ScopeGuard.h"
|
|
#include "td/utils/Time.h"
|
|
|
|
#include <functional>
|
|
#include <iterator>
|
|
#include <memory>
|
|
#include <utility>
|
|
|
|
namespace td {
|
|
|
|
int VERBOSITY_NAME(actor) = VERBOSITY_NAME(DEBUG) + 10;
|
|
|
|
TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized
|
|
TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized
|
|
|
|
Scheduler::~Scheduler() {
|
|
clear();
|
|
}
|
|
|
|
Scheduler *Scheduler::instance() {
|
|
return scheduler_;
|
|
}
|
|
|
|
ActorContext *&Scheduler::context() {
|
|
return context_;
|
|
}
|
|
|
|
void Scheduler::on_context_updated() {
|
|
LOG_TAG = context_->tag_;
|
|
}
|
|
|
|
void Scheduler::set_scheduler(Scheduler *scheduler) {
|
|
scheduler_ = scheduler;
|
|
}
|
|
|
|
void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
|
|
inbound_ = std::move(queues);
|
|
}
|
|
|
|
void Scheduler::ServiceActor::start_up() {
|
|
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
|
CHECK(!inbound_);
|
|
#else
|
|
if (!inbound_) {
|
|
return;
|
|
}
|
|
#if !TD_PORT_WINDOWS
|
|
auto &fd = inbound_->reader_get_event_fd();
|
|
Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
|
|
subscribed_ = true;
|
|
#endif
|
|
yield();
|
|
#endif
|
|
}
|
|
|
|
void Scheduler::ServiceActor::loop() {
|
|
auto &queue = inbound_;
|
|
int ready_n = queue->reader_wait_nonblock();
|
|
if (ready_n == 0) {
|
|
return;
|
|
}
|
|
while (ready_n-- > 0) {
|
|
EventFull event = queue->reader_get_unsafe();
|
|
if (event.actor_id().empty()) {
|
|
if (event.data().empty()) {
|
|
yield_scheduler();
|
|
} else {
|
|
Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
|
|
}
|
|
} else {
|
|
VLOG(actor) << "Receive " << event.data();
|
|
finish_migrate(event.data());
|
|
event.try_emit();
|
|
}
|
|
}
|
|
queue->reader_flush();
|
|
yield();
|
|
}
|
|
|
|
void Scheduler::ServiceActor::tear_down() {
|
|
if (!subscribed_) {
|
|
return;
|
|
}
|
|
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
|
CHECK(!inbound_);
|
|
#else
|
|
if (!inbound_) {
|
|
return;
|
|
}
|
|
auto &fd = inbound_->reader_get_event_fd();
|
|
Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
|
|
subscribed_ = false;
|
|
#endif
|
|
}
|
|
|
|
/*** SchedlerGuard ***/
|
|
SchedulerGuard::SchedulerGuard(Scheduler *scheduler, bool lock) : scheduler_(scheduler) {
|
|
if (lock) {
|
|
// the next check can fail if OS killed the scheduler's thread without releasing the guard
|
|
CHECK(!scheduler_->has_guard_);
|
|
scheduler_->has_guard_ = true;
|
|
}
|
|
is_locked_ = lock;
|
|
save_scheduler_ = Scheduler::instance();
|
|
Scheduler::set_scheduler(scheduler_);
|
|
|
|
// Scheduler::context() must be not null
|
|
save_context_ = scheduler_->save_context_.get();
|
|
save_tag_ = LOG_TAG;
|
|
LOG_TAG = save_context_->tag_;
|
|
std::swap(save_context_, Scheduler::context());
|
|
}
|
|
|
|
SchedulerGuard::~SchedulerGuard() {
|
|
if (is_valid_.get()) {
|
|
std::swap(save_context_, scheduler_->context());
|
|
Scheduler::set_scheduler(save_scheduler_);
|
|
if (is_locked_) {
|
|
CHECK(scheduler_->has_guard_);
|
|
scheduler_->has_guard_ = false;
|
|
}
|
|
LOG_TAG = save_tag_;
|
|
}
|
|
}
|
|
|
|
/*** EventGuard ***/
|
|
EventGuard::EventGuard(Scheduler *scheduler, ActorInfo *actor_info) : scheduler_(scheduler) {
|
|
actor_info->start_run();
|
|
event_context_.actor_info = actor_info;
|
|
event_context_ptr_ = &event_context_;
|
|
|
|
save_context_ = actor_info->get_context();
|
|
#ifdef TD_DEBUG
|
|
save_log_tag2_ = actor_info->get_name().c_str();
|
|
#endif
|
|
swap_context(actor_info);
|
|
}
|
|
|
|
EventGuard::~EventGuard() {
|
|
auto info = event_context_.actor_info;
|
|
auto node = info->get_list_node();
|
|
node->remove();
|
|
if (info->mailbox_.empty()) {
|
|
scheduler_->pending_actors_list_.put(node);
|
|
} else {
|
|
scheduler_->ready_actors_list_.put(node);
|
|
}
|
|
info->finish_run();
|
|
swap_context(info);
|
|
CHECK(!info->need_context() || save_context_ == info->get_context());
|
|
#ifdef TD_DEBUG
|
|
LOG_CHECK(!info->need_context() || save_log_tag2_ == info->get_name().c_str())
|
|
<< info->need_context() << " " << info->empty() << " " << info->is_migrating() << " " << save_log_tag2_ << " "
|
|
<< info->get_name() << " " << scheduler_->close_flag_;
|
|
#endif
|
|
if (event_context_.flags & Scheduler::EventContext::Stop) {
|
|
scheduler_->do_stop_actor(info);
|
|
return;
|
|
}
|
|
if (event_context_.flags & Scheduler::EventContext::Migrate) {
|
|
scheduler_->do_migrate_actor(info, event_context_.dest_sched_id);
|
|
}
|
|
}
|
|
|
|
void EventGuard::swap_context(ActorInfo *info) {
|
|
std::swap(scheduler_->event_context_ptr_, event_context_ptr_);
|
|
|
|
if (!info->need_context()) {
|
|
return;
|
|
}
|
|
|
|
#ifdef TD_DEBUG
|
|
std::swap(LOG_TAG2, save_log_tag2_);
|
|
#endif
|
|
|
|
auto *current_context_ptr = &Scheduler::context();
|
|
if (save_context_ != *current_context_ptr) {
|
|
std::swap(save_context_, *current_context_ptr);
|
|
Scheduler::on_context_updated();
|
|
}
|
|
}
|
|
|
|
void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound,
|
|
Callback *callback) {
|
|
save_context_ = std::make_shared<ActorContext>();
|
|
save_context_->this_ptr_ = save_context_;
|
|
save_context_->tag_ = LOG_TAG;
|
|
|
|
auto guard = get_guard();
|
|
|
|
callback_ = callback;
|
|
actor_info_pool_ = make_unique<ObjectPool<ActorInfo>>();
|
|
|
|
yield_flag_ = false;
|
|
actor_count_ = 0;
|
|
sched_id_ = 0;
|
|
|
|
poll_.init();
|
|
|
|
if (!outbound.empty()) {
|
|
inbound_queue_ = std::move(outbound[id]);
|
|
}
|
|
outbound_queues_ = std::move(outbound);
|
|
sched_id_ = id;
|
|
sched_n_ = static_cast<int32>(outbound_queues_.size());
|
|
service_actor_.set_queue(inbound_queue_);
|
|
register_actor("ServiceActor", &service_actor_).release();
|
|
}
|
|
|
|
void Scheduler::clear() {
|
|
if (service_actor_.empty()) {
|
|
return;
|
|
}
|
|
close_flag_ = true;
|
|
auto guard = get_guard();
|
|
|
|
// Stop all actors
|
|
if (!service_actor_.empty()) {
|
|
service_actor_.do_stop();
|
|
}
|
|
while (!pending_actors_list_.empty()) {
|
|
auto actor_info = ActorInfo::from_list_node(pending_actors_list_.get());
|
|
do_stop_actor(actor_info);
|
|
}
|
|
while (!ready_actors_list_.empty()) {
|
|
auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get());
|
|
do_stop_actor(actor_info);
|
|
}
|
|
poll_.clear();
|
|
|
|
if (callback_ && !ExitGuard::is_exited()) {
|
|
// can't move lambda with unique_ptr inside into std::function
|
|
auto ptr = actor_info_pool_.release();
|
|
callback_->register_at_finish([ptr] { delete ptr; });
|
|
} else {
|
|
actor_info_pool_.reset();
|
|
}
|
|
}
|
|
|
|
void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
|
|
event_context_ptr_->link_token = event.link_token;
|
|
auto actor = actor_info->get_actor_unsafe();
|
|
VLOG(actor) << *actor_info << ' ' << event;
|
|
switch (event.type) {
|
|
case Event::Type::Start:
|
|
actor->start_up();
|
|
break;
|
|
case Event::Type::Stop:
|
|
actor->tear_down();
|
|
break;
|
|
case Event::Type::Yield:
|
|
actor->wakeup();
|
|
break;
|
|
case Event::Type::Hangup:
|
|
if (get_link_token(actor) != 0) {
|
|
actor->hangup_shared();
|
|
} else {
|
|
actor->hangup();
|
|
}
|
|
break;
|
|
case Event::Type::Timeout:
|
|
actor->timeout_expired();
|
|
break;
|
|
case Event::Type::Raw:
|
|
actor->raw_event(event.data);
|
|
break;
|
|
case Event::Type::Custom:
|
|
event.data.custom_event->run(actor);
|
|
break;
|
|
case Event::Type::NoType:
|
|
default:
|
|
UNREACHABLE();
|
|
break;
|
|
}
|
|
// can't clear event here. It may be already destroyed during destroy_actor
|
|
}
|
|
|
|
void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
|
|
VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
|
|
<< tag("actor_count", actor_count_);
|
|
actor_count_++;
|
|
LOG_CHECK(actor_info->is_migrating()) << *actor_info << ' ' << actor_count_ << ' ' << sched_id_ << ' '
|
|
<< actor_info->migrate_dest() << ' ' << actor_info->is_running() << ' '
|
|
<< close_flag_;
|
|
CHECK(sched_id_ == actor_info->migrate_dest());
|
|
// CHECK(!actor_info->is_running());
|
|
actor_info->finish_migrate();
|
|
for (auto &event : actor_info->mailbox_) {
|
|
finish_migrate(event);
|
|
}
|
|
auto it = pending_events_.find(actor_info);
|
|
if (it != pending_events_.end()) {
|
|
actor_info->mailbox_.insert(actor_info->mailbox_.end(), std::make_move_iterator(it->second.begin()),
|
|
std::make_move_iterator(it->second.end()));
|
|
pending_events_.erase(it);
|
|
}
|
|
if (actor_info->mailbox_.empty()) {
|
|
pending_actors_list_.put(actor_info->get_list_node());
|
|
} else {
|
|
ready_actors_list_.put(actor_info->get_list_node());
|
|
}
|
|
actor_info->get_actor_unsafe()->on_finish_migrate();
|
|
}
|
|
|
|
void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) {
|
|
if (sched_id < sched_count()) {
|
|
auto actor_info = actor_id.get_actor_info();
|
|
if (actor_info) {
|
|
VLOG(actor) << "Send to " << *actor_info << " on scheduler " << sched_id << ": " << event;
|
|
} else {
|
|
VLOG(actor) << "Send to scheduler " << sched_id << ": " << event;
|
|
}
|
|
start_migrate(event, sched_id);
|
|
outbound_queues_[sched_id]->writer_put(EventCreator::event_unsafe(actor_id, std::move(event)));
|
|
outbound_queues_[sched_id]->writer_flush();
|
|
}
|
|
}
|
|
|
|
void Scheduler::run_on_scheduler(int32 sched_id, Promise<Unit> action) {
|
|
if (sched_id >= 0 && sched_id_ != sched_id) {
|
|
class Worker final : public Actor {
|
|
public:
|
|
explicit Worker(Promise<Unit> action) : action_(std::move(action)) {
|
|
}
|
|
|
|
private:
|
|
Promise<Unit> action_;
|
|
|
|
void start_up() final {
|
|
action_.set_value(Unit());
|
|
stop();
|
|
}
|
|
};
|
|
create_actor_on_scheduler<Worker>("RunOnSchedulerWorker", sched_id, std::move(action)).release();
|
|
return;
|
|
}
|
|
|
|
action.set_value(Unit());
|
|
}
|
|
|
|
void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
|
|
if (!actor_info->is_running()) {
|
|
auto node = actor_info->get_list_node();
|
|
node->remove();
|
|
ready_actors_list_.put(node);
|
|
}
|
|
VLOG(actor) << "Add to mailbox: " << *actor_info << " " << event;
|
|
actor_info->mailbox_.push_back(std::move(event));
|
|
}
|
|
|
|
void Scheduler::do_stop_actor(Actor *actor) {
|
|
return do_stop_actor(actor->get_info());
|
|
}
|
|
void Scheduler::do_stop_actor(ActorInfo *actor_info) {
|
|
CHECK(!actor_info->is_migrating());
|
|
LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
|
|
ObjectPool<ActorInfo>::OwnerPtr owner_ptr;
|
|
if (actor_info->need_start_up()) {
|
|
EventGuard guard(this, actor_info);
|
|
do_event(actor_info, Event::stop());
|
|
owner_ptr = actor_info->get_actor_unsafe()->clear();
|
|
// Actor context is visible in destructor
|
|
actor_info->destroy_actor();
|
|
event_context_ptr_->flags = 0;
|
|
} else {
|
|
owner_ptr = actor_info->get_actor_unsafe()->clear();
|
|
actor_info->destroy_actor();
|
|
}
|
|
destroy_actor(actor_info);
|
|
}
|
|
|
|
void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) {
|
|
migrate_actor(actor->get_info(), dest_sched_id);
|
|
}
|
|
void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
|
|
CHECK(event_context_ptr_->actor_info == actor_info);
|
|
if (sched_id_ == dest_sched_id) {
|
|
return;
|
|
}
|
|
event_context_ptr_->flags |= EventContext::Migrate;
|
|
event_context_ptr_->dest_sched_id = dest_sched_id;
|
|
}
|
|
|
|
void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) {
|
|
do_migrate_actor(actor->get_info(), dest_sched_id);
|
|
}
|
|
void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
|
|
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
|
dest_sched_id = 0;
|
|
#endif
|
|
if (sched_id_ == dest_sched_id) {
|
|
return;
|
|
}
|
|
start_migrate_actor(actor_info, dest_sched_id);
|
|
send_to_other_scheduler(dest_sched_id, ActorId<>(), Event::raw(actor_info));
|
|
}
|
|
|
|
void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) {
|
|
start_migrate_actor(actor->get_info(), dest_sched_id);
|
|
}
|
|
|
|
void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
|
|
VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info)
|
|
<< tag("actor_count", actor_count_);
|
|
actor_count_--;
|
|
CHECK(actor_count_ >= 0);
|
|
actor_info->get_actor_unsafe()->on_start_migrate(dest_sched_id);
|
|
for (auto &event : actor_info->mailbox_) {
|
|
start_migrate(event, dest_sched_id);
|
|
}
|
|
actor_info->start_migrate(dest_sched_id);
|
|
actor_info->get_list_node()->remove();
|
|
cancel_actor_timeout(actor_info);
|
|
}
|
|
|
|
double Scheduler::get_actor_timeout(const ActorInfo *actor_info) const {
|
|
const HeapNode *heap_node = actor_info->get_heap_node();
|
|
return heap_node->in_heap() ? timeout_queue_.get_key(heap_node) - Time::now() : 0.0;
|
|
}
|
|
|
|
void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) {
|
|
if (timeout > 1e10) {
|
|
timeout = 1e10;
|
|
}
|
|
if (timeout < 0) {
|
|
timeout = 0;
|
|
}
|
|
double expires_at = Time::now() + timeout;
|
|
set_actor_timeout_at(actor_info, expires_at);
|
|
}
|
|
|
|
void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
|
|
HeapNode *heap_node = actor_info->get_heap_node();
|
|
VLOG(actor) << "Set actor " << *actor_info << " timeout in " << timeout_at - Time::now_cached();
|
|
if (heap_node->in_heap()) {
|
|
timeout_queue_.fix(timeout_at, heap_node);
|
|
} else {
|
|
timeout_queue_.insert(timeout_at, heap_node);
|
|
}
|
|
}
|
|
|
|
void Scheduler::run_poll(Timestamp timeout) {
|
|
// we can't wait for less than 1ms
|
|
auto timeout_ms = static_cast<int>(clamp(timeout.in(), 0.0, 1000000.0) * 1000 + 1);
|
|
#if TD_PORT_WINDOWS
|
|
CHECK(inbound_queue_);
|
|
inbound_queue_->reader_get_event_fd().wait(timeout_ms);
|
|
service_actor_.notify();
|
|
#elif TD_PORT_POSIX
|
|
poll_.run(timeout_ms);
|
|
#endif
|
|
}
|
|
|
|
void Scheduler::run_mailbox() {
|
|
VLOG(actor) << "Run mailbox : begin";
|
|
ListNode actors_list = std::move(ready_actors_list_);
|
|
while (!actors_list.empty()) {
|
|
ListNode *node = actors_list.get();
|
|
CHECK(node);
|
|
auto actor_info = ActorInfo::from_list_node(node);
|
|
inc_wait_generation();
|
|
flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr));
|
|
}
|
|
VLOG(actor) << "Run mailbox : finish " << actor_count_;
|
|
|
|
//Useful for debug, but O(ActorsCount) check
|
|
|
|
//int cnt = 0;
|
|
//for (ListNode *end = &pending_actors_list_, *it = pending_actors_list_.next; it != end; it = it->next) {
|
|
//cnt++;
|
|
//auto actor_info = ActorInfo::from_list_node(it);
|
|
//LOG(ERROR) << *actor_info;
|
|
//CHECK(actor_info->mailbox_.empty());
|
|
//CHECK(!actor_info->is_running());
|
|
//}
|
|
//for (ListNode *end = &ready_actors_list_, *it = ready_actors_list_.next; it != end; it = it->next) {
|
|
//auto actor_info = ActorInfo::from_list_node(it);
|
|
//LOG(ERROR) << *actor_info;
|
|
//cnt++;
|
|
//}
|
|
//LOG_CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
|
|
}
|
|
|
|
Timestamp Scheduler::run_timeout() {
|
|
double now = Time::now();
|
|
//TODO: use Timestamp().is_in_past()
|
|
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
|
|
HeapNode *node = timeout_queue_.pop();
|
|
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
|
|
inc_wait_generation();
|
|
send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout());
|
|
}
|
|
return get_timeout();
|
|
}
|
|
|
|
Timestamp Scheduler::run_events(Timestamp timeout) {
|
|
Timestamp res;
|
|
VLOG(actor) << "Run events " << sched_id_ << " " << tag("pending", pending_events_.size())
|
|
<< tag("actors", actor_count_);
|
|
do {
|
|
run_mailbox();
|
|
res = run_timeout();
|
|
} while (!ready_actors_list_.empty() && !timeout.is_in_past());
|
|
return res;
|
|
}
|
|
|
|
void Scheduler::run_no_guard(Timestamp timeout) {
|
|
CHECK(has_guard_);
|
|
SCOPE_EXIT {
|
|
yield_flag_ = false;
|
|
};
|
|
|
|
timeout.relax(run_events(timeout));
|
|
if (yield_flag_) {
|
|
return;
|
|
}
|
|
run_poll(timeout);
|
|
run_events(timeout);
|
|
}
|
|
|
|
Timestamp Scheduler::get_timeout() {
|
|
if (!ready_actors_list_.empty()) {
|
|
return Timestamp::in(0);
|
|
}
|
|
if (timeout_queue_.empty()) {
|
|
return Timestamp::in(10000);
|
|
}
|
|
return Timestamp::at(timeout_queue_.top_key());
|
|
}
|
|
|
|
} // namespace td
|