Apply group call participant updates in the correct order.

This commit is contained in:
levlam 2020-12-12 14:10:37 +03:00
parent 841ef6dc7f
commit 7398de2790
2 changed files with 148 additions and 42 deletions

View File

@ -418,6 +418,8 @@ struct GroupCallManager::GroupCallParticipants {
vector<GroupCallParticipant> participants;
string next_offset;
int64 min_order = std::numeric_limits<int64>::max();
std::map<int32, vector<tl_object_ptr<telegram_api::groupCallParticipant>>> pending_updates_;
};
struct GroupCallManager::GroupCallRecentSpeakers {
@ -439,6 +441,9 @@ GroupCallManager::GroupCallManager(Td *td, ActorShared<> parent) : td_(td), pare
recent_speaker_update_timeout_.set_callback(on_recent_speaker_update_timeout_callback);
recent_speaker_update_timeout_.set_callback_data(static_cast<void *>(this));
sync_participants_timeout_.set_callback(on_sync_participants_timeout_callback);
sync_participants_timeout_.set_callback_data(static_cast<void *>(this));
}
GroupCallManager::~GroupCallManager() = default;
@ -505,6 +510,27 @@ void GroupCallManager::on_recent_speaker_update_timeout(GroupCallId group_call_i
false); // will update the list and send updateGroupCall if needed
}
void GroupCallManager::on_sync_participants_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int) {
if (G()->close_flag()) {
return;
}
auto group_call_manager = static_cast<GroupCallManager *>(group_call_manager_ptr);
send_closure_later(group_call_manager->actor_id(group_call_manager), &GroupCallManager::on_sync_participants_timeout,
GroupCallId(narrow_cast<int32>(group_call_id_int)));
}
void GroupCallManager::on_sync_participants_timeout(GroupCallId group_call_id) {
if (G()->close_flag()) {
return;
}
LOG(INFO) << "Receive sync participants timeout in " << group_call_id;
auto input_group_call_id = get_input_group_call_id(group_call_id).move_as_ok();
sync_group_call_participants(input_group_call_id);
}
GroupCallId GroupCallManager::get_group_call_id(InputGroupCallId input_group_call_id, ChannelId channel_id) {
if (td_->auth_manager_->is_bot() || !input_group_call_id.is_valid()) {
return GroupCallId();
@ -703,38 +729,105 @@ void GroupCallManager::on_update_group_call_participants(
InputGroupCallId input_group_call_id, vector<tl_object_ptr<telegram_api::groupCallParticipant>> &&participants,
int32 version) {
if (!need_group_call_participants(input_group_call_id)) {
LOG(INFO) << "Ignore updateGroupCallParticipants in unknown " << input_group_call_id;
LOG(INFO) << "Ignore updateGroupCallParticipants in " << input_group_call_id;
return;
}
if (version <= 0) {
LOG(ERROR) << "Ignore updateGroupCallParticipants with invalid version " << version << " in "
<< input_group_call_id;
return;
}
if (participants.empty()) {
LOG(INFO) << "Ignore empty updateGroupCallParticipants with version " << version << " in " << input_group_call_id;
return;
}
auto &group_call_participants = group_call_participants_[input_group_call_id];
if (group_call_participants == nullptr) {
group_call_participants = make_unique<GroupCallParticipants>();
}
auto &pending_updates = group_call_participants->pending_updates_[version];
if (!pending_updates.empty()) {
LOG(ERROR) << "Receive duplicate updateGroupCallParticipants with version " << version << " in "
<< input_group_call_id;
sync_group_call_participants(input_group_call_id);
return;
}
pending_updates = std::move(participants);
process_pending_group_call_participants_updates(input_group_call_id);
}
void GroupCallManager::process_pending_group_call_participants_updates(InputGroupCallId input_group_call_id) {
if (!need_group_call_participants(input_group_call_id)) {
return;
}
auto participants_it = group_call_participants_.find(input_group_call_id);
if (participants_it == group_call_participants_.end()) {
return;
}
auto &pending_updates = participants_it->second->pending_updates_;
auto group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
int32 diff = 0;
while (!pending_updates.empty()) {
auto it = pending_updates.begin();
auto version = it->first;
auto &participants = it->second;
if (version <= group_call->version) {
for (auto &group_call_participant : participants) {
GroupCallParticipant participant(group_call_participant);
if (participant.user_id == td_->contacts_manager_->get_my_id()) {
process_group_call_participant(input_group_call_id, std::move(participant));
}
}
LOG(INFO) << "Ignore already applied updateGroupCallParticipants with version " << version << " in "
<< input_group_call_id;
pending_updates.erase(it);
continue;
}
if (version < group_call->version + static_cast<int32>(participants.size())) {
sync_group_call_participants(input_group_call_id);
break;
}
if (version == group_call->version + static_cast<int32>(participants.size())) {
group_call->version = version;
diff += process_group_call_participants_from_updates(input_group_call_id, std::move(participants));
pending_updates.erase(it);
} else {
// found a gap
sync_participants_timeout_.add_timeout_in(group_call->group_call_id.get(), 1.0);
break;
}
}
if (pending_updates.empty()) {
sync_participants_timeout_.cancel_timeout(group_call->group_call_id.get());
}
if (diff != 0 && (group_call->participant_count != 0 || diff > 0)) {
group_call->participant_count += diff;
if (group_call->participant_count < 0) {
LOG(ERROR) << "Participant count became negative in " << input_group_call_id;
group_call->participant_count = 0;
}
send_update_group_call(group_call);
}
}
void GroupCallManager::sync_group_call_participants(InputGroupCallId input_group_call_id) {
if (!need_group_call_participants(input_group_call_id)) {
return;
}
auto group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
if (group_call->version >= version) {
if (participants.size() == 1 && group_call->version == version) {
GroupCallParticipant participant(participants[0]);
if (participant.user_id == td_->contacts_manager_->get_my_id()) {
process_group_call_participant(input_group_call_id, std::move(participant));
return;
}
}
LOG(INFO) << "Ignore already applied updateGroupCallParticipants in " << input_group_call_id;
return;
}
if (version == group_call->version + static_cast<int32>(participants.size())) {
group_call->version += static_cast<int32>(participants.size());
auto diff = process_group_call_participants_from_updates(input_group_call_id, std::move(participants));
if (diff != 0 && (group_call->participant_count != 0 || diff > 0)) {
group_call->participant_count += diff;
if (group_call->participant_count < 0) {
LOG(ERROR) << "Participant count became negative in " << input_group_call_id;
group_call->participant_count = 0;
}
send_update_group_call(group_call);
}
return;
}
// TODO sync group call participant list
sync_participants_timeout_.cancel_timeout(group_call->group_call_id.get());
LOG(INFO) << "Force participants synchronization in " << input_group_call_id;
// TODO
}
void GroupCallManager::process_group_call_participants(
@ -1256,6 +1349,7 @@ void GroupCallManager::try_clear_group_call_participants(InputGroupCallId input_
group_call->loaded_all_participants = false;
send_update_group_call(group_call);
}
group_call->version = -1;
for (auto &participant : participants->participants) {
if (participant.order != 0) {
@ -1307,6 +1401,9 @@ InputGroupCallId GroupCallManager::update_group_call(const tl_object_ptr<telegra
auto *group_call = add_group_call(input_group_call_id, channel_id);
call.group_call_id = group_call->group_call_id;
call.channel_id = channel_id.is_valid() ? channel_id : group_call->channel_id;
if (!group_call->channel_id.is_valid()) {
group_call->channel_id = channel_id;
}
if (!group_call->is_inited) {
*group_call = std::move(call);
need_update = true;
@ -1324,25 +1421,28 @@ InputGroupCallId GroupCallManager::update_group_call(const tl_object_ptr<telegra
auto mute_flags_changed =
call.mute_new_participants != group_call->mute_new_participants ||
call.allowed_change_mute_new_participants != group_call->allowed_change_mute_new_participants;
if (mute_flags_changed && call.version >= group_call->version) {
group_call->mute_new_participants = call.mute_new_participants;
group_call->allowed_change_mute_new_participants = call.allowed_change_mute_new_participants;
need_update = true;
}
if (call.version > group_call->version) {
if (group_call->version != -1) {
on_receive_group_call_version(input_group_call_id, call.version);
// if we know group call version, then update participants only by corresponding updates
call.participant_count = group_call->participant_count;
call.version = group_call->version;
on_receive_group_call_version(input_group_call_id, call.version);
} else {
if (call.participant_count != group_call->participant_count) {
group_call->participant_count = call.participant_count;
need_update = true;
}
if (need_group_call_participants(input_group_call_id)) {
// init version
group_call->version = call.version;
}
}
if (group_call->channel_id.is_valid()) {
td_->contacts_manager_->on_update_channel_group_call(group_call->channel_id, true,
call.participant_count == 0);
}
need_update = call.participant_count != group_call->participant_count || mute_flags_changed;
*group_call = std::move(call);
} else if (call.version == group_call->version) {
if (mute_flags_changed) {
group_call->mute_new_participants = call.mute_new_participants;
group_call->allowed_change_mute_new_participants = call.allowed_change_mute_new_participants;
need_update = true;
group_call->participant_count == 0);
}
}
}
@ -1350,9 +1450,6 @@ InputGroupCallId GroupCallManager::update_group_call(const tl_object_ptr<telegra
if (!group_call->is_active && group_call_recent_speakers_.erase(group_call->group_call_id) != 0) {
need_update = true;
}
if (!group_call->channel_id.is_valid()) {
group_call->channel_id = channel_id;
}
if (!join_params.empty()) {
need_update |= on_join_group_call_response(input_group_call_id, std::move(join_params));
}

View File

@ -96,6 +96,10 @@ class GroupCallManager : public Actor {
void on_recent_speaker_update_timeout(GroupCallId group_call_id);
static void on_sync_participants_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int);
void on_sync_participants_timeout(GroupCallId group_call_id);
Result<InputGroupCallId> get_input_group_call_id(GroupCallId group_call_id);
GroupCallId get_next_group_call_id(InputGroupCallId input_group_call_id);
@ -113,6 +117,10 @@ class GroupCallManager : public Actor {
bool need_group_call_participants(InputGroupCallId input_group_call_id) const;
void process_pending_group_call_participants_updates(InputGroupCallId input_group_call_id);
void sync_group_call_participants(InputGroupCallId input_group_call_id);
void process_group_call_participants(InputGroupCallId group_call_id,
vector<tl_object_ptr<telegram_api::groupCallParticipant>> &&participants,
bool is_load);
@ -184,6 +192,7 @@ class GroupCallManager : public Actor {
MultiTimeout pending_send_speaking_action_timeout_{"PendingSendSpeakingActionTimeout"};
MultiTimeout recent_speaker_update_timeout_{"RecentSpeakerUpdateTimeout"};
MultiTimeout sync_participants_timeout_{"SyncParticipantsTimeout"};
};
} // namespace td