Manually split send_immediately and send_later implementations.

This commit is contained in:
levlam 2024-05-06 19:38:24 +03:00
parent 44b548c307
commit 2181783bee
5 changed files with 114 additions and 48 deletions

View File

@ -291,12 +291,20 @@ class PromiseFuture {
FutureActor<T> future_;
};
template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT,
class... ArgsT>
FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
FutureActor<T> send_promise_immediately(ActorId<ActorAT> actor_id,
ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&...args) {
PromiseFuture<T> pf;
Scheduler::instance()->send_closure_immediately(
std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
return pf.move_future();
}
template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
FutureActor<T> send_promise_later(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
ArgsT &&...args) {
PromiseFuture<T> pf;
Scheduler::instance()->send_closure<send_type>(
Scheduler::instance()->send_closure_later(
std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
return pf.move_future();
}

View File

@ -39,8 +39,6 @@ extern int VERBOSITY_NAME(actor);
class ActorInfo;
enum class ActorSendType { Immediate, Later };
class Scheduler;
class SchedulerGuard {
public:
@ -109,14 +107,21 @@ class Scheduler {
template <class... ArgsT>
void destroy_on_scheduler(int32 sched_id, ArgsT &...values);
template <ActorSendType send_type, class EventT>
void send_lambda(ActorRef actor_ref, EventT &&func);
template <class EventT>
void send_lambda_immediately(ActorRef actor_ref, EventT &&func);
template <ActorSendType send_type, class EventT>
void send_closure(ActorRef actor_ref, EventT &&closure);
template <class EventT>
void send_lambda_later(ActorRef actor_ref, EventT &&func);
template <ActorSendType send_type>
void send(ActorRef actor_ref, Event &&event);
template <class EventT>
void send_closure_immediately(ActorRef actor_ref, EventT &&closure);
template <class EventT>
void send_closure_later(ActorRef actor_ref, EventT &&closure);
void send_immediately(ActorRef actor_ref, Event &&event);
void send_later(ActorRef actor_ref, Event &&event);
void before_tail_send(const ActorId<> &actor_id);
@ -199,11 +204,16 @@ class Scheduler {
void flush_mailbox(ActorInfo *actor_info);
void get_actor_sched_id(const ActorInfo *actor_info, int32 &actor_sched_id, bool &on_current_sched,
bool &can_send_immediately);
void get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id,
bool &on_current_sched, bool &can_send_immediately);
template <ActorSendType send_type, class RunFuncT, class EventFuncT>
void send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func);
void get_actor_sched_id_to_send_later(const ActorInfo *actor_info, int32 &actor_sched_id, bool &on_current_sched);
template <class RunFuncT, class EventFuncT>
void send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func);
template <class EventFuncT>
void send_later_impl(const ActorId<> &actor_id, const EventFuncT &event_func);
Timestamp run_timeout();
void run_mailbox();
@ -275,8 +285,8 @@ void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) {
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
Scheduler::instance()->send_closure<ActorSendType::Immediate>(
std::forward<ActorIdT>(actor_id), create_immediate_closure(function, std::forward<ArgsT>(args)...));
Scheduler::instance()->send_closure_immediately(std::forward<ActorIdT>(actor_id),
create_immediate_closure(function, std::forward<ArgsT>(args)...));
}
template <class ActorIdT, class FunctionT, class... ArgsT>
@ -285,23 +295,23 @@ void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
Scheduler::instance()->send<ActorSendType::Later>(std::forward<ActorIdT>(actor_id),
Scheduler::instance()->send_later(std::forward<ActorIdT>(actor_id),
Event::delayed_closure(function, std::forward<ArgsT>(args)...));
}
template <class... ArgsT>
void send_lambda(ActorRef actor_ref, ArgsT &&...args) {
Scheduler::instance()->send_lambda<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
Scheduler::instance()->send_lambda_immediately(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
void send_event(ActorRef actor_ref, ArgsT &&...args) {
Scheduler::instance()->send<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
Scheduler::instance()->send_immediately(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
void send_event_later(ActorRef actor_ref, ArgsT &&...args) {
Scheduler::instance()->send<ActorSendType::Later>(actor_ref, std::forward<ArgsT>(args)...);
Scheduler::instance()->send_later(actor_ref, std::forward<ArgsT>(args)...);
}
} // namespace td

View File

@ -301,8 +301,8 @@ void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
// can't clear event here. It may be already destroyed during destroy_actor
}
void Scheduler::get_actor_sched_id(const ActorInfo *actor_info, int32 &actor_sched_id, bool &on_current_sched,
bool &can_send_immediately) {
void Scheduler::get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id,
bool &on_current_sched, bool &can_send_immediately) {
bool is_migrating;
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
@ -310,6 +310,14 @@ void Scheduler::get_actor_sched_id(const ActorInfo *actor_info, int32 &actor_sch
can_send_immediately = on_current_sched && !actor_info->is_running() && actor_info->mailbox_.empty();
}
void Scheduler::get_actor_sched_id_to_send_later(const ActorInfo *actor_info, int32 &actor_sched_id,
bool &on_current_sched) {
bool is_migrating;
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
CHECK(has_guard_ || !on_current_sched);
}
void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
VLOG(actor) << "Register migrated actor " << *actor_info << ", " << tag("actor_count", actor_count_);
actor_count_++;
@ -549,7 +557,7 @@ Timestamp Scheduler::run_timeout() {
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
HeapNode *node = timeout_queue_.pop();
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout());
send_immediately(actor_info->actor_id(), Event::timeout());
}
return get_timeout();
}

View File

@ -108,12 +108,12 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
if (sched_id != sched_id_) {
send<ActorSendType::Later>(actor_id, Event::start());
send_later(actor_id, Event::start());
do_migrate_actor(actor_info, sched_id);
} else {
pending_actors_list_.put(weak_info->get_list_node());
if (ActorTraits<ActorT>::need_start_up) {
send<ActorSendType::Later>(actor_id, Event::start());
send_later(actor_id, Event::start());
}
}
@ -178,8 +178,9 @@ inline void Scheduler::before_tail_send(const ActorId<> &actor_id) {
// TODO
}
template <ActorSendType send_type, class RunFuncT, class EventFuncT>
void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) {
template <class RunFuncT, class EventFuncT>
void Scheduler::send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func,
const EventFuncT &event_func) {
ActorInfo *actor_info = actor_id.get_actor_info();
if (unlikely(actor_info == nullptr || close_flag_)) {
return;
@ -188,9 +189,9 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c
int32 actor_sched_id;
bool on_current_sched;
bool can_send_immediately;
get_actor_sched_id(actor_info, actor_sched_id, on_current_sched, can_send_immediately);
get_actor_sched_id_to_send_immediately(actor_info, actor_sched_id, on_current_sched, can_send_immediately);
if (likely(send_type == ActorSendType::Immediate && can_send_immediately)) { // run immediately
if (likely(can_send_immediately)) { // run immediately
EventGuard guard(this, actor_info);
run_func(actor_info);
} else {
@ -202,9 +203,27 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c
}
}
template <ActorSendType send_type, class EventT>
void Scheduler::send_lambda(ActorRef actor_ref, EventT &&func) {
return send_impl<send_type>(
template <class EventFuncT>
void Scheduler::send_later_impl(const ActorId<> &actor_id, const EventFuncT &event_func) {
ActorInfo *actor_info = actor_id.get_actor_info();
if (unlikely(actor_info == nullptr || close_flag_)) {
return;
}
int32 actor_sched_id;
bool on_current_sched;
get_actor_sched_id_to_send_later(actor_info, actor_sched_id, on_current_sched);
if (on_current_sched) {
add_to_mailbox(actor_info, event_func());
} else {
send_to_scheduler(actor_sched_id, actor_id, event_func());
}
}
template <class EventT>
void Scheduler::send_lambda_immediately(ActorRef actor_ref, EventT &&func) {
return send_immediately_impl(
actor_ref.get(),
[&](ActorInfo *actor_info) {
event_context_ptr_->link_token = actor_ref.token();
@ -217,9 +236,18 @@ void Scheduler::send_lambda(ActorRef actor_ref, EventT &&func) {
});
}
template <ActorSendType send_type, class EventT>
void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) {
return send_impl<send_type>(
template <class EventT>
void Scheduler::send_lambda_later(ActorRef actor_ref, EventT &&func) {
return send_later_impl(actor_ref.get(), [&] {
auto event = Event::from_lambda(std::forward<EventT>(func));
event.set_link_token(actor_ref.token());
return event;
});
}
template <class EventT>
void Scheduler::send_closure_immediately(ActorRef actor_ref, EventT &&closure) {
return send_immediately_impl(
actor_ref.get(),
[&](ActorInfo *actor_info) {
event_context_ptr_->link_token = actor_ref.token();
@ -232,14 +260,27 @@ void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) {
});
}
template <ActorSendType send_type>
void Scheduler::send(ActorRef actor_ref, Event &&event) {
template <class EventT>
void Scheduler::send_closure_later(ActorRef actor_ref, EventT &&closure) {
return send_later_impl(actor_ref.get(), [&] {
auto event = Event::immediate_closure(std::forward<EventT>(closure));
event.set_link_token(actor_ref.token());
return send_impl<send_type>(
return event;
});
}
inline void Scheduler::send_immediately(ActorRef actor_ref, Event &&event) {
event.set_link_token(actor_ref.token());
return send_immediately_impl(
actor_ref.get(), [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
[&] { return std::move(event); });
}
inline void Scheduler::send_later(ActorRef actor_ref, Event &&event) {
event.set_link_token(actor_ref.token());
return send_later_impl(actor_ref.get(), [&] { return std::move(event); });
}
inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
instance()->poll_.subscribe(std::move(fd), flags);
}
@ -256,7 +297,7 @@ inline void Scheduler::yield_actor(Actor *actor) {
yield_actor(actor->get_info());
}
inline void Scheduler::yield_actor(ActorInfo *actor_info) {
send<ActorSendType::Later>(actor_info->actor_id(), Event::yield());
send_later(actor_info->actor_id(), Event::yield());
}
inline void Scheduler::stop_actor(Actor *actor) {

View File

@ -126,8 +126,8 @@ class QueryActor final : public td::Actor {
callback_->on_result(std::move(query));
} else {
auto future = td::Random::fast(0, 3) == 0
? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
: td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
? td::send_promise_immediately(rand_elem(workers_), &Worker::query, x, p)
: td::send_promise_later(rand_elem(workers_), &Worker::query, x, p);
if (future.is_ready()) {
query.result = future.move_as_ok();
callback_->on_result(std::move(query));
@ -301,9 +301,8 @@ class SimpleActor final : public td::Actor {
}
q_++;
p_ = td::Random::fast_bool() ? 1 : 10000;
auto future = td::Random::fast(0, 3) == 0
? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
: td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
auto future = td::Random::fast(0, 3) == 0 ? td::send_promise_immediately(worker_, &Worker::query, q_, p_)
: td::send_promise_later(worker_, &Worker::query, q_, p_);
if (future.is_ready()) {
auto result = future.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));