|
|
|
@ -7432,6 +7432,12 @@ void MessagesManager::cancel_user_dialog_action(DialogId dialog_id, const Messag
|
|
|
|
|
m->content->get_type());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MessagesManager::add_postponed_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
|
|
|
|
|
int32 new_pts, int32 pts_count, Promise<Unit> &&promise) {
|
|
|
|
|
postponed_channel_updates_[dialog_id].emplace(
|
|
|
|
|
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
|
|
|
|
|
int32 new_pts, int32 pts_count, Promise<Unit> &&promise,
|
|
|
|
|
const char *source, bool is_postponed_update) {
|
|
|
|
@ -7479,12 +7485,12 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (new_pts > pts && pts != new_pts - pts_count) {
|
|
|
|
|
LOG(INFO) << "Found a gap in the " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts
|
|
|
|
|
LOG(INFO) << "Found a gap in unknown " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts
|
|
|
|
|
<< ", pts_count = " << pts_count << " in update from " << source;
|
|
|
|
|
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
|
|
|
|
|
auto enable_pull_based_backpressure
|
|
|
|
|
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
|
|
|
|
|
get_channel_difference_delayed(dialog_id, pts, true, enable_pull_based_backpressure, "add_pending_channel_update 3");
|
|
|
|
|
promise.set_value(Unit());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -7554,8 +7560,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
|
|
|
|
|
|
|
|
|
|
if (running_get_channel_difference(dialog_id)) {
|
|
|
|
|
LOG(INFO) << "Postpone channel update, because getChannelDifference is run";
|
|
|
|
|
d->postponed_channel_updates.emplace(new_pts,
|
|
|
|
|
PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
|
|
|
|
|
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -7564,9 +7569,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
|
|
|
|
|
<< ", pts_count = " << pts_count << " in update from " << source;
|
|
|
|
|
if (d->was_opened || td_->contacts_manager_->get_channel_status(channel_id).is_member() ||
|
|
|
|
|
is_dialog_sponsored(d)) {
|
|
|
|
|
d->postponed_channel_updates.emplace(
|
|
|
|
|
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
|
|
|
|
|
|
|
|
|
|
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
|
|
|
|
|
auto enable_pull_based_backpressure
|
|
|
|
|
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
|
|
|
|
|
get_channel_difference_delayed(dialog_id, old_pts, true, enable_pull_based_backpressure,
|
|
|
|
@ -9463,10 +9466,10 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
CHECK(!from_message_id.is_scheduled());
|
|
|
|
|
|
|
|
|
|
Dialog *d = get_dialog(dialog_id);
|
|
|
|
|
CHECK(d != nullptr);
|
|
|
|
|
|
|
|
|
|
MessageId last_received_message_id = messages.empty() ? MessageId() : get_message_id(messages[0], false);
|
|
|
|
|
if (d != nullptr && old_last_new_message_id < d->last_new_message_id &&
|
|
|
|
|
(from_the_end || old_last_new_message_id < from_message_id) &&
|
|
|
|
|
if (old_last_new_message_id < d->last_new_message_id && (from_the_end || old_last_new_message_id < from_message_id) &&
|
|
|
|
|
last_received_message_id < d->last_new_message_id) {
|
|
|
|
|
// new server messages were added to the dialog since the request was sent, but weren't received
|
|
|
|
|
// they should have been received, so we must repeat the request to get them
|
|
|
|
@ -9483,19 +9486,17 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
bool have_full_history = from_the_end && narrow_cast<int32>(messages.size()) < limit && messages.size() <= 1;
|
|
|
|
|
|
|
|
|
|
if (messages.empty()) {
|
|
|
|
|
if (d != nullptr) {
|
|
|
|
|
if (have_full_history) {
|
|
|
|
|
d->have_full_history = true;
|
|
|
|
|
on_dialog_updated(dialog_id, "set have_full_history");
|
|
|
|
|
}
|
|
|
|
|
if (have_full_history) {
|
|
|
|
|
d->have_full_history = true;
|
|
|
|
|
on_dialog_updated(dialog_id, "set have_full_history");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (from_the_end && d->have_full_history && d->messages == nullptr) {
|
|
|
|
|
if (!d->last_database_message_id.is_valid()) {
|
|
|
|
|
set_dialog_is_empty(d, "on_get_history empty");
|
|
|
|
|
} else {
|
|
|
|
|
LOG(INFO) << "Skip marking " << dialog_id << " as empty, because it probably has messages from "
|
|
|
|
|
<< d->first_database_message_id << " to " << d->last_database_message_id << " in the database";
|
|
|
|
|
}
|
|
|
|
|
if (from_the_end && d->have_full_history && d->messages == nullptr) {
|
|
|
|
|
if (!d->last_database_message_id.is_valid()) {
|
|
|
|
|
set_dialog_is_empty(d, "on_get_history empty");
|
|
|
|
|
} else {
|
|
|
|
|
LOG(INFO) << "Skip marking " << dialog_id << " as empty, because it probably has messages from "
|
|
|
|
|
<< d->first_database_message_id << " to " << d->last_database_message_id << " in the database";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -9526,7 +9527,6 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// be aware that dialog may not yet exist
|
|
|
|
|
// be aware that returned messages are guaranteed to be consecutive messages, but if !from_the_end they
|
|
|
|
|
// may be older (if some messages was deleted) or newer (if some messages were added) than an expected answer
|
|
|
|
|
// be aware that any subset of the returned messages may be already deleted and returned as MessageEmpty
|
|
|
|
@ -9535,7 +9535,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
MessageId last_added_message_id;
|
|
|
|
|
bool have_next = false;
|
|
|
|
|
|
|
|
|
|
if (narrow_cast<int32>(messages.size()) < limit + offset && messages.size() <= 1 && d != nullptr) {
|
|
|
|
|
if (narrow_cast<int32>(messages.size()) < limit + offset && messages.size() <= 1) {
|
|
|
|
|
MessageId first_received_message_id = get_message_id(messages.back(), false);
|
|
|
|
|
if (first_received_message_id >= from_message_id && d->first_database_message_id.is_valid() &&
|
|
|
|
|
first_received_message_id >= d->first_database_message_id) {
|
|
|
|
@ -9544,20 +9544,12 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool prev_have_full_history = false;
|
|
|
|
|
MessageId prev_last_new_message_id;
|
|
|
|
|
MessageId prev_first_database_message_id;
|
|
|
|
|
MessageId prev_last_database_message_id;
|
|
|
|
|
MessageId prev_last_message_id;
|
|
|
|
|
if (d != nullptr) {
|
|
|
|
|
prev_last_new_message_id = d->last_new_message_id;
|
|
|
|
|
prev_first_database_message_id = d->first_database_message_id;
|
|
|
|
|
prev_last_database_message_id = d->last_database_message_id;
|
|
|
|
|
prev_last_message_id = d->last_message_id;
|
|
|
|
|
prev_have_full_history = d->have_full_history;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (from_the_end && d != nullptr) {
|
|
|
|
|
bool prev_have_full_history = d->have_full_history;
|
|
|
|
|
MessageId prev_last_new_message_id = d->last_new_message_id;
|
|
|
|
|
MessageId prev_first_database_message_id = d->first_database_message_id;
|
|
|
|
|
MessageId prev_last_database_message_id = d->last_database_message_id;
|
|
|
|
|
MessageId prev_last_message_id = d->last_message_id;
|
|
|
|
|
if (from_the_end) {
|
|
|
|
|
// delete all server messages with ID > last_received_message_id
|
|
|
|
|
// there were no new messages received after the getHistory request was sent, so they are already deleted message
|
|
|
|
|
vector<MessageId> message_ids;
|
|
|
|
@ -9600,7 +9592,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (auto &message : messages) {
|
|
|
|
|
if (!have_next && from_the_end && d != nullptr && get_message_id(message, false) < d->last_message_id) {
|
|
|
|
|
if (!have_next && from_the_end && get_message_id(message, false) < d->last_message_id) {
|
|
|
|
|
// last message in the dialog should be attached to the next message if there is some
|
|
|
|
|
have_next = true;
|
|
|
|
|
}
|
|
|
|
@ -9621,13 +9613,6 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!have_next) {
|
|
|
|
|
if (d == nullptr) {
|
|
|
|
|
d = get_dialog(dialog_id);
|
|
|
|
|
if (d == nullptr) {
|
|
|
|
|
LOG(ERROR) << "Unknown dialog " << dialog_id;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
have_next = true;
|
|
|
|
|
} else if (first_added_message_id.is_valid()) {
|
|
|
|
|
Message *next_message = get_message(d, first_added_message_id);
|
|
|
|
@ -9646,11 +9631,6 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (d == nullptr) {
|
|
|
|
|
promise.set_value(Unit());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (have_full_history) {
|
|
|
|
|
d->have_full_history = true;
|
|
|
|
|
on_dialog_updated(dialog_id, "set have_full_history 2");
|
|
|
|
@ -32456,7 +32436,7 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
|
|
|
|
|
if (*need_update) {
|
|
|
|
|
*need_update = false;
|
|
|
|
|
if (!G()->parameters().use_message_db) {
|
|
|
|
|
// can happen for bots if the message is received first through getMessage in an unknown chat without
|
|
|
|
|
// can happen if the message is received first through getMessage in an unknown chat without
|
|
|
|
|
// last_new_message_id and only after that received through getDifference or getChannelDifference
|
|
|
|
|
if (d->last_new_message_id.is_valid()) {
|
|
|
|
|
LOG(ERROR) << "Receive again " << (message->is_outgoing ? "outgoing" : "incoming")
|
|
|
|
@ -32473,7 +32453,7 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
|
|
|
|
|
message->have_previous = false;
|
|
|
|
|
message->have_next = false;
|
|
|
|
|
}
|
|
|
|
|
if (!message->from_database) {
|
|
|
|
|
if (!message->from_database && (from_update || message->edit_date >= m->edit_date)) {
|
|
|
|
|
const int32 INDEX_MASK_MASK = ~message_search_filter_index_mask(MessageSearchFilter::UnreadMention);
|
|
|
|
|
auto old_index_mask = get_message_index_mask(dialog_id, m) & INDEX_MASK_MASK;
|
|
|
|
|
bool was_deleted = delete_active_live_location(dialog_id, m);
|
|
|
|
@ -34236,6 +34216,9 @@ MessagesManager::Dialog *MessagesManager::add_new_dialog(unique_ptr<Dialog> &&d,
|
|
|
|
|
if (td_->auth_manager_->is_bot()) {
|
|
|
|
|
d->notification_settings.is_synchronized = true;
|
|
|
|
|
}
|
|
|
|
|
if (is_channel_difference_finished_.erase(dialog_id)) {
|
|
|
|
|
d->is_channel_difference_finished = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
unique_ptr<Message> last_database_message = std::move(d->messages);
|
|
|
|
|
MessageId last_database_message_id = d->last_database_message_id;
|
|
|
|
@ -35752,8 +35735,11 @@ bool MessagesManager::need_channel_difference_to_add_message(DialogId dialog_id,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Dialog *d = get_dialog_force(dialog_id, "need_channel_difference_to_add_message");
|
|
|
|
|
if (d == nullptr || d->last_new_message_id == MessageId()) {
|
|
|
|
|
return false;
|
|
|
|
|
if (d == nullptr) {
|
|
|
|
|
return load_channel_pts(dialog_id) > 0 && !is_channel_difference_finished_.count(dialog_id);
|
|
|
|
|
}
|
|
|
|
|
if (d->last_new_message_id == MessageId()) {
|
|
|
|
|
return d->pts > 0 && !d->is_channel_difference_finished;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return get_message_id(message_ptr, false) > d->last_new_message_id;
|
|
|
|
@ -35763,7 +35749,7 @@ void MessagesManager::run_after_channel_difference(DialogId dialog_id, Promise<U
|
|
|
|
|
CHECK(dialog_id.get_type() == DialogType::Channel);
|
|
|
|
|
CHECK(have_input_peer(dialog_id, AccessRights::Read));
|
|
|
|
|
|
|
|
|
|
Dialog *d = get_dialog(dialog_id);
|
|
|
|
|
const Dialog *d = get_dialog(dialog_id);
|
|
|
|
|
CHECK(d != nullptr);
|
|
|
|
|
|
|
|
|
|
run_after_get_channel_difference_[dialog_id].push_back(std::move(promise));
|
|
|
|
@ -36279,7 +36265,7 @@ void MessagesManager::on_get_channel_difference(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!is_final) {
|
|
|
|
|
LOG_IF(ERROR, timeout > 0) << "Have timeout in not final ChannelDifference in " << dialog_id;
|
|
|
|
|
LOG_IF(ERROR, timeout > 0) << "Have timeout in nonfinal ChannelDifference in " << dialog_id;
|
|
|
|
|
auto enable_pull_based_backpressure
|
|
|
|
|
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
|
|
|
|
|
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, "on_get_channel_difference");
|
|
|
|
@ -36306,51 +36292,58 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto d = get_dialog(dialog_id);
|
|
|
|
|
bool have_access = have_input_peer(dialog_id, AccessRights::Read);
|
|
|
|
|
auto pts = d != nullptr ? d->pts : load_channel_pts(dialog_id);
|
|
|
|
|
auto updates_it = postponed_channel_updates_.find(dialog_id);
|
|
|
|
|
if (updates_it != postponed_channel_updates_.end()) {
|
|
|
|
|
auto &updates = updates_it->second;
|
|
|
|
|
LOG(INFO) << "Begin to apply " << updates.size() << " postponed channel updates";
|
|
|
|
|
while (!updates.empty()) {
|
|
|
|
|
auto it = updates.begin();
|
|
|
|
|
auto update = std::move(it->second.update);
|
|
|
|
|
auto update_pts = it->second.pts;
|
|
|
|
|
auto update_pts_count = it->second.pts_count;
|
|
|
|
|
auto promise = std::move(it->second.promise);
|
|
|
|
|
updates.erase(it);
|
|
|
|
|
|
|
|
|
|
auto old_size = updates.size();
|
|
|
|
|
auto update_id = update->get_id();
|
|
|
|
|
if (have_access) {
|
|
|
|
|
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, std::move(promise),
|
|
|
|
|
"apply postponed channel updates", true);
|
|
|
|
|
} else {
|
|
|
|
|
promise.set_value(Unit());
|
|
|
|
|
}
|
|
|
|
|
if (updates.size() != old_size || running_get_channel_difference(dialog_id)) {
|
|
|
|
|
if (success && update_pts - 10000 < pts && update_pts_count == 1) {
|
|
|
|
|
// if getChannelDifference was successful and update pts is near channel pts,
|
|
|
|
|
// we hope that the update eventually can be applied
|
|
|
|
|
LOG(INFO) << "Can't apply postponed channel updates";
|
|
|
|
|
} else {
|
|
|
|
|
// otherwise protect from getChannelDifference repeating calls by dropping postponed updates
|
|
|
|
|
LOG(WARNING) << "Failed to apply postponed updates of type " << update_id << " in " << dialog_id
|
|
|
|
|
<< " with pts " << pts << ", update pts is " << update_pts << ", update pts count is "
|
|
|
|
|
<< update_pts_count;
|
|
|
|
|
vector<Promise<Unit>> update_promises;
|
|
|
|
|
for (auto &postponed_update : updates) {
|
|
|
|
|
update_promises.push_back(std::move(postponed_update.second.promise));
|
|
|
|
|
}
|
|
|
|
|
updates.clear();
|
|
|
|
|
for (auto &update_promise : update_promises) {
|
|
|
|
|
update_promise.set_value(Unit());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (updates.empty()) {
|
|
|
|
|
postponed_channel_updates_.erase(updates_it);
|
|
|
|
|
}
|
|
|
|
|
LOG(INFO) << "Finish to apply postponed channel updates";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (d != nullptr) {
|
|
|
|
|
d->is_channel_difference_finished = true;
|
|
|
|
|
bool have_access = have_input_peer(dialog_id, AccessRights::Read);
|
|
|
|
|
if (!d->postponed_channel_updates.empty()) {
|
|
|
|
|
LOG(INFO) << "Begin to apply postponed channel updates";
|
|
|
|
|
while (!d->postponed_channel_updates.empty()) {
|
|
|
|
|
auto it = d->postponed_channel_updates.begin();
|
|
|
|
|
auto update = std::move(it->second.update);
|
|
|
|
|
auto update_pts = it->second.pts;
|
|
|
|
|
auto update_pts_count = it->second.pts_count;
|
|
|
|
|
auto promise = std::move(it->second.promise);
|
|
|
|
|
d->postponed_channel_updates.erase(it);
|
|
|
|
|
|
|
|
|
|
auto old_size = d->postponed_channel_updates.size();
|
|
|
|
|
auto update_id = update->get_id();
|
|
|
|
|
if (have_access) {
|
|
|
|
|
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, std::move(promise),
|
|
|
|
|
"apply postponed channel updates", true);
|
|
|
|
|
} else {
|
|
|
|
|
promise.set_value(Unit());
|
|
|
|
|
}
|
|
|
|
|
if (d->postponed_channel_updates.size() != old_size || running_get_channel_difference(dialog_id)) {
|
|
|
|
|
if (success && update_pts - 10000 < d->pts && update_pts_count == 1) {
|
|
|
|
|
// if getChannelDifference was successful and update pts is near channel pts,
|
|
|
|
|
// we hope that the update eventually can be applied
|
|
|
|
|
LOG(INFO) << "Can't apply postponed channel updates";
|
|
|
|
|
} else {
|
|
|
|
|
// otherwise protect from getChannelDifference repeating calls by dropping postponed updates
|
|
|
|
|
LOG(WARNING) << "Failed to apply postponed updates of type " << update_id << " in " << dialog_id
|
|
|
|
|
<< " with pts " << d->pts << ", update pts is " << update_pts << ", update pts count is "
|
|
|
|
|
<< update_pts_count;
|
|
|
|
|
vector<Promise<Unit>> update_promises;
|
|
|
|
|
for (auto &postponed_update : d->postponed_channel_updates) {
|
|
|
|
|
update_promises.push_back(std::move(postponed_update.second.promise));
|
|
|
|
|
}
|
|
|
|
|
d->postponed_channel_updates.clear();
|
|
|
|
|
for (auto &update_promise : update_promises) {
|
|
|
|
|
update_promise.set_value(Unit());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
LOG(INFO) << "Finish to apply postponed channel updates";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (d->message_notification_group.group_id.is_valid()) {
|
|
|
|
|
send_closure_later(G()->notification_manager(), &NotificationManager::after_get_chat_difference,
|
|
|
|
@ -36365,6 +36358,8 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
|
|
|
|
|
(d->order != DEFAULT_ORDER || is_dialog_sponsored(d))) {
|
|
|
|
|
get_history_from_the_end_impl(d, true, false, Auto());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
is_channel_difference_finished_.insert(dialog_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (postponed_chat_read_inbox_updates_.erase(dialog_id) > 0) {
|
|
|
|
|