Merge commit '71ac1f67bf160763bac8cf1183e12d15d67260b6'

This commit is contained in:
Andrea Cavalli 2021-01-05 17:20:01 +01:00
commit d81befa783
4 changed files with 143 additions and 25 deletions

View File

@ -10761,10 +10761,10 @@ void ContactsManager::on_get_channel_participants_fail(ChannelId channel_id, Cha
}
}
bool ContactsManager::speculative_add_count(int32 &count, int32 new_count) {
new_count += count;
if (new_count < 0) {
new_count = 0;
bool ContactsManager::speculative_add_count(int32 &count, int32 delta_count, int32 min_count) {
auto new_count = count + delta_count;
if (new_count < min_count) {
new_count = min_count;
}
if (new_count == count) {
return false;
@ -10780,13 +10780,13 @@ void ContactsManager::speculative_add_channel_participants(ChannelId channel_id,
auto channel_full = get_channel_full_force(channel_id, "speculative_add_channel_participants");
bool is_participants_cache_changed = false;
int32 new_participant_count = 0;
int32 delta_participant_count = 0;
for (auto user_id : added_user_ids) {
if (!user_id.is_valid()) {
continue;
}
new_participant_count++;
delta_participant_count++;
if (it != cached_channel_participants_.end()) {
auto &participants = it->second;
@ -10814,11 +10814,11 @@ void ContactsManager::speculative_add_channel_participants(ChannelId channel_id,
if (channel_full != nullptr) {
update_channel_full(channel_full, channel_id);
}
if (new_participant_count == 0) {
if (delta_participant_count == 0) {
return;
}
speculative_add_channel_participants(channel_id, new_participant_count, by_me);
speculative_add_channel_participants(channel_id, delta_participant_count, by_me);
}
void ContactsManager::speculative_delete_channel_participant(ChannelId channel_id, UserId deleted_user_id, bool by_me) {
@ -10849,7 +10849,7 @@ void ContactsManager::speculative_delete_channel_participant(ChannelId channel_i
speculative_add_channel_participants(channel_id, -1, by_me);
}
void ContactsManager::speculative_add_channel_participants(ChannelId channel_id, int32 new_participant_count,
void ContactsManager::speculative_add_channel_participants(ChannelId channel_id, int32 delta_participant_count,
bool by_me) {
if (by_me) {
// Currently ignore all changes made by the current user, because they may be already counted
@ -10857,18 +10857,22 @@ void ContactsManager::speculative_add_channel_participants(ChannelId channel_id,
return;
}
auto channel_full = get_channel_full_force(channel_id, "speculative_add_channel_participants");
auto min_count = channel_full == nullptr ? 0 : channel_full->administrator_count;
auto c = get_channel_force(channel_id);
if (c != nullptr && c->participant_count != 0 && speculative_add_count(c->participant_count, new_participant_count)) {
if (c != nullptr && c->participant_count != 0 &&
speculative_add_count(c->participant_count, delta_participant_count, min_count)) {
c->is_changed = true;
update_channel(c, channel_id);
}
auto channel_full = get_channel_full_force(channel_id, "speculative_add_channel_participants");
if (channel_full == nullptr) {
return;
}
channel_full->is_changed |= speculative_add_count(channel_full->participant_count, new_participant_count);
channel_full->is_changed |=
speculative_add_count(channel_full->participant_count, delta_participant_count, min_count);
if (channel_full->is_changed) {
channel_full->speculative_version++;
@ -10950,10 +10954,11 @@ void ContactsManager::speculative_add_channel_user(ChannelId channel_id, UserId
return;
}
channel_full->is_changed |=
speculative_add_count(channel_full->participant_count, new_status.is_member() - old_status.is_member());
channel_full->is_changed |= speculative_add_count(channel_full->administrator_count,
new_status.is_administrator() - old_status.is_administrator());
channel_full->is_changed |=
speculative_add_count(channel_full->participant_count, new_status.is_member() - old_status.is_member(),
channel_full->administrator_count);
channel_full->is_changed |=
speculative_add_count(channel_full->restricted_count, new_status.is_restricted() - old_status.is_restricted());
channel_full->is_changed |=

View File

@ -1223,9 +1223,9 @@ class ContactsManager : public Actor {
void remove_linked_channel_id(ChannelId channel_id);
ChannelId get_linked_channel_id(ChannelId channel_id) const;
static bool speculative_add_count(int32 &count, int32 new_count);
static bool speculative_add_count(int32 &count, int32 delta_count, int32 min_count = 0);
void speculative_add_channel_participants(ChannelId channel_id, int32 new_participant_count, bool by_me);
void speculative_add_channel_participants(ChannelId channel_id, int32 delta_participant_count, bool by_me);
void speculative_add_channel_user(ChannelId channel_id, UserId user_id, DialogParticipantStatus new_status,
DialogParticipantStatus old_status);

View File

@ -224,14 +224,29 @@ void UpdatesManager::get_difference(const char *source) {
VLOG(get_difference) << "Skip running getDifference from " << source << " because it is already running";
return;
}
run_get_difference(false, source);
}
void UpdatesManager::run_get_difference(bool is_recursive, const char *source) {
CHECK(get_pts() != -1);
CHECK(td_->auth_manager_->is_authorized());
CHECK(!running_get_difference_);
running_get_difference_ = true;
VLOG(get_difference) << "-----BEGIN GET DIFFERENCE----- from " << source;
before_get_difference(false);
if (!is_recursive) {
min_postponed_update_pts_ = 0;
min_postponed_update_qts_ = 0;
}
td_->create_handler<GetDifferenceQuery>()->send();
last_get_difference_pts_ = get_pts();
last_get_difference_qts_ = get_qts();
}
void UpdatesManager::before_get_difference(bool is_initial) {
@ -1066,6 +1081,7 @@ void UpdatesManager::init_state() {
pts_manager_.init(to_integer<int32>(pts_str));
last_get_difference_pts_ = get_pts();
qts_manager_.init(to_integer<int32>(pmc->get("updates.qts")));
last_get_difference_qts_ = get_qts();
date_ = to_integer<int32>(pmc->get("updates.date"));
date_source_ = "database";
LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_;
@ -1137,6 +1153,11 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
VLOG(get_difference) << "----- END GET DIFFERENCE-----";
running_get_difference_ = false;
if (!td_->auth_manager_->is_authorized()) {
// just in case
return;
}
if (difference_ptr == nullptr) {
on_failed_get_difference();
return;
@ -1192,8 +1213,10 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
}
case telegram_api::updates_differenceSlice::ID: {
auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
bool is_pts_changed = have_update_pts_changed(difference->other_updates_);
if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits<int32>::max() &&
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) {
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts() &&
!is_pts_changed) {
// TODO send new getDifference request and apply difference slice only after that
}
@ -1206,12 +1229,28 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
std::move(difference->new_encrypted_messages_),
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
if (!is_pts_changed) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
}
break;
}
CHECK(!is_pts_changed);
auto state = std::move(difference->intermediate_state_);
if (get_pts() != std::numeric_limits<int32>::max() && state->date_ == get_date() &&
(state->pts_ == get_pts() ||
(min_postponed_update_pts_ != 0 && state->pts_ >= min_postponed_update_pts_ + 1000)) &&
(state->qts_ == get_qts() ||
(min_postponed_update_qts_ != 0 && state->qts_ >= min_postponed_update_qts_ + 1000))) {
on_get_updates_state(std::move(state), "get difference final slice");
VLOG(get_difference) << "Trying to switch back from getDifference to update processing";
break;
}
on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
get_difference("on updates_differenceSlice");
on_get_updates_state(std::move(state), "get difference slice");
if (get_pts() != -1) { // just in case
run_get_difference(true, "on updates_differenceSlice");
}
break;
}
case telegram_api::updates_differenceTooLong::ID: {
@ -1246,7 +1285,8 @@ void UpdatesManager::after_get_difference() {
}
if (postponed_updates_.size()) {
VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed updates";
VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed update chunks";
size_t total_update_count = 0;
while (!postponed_updates_.empty()) {
auto it = postponed_updates_.begin();
auto updates = std::move(it->second.updates);
@ -1255,15 +1295,18 @@ void UpdatesManager::after_get_difference() {
auto promise = std::move(it->second.promise);
// ignore it->second.date, because it may be too old
postponed_updates_.erase(it);
auto update_count = updates.size();
on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, std::move(promise),
"postponed updates");
if (running_get_difference_) {
VLOG(get_difference) << "Finish to apply postponed updates with " << postponed_updates_.size()
<< " updates left, because forced to run getDifference";
<< " updates left after applied " << total_update_count
<< " updates, because forced to run getDifference";
return;
}
total_update_count += update_count;
}
VLOG(get_difference) << "Finish to apply postponed updates";
VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates";
}
td_->animations_manager_->after_get_difference();
@ -1300,6 +1343,16 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (running_get_difference_ /*|| string(source) != string("postponed updates")*/) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
for (auto &update : updates) {
auto pts = get_update_pts(update.get());
if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) {
min_postponed_update_pts_ = pts;
}
auto qts = get_update_qts(update.get());
if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) {
min_postponed_update_qts_ = qts;
}
}
postponed_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), std::move(promise)));
return;
@ -1449,8 +1502,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
if (running_get_difference_) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
LOG(ERROR) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
postponed_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
return lock.set_value(Unit());
@ -1511,6 +1564,7 @@ void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update>
add_qts(qts - 1).set_value(Unit());
CHECK(get_qts() == qts - 1);
old_qts = qts - 1;
last_get_difference_qts_ = get_qts();
}
if (qts <= old_qts) {
@ -1616,6 +1670,10 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts,
Promise<Unit> &&promise) {
LOG(DEBUG) << "Process " << to_string(update_ptr);
if (last_get_difference_qts_ + FORCED_GET_DIFFERENCE_PTS_DIFF < qts) {
last_get_difference_qts_ = qts;
schedule_get_difference("process_qts_update");
}
switch (update_ptr->get_id()) {
case telegram_api::updateNewEncryptedMessage::ID: {
auto update = move_tl_object_as<telegram_api::updateNewEncryptedMessage>(update_ptr);
@ -2015,6 +2073,50 @@ int32 UpdatesManager::get_short_update_date() const {
return now;
}
bool UpdatesManager::have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates) {
for (auto &update : updates) {
CHECK(update != nullptr);
if (update->get_id() == telegram_api::updatePtsChanged::ID) {
return true;
}
}
return false;
}
int32 UpdatesManager::get_update_pts(const telegram_api::Update *update) {
switch (update->get_id()) {
case telegram_api::updateNewMessage::ID:
return static_cast<const telegram_api::updateNewMessage *>(update)->pts_;
case telegram_api::updateReadMessagesContents::ID:
return static_cast<const telegram_api::updateReadMessagesContents *>(update)->pts_;
case telegram_api::updateEditMessage::ID:
return static_cast<const telegram_api::updateEditMessage *>(update)->pts_;
case telegram_api::updateDeleteMessages::ID:
return static_cast<const telegram_api::updateDeleteMessages *>(update)->pts_;
case telegram_api::updateReadHistoryInbox::ID:
return static_cast<const telegram_api::updateReadHistoryInbox *>(update)->pts_;
case telegram_api::updateReadHistoryOutbox::ID:
return static_cast<const telegram_api::updateReadHistoryOutbox *>(update)->pts_;
case telegram_api::updateWebPage::ID:
return static_cast<const telegram_api::updateWebPage *>(update)->pts_;
case telegram_api::updatePinnedMessages::ID:
return static_cast<const telegram_api::updatePinnedMessages *>(update)->pts_;
default:
return 0;
}
}
int32 UpdatesManager::get_update_qts(const telegram_api::Update *update) {
switch (update->get_id()) {
case telegram_api::updateNewEncryptedMessage::ID:
return static_cast<const telegram_api::updateNewEncryptedMessage *>(update)->qts_;
case telegram_api::updateChannelParticipant::ID:
return static_cast<const telegram_api::updateChannelParticipant *>(update)->qts_;
default:
return 0;
}
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateUserTyping> update, bool /*force_apply*/,
Promise<Unit> &&promise) {
UserId user_id(update->user_id_);

View File

@ -133,6 +133,9 @@ class UpdatesManager : public Actor {
bool running_get_difference_ = false;
int32 last_get_difference_pts_ = 0;
int32 last_get_difference_qts_ = 0;
int32 min_postponed_update_pts_ = 0;
int32 min_postponed_update_qts_ = 0;
void tear_down() override;
@ -184,12 +187,20 @@ class UpdatesManager : public Actor {
void set_qts_gap_timeout(double timeout);
void run_get_difference(bool is_recursive, const char *source);
void on_failed_get_difference();
void before_get_difference(bool is_initial);
void after_get_difference();
static bool have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates);
static int32 get_update_pts(const telegram_api::Update *update);
static int32 get_update_qts(const telegram_api::Update *update);
static const vector<tl_object_ptr<telegram_api::Update>> *get_updates(const telegram_api::Updates *updates_ptr);
bool is_acceptable_user(UserId user_id) const;