Combine all getChatHistory queries.

This commit is contained in:
levlam 2023-07-28 03:36:21 +03:00
parent 9618d58732
commit bd0ca2d6fc
2 changed files with 65 additions and 22 deletions

View File

@ -13536,7 +13536,7 @@ void MessagesManager::hangup() {
fail_promise_map(load_scheduled_messages_from_database_queries_); fail_promise_map(load_scheduled_messages_from_database_queries_);
fail_promise_map(run_after_get_channel_difference_); fail_promise_map(run_after_get_channel_difference_);
fail_promise_map(search_public_dialogs_queries_); fail_promise_map(search_public_dialogs_queries_);
fail_promise_map(get_history_from_the_end_queries_); fail_promise_map(get_history_queries_);
while (!pending_channel_on_get_dialogs_.empty()) { while (!pending_channel_on_get_dialogs_.empty()) {
auto it = pending_channel_on_get_dialogs_.begin(); auto it = pending_channel_on_get_dialogs_.begin();
auto promise = std::move(it->second.promise); auto promise = std::move(it->second.promise);
@ -23471,7 +23471,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
// load only 10 messages when repairing the last message and can't save the result to the database // load only 10 messages when repairing the last message and can't save the result to the database
int32 limit = !promise && (from_database || !G()->use_message_database()) ? 10 : MAX_GET_HISTORY; int32 limit = !promise && (from_database || !G()->use_message_database()) ? 10 : MAX_GET_HISTORY;
GetHistoryFromTheEndQuery query; PendingGetHistoryQuery query;
query.dialog_id_ = dialog_id; query.dialog_id_ = dialog_id;
query.limit_ = limit; query.limit_ = limit;
query.from_database_ = from_database; query.from_database_ = from_database;
@ -23482,7 +23482,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
query.old_last_message_id_ = d->last_database_message_id; query.old_last_message_id_ = d->last_database_message_id;
auto &promises = get_history_from_the_end_queries_[query]; auto &promises = get_history_queries_[query];
promises.push_back(std::move(promise)); promises.push_back(std::move(promise));
if (promises.size() != 1) { if (promises.size() != 1) {
// query has already been sent, just wait for the result // query has already been sent, just wait for the result
@ -23490,7 +23490,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
} }
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) { auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_from_the_end, query, std::move(result)); send_closure(actor_id, &MessagesManager::on_get_history_finished, query, std::move(result));
}); });
MessageDbMessagesQuery db_query; MessageDbMessagesQuery db_query;
@ -23515,7 +23515,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
query.old_last_message_id_ = d->last_new_message_id; query.old_last_message_id_ = d->last_new_message_id;
auto &promises = get_history_from_the_end_queries_[query]; auto &promises = get_history_queries_[query];
promises.push_back(std::move(promise)); promises.push_back(std::move(promise));
if (promises.size() != 1) { if (promises.size() != 1) {
// query has already been sent, just wait for the result // query has already been sent, just wait for the result
@ -23523,7 +23523,7 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
} }
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) { auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_from_the_end, query, std::move(result)); send_closure(actor_id, &MessagesManager::on_get_history_finished, query, std::move(result));
}); });
LOG(INFO) << "Get history from the end of " << dialog_id << " from server from " << source; LOG(INFO) << "Get history from the end of " << dialog_id << " from server from " << source;
@ -23532,16 +23532,16 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
} }
} }
void MessagesManager::on_get_history_from_the_end(const GetHistoryFromTheEndQuery &query, Result<Unit> &&result) { void MessagesManager::on_get_history_finished(const PendingGetHistoryQuery &query, Result<Unit> &&result) {
G()->ignore_result_if_closing(result); G()->ignore_result_if_closing(result);
auto it = get_history_from_the_end_queries_.find(query); auto it = get_history_queries_.find(query);
if (it == get_history_from_the_end_queries_.end()) { if (it == get_history_queries_.end()) {
return; return;
} }
auto promises = std::move(it->second); auto promises = std::move(it->second);
CHECK(!promises.empty()); CHECK(!promises.empty());
get_history_from_the_end_queries_.erase(it); get_history_queries_.erase(it);
if (result.is_ok()) { if (result.is_ok()) {
set_promises(promises); set_promises(promises);
@ -23571,9 +23571,35 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
!d->have_full_history) { !d->have_full_history) {
from_database = false; from_database = false;
} }
if (from_database && G()->use_message_database()) { if (!G()->use_message_database()) {
from_database = false;
}
PendingGetHistoryQuery query;
query.dialog_id_ = dialog_id;
query.from_message_id_ = from_message_id;
query.offset_ = offset;
query.limit_ = limit;
query.from_database_ = from_database;
query.only_local_ = only_local;
if (from_database) {
LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset
<< " and limit " << limit << " from database"; << " and limit " << limit << " from database";
query.old_last_message_id_ = d->last_database_message_id;
auto &promises = get_history_queries_[query];
promises.push_back(std::move(promise));
if (promises.size() != 1) {
// query has already been sent, just wait for the result
return;
}
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_finished, query, std::move(result));
});
MessageDbMessagesQuery db_query; MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id; db_query.dialog_id = dialog_id;
db_query.from_message_id = from_message_id; db_query.from_message_id = from_message_id;
@ -23583,7 +23609,7 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
db_query, db_query,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, from_message_id, PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, from_message_id,
old_last_database_message_id = d->last_database_message_id, offset, limit, only_local, old_last_database_message_id = d->last_database_message_id, offset, limit, only_local,
promise = std::move(promise)](vector<MessageDbDialogMessage> messages) mutable { promise = std::move(query_promise)](vector<MessageDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, from_message_id, send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, from_message_id,
old_last_database_message_id, offset, limit, false, only_local, std::move(messages), old_last_database_message_id, offset, limit, false, only_local, std::move(messages),
std::move(promise)); std::move(promise));
@ -23593,9 +23619,22 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
return promise.set_value(Unit()); return promise.set_value(Unit());
} }
query.old_last_message_id_ = d->last_new_message_id;
auto &promises = get_history_queries_[query];
promises.push_back(std::move(promise));
if (promises.size() != 1) {
// query has already been sent, just wait for the result
return;
}
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), query](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_get_history_finished, query, std::move(result));
});
LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset
<< " and limit " << limit << " from server"; << " and limit " << limit << " from server";
td_->create_handler<GetHistoryQuery>(std::move(promise)) td_->create_handler<GetHistoryQuery>(std::move(query_promise))
->send(dialog_id, from_message_id.get_next_server_message_id(), d->last_new_message_id, offset, limit); ->send(dialog_id, from_message_id.get_next_server_message_id(), d->last_new_message_id, offset, limit);
} }
} }

View File

@ -1645,22 +1645,27 @@ class MessagesManager final : public Actor {
vector<std::pair<Promise<Unit>, std::function<bool(const Message *)>>> suffix_load_queries_; vector<std::pair<Promise<Unit>, std::function<bool(const Message *)>>> suffix_load_queries_;
}; };
struct GetHistoryFromTheEndQuery { struct PendingGetHistoryQuery {
DialogId dialog_id_; DialogId dialog_id_;
MessageId from_message_id_;
MessageId old_last_message_id_; MessageId old_last_message_id_;
int32 offset_ = 0;
int32 limit_ = 0; int32 limit_ = 0;
bool from_database_ = false; bool from_database_ = false;
bool only_local_ = false; bool only_local_ = false;
bool operator==(const GetHistoryFromTheEndQuery &other) const { bool operator==(const PendingGetHistoryQuery &other) const {
return dialog_id_ == other.dialog_id_ && old_last_message_id_ == other.old_last_message_id_ && return dialog_id_ == other.dialog_id_ && from_message_id_ == other.from_message_id_ &&
limit_ == other.limit_ && from_database_ == other.from_database_ && only_local_ == other.only_local_; old_last_message_id_ == other.old_last_message_id_ && offset_ == other.offset_ && limit_ == other.limit_ &&
from_database_ == other.from_database_ && only_local_ == other.only_local_;
} }
}; };
struct GetHistoryFromTheEndQueryHash { struct PendingGetHistoryQueryHash {
uint32 operator()(const GetHistoryFromTheEndQuery &query) const { uint32 operator()(const PendingGetHistoryQuery &query) const {
uint32 hash = DialogIdHash()(query.dialog_id_); uint32 hash = DialogIdHash()(query.dialog_id_);
hash = combine_hashes(hash, MessageIdHash()(query.from_message_id_));
hash = combine_hashes(hash, MessageIdHash()(query.old_last_message_id_)); hash = combine_hashes(hash, MessageIdHash()(query.old_last_message_id_));
hash = combine_hashes(hash, Hash<int32>()(query.offset_));
hash = combine_hashes(hash, Hash<int32>()(query.limit_)); hash = combine_hashes(hash, Hash<int32>()(query.limit_));
hash = combine_hashes(hash, static_cast<uint32>(query.from_database_)); hash = combine_hashes(hash, static_cast<uint32>(query.from_database_));
hash = combine_hashes(hash, static_cast<uint32>(query.only_local_)); hash = combine_hashes(hash, static_cast<uint32>(query.only_local_));
@ -2236,7 +2241,7 @@ class MessagesManager final : public Actor {
void get_history_from_the_end_impl(const Dialog *d, bool from_database, bool only_local, Promise<Unit> &&promise, void get_history_from_the_end_impl(const Dialog *d, bool from_database, bool only_local, Promise<Unit> &&promise,
const char *source); const char *source);
void on_get_history_from_the_end(const GetHistoryFromTheEndQuery &query, Result<Unit> &&result); void on_get_history_finished(const PendingGetHistoryQuery &query, Result<Unit> &&result);
void get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, bool from_database, void get_history(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, bool from_database,
bool only_local, Promise<Unit> &&promise); bool only_local, Promise<Unit> &&promise);
@ -3700,8 +3705,7 @@ class MessagesManager final : public Actor {
vector<string> active_reactions_; vector<string> active_reactions_;
FlatHashMap<string, size_t> active_reaction_pos_; FlatHashMap<string, size_t> active_reaction_pos_;
FlatHashMap<GetHistoryFromTheEndQuery, vector<Promise<Unit>>, GetHistoryFromTheEndQueryHash> FlatHashMap<PendingGetHistoryQuery, vector<Promise<Unit>>, PendingGetHistoryQueryHash> get_history_queries_;
get_history_from_the_end_queries_;
uint32 scheduled_messages_sync_generation_ = 1; uint32 scheduled_messages_sync_generation_ = 1;