diff --git a/td/telegram/GroupCallManager.cpp b/td/telegram/GroupCallManager.cpp index 25ae5c7c7..9d70eccd1 100644 --- a/td/telegram/GroupCallManager.cpp +++ b/td/telegram/GroupCallManager.cpp @@ -405,6 +405,7 @@ struct GroupCallManager::GroupCall { bool is_active = false; bool is_joined = false; bool is_speaking = false; + bool syncing_participants = false; bool loaded_all_participants = false; bool mute_new_participants = false; bool allowed_change_mute_new_participants = false; @@ -702,7 +703,7 @@ void GroupCallManager::finish_get_group_call(InputGroupCallId input_group_call_i LOG(ERROR) << "Expected " << input_group_call_id << ", but received " << to_string(result.ok()); result = Status::Error(500, "Receive another group call"); } else { - process_group_call_participants(input_group_call_id, std::move(result.ok_ref()->participants_), true); + process_group_call_participants(input_group_call_id, std::move(result.ok_ref()->participants_), true, false); auto participants_it = group_call_participants_.find(input_group_call_id); if (participants_it != group_call_participants_.end()) { @@ -753,13 +754,32 @@ void GroupCallManager::on_get_group_call_participants( return; } - auto is_empty = participants->participants_.empty(); - process_group_call_participants(input_group_call_id, std::move(participants->participants_), is_load); + bool is_sync = is_load && offset.empty(); + if (is_sync) { + auto group_call = get_group_call(input_group_call_id); + is_sync = group_call->syncing_participants; + if (is_sync) { + group_call->syncing_participants = false; - on_receive_group_call_version(input_group_call_id, participants->version_); + if (group_call->version >= participants->version_) { + LOG(INFO) << "Ignore result of outdated participants sync with version " << participants->version_ << " in " + << input_group_call_id << " from " << group_call->dialog_id << ", because current version is " + << group_call->version; + return; + } + LOG(INFO) << "Finish syncing participants in " << input_group_call_id << " from " << group_call->dialog_id; + group_call->version = participants->version_; + } + } + + auto is_empty = participants->participants_.empty(); + process_group_call_participants(input_group_call_id, std::move(participants->participants_), is_load, is_sync); + + if (!is_sync) { + on_receive_group_call_version(input_group_call_id, participants->version_); + } if (is_load) { - // TODO use count auto participants_it = group_call_participants_.find(input_group_call_id); if (participants_it != group_call_participants_.end()) { CHECK(participants_it->second != nullptr); @@ -768,24 +788,39 @@ void GroupCallManager::on_get_group_call_participants( } } - if (is_empty) { + if (is_empty || is_sync) { bool need_update = false; auto group_call = get_group_call(input_group_call_id); CHECK(group_call != nullptr && group_call->is_inited); - if (!group_call->loaded_all_participants) { + if (is_empty && !group_call->loaded_all_participants) { group_call->loaded_all_participants = true; need_update = true; } - auto real_participant_count = participants_it != group_call_participants_.end() - ? static_cast(participants_it->second->participants.size()) - : 0; + auto real_participant_count = participants->count_; + if (is_empty) { + auto known_participant_count = participants_it != group_call_participants_.end() + ? static_cast(participants_it->second->participants.size()) + : 0; + if (real_participant_count != known_participant_count) { + LOG(ERROR) << "Receive participant count " << real_participant_count << ", but know " + << known_participant_count << " participants in " << input_group_call_id << " from " + << group_call->dialog_id; + real_participant_count = known_participant_count; + } + } if (real_participant_count != group_call->participant_count) { - LOG(ERROR) << "Have participant count " << group_call->participant_count << " instead of " - << real_participant_count << " in " << input_group_call_id; + if (!is_sync) { + LOG(ERROR) << "Have participant count " << group_call->participant_count << " instead of " + << real_participant_count << " in " << input_group_call_id << " from " << group_call->dialog_id; + } group_call->participant_count = real_participant_count; need_update = true; } + if (!is_empty && is_sync && group_call->loaded_all_participants && group_call->participant_count > 50) { + group_call->loaded_all_participants = false; + need_update = true; + } if (need_update) { send_update_group_call(group_call); } @@ -868,7 +903,7 @@ bool GroupCallManager::process_pending_group_call_participant_updates(InputGroup group_call->version = version; diff += process_group_call_participants_from_updates(input_group_call_id, std::move(participants)); pending_updates.erase(it); - } else { + } else if (!group_call->syncing_participants) { // found a gap sync_participants_timeout_.add_timeout_in(group_call->group_call_id.get(), 1.0); break; @@ -899,17 +934,51 @@ void GroupCallManager::sync_group_call_participants(InputGroupCallId input_group sync_participants_timeout_.cancel_timeout(group_call->group_call_id.get()); + if (group_call->syncing_participants) { + return; + } + group_call->syncing_participants = true; + LOG(INFO) << "Force participants synchronization in " << input_group_call_id; - // TODO + auto promise = PromiseCreator::lambda([actor_id = actor_id(this), input_group_call_id](Result &&result) { + if (result.is_error()) { + send_closure(actor_id, &GroupCallManager::on_sync_group_call_participants_failed, input_group_call_id); + } + }); + td_->create_handler(std::move(promise))->send(input_group_call_id, string(), 100); +} + +void GroupCallManager::on_sync_group_call_participants_failed(InputGroupCallId input_group_call_id) { + if (G()->close_flag() || !need_group_call_participants(input_group_call_id)) { + return; + } + + auto group_call = get_group_call(input_group_call_id); + CHECK(group_call != nullptr && group_call->is_inited); + CHECK(group_call->syncing_participants); + group_call->syncing_participants = false; + + sync_participants_timeout_.add_timeout_in(group_call->group_call_id.get(), 1.0); } void GroupCallManager::process_group_call_participants( InputGroupCallId input_group_call_id, vector> &&participants, - bool is_load) { + bool is_load, bool is_sync) { if (!need_group_call_participants(input_group_call_id)) { return; } + std::unordered_set old_participant_user_ids; + if (is_sync) { + auto participants_it = group_call_participants_.find(input_group_call_id); + if (participants_it != group_call_participants_.end()) { + CHECK(participants_it->second != nullptr); + for (auto &participant : participants_it->second->participants) { + old_participant_user_ids.insert(participant.user_id); + } + } + } + int64 min_order = std::numeric_limits::max(); for (auto &participant : participants) { GroupCallParticipant group_call_participant(participant); @@ -925,14 +994,44 @@ void GroupCallManager::process_group_call_participants( } else { min_order = real_order; } - process_group_call_participant(input_group_call_id, GroupCallParticipant(participant)); + if (is_sync) { + old_participant_user_ids.erase(group_call_participant.user_id); + } + process_group_call_participant(input_group_call_id, std::move(group_call_participant)); + } + if (is_sync) { + auto participants_it = group_call_participants_.find(input_group_call_id); + if (participants_it != group_call_participants_.end()) { + CHECK(participants_it->second != nullptr); + auto &group_participants = participants_it->second->participants; + for (auto participant_it = group_participants.begin(); participant_it != group_participants.end();) { + auto &participant = *participant_it; + if (old_participant_user_ids.count(participant.user_id) == 0) { + CHECK(participant.order == 0 || participant.order >= min_order); + ++participant_it; + continue; + } + + // not synced user, needs to be deleted + if (participant.order != 0) { + CHECK(participant.order >= participants_it->second->min_order); + participant.order = 0; + send_update_group_call_participant(input_group_call_id, participant); + } + participant_it = group_participants.erase(participant_it); + } + if (participants_it->second->min_order < min_order) { + // if previously known more users, adjust min_order + participants_it->second->min_order = min_order; + } + } } if (is_load) { auto participants_it = group_call_participants_.find(input_group_call_id); if (participants_it != group_call_participants_.end()) { CHECK(participants_it->second != nullptr); - if (participants_it->second->min_order > min_order) { - auto old_min_order = participants_it->second->min_order; + auto old_min_order = participants_it->second->min_order; + if (old_min_order > min_order) { participants_it->second->min_order = min_order; for (auto &participant : participants_it->second->participants) { @@ -1577,6 +1676,9 @@ void GroupCallManager::on_receive_group_call_version(InputGroupCallId input_grou if (version <= group_call->version) { return; } + if (group_call->syncing_participants) { + return; + } // found a gap auto &group_call_participants = group_call_participants_[input_group_call_id]; diff --git a/td/telegram/GroupCallManager.h b/td/telegram/GroupCallManager.h index 7dc86174e..e3b27b61e 100644 --- a/td/telegram/GroupCallManager.h +++ b/td/telegram/GroupCallManager.h @@ -123,9 +123,11 @@ class GroupCallManager : public Actor { void sync_group_call_participants(InputGroupCallId input_group_call_id); + void on_sync_group_call_participants_failed(InputGroupCallId input_group_call_id); + void process_group_call_participants(InputGroupCallId group_call_id, vector> &&participants, - bool is_load); + bool is_load, bool is_sync); int32 process_group_call_participants_from_updates( InputGroupCallId group_call_id, vector> &&participants);