Move suffix load out of Dialog.

This commit is contained in:
levlam 2023-03-16 17:54:33 +03:00
parent 97fa882a8f
commit 3a1aaa1148
2 changed files with 83 additions and 56 deletions

View File

@ -15142,8 +15142,11 @@ void MessagesManager::set_dialog_last_message_id(Dialog *d, MessageId last_messa
}
}
if (!last_message_id.is_valid()) {
d->suffix_load_first_message_id_ = MessageId();
d->suffix_load_done_ = false;
auto it = dialog_suffix_load_queries_.find(d->dialog_id);
if (it != dialog_suffix_load_queries_.end()) {
it->second->suffix_load_first_message_id_ = MessageId();
it->second->suffix_load_done_ = false;
}
}
if (last_message_id.is_valid() && d->delete_last_message_date != 0) {
d->delete_last_message_date = 0;
@ -16198,11 +16201,17 @@ bool MessagesManager::can_unload_message(const Dialog *d, const Message *m) cons
// don't want to unload messages from the last album
// can't unload from memory last dialog, last database messages, yet unsent messages, being edited media messages and active live locations
// can't unload messages in dialog with active suffix load query
{
auto it = dialog_suffix_load_queries_.find(d->dialog_id);
if (it != dialog_suffix_load_queries_.end() && !it->second->suffix_load_queries_.empty()) {
return false;
}
}
return d->open_count == 0 && m->message_id != d->last_message_id && m->message_id != d->last_database_message_id &&
!m->message_id.is_yet_unsent() && active_live_location_full_message_ids_.count(full_message_id) == 0 &&
replied_by_yet_unsent_messages_.count(full_message_id) == 0 && m->edited_content == nullptr &&
d->suffix_load_queries_.empty() && m->message_id != d->reply_markup_message_id &&
m->message_id != d->last_pinned_message_id && m->message_id != d->last_edited_message_id &&
m->message_id != d->reply_markup_message_id && m->message_id != d->last_pinned_message_id &&
m->message_id != d->last_edited_message_id &&
(m->media_album_id != d->last_media_album_id || m->media_album_id == 0);
}
@ -16547,22 +16556,24 @@ unique_ptr<MessagesManager::Message> MessagesManager::do_delete_message(Dialog *
set_dialog_first_database_message_id(d, MessageId(), "do_delete_message");
}
if (message_id == d->suffix_load_first_message_id_) {
auto suffix_load_queries_it = dialog_suffix_load_queries_.find(d->dialog_id);
if (suffix_load_queries_it != dialog_suffix_load_queries_.end() &&
message_id == suffix_load_queries_it->second->suffix_load_first_message_id_) {
MessagesConstIterator it(d, message_id);
CHECK(*it == m);
if ((*it)->have_previous) {
--it;
if (*it != nullptr) {
d->suffix_load_first_message_id_ = (*it)->message_id;
suffix_load_queries_it->second->suffix_load_first_message_id_ = (*it)->message_id;
} else {
LOG(ERROR) << "Have have_previous is true, but there is no previous for " << full_message_id << " from "
<< source;
d->suffix_load_first_message_id_ = MessageId();
d->suffix_load_done_ = false;
suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId();
suffix_load_queries_it->second->suffix_load_done_ = false;
}
} else {
d->suffix_load_first_message_id_ = MessageId();
d->suffix_load_done_ = false;
suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId();
suffix_load_queries_it->second->suffix_load_done_ = false;
}
}
} else {
@ -16570,10 +16581,13 @@ unique_ptr<MessagesManager::Message> MessagesManager::do_delete_message(Dialog *
CHECK(td_->auth_manager_->is_bot() && !G()->use_message_database());
set_dialog_last_message_id(d, MessageId(), "do_delete_message");
}
}
if (only_from_memory && message_id >= d->suffix_load_first_message_id_) {
d->suffix_load_first_message_id_ = MessageId();
d->suffix_load_done_ = false;
auto suffix_load_queries_it = dialog_suffix_load_queries_.find(d->dialog_id);
if (suffix_load_queries_it != dialog_suffix_load_queries_.end() &&
message_id >= suffix_load_queries_it->second->suffix_load_first_message_id_) {
suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId();
suffix_load_queries_it->second->suffix_load_done_ = false;
}
}
if (m->have_previous && (only_from_memory || !m->have_next)) {
@ -41117,24 +41131,25 @@ void MessagesManager::clear_recently_found_dialogs() {
recently_found_dialogs_.clear_dialogs();
}
void MessagesManager::suffix_load_loop(Dialog *d) {
if (d->suffix_load_has_query_) {
void MessagesManager::suffix_load_loop(const Dialog *d, SuffixLoadQueries *queries) {
CHECK(queries != nullptr);
if (queries->suffix_load_has_query_) {
return;
}
if (d->suffix_load_queries_.empty()) {
if (queries->suffix_load_queries_.empty()) {
return;
}
CHECK(!d->suffix_load_done_);
CHECK(!queries->suffix_load_done_);
CHECK(d != nullptr);
auto dialog_id = d->dialog_id;
auto from_message_id = d->suffix_load_first_message_id_;
auto from_message_id = queries->suffix_load_first_message_id_;
LOG(INFO) << "Send suffix load query in " << dialog_id << " from " << from_message_id;
auto promise = PromiseCreator::lambda([actor_id = actor_id(this), dialog_id](Result<Unit> result) {
send_closure(actor_id, &MessagesManager::suffix_load_query_ready, dialog_id);
});
d->suffix_load_has_query_ = true;
d->suffix_load_query_message_id_ = from_message_id;
queries->suffix_load_has_query_ = true;
queries->suffix_load_query_message_id_ = from_message_id;
if (from_message_id.is_valid()) {
get_history_impl(d, from_message_id, -1, 100, true, true, std::move(promise));
} else {
@ -41143,56 +41158,65 @@ void MessagesManager::suffix_load_loop(Dialog *d) {
}
}
void MessagesManager::suffix_load_update_first_message_id(Dialog *d) {
if (!d->suffix_load_first_message_id_.is_valid()) {
void MessagesManager::suffix_load_update_first_message_id(const Dialog *d, SuffixLoadQueries *queries) {
CHECK(d != nullptr);
CHECK(queries != nullptr);
if (!queries->suffix_load_first_message_id_.is_valid()) {
if (!d->last_message_id.is_valid()) {
return;
}
d->suffix_load_first_message_id_ = d->last_message_id;
queries->suffix_load_first_message_id_ = d->last_message_id;
}
auto it = MessagesConstIterator(d, d->suffix_load_first_message_id_);
auto it = MessagesConstIterator(d, queries->suffix_load_first_message_id_);
CHECK(*it != nullptr);
CHECK((*it)->message_id == d->suffix_load_first_message_id_);
CHECK((*it)->message_id == queries->suffix_load_first_message_id_);
while ((*it)->have_previous) {
--it;
}
d->suffix_load_first_message_id_ = (*it)->message_id;
queries->suffix_load_first_message_id_ = (*it)->message_id;
}
void MessagesManager::suffix_load_query_ready(DialogId dialog_id) {
auto *queries = dialog_suffix_load_queries_[dialog_id].get();
CHECK(queries != nullptr);
CHECK(queries->suffix_load_has_query_);
LOG(INFO) << "Finished suffix load query in " << dialog_id;
auto *d = get_dialog(dialog_id);
CHECK(d != nullptr);
bool is_unchanged = d->suffix_load_first_message_id_ == d->suffix_load_query_message_id_;
suffix_load_update_first_message_id(d);
if (is_unchanged && d->suffix_load_first_message_id_ == d->suffix_load_query_message_id_) {
bool is_unchanged = queries->suffix_load_first_message_id_ == queries->suffix_load_query_message_id_;
suffix_load_update_first_message_id(d, queries);
if (is_unchanged && queries->suffix_load_first_message_id_ == queries->suffix_load_query_message_id_) {
LOG(INFO) << "Finished suffix load in " << dialog_id;
d->suffix_load_done_ = true;
queries->suffix_load_done_ = true;
}
d->suffix_load_has_query_ = false;
queries->suffix_load_has_query_ = false;
// Remove ready queries
auto *m = get_message_force(d, d->suffix_load_first_message_id_, "suffix_load_query_ready");
auto ready_it = std::partition(d->suffix_load_queries_.begin(), d->suffix_load_queries_.end(),
[&](auto &value) { return !(d->suffix_load_done_ || value.second(m)); });
for (auto it = ready_it; it != d->suffix_load_queries_.end(); ++it) {
auto *m = get_message_force(d, queries->suffix_load_first_message_id_, "suffix_load_query_ready");
auto ready_it = std::partition(queries->suffix_load_queries_.begin(), queries->suffix_load_queries_.end(),
[&](auto &value) { return !(queries->suffix_load_done_ || value.second(m)); });
for (auto it = ready_it; it != queries->suffix_load_queries_.end(); ++it) {
it->first.set_value(Unit());
}
d->suffix_load_queries_.erase(ready_it, d->suffix_load_queries_.end());
queries->suffix_load_queries_.erase(ready_it, queries->suffix_load_queries_.end());
suffix_load_loop(d);
suffix_load_loop(d, queries);
}
void MessagesManager::suffix_load_add_query(Dialog *d,
std::pair<Promise<Unit>, std::function<bool(const Message *)>> query) {
suffix_load_update_first_message_id(d);
auto *m = get_message_force(d, d->suffix_load_first_message_id_, "suffix_load_add_query");
if (d->suffix_load_done_ || query.second(m)) {
auto &queries = dialog_suffix_load_queries_[d->dialog_id];
if (queries == nullptr) {
queries = make_unique<SuffixLoadQueries>();
}
suffix_load_update_first_message_id(d, queries.get());
auto *m = get_message_force(d, queries->suffix_load_first_message_id_, "suffix_load_add_query");
if (queries->suffix_load_done_ || query.second(m)) {
query.first.set_value(Unit());
} else {
d->suffix_load_queries_.emplace_back(std::move(query));
suffix_load_loop(d);
queries->suffix_load_queries_.emplace_back(std::move(query));
suffix_load_loop(d, queries.get());
}
}

View File

@ -1403,9 +1403,6 @@ class MessagesManager final : public Actor {
bool has_unload_timeout = false;
bool is_channel_difference_finished = false;
bool suffix_load_done_ = false;
bool suffix_load_has_query_ = false;
int32 pts = 0; // for channels only
int32 pending_read_channel_inbox_pts = 0; // for channels only
int32 pending_read_channel_inbox_server_unread_count = 0; // for channels only
@ -1431,12 +1428,6 @@ class MessagesManager final : public Actor {
string client_data;
// Load from newest to oldest message
MessageId suffix_load_first_message_id_; // identifier of some message such all suffix messages in range
// [suffix_load_first_message_id_, last_message_id] are loaded
MessageId suffix_load_query_message_id_;
std::vector<std::pair<Promise<Unit>, std::function<bool(const Message *)>>> suffix_load_queries_;
unique_ptr<Message> messages;
unique_ptr<Message> scheduled_messages;
@ -1736,6 +1727,16 @@ class MessagesManager final : public Actor {
}
};
struct SuffixLoadQueries {
bool suffix_load_done_ = false;
bool suffix_load_has_query_ = false;
MessageId suffix_load_first_message_id_; // identifier of some message such all suffix messages in range
// [suffix_load_first_message_id_, last_message_id] are loaded
MessageId suffix_load_query_message_id_;
vector<std::pair<Promise<Unit>, std::function<bool(const Message *)>>> suffix_load_queries_;
};
class BlockMessageSenderFromRepliesOnServerLogEvent;
class DeleteAllCallMessagesOnServerLogEvent;
class DeleteAllChannelMessagesFromSenderOnServerLogEvent;
@ -3346,8 +3347,8 @@ class MessagesManager final : public Actor {
static uint64 save_unpin_all_dialog_messages_on_server_log_event(DialogId dialog_id);
void suffix_load_loop(Dialog *d);
static void suffix_load_update_first_message_id(Dialog *d);
void suffix_load_loop(const Dialog *d, SuffixLoadQueries *queries);
void suffix_load_update_first_message_id(const Dialog *d, SuffixLoadQueries *queries);
void suffix_load_query_ready(DialogId dialog_id);
void suffix_load_add_query(Dialog *d, std::pair<Promise<Unit>, std::function<bool(const Message *)>> query);
void suffix_load_till_date(Dialog *d, int32 date, Promise<Unit> promise);
@ -3752,6 +3753,8 @@ class MessagesManager final : public Actor {
FlatHashMap<FullMessageId, std::set<MessageId>, FullMessageIdHash>
yet_unsent_thread_message_ids_; // {dialog_id, top_thread_message_id} -> yet unsent message IDs
FlatHashMap<DialogId, unique_ptr<SuffixLoadQueries>, DialogIdHash> dialog_suffix_load_queries_;
struct PendingMessageView {
FlatHashSet<MessageId, MessageIdHash> message_ids_;
bool increment_view_counter_ = false;