Add and use Scheduler::run_on_scheduler.

This commit is contained in:
levlam 2022-06-30 00:28:25 +03:00
parent aa635a89c2
commit 32724a5a6c
4 changed files with 43 additions and 24 deletions

View File

@ -377,18 +377,13 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters &parameters, con
}
void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
class Worker final : public Actor {
public:
void open(TdParameters &&parameters, DbKey &&key, Promise<OpenedDatabase> &&promise) {
TdDb::open(-1, std::move(parameters), std::move(key), std::move(promise));
stop();
}
};
send_closure(create_actor_on_scheduler<Worker>("Worker", scheduler_id), &Worker::open, std::move(parameters),
std::move(key), std::move(promise));
return;
}
Scheduler::instance()->run_on_scheduler(scheduler_id, [parameters = std::move(parameters), key = std::move(key),
promise = std::move(promise)](Unit) mutable {
TdDb::open_impl(-1, std::move(parameters), std::move(key), std::move(promise));
});
}
void TdDb::open_impl(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
OpenedDatabase result;
// Init pmc
@ -483,18 +478,13 @@ TdDb::TdDb() = default;
TdDb::~TdDb() = default;
void TdDb::check_parameters(int32 scheduler_id, TdParameters parameters, Promise<CheckedParameters> promise) {
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
class Worker final : public Actor {
public:
void run(TdParameters parameters, Promise<CheckedParameters> promise) {
TdDb::check_parameters(-1, std::move(parameters), std::move(promise));
stop();
}
};
send_closure(create_actor_on_scheduler<Worker>("Worker", scheduler_id), &Worker::run, std::move(parameters),
std::move(promise));
return;
}
Scheduler::instance()->run_on_scheduler(
scheduler_id, [parameters = std::move(parameters), promise = std::move(promise)](Unit) mutable {
TdDb::check_parameters_impl(std::move(parameters), std::move(promise));
});
}
void TdDb::check_parameters_impl(TdParameters parameters, Promise<CheckedParameters> promise) {
CheckedParameters result;
auto prepare_dir = [](string dir) -> Result<string> {

View File

@ -124,6 +124,10 @@ class TdDb {
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
std::shared_ptr<ConcurrentBinlog> binlog_;
static void open_impl(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise);
static void check_parameters_impl(TdParameters parameters, Promise<CheckedParameters> promise);
Status init_sqlite(int32 scheduler_id, const TdParameters &parameters, const DbKey &key, const DbKey &old_key,
BinlogKeyValue<Binlog> &binlog_pmc);

View File

@ -22,6 +22,7 @@
#include "td/utils/port/Poll.h"
#include "td/utils/port/PollFlags.h"
#include "td/utils/port/thread_local.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Time.h"
#include "td/utils/type_traits.h"
@ -98,6 +99,8 @@ class Scheduler {
void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
void run_on_scheduler(int32 sched_id, Promise<Unit> action); // TODO Action
template <ActorSendType send_type, class EventT>
void send_lambda(ActorRef actor_ref, EventT &&lambda);

View File

@ -339,6 +339,28 @@ void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_i
}
}
void Scheduler::run_on_scheduler(int32 sched_id, Promise<Unit> action) {
if (sched_id >= 0 && sched_id_ != sched_id) {
class Worker final : public Actor {
public:
explicit Worker(Promise<Unit> action) : action_(std::move(action)) {
}
private:
Promise<Unit> action_;
void start_up() final {
action_.set_value(Unit());
stop();
}
};
create_actor_on_scheduler<Worker>("RunOnSchedulerWorker", sched_id, std::move(action)).release();
return;
}
action.set_value(Unit());
}
void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
if (!actor_info->is_running()) {
auto node = actor_info->get_list_node();