Merge commit '8139e0d560b25cd2827fb0473e5726319998c10f'

Conflicts:
	td/generate/scheme/td_api.tlo
	td/telegram/GroupCallManager.cpp
	td/telegram/MessagesManager.cpp
This commit is contained in:
Andrea Cavalli 2020-12-28 19:09:04 +01:00
commit 17e7e44f6e
13 changed files with 1088 additions and 1398 deletions

View File

@ -2066,6 +2066,9 @@ callStateDiscarded reason:CallDiscardReason need_rating:Bool need_debug_informat
callStateError error:error = CallState; callStateError error:error = CallState;
//@description Describes a recently speaking user in a group call @user_id User identifier @is_speaking True, is the user has spoken recently
groupCallRecentSpeaker user_id:int32 is_speaking:Bool = GroupCallRecentSpeaker;
//@description Describes a group call //@description Describes a group call
//@id Group call identifier //@id Group call identifier
//@is_active True, if the call is active //@is_active True, if the call is active
@ -2075,11 +2078,11 @@ callStateError error:error = CallState;
//@can_be_managed True, if the current user can manage the group call //@can_be_managed True, if the current user can manage the group call
//@participant_count Number of participants in the group call //@participant_count Number of participants in the group call
//@loaded_all_participants True, if all group call participants are loaded //@loaded_all_participants True, if all group call participants are loaded
//@recent_speaker_user_ids Identifiers of recently speaking users in the group call //@recent_speakers Recently speaking users in the group call
//@mute_new_participants True, if only group call administrators can unmute new participants //@mute_new_participants True, if only group call administrators can unmute new participants
//@allowed_change_mute_new_participants True, if group call administrators can enable or disable mute_new_participants setting //@allowed_change_mute_new_participants True, if group call administrators can enable or disable mute_new_participants setting
//@duration Call duration; for ended calls only //@duration Call duration; for ended calls only
groupCall id:int32 is_active:Bool is_joined:Bool need_rejoin:Bool can_unmute_self:Bool can_be_managed:Bool participant_count:int32 loaded_all_participants:Bool recent_speaker_user_ids:vector<int32> mute_new_participants:Bool allowed_change_mute_new_participants:Bool duration:int32 = GroupCall; groupCall id:int32 is_active:Bool is_joined:Bool need_rejoin:Bool can_unmute_self:Bool can_be_managed:Bool participant_count:int32 loaded_all_participants:Bool recent_speakers:vector<groupCallRecentSpeaker> mute_new_participants:Bool allowed_change_mute_new_participants:Bool duration:int32 = GroupCall;
//@description Describes a payload fingerprint for interaction with tgcalls @hash Value of the field hash @setup Value of the field setup @fingerprint Value of the field fingerprint //@description Describes a payload fingerprint for interaction with tgcalls @hash Value of the field hash @setup Value of the field setup @fingerprint Value of the field fingerprint
groupCallPayloadFingerprint hash:string setup:string fingerprint:string = GroupCallPayloadFingerprint; groupCallPayloadFingerprint hash:string setup:string fingerprint:string = GroupCallPayloadFingerprint;
@ -4398,9 +4401,6 @@ setGroupCallParticipantIsSpeaking group_call_id:int32 source:int32 is_speaking:B
//@group_call_id Group call identifier @user_id User identifier @is_muted Pass true if the user must be muted and false otherwise //@group_call_id Group call identifier @user_id User identifier @is_muted Pass true if the user must be muted and false otherwise
toggleGroupCallParticipantIsMuted group_call_id:int32 user_id:int32 is_muted:Bool = Ok; toggleGroupCallParticipantIsMuted group_call_id:int32 user_id:int32 is_muted:Bool = Ok;
//@description Checks whether a group call is still joined. Should be called every 10 seconds when tgcalls notifies about lost connection with the server @group_call_id Group call identifier
checkGroupCallIsJoined group_call_id:int32 = Ok;
//@description Loads more group call participants. The loaded participants will be received through updates. Use the field groupCall.loaded_all_participants to check whether all participants has already been loaded //@description Loads more group call participants. The loaded participants will be received through updates. Use the field groupCall.loaded_all_participants to check whether all participants has already been loaded
//@group_call_id Group call identifier. The group call must be previously received through getGroupCall and must be joined or being joined //@group_call_id Group call identifier. The group call must be previously received through getGroupCall and must be joined or being joined
//@limit Maximum number of participants to load //@limit Maximum number of participants to load

View File

@ -1,6 +0,0 @@
#!/bin/sh
cd $(dirname $0)
tl-parser -e td_api.tlo td_api.tl
tl-parser -e telegram_api.tlo telegram_api.tl
tl-parser -e mtproto_api.tlo mtproto_api.tl
tl-parser -e secret_api.tlo secret_api.tl

View File

@ -8706,6 +8706,18 @@ void ContactsManager::on_load_channel_full_from_database(ChannelId channel_id, s
auto photo = std::move(channel_full->photo); auto photo = std::move(channel_full->photo);
on_update_channel_full_photo(channel_full, channel_id, std::move(photo)); on_update_channel_full_photo(channel_full, channel_id, std::move(photo));
if (channel_full->participant_count < channel_full->administrator_count) {
channel_full->participant_count = channel_full->administrator_count;
}
if (c->participant_count != channel_full->participant_count) {
channel_full->participant_count = c->participant_count;
if (channel_full->participant_count < channel_full->administrator_count) {
channel_full->participant_count = channel_full->administrator_count;
channel_full->expires_at = 0.0;
}
}
update_channel_full(channel_full, channel_id, true); update_channel_full(channel_full, channel_id, true);
if (channel_full->expires_at == 0.0) { if (channel_full->expires_at == 0.0) {
@ -9110,9 +9122,7 @@ void ContactsManager::update_channel_full(ChannelFull *channel_full, ChannelId c
CHECK(channel_full != nullptr); CHECK(channel_full != nullptr);
unavailable_channel_fulls_.erase(channel_id); // don't needed anymore unavailable_channel_fulls_.erase(channel_id); // don't needed anymore
if (channel_full->participant_count < channel_full->administrator_count) { CHECK(channel_full->participant_count >= channel_full->administrator_count);
channel_full->administrator_count = channel_full->participant_count;
}
if (channel_full->is_slow_mode_next_send_date_changed) { if (channel_full->is_slow_mode_next_send_date_changed) {
auto now = G()->server_time(); auto now = G()->server_time();
@ -9517,14 +9527,17 @@ void ContactsManager::on_get_chat_full(tl_object_ptr<telegram_api::ChatFull> &&c
ChannelFull *channel = add_channel_full(channel_id); ChannelFull *channel = add_channel_full(channel_id);
bool have_participant_count = (channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_PARTICIPANT_COUNT) != 0; bool have_participant_count = (channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_PARTICIPANT_COUNT) != 0;
auto participant_count = have_participant_count ? channel_full->participants_count_ : 0; auto participant_count = have_participant_count ? channel_full->participants_count_ : channel->participant_count;
auto administrator_count = 0; auto administrator_count = 0;
if ((channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_ADMINISTRATOR_COUNT) != 0) { if ((channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_ADMINISTRATOR_COUNT) != 0) {
administrator_count = channel_full->admins_count_; administrator_count = channel_full->admins_count_;
} else if (c->is_megagroup || c->status.is_administrator()) { } else if (c->is_megagroup || c->status.is_administrator()) {
// in megagroups and administrated channels don't drop known number of administrators // in megagroups and administered channels don't drop known number of administrators
administrator_count = channel->administrator_count; administrator_count = channel->administrator_count;
} }
if (participant_count < administrator_count) {
participant_count = administrator_count;
}
auto restricted_count = auto restricted_count =
(channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_BANNED_COUNT) != 0 ? channel_full->banned_count_ : 0; (channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_BANNED_COUNT) != 0 ? channel_full->banned_count_ : 0;
auto banned_count = auto banned_count =
@ -10699,11 +10712,20 @@ void ContactsManager::on_get_channel_participants_success(
if (participant_count != -1 || administrator_count != -1) { if (participant_count != -1 || administrator_count != -1) {
auto channel_full = get_channel_full_force(channel_id, "on_get_channel_participants_success"); auto channel_full = get_channel_full_force(channel_id, "on_get_channel_participants_success");
if (channel_full != nullptr) { if (channel_full != nullptr) {
if (participant_count != -1 && channel_full->participant_count != participant_count) { if (administrator_count == -1) {
administrator_count = channel_full->administrator_count;
}
if (participant_count == -1) {
participant_count = channel_full->participant_count;
}
if (participant_count < administrator_count) {
participant_count = administrator_count;
}
if (channel_full->participant_count != participant_count) {
channel_full->participant_count = participant_count; channel_full->participant_count = participant_count;
channel_full->is_changed = true; channel_full->is_changed = true;
} }
if (administrator_count != -1 && channel_full->administrator_count != administrator_count) { if (channel_full->administrator_count != administrator_count) {
channel_full->administrator_count = administrator_count; channel_full->administrator_count = administrator_count;
channel_full->is_changed = true; channel_full->is_changed = true;
} }
@ -13683,6 +13705,18 @@ void ContactsManager::on_update_channel_administrator_count(ChannelId channel_id
if (channel_full != nullptr && channel_full->administrator_count != administrator_count) { if (channel_full != nullptr && channel_full->administrator_count != administrator_count) {
channel_full->administrator_count = administrator_count; channel_full->administrator_count = administrator_count;
channel_full->is_changed = true; channel_full->is_changed = true;
if (channel_full->participant_count < channel_full->administrator_count) {
channel_full->participant_count = channel_full->administrator_count;
auto c = get_channel(channel_id);
if (c != nullptr && c->participant_count != channel_full->participant_count) {
c->participant_count = channel_full->participant_count;
c->is_changed = true;
update_channel(c, channel_id);
}
}
update_channel_full(channel_full, channel_id); update_channel_full(channel_full, channel_id);
} }
} }
@ -13908,6 +13942,13 @@ void ContactsManager::on_chat_update(telegram_api::channel &channel, const char
int32 participant_count = int32 participant_count =
(channel.flags_ & CHANNEL_FLAG_HAS_PARTICIPANT_COUNT) != 0 ? channel.participants_count_ : 0; (channel.flags_ & CHANNEL_FLAG_HAS_PARTICIPANT_COUNT) != 0 ? channel.participants_count_ : 0;
if (participant_count != 0) {
auto channel_full = get_channel_full_const(channel_id);
if (channel_full != nullptr && channel_full->administrator_count > participant_count) {
participant_count = channel_full->administrator_count;
}
}
{ {
bool is_broadcast = (channel.flags_ & CHANNEL_FLAG_IS_BROADCAST) != 0; bool is_broadcast = (channel.flags_ & CHANNEL_FLAG_IS_BROADCAST) != 0;
LOG_IF(ERROR, is_broadcast == is_megagroup) LOG_IF(ERROR, is_broadcast == is_megagroup)
@ -13999,6 +14040,13 @@ void ContactsManager::on_chat_update(telegram_api::channel &channel, const char
if (participant_count != 0 && participant_count != c->participant_count) { if (participant_count != 0 && participant_count != c->participant_count) {
c->participant_count = participant_count; c->participant_count = participant_count;
c->is_changed = true; c->is_changed = true;
auto channel_full = get_channel_full(channel_id, "on_chat_update");
if (channel_full != nullptr && channel_full->participant_count != participant_count) {
channel_full->participant_count = participant_count;
channel_full->is_changed = true;
update_channel_full(channel_full, channel_id);
}
} }
bool need_invalidate_channel_full = false; bool need_invalidate_channel_full = false;
@ -14094,11 +14142,6 @@ void ContactsManager::on_chat_update(telegram_api::channelForbidden &channel, co
sign_messages = true; sign_messages = true;
} }
if (c->participant_count != 0) {
c->participant_count = 0;
c->is_changed = true;
}
bool need_invalidate_channel_full = false; bool need_invalidate_channel_full = false;
if (c->is_slow_mode_enabled != is_slow_mode_enabled || c->is_megagroup != is_megagroup || if (c->is_slow_mode_enabled != is_slow_mode_enabled || c->is_megagroup != is_megagroup ||
!c->restriction_reasons.empty() || c->is_scam != is_scam) { !c->restriction_reasons.empty() || c->is_scam != is_scam) {
@ -14119,6 +14162,19 @@ void ContactsManager::on_chat_update(telegram_api::channelForbidden &channel, co
c->is_changed = true; c->is_changed = true;
} }
if (c->participant_count != 0) {
c->participant_count = 0;
c->is_changed = true;
auto channel_full = get_channel_full(channel_id, "on_chat_update");
if (channel_full != nullptr && channel_full->participant_count != 0) {
channel_full->participant_count = 0;
channel_full->administrator_count = 0;
channel_full->is_changed = true;
update_channel_full(channel_full, channel_id);
}
}
if (c->cache_version != Channel::CACHE_VERSION) { if (c->cache_version != Channel::CACHE_VERSION) {
c->cache_version = Channel::CACHE_VERSION; c->cache_version = Channel::CACHE_VERSION;
c->need_save_to_database = true; c->need_save_to_database = true;

View File

@ -431,7 +431,7 @@ struct GroupCallManager::GroupCallParticipants {
struct GroupCallManager::GroupCallRecentSpeakers { struct GroupCallManager::GroupCallRecentSpeakers {
vector<std::pair<UserId, int32>> users; // user + time; sorted by time vector<std::pair<UserId, int32>> users; // user + time; sorted by time
bool is_changed = false; bool is_changed = false;
vector<int32> last_sent_user_ids; vector<std::pair<UserId, bool>> last_sent_users;
}; };
struct GroupCallManager::PendingJoinRequest { struct GroupCallManager::PendingJoinRequest {
@ -442,6 +442,9 @@ struct GroupCallManager::PendingJoinRequest {
}; };
GroupCallManager::GroupCallManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) { GroupCallManager::GroupCallManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
check_group_call_is_joined_timeout_.set_callback(on_check_group_call_is_joined_timeout_callback);
check_group_call_is_joined_timeout_.set_callback_data(static_cast<void *>(this));
pending_send_speaking_action_timeout_.set_callback(on_pending_send_speaking_action_timeout_callback); pending_send_speaking_action_timeout_.set_callback(on_pending_send_speaking_action_timeout_callback);
pending_send_speaking_action_timeout_.set_callback_data(static_cast<void *>(this)); pending_send_speaking_action_timeout_.set_callback_data(static_cast<void *>(this));
@ -486,6 +489,45 @@ void GroupCallManager::memory_stats(vector<string> &output) {
output.push_back("\"pending_join_requests_\":"); output.push_back(std::to_string(pending_join_requests_.size())); output.push_back("\"pending_join_requests_\":"); output.push_back(std::to_string(pending_join_requests_.size()));
} }
void GroupCallManager::on_check_group_call_is_joined_timeout_callback(void *group_call_manager_ptr,
int64 group_call_id_int) {
if (G()->close_flag()) {
return;
}
auto group_call_manager = static_cast<GroupCallManager *>(group_call_manager_ptr);
send_closure_later(group_call_manager->actor_id(group_call_manager),
&GroupCallManager::on_check_group_call_is_joined_timeout,
GroupCallId(narrow_cast<int32>(group_call_id_int)));
}
void GroupCallManager::on_check_group_call_is_joined_timeout(GroupCallId group_call_id) {
if (G()->close_flag()) {
return;
}
LOG(INFO) << "Receive check group call is_joined timeout in " << group_call_id;
auto input_group_call_id = get_input_group_call_id(group_call_id).move_as_ok();
auto *group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
if (!group_call->is_joined || check_group_call_is_joined_timeout_.has_timeout(group_call_id.get())) {
return;
}
auto source = group_call->source;
auto promise =
PromiseCreator::lambda([actor_id = actor_id(this), input_group_call_id, source](Result<Unit> &&result) mutable {
if (result.is_error() && result.error().message() == "GROUP_CALL_JOIN_MISSING") {
send_closure(actor_id, &GroupCallManager::on_group_call_left, input_group_call_id, source, true);
result = Unit();
}
send_closure(actor_id, &GroupCallManager::finish_check_group_call_is_joined, input_group_call_id, source,
std::move(result));
});
td_->create_handler<CheckGroupCallQuery>(std::move(promise))->send(input_group_call_id, source);
}
void GroupCallManager::on_pending_send_speaking_action_timeout_callback(void *group_call_manager_ptr, void GroupCallManager::on_pending_send_speaking_action_timeout_callback(void *group_call_manager_ptr,
int64 group_call_id_int) { int64 group_call_id_int) {
if (G()->close_flag()) { if (G()->close_flag()) {
@ -540,8 +582,8 @@ void GroupCallManager::on_recent_speaker_update_timeout(GroupCallId group_call_i
LOG(INFO) << "Receive recent speaker update timeout in " << group_call_id; LOG(INFO) << "Receive recent speaker update timeout in " << group_call_id;
auto input_group_call_id = get_input_group_call_id(group_call_id).move_as_ok(); auto input_group_call_id = get_input_group_call_id(group_call_id).move_as_ok();
get_recent_speaker_user_ids(get_group_call(input_group_call_id), get_recent_speakers(get_group_call(input_group_call_id),
false); // will update the list and send updateGroupCall if needed false); // will update the list and send updateGroupCall if needed
} }
void GroupCallManager::on_sync_participants_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int) { void GroupCallManager::on_sync_participants_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int) {
@ -718,7 +760,7 @@ void GroupCallManager::get_group_call(GroupCallId group_call_id,
auto group_call = get_group_call(input_group_call_id); auto group_call = get_group_call(input_group_call_id);
if (group_call != nullptr && group_call->is_inited) { if (group_call != nullptr && group_call->is_inited) {
return promise.set_value(get_group_call_object(group_call, get_recent_speaker_user_ids(group_call, false))); return promise.set_value(get_group_call_object(group_call, get_recent_speakers(group_call, false)));
} }
reload_group_call(input_group_call_id, std::move(promise)); reload_group_call(input_group_call_id, std::move(promise));
@ -804,11 +846,26 @@ void GroupCallManager::finish_get_group_call(InputGroupCallId input_group_call_i
CHECK(group_call != nullptr && group_call->is_inited); CHECK(group_call != nullptr && group_call->is_inited);
for (auto &promise : promises) { for (auto &promise : promises) {
if (promise) { if (promise) {
promise.set_value(get_group_call_object(group_call, get_recent_speaker_user_ids(group_call, false))); promise.set_value(get_group_call_object(group_call, get_recent_speakers(group_call, false)));
} }
} }
} }
void GroupCallManager::finish_check_group_call_is_joined(InputGroupCallId input_group_call_id, int32 source,
Result<Unit> &&result) {
LOG(INFO) << "Finish check group call is_joined for " << input_group_call_id;
auto *group_call = get_group_call(input_group_call_id);
CHECK(group_call != nullptr && group_call->is_inited);
if (!group_call->is_joined || check_group_call_is_joined_timeout_.has_timeout(group_call->group_call_id.get()) ||
group_call->source != source) {
return;
}
int32 next_timeout = result.is_ok() ? CHECK_GROUP_CALL_IS_JOINED_TIMEOUT : 1;
check_group_call_is_joined_timeout_.set_timeout_in(group_call->group_call_id.get(), next_timeout);
}
bool GroupCallManager::need_group_call_participants(InputGroupCallId input_group_call_id) const { bool GroupCallManager::need_group_call_participants(InputGroupCallId input_group_call_id) const {
auto *group_call = get_group_call(input_group_call_id); auto *group_call = get_group_call(input_group_call_id);
if (group_call == nullptr || !group_call->is_inited || !group_call->is_active) { if (group_call == nullptr || !group_call->is_inited || !group_call->is_active) {
@ -1616,6 +1673,8 @@ bool GroupCallManager::on_join_group_call_response(InputGroupCallId input_group_
group_call->joined_date = G()->unix_time(); group_call->joined_date = G()->unix_time();
group_call->source = it->second->source; group_call->source = it->second->source;
it->second->promise.set_value(result.move_as_ok()); it->second->promise.set_value(result.move_as_ok());
check_group_call_is_joined_timeout_.set_timeout_in(group_call->group_call_id.get(),
CHECK_GROUP_CALL_IS_JOINED_TIMEOUT);
need_update = true; need_update = true;
} }
pending_join_requests_.erase(it); pending_join_requests_.erase(it);
@ -1691,6 +1750,10 @@ void GroupCallManager::set_group_call_participant_is_speaking(GroupCallId group_
} else { } else {
recursive = true; recursive = true;
} }
if (source != group_call->source && !recursive && is_speaking &&
check_group_call_is_joined_timeout_.has_timeout(group_call_id.get())) {
check_group_call_is_joined_timeout_.set_timeout_in(group_call_id.get(), CHECK_GROUP_CALL_IS_JOINED_TIMEOUT);
}
UserId user_id = set_group_call_participant_is_speaking_by_source(input_group_call_id, source, is_speaking, date); UserId user_id = set_group_call_participant_is_speaking_by_source(input_group_call_id, source, is_speaking, date);
if (!user_id.is_valid()) { if (!user_id.is_valid()) {
if (!recursive) { if (!recursive) {
@ -1755,29 +1818,6 @@ void GroupCallManager::toggle_group_call_participant_is_muted(GroupCallId group_
td_->create_handler<EditGroupCallMemberQuery>(std::move(promise))->send(input_group_call_id, user_id, is_muted); td_->create_handler<EditGroupCallMemberQuery>(std::move(promise))->send(input_group_call_id, user_id, is_muted);
} }
void GroupCallManager::check_group_call_is_joined(GroupCallId group_call_id, Promise<Unit> &&promise) {
TRY_RESULT_PROMISE(promise, input_group_call_id, get_input_group_call_id(group_call_id));
auto *group_call = get_group_call(input_group_call_id);
if (group_call == nullptr || !group_call->is_inited) {
return promise.set_error(Status::Error(400, "GROUP_CALL_JOIN_MISSING"));
}
if (!group_call->is_active || !group_call->is_joined || group_call->joined_date > G()->unix_time() - 8) {
return promise.set_value(Unit());
}
auto source = group_call->source;
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), input_group_call_id, source,
promise = std::move(promise)](Result<Unit> &&result) mutable {
if (result.is_error() && result.error().message() == "GROUP_CALL_JOIN_MISSING") {
send_closure(actor_id, &GroupCallManager::on_group_call_left, input_group_call_id, source, true);
result = Unit();
}
promise.set_result(std::move(result));
});
td_->create_handler<CheckGroupCallQuery>(std::move(query_promise))->send(input_group_call_id, source);
}
void GroupCallManager::load_group_call_participants(GroupCallId group_call_id, int32 limit, Promise<Unit> &&promise) { void GroupCallManager::load_group_call_participants(GroupCallId group_call_id, int32 limit, Promise<Unit> &&promise) {
if (limit <= 0) { if (limit <= 0) {
return promise.set_error(Status::Error(400, "Parameter limit must be positive")); return promise.set_error(Status::Error(400, "Parameter limit must be positive"));
@ -1846,6 +1886,7 @@ void GroupCallManager::on_group_call_left_impl(GroupCall *group_call, bool need_
group_call->source = 0; group_call->source = 0;
group_call->loaded_all_participants = false; group_call->loaded_all_participants = false;
group_call->version = -1; group_call->version = -1;
check_group_call_is_joined_timeout_.cancel_timeout(group_call->group_call_id.get());
try_clear_group_call_participants(get_input_group_call_id(group_call->group_call_id).ok()); try_clear_group_call_participants(get_input_group_call_id(group_call->group_call_id).ok());
} }
@ -2220,64 +2261,74 @@ void GroupCallManager::update_group_call_dialog(const GroupCall *group_call, con
group_call->participant_count == 0, source); group_call->participant_count == 0, source);
} }
vector<int32> GroupCallManager::get_recent_speaker_user_ids(const GroupCall *group_call, bool for_update) { vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> GroupCallManager::get_recent_speakers(
const GroupCall *group_call, bool for_update) {
CHECK(group_call != nullptr && group_call->is_inited); CHECK(group_call != nullptr && group_call->is_inited);
vector<int32> recent_speaker_user_ids;
auto recent_speakers_it = group_call_recent_speakers_.find(group_call->group_call_id); auto recent_speakers_it = group_call_recent_speakers_.find(group_call->group_call_id);
if (recent_speakers_it == group_call_recent_speakers_.end()) { if (recent_speakers_it == group_call_recent_speakers_.end()) {
return recent_speaker_user_ids; return Auto();
} }
auto *recent_speakers = recent_speakers_it->second.get(); auto *recent_speakers = recent_speakers_it->second.get();
CHECK(recent_speakers != nullptr); CHECK(recent_speakers != nullptr);
LOG(INFO) << "Found " << recent_speakers->users.size() << " recent speakers in " << group_call->group_call_id LOG(INFO) << "Found " << recent_speakers->users.size() << " recent speakers in " << group_call->group_call_id
<< " from " << group_call->dialog_id; << " from " << group_call->dialog_id;
while (!recent_speakers->users.empty() && auto now = G()->unix_time();
recent_speakers->users.back().second < G()->unix_time() - RECENT_SPEAKER_TIMEOUT) { while (!recent_speakers->users.empty() && recent_speakers->users.back().second < now - RECENT_SPEAKER_TIMEOUT) {
recent_speakers->users.pop_back(); recent_speakers->users.pop_back();
} }
vector<std::pair<UserId, bool>> recent_speaker_users;
for (auto &recent_speaker : recent_speakers->users) { for (auto &recent_speaker : recent_speakers->users) {
recent_speaker_user_ids.push_back(recent_speaker.first.get()); recent_speaker_users.emplace_back(recent_speaker.first, recent_speaker.second > now - 5);
} }
if (recent_speakers->is_changed) { if (recent_speakers->is_changed) {
recent_speakers->is_changed = false; recent_speakers->is_changed = false;
recent_speaker_update_timeout_.cancel_timeout(group_call->group_call_id.get()); recent_speaker_update_timeout_.cancel_timeout(group_call->group_call_id.get());
} }
if (!recent_speakers->users.empty()) { if (!recent_speaker_users.empty()) {
auto next_timeout = recent_speakers->users.back().second + RECENT_SPEAKER_TIMEOUT - G()->unix_time() + 1; auto next_timeout = recent_speakers->users.back().second + RECENT_SPEAKER_TIMEOUT - now + 1;
if (recent_speaker_users[0].second) { // if someone is speaking, recheck in 1 second
next_timeout = 1;
}
recent_speaker_update_timeout_.add_timeout_in(group_call->group_call_id.get(), next_timeout); recent_speaker_update_timeout_.add_timeout_in(group_call->group_call_id.get(), next_timeout);
} }
if (recent_speakers->last_sent_user_ids != recent_speaker_user_ids) { auto get_result = [recent_speaker_users] {
recent_speakers->last_sent_user_ids = recent_speaker_user_ids; return transform(recent_speaker_users, [](const std::pair<UserId, bool> &recent_speaker_user) {
return td_api::make_object<td_api::groupCallRecentSpeaker>(recent_speaker_user.first.get(),
recent_speaker_user.second);
});
};
if (recent_speakers->last_sent_users != recent_speaker_users) {
recent_speakers->last_sent_users = std::move(recent_speaker_users);
if (!for_update) { if (!for_update) {
// the change must be received through update first // the change must be received through update first
send_update_group_call(group_call, "get_recent_speaker_user_ids"); send_closure(G()->td(), &Td::send_update, get_update_group_call_object(group_call, get_result()));
} }
} }
return recent_speaker_user_ids;
return get_result();
} }
tl_object_ptr<td_api::groupCall> GroupCallManager::get_group_call_object(const GroupCall *group_call, tl_object_ptr<td_api::groupCall> GroupCallManager::get_group_call_object(
vector<int32> recent_speaker_user_ids) const { const GroupCall *group_call, vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> recent_speakers) const {
CHECK(group_call != nullptr); CHECK(group_call != nullptr);
CHECK(group_call->is_inited); CHECK(group_call->is_inited);
return td_api::make_object<td_api::groupCall>( return td_api::make_object<td_api::groupCall>(
group_call->group_call_id.get(), group_call->is_active, group_call->is_joined, group_call->need_rejoin, group_call->group_call_id.get(), group_call->is_active, group_call->is_joined, group_call->need_rejoin,
group_call->can_self_unmute, group_call->can_be_managed, group_call->participant_count, group_call->can_self_unmute, group_call->can_be_managed, group_call->participant_count,
group_call->loaded_all_participants, std::move(recent_speaker_user_ids), group_call->mute_new_participants, group_call->loaded_all_participants, std::move(recent_speakers), group_call->mute_new_participants,
group_call->allowed_change_mute_new_participants, group_call->duration); group_call->allowed_change_mute_new_participants, group_call->duration);
} }
tl_object_ptr<td_api::updateGroupCall> GroupCallManager::get_update_group_call_object( tl_object_ptr<td_api::updateGroupCall> GroupCallManager::get_update_group_call_object(
const GroupCall *group_call, vector<int32> recent_speaker_user_ids) const { const GroupCall *group_call, vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> recent_speakers) const {
return td_api::make_object<td_api::updateGroupCall>( return td_api::make_object<td_api::updateGroupCall>(get_group_call_object(group_call, std::move(recent_speakers)));
get_group_call_object(group_call, std::move(recent_speaker_user_ids)));
} }
tl_object_ptr<td_api::updateGroupCallParticipant> GroupCallManager::get_update_group_call_participant_object( tl_object_ptr<td_api::updateGroupCallParticipant> GroupCallManager::get_update_group_call_participant_object(
@ -2292,7 +2343,7 @@ void GroupCallManager::send_update_group_call(const GroupCall *group_call, const
return; return;
} }
send_closure(G()->td(), &Td::send_update, send_closure(G()->td(), &Td::send_update,
get_update_group_call_object(group_call, get_recent_speaker_user_ids(group_call, true))); get_update_group_call_object(group_call, get_recent_speakers(group_call, true)));
} }
void GroupCallManager::send_update_group_call_participant(GroupCallId group_call_id, void GroupCallManager::send_update_group_call_participant(GroupCallId group_call_id,

View File

@ -66,8 +66,6 @@ class GroupCallManager : public Actor {
void toggle_group_call_participant_is_muted(GroupCallId group_call_id, UserId user_id, bool is_muted, void toggle_group_call_participant_is_muted(GroupCallId group_call_id, UserId user_id, bool is_muted,
Promise<Unit> &&promise); Promise<Unit> &&promise);
void check_group_call_is_joined(GroupCallId group_call_id, Promise<Unit> &&promise);
void load_group_call_participants(GroupCallId group_call_id, int32 limit, Promise<Unit> &&promise); void load_group_call_participants(GroupCallId group_call_id, int32 limit, Promise<Unit> &&promise);
void leave_group_call(GroupCallId group_call_id, Promise<Unit> &&promise); void leave_group_call(GroupCallId group_call_id, Promise<Unit> &&promise);
@ -96,9 +94,14 @@ class GroupCallManager : public Actor {
struct PendingJoinRequest; struct PendingJoinRequest;
static constexpr int32 RECENT_SPEAKER_TIMEOUT = 5 * 60; static constexpr int32 RECENT_SPEAKER_TIMEOUT = 5 * 60;
static constexpr int32 CHECK_GROUP_CALL_IS_JOINED_TIMEOUT = 10;
void tear_down() override; void tear_down() override;
static void on_check_group_call_is_joined_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int);
void on_check_group_call_is_joined_timeout(GroupCallId group_call_id);
static void on_pending_send_speaking_action_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int); static void on_pending_send_speaking_action_timeout_callback(void *group_call_manager_ptr, int64 group_call_id_int);
void on_send_speaking_action_timeout(GroupCallId group_call_id); void on_send_speaking_action_timeout(GroupCallId group_call_id);
@ -129,6 +132,8 @@ class GroupCallManager : public Actor {
void finish_get_group_call(InputGroupCallId input_group_call_id, void finish_get_group_call(InputGroupCallId input_group_call_id,
Result<tl_object_ptr<telegram_api::phone_groupCall>> &&result); Result<tl_object_ptr<telegram_api::phone_groupCall>> &&result);
void finish_check_group_call_is_joined(InputGroupCallId input_group_call_id, int32 source, Result<Unit> &&result);
bool need_group_call_participants(InputGroupCallId input_group_call_id) const; bool need_group_call_participants(InputGroupCallId input_group_call_id) const;
bool process_pending_group_call_participant_updates(InputGroupCallId input_group_call_id); bool process_pending_group_call_participant_updates(InputGroupCallId input_group_call_id);
@ -185,13 +190,14 @@ class GroupCallManager : public Actor {
void update_group_call_dialog(const GroupCall *group_call, const char *source); void update_group_call_dialog(const GroupCall *group_call, const char *source);
vector<int32> get_recent_speaker_user_ids(const GroupCall *group_call, bool for_update); vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> get_recent_speakers(const GroupCall *group_call,
bool for_update);
tl_object_ptr<td_api::updateGroupCall> get_update_group_call_object(const GroupCall *group_call, tl_object_ptr<td_api::updateGroupCall> get_update_group_call_object(
vector<int32> recent_speaker_user_ids) const; const GroupCall *group_call, vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> recent_speakers) const;
tl_object_ptr<td_api::groupCall> get_group_call_object(const GroupCall *group_call, tl_object_ptr<td_api::groupCall> get_group_call_object(
vector<int32> recent_speaker_user_ids) const; const GroupCall *group_call, vector<td_api::object_ptr<td_api::groupCallRecentSpeaker>> recent_speakers) const;
tl_object_ptr<td_api::updateGroupCallParticipant> get_update_group_call_participant_object( tl_object_ptr<td_api::updateGroupCallParticipant> get_update_group_call_participant_object(
GroupCallId group_call_id, const GroupCallParticipant &participant); GroupCallId group_call_id, const GroupCallParticipant &participant);
@ -223,6 +229,7 @@ class GroupCallManager : public Actor {
std::unordered_map<InputGroupCallId, unique_ptr<PendingJoinRequest>, InputGroupCallIdHash> pending_join_requests_; std::unordered_map<InputGroupCallId, unique_ptr<PendingJoinRequest>, InputGroupCallIdHash> pending_join_requests_;
uint64 join_group_request_generation_ = 0; uint64 join_group_request_generation_ = 0;
MultiTimeout check_group_call_is_joined_timeout_{"CheckGroupCallIsJoinedTimeout"};
MultiTimeout pending_send_speaking_action_timeout_{"PendingSendSpeakingActionTimeout"}; MultiTimeout pending_send_speaking_action_timeout_{"PendingSendSpeakingActionTimeout"};
MultiTimeout recent_speaker_update_timeout_{"RecentSpeakerUpdateTimeout"}; MultiTimeout recent_speaker_update_timeout_{"RecentSpeakerUpdateTimeout"};
MultiTimeout sync_participants_timeout_{"SyncParticipantsTimeout"}; MultiTimeout sync_participants_timeout_{"SyncParticipantsTimeout"};

View File

@ -656,22 +656,24 @@ class UnpinAllMessagesQuery : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) { if (affected_history->pts_count_ > 0) {
affected_history->pts_count_ = 0; // force receiving real updates from the server affected_history->pts_count_ = 0; // force receiving real updates from the server
auto promise = affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_);
if (dialog_id_.get_type() == DialogType::Channel) { if (dialog_id_.get_type() == DialogType::Channel) {
td->messages_manager_->add_pending_channel_update(dialog_id_, make_tl_object<dummyUpdate>(), td->messages_manager_->add_pending_channel_update(dialog_id_, make_tl_object<dummyUpdate>(),
affected_history->pts_, affected_history->pts_count_, affected_history->pts_, affected_history->pts_count_,
"unpin all messages"); std::move(promise), "unpin all messages");
} else { } else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "unpin all messages"); affected_history->pts_count_, false, std::move(promise),
"unpin all messages");
} }
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
} }
if (affected_history->offset_ > 0) { if (affected_history->offset_ > 0) {
send_request(); send_request();
return; return;
} }
promise_.set_value(Unit());
} }
void on_error(uint64 id, Status status) override { void on_error(uint64 id, Status status) override {
@ -1566,7 +1568,8 @@ class ReadMessagesContentsQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) { if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "read messages content query"); affected_messages->pts_count_, false, Promise<Unit>(),
"read messages content query");
} }
promise_.set_value(Unit()); promise_.set_value(Unit());
@ -1783,7 +1786,8 @@ class ReadHistoryQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) { if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "read history query"); affected_messages->pts_count_, false, Promise<Unit>(),
"read history query");
} }
promise_.set_value(Unit()); promise_.set_value(Unit());
@ -2252,7 +2256,8 @@ class DeleteHistoryQuery : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) { if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "delete history query"); affected_history->pts_count_, false, Promise<Unit>(),
"delete history query");
} }
if (affected_history->offset_ > 0) { if (affected_history->offset_ > 0) {
@ -2391,17 +2396,17 @@ class DeleteUserHistoryQuery : public Td::ResultHandler {
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) { if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_channel_update(DialogId(channel_id_), make_tl_object<dummyUpdate>(), td->messages_manager_->add_pending_channel_update(
affected_history->pts_, affected_history->pts_count_, DialogId(channel_id_), make_tl_object<dummyUpdate>(), affected_history->pts_, affected_history->pts_count_,
"delete user history query"); affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_), "delete user history query");
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
} }
if (affected_history->offset_ > 0) { if (affected_history->offset_ > 0) {
send_request(); send_request();
return; return;
} }
promise_.set_value(Unit());
} }
void on_error(uint64 id, Status status) override { void on_error(uint64 id, Status status) override {
@ -2451,7 +2456,8 @@ class ReadAllMentionsQuery : public Td::ResultHandler {
td->updates_manager_->get_difference("Wrong messages_readMentions result"); td->updates_manager_->get_difference("Wrong messages_readMentions result");
} else { } else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "read all mentions query"); affected_history->pts_count_, false, Promise<Unit>(),
"read all mentions query");
} }
} }
@ -2581,13 +2587,13 @@ class SendMessageActor : public NetActorOnce {
if (dialog_id_.get_type() == DialogType::Channel) { if (dialog_id_.get_type() == DialogType::Channel) {
td->messages_manager_->add_pending_channel_update( td->messages_manager_->add_pending_channel_update(
dialog_id_, make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), dialog_id_, make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_),
sent_message->pts_, sent_message->pts_count_, "send message actor"); sent_message->pts_, sent_message->pts_count_, Promise<Unit>(), "send message actor");
return; return;
} }
td->messages_manager_->add_pending_update( td->messages_manager_->add_pending_update(
make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), sent_message->pts_, make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), sent_message->pts_,
sent_message->pts_count_, false, "send message actor"); sent_message->pts_count_, false, Promise<Unit>(), "send message actor");
} }
void on_error(uint64 id, Status status) override { void on_error(uint64 id, Status status) override {
@ -3038,11 +3044,20 @@ class SendScheduledMessageActor : public NetActorOnce {
}; };
class EditMessageActor : public NetActorOnce { class EditMessageActor : public NetActorOnce {
Promise<Unit> promise_; Promise<int32> promise_;
DialogId dialog_id_; DialogId dialog_id_;
public: public:
explicit EditMessageActor(Promise<Unit> &&promise) : promise_(std::move(promise)) { explicit EditMessageActor(Promise<Unit> &&promise) {
promise_ = PromiseCreator::lambda([promise = std::move(promise)](Result<int32> result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
promise.set_value(Unit());
}
});
}
explicit EditMessageActor(Promise<int32> &&promise) : promise_(std::move(promise)) {
} }
void send(int32 flags, DialogId dialog_id, MessageId message_id, const string &text, void send(int32 flags, DialogId dialog_id, MessageId message_id, const string &text,
@ -3101,13 +3116,16 @@ class EditMessageActor : public NetActorOnce {
auto ptr = result_ptr.move_as_ok(); auto ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for EditMessageActor: " << to_string(ptr); LOG(INFO) << "Receive result for EditMessageActor: " << to_string(ptr);
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_)); auto pts = td->updates_manager_->get_update_edit_message_pts(ptr.get());
auto promise = PromiseCreator::lambda(
[promise = std::move(promise_), pts](Result<Unit> result) mutable { promise.set_value(std::move(pts)); });
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise));
} }
void on_error(uint64 id, Status status) override { void on_error(uint64 id, Status status) override {
LOG(INFO) << "Receive error for EditMessageQuery: " << status; LOG(INFO) << "Receive error for EditMessageQuery: " << status;
if (!td->auth_manager_->is_bot() && status.message() == "MESSAGE_NOT_MODIFIED") { if (!td->auth_manager_->is_bot() && status.message() == "MESSAGE_NOT_MODIFIED") {
return promise_.set_value(Unit()); return promise_.set_value(0);
} }
td->messages_manager_->on_get_dialog_error(dialog_id_, status, "EditMessageActor"); td->messages_manager_->on_get_dialog_error(dialog_id_, status, "EditMessageActor");
promise_.set_error(std::move(status)); promise_.set_error(std::move(status));
@ -3596,7 +3614,8 @@ class DeleteMessagesQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) { if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_, td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "delete messages query"); affected_messages->pts_count_, false, Promise<Unit>(),
"delete messages query");
} }
if (--query_count_ == 0) { if (--query_count_ == 0) {
promise_.set_value(Unit()); promise_.set_value(Unit());
@ -3661,7 +3680,7 @@ class DeleteChannelMessagesQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) { if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_channel_update(DialogId(channel_id_), make_tl_object<dummyUpdate>(), td->messages_manager_->add_pending_channel_update(DialogId(channel_id_), make_tl_object<dummyUpdate>(),
affected_messages->pts_, affected_messages->pts_count_, affected_messages->pts_, affected_messages->pts_count_,
"DeleteChannelMessagesQuery"); Promise<Unit>(), "DeleteChannelMessagesQuery");
} }
if (--query_count_ == 0) { if (--query_count_ == 0) {
promise_.set_value(Unit()); promise_.set_value(Unit());
@ -6246,7 +6265,7 @@ void MessagesManager::skip_old_pending_update(tl_object_ptr<telegram_api::Update
} }
void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count, void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, const char *source) { bool force_apply, Promise<Unit> &&promise, const char *source) {
// do not try to run getDifference from this function // do not try to run getDifference from this function
CHECK(update != nullptr); CHECK(update != nullptr);
CHECK(source != nullptr); CHECK(source != nullptr);
@ -6255,7 +6274,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
if (pts_count < 0 || new_pts <= pts_count) { if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
<< ": " << oneline(to_string(update)); << ": " << oneline(to_string(update));
return; return promise.set_value(Unit());
} }
// TODO need to save all updates that can change result of running queries not associated with pts (for example // TODO need to save all updates that can change result of running queries not associated with pts (for example
@ -6272,7 +6291,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_new_message = static_cast<const telegram_api::updateNewMessage *>(update.get()); auto update_new_message = static_cast<const telegram_api::updateNewMessage *>(update.get());
DialogId dialog_id = get_message_dialog_id(update_new_message->message_); DialogId dialog_id = get_message_dialog_id(update_new_message->message_);
if (!check_update_dialog_id(update, dialog_id)) { if (!check_update_dialog_id(update, dialog_id)) {
return; return promise.set_value(Unit());
} }
break; break;
} }
@ -6280,7 +6299,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_read_history_inbox = static_cast<const telegram_api::updateReadHistoryInbox *>(update.get()); auto update_read_history_inbox = static_cast<const telegram_api::updateReadHistoryInbox *>(update.get());
auto dialog_id = DialogId(update_read_history_inbox->peer_); auto dialog_id = DialogId(update_read_history_inbox->peer_);
if (!check_update_dialog_id(update, dialog_id)) { if (!check_update_dialog_id(update, dialog_id)) {
return; return promise.set_value(Unit());
} }
break; break;
} }
@ -6288,7 +6307,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_read_history_outbox = static_cast<const telegram_api::updateReadHistoryOutbox *>(update.get()); auto update_read_history_outbox = static_cast<const telegram_api::updateReadHistoryOutbox *>(update.get());
auto dialog_id = DialogId(update_read_history_outbox->peer_); auto dialog_id = DialogId(update_read_history_outbox->peer_);
if (!check_update_dialog_id(update, dialog_id)) { if (!check_update_dialog_id(update, dialog_id)) {
return; return promise.set_value(Unit());
} }
break; break;
} }
@ -6296,7 +6315,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_edit_message = static_cast<const telegram_api::updateEditMessage *>(update.get()); auto update_edit_message = static_cast<const telegram_api::updateEditMessage *>(update.get());
DialogId dialog_id = get_message_dialog_id(update_edit_message->message_); DialogId dialog_id = get_message_dialog_id(update_edit_message->message_);
if (!check_update_dialog_id(update, dialog_id)) { if (!check_update_dialog_id(update, dialog_id)) {
return; return promise.set_value(Unit());
} }
break; break;
} }
@ -6304,7 +6323,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_pinned_messages = static_cast<const telegram_api::updatePinnedMessages *>(update.get()); auto update_pinned_messages = static_cast<const telegram_api::updatePinnedMessages *>(update.get());
auto dialog_id = DialogId(update_pinned_messages->peer_); auto dialog_id = DialogId(update_pinned_messages->peer_);
if (!check_update_dialog_id(update, dialog_id)) { if (!check_update_dialog_id(update, dialog_id)) {
return; return promise.set_value(Unit());
} }
break; break;
} }
@ -6314,17 +6333,18 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
} }
if (force_apply) { if (force_apply) {
CHECK(pending_updates_.empty()); CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1); CHECK(accumulated_pts_ == -1);
if (pts_count != 0) { if (pts_count != 0) {
LOG(ERROR) << "Receive forced update with pts_count = " << pts_count << " from " << source; LOG(ERROR) << "Receive forced update with pts_count = " << pts_count << " from " << source;
} }
process_update(std::move(update)); process_update(std::move(update));
return; return promise.set_value(Unit());
} }
if (DROP_UPDATES) { if (DROP_UPDATES) {
return set_get_difference_timeout(1.0); set_get_difference_timeout(1.0);
return promise.set_value(Unit());
} }
int32 old_pts = td_->updates_manager_->get_pts(); int32 old_pts = td_->updates_manager_->get_pts();
@ -6350,15 +6370,13 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
if (new_pts <= old_pts) { if (new_pts <= old_pts) {
skip_old_pending_update(std::move(update), new_pts, old_pts, pts_count, source); skip_old_pending_update(std::move(update), new_pts, old_pts, pts_count, source);
return; return promise.set_value(Unit());
} }
if (td_->updates_manager_->running_get_difference()) { if (td_->updates_manager_->running_get_difference() || !postponed_pts_updates_.empty()) {
LOG(INFO) << "Save pending update got while running getDifference from " << source; LOG(INFO) << "Save pending update got while running getDifference from " << source;
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID); CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
if (pts_count > 0) { postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
postponed_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
}
return; return;
} }
@ -6366,6 +6384,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
<< "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
<< source << " = " << oneline(to_string(update)); << source << " = " << oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_get_difference_timeout(0.001); set_get_difference_timeout(0.001);
return; return;
} }
@ -6381,13 +6400,14 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
<< ", pts_count = " << pts_count << ". Logged in " << ", pts_count = " << pts_count << ". Logged in "
<< G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
<< oneline(to_string(update)); << oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_get_difference_timeout(0.001); set_get_difference_timeout(0.001);
return; return;
} }
LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update); LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update);
if (pending_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ && if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ &&
!pts_gap_timeout_.has_timeout()) { !pts_gap_timeout_.has_timeout()) {
if (pts_count > 0) { if (pts_count > 0) {
process_update(std::move(update)); process_update(std::move(update));
@ -6397,12 +6417,11 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
accumulated_pts_count_ = 0; accumulated_pts_count_ = 0;
accumulated_pts_ = -1; accumulated_pts_ = -1;
} }
promise.set_value(Unit());
return; return;
} }
if (pts_count > 0) { pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
pending_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
}
if (old_pts + accumulated_pts_count_ < accumulated_pts_) { if (old_pts + accumulated_pts_count_ < accumulated_pts_) {
set_get_difference_timeout(UpdatesManager::MAX_UNFILLED_GAP_TIME); set_get_difference_timeout(UpdatesManager::MAX_UNFILLED_GAP_TIME);
@ -6410,7 +6429,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
} }
CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_); CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_);
if (!pending_updates_.empty()) { if (!pending_pts_updates_.empty()) {
process_pending_updates(); process_pending_updates();
} }
} }
@ -6513,82 +6532,6 @@ void MessagesManager::on_update_service_notification(tl_object_ptr<telegram_api:
} }
} }
void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
DialogId dialog_id = get_message_dialog_id(update->message_);
switch (dialog_id.get_type()) {
case DialogType::None:
return;
case DialogType::User:
case DialogType::Chat:
case DialogType::SecretChat:
LOG(ERROR) << "Receive updateNewChannelMessage in wrong " << dialog_id;
return;
case DialogType::Channel: {
auto channel_id = dialog_id.get_channel_id();
if (!td_->contacts_manager_->have_channel(channel_id)) {
// if min channel was received
if (td_->contacts_manager_->have_min_channel(channel_id)) {
td_->updates_manager_->schedule_get_difference("on_update_new_channel_message");
return;
}
}
// Ok
break;
}
default:
UNREACHABLE();
return;
}
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive new channel message with wrong pts = " << new_pts << " or pts_count = " << pts_count << ": "
<< oneline(to_string(update));
return;
}
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, "on_update_new_channel_message");
}
void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
DialogId dialog_id = get_message_dialog_id(update->message_);
switch (dialog_id.get_type()) {
case DialogType::None:
return;
case DialogType::User:
case DialogType::Chat:
case DialogType::SecretChat:
LOG(ERROR) << "Receive updateEditChannelMessage in wrong " << dialog_id;
return;
case DialogType::Channel: {
auto channel_id = dialog_id.get_channel_id();
if (!td_->contacts_manager_->have_channel(channel_id)) {
// if min channel was received
if (td_->contacts_manager_->have_min_channel(channel_id)) {
td_->updates_manager_->schedule_get_difference("on_update_edit_channel_message");
return;
}
}
// Ok
break;
}
default:
UNREACHABLE();
return;
}
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive edited channel message with wrong pts = " << new_pts << " or pts_count = " << pts_count
<< ": " << oneline(to_string(update));
return;
}
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, "on_update_edit_channel_message");
}
void MessagesManager::on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update) { void MessagesManager::on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update) {
ChannelId channel_id(update->channel_id_); ChannelId channel_id(update->channel_id_);
if (!channel_id.is_valid()) { if (!channel_id.is_valid()) {
@ -6819,7 +6762,10 @@ bool MessagesManager::is_active_message_reply_info(DialogId dialog_id, const Mes
bool MessagesManager::is_visible_message_reply_info(DialogId dialog_id, const Message *m) const { bool MessagesManager::is_visible_message_reply_info(DialogId dialog_id, const Message *m) const {
CHECK(m != nullptr); CHECK(m != nullptr);
if (!m->message_id.is_valid() || !m->message_id.is_server()) { if (!m->message_id.is_valid()) {
return false;
}
if (!m->message_id.is_server() && !m->message_id.is_yet_unsent()) {
return false; return false;
} }
if (is_broadcast_channel(dialog_id) && (m->had_reply_markup || m->reply_markup != nullptr)) { if (is_broadcast_channel(dialog_id) && (m->had_reply_markup || m->reply_markup != nullptr)) {
@ -7248,28 +7194,41 @@ void MessagesManager::cancel_user_dialog_action(DialogId dialog_id, const Messag
} }
void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
int32 new_pts, int32 pts_count, const char *source, int32 new_pts, int32 pts_count, Promise<Unit> &&promise,
bool is_postponed_update) { const char *source, bool is_postponed_update) {
LOG(INFO) << "Receive from " << source << " pending " << to_string(update); LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
CHECK(update != nullptr); CHECK(update != nullptr);
CHECK(dialog_id.get_type() == DialogType::Channel); if (dialog_id.get_type() != DialogType::Channel) {
LOG(ERROR) << "Receive channel update in invalid " << dialog_id << " from " << source << ": "
<< oneline(to_string(update));
promise.set_value(Unit());
return;
}
if (pts_count < 0 || new_pts <= pts_count) { if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive channel update from " << source << " with wrong pts = " << new_pts LOG(ERROR) << "Receive channel update from " << source << " with wrong pts = " << new_pts
<< " or pts_count = " << pts_count << ": " << oneline(to_string(update)); << " or pts_count = " << pts_count << ": " << oneline(to_string(update));
promise.set_value(Unit());
return;
}
auto channel_id = dialog_id.get_channel_id();
if (!td_->contacts_manager_->have_channel(channel_id) && td_->contacts_manager_->have_min_channel(channel_id)) {
td_->updates_manager_->schedule_get_difference("on_update_new_channel_message");
promise.set_value(Unit());
return; return;
} }
// TODO need to save all updates that can change result of running queries not associated with pts (for example // TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply them to result of this queries // getHistory) and apply them to result of these queries
Dialog *d = get_dialog_force(dialog_id); Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr) { if (d == nullptr) {
auto pts = load_channel_pts(dialog_id); auto pts = load_channel_pts(dialog_id);
if (pts > 0) { if (pts > 0) {
auto channel_id = dialog_id.get_channel_id();
if (!td_->contacts_manager_->have_channel(channel_id)) { if (!td_->contacts_manager_->have_channel(channel_id)) {
// do not create dialog if there is no info about the channel // do not create dialog if there is no info about the channel
LOG(INFO) << "There is no info about " << channel_id << ", so ignore " << oneline(to_string(update)); LOG(INFO) << "There is no info about " << channel_id << ", so ignore " << oneline(to_string(update));
promise.set_value(Unit());
return; return;
} }
@ -7286,6 +7245,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// if there is no dialog, it can be created by the update // if there is no dialog, it can be created by the update
LOG(INFO) << "Receive pending update from " << source << " about unknown " << dialog_id; LOG(INFO) << "Receive pending update from " << source << " about unknown " << dialog_id;
if (running_get_channel_difference(dialog_id)) { if (running_get_channel_difference(dialog_id)) {
promise.set_value(Unit());
return; return;
} }
} else { } else {
@ -7310,6 +7270,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// apply sent channel message // apply sent channel message
on_get_message(std::move(update_new_channel_message->message_), true, true, false, true, true, on_get_message(std::move(update_new_channel_message->message_), true, true, false, true, true,
"updateNewChannelMessage with an awaited message"); "updateNewChannelMessage with an awaited message");
promise.set_value(Unit());
return; return;
} }
} }
@ -7319,6 +7280,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// apply sent channel message // apply sent channel message
on_send_message_success(update_sent_message->random_id_, update_sent_message->message_id_, on_send_message_success(update_sent_message->random_id_, update_sent_message->message_id_,
update_sent_message->date_, FileId(), "process old updateSentChannelMessage"); update_sent_message->date_, FileId(), "process old updateSentChannelMessage");
promise.set_value(Unit());
return; return;
} }
} }
@ -7326,14 +7288,14 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
LOG_IF(WARNING, new_pts == old_pts && pts_count == 0) LOG_IF(WARNING, new_pts == old_pts && pts_count == 0)
<< "Receive from " << source << " useless channel update " << oneline(to_string(update)); << "Receive from " << source << " useless channel update " << oneline(to_string(update));
LOG(INFO) << "Skip already applied channel update"; LOG(INFO) << "Skip already applied channel update";
promise.set_value(Unit());
return; return;
} }
if (running_get_channel_difference(dialog_id)) { if (running_get_channel_difference(dialog_id)) {
if (pts_count > 0) {
d->postponed_channel_updates.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
}
LOG(INFO) << "Postpone channel update, because getChannelDifference is run"; LOG(INFO) << "Postpone channel update, because getChannelDifference is run";
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
return; return;
} }
@ -7341,9 +7303,8 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
LOG(INFO) << "Found a gap in the " << dialog_id << " with pts = " << old_pts << ". new_pts = " << new_pts LOG(INFO) << "Found a gap in the " << dialog_id << " with pts = " << old_pts << ". new_pts = " << new_pts
<< ", pts_count = " << pts_count << " in update from " << source; << ", pts_count = " << pts_count << " in update from " << source;
if (pts_count > 0) { d->postponed_channel_updates.emplace(new_pts,
d->postponed_channel_updates.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count)); PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
}
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update pts mismatch"); get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update pts mismatch");
return; return;
@ -7362,12 +7323,14 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
d = get_dialog(dialog_id); d = get_dialog(dialog_id);
if (d == nullptr) { if (d == nullptr) {
LOG(INFO) << "Update didn't created " << dialog_id; LOG(INFO) << "Update didn't created " << dialog_id;
promise.set_value(Unit());
return; return;
} }
} }
CHECK(new_pts > d->pts); CHECK(new_pts > d->pts);
set_channel_pts(d, new_pts, source); set_channel_pts(d, new_pts, source);
promise.set_value(Unit());
} }
bool MessagesManager::is_old_channel_update(DialogId dialog_id, int32 new_pts) { bool MessagesManager::is_old_channel_update(DialogId dialog_id, int32 new_pts) {
@ -7391,11 +7354,12 @@ void MessagesManager::process_update(tl_object_ptr<telegram_api::Update> &&updat
case dummyUpdate::ID: case dummyUpdate::ID:
LOG(INFO) << "Process dummyUpdate"; LOG(INFO) << "Process dummyUpdate";
break; break;
case telegram_api::updateNewMessage::ID: case telegram_api::updateNewMessage::ID: {
auto update_new_message = move_tl_object_as<telegram_api::updateNewMessage>(update);
LOG(INFO) << "Process updateNewMessage"; LOG(INFO) << "Process updateNewMessage";
on_get_message(std::move(move_tl_object_as<telegram_api::updateNewMessage>(update)->message_), true, false, false, on_get_message(std::move(update_new_message->message_), true, false, false, true, true, "updateNewMessage");
true, true, "updateNewMessage");
break; break;
}
case updateSentMessage::ID: { case updateSentMessage::ID: {
auto send_message_success_update = move_tl_object_as<updateSentMessage>(update); auto send_message_success_update = move_tl_object_as<updateSentMessage>(update);
LOG(INFO) << "Process updateSentMessage " << send_message_success_update->random_id_; LOG(INFO) << "Process updateSentMessage " << send_message_success_update->random_id_;
@ -7412,11 +7376,11 @@ void MessagesManager::process_update(tl_object_ptr<telegram_api::Update> &&updat
break; break;
} }
case telegram_api::updateEditMessage::ID: { case telegram_api::updateEditMessage::ID: {
auto full_message_id = auto update_edit_message = move_tl_object_as<telegram_api::updateEditMessage>(update);
on_get_message(std::move(move_tl_object_as<telegram_api::updateEditMessage>(update)->message_), false, false, auto full_message_id = on_get_message(std::move(update_edit_message->message_), false, false, false, false, false,
false, false, false, "updateEditMessage"); "updateEditMessage");
LOG(INFO) << "Process updateEditMessage"; LOG(INFO) << "Process updateEditMessage";
on_message_edited(full_message_id); on_message_edited(full_message_id, update_edit_message->pts_);
break; break;
} }
case telegram_api::updateDeleteMessages::ID: { case telegram_api::updateDeleteMessages::ID: {
@ -7477,11 +7441,13 @@ void MessagesManager::process_channel_update(tl_object_ptr<telegram_api::Update>
send_message_success_update->date_, FileId(), "process updateSentChannelMessage"); send_message_success_update->date_, FileId(), "process updateSentChannelMessage");
break; break;
} }
case telegram_api::updateNewChannelMessage::ID: case telegram_api::updateNewChannelMessage::ID: {
auto update_new_channel_message = move_tl_object_as<telegram_api::updateNewChannelMessage>(update);
LOG(INFO) << "Process updateNewChannelMessage"; LOG(INFO) << "Process updateNewChannelMessage";
on_get_message(std::move(move_tl_object_as<telegram_api::updateNewChannelMessage>(update)->message_), true, true, on_get_message(std::move(update_new_channel_message->message_), true, true, false, true, true,
false, true, true, "updateNewChannelMessage"); "updateNewChannelMessage");
break; break;
}
case telegram_api::updateDeleteChannelMessages::ID: { case telegram_api::updateDeleteChannelMessages::ID: {
auto delete_channel_messages_update = move_tl_object_as<telegram_api::updateDeleteChannelMessages>(update); auto delete_channel_messages_update = move_tl_object_as<telegram_api::updateDeleteChannelMessages>(update);
LOG(INFO) << "Process updateDeleteChannelMessages"; LOG(INFO) << "Process updateDeleteChannelMessages";
@ -7501,11 +7467,11 @@ void MessagesManager::process_channel_update(tl_object_ptr<telegram_api::Update>
break; break;
} }
case telegram_api::updateEditChannelMessage::ID: { case telegram_api::updateEditChannelMessage::ID: {
auto update_edit_channel_message = move_tl_object_as<telegram_api::updateEditChannelMessage>(update);
LOG(INFO) << "Process updateEditChannelMessage"; LOG(INFO) << "Process updateEditChannelMessage";
auto full_message_id = auto full_message_id = on_get_message(std::move(update_edit_channel_message->message_), false, true, false, false,
on_get_message(std::move(move_tl_object_as<telegram_api::updateEditChannelMessage>(update)->message_), false, false, "updateEditChannelMessage");
true, false, false, false, "updateEditChannelMessage"); on_message_edited(full_message_id, update_edit_channel_message->pts_);
on_message_edited(full_message_id);
break; break;
} }
case telegram_api::updatePinnedChannelMessages::ID: { case telegram_api::updatePinnedChannelMessages::ID: {
@ -7531,15 +7497,16 @@ void MessagesManager::process_channel_update(tl_object_ptr<telegram_api::Update>
} }
} }
void MessagesManager::on_message_edited(FullMessageId full_message_id) { void MessagesManager::on_message_edited(FullMessageId full_message_id, int32 pts) {
if (full_message_id == FullMessageId()) { if (full_message_id == FullMessageId()) {
return; return;
} }
auto dialog_id = full_message_id.get_dialog_id(); auto dialog_id = full_message_id.get_dialog_id();
Dialog *d = get_dialog(dialog_id); Dialog *d = get_dialog(dialog_id);
const Message *m = get_message(d, full_message_id.get_message_id()); Message *m = get_message(d, full_message_id.get_message_id());
CHECK(m != nullptr); CHECK(m != nullptr);
m->last_edit_pts = pts;
if (td_->auth_manager_->is_bot()) { if (td_->auth_manager_->is_bot()) {
d->last_edited_message_id = m->message_id; d->last_edited_message_id = m->message_id;
send_update_message_edited(dialog_id, m); send_update_message_edited(dialog_id, m);
@ -7548,8 +7515,9 @@ void MessagesManager::on_message_edited(FullMessageId full_message_id) {
} }
void MessagesManager::process_pending_updates() { void MessagesManager::process_pending_updates() {
for (auto &update : pending_updates_) { for (auto &update : pending_pts_updates_) {
process_update(std::move(update.second.update)); process_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
} }
td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates") td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates")
@ -7561,7 +7529,12 @@ void MessagesManager::drop_pending_updates() {
accumulated_pts_count_ = 0; accumulated_pts_count_ = 0;
accumulated_pts_ = -1; accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout(); pts_gap_timeout_.cancel_timeout();
pending_updates_.clear(); pending_pts_updates_.clear();
}
void MessagesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise) {
postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise)));
} }
string MessagesManager::get_notification_settings_scope_database_key(NotificationSettingsScope scope) { string MessagesManager::get_notification_settings_scope_database_key(NotificationSettingsScope scope) {
@ -8902,8 +8875,8 @@ void MessagesManager::before_get_difference() {
// scheduled messages are not returned in getDifference, so we must always reget them after it // scheduled messages are not returned in getDifference, so we must always reget them after it
scheduled_messages_sync_generation_++; scheduled_messages_sync_generation_++;
postponed_pts_updates_.insert(std::make_move_iterator(pending_updates_.begin()), postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_updates_.end())); std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_updates(); drop_pending_updates();
} }
@ -8919,9 +8892,10 @@ void MessagesManager::after_get_difference() {
if (new_pts <= old_pts) { if (new_pts <= old_pts) {
skip_old_pending_update(std::move(update.second.update), new_pts, old_pts, update.second.pts_count, skip_old_pending_update(std::move(update.second.update), new_pts, old_pts, update.second.pts_count,
"after get difference"); "after get difference");
update.second.promise.set_value(Unit());
} else { } else {
add_pending_update(std::move(update.second.update), update.second.pts, update.second.pts_count, false, add_pending_update(std::move(update.second.update), update.second.pts, update.second.pts_count, false,
"after get difference"); std::move(update.second.promise), "after get difference");
} }
CHECK(!td_->updates_manager_->running_get_difference()); CHECK(!td_->updates_manager_->running_get_difference());
} }
@ -8994,13 +8968,13 @@ void MessagesManager::after_get_difference() {
LOG(ERROR) << "Unknown dialog " << dialog_id; LOG(ERROR) << "Unknown dialog " << dialog_id;
break; break;
} }
if (dialog_id.get_type() == DialogType::Channel || pending_updates_.empty() || message_id.is_scheduled() || if (dialog_id.get_type() == DialogType::Channel || pending_pts_updates_.empty() || message_id.is_scheduled() ||
message_id <= d->last_new_message_id) { message_id <= d->last_new_message_id) {
LOG(ERROR) << "Receive updateMessageId from " << it.second << " to " << full_message_id LOG(ERROR) << "Receive updateMessageId from " << it.second << " to " << full_message_id
<< " but not receive corresponding message, last_new_message_id = " << d->last_new_message_id; << " but not receive corresponding message, last_new_message_id = " << d->last_new_message_id;
} }
if (dialog_id.get_type() != DialogType::Channel && if (dialog_id.get_type() != DialogType::Channel &&
(pending_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) { (pending_pts_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) {
dump_debug_message_op(get_dialog(dialog_id)); dump_debug_message_op(get_dialog(dialog_id));
} }
if (message_id.is_scheduled() || message_id <= d->last_new_message_id) { if (message_id.is_scheduled() || message_id <= d->last_new_message_id) {
@ -9322,12 +9296,10 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
set_dialog_last_new_message_id( set_dialog_last_new_message_id(
d, last_added_message_id.is_valid() ? last_added_message_id : last_received_message_id, "on_get_history"); d, last_added_message_id.is_valid() ? last_added_message_id : last_received_message_id, "on_get_history");
} }
if (last_added_message_id.is_valid()) { if (last_added_message_id.is_valid() && last_added_message_id > d->last_message_id) {
if (last_added_message_id > d->last_message_id) { CHECK(d->last_new_message_id.is_valid());
CHECK(d->last_new_message_id.is_valid()); set_dialog_last_message_id(d, last_added_message_id, "on_get_history");
set_dialog_last_message_id(d, last_added_message_id, "on_get_history"); send_update_chat_last_message(d, "on_get_history");
send_update_chat_last_message(d, "on_get_history");
}
} }
} }
@ -13402,14 +13374,6 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
return FullMessageId(); return FullMessageId();
} }
if (is_sent_message) {
try_add_active_live_location(dialog_id, m);
// add_message_to_dialog will not update counts, because need_update == false
update_message_count_by_index(d, +1, m);
update_reply_count_by_message(d, +1, m);
}
auto pcc_it = pending_created_dialogs_.find(dialog_id); auto pcc_it = pending_created_dialogs_.find(dialog_id);
if (from_update && pcc_it != pending_created_dialogs_.end()) { if (from_update && pcc_it != pending_created_dialogs_.end()) {
pcc_it->second.set_value(Unit()); pcc_it->second.set_value(Unit());
@ -13421,6 +13385,18 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
send_update_new_message(d, m); send_update_new_message(d, m);
} }
if (is_sent_message) {
try_add_active_live_location(dialog_id, m);
// add_message_to_dialog will not update counts, because need_update == false
update_message_count_by_index(d, +1, m);
}
if (is_sent_message || need_update) {
update_reply_count_by_message(d, +1, m);
update_forward_count(dialog_id, m);
}
if (dialog_id.get_type() == DialogType::Channel && !have_input_peer(dialog_id, AccessRights::Read)) { if (dialog_id.get_type() == DialogType::Channel && !have_input_peer(dialog_id, AccessRights::Read)) {
auto p = delete_message(d, message_id, false, &need_update_dialog_pos, "get a message in inaccessible chat"); auto p = delete_message(d, message_id, false, &need_update_dialog_pos, "get a message in inaccessible chat");
CHECK(p.get() == m); CHECK(p.get() == m);
@ -16521,6 +16497,9 @@ Result<FullMessageId> MessagesManager::get_top_thread_full_message_id(DialogId d
if (!is_visible_message_reply_info(dialog_id, m)) { if (!is_visible_message_reply_info(dialog_id, m)) {
return Status::Error(400, "Message has no comments"); return Status::Error(400, "Message has no comments");
} }
if (m->message_id.is_yet_unsent()) {
return Status::Error(400, "Message is not sent yet");
}
return FullMessageId{DialogId(m->reply_info.channel_id), m->linked_top_thread_message_id}; return FullMessageId{DialogId(m->reply_info.channel_id), m->linked_top_thread_message_id};
} else { } else {
if (!m->top_thread_message_id.is_valid()) { if (!m->top_thread_message_id.is_valid()) {
@ -17004,7 +16983,7 @@ Result<std::pair<string, bool>> MessagesManager::get_message_link(FullMessageId
return Status::Error(400, "Message not found"); return Status::Error(400, "Message not found");
} }
if (m->message_id.is_yet_unsent()) { if (m->message_id.is_yet_unsent()) {
return Status::Error(400, "Message is yet unsent"); return Status::Error(400, "Message is not sent yet");
} }
if (m->message_id.is_scheduled()) { if (m->message_id.is_scheduled()) {
return Status::Error(400, "Message is scheduled"); return Status::Error(400, "Message is scheduled");
@ -17103,7 +17082,7 @@ string MessagesManager::get_message_embedding_code(FullMessageId full_message_id
return {}; return {};
} }
if (m->message_id.is_yet_unsent()) { if (m->message_id.is_yet_unsent()) {
promise.set_error(Status::Error(400, "Message is yet unsent")); promise.set_error(Status::Error(400, "Message is not sent yet"));
return {}; return {};
} }
if (m->message_id.is_scheduled()) { if (m->message_id.is_scheduled()) {
@ -23583,7 +23562,7 @@ void MessagesManager::on_message_media_uploaded(DialogId dialog_id, const Messag
auto promise = PromiseCreator::lambda( auto promise = PromiseCreator::lambda(
[actor_id = actor_id(this), dialog_id, message_id, file_id, thumbnail_file_id, schedule_date, [actor_id = actor_id(this), dialog_id, message_id, file_id, thumbnail_file_id, schedule_date,
generation = m->edit_generation, was_uploaded, was_thumbnail_uploaded, generation = m->edit_generation, was_uploaded, was_thumbnail_uploaded,
file_reference = FileManager::extract_file_reference(input_media)](Result<Unit> result) mutable { file_reference = FileManager::extract_file_reference(input_media)](Result<int32> result) mutable {
send_closure(actor_id, &MessagesManager::on_message_media_edited, dialog_id, message_id, file_id, send_closure(actor_id, &MessagesManager::on_message_media_edited, dialog_id, message_id, file_id,
thumbnail_file_id, was_uploaded, was_thumbnail_uploaded, std::move(file_reference), thumbnail_file_id, was_uploaded, was_thumbnail_uploaded, std::move(file_reference),
schedule_date, generation, std::move(result)); schedule_date, generation, std::move(result));
@ -24750,7 +24729,9 @@ void MessagesManager::cancel_edit_message_media(DialogId dialog_id, Message *m,
void MessagesManager::on_message_media_edited(DialogId dialog_id, MessageId message_id, FileId file_id, void MessagesManager::on_message_media_edited(DialogId dialog_id, MessageId message_id, FileId file_id,
FileId thumbnail_file_id, bool was_uploaded, bool was_thumbnail_uploaded, FileId thumbnail_file_id, bool was_uploaded, bool was_thumbnail_uploaded,
string file_reference, int32 schedule_date, uint64 generation, string file_reference, int32 schedule_date, uint64 generation,
Result<Unit> &&result) { Result<int32> &&result) {
// must not run getDifference
CHECK(message_id.is_any_server()); CHECK(message_id.is_any_server());
auto m = get_message({dialog_id, message_id}); auto m = get_message({dialog_id, message_id});
if (m == nullptr || m->edit_generation != generation) { if (m == nullptr || m->edit_generation != generation) {
@ -24761,15 +24742,20 @@ void MessagesManager::on_message_media_edited(DialogId dialog_id, MessageId mess
CHECK(m->edited_content != nullptr); CHECK(m->edited_content != nullptr);
if (result.is_ok()) { if (result.is_ok()) {
// message content has already been replaced from updateEdit{Channel,}Message // message content has already been replaced from updateEdit{Channel,}Message
// TODO check that it really was replaced
// need only merge files from edited_content with their uploaded counterparts // need only merge files from edited_content with their uploaded counterparts
// updateMessageContent was already sent and needs to be sent again, // updateMessageContent was already sent and needs to be sent again,
// only if 'i' and 't' sizes from edited_content was added to the photo // only if 'i' and 't' sizes from edited_content was added to the photo
auto pts = result.ok();
LOG(INFO) << "Successfully edited " << message_id << " in " << dialog_id << " with pts = " << pts
<< " and last edit pts = " << m->last_edit_pts;
std::swap(m->content, m->edited_content); std::swap(m->content, m->edited_content);
bool need_send_update_message_content = m->edited_content->get_type() == MessageContentType::Photo && bool need_send_update_message_content = m->edited_content->get_type() == MessageContentType::Photo &&
m->content->get_type() == MessageContentType::Photo; m->content->get_type() == MessageContentType::Photo;
update_message_content(dialog_id, m, std::move(m->edited_content), need_send_update_message_content, true, true); bool need_merge_files = pts != 0 && pts == m->last_edit_pts;
update_message_content(dialog_id, m, std::move(m->edited_content), need_send_update_message_content,
need_merge_files, true);
} else { } else {
LOG(INFO) << "Failed to edit " << message_id << " in " << dialog_id << ": " << result.error();
if (was_uploaded) { if (was_uploaded) {
if (was_thumbnail_uploaded) { if (was_thumbnail_uploaded) {
CHECK(thumbnail_file_id.is_valid()); CHECK(thumbnail_file_id.is_valid());
@ -28374,6 +28360,8 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI
send_update_chat_last_message(d, "on_send_message_success"); send_update_chat_last_message(d, "on_send_message_success");
} }
try_add_active_live_location(dialog_id, m); try_add_active_live_location(dialog_id, m);
update_reply_count_by_message(d, +1, m);
update_forward_count(dialog_id, m);
being_readded_message_id_ = FullMessageId(); being_readded_message_id_ = FullMessageId();
return {dialog_id, new_message_id}; return {dialog_id, new_message_id};
} }
@ -29134,6 +29122,10 @@ void MessagesManager::on_update_dialog_last_pinned_message_id(DialogId dialog_id
d->is_last_pinned_message_id_inited = true; d->is_last_pinned_message_id_inited = true;
on_dialog_updated(dialog_id, "on_update_dialog_last_pinned_message_id"); on_dialog_updated(dialog_id, "on_update_dialog_last_pinned_message_id");
} }
Message *m = get_message_force(d, pinned_message_id, "on_update_dialog_last_pinned_message_id");
if (m != nullptr && update_message_is_pinned(d, m, true, "on_update_dialog_last_pinned_message_id")) {
on_message_changed(d, m, true, "on_update_dialog_last_pinned_message_id");
}
return; return;
} }
@ -29142,6 +29134,11 @@ void MessagesManager::on_update_dialog_last_pinned_message_id(DialogId dialog_id
void MessagesManager::set_dialog_last_pinned_message_id(Dialog *d, MessageId pinned_message_id) { void MessagesManager::set_dialog_last_pinned_message_id(Dialog *d, MessageId pinned_message_id) {
CHECK(d != nullptr); CHECK(d != nullptr);
Message *m = get_message_force(d, pinned_message_id, "set_dialog_last_pinned_message_id");
if (m != nullptr && update_message_is_pinned(d, m, true, "set_dialog_last_pinned_message_id")) {
on_message_changed(d, m, true, "set_dialog_last_pinned_message_id");
}
if (d->last_pinned_message_id == pinned_message_id) { if (d->last_pinned_message_id == pinned_message_id) {
return; return;
} }
@ -32145,7 +32142,6 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
} }
if (*need_update) { if (*need_update) {
update_message_count_by_index(d, +1, message.get()); update_message_count_by_index(d, +1, message.get());
update_reply_count_by_message(d, +1, message.get());
} }
if (auto_attach && message_id > d->last_message_id && message_id >= d->last_new_message_id) { if (auto_attach && message_id > d->last_message_id && message_id >= d->last_new_message_id) {
set_dialog_last_message_id(d, message_id, "add_message_to_dialog"); set_dialog_last_message_id(d, message_id, "add_message_to_dialog");
@ -32330,12 +32326,6 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
} }
} }
} }
if (!td_->auth_manager_->is_bot() && from_update && m->forward_info != nullptr &&
m->forward_info->sender_dialog_id.is_valid() && m->forward_info->message_id.is_valid() &&
(!is_discussion_message(dialog_id, m) || m->forward_info->sender_dialog_id != m->forward_info->from_dialog_id ||
m->forward_info->message_id != m->forward_info->from_message_id)) {
update_forward_count(m->forward_info->sender_dialog_id, m->forward_info->message_id, m->date);
}
return result_message; return result_message;
} }
@ -33287,6 +33277,7 @@ bool MessagesManager::update_message_content(DialogId dialog_id, Message *old_me
"update_message_content"); "update_message_content");
} }
old_content = std::move(new_content); old_content = std::move(new_content);
old_message->last_edit_pts = 0;
update_message_content_file_id_remote(old_content.get(), old_file_id); update_message_content_file_id_remote(old_content.get(), old_file_id);
} else { } else {
update_message_content_file_id_remote(old_content.get(), get_message_content_any_file_id(new_content.get())); update_message_content_file_id_remote(old_content.get(), get_message_content_any_file_id(new_content.get()));
@ -35490,31 +35481,42 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
auto d = get_dialog(dialog_id); auto d = get_dialog(dialog_id);
if (d != nullptr) { if (d != nullptr) {
bool have_access = have_input_peer(dialog_id, AccessRights::Read); bool have_access = have_input_peer(dialog_id, AccessRights::Read);
if (!have_access) { if (!d->postponed_channel_updates.empty()) {
d->postponed_channel_updates.clear();
} else if (!d->postponed_channel_updates.empty()) {
LOG(INFO) << "Begin to apply postponed channel updates"; LOG(INFO) << "Begin to apply postponed channel updates";
while (!d->postponed_channel_updates.empty()) { while (!d->postponed_channel_updates.empty()) {
auto it = d->postponed_channel_updates.begin(); auto it = d->postponed_channel_updates.begin();
auto update = std::move(it->second.update); auto update = std::move(it->second.update);
auto update_pts = it->second.pts; auto update_pts = it->second.pts;
auto update_pts_count = it->second.pts_count; auto update_pts_count = it->second.pts_count;
auto promise = std::move(it->second.promise);
d->postponed_channel_updates.erase(it); d->postponed_channel_updates.erase(it);
auto old_size = d->postponed_channel_updates.size(); auto old_size = d->postponed_channel_updates.size();
auto update_id = update->get_id(); auto update_id = update->get_id();
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, if (have_access) {
"apply postponed channel updates", true); add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, std::move(promise),
"apply postponed channel updates", true);
} else {
promise.set_value(Unit());
}
if (d->postponed_channel_updates.size() != old_size || running_get_channel_difference(dialog_id)) { if (d->postponed_channel_updates.size() != old_size || running_get_channel_difference(dialog_id)) {
if (success && update_pts < d->pts + 10000 && update_pts_count == 1) { if (success && update_pts < d->pts + 10000 && update_pts_count == 1) {
// if getChannelDifference was successful and update pts is near channel pts, // if getChannelDifference was successful and update pts is near channel pts,
// we hope that the update eventually can be applied // we hope that the update eventually can be applied
LOG(INFO) << "Can't apply postponed channel updates"; LOG(INFO) << "Can't apply postponed channel updates";
} else { } else {
// otherwise we protecting from getChannelDifference repeating calls by dropping pending updates // otherwise protect from getChannelDifference repeating calls by dropping postponed updates
LOG(WARNING) << "Failed to apply postponed updates of type " << update_id << " in " << dialog_id LOG(WARNING) << "Failed to apply postponed updates of type " << update_id << " in " << dialog_id
<< " with pts " << d->pts << ", update pts is " << update_pts << ", update pts count is " << " with pts " << d->pts << ", update pts is " << update_pts << ", update pts count is "
<< update_pts_count; << update_pts_count;
vector<Promise<Unit>> update_promises;
for (auto &postponed_update : d->postponed_channel_updates) {
update_promises.push_back(std::move(postponed_update.second.promise));
}
d->postponed_channel_updates.clear(); d->postponed_channel_updates.clear();
for (auto &update_promise : update_promises) {
update_promise.set_value(Unit());
}
} }
break; break;
} }
@ -35679,6 +35681,15 @@ void MessagesManager::update_top_dialogs(DialogId dialog_id, const Message *m) {
} }
} }
void MessagesManager::update_forward_count(DialogId dialog_id, const Message *m) {
if (!td_->auth_manager_->is_bot() && m->forward_info != nullptr && m->forward_info->sender_dialog_id.is_valid() &&
m->forward_info->message_id.is_valid() &&
(!is_discussion_message(dialog_id, m) || m->forward_info->sender_dialog_id != m->forward_info->from_dialog_id ||
m->forward_info->message_id != m->forward_info->from_message_id)) {
update_forward_count(m->forward_info->sender_dialog_id, m->forward_info->message_id, m->date);
}
}
void MessagesManager::update_forward_count(DialogId dialog_id, MessageId message_id, int32 update_date) { void MessagesManager::update_forward_count(DialogId dialog_id, MessageId message_id, int32 update_date) {
CHECK(!td_->auth_manager_->is_bot()); CHECK(!td_->auth_manager_->is_bot());
Dialog *d = get_dialog(dialog_id); Dialog *d = get_dialog(dialog_id);

View File

@ -347,10 +347,6 @@ class MessagesManager : public Actor {
void on_update_service_notification(tl_object_ptr<telegram_api::updateServiceNotification> &&update, void on_update_service_notification(tl_object_ptr<telegram_api::updateServiceNotification> &&update,
bool skip_new_entities, Promise<Unit> &&promise); bool skip_new_entities, Promise<Unit> &&promise);
void on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update);
void on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update);
void on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update); void on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update);
void on_update_read_channel_outbox(tl_object_ptr<telegram_api::updateReadChannelOutbox> &&update); void on_update_read_channel_outbox(tl_object_ptr<telegram_api::updateReadChannelOutbox> &&update);
@ -797,10 +793,11 @@ class MessagesManager : public Actor {
bool skip_not_found); bool skip_not_found);
void add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count, void add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, const char *source); bool force_apply, Promise<Unit> &&promise, const char *source);
void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, const char *source, bool is_postponed_update = false); int32 pts_count, Promise<Unit> &&promise, const char *source,
bool is_postponed_update = false);
bool is_old_channel_update(DialogId dialog_id, int32 new_pts); bool is_old_channel_update(DialogId dialog_id, int32 new_pts);
@ -977,9 +974,10 @@ class MessagesManager : public Actor {
tl_object_ptr<telegram_api::Update> update; tl_object_ptr<telegram_api::Update> update;
int32 pts; int32 pts;
int32 pts_count; int32 pts_count;
Promise<Unit> promise;
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count) PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, Promise<Unit> &&promise)
: update(std::move(update)), pts(pts), pts_count(pts_count) { : update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) {
} }
}; };
@ -1144,6 +1142,8 @@ class MessagesManager : public Actor {
uint64 edit_generation = 0; uint64 edit_generation = 0;
Promise<Unit> edit_promise; Promise<Unit> edit_promise;
int32 last_edit_pts = 0;
unique_ptr<Message> left; unique_ptr<Message> left;
unique_ptr<Message> right; unique_ptr<Message> right;
@ -1815,7 +1815,7 @@ class MessagesManager : public Actor {
void on_message_media_edited(DialogId dialog_id, MessageId message_id, FileId file_id, FileId thumbnail_file_id, void on_message_media_edited(DialogId dialog_id, MessageId message_id, FileId file_id, FileId thumbnail_file_id,
bool was_uploaded, bool was_thumbnail_uploaded, string file_reference, bool was_uploaded, bool was_thumbnail_uploaded, string file_reference,
int32 scheduled_date, uint64 generation, Result<Unit> &&result); int32 scheduled_date, uint64 generation, Result<int32> &&result);
MessageId get_persistent_message_id(const Dialog *d, MessageId message_id) const; MessageId get_persistent_message_id(const Dialog *d, MessageId message_id) const;
@ -1834,7 +1834,7 @@ class MessagesManager : public Actor {
void process_channel_update(tl_object_ptr<telegram_api::Update> &&update); void process_channel_update(tl_object_ptr<telegram_api::Update> &&update);
void on_message_edited(FullMessageId full_message_id); void on_message_edited(FullMessageId full_message_id, int32 pts);
void delete_messages_from_updates(const vector<MessageId> &message_ids); void delete_messages_from_updates(const vector<MessageId> &message_ids);
@ -2800,6 +2800,9 @@ class MessagesManager : public Actor {
void drop_pending_updates(); void drop_pending_updates();
void postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise);
static string get_channel_pts_key(DialogId dialog_id); static string get_channel_pts_key(DialogId dialog_id);
int32 load_channel_pts(DialogId dialog_id) const; int32 load_channel_pts(DialogId dialog_id) const;
@ -2917,6 +2920,8 @@ class MessagesManager : public Actor {
void update_top_dialogs(DialogId dialog_id, const Message *m); void update_top_dialogs(DialogId dialog_id, const Message *m);
void update_forward_count(DialogId dialog_id, const Message *m);
void update_forward_count(DialogId dialog_id, MessageId message_id, int32 update_date); void update_forward_count(DialogId dialog_id, MessageId message_id, int32 update_date);
void try_hide_distance(DialogId dialog_id, const Message *m); void try_hide_distance(DialogId dialog_id, const Message *m);
@ -3111,7 +3116,7 @@ class MessagesManager : public Actor {
bool running_get_difference_ = false; // true after before_get_difference and false after after_get_difference bool running_get_difference_ = false; // true after before_get_difference and false after after_get_difference
std::unordered_map<DialogId, unique_ptr<Dialog>, DialogIdHash> dialogs_; std::unordered_map<DialogId, unique_ptr<Dialog>, DialogIdHash> dialogs_;
std::multimap<int32, PendingPtsUpdate> pending_updates_; std::multimap<int32, PendingPtsUpdate> pending_pts_updates_;
std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_; std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_;
std::unordered_set<DialogId, DialogIdHash> std::unordered_set<DialogId, DialogIdHash>

View File

@ -6114,12 +6114,6 @@ void Td::on_request(uint64 id, const td_api::toggleGroupCallParticipantIsMuted &
GroupCallId(request.group_call_id_), UserId(request.user_id_), request.is_muted_, std::move(promise)); GroupCallId(request.group_call_id_), UserId(request.user_id_), request.is_muted_, std::move(promise));
} }
void Td::on_request(uint64 id, const td_api::checkGroupCallIsJoined &request) {
CHECK_IS_USER();
CREATE_OK_REQUEST_PROMISE();
group_call_manager_->check_group_call_is_joined(GroupCallId(request.group_call_id_), std::move(promise));
}
void Td::on_request(uint64 id, const td_api::loadGroupCallParticipants &request) { void Td::on_request(uint64 id, const td_api::loadGroupCallParticipants &request) {
CHECK_IS_USER(); CHECK_IS_USER();
CREATE_OK_REQUEST_PROMISE(); CREATE_OK_REQUEST_PROMISE();

View File

@ -719,8 +719,6 @@ class Td final : public NetQueryCallback {
void on_request(uint64 id, const td_api::toggleGroupCallParticipantIsMuted &request); void on_request(uint64 id, const td_api::toggleGroupCallParticipantIsMuted &request);
void on_request(uint64 id, const td_api::checkGroupCallIsJoined &request);
void on_request(uint64 id, const td_api::loadGroupCallParticipants &request); void on_request(uint64 id, const td_api::loadGroupCallParticipants &request);
void on_request(uint64 id, const td_api::leaveGroupCall &request); void on_request(uint64 id, const td_api::leaveGroupCall &request);

View File

@ -1008,6 +1008,37 @@ vector<DialogId> UpdatesManager::get_chat_dialog_ids(const telegram_api::Updates
return dialog_ids; return dialog_ids;
} }
int32 UpdatesManager::get_update_edit_message_pts(const telegram_api::Updates *updates_ptr) {
int32 pts = 0;
auto updates = get_updates(updates_ptr);
if (updates != nullptr) {
for (auto &update : *updates) {
int32 update_pts = [&] {
switch (update->get_id()) {
case telegram_api::updateEditMessage::ID:
return static_cast<const telegram_api::updateEditMessage *>(update.get())->pts_;
case telegram_api::updateEditChannelMessage::ID:
return static_cast<const telegram_api::updateEditChannelMessage *>(update.get())->pts_;
default:
return 0;
}
}();
if (update_pts != 0) {
if (pts == 0) {
pts = update_pts;
} else {
pts = -1;
}
}
}
}
if (pts == -1) {
LOG(ERROR) << "Receive multiple edit message updates in " << to_string(*updates_ptr);
pts = 0;
}
return pts;
}
void UpdatesManager::init_state() { void UpdatesManager::init_state() {
if (!td_->auth_manager_->is_authorized()) { if (!td_->auth_manager_->is_authorized()) {
return; return;
@ -1695,14 +1726,17 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> upd
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
int new_pts = update->pts_; int new_pts = update->pts_;
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, "on_updateNewMessage"); td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
promise.set_value(Unit()); "updateNewMessage");
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, bool /*force_apply*/,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
td_->messages_manager_->on_update_new_channel_message(std::move(update)); DialogId dialog_id = MessagesManager::get_message_dialog_id(update->message_);
promise.set_value(Unit()); int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
std::move(promise), "updateNewChannelMessage");
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessageID> update, bool force_apply, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessageID> update, bool force_apply,
@ -1721,18 +1755,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesCon
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
int new_pts = update->pts_; int new_pts = update->pts_;
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadMessagesContents"); "updateReadMessagesContents");
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, bool force_apply, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, bool force_apply,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
int new_pts = update->pts_; int new_pts = update->pts_;
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateEditMessage"); "updateEditMessage");
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, bool force_apply, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, bool force_apply,
@ -1741,12 +1773,12 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages>
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
if (update->messages_.empty()) { if (update->messages_.empty()) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, force_apply,
"on_updateDeleteMessages"); Promise<Unit>(), "updateDeleteMessages");
promise.set_value(Unit());
} else { } else {
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateDeleteMessages"); "updateDeleteMessages");
} }
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, bool force_apply, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, bool force_apply,
@ -1756,18 +1788,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbo
if (force_apply) { if (force_apply) {
update->still_unread_count_ = -1; update->still_unread_count_ = -1;
} }
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadHistoryInbox"); "updateReadHistoryInbox");
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, bool force_apply, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, bool force_apply,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
int new_pts = update->pts_; int new_pts = update->pts_;
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadHistoryOutbox"); "updateReadHistoryOutbox");
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, bool /*force_apply*/,
@ -1816,23 +1846,20 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannel> update
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditChannelMessage> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditChannelMessage> update, bool /*force_apply*/,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
td_->messages_manager_->on_update_edit_channel_message(std::move(update)); DialogId dialog_id = MessagesManager::get_message_dialog_id(update->message_);
promise.set_value(Unit()); int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
std::move(promise), "updateEditChannelMessage");
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteChannelMessages> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteChannelMessages> update, bool /*force_apply*/,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
ChannelId channel_id(update->channel_id_); DialogId dialog_id(ChannelId(update->channel_id_));
if (!channel_id.is_valid()) { int new_pts = update->pts_;
LOG(ERROR) << "Receive invalid " << channel_id; int pts_count = update->pts_count_;
} else { td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
DialogId dialog_id(channel_id); std::move(promise), "updateDeleteChannelMessages");
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
"on_updateDeleteChannelMessages");
}
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelMessageViews> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelMessageViews> update, bool /*force_apply*/,
@ -1893,24 +1920,17 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages>
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
int new_pts = update->pts_; int new_pts = update->pts_;
int pts_count = update->pts_count_; int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updatePinnedMessages"); "updatePinnedMessages");
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update, bool /*force_apply*/,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
ChannelId channel_id(update->channel_id_); DialogId dialog_id(ChannelId(update->channel_id_));
if (!channel_id.is_valid()) { int new_pts = update->pts_;
LOG(ERROR) << "Receive invalid " << channel_id; int pts_count = update->pts_count_;
} else { td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
DialogId dialog_id(channel_id); std::move(promise), "updatePinnedChannelMessages");
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
"on_updatePinnedChannelMessages");
}
promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNotifySettings> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNotifySettings> update, bool /*force_apply*/,
@ -1960,21 +1980,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId()); td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, "on_updateWebPage"); force_apply, Promise<Unit>(), "updateWebPage");
promise.set_value(Unit()); promise.set_value(Unit());
} }
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelWebPage> update, bool /*force_apply*/, void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelWebPage> update, bool /*force_apply*/,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId()); td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
ChannelId channel_id(update->channel_id_); DialogId dialog_id(ChannelId(update->channel_id_));
if (!channel_id.is_valid()) { td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object<dummyUpdate>(), update->pts_,
LOG(ERROR) << "Receive invalid " << channel_id; update->pts_count_, Promise<Unit>(), "updateChannelWebPage");
} else {
DialogId dialog_id(channel_id);
td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object<dummyUpdate>(), update->pts_,
update->pts_count_, "on_updateChannelWebPage");
}
promise.set_value(Unit()); promise.set_value(Unit());
} }
@ -1987,7 +2002,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> up
} }
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, "on_updateFolderPeers"); force_apply, Promise<Unit>(), "updateFolderPeers");
promise.set_value(Unit()); promise.set_value(Unit());
} }

View File

@ -51,6 +51,8 @@ class UpdatesManager : public Actor {
static vector<DialogId> get_chat_dialog_ids(const telegram_api::Updates *updates_ptr); static vector<DialogId> get_chat_dialog_ids(const telegram_api::Updates *updates_ptr);
static int32 get_update_edit_message_pts(const telegram_api::Updates *updates_ptr);
void get_difference(const char *source); void get_difference(const char *source);
void schedule_get_difference(const char *source); void schedule_get_difference(const char *source);

File diff suppressed because it is too large Load Diff

View File

@ -320,7 +320,7 @@ template <class T>
Result<T> to_integer_safe(Slice str) { Result<T> to_integer_safe(Slice str) {
auto res = to_integer<T>(str); auto res = to_integer<T>(str);
if ((PSLICE() << res) != str) { if ((PSLICE() << res) != str) {
return Status::Error(PSLICE() << "Can't parse \"" << str << "\" as number"); return Status::Error(PSLICE() << "Can't parse \"" << str << "\" as an integer");
} }
return res; return res;
} }