Save reference to UpdatesManager in promise.

This commit is contained in:
levlam 2021-11-05 03:31:48 +03:00
parent 8bc413742d
commit b7cf2e578d
2 changed files with 32 additions and 3 deletions

View File

@ -176,6 +176,28 @@ void UpdatesManager::tear_down() {
parent_.reset(); parent_.reset();
} }
void UpdatesManager::hangup_shared() {
ref_cnt_--;
if (ref_cnt_ == 0) {
stop();
}
}
void UpdatesManager::hangup() {
pending_pts_updates_.clear();
postponed_pts_updates_.clear();
postponed_updates_.clear();
pending_seq_updates_.clear();
pending_qts_updates_.clear();
hangup_shared();
}
ActorShared<UpdatesManager> UpdatesManager::create_reference() {
ref_cnt_++;
return actor_shared(this, 1);
}
void UpdatesManager::fill_pts_gap(void *td) { void UpdatesManager::fill_pts_gap(void *td) {
CHECK(td != nullptr); CHECK(td != nullptr);
if (G()->close_flag()) { if (G()->close_flag()) {
@ -1652,7 +1674,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
} }
MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"}; MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"};
mpas.add_promise([actor_id = actor_id(this), promise = std::move(promise)](Result<Unit> &&result) mutable { mpas.add_promise([actor_id = create_reference(), promise = std::move(promise)](Result<Unit> &&result) mutable {
send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise)); send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise));
}); });
auto lock = mpas.get_promise(); auto lock = mpas.get_promise();
@ -1773,7 +1795,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
lock.set_value(Unit()); lock.set_value(Unit());
} }
void UpdatesManager::on_pending_updates_processed(Result<Unit> &&result, Promise<Unit> &&promise) { void UpdatesManager::on_pending_updates_processed(Result<Unit> result, Promise<Unit> promise) {
promise.set_result(std::move(result)); promise.set_result(std::move(result));
} }

View File

@ -181,6 +181,7 @@ class UpdatesManager final : public Actor {
Td *td_; Td *td_;
ActorShared<> parent_; ActorShared<> parent_;
int32 ref_cnt_ = 1;
PtsManager pts_manager_; PtsManager pts_manager_;
PtsManager qts_manager_; PtsManager qts_manager_;
@ -225,6 +226,12 @@ class UpdatesManager final : public Actor {
void tear_down() final; void tear_down() final;
void hangup_shared() final;
void hangup() final;
ActorShared<UpdatesManager> create_reference();
int32 get_pts() const { int32 get_pts() const {
return pts_manager_.mem_pts(); return pts_manager_.mem_pts();
} }
@ -268,7 +275,7 @@ class UpdatesManager final : public Actor {
void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end, void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, double receive_time, Promise<Unit> &&promise, const char *source); int32 date, double receive_time, Promise<Unit> &&promise, const char *source);
void on_pending_updates_processed(Result<Unit> &&result, Promise<Unit> &&promise); void on_pending_updates_processed(Result<Unit> result, Promise<Unit> promise);
void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply, void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply,
Promise<Unit> &&promise); Promise<Unit> &&promise);