Unify AffectedHistory handling.

This commit is contained in:
levlam 2021-11-11 20:55:49 +03:00
parent b731414d42
commit f22d96ac9c
2 changed files with 134 additions and 183 deletions

View File

@ -610,10 +610,16 @@ class UpdateDialogPinnedMessageQuery final : public Td::ResultHandler {
};
class UnpinAllMessagesQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
Promise<AffectedHistory> promise_;
DialogId dialog_id_;
void send_request() {
public:
explicit UnpinAllMessagesQuery(Promise<AffectedHistory> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id) {
dialog_id_ = dialog_id;
auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Write);
if (input_peer == nullptr) {
LOG(INFO) << "Can't unpin all messages in " << dialog_id_;
@ -623,45 +629,13 @@ class UnpinAllMessagesQuery final : public Td::ResultHandler {
send_query(G()->net_query_creator().create(telegram_api::messages_unpinAllMessages(std::move(input_peer))));
}
public:
explicit UnpinAllMessagesQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id) {
dialog_id_ = dialog_id;
send_request();
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::messages_unpinAllMessages>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto affected_history = result_ptr.move_as_ok();
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
affected_history->pts_count_ = 0; // force receiving real updates from the server
auto promise = affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_);
if (dialog_id_.get_type() == DialogType::Channel) {
td_->messages_manager_->add_pending_channel_update(dialog_id_, make_tl_object<dummyUpdate>(),
affected_history->pts_, affected_history->pts_count_,
std::move(promise), "unpin all messages");
} else {
td_->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Time::now(), std::move(promise),
"unpin all messages");
}
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
@ -2655,42 +2629,32 @@ class HidePromoDataQuery final : public Td::ResultHandler {
};
class DeleteHistoryQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
Promise<AffectedHistory> promise_;
DialogId dialog_id_;
MessageId max_message_id_;
bool remove_from_dialog_list_;
bool revoke_;
void send_request() {
public:
explicit DeleteHistoryQuery(Promise<AffectedHistory> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, MessageId max_message_id, bool remove_from_dialog_list, bool revoke) {
dialog_id_ = dialog_id;
auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read);
if (input_peer == nullptr) {
return promise_.set_error(Status::Error(400, "Chat is not accessible"));
}
int32 flags = 0;
if (!remove_from_dialog_list_) {
if (!remove_from_dialog_list) {
flags |= telegram_api::messages_deleteHistory::JUST_CLEAR_MASK;
}
if (revoke_) {
if (revoke) {
flags |= telegram_api::messages_deleteHistory::REVOKE_MASK;
}
send_query(G()->net_query_creator().create(
telegram_api::messages_deleteHistory(flags, false /*ignored*/, false /*ignored*/, std::move(input_peer),
max_message_id_.get_server_message_id().get(), 0, 0)));
}
public:
explicit DeleteHistoryQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, MessageId max_message_id, bool remove_from_dialog_list, bool revoke) {
dialog_id_ = dialog_id;
max_message_id_ = max_message_id;
remove_from_dialog_list_ = remove_from_dialog_list;
revoke_ = revoke;
send_request();
max_message_id.get_server_message_id().get(), 0, 0)));
}
void on_result(BufferSlice packet) final {
@ -2699,21 +2663,7 @@ class DeleteHistoryQuery final : public Td::ResultHandler {
return on_error(result_ptr.move_as_error());
}
auto affected_history = result_ptr.move_as_ok();
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
td_->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Time::now(), Promise<Unit>(),
"delete history query");
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(Unit());
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
@ -2765,13 +2715,16 @@ class DeleteChannelHistoryQuery final : public Td::ResultHandler {
};
class DeleteMessagesByDateQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
Promise<AffectedHistory> promise_;
DialogId dialog_id_;
int32 min_date_;
int32 max_date_;
bool revoke_;
void send_request() {
public:
explicit DeleteMessagesByDateQuery(Promise<AffectedHistory> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke) {
dialog_id_ = dialog_id;
auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read);
if (input_peer == nullptr) {
return promise_.set_error(Status::Error(400, "Chat is not accessible"));
@ -2780,25 +2733,12 @@ class DeleteMessagesByDateQuery final : public Td::ResultHandler {
int32 flags = telegram_api::messages_deleteHistory::JUST_CLEAR_MASK |
telegram_api::messages_deleteHistory::MIN_DATE_MASK |
telegram_api::messages_deleteHistory::MAX_DATE_MASK;
if (revoke_) {
if (revoke) {
flags |= telegram_api::messages_deleteHistory::REVOKE_MASK;
}
send_query(G()->net_query_creator().create(telegram_api::messages_deleteHistory(
flags, false /*ignored*/, false /*ignored*/, std::move(input_peer), 0, min_date_, max_date_)));
}
public:
explicit DeleteMessagesByDateQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke) {
dialog_id_ = dialog_id;
min_date_ = min_date;
max_date_ = max_date;
revoke_ = revoke;
send_request();
flags, false /*ignored*/, false /*ignored*/, std::move(input_peer), 0, min_date, max_date)));
}
void on_result(BufferSlice packet) final {
@ -2807,23 +2747,7 @@ class DeleteMessagesByDateQuery final : public Td::ResultHandler {
return on_error(result_ptr.move_as_error());
}
auto affected_history = result_ptr.move_as_ok();
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
affected_history->pts_count_ = 0; // force receiving real updates from the server
auto promise = affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_);
td_->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Time::now(), std::move(promise),
"DeleteMessagesByDateQuery");
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
@ -2926,16 +2850,21 @@ class BlockFromRepliesQuery final : public Td::ResultHandler {
};
class DeleteUserHistoryQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
Promise<AffectedHistory> promise_;
ChannelId channel_id_;
UserId user_id_;
void send_request() {
public:
explicit DeleteUserHistoryQuery(Promise<AffectedHistory> &&promise) : promise_(std::move(promise)) {
}
void send(ChannelId channel_id, UserId user_id) {
channel_id_ = channel_id;
auto input_channel = td_->contacts_manager_->get_input_channel(channel_id_);
if (input_channel == nullptr) {
return promise_.set_error(Status::Error(400, "Chat is not accessible"));
}
auto input_user = td_->contacts_manager_->get_input_user(user_id_);
auto input_user = td_->contacts_manager_->get_input_user(user_id);
if (input_user == nullptr) {
return promise_.set_error(Status::Error(400, "Usat is not accessible"));
}
@ -2944,38 +2873,13 @@ class DeleteUserHistoryQuery final : public Td::ResultHandler {
telegram_api::channels_deleteUserHistory(std::move(input_channel), std::move(input_user))));
}
public:
explicit DeleteUserHistoryQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(ChannelId channel_id, UserId user_id) {
channel_id_ = channel_id;
user_id_ = user_id;
send_request();
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::channels_deleteUserHistory>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto affected_history = result_ptr.move_as_ok();
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
td_->messages_manager_->add_pending_channel_update(
DialogId(channel_id_), make_tl_object<dummyUpdate>(), affected_history->pts_, affected_history->pts_count_,
affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise_), "delete user history query");
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
@ -2985,10 +2889,16 @@ class DeleteUserHistoryQuery final : public Td::ResultHandler {
};
class ReadMentionsQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
Promise<AffectedHistory> promise_;
DialogId dialog_id_;
void send_request() {
public:
explicit ReadMentionsQuery(Promise<AffectedHistory> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id) {
dialog_id_ = dialog_id;
auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read);
if (input_peer == nullptr) {
return promise_.set_error(Status::Error(400, "Chat is not accessible"));
@ -2997,43 +2907,13 @@ class ReadMentionsQuery final : public Td::ResultHandler {
send_query(G()->net_query_creator().create(telegram_api::messages_readMentions(std::move(input_peer))));
}
public:
explicit ReadMentionsQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id) {
dialog_id_ = dialog_id;
send_request();
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::messages_readMentions>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto affected_history = result_ptr.move_as_ok();
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
if (dialog_id_.get_type() == DialogType::Channel) {
LOG(ERROR) << "Receive pts_count " << affected_history->pts_count_ << " in result of ReadMentionsQuery in "
<< dialog_id_;
td_->updates_manager_->get_difference("Wrong messages_readMentions result");
} else {
td_->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Time::now(), Promise<Unit>(),
"read all mentions query");
}
}
if (affected_history->offset_ > 0) {
send_request();
return;
}
promise_.set_value(Unit());
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
@ -10795,10 +10675,15 @@ void MessagesManager::delete_dialog_history_on_server(DialogId dialog_id, Messag
switch (dialog_id.get_type()) {
case DialogType::User:
case DialogType::Chat:
td_->create_handler<DeleteHistoryQuery>(std::move(promise))
->send(dialog_id, max_message_id, remove_from_dialog_list, revoke);
case DialogType::Chat: {
AffectedHistoryQuery query = [td = td_, max_message_id, remove_from_dialog_list, revoke](
DialogId dialog_id, Promise<AffectedHistory> &&query_promise) {
td->create_handler<DeleteHistoryQuery>(std::move(query_promise))
->send(dialog_id, max_message_id, remove_from_dialog_list, revoke);
};
run_affected_history_query_until_complete(dialog_id, std::move(query), false, std::move(promise));
break;
}
case DialogType::Channel:
td_->create_handler<DeleteChannelHistoryQuery>(std::move(promise))
->send(dialog_id.get_channel_id(), max_message_id, allow_error);
@ -11023,8 +10908,11 @@ void MessagesManager::delete_all_channel_messages_from_user_on_server(ChannelId
log_event_id = save_delete_all_channel_messages_from_user_on_server_log_event(channel_id, user_id);
}
td_->create_handler<DeleteUserHistoryQuery>(get_erase_log_event_promise(log_event_id, std::move(promise)))
->send(channel_id, user_id);
AffectedHistoryQuery query = [td = td_, user_id](DialogId dialog_id, Promise<AffectedHistory> &&query_promise) {
td->create_handler<DeleteUserHistoryQuery>(std::move(query_promise))->send(dialog_id.get_channel_id(), user_id);
};
run_affected_history_query_until_complete(DialogId(channel_id), std::move(query), false,
get_erase_log_event_promise(log_event_id, std::move(promise)));
}
void MessagesManager::delete_dialog_messages_by_date(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke,
@ -11143,8 +11031,13 @@ void MessagesManager::delete_dialog_messages_by_date_on_server(DialogId dialog_i
log_event_id = save_delete_dialog_messages_by_date_on_server_log_event(dialog_id, min_date, max_date, revoke);
}
td_->create_handler<DeleteMessagesByDateQuery>(get_erase_log_event_promise(log_event_id, std::move(promise)))
->send(dialog_id, min_date, max_date, revoke);
AffectedHistoryQuery query = [td = td_, min_date, max_date, revoke](DialogId dialog_id,
Promise<AffectedHistory> &&query_promise) {
td->create_handler<DeleteMessagesByDateQuery>(std::move(query_promise))
->send(dialog_id, min_date, max_date, revoke);
};
run_affected_history_query_until_complete(dialog_id, std::move(query), true,
get_erase_log_event_promise(log_event_id, std::move(promise)));
}
int32 MessagesManager::get_unload_dialog_delay() const {
@ -11404,9 +11297,11 @@ void MessagesManager::read_all_dialog_mentions_on_server(DialogId dialog_id, uin
log_event_id = save_read_all_dialog_mentions_on_server_log_event(dialog_id);
}
LOG(INFO) << "Read all mentions on server in " << dialog_id;
td_->create_handler<ReadMentionsQuery>(get_erase_log_event_promise(log_event_id, std::move(promise)))
->send(dialog_id);
AffectedHistoryQuery query = [td = td_](DialogId dialog_id, Promise<AffectedHistory> &&query_promise) {
td->create_handler<ReadMentionsQuery>(std::move(query_promise))->send(dialog_id);
};
run_affected_history_query_until_complete(dialog_id, std::move(query), false,
get_erase_log_event_promise(log_event_id, std::move(promise)));
}
void MessagesManager::read_message_content_from_updates(MessageId message_id) {
@ -22548,6 +22443,49 @@ int64 MessagesManager::get_dialog_message_by_date(DialogId dialog_id, int32 date
return random_id;
}
void MessagesManager::run_affected_history_query_until_complete(DialogId dialog_id, AffectedHistoryQuery query,
bool get_affected_messages, Promise<Unit> &&promise) {
CHECK(!G()->close_flag());
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, query, get_affected_messages,
promise = std::move(promise)](Result<AffectedHistory> &&result) mutable {
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
send_closure(actor_id, &MessagesManager::on_get_affected_history, dialog_id, query, get_affected_messages,
result.move_as_ok(), std::move(promise));
});
query(dialog_id, std::move(query_promise));
}
void MessagesManager::on_get_affected_history(DialogId dialog_id, AffectedHistoryQuery query,
bool get_affected_messages, AffectedHistory affected_history,
Promise<Unit> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
if (affected_history->pts_count_ > 0) {
if (get_affected_messages) {
affected_history->pts_count_ = 0;
}
auto update_promise = affected_history->offset_ > 0 ? Promise<Unit>() : std::move(promise);
if (dialog_id.get_type() == DialogType::Channel) {
td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object<dummyUpdate>(),
affected_history->pts_, affected_history->pts_count_,
std::move(update_promise), "on_get_affected_history");
} else {
td_->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Time::now(),
std::move(update_promise), "on_get_affected_history");
}
} else if (affected_history->offset_ <= 0) {
promise.set_value(Unit());
}
if (affected_history->offset_ > 0) {
run_affected_history_query_until_complete(dialog_id, std::move(query), get_affected_messages, std::move(promise));
}
}
MessageId MessagesManager::find_message_by_date(const Message *m, int32 date) {
if (m == nullptr) {
return MessageId();
@ -32255,8 +32193,11 @@ void MessagesManager::unpin_all_dialog_messages_on_server(DialogId dialog_id, ui
log_event_id = save_unpin_all_dialog_messages_on_server_log_event(dialog_id);
}
td_->create_handler<UnpinAllMessagesQuery>(get_erase_log_event_promise(log_event_id, std::move(promise)))
->send(dialog_id);
AffectedHistoryQuery query = [td = td_](DialogId dialog_id, Promise<AffectedHistory> &&query_promise) {
td->create_handler<UnpinAllMessagesQuery>(std::move(query_promise))->send(dialog_id);
};
run_affected_history_query_until_complete(dialog_id, std::move(query), true,
get_erase_log_event_promise(log_event_id, std::move(promise)));
}
unique_ptr<MessagesManager::Message> *MessagesManager::treap_find_message(unique_ptr<Message> *v,

View File

@ -92,6 +92,8 @@ class MessageContent;
class MultiSequenceDispatcher;
class Td;
using AffectedHistory = tl_object_ptr<telegram_api::messages_affectedHistory>;
class MessagesManager final : public Actor {
public:
// static constexpr int32 MESSAGE_FLAG_IS_UNREAD = 1 << 0;
@ -2027,6 +2029,14 @@ class MessagesManager final : public Actor {
void unpin_all_dialog_messages_on_server(DialogId dialog_id, uint64 log_event_id, Promise<Unit> &&promise);
using AffectedHistoryQuery = std::function<void(DialogId, Promise<AffectedHistory>)>;
void run_affected_history_query_until_complete(DialogId dialog_id, AffectedHistoryQuery query,
bool get_affected_messages, Promise<Unit> &&promise);
void on_get_affected_history(DialogId dialog_id, AffectedHistoryQuery query, bool get_affected_messages,
AffectedHistory affected_history, Promise<Unit> &&promise);
static MessageId find_message_by_date(const Message *m, int32 date);
static void find_messages_by_date(const Message *m, int32 min_date, int32 max_date, vector<MessageId> &message_ids);