Replace actor Send::Flags with ActorSendType.
GitOrigin-RevId: 30b50f6a856301d6dfc7cbe9c0410281c93beddd
This commit is contained in:
parent
deecdb66bc
commit
ba027ac0f5
@ -599,11 +599,11 @@ class PromiseFuture {
|
|||||||
};
|
};
|
||||||
|
|
||||||
template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
|
template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
|
||||||
FutureActor<T> send_promise(ActorId<ActorAT> actor_id, Send::Flags flags,
|
FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ActorSendType send_type,
|
||||||
ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) {
|
ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) {
|
||||||
PromiseFuture<T> pf;
|
PromiseFuture<T> pf;
|
||||||
::td::Scheduler::instance()->send_closure(
|
::td::Scheduler::instance()->send_closure(
|
||||||
std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), flags);
|
std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), send_type);
|
||||||
return pf.move_future();
|
return pf.move_future();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,12 +33,7 @@ namespace td {
|
|||||||
|
|
||||||
class ActorInfo;
|
class ActorInfo;
|
||||||
|
|
||||||
struct Send {
|
enum class ActorSendType { Immediate, Later, LaterWeak };
|
||||||
using Flags = uint32;
|
|
||||||
static const Flags immediate = 0x001;
|
|
||||||
static const Flags later = 0x002;
|
|
||||||
static const Flags later_weak = 0x004;
|
|
||||||
};
|
|
||||||
|
|
||||||
class Scheduler;
|
class Scheduler;
|
||||||
class SchedulerGuard {
|
class SchedulerGuard {
|
||||||
@ -99,12 +94,12 @@ class Scheduler {
|
|||||||
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
||||||
|
|
||||||
template <class EventT>
|
template <class EventT>
|
||||||
void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0);
|
void send_lambda(ActorRef actor_ref, EventT &&lambda, ActorSendType send_type);
|
||||||
|
|
||||||
template <class EventT>
|
template <class EventT>
|
||||||
void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0);
|
void send_closure(ActorRef actor_ref, EventT &&closure, ActorSendType send_type);
|
||||||
|
|
||||||
void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0);
|
void send(ActorRef actor_ref, Event &&event, ActorSendType send_type);
|
||||||
|
|
||||||
void hack(const ActorId<> &actor_id, Event &&event) {
|
void hack(const ActorId<> &actor_id, Event &&event) {
|
||||||
actor_id.get_actor_unsafe()->raw_event(event.data);
|
actor_id.get_actor_unsafe()->raw_event(event.data);
|
||||||
@ -183,7 +178,8 @@ class Scheduler {
|
|||||||
void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
|
void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
|
||||||
|
|
||||||
template <class RunFuncT, class EventFuncT>
|
template <class RunFuncT, class EventFuncT>
|
||||||
void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func);
|
void send_impl(const ActorId<> &actor_id, ActorSendType send_type, const RunFuncT &run_func,
|
||||||
|
const EventFuncT &event_func);
|
||||||
|
|
||||||
void inc_wait_generation();
|
void inc_wait_generation();
|
||||||
|
|
||||||
@ -266,7 +262,8 @@ void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
|
|||||||
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
||||||
|
|
||||||
Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id),
|
Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id),
|
||||||
create_immediate_closure(function, std::forward<ArgsT>(args)...));
|
create_immediate_closure(function, std::forward<ArgsT>(args)...),
|
||||||
|
ActorSendType::Immediate);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class ActorIdT, class FunctionT, class... ArgsT>
|
template <class ActorIdT, class FunctionT, class... ArgsT>
|
||||||
@ -276,22 +273,22 @@ void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... arg
|
|||||||
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
|
||||||
|
|
||||||
Scheduler::instance()->send(std::forward<ActorIdT>(actor_id),
|
Scheduler::instance()->send(std::forward<ActorIdT>(actor_id),
|
||||||
Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later);
|
Event::delayed_closure(function, std::forward<ArgsT>(args)...), ActorSendType::Later);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class... ArgsT>
|
template <class... ArgsT>
|
||||||
void send_lambda(ActorRef actor_ref, ArgsT &&... args) {
|
void send_lambda(ActorRef actor_ref, ArgsT &&... args) {
|
||||||
Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...);
|
Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)..., ActorSendType::Immediate);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class... ArgsT>
|
template <class... ArgsT>
|
||||||
void send_event(ActorRef actor_ref, ArgsT &&... args) {
|
void send_event(ActorRef actor_ref, ArgsT &&... args) {
|
||||||
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...);
|
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., ActorSendType::Immediate);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class... ArgsT>
|
template <class... ArgsT>
|
||||||
void send_event_later(ActorRef actor_ref, ArgsT &&... args) {
|
void send_event_later(ActorRef actor_ref, ArgsT &&... args) {
|
||||||
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later);
|
Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., ActorSendType::Later);
|
||||||
}
|
}
|
||||||
|
|
||||||
void yield_scheduler();
|
void yield_scheduler();
|
||||||
|
@ -469,7 +469,7 @@ double Scheduler::run_timeout() {
|
|||||||
HeapNode *node = timeout_queue_.pop();
|
HeapNode *node = timeout_queue_.pop();
|
||||||
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
|
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
|
||||||
inc_wait_generation();
|
inc_wait_generation();
|
||||||
send(actor_info->actor_id(), Event::timeout(), Send::immediate);
|
send(actor_info->actor_id(), Event::timeout(), ActorSendType::Immediate);
|
||||||
}
|
}
|
||||||
if (timeout_queue_.empty()) {
|
if (timeout_queue_.empty()) {
|
||||||
return 10000;
|
return 10000;
|
||||||
|
@ -109,12 +109,12 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
|
|||||||
|
|
||||||
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
|
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
|
||||||
if (sched_id != sched_id_) {
|
if (sched_id != sched_id_) {
|
||||||
send(actor_id, Event::start(), Send::later_weak);
|
send(actor_id, Event::start(), ActorSendType::LaterWeak);
|
||||||
do_migrate_actor(actor_info, sched_id);
|
do_migrate_actor(actor_info, sched_id);
|
||||||
} else {
|
} else {
|
||||||
pending_actors_list_.put(weak_info->get_list_node());
|
pending_actors_list_.put(weak_info->get_list_node());
|
||||||
if (!ActorTraits<ActorT>::is_lite) {
|
if (!ActorTraits<ActorT>::is_lite) {
|
||||||
send(actor_id, Event::start(), Send::later_weak);
|
send(actor_id, Event::start(), ActorSendType::LaterWeak);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,7 +185,7 @@ inline void Scheduler::inc_wait_generation() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class RunFuncT, class EventFuncT>
|
template <class RunFuncT, class EventFuncT>
|
||||||
void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func,
|
void Scheduler::send_impl(const ActorId<> &actor_id, ActorSendType send_type, const RunFuncT &run_func,
|
||||||
const EventFuncT &event_func) {
|
const EventFuncT &event_func) {
|
||||||
CHECK(has_guard_);
|
CHECK(has_guard_);
|
||||||
ActorInfo *actor_info = actor_id.get_actor_info();
|
ActorInfo *actor_info = actor_id.get_actor_info();
|
||||||
@ -200,7 +200,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
|
|||||||
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
|
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
|
||||||
bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
|
bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
|
||||||
|
|
||||||
if (likely(!(flags & Send::later) && !(flags & Send::later_weak) && on_current_sched && !actor_info->is_running() &&
|
if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() &&
|
||||||
!actor_info->must_wait(wait_generation_))) { // run immediately
|
!actor_info->must_wait(wait_generation_))) { // run immediately
|
||||||
if (likely(actor_info->mailbox_.empty())) {
|
if (likely(actor_info->mailbox_.empty())) {
|
||||||
EventGuard guard(this, actor_info);
|
EventGuard guard(this, actor_info);
|
||||||
@ -211,7 +211,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
|
|||||||
} else {
|
} else {
|
||||||
if (on_current_sched) {
|
if (on_current_sched) {
|
||||||
add_to_mailbox(actor_info, event_func());
|
add_to_mailbox(actor_info, event_func());
|
||||||
if (flags & Send::later) {
|
if (send_type == ActorSendType::Later) {
|
||||||
actor_info->set_wait_generation(wait_generation_);
|
actor_info->set_wait_generation(wait_generation_);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -221,8 +221,8 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class EventT>
|
template <class EventT>
|
||||||
void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags) {
|
void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, ActorSendType send_type) {
|
||||||
return send_impl(actor_ref.get(), flags,
|
return send_impl(actor_ref.get(), send_type,
|
||||||
[&](ActorInfo *actor_info) {
|
[&](ActorInfo *actor_info) {
|
||||||
event_context_ptr_->link_token = actor_ref.token();
|
event_context_ptr_->link_token = actor_ref.token();
|
||||||
lambda();
|
lambda();
|
||||||
@ -235,8 +235,8 @@ void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags fla
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class EventT>
|
template <class EventT>
|
||||||
void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags) {
|
void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, ActorSendType send_type) {
|
||||||
return send_impl(actor_ref.get(), flags,
|
return send_impl(actor_ref.get(), send_type,
|
||||||
[&](ActorInfo *actor_info) {
|
[&](ActorInfo *actor_info) {
|
||||||
event_context_ptr_->link_token = actor_ref.token();
|
event_context_ptr_->link_token = actor_ref.token();
|
||||||
closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe()));
|
closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe()));
|
||||||
@ -248,9 +248,9 @@ void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags f
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::send(ActorRef actor_ref, Event &&event, Send::Flags flags) {
|
inline void Scheduler::send(ActorRef actor_ref, Event &&event, ActorSendType send_type) {
|
||||||
event.set_link_token(actor_ref.token());
|
event.set_link_token(actor_ref.token());
|
||||||
return send_impl(actor_ref.get(), flags, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
|
return send_impl(actor_ref.get(), send_type, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
|
||||||
[&]() { return std::move(event); });
|
[&]() { return std::move(event); });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,7 +270,7 @@ inline void Scheduler::yield_actor(Actor *actor) {
|
|||||||
yield_actor(actor->get_info());
|
yield_actor(actor->get_info());
|
||||||
}
|
}
|
||||||
inline void Scheduler::yield_actor(ActorInfo *actor_info) {
|
inline void Scheduler::yield_actor(ActorInfo *actor_info) {
|
||||||
send(actor_info->actor_id(), Event::yield(), Send::later_weak);
|
send(actor_info->actor_id(), Event::yield(), ActorSendType::LaterWeak);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Scheduler::stop_actor(Actor *actor) {
|
inline void Scheduler::stop_actor(Actor *actor) {
|
||||||
|
@ -128,7 +128,9 @@ class QueryActor final : public Actor {
|
|||||||
query.result = slow_pow_mod_uint32(x, p);
|
query.result = slow_pow_mod_uint32(x, p);
|
||||||
callback_->on_result(std::move(query));
|
callback_->on_result(std::move(query));
|
||||||
} else {
|
} else {
|
||||||
auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p);
|
auto future =
|
||||||
|
send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? ActorSendType::Immediate : ActorSendType::Later,
|
||||||
|
&Worker::query, x, p);
|
||||||
if (future.is_ready()) {
|
if (future.is_ready()) {
|
||||||
query.result = future.move_as_ok();
|
query.result = future.move_as_ok();
|
||||||
callback_->on_result(std::move(query));
|
callback_->on_result(std::move(query));
|
||||||
@ -302,7 +304,8 @@ class SimpleActor final : public Actor {
|
|||||||
}
|
}
|
||||||
q_++;
|
q_++;
|
||||||
p_ = Random::fast(0, 1) ? 1 : 10000;
|
p_ = Random::fast(0, 1) ? 1 : 10000;
|
||||||
auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_);
|
auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? ActorSendType::Immediate : ActorSendType::Later,
|
||||||
|
&Worker::query, q_, p_);
|
||||||
if (future.is_ready()) {
|
if (future.is_ready()) {
|
||||||
auto result = future.move_as_ok();
|
auto result = future.move_as_ok();
|
||||||
CHECK(result == fast_pow_mod_uint32(q_, p_));
|
CHECK(result == fast_pow_mod_uint32(q_, p_));
|
||||||
|
Reference in New Issue
Block a user