Pass promise to add_pending_channel_update.

This commit is contained in:
levlam 2020-12-25 00:14:18 +03:00
parent 3dbc996acc
commit 4a28b98022
3 changed files with 71 additions and 52 deletions

View File

@ -654,23 +654,24 @@ class UnpinAllMessagesQuery : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) {
affected_history->pts_count_ = 0; // force receiving real updates from the server
auto promise = affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_);
if (dialog_id_.get_type() == DialogType::Channel) {
td->messages_manager_->add_pending_channel_update(dialog_id_, make_tl_object<dummyUpdate>(),
affected_history->pts_, affected_history->pts_count_,
"unpin all messages");
std::move(promise), "unpin all messages");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, Promise<Unit>(),
affected_history->pts_count_, false, std::move(promise),
"unpin all messages");
}
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(Unit());
}
void on_error(uint64 id, Status status) override {
@ -2385,17 +2386,17 @@ class DeleteUserHistoryQuery : public Td::ResultHandler {
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_channel_update(DialogId(channel_id_), make_tl_object<dummyUpdate>(),
affected_history->pts_, affected_history->pts_count_,
"delete user history query");
td->messages_manager_->add_pending_channel_update(
DialogId(channel_id_), make_tl_object<dummyUpdate>(), affected_history->pts_, affected_history->pts_count_,
affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_), "delete user history query");
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(Unit());
}
void on_error(uint64 id, Status status) override {
@ -2576,7 +2577,7 @@ class SendMessageActor : public NetActorOnce {
if (dialog_id_.get_type() == DialogType::Channel) {
td->messages_manager_->add_pending_channel_update(
dialog_id_, make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_),
sent_message->pts_, sent_message->pts_count_, "send message actor");
sent_message->pts_, sent_message->pts_count_, Promise<Unit>(), "send message actor");
return;
}
@ -3651,7 +3652,7 @@ class DeleteChannelMessagesQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_channel_update(DialogId(channel_id_), make_tl_object<dummyUpdate>(),
affected_messages->pts_, affected_messages->pts_count_,
"DeleteChannelMessagesQuery");
Promise<Unit>(), "DeleteChannelMessagesQuery");
}
if (--query_count_ == 0) {
promise_.set_value(Unit());
@ -6399,17 +6400,20 @@ void MessagesManager::on_update_service_notification(tl_object_ptr<telegram_api:
}
}
void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update) {
void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update,
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
DialogId dialog_id = get_message_dialog_id(update->message_);
switch (dialog_id.get_type()) {
case DialogType::None:
promise.set_value(Unit());
return;
case DialogType::User:
case DialogType::Chat:
case DialogType::SecretChat:
LOG(ERROR) << "Receive updateNewChannelMessage in wrong " << dialog_id;
promise.set_value(Unit());
return;
case DialogType::Channel: {
auto channel_id = dialog_id.get_channel_id();
@ -6417,6 +6421,7 @@ void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::
// if min channel was received
if (td_->contacts_manager_->have_min_channel(channel_id)) {
td_->updates_manager_->schedule_get_difference("on_update_new_channel_message");
promise.set_value(Unit()); // TODO postpone
return;
}
}
@ -6428,26 +6433,24 @@ void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::
return;
}
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive new channel message with wrong pts = " << new_pts << " or pts_count = " << pts_count << ": "
<< oneline(to_string(update));
return;
}
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, "on_update_new_channel_message");
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise),
"on_update_new_channel_message");
}
void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update) {
void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update,
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
DialogId dialog_id = get_message_dialog_id(update->message_);
switch (dialog_id.get_type()) {
case DialogType::None:
promise.set_value(Unit());
return;
case DialogType::User:
case DialogType::Chat:
case DialogType::SecretChat:
LOG(ERROR) << "Receive updateEditChannelMessage in wrong " << dialog_id;
promise.set_value(Unit());
return;
case DialogType::Channel: {
auto channel_id = dialog_id.get_channel_id();
@ -6455,6 +6458,7 @@ void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api:
// if min channel was received
if (td_->contacts_manager_->have_min_channel(channel_id)) {
td_->updates_manager_->schedule_get_difference("on_update_edit_channel_message");
promise.set_value(Unit()); // TODO postpone
return;
}
}
@ -6466,13 +6470,8 @@ void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api:
return;
}
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive edited channel message with wrong pts = " << new_pts << " or pts_count = " << pts_count
<< ": " << oneline(to_string(update));
return;
}
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, "on_update_edit_channel_message");
add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise),
"on_update_edit_channel_message");
}
void MessagesManager::on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update) {
@ -7123,19 +7122,20 @@ void MessagesManager::cancel_user_dialog_action(DialogId dialog_id, const Messag
}
void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
int32 new_pts, int32 pts_count, const char *source,
bool is_postponed_update) {
int32 new_pts, int32 pts_count, Promise<Unit> &&promise,
const char *source, bool is_postponed_update) {
LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
CHECK(update != nullptr);
CHECK(dialog_id.get_type() == DialogType::Channel);
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive channel update from " << source << " with wrong pts = " << new_pts
<< " or pts_count = " << pts_count << ": " << oneline(to_string(update));
promise.set_value(Unit());
return;
}
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply them to result of this queries
// getHistory) and apply them to result of these queries
Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr) {
@ -7145,6 +7145,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (!td_->contacts_manager_->have_channel(channel_id)) {
// do not create dialog if there is no info about the channel
LOG(INFO) << "There is no info about " << channel_id << ", so ignore " << oneline(to_string(update));
promise.set_value(Unit());
return;
}
@ -7158,6 +7159,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// if there is no dialog, it can be created by the update
LOG(INFO) << "Receive pending update from " << source << " about unknown " << dialog_id;
if (running_get_channel_difference(dialog_id)) {
promise.set_value(Unit());
return;
}
} else {
@ -7182,6 +7184,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// apply sent channel message
on_get_message(std::move(update_new_channel_message->message_), true, true, false, true, true,
"updateNewChannelMessage with an awaited message");
promise.set_value(Unit());
return;
}
}
@ -7191,6 +7194,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
// apply sent channel message
on_send_message_success(update_sent_message->random_id_, update_sent_message->message_id_,
update_sent_message->date_, FileId(), "process old updateSentChannelMessage");
promise.set_value(Unit());
return;
}
}
@ -7198,13 +7202,14 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
LOG_IF(WARNING, new_pts == old_pts && pts_count == 0)
<< "Receive from " << source << " useless channel update " << oneline(to_string(update));
LOG(INFO) << "Skip already applied channel update";
promise.set_value(Unit());
return;
}
if (running_get_channel_difference(dialog_id)) {
if (pts_count > 0) {
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, Promise<Unit>()));
d->postponed_channel_updates.emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
}
LOG(INFO) << "Postpone channel update, because getChannelDifference is run";
return;
@ -7215,8 +7220,8 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
<< ", pts_count = " << pts_count << " in update from " << source;
if (pts_count > 0) {
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, Promise<Unit>()));
d->postponed_channel_updates.emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
}
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update pts mismatch");
@ -7236,12 +7241,14 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
d = get_dialog(dialog_id);
if (d == nullptr) {
LOG(INFO) << "Update didn't created " << dialog_id;
promise.set_value(Unit());
return;
}
}
CHECK(new_pts > d->pts);
set_channel_pts(d, new_pts, source);
promise.set_value(Unit());
}
bool MessagesManager::is_old_channel_update(DialogId dialog_id, int32 new_pts) {
@ -35182,31 +35189,42 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
auto d = get_dialog(dialog_id);
if (d != nullptr) {
bool have_access = have_input_peer(dialog_id, AccessRights::Read);
if (!have_access) {
d->postponed_channel_updates.clear();
} else if (!d->postponed_channel_updates.empty()) {
if (!d->postponed_channel_updates.empty()) {
LOG(INFO) << "Begin to apply postponed channel updates";
while (!d->postponed_channel_updates.empty()) {
auto it = d->postponed_channel_updates.begin();
auto update = std::move(it->second.update);
auto update_pts = it->second.pts;
auto update_pts_count = it->second.pts_count;
auto promise = std::move(it->second.promise);
d->postponed_channel_updates.erase(it);
auto old_size = d->postponed_channel_updates.size();
auto update_id = update->get_id();
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count,
"apply postponed channel updates", true);
if (have_access) {
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, std::move(promise),
"apply postponed channel updates", true);
} else {
promise.set_value(Unit());
}
if (d->postponed_channel_updates.size() != old_size || running_get_channel_difference(dialog_id)) {
if (success && update_pts < d->pts + 10000 && update_pts_count == 1) {
// if getChannelDifference was successful and update pts is near channel pts,
// we hope that the update eventually can be applied
LOG(INFO) << "Can't apply postponed channel updates";
} else {
// otherwise we protecting from getChannelDifference repeating calls by dropping pending updates
// otherwise protect from getChannelDifference repeating calls by dropping postponed updates
LOG(WARNING) << "Failed to apply postponed updates of type " << update_id << " in " << dialog_id
<< " with pts " << d->pts << ", update pts is " << update_pts << ", update pts count is "
<< update_pts_count;
vector<Promise<Unit>> update_promises;
for (auto &postponed_update : d->postponed_channel_updates) {
update_promises.push_back(std::move(postponed_update.second.promise));
}
d->postponed_channel_updates.clear();
for (auto &update_promise : update_promises) {
update_promise.set_value(Unit());
}
}
break;
}

View File

@ -343,9 +343,11 @@ class MessagesManager : public Actor {
void on_update_service_notification(tl_object_ptr<telegram_api::updateServiceNotification> &&update,
bool skip_new_entities, Promise<Unit> &&promise);
void on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update);
void on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update,
Promise<Unit> &&promise);
void on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update);
void on_update_edit_channel_message(tl_object_ptr<telegram_api::updateEditChannelMessage> &&update,
Promise<Unit> &&promise);
void on_update_read_channel_inbox(tl_object_ptr<telegram_api::updateReadChannelInbox> &&update);
@ -796,7 +798,8 @@ class MessagesManager : public Actor {
bool force_apply, Promise<Unit> &&promise, const char *source);
void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, const char *source, bool is_postponed_update = false);
int32 pts_count, Promise<Unit> &&promise, const char *source,
bool is_postponed_update = false);
bool is_old_channel_update(DialogId dialog_id, int32 new_pts);

View File

@ -1701,8 +1701,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> upd
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, bool /*force_apply*/,
Promise<Unit> &&promise) {
td_->messages_manager_->on_update_new_channel_message(std::move(update));
promise.set_value(Unit());
td_->messages_manager_->on_update_new_channel_message(std::move(update), std::move(promise));
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateMessageID> update, bool force_apply,
@ -1812,8 +1811,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannel> update
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditChannelMessage> update, bool /*force_apply*/,
Promise<Unit> &&promise) {
td_->messages_manager_->on_update_edit_channel_message(std::move(update));
promise.set_value(Unit());
td_->messages_manager_->on_update_edit_channel_message(std::move(update), std::move(promise));
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteChannelMessages> update, bool /*force_apply*/,
@ -1821,14 +1819,14 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteChannelMe
ChannelId channel_id(update->channel_id_);
if (!channel_id.is_valid()) {
LOG(ERROR) << "Receive invalid " << channel_id;
promise.set_value(Unit());
} else {
DialogId dialog_id(channel_id);
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
"on_updateDeleteChannelMessages");
std::move(promise), "on_updateDeleteChannelMessages");
}
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelMessageViews> update, bool /*force_apply*/,
@ -1898,14 +1896,14 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMe
ChannelId channel_id(update->channel_id_);
if (!channel_id.is_valid()) {
LOG(ERROR) << "Receive invalid " << channel_id;
promise.set_value(Unit());
} else {
DialogId dialog_id(channel_id);
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_channel_update(dialog_id, std::move(update), new_pts, pts_count,
"on_updatePinnedChannelMessages");
std::move(promise), "on_updatePinnedChannelMessages");
}
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNotifySettings> update, bool /*force_apply*/,
@ -1968,7 +1966,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelWebPage>
} else {
DialogId dialog_id(channel_id);
td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object<dummyUpdate>(), update->pts_,
update->pts_count_, "on_updateChannelWebPage");
update->pts_count_, Promise<Unit>(), "on_updateChannelWebPage");
}
promise.set_value(Unit());
}