Use request promise in searchCallMessages.

This commit is contained in:
levlam 2024-06-13 15:09:20 +03:00
parent 4257a341bd
commit 0b255f0260
3 changed files with 159 additions and 171 deletions

View File

@ -1899,8 +1899,7 @@ class SearchMessagesQuery final : public Td::ResultHandler {
void send(DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id, const string &query,
DialogId sender_dialog_id, MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter,
MessageId top_thread_message_id, const ReactionType &tag, int64 random_id) {
auto input_peer = dialog_id.is_valid() ? td_->dialog_manager_->get_input_peer(dialog_id, AccessRights::Read)
: make_tl_object<telegram_api::inputPeerEmpty>();
auto input_peer = td_->dialog_manager_->get_input_peer(dialog_id, AccessRights::Read);
CHECK(input_peer != nullptr);
dialog_id_ = dialog_id;
@ -2019,6 +2018,57 @@ class SearchMessagesQuery final : public Td::ResultHandler {
}
};
class SearchCallMessagesQuery final : public Td::ResultHandler {
Promise<td_api::object_ptr<td_api::foundMessages>> promise_;
MessageId from_message_id_;
int32 limit_;
MessageSearchFilter filter_;
public:
explicit SearchCallMessagesQuery(Promise<td_api::object_ptr<td_api::foundMessages>> &&promise)
: promise_(std::move(promise)) {
}
void send(MessageId from_message_id, int32 limit, MessageSearchFilter filter) {
from_message_id_ = from_message_id;
limit_ = limit;
filter_ = filter;
auto offset_id = from_message_id.get_server_message_id().get();
send_query(G()->net_query_creator().create(telegram_api::messages_search(
0, telegram_api::make_object<telegram_api::inputPeerEmpty>(), string(), nullptr, nullptr,
vector<telegram_api::object_ptr<telegram_api::Reaction>>(), 0, get_input_messages_filter(filter), 0,
std::numeric_limits<int32>::max(), offset_id, 0, limit, std::numeric_limits<int32>::max(), 0, 0)));
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::messages_search>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto info = get_messages_info(td_, DialogId(), result_ptr.move_as_ok(), "SearchCallMessagesQuery");
td_->messages_manager_->get_channel_differences_if_needed(
std::move(info),
PromiseCreator::lambda([actor_id = td_->messages_manager_actor_.get(), from_message_id = from_message_id_,
limit = limit_, filter = filter_,
promise = std::move(promise_)](Result<MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
send_closure(actor_id, &MessagesManager::on_get_call_messages, from_message_id, limit, filter,
info.total_count, std::move(info.messages), std::move(promise));
}
}),
"SearchCallMessagesQuery");
}
void on_error(Status status) final {
promise_.set_error(std::move(status));
}
};
class GetSearchResultPositionsQuery final : public Td::ResultHandler {
Promise<td_api::object_ptr<td_api::messagePositions>> promise_;
DialogId dialog_id_;
@ -9092,6 +9142,80 @@ void MessagesManager::on_get_message_search_result_calendar(
promise.set_value(td_api::make_object<td_api::messageCalendar>(total_count, std::move(days)));
}
void MessagesManager::on_get_call_messages(MessageId from_message_id, int32 limit, MessageSearchFilter filter,
int32 total_count,
vector<telegram_api::object_ptr<telegram_api::Message>> &&messages,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
LOG(INFO) << "Receive " << messages.size() << " found call messages";
MessageId first_added_message_id;
if (messages.empty()) {
// messages may be empty because there are no more messages or they can't be found due to global limit
// anyway pretend that there are no more messages
first_added_message_id = MessageId::min();
}
FoundMessages found_messages;
auto &result = found_messages.message_full_ids;
int32 added_message_count = 0;
MessageId next_offset_message_id;
for (auto &message : messages) {
auto message_id = MessageId::get_message_id(message, false);
if (message_id.is_valid() && (!next_offset_message_id.is_valid() || message_id < next_offset_message_id)) {
next_offset_message_id = message_id;
}
auto new_message_full_id = on_get_message(std::move(message), false, false, false, "on_get_call_messages");
if (new_message_full_id == MessageFullId()) {
continue;
}
result.push_back(new_message_full_id);
added_message_count++;
CHECK(message_id == new_message_full_id.get_message_id());
CHECK(message_id.is_valid());
if (message_id < first_added_message_id || !first_added_message_id.is_valid()) {
first_added_message_id = message_id;
}
}
if (total_count < added_message_count) {
LOG(ERROR) << "Receive total_count = " << total_count << ", but added " << added_message_count
<< " messages out of " << messages.size();
total_count = added_message_count;
}
if (G()->use_message_database()) {
bool update_state = false;
auto &old_message_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)];
if (old_message_count != total_count) {
LOG(INFO) << "Update calls database message count to " << total_count;
old_message_count = total_count;
update_state = true;
}
auto &old_first_db_message_id =
calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)];
bool from_the_end = !from_message_id.is_valid() || from_message_id >= MessageId::max();
LOG(INFO) << "Have from_the_end = " << from_the_end << ", old_first_db_message_id = " << old_first_db_message_id
<< ", first_added_message_id = " << first_added_message_id << ", from_message_id = " << from_message_id;
if ((from_the_end || (old_first_db_message_id.is_valid() && old_first_db_message_id <= from_message_id)) &&
(!old_first_db_message_id.is_valid() || first_added_message_id < old_first_db_message_id)) {
LOG(INFO) << "Update calls database first message to " << first_added_message_id;
old_first_db_message_id = first_added_message_id;
update_state = true;
}
if (update_state) {
save_calls_db_state();
}
}
found_messages.total_count = total_count;
if (next_offset_message_id.is_valid()) {
found_messages.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get();
}
promise.set_value(get_found_messages_object(found_messages, "on_get_call_messages"));
}
void MessagesManager::on_get_dialog_messages_search_result(
DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id, const string &query, DialogId sender_dialog_id,
MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter, MessageId top_thread_message_id,
@ -9100,82 +9224,6 @@ void MessagesManager::on_get_dialog_messages_search_result(
TRY_STATUS_PROMISE(promise, G()->close_status());
LOG(INFO) << "Receive " << messages.size() << " found messages in " << dialog_id;
if (!dialog_id.is_valid()) {
CHECK(query.empty());
CHECK(!sender_dialog_id.is_valid());
CHECK(!top_thread_message_id.is_valid());
CHECK(!saved_messages_topic_id.is_valid());
CHECK(tag.is_empty());
auto it = found_call_messages_.find(random_id);
CHECK(it != found_call_messages_.end());
MessageId first_added_message_id;
if (messages.empty()) {
// messages may be empty because there are no more messages or they can't be found due to global limit
// anyway pretend that there are no more messages
first_added_message_id = MessageId::min();
}
auto &result = it->second.message_full_ids;
CHECK(result.empty());
int32 added_message_count = 0;
MessageId next_offset_message_id;
for (auto &message : messages) {
auto message_id = MessageId::get_message_id(message, false);
if (message_id.is_valid() && (!next_offset_message_id.is_valid() || message_id < next_offset_message_id)) {
next_offset_message_id = message_id;
}
auto new_message_full_id = on_get_message(std::move(message), false, false, false, "search call messages");
if (new_message_full_id == MessageFullId()) {
continue;
}
result.push_back(new_message_full_id);
added_message_count++;
CHECK(message_id == new_message_full_id.get_message_id());
CHECK(message_id.is_valid());
if (message_id < first_added_message_id || !first_added_message_id.is_valid()) {
first_added_message_id = message_id;
}
}
if (total_count < added_message_count) {
LOG(ERROR) << "Receive total_count = " << total_count << ", but added " << added_message_count
<< " messages out of " << messages.size();
total_count = added_message_count;
}
if (G()->use_message_database()) {
bool update_state = false;
auto &old_message_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)];
if (old_message_count != total_count) {
LOG(INFO) << "Update calls database message count to " << total_count;
old_message_count = total_count;
update_state = true;
}
auto &old_first_db_message_id =
calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)];
bool from_the_end = !from_message_id.is_valid() || from_message_id >= MessageId::max();
LOG(INFO) << "Have from_the_end = " << from_the_end << ", old_first_db_message_id = " << old_first_db_message_id
<< ", first_added_message_id = " << first_added_message_id << ", from_message_id = " << from_message_id;
if ((from_the_end || (old_first_db_message_id.is_valid() && old_first_db_message_id <= from_message_id)) &&
(!old_first_db_message_id.is_valid() || first_added_message_id < old_first_db_message_id)) {
LOG(INFO) << "Update calls database first message to " << first_added_message_id;
old_first_db_message_id = first_added_message_id;
update_state = true;
}
if (update_state) {
save_calls_db_state();
}
}
it->second.total_count = total_count;
if (next_offset_message_id.is_valid()) {
it->second.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get();
}
promise.set_value(Unit());
return;
}
auto it = found_dialog_messages_.find(random_id);
CHECK(it != found_dialog_messages_.end());
@ -9298,13 +9346,6 @@ void MessagesManager::on_get_dialog_messages_search_result(
}
void MessagesManager::on_failed_dialog_messages_search(DialogId dialog_id, int64 random_id) {
if (!dialog_id.is_valid()) {
auto it = found_call_messages_.find(random_id);
CHECK(it != found_call_messages_.end());
found_call_messages_.erase(it);
return;
}
auto it = found_dialog_messages_.find(random_id);
CHECK(it != found_dialog_messages_.end());
found_dialog_messages_.erase(it);
@ -20498,26 +20539,10 @@ MessagesManager::FoundDialogMessages MessagesManager::search_dialog_messages(
return result;
}
MessagesManager::FoundMessages MessagesManager::search_call_messages(const string &offset, int32 limit,
bool only_missed, int64 &random_id, bool use_db,
Promise<Unit> &&promise) {
if (random_id != 0) {
// request has already been sent before
auto it = found_call_messages_.find(random_id);
if (it != found_call_messages_.end()) {
auto result = std::move(it->second);
found_call_messages_.erase(it);
promise.set_value(Unit());
return result;
}
random_id = 0;
}
LOG(INFO) << "Search call messages from " << offset << " with limit " << limit;
FoundMessages result;
void MessagesManager::search_call_messages(const string &offset, int32 limit, bool only_missed,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
if (limit <= 0) {
promise.set_error(Status::Error(400, "Parameter limit must be positive"));
return result;
return promise.set_error(Status::Error(400, "Parameter limit must be positive"));
}
if (limit > MAX_SEARCH_MESSAGES) {
limit = MAX_SEARCH_MESSAGES;
@ -20527,21 +20552,14 @@ MessagesManager::FoundMessages MessagesManager::search_call_messages(const strin
if (!offset.empty()) {
auto r_offset_server_message_id = to_integer_safe<int32>(offset);
if (r_offset_server_message_id.is_error()) {
promise.set_error(Status::Error(400, "Invalid offset specified"));
return result;
return promise.set_error(Status::Error(400, "Invalid offset specified"));
}
offset_message_id = MessageId(ServerMessageId(r_offset_server_message_id.ok()));
}
do {
random_id = Random::secure_int64();
} while (random_id == 0 || found_call_messages_.count(random_id) > 0);
found_call_messages_[random_id]; // reserve place for result
auto filter = only_missed ? MessageSearchFilter::MissedCall : MessageSearchFilter::Call;
if (use_db && G()->use_message_database()) {
if (G()->use_message_database()) {
// try to use database
MessageId first_db_message_id =
calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)];
@ -20561,19 +20579,17 @@ MessagesManager::FoundMessages MessagesManager::search_call_messages(const strin
db_query.from_unique_message_id = fixed_from_message_id.get_server_message_id().get();
db_query.limit = limit;
G()->td_db()->get_message_db_async()->get_calls(
db_query, PromiseCreator::lambda([random_id, first_db_message_id, filter, promise = std::move(promise)](
Result<MessageDbCallsResult> calls_result) mutable {
db_query,
PromiseCreator::lambda([first_db_message_id, offset_message_id, limit, filter,
promise = std::move(promise)](Result<MessageDbCallsResult> calls_result) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_message_db_calls_result, std::move(calls_result),
random_id, first_db_message_id, filter, std::move(promise));
first_db_message_id, offset_message_id, limit, filter, std::move(promise));
}));
return result;
return;
}
}
td_->create_handler<SearchMessagesQuery>(std::move(promise))
->send(DialogId(), SavedMessagesTopicId(), string(), DialogId(), offset_message_id, 0, limit, filter, MessageId(),
ReactionType(), random_id);
return result;
td_->create_handler<SearchCallMessagesQuery>(std::move(promise))->send(offset_message_id, limit, filter);
}
void MessagesManager::search_outgoing_document_messages(const string &query, int32 limit,
@ -21200,19 +21216,14 @@ void MessagesManager::on_message_db_fts_result(Result<MessageDbFtsResult> result
promise.set_value(get_found_messages_object(found_messages, "on_message_db_fts_result"));
}
void MessagesManager::on_message_db_calls_result(Result<MessageDbCallsResult> result, int64 random_id,
MessageId first_db_message_id, MessageSearchFilter filter,
Promise<Unit> &&promise) {
G()->ignore_result_if_closing(result);
if (result.is_error()) {
found_call_messages_.erase(random_id);
return promise.set_error(result.move_as_error());
}
auto calls_result = result.move_as_ok();
void MessagesManager::on_message_db_calls_result(Result<MessageDbCallsResult> result, MessageId first_db_message_id,
MessageId offset_message_id, int32 limit, MessageSearchFilter filter,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
TRY_RESULT_PROMISE(promise, calls_result, std::move(result));
auto it = found_call_messages_.find(random_id);
CHECK(it != found_call_messages_.end());
auto &res = it->second.message_full_ids;
FoundMessages found_messages;
auto &res = found_messages.message_full_ids;
CHECK(!first_db_message_id.is_scheduled());
res.reserve(calls_result.messages.size());
@ -21226,17 +21237,17 @@ void MessagesManager::on_message_db_calls_result(Result<MessageDbCallsResult> re
res.emplace_back(message.dialog_id, m->message_id);
}
}
it->second.total_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)];
found_messages.total_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)];
if (next_offset_message_id.is_valid()) {
it->second.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get();
found_messages.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get();
}
if (res.empty() && first_db_message_id != MessageId::min()) {
LOG(INFO) << "No messages found in database";
found_call_messages_.erase(it);
return td_->create_handler<SearchCallMessagesQuery>(std::move(promise))->send(offset_message_id, limit, filter);
}
promise.set_value(Unit());
promise.set_value(get_found_messages_object(found_messages, "on_message_db_calls_result"));
}
void MessagesManager::search_messages(DialogListId dialog_list_id, bool ignore_folder_id, bool broadcasts_only,

View File

@ -182,6 +182,10 @@ class MessagesManager final : public Actor {
vector<tl_object_ptr<telegram_api::searchResultsCalendarPeriod>> &&periods,
Promise<td_api::object_ptr<td_api::messageCalendar>> &&promise);
void on_get_call_messages(MessageId from_message_id, int32 limit, MessageSearchFilter filter, int32 total_count,
vector<telegram_api::object_ptr<telegram_api::Message>> &&messages,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void on_get_dialog_messages_search_result(DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id,
const string &query, DialogId sender_dialog_id, MessageId from_message_id,
int32 offset, int32 limit, MessageSearchFilter filter,
@ -728,8 +732,8 @@ class MessagesManager final : public Actor {
const string &offset_str, int32 limit, MessageSearchFilter filter, int32 min_date,
int32 max_date, Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
FoundMessages search_call_messages(const string &offset, int32 limit, bool only_missed, int64 &random_id, bool use_db,
Promise<Unit> &&promise);
void search_call_messages(const string &offset, int32 limit, bool only_missed,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void search_outgoing_document_messages(const string &query, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
@ -2837,8 +2841,9 @@ class MessagesManager final : public Actor {
void on_message_db_fts_result(Result<MessageDbFtsResult> result, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void on_message_db_calls_result(Result<MessageDbCallsResult> result, int64 random_id, MessageId first_db_message_id,
MessageSearchFilter filter, Promise<Unit> &&promise);
void on_message_db_calls_result(Result<MessageDbCallsResult> result, MessageId first_db_message_id,
MessageId offset_message_id, int32 limit, MessageSearchFilter filter,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void on_load_active_live_location_message_full_ids_from_database(string value);
@ -3252,7 +3257,6 @@ class MessagesManager final : public Actor {
FlatHashMap<int64, FoundDialogMessages> found_dialog_messages_; // random_id -> FoundDialogMessages
FlatHashMap<int64, DialogId> found_dialog_messages_dialog_id_; // random_id -> dialog_id
FlatHashMap<int64, FoundMessages> found_call_messages_; // random_id -> FoundMessages
struct MessageEmbeddingCodes {
FlatHashMap<MessageId, string, MessageIdHash> embedding_codes_;

View File

@ -1437,34 +1437,6 @@ class SearchChatMessagesRequest final : public RequestActor<> {
}
};
class SearchCallMessagesRequest final : public RequestActor<> {
string offset_;
int32 limit_;
bool only_missed_;
int64 random_id_;
MessagesManager::FoundMessages messages_;
void do_run(Promise<Unit> &&promise) final {
messages_ = td_->messages_manager_->search_call_messages(offset_, limit_, only_missed_, random_id_,
get_tries() == 3, std::move(promise));
}
void do_send_result() final {
send_result(td_->messages_manager_->get_found_messages_object(messages_, "SearchCallMessagesRequest"));
}
public:
SearchCallMessagesRequest(ActorShared<Td> td, uint64 request_id, string offset, int32 limit, bool only_missed)
: RequestActor(std::move(td), request_id)
, offset_(std::move(offset))
, limit_(limit)
, only_missed_(only_missed)
, random_id_(0) {
set_tries(3);
}
};
class GetActiveLiveLocationMessagesRequest final : public RequestActor<> {
vector<MessageFullId> message_full_ids_;
@ -5323,7 +5295,8 @@ void Td::on_request(uint64 id, td_api::searchSavedMessages &request) {
void Td::on_request(uint64 id, const td_api::searchCallMessages &request) {
CHECK_IS_USER();
CREATE_REQUEST(SearchCallMessagesRequest, std::move(request.offset_), request.limit_, request.only_missed_);
CREATE_REQUEST_PROMISE();
messages_manager_->search_call_messages(request.offset_, request.limit_, request.only_missed_, std::move(promise));
}
void Td::on_request(uint64 id, td_api::searchOutgoingDocumentMessages &request) {