Repeat getHistory request if some new server messages aren't received.

This commit is contained in:
levlam 2021-07-29 06:07:11 +03:00
parent c1d69fcc32
commit bc8544299b
2 changed files with 51 additions and 22 deletions

View File

@ -1928,6 +1928,7 @@ class GetHistoryQuery final : public Td::ResultHandler {
Promise<Unit> promise_; Promise<Unit> promise_;
DialogId dialog_id_; DialogId dialog_id_;
MessageId from_message_id_; MessageId from_message_id_;
MessageId old_last_new_message_id_;
int32 offset_; int32 offset_;
int32 limit_; int32 limit_;
bool from_the_end_; bool from_the_end_;
@ -1936,7 +1937,8 @@ class GetHistoryQuery final : public Td::ResultHandler {
explicit GetHistoryQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) { explicit GetHistoryQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
} }
void send(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit) { void send(DialogId dialog_id, MessageId from_message_id, MessageId old_last_new_message_id, int32 offset,
int32 limit) {
auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read); auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
if (input_peer == nullptr) { if (input_peer == nullptr) {
LOG(ERROR) << "Can't get chat history in " << dialog_id << " because doesn't have info about the chat"; LOG(ERROR) << "Can't get chat history in " << dialog_id << " because doesn't have info about the chat";
@ -1946,6 +1948,7 @@ class GetHistoryQuery final : public Td::ResultHandler {
dialog_id_ = dialog_id; dialog_id_ = dialog_id;
from_message_id_ = from_message_id; from_message_id_ = from_message_id;
old_last_new_message_id_ = old_last_new_message_id;
offset_ = offset; offset_ = offset;
limit_ = limit; limit_ = limit;
from_the_end_ = false; from_the_end_ = false;
@ -1953,7 +1956,7 @@ class GetHistoryQuery final : public Td::ResultHandler {
std::move(input_peer), from_message_id.get_server_message_id().get(), 0, offset, limit, 0, 0, 0))); std::move(input_peer), from_message_id.get_server_message_id().get(), 0, offset, limit, 0, 0, 0)));
} }
void send_get_from_the_end(DialogId dialog_id, int32 limit) { void send_get_from_the_end(DialogId dialog_id, MessageId old_last_new_message_id, int32 limit) {
auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read); auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
if (input_peer == nullptr) { if (input_peer == nullptr) {
LOG(ERROR) << "Can't get chat history because doesn't have info about the chat"; LOG(ERROR) << "Can't get chat history because doesn't have info about the chat";
@ -1961,6 +1964,7 @@ class GetHistoryQuery final : public Td::ResultHandler {
} }
dialog_id_ = dialog_id; dialog_id_ = dialog_id;
old_last_new_message_id_ = old_last_new_message_id;
offset_ = 0; offset_ = 0;
limit_ = limit; limit_ = limit;
from_the_end_ = true; from_the_end_ = true;
@ -1977,17 +1981,17 @@ class GetHistoryQuery final : public Td::ResultHandler {
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetHistoryQuery"); auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetHistoryQuery");
td->messages_manager_->get_channel_difference_if_needed( td->messages_manager_->get_channel_difference_if_needed(
dialog_id_, std::move(info), dialog_id_, std::move(info),
PromiseCreator::lambda([td = td, dialog_id = dialog_id_, from_message_id = from_message_id_, offset = offset_, PromiseCreator::lambda([td = td, dialog_id = dialog_id_, from_message_id = from_message_id_,
limit = limit_, from_the_end = from_the_end_, old_last_new_message_id = old_last_new_message_id_, offset = offset_, limit = limit_,
from_the_end = from_the_end_,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable { promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) { if (result.is_error()) {
promise.set_error(result.move_as_error()); promise.set_error(result.move_as_error());
} else { } else {
auto info = result.move_as_ok(); auto info = result.move_as_ok();
// TODO use info.total_count, info.pts // TODO use info.total_count, info.pts
td->messages_manager_->on_get_history(dialog_id, from_message_id, offset, limit, from_the_end, td->messages_manager_->on_get_history(dialog_id, from_message_id, old_last_new_message_id, offset, limit,
std::move(info.messages)); from_the_end, std::move(info.messages), std::move(promise));
promise.set_value(Unit());
} }
})); }));
} }
@ -9249,18 +9253,34 @@ void MessagesManager::on_get_messages(vector<tl_object_ptr<telegram_api::Message
} }
} }
void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_id, MessageId old_last_new_message_id,
bool from_the_end, vector<tl_object_ptr<telegram_api::Message>> &&messages) { int32 offset, int32 limit, bool from_the_end,
vector<tl_object_ptr<telegram_api::Message>> &&messages, Promise<Unit> &&promise) {
LOG(INFO) << "Receive " << messages.size() << " history messages " << (from_the_end ? "from the end " : "") << "in " LOG(INFO) << "Receive " << messages.size() << " history messages " << (from_the_end ? "from the end " : "") << "in "
<< dialog_id << " from " << from_message_id << " with offset " << offset << " and limit " << limit; << dialog_id << " from " << from_message_id << " with offset " << offset << " and limit " << limit;
CHECK(-limit < offset && offset <= 0); CHECK(-limit < offset && offset <= 0);
CHECK(offset < 0 || from_the_end); CHECK(offset < 0 || from_the_end);
CHECK(!from_message_id.is_scheduled()); CHECK(!from_message_id.is_scheduled());
Dialog *d = get_dialog(dialog_id);
MessageId last_received_message_id = messages.empty() ? MessageId() : get_message_id(messages[0], false);
if (d != nullptr && old_last_new_message_id < d->last_new_message_id &&
(from_the_end || old_last_new_message_id < from_message_id) &&
last_received_message_id < d->last_new_message_id) {
// new server messages were added to the dialog since the request was sent, but weren't received
// they should have been received, so we must repeat the request to get them
if (from_the_end) {
get_history_from_the_end_impl(d, false, false, std::move(promise));
} else {
get_history_impl(d, from_message_id, offset, limit, false, false, std::move(promise));
}
return;
}
// the server can return less messages than requested if some of messages are deleted during request // the server can return less messages than requested if some of messages are deleted during request
// but if it happens, it is likely that there are no more messages on the server // but if it happens, it is likely that there are no more messages on the server
bool have_full_history = from_the_end && narrow_cast<int32>(messages.size()) < limit && messages.size() <= 1; bool have_full_history = from_the_end && narrow_cast<int32>(messages.size()) < limit && messages.size() <= 1;
Dialog *d = get_dialog(dialog_id);
if (messages.empty()) { if (messages.empty()) {
if (d != nullptr) { if (d != nullptr) {
@ -9281,6 +9301,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
// be aware that in some cases an empty answer may be returned, because of the race of getHistory and deleteMessages // be aware that in some cases an empty answer may be returned, because of the race of getHistory and deleteMessages
// and not because there are no more messages // and not because there are no more messages
promise.set_value(Unit());
return; return;
} }
@ -9298,6 +9319,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
} }
// TODO move to ERROR // TODO move to ERROR
LOG(FATAL) << error; LOG(FATAL) << error;
promise.set_value(Unit());
return; return;
} }
cur_message_id = message_id; cur_message_id = message_id;
@ -9310,7 +9332,6 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
// be aware that any subset of the returned messages may be already deleted and returned as MessageEmpty // be aware that any subset of the returned messages may be already deleted and returned as MessageEmpty
bool is_channel_message = dialog_id.get_type() == DialogType::Channel; bool is_channel_message = dialog_id.get_type() == DialogType::Channel;
MessageId first_added_message_id; MessageId first_added_message_id;
MessageId last_received_message_id = get_message_id(messages[0], false);
MessageId last_added_message_id; MessageId last_added_message_id;
bool have_next = false; bool have_next = false;
@ -9337,15 +9358,15 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
} }
if (from_the_end && d != nullptr) { if (from_the_end && d != nullptr) {
auto last_server_message_id = get_message_id(messages[0], false); // delete all server messages with ID > last_received_message_id
// delete all server messages with ID > last_server_message_id // there were no new messages received after the getHistory request was sent, so they are already deleted message
vector<MessageId> message_ids; vector<MessageId> message_ids;
find_newer_messages(d->messages.get(), last_server_message_id, message_ids); find_newer_messages(d->messages.get(), last_received_message_id, message_ids);
if (!message_ids.empty()) { if (!message_ids.empty()) {
bool need_update_dialog_pos = false; bool need_update_dialog_pos = false;
vector<int64> deleted_message_ids; vector<int64> deleted_message_ids;
for (auto message_id : message_ids) { for (auto message_id : message_ids) {
CHECK(message_id > last_server_message_id); CHECK(message_id > last_received_message_id);
if (message_id.is_server()) { if (message_id.is_server()) {
auto message = delete_message(d, message_id, true, &need_update_dialog_pos, "on_get_gistory 1"); auto message = delete_message(d, message_id, true, &need_update_dialog_pos, "on_get_gistory 1");
CHECK(message != nullptr); CHECK(message != nullptr);
@ -9360,10 +9381,10 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false); send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false);
message_ids.clear(); message_ids.clear();
find_newer_messages(d->messages.get(), last_server_message_id, message_ids); find_newer_messages(d->messages.get(), last_received_message_id, message_ids);
} }
// connect all messages with ID > last_server_message_id // connect all messages with ID > last_received_message_id
for (size_t i = 0; i + 1 < message_ids.size(); i++) { for (size_t i = 0; i + 1 < message_ids.size(); i++) {
auto m = get_message(d, message_ids[i]); auto m = get_message(d, message_ids[i]);
CHECK(m != nullptr); CHECK(m != nullptr);
@ -9372,6 +9393,8 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
attach_message_to_next(d, message_ids[i], "on_get_history 3"); attach_message_to_next(d, message_ids[i], "on_get_history 3");
} }
} }
have_next = true;
} }
} }
@ -9420,6 +9443,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
} }
if (d == nullptr) { if (d == nullptr) {
promise.set_value(Unit());
return; return;
} }
@ -9546,6 +9570,7 @@ void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_
} }
} }
} }
promise.set_value(Unit());
} }
vector<DialogId> MessagesManager::get_peers_dialog_ids(vector<tl_object_ptr<telegram_api::Peer>> &&peers) { vector<DialogId> MessagesManager::get_peers_dialog_ids(vector<tl_object_ptr<telegram_api::Peer>> &&peers) {
@ -20309,6 +20334,7 @@ tl_object_ptr<td_api::messages> MessagesManager::get_dialog_history(DialogId dia
while (*p != nullptr && messages.size() < static_cast<size_t>(limit)) { while (*p != nullptr && messages.size() < static_cast<size_t>(limit)) {
messages.push_back(get_message_object(dialog_id, *p)); messages.push_back(get_message_object(dialog_id, *p));
from_message_id = (*p)->message_id; from_message_id = (*p)->message_id;
from_the_end = false;
--p; --p;
} }
} }
@ -22327,13 +22353,15 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
std::move(promise)); std::move(promise));
})); }));
} else { } else {
if (only_local || dialog_id.get_type() == DialogType::SecretChat) { if (only_local || dialog_id.get_type() == DialogType::SecretChat || d->last_message_id.is_valid()) {
// if last message is known, there are no reasons to get message history from server from the end
promise.set_value(Unit()); promise.set_value(Unit());
return; return;
} }
LOG(INFO) << "Get history from the end of " << dialog_id << " from server"; LOG(INFO) << "Get history from the end of " << dialog_id << " from server";
td_->create_handler<GetHistoryQuery>(std::move(promise))->send_get_from_the_end(dialog_id, limit); td_->create_handler<GetHistoryQuery>(std::move(promise))
->send_get_from_the_end(dialog_id, d->last_new_message_id, limit);
} }
} }
@ -22385,7 +22413,7 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset
<< " and limit " << limit << " from server"; << " and limit " << limit << " from server";
td_->create_handler<GetHistoryQuery>(std::move(promise)) td_->create_handler<GetHistoryQuery>(std::move(promise))
->send(dialog_id, from_message_id.get_next_server_message_id(), offset, limit); ->send(dialog_id, from_message_id.get_next_server_message_id(), d->last_new_message_id, offset, limit);
} }
} }

View File

@ -189,8 +189,9 @@ class MessagesManager final : public Actor {
void on_get_messages(vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_channel_message, void on_get_messages(vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_channel_message,
bool is_scheduled, const char *source); bool is_scheduled, const char *source);
void on_get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, bool from_the_end, void on_get_history(DialogId dialog_id, MessageId from_message_id, MessageId old_last_new_message_id, int32 offset,
vector<tl_object_ptr<telegram_api::Message>> &&messages); int32 limit, bool from_the_end, vector<tl_object_ptr<telegram_api::Message>> &&messages,
Promise<Unit> &&promise);
void on_get_public_dialogs_search_result(const string &query, vector<tl_object_ptr<telegram_api::Peer>> &&my_peers, void on_get_public_dialogs_search_result(const string &query, vector<tl_object_ptr<telegram_api::Peer>> &&my_peers,
vector<tl_object_ptr<telegram_api::Peer>> &&peers); vector<tl_object_ptr<telegram_api::Peer>> &&peers);