Add group call participants syncronization.

This commit is contained in:
levlam 2020-12-15 12:41:26 +03:00
parent 37a17476c7
commit c63da4b241
2 changed files with 123 additions and 19 deletions

View File

@ -405,6 +405,7 @@ struct GroupCallManager::GroupCall {
bool is_active = false;
bool is_joined = false;
bool is_speaking = false;
bool syncing_participants = false;
bool loaded_all_participants = false;
bool mute_new_participants = false;
bool allowed_change_mute_new_participants = false;
@ -702,7 +703,7 @@ void GroupCallManager::finish_get_group_call(InputGroupCallId input_group_call_i
LOG(ERROR) << "Expected " << input_group_call_id << ", but received " << to_string(result.ok());
result = Status::Error(500, "Receive another group call");
} else {
process_group_call_participants(input_group_call_id, std::move(result.ok_ref()->participants_), true);
process_group_call_participants(input_group_call_id, std::move(result.ok_ref()->participants_), true, false);
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it != group_call_participants_.end()) {
@ -753,13 +754,32 @@ void GroupCallManager::on_get_group_call_participants(
return;
}
auto is_empty = participants->participants_.empty();
process_group_call_participants(input_group_call_id, std::move(participants->participants_), is_load);
bool is_sync = is_load && offset.empty();
if (is_sync) {
auto group_call = get_group_call(input_group_call_id);
is_sync = group_call->syncing_participants;
if (is_sync) {
group_call->syncing_participants = false;
on_receive_group_call_version(input_group_call_id, participants->version_);
if (group_call->version >= participants->version_) {
LOG(INFO) << "Ignore result of outdated participants sync with version " << participants->version_ << " in "
<< input_group_call_id << " from " << group_call->dialog_id << ", because current version is "
<< group_call->version;
return;
}
LOG(INFO) << "Finish syncing participants in " << input_group_call_id << " from " << group_call->dialog_id;
group_call->version = participants->version_;
}
}
auto is_empty = participants->participants_.empty();
process_group_call_participants(input_group_call_id, std::move(participants->participants_), is_load, is_sync);
if (!is_sync) {
on_receive_group_call_version(input_group_call_id, participants->version_);
}
if (is_load) {
// TODO use count
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it != group_call_participants_.end()) {
CHECK(participants_it->second != nullptr);
@ -768,24 +788,39 @@ void GroupCallManager::on_get_group_call_participants(
}
}
if (is_empty) {
if (is_empty || is_sync) {
bool need_update = false;
auto group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
if (!group_call->loaded_all_participants) {
if (is_empty && !group_call->loaded_all_participants) {
group_call->loaded_all_participants = true;
need_update = true;
}
auto real_participant_count = participants_it != group_call_participants_.end()
? static_cast<int32>(participants_it->second->participants.size())
: 0;
auto real_participant_count = participants->count_;
if (is_empty) {
auto known_participant_count = participants_it != group_call_participants_.end()
? static_cast<int32>(participants_it->second->participants.size())
: 0;
if (real_participant_count != known_participant_count) {
LOG(ERROR) << "Receive participant count " << real_participant_count << ", but know "
<< known_participant_count << " participants in " << input_group_call_id << " from "
<< group_call->dialog_id;
real_participant_count = known_participant_count;
}
}
if (real_participant_count != group_call->participant_count) {
LOG(ERROR) << "Have participant count " << group_call->participant_count << " instead of "
<< real_participant_count << " in " << input_group_call_id;
if (!is_sync) {
LOG(ERROR) << "Have participant count " << group_call->participant_count << " instead of "
<< real_participant_count << " in " << input_group_call_id << " from " << group_call->dialog_id;
}
group_call->participant_count = real_participant_count;
need_update = true;
}
if (!is_empty && is_sync && group_call->loaded_all_participants && group_call->participant_count > 50) {
group_call->loaded_all_participants = false;
need_update = true;
}
if (need_update) {
send_update_group_call(group_call);
}
@ -868,7 +903,7 @@ bool GroupCallManager::process_pending_group_call_participant_updates(InputGroup
group_call->version = version;
diff += process_group_call_participants_from_updates(input_group_call_id, std::move(participants));
pending_updates.erase(it);
} else {
} else if (!group_call->syncing_participants) {
// found a gap
sync_participants_timeout_.add_timeout_in(group_call->group_call_id.get(), 1.0);
break;
@ -899,17 +934,51 @@ void GroupCallManager::sync_group_call_participants(InputGroupCallId input_group
sync_participants_timeout_.cancel_timeout(group_call->group_call_id.get());
if (group_call->syncing_participants) {
return;
}
group_call->syncing_participants = true;
LOG(INFO) << "Force participants synchronization in " << input_group_call_id;
// TODO
auto promise = PromiseCreator::lambda([actor_id = actor_id(this), input_group_call_id](Result<Unit> &&result) {
if (result.is_error()) {
send_closure(actor_id, &GroupCallManager::on_sync_group_call_participants_failed, input_group_call_id);
}
});
td_->create_handler<GetGroupCallParticipantsQuery>(std::move(promise))->send(input_group_call_id, string(), 100);
}
void GroupCallManager::on_sync_group_call_participants_failed(InputGroupCallId input_group_call_id) {
if (G()->close_flag() || !need_group_call_participants(input_group_call_id)) {
return;
}
auto group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
CHECK(group_call->syncing_participants);
group_call->syncing_participants = false;
sync_participants_timeout_.add_timeout_in(group_call->group_call_id.get(), 1.0);
}
void GroupCallManager::process_group_call_participants(
InputGroupCallId input_group_call_id, vector<tl_object_ptr<telegram_api::groupCallParticipant>> &&participants,
bool is_load) {
bool is_load, bool is_sync) {
if (!need_group_call_participants(input_group_call_id)) {
return;
}
std::unordered_set<UserId, UserIdHash> old_participant_user_ids;
if (is_sync) {
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it != group_call_participants_.end()) {
CHECK(participants_it->second != nullptr);
for (auto &participant : participants_it->second->participants) {
old_participant_user_ids.insert(participant.user_id);
}
}
}
int64 min_order = std::numeric_limits<int64>::max();
for (auto &participant : participants) {
GroupCallParticipant group_call_participant(participant);
@ -925,14 +994,44 @@ void GroupCallManager::process_group_call_participants(
} else {
min_order = real_order;
}
process_group_call_participant(input_group_call_id, GroupCallParticipant(participant));
if (is_sync) {
old_participant_user_ids.erase(group_call_participant.user_id);
}
process_group_call_participant(input_group_call_id, std::move(group_call_participant));
}
if (is_sync) {
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it != group_call_participants_.end()) {
CHECK(participants_it->second != nullptr);
auto &group_participants = participants_it->second->participants;
for (auto participant_it = group_participants.begin(); participant_it != group_participants.end();) {
auto &participant = *participant_it;
if (old_participant_user_ids.count(participant.user_id) == 0) {
CHECK(participant.order == 0 || participant.order >= min_order);
++participant_it;
continue;
}
// not synced user, needs to be deleted
if (participant.order != 0) {
CHECK(participant.order >= participants_it->second->min_order);
participant.order = 0;
send_update_group_call_participant(input_group_call_id, participant);
}
participant_it = group_participants.erase(participant_it);
}
if (participants_it->second->min_order < min_order) {
// if previously known more users, adjust min_order
participants_it->second->min_order = min_order;
}
}
}
if (is_load) {
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it != group_call_participants_.end()) {
CHECK(participants_it->second != nullptr);
if (participants_it->second->min_order > min_order) {
auto old_min_order = participants_it->second->min_order;
auto old_min_order = participants_it->second->min_order;
if (old_min_order > min_order) {
participants_it->second->min_order = min_order;
for (auto &participant : participants_it->second->participants) {
@ -1577,6 +1676,9 @@ void GroupCallManager::on_receive_group_call_version(InputGroupCallId input_grou
if (version <= group_call->version) {
return;
}
if (group_call->syncing_participants) {
return;
}
// found a gap
auto &group_call_participants = group_call_participants_[input_group_call_id];

View File

@ -123,9 +123,11 @@ class GroupCallManager : public Actor {
void sync_group_call_participants(InputGroupCallId input_group_call_id);
void on_sync_group_call_participants_failed(InputGroupCallId input_group_call_id);
void process_group_call_participants(InputGroupCallId group_call_id,
vector<tl_object_ptr<telegram_api::groupCallParticipant>> &&participants,
bool is_load);
bool is_load, bool is_sync);
int32 process_group_call_participants_from_updates(
InputGroupCallId group_call_id, vector<tl_object_ptr<telegram_api::groupCallParticipant>> &&participants);