5cbda834bd
GitOrigin-RevId: 1369d3af1195221f6ddb9462d5f8b74fb5fef20f
297 lines
10 KiB
C++
297 lines
10 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#pragma once
|
|
|
|
#include "td/actor/impl/Actor-decl.h"
|
|
#include "td/actor/impl/ActorId-decl.h"
|
|
#include "td/actor/impl/EventFull-decl.h"
|
|
|
|
#include "td/utils/Closure.h"
|
|
#include "td/utils/Heap.h"
|
|
#include "td/utils/List.h"
|
|
#include "td/utils/MovableValue.h"
|
|
#include "td/utils/MpscPollableQueue.h"
|
|
#include "td/utils/ObjectPool.h"
|
|
#include "td/utils/port/EventFd.h"
|
|
#include "td/utils/port/Fd.h"
|
|
#include "td/utils/port/Poll.h"
|
|
#include "td/utils/port/thread_local.h"
|
|
#include "td/utils/Slice.h"
|
|
#include "td/utils/type_traits.h"
|
|
|
|
#include <functional>
|
|
#include <map>
|
|
#include <memory>
|
|
#include <type_traits>
|
|
#include <utility>
|
|
|
|
namespace td {
|
|
// Forward declaration: within this header ActorInfo is only named through pointers
// (ActorInfo *) and as a template argument (ObjectPool<ActorInfo>).
class ActorInfo;
|
|
struct Send {
|
|
using Flags = uint32;
|
|
static const Flags immediate = 0x001;
|
|
static const Flags later = 0x002;
|
|
static const Flags later_weak = 0x004;
|
|
};
|
|
|
|
// Forward declaration so that SchedulerGuard can hold Scheduler pointers before Scheduler is defined.
class Scheduler;
|
|
class SchedulerGuard {
|
|
public:
|
|
explicit SchedulerGuard(Scheduler *scheduler);
|
|
~SchedulerGuard();
|
|
SchedulerGuard(const SchedulerGuard &other) = delete;
|
|
SchedulerGuard &operator=(const SchedulerGuard &other) = delete;
|
|
SchedulerGuard(SchedulerGuard &&other) = default;
|
|
SchedulerGuard &operator=(SchedulerGuard &&other) = delete;
|
|
|
|
private:
|
|
MovableValue<bool> is_valid_ = true;
|
|
Scheduler *scheduler_;
|
|
ActorContext *save_context_;
|
|
Scheduler *save_scheduler_;
|
|
const char *save_tag_;
|
|
};
|
|
|
|
class Scheduler {
|
|
public:
|
|
class Callback {
|
|
public:
|
|
Callback() = default;
|
|
Callback(const Callback &) = delete;
|
|
Callback &operator=(const Callback &) = delete;
|
|
virtual ~Callback() = default;
|
|
virtual void on_finish() = 0;
|
|
virtual void register_at_finish(std::function<void()>) = 0;
|
|
};
|
|
Scheduler() = default;
|
|
Scheduler(const Scheduler &) = delete;
|
|
Scheduler &operator=(const Scheduler &) = delete;
|
|
Scheduler(Scheduler &&) = delete;
|
|
Scheduler &operator=(Scheduler &&) = delete;
|
|
~Scheduler();
|
|
|
|
void init();
|
|
void init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound, Callback *callback);
|
|
void clear();
|
|
|
|
int32 sched_id() const;
|
|
int32 sched_count() const;
|
|
|
|
template <class ActorT, class... Args>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
|
|
template <class ActorT, class... Args>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1);
|
|
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr);
|
|
|
|
void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
|
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
|
|
|
template <class EventT>
|
|
void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0);
|
|
|
|
template <class EventT>
|
|
void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0);
|
|
|
|
void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0);
|
|
|
|
void hack(const ActorId<> &actor_id, Event &&event) {
|
|
actor_id.get_actor_unsafe()->raw_event(event.data);
|
|
}
|
|
void before_tail_send(const ActorId<> &actor_id);
|
|
|
|
void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
|
|
void unsubscribe(const Fd &fd);
|
|
void unsubscribe_before_close(const Fd &fd);
|
|
|
|
void yield_actor(Actor *actor);
|
|
void stop_actor(Actor *actor);
|
|
void do_stop_actor(Actor *actor);
|
|
uint64 get_link_token(Actor *actor);
|
|
void migrate_actor(Actor *actor, int32 dest_sched_id);
|
|
void do_migrate_actor(Actor *actor, int32 dest_sched_id);
|
|
void start_migrate_actor(Actor *actor, int32 dest_sched_id);
|
|
void finish_migrate_actor(Actor *actor);
|
|
|
|
bool has_actor_timeout(const Actor *actor) const;
|
|
void set_actor_timeout_in(Actor *actor, double timeout);
|
|
void set_actor_timeout_at(Actor *actor, double timeout_at);
|
|
void cancel_actor_timeout(Actor *actor);
|
|
|
|
void finish();
|
|
void yield();
|
|
void run(double timeout);
|
|
void run_no_guard(double timeout);
|
|
|
|
void wakeup();
|
|
|
|
static Scheduler *instance();
|
|
static ActorContext *&context();
|
|
static void on_context_updated();
|
|
|
|
SchedulerGuard get_guard();
|
|
|
|
private:
|
|
static void set_scheduler(Scheduler *scheduler);
|
|
/*** ServiceActor ***/
|
|
class ServiceActor final : public Actor {
|
|
public:
|
|
void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues);
|
|
void start_up() override;
|
|
|
|
private:
|
|
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_;
|
|
void loop() override;
|
|
};
|
|
friend class ServiceActor;
|
|
|
|
void do_custom_event(ActorInfo *actor, CustomEvent &event);
|
|
void do_event(ActorInfo *actor, Event &&event);
|
|
|
|
void enter_actor(ActorInfo *actor_info);
|
|
void exit_actor(ActorInfo *actor_info);
|
|
|
|
void yield_actor(ActorInfo *actor_info);
|
|
void stop_actor(ActorInfo *actor_info);
|
|
void do_stop_actor(ActorInfo *actor_info);
|
|
uint64 get_link_token(ActorInfo *actor_info);
|
|
void migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
|
|
void do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
|
|
void start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
|
|
|
|
bool has_actor_timeout(const ActorInfo *actor_info) const;
|
|
void set_actor_timeout_in(ActorInfo *actor_info, double timeout);
|
|
void set_actor_timeout_at(ActorInfo *actor_info, double timeout_at);
|
|
void cancel_actor_timeout(ActorInfo *actor_info);
|
|
|
|
void register_migrated_actor(ActorInfo *actor_info);
|
|
void add_to_mailbox(ActorInfo *actor_info, Event &&event);
|
|
void clear_mailbox(ActorInfo *actor_info);
|
|
|
|
template <class RunFuncT, class EventFuncT>
|
|
void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
|
|
|
|
template <class RunFuncT, class EventFuncT>
|
|
void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func);
|
|
|
|
void inc_wait_generation();
|
|
|
|
double run_timeout();
|
|
void run_mailbox();
|
|
double run_events();
|
|
void run_poll(double timeout);
|
|
|
|
template <class ActorT>
|
|
ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id);
|
|
void destroy_actor(ActorInfo *actor_info);
|
|
|
|
static TD_THREAD_LOCAL Scheduler *scheduler_;
|
|
static TD_THREAD_LOCAL ActorContext *context_;
|
|
|
|
Callback *callback_ = nullptr;
|
|
std::unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_;
|
|
|
|
int32 actor_count_;
|
|
ListNode pending_actors_list_;
|
|
ListNode ready_actors_list_;
|
|
KHeap<double> timeout_queue_;
|
|
|
|
std::map<ActorInfo *, std::vector<Event>> pending_events_;
|
|
|
|
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
|
|
EventFd event_fd_;
|
|
#endif
|
|
ServiceActor service_actor_;
|
|
Poll poll_;
|
|
|
|
bool yield_flag_;
|
|
bool has_guard_ = false;
|
|
bool close_flag_ = false;
|
|
|
|
uint32 wait_generation_ = 0;
|
|
int32 sched_id_;
|
|
int32 sched_n_;
|
|
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_queue_;
|
|
std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound_queues_;
|
|
|
|
std::shared_ptr<ActorContext> save_context_;
|
|
|
|
struct EventContext {
|
|
int32 dest_sched_id;
|
|
enum Flags { Stop = 1, Migrate = 2 };
|
|
int32 flags{0};
|
|
uint64 link_token;
|
|
|
|
ActorInfo *actor_info;
|
|
};
|
|
EventContext *event_context_ptr_;
|
|
|
|
friend class GlobalScheduler;
|
|
friend class SchedulerGuard;
|
|
friend class EventGuard;
|
|
};
|
|
|
|
/*** Interface to current scheduler ***/
|
|
// Free-function counterparts of Scheduler::subscribe(), Scheduler::unsubscribe() and
// Scheduler::unsubscribe_before_close(), with identical parameter lists.
// subscribe() requests both Fd::Write and Fd::Read unless other flags are given.
// NOTE(review): only declared here; presumably they forward to Scheduler::instance() - confirm
// against the definitions.
void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
void unsubscribe(const Fd &fd);
void unsubscribe_before_close(const Fd &fd);
|
|
|
|
template <class ActorT, class... Args>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
|
|
template <class ActorT, class... Args>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1);
|
|
|
|
template <class ActorT>
|
|
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr);
|
|
|
|
template <class ActorIdT, class FunctionT, class... ArgsT>
|
|
void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
|
using ActorT = typename std::decay_t<ActorIdT>::ActorT;
|
|
using FunctionClassT = member_function_class_t<FunctionT>;
|
|
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
|
|
|
Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id),
|
|
create_immediate_closure(function, std::forward<ArgsT>(args)...));
|
|
}
|
|
|
|
template <class ActorIdT, class FunctionT, class... ArgsT>
|
|
void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
|
using ActorT = typename std::decay_t<ActorIdT>::ActorT;
|
|
using FunctionClassT = member_function_class_t<FunctionT>;
|
|
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
|
|
|
Scheduler::instance()->send(std::forward<ActorIdT>(actor_id),
|
|
Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later);
|
|
}
|
|
|
|
template <class... ArgsT>
|
|
void send_lambda(ActorRef actor_ref, ArgsT &&... args) {
|
|
Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...);
|
|
}
|
|
|
|
template <class... ArgsT>
|
|
void send_event(ActorRef actor_ref, ArgsT &&... args) {
|
|
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...);
|
|
}
|
|
|
|
template <class... ArgsT>
|
|
void send_event_later(ActorRef actor_ref, ArgsT &&... args) {
|
|
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later);
|
|
}
|
|
|
|
// Free-function entry point for yielding the scheduler; only declared in this header.
// NOTE(review): presumably forwards to Scheduler::instance()->yield() - confirm against the definition.
void yield_scheduler();
|
|
} // namespace td
|