Combine get_history_from_the_end queries.

This commit is contained in:
levlam 2023-07-27 20:40:56 +03:00
parent 070399c89a
commit 9618d58732
2 changed files with 88 additions and 4 deletions

View File

@ -13536,6 +13536,7 @@ void MessagesManager::hangup() {
fail_promise_map(load_scheduled_messages_from_database_queries_);
fail_promise_map(run_after_get_channel_difference_);
fail_promise_map(search_public_dialogs_queries_);
fail_promise_map(get_history_from_the_end_queries_);
while (!pending_channel_on_get_dialogs_.empty()) {
auto it = pending_channel_on_get_dialogs_.begin();
auto promise = std::move(it->second.promise);
@ -21055,7 +21056,7 @@ tl_object_ptr<td_api::messages> MessagesManager::get_dialog_history(DialogId dia
LOG(INFO) << "Get " << (only_local ? "local " : "") << "history in " << dialog_id << " from " << from_message_id
<< " with offset " << offset << " and limit " << limit << ", " << left_tries
<< " tries left, have_full_history = " << d->have_full_history
<< " tries left, is_empty = " << d->is_empty << ", have_full_history = " << d->have_full_history
<< ", have_full_history_source = " << d->have_full_history_source;
auto message_ids = d->ordered_messages.get_history(d->last_message_id, from_message_id, offset, limit,
@ -23464,10 +23465,34 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
if (!d->first_database_message_id.is_valid() && !d->have_full_history) {
from_database = false;
}
if (!G()->use_message_database()) {
from_database = false;
}
// load only 10 messages when repairing the last message and can't save the result to the database
int32 limit = !promise && (from_database || !G()->use_message_database()) ? 10 : MAX_GET_HISTORY;
if (from_database && G()->use_message_database()) {
GetHistoryFromTheEndQuery query;
query.dialog_id_ = dialog_id;
query.limit_ = limit;
query.from_database_ = from_database;
query.only_local_ = only_local;
if (from_database) {
LOG(INFO) << "Get history from the end of " << dialog_id << " from database from " << source;
query.old_last_message_id_ = d->last_database_message_id;
auto &promises = get_history_from_the_end_queries_[query];
promises.push_back(std::move(promise));
if (promises.size() != 1) {
// query has already been sent, just wait for the result
return;
}
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_from_the_end, query, std::move(result));
});
MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id;
db_query.from_message_id = MessageId::max();
@ -23476,7 +23501,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
db_query,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id,
old_last_database_message_id = d->last_database_message_id, only_local, limit,
promise = std::move(promise)](vector<MessageDbDialogMessage> messages) mutable {
promise = std::move(query_promise)](vector<MessageDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, MessageId::max(),
old_last_database_message_id, 0, limit, true, only_local, std::move(messages),
std::move(promise));
@ -23488,12 +23513,43 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
return;
}
query.old_last_message_id_ = d->last_new_message_id;
auto &promises = get_history_from_the_end_queries_[query];
promises.push_back(std::move(promise));
if (promises.size() != 1) {
// query has already been sent, just wait for the result
return;
}
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_from_the_end, query, std::move(result));
});
LOG(INFO) << "Get history from the end of " << dialog_id << " from server from " << source;
td_->create_handler<GetHistoryQuery>(std::move(promise))
td_->create_handler<GetHistoryQuery>(std::move(query_promise))
->send_get_from_the_end(dialog_id, d->last_new_message_id, limit);
}
}
void MessagesManager::on_get_history_from_the_end(const GetHistoryFromTheEndQuery &query, Result<Unit> &&result) {
G()->ignore_result_if_closing(result);
auto it = get_history_from_the_end_queries_.find(query);
if (it == get_history_from_the_end_queries_.end()) {
return;
}
auto promises = std::move(it->second);
CHECK(!promises.empty());
get_history_from_the_end_queries_.erase(it);
if (result.is_ok()) {
set_promises(promises);
} else {
fail_promises(promises, result.move_as_error());
}
}
void MessagesManager::get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit,
bool from_database, bool only_local, Promise<Unit> &&promise) {
get_history_impl(get_dialog(dialog_id), from_message_id, offset, limit, from_database, only_local,

View File

@ -1645,6 +1645,29 @@ class MessagesManager final : public Actor {
vector<std::pair<Promise<Unit>, std::function<bool(const Message *)>>> suffix_load_queries_;
};
struct GetHistoryFromTheEndQuery {
DialogId dialog_id_;
MessageId old_last_message_id_;
int32 limit_ = 0;
bool from_database_ = false;
bool only_local_ = false;
bool operator==(const GetHistoryFromTheEndQuery &other) const {
return dialog_id_ == other.dialog_id_ && old_last_message_id_ == other.old_last_message_id_ &&
limit_ == other.limit_ && from_database_ == other.from_database_ && only_local_ == other.only_local_;
}
};
struct GetHistoryFromTheEndQueryHash {
uint32 operator()(const GetHistoryFromTheEndQuery &query) const {
uint32 hash = DialogIdHash()(query.dialog_id_);
hash = combine_hashes(hash, MessageIdHash()(query.old_last_message_id_));
hash = combine_hashes(hash, Hash<int32>()(query.limit_));
hash = combine_hashes(hash, static_cast<uint32>(query.from_database_));
hash = combine_hashes(hash, static_cast<uint32>(query.only_local_));
return hash;
}
};
class BlockMessageSenderFromRepliesOnServerLogEvent;
class DeleteAllCallMessagesOnServerLogEvent;
class DeleteAllChannelMessagesFromSenderOnServerLogEvent;
@ -2213,6 +2236,8 @@ class MessagesManager final : public Actor {
void get_history_from_the_end_impl(const Dialog *d, bool from_database, bool only_local, Promise<Unit> &&promise,
const char *source);
void on_get_history_from_the_end(const GetHistoryFromTheEndQuery &query, Result<Unit> &&result);
void get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, bool from_database,
bool only_local, Promise<Unit> &&promise);
@ -3675,6 +3700,9 @@ class MessagesManager final : public Actor {
vector<string> active_reactions_;
FlatHashMap<string, size_t> active_reaction_pos_;
FlatHashMap<GetHistoryFromTheEndQuery, vector<Promise<Unit>>, GetHistoryFromTheEndQueryHash>
get_history_from_the_end_queries_;
uint32 scheduled_messages_sync_generation_ = 1;
int64 authorization_date_ = 0;