Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2021-11-03 16:21:09 +01:00
commit aed520759f
10 changed files with 122 additions and 48 deletions

View File

@ -18,6 +18,58 @@
#pragma comment(linker, "/STACK:16777216")
#endif
struct TestActor final : public td::Actor {
static td::int32 actor_count_;
void start_up() final {
actor_count_++;
stop();
}
void tear_down() final {
if (--actor_count_ == 0) {
td::Scheduler::instance()->finish();
}
}
};
td::int32 TestActor::actor_count_;
template <>
class td::ActorTraits<TestActor> {
public:
static constexpr bool need_context = false;
static constexpr bool need_start_up = true;
};
class CreateActorBench final : public td::Benchmark {
private:
td::ConcurrentScheduler scheduler_;
void start_up() final {
scheduler_.init(0);
scheduler_.start();
}
void tear_down() final {
scheduler_.finish();
}
public:
td::string get_description() const final {
return "CreateActor";
}
void run(int n) final {
for (int i = 0; i < n; i++) {
scheduler_.create_actor_unsafe<TestActor>(0, "TestActor").release();
}
while (scheduler_.run_main(10)) {
// empty
}
}
};
template <int type>
class RingBench final : public td::Benchmark {
public:
@ -26,11 +78,11 @@ class RingBench final : public td::Benchmark {
private:
int actor_n_ = -1;
int thread_n_ = -1;
std::vector<td::ActorId<PassActor>> actor_array_;
td::vector<td::ActorId<PassActor>> actor_array_;
td::ConcurrentScheduler *scheduler_ = nullptr;
public:
std::string get_description() const final {
td::string get_description() const final {
static const char *types[] = {"later", "immediate", "raw", "tail", "lambda"};
static_assert(0 <= type && type < 5, "");
return PSTRING() << "Ring (send_" << types[type] << ") (threads_n = " << thread_n_ << ")";
@ -89,7 +141,7 @@ class RingBench final : public td::Benchmark {
scheduler_ = new td::ConcurrentScheduler();
scheduler_->init(thread_n_);
actor_array_ = std::vector<td::ActorId<PassActor>>(actor_n_);
actor_array_ = td::vector<td::ActorId<PassActor>>(actor_n_);
for (int i = 0; i < actor_n_; i++) {
actor_array_[i] =
scheduler_->create_actor_unsafe<PassActor>(thread_n_ ? i % thread_n_ : 0, "PassActor").release();
@ -118,7 +170,7 @@ class RingBench final : public td::Benchmark {
template <int type>
class QueryBench final : public td::Benchmark {
public:
std::string get_description() const final {
td::string get_description() const final {
static const char *types[] = {"callback", "immediate future", "delayed future", "dummy", "lambda", "lambda_future"};
static_assert(0 <= type && type < 6, "");
return PSTRING() << "QueryBench: " << types[type];
@ -269,7 +321,7 @@ class QueryBench final : public td::Benchmark {
int main() {
td::init_openssl_threads();
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
bench(CreateActorBench());
bench(RingBench<4>(504, 0));
bench(RingBench<3>(504, 0));
bench(RingBench<0>(504, 0));

View File

@ -1741,26 +1741,25 @@ class ToggleDialogUnreadMarkQuery final : public Td::ResultHandler {
}
};
class ToggleDialogIsBlockedQuery final : public Td::ResultHandler {
class ToggleDialogIsBlockedActor final : public NetActorOnce {
Promise<Unit> promise_;
DialogId dialog_id_;
bool is_blocked_;
public:
explicit ToggleDialogIsBlockedQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
explicit ToggleDialogIsBlockedActor(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, bool is_blocked) {
void send(DialogId dialog_id, bool is_blocked, uint64 sequence_dispatcher_id) {
dialog_id_ = dialog_id;
is_blocked_ = is_blocked;
auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Know);
CHECK(input_peer != nullptr && input_peer->get_id() != telegram_api::inputPeerEmpty::ID);
if (is_blocked) {
send_query(G()->net_query_creator().create(telegram_api::contacts_block(std::move(input_peer))));
} else {
send_query(G()->net_query_creator().create(telegram_api::contacts_unblock(std::move(input_peer))));
}
auto query = is_blocked ? G()->net_query_creator().create(telegram_api::contacts_block(std::move(input_peer)))
: G()->net_query_creator().create(telegram_api::contacts_unblock(std::move(input_peer)));
send_closure(td->messages_manager_->sequence_dispatcher_, &MultiSequenceDispatcher::send_with_callback,
std::move(query), actor_shared(this), sequence_dispatcher_id);
}
void on_result(uint64 id, BufferSlice packet) final {
@ -1778,13 +1777,13 @@ class ToggleDialogIsBlockedQuery final : public Td::ResultHandler {
}
void on_error(uint64 id, Status status) final {
if (!td->messages_manager_->on_get_dialog_error(dialog_id_, status, "ToggleDialogIsBlockedQuery")) {
LOG(ERROR) << "Receive error for ToggleDialogIsBlockedQuery: " << status;
if (!td->messages_manager_->on_get_dialog_error(dialog_id_, status, "ToggleDialogIsBlockedActor")) {
LOG(ERROR) << "Receive error for ToggleDialogIsBlockedActor: " << status;
}
if (!G()->close_flag()) {
td->messages_manager_->on_update_dialog_is_blocked(dialog_id_, !is_blocked_);
td->messages_manager_->get_dialog_info_full(dialog_id_, Auto(), "ToggleDialogIsBlockedQuery");
td->messages_manager_->reget_dialog_action_bar(dialog_id_, "ToggleDialogIsBlockedQuery");
td->messages_manager_->get_dialog_info_full(dialog_id_, Auto(), "ToggleDialogIsBlockedActor");
td->messages_manager_->reget_dialog_action_bar(dialog_id_, "ToggleDialogIsBlockedActor");
}
promise_.set_error(std::move(status));
}
@ -19283,8 +19282,9 @@ void MessagesManager::toggle_dialog_is_blocked_on_server(DialogId dialog_id, boo
log_event_id = save_toggle_dialog_is_blocked_on_server_log_event(dialog_id, is_blocked);
}
td_->create_handler<ToggleDialogIsBlockedQuery>(get_erase_log_event_promise(log_event_id))
->send(dialog_id, is_blocked);
send_closure(td_->create_net_actor<ToggleDialogIsBlockedActor>(get_erase_log_event_promise(log_event_id)),
&ToggleDialogIsBlockedActor::send, dialog_id, is_blocked,
get_sequence_dispatcher_id(dialog_id, MessageContentType::Text));
}
Status MessagesManager::toggle_dialog_silent_send_message(DialogId dialog_id, bool silent_send_message) {

View File

@ -91,6 +91,13 @@ class MultiPromiseActor final
}
};
template <>
class ActorTraits<MultiPromiseActor> {
public:
static constexpr bool need_context = false;
static constexpr bool need_start_up = true;
};
class MultiPromiseActorSafe final : public MultiPromiseInterface {
public:
void add_promise(Promise<Unit> &&promise) final;

View File

@ -469,7 +469,8 @@ class PromiseActor;
template <class T>
class ActorTraits<FutureActor<T>> {
public:
static constexpr bool is_lite = true;
static constexpr bool need_context = false;
static constexpr bool need_start_up = false;
};
template <class T>

View File

@ -31,4 +31,11 @@ class SleepActor final : public Actor {
}
};
template <>
class ActorTraits<SleepActor> {
public:
static constexpr bool need_context = false;
static constexpr bool need_start_up = true;
};
} // namespace td

View File

@ -115,7 +115,8 @@ class Actor : public ObserverBase {
template <class ActorT>
class ActorTraits {
public:
static constexpr bool is_lite = false;
static constexpr bool need_context = true;
static constexpr bool need_start_up = true;
};
} // namespace td

View File

@ -63,7 +63,7 @@ class ActorInfo final
ActorInfo &operator=(const ActorInfo &) = delete;
void init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, Deleter deleter,
bool is_lite);
bool need_context, bool need_start_up);
void on_actor_moved(Actor *actor_new_ptr);
template <class ActorT>
@ -105,7 +105,8 @@ class ActorInfo final
vector<Event> mailbox_;
bool is_lite() const;
bool need_context() const;
bool need_start_up() const;
void set_wait_generation(uint32 wait_generation);
bool must_wait(uint32 wait_generation) const;
@ -113,7 +114,8 @@ class ActorInfo final
private:
Deleter deleter_ = Deleter::None;
bool is_lite_ = false;
bool need_context_ = true;
bool need_start_up_ = true;
bool is_running_ = false;
bool always_wait_for_mailbox_{false};
uint32 wait_generation_{0};

View File

@ -11,7 +11,6 @@
#include "td/actor/impl/Scheduler-decl.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/Heap.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
@ -32,45 +31,53 @@ inline StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info) {
}
inline void ActorInfo::init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr,
Deleter deleter, bool is_lite) {
Deleter deleter, bool need_context, bool need_start_up) {
CHECK(!is_running());
CHECK(!is_migrating());
sched_id_.store(sched_id, std::memory_order_relaxed);
actor_ = actor_ptr;
if (!is_lite) {
if (need_context) {
context_ = Scheduler::context()->this_ptr_.lock();
VLOG(actor) << "Set context " << context_.get() << " for " << name;
#ifdef TD_DEBUG
name_ = name.str();
#endif
}
#ifdef TD_DEBUG
name_.assign(name.data(), name.size());
#endif
actor_->init(std::move(this_ptr));
deleter_ = deleter;
is_lite_ = is_lite;
need_context_ = need_context;
need_start_up_ = need_start_up;
is_running_ = false;
wait_generation_ = 0;
}
inline bool ActorInfo::is_lite() const {
return is_lite_;
inline bool ActorInfo::need_context() const {
return need_context_;
}
inline bool ActorInfo::need_start_up() const {
return need_start_up_;
}
inline void ActorInfo::set_wait_generation(uint32 wait_generation) {
wait_generation_ = wait_generation;
}
inline bool ActorInfo::must_wait(uint32 wait_generation) const {
return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty());
}
inline void ActorInfo::always_wait_for_mailbox() {
always_wait_for_mailbox_ = true;
}
inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) {
actor_ = actor_new_ptr;
}
inline void ActorInfo::clear() {
// LOG_IF(WARNING, !mailbox_.empty()) << "Destroy actor with non-empty mailbox: " << get_name()
// << format::as_array(mailbox_);
CHECK(mailbox_.empty());
CHECK(!actor_);
CHECK(!is_running());
@ -179,7 +186,7 @@ inline CSlice ActorInfo::get_name() const {
inline void ActorInfo::start_run() {
VLOG(actor) << "Start run actor: " << *this;
LOG_CHECK(!is_running_) << "Recursive call of actor " << tag("name", get_name());
LOG_CHECK(!is_running_) << "Recursive call of actor " << get_name();
is_running_ = true;
}
inline void ActorInfo::finish_run() {

View File

@ -168,10 +168,10 @@ EventGuard::~EventGuard() {
}
info->finish_run();
swap_context(info);
CHECK(info->is_lite() || save_context_ == info->get_context());
CHECK(!info->need_context() || save_context_ == info->get_context());
#ifdef TD_DEBUG
LOG_CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str())
<< info->is_lite() << " " << info->empty() << " " << info->is_migrating() << " " << save_log_tag2_ << " "
LOG_CHECK(!info->need_context() || save_log_tag2_ == info->get_name().c_str())
<< info->need_context() << " " << info->empty() << " " << info->is_migrating() << " " << save_log_tag2_ << " "
<< info->get_name() << " " << scheduler_->close_flag_;
#endif
if (event_context_.flags & Scheduler::EventContext::Stop) {
@ -186,7 +186,7 @@ EventGuard::~EventGuard() {
void EventGuard::swap_context(ActorInfo *info) {
std::swap(scheduler_->event_context_ptr_, event_context_ptr_);
if (info->is_lite()) {
if (!info->need_context()) {
return;
}
@ -353,7 +353,7 @@ void Scheduler::do_stop_actor(ActorInfo *actor_info) {
CHECK(!actor_info->is_migrating());
LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
ObjectPool<ActorInfo>::OwnerPtr owner_ptr;
if (!actor_info->is_lite()) {
if (actor_info->need_start_up()) {
EventGuard guard(this, actor_info);
do_event(actor_info, Event::stop());
owner_ptr = actor_info->get_actor_unsafe()->clear();

View File

@ -10,7 +10,6 @@
#include "td/actor/impl/Scheduler-decl.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/Heap.h"
#include "td/utils/logging.h"
#include "td/utils/MpscPollableQueue.h"
@ -103,13 +102,12 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
LOG_CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size())))
<< sched_id;
auto info = actor_info_pool_->create_empty();
VLOG(actor) << "Create actor: " << tag("name", name) << tag("ptr", *info) << tag("context", context())
<< tag("this", this) << tag("actor_count", actor_count_);
actor_count_++;
auto weak_info = info.get_weak();
auto actor_info = info.get();
actor_info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter,
ActorTraits<ActorT>::is_lite);
ActorTraits<ActorT>::need_context, ActorTraits<ActorT>::need_start_up);
VLOG(actor) << "Create actor " << *actor_info << " (actor_count = " << actor_count_ << ')';
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
if (sched_id != sched_id_) {
@ -117,7 +115,7 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
do_migrate_actor(actor_info, sched_id);
} else {
pending_actors_list_.put(weak_info->get_list_node());
if (!ActorTraits<ActorT>::is_lite) {
if (ActorTraits<ActorT>::need_start_up) {
send<ActorSendType::LaterWeak>(actor_id, Event::start());
}
}
@ -134,8 +132,7 @@ ActorOwn<ActorT> Scheduler::register_existing_actor(unique_ptr<ActorT> actor_ptr
}
inline void Scheduler::destroy_actor(ActorInfo *actor_info) {
VLOG(actor) << "Destroy actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
<< tag("actor_count", actor_count_);
VLOG(actor) << "Destroy actor " << *actor_info << " (actor_count = " << actor_count_ << ')';
LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
cancel_actor_timeout(actor_info);