Move get_message_public_forwards to StatisticsManager.

This commit is contained in:
levlam 2023-09-25 21:39:02 +03:00
parent 7dc1aed0d2
commit 155095dff7
5 changed files with 177 additions and 168 deletions

View File

@ -2663,60 +2663,6 @@ class GetRecentLocationsQuery final : public Td::ResultHandler {
}
};
class GetMessagePublicForwardsQuery final : public Td::ResultHandler {
Promise<td_api::object_ptr<td_api::foundMessages>> promise_;
DialogId dialog_id_;
int32 limit_;
public:
explicit GetMessagePublicForwardsQuery(Promise<td_api::object_ptr<td_api::foundMessages>> &&promise)
: promise_(std::move(promise)) {
}
void send(DcId dc_id, MessageFullId message_full_id, int32 offset_date, DialogId offset_dialog_id,
ServerMessageId offset_message_id, int32 limit) {
dialog_id_ = message_full_id.get_dialog_id();
limit_ = limit;
auto input_peer = MessagesManager::get_input_peer_force(offset_dialog_id);
CHECK(input_peer != nullptr);
auto input_channel = td_->contacts_manager_->get_input_channel(dialog_id_.get_channel_id());
CHECK(input_channel != nullptr);
send_query(G()->net_query_creator().create(
telegram_api::stats_getMessagePublicForwards(
std::move(input_channel), message_full_id.get_message_id().get_server_message_id().get(), offset_date,
std::move(input_peer), offset_message_id.get(), limit),
{}, dc_id));
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::stats_getMessagePublicForwards>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto info = get_messages_info(td_, DialogId(), result_ptr.move_as_ok(), "GetMessagePublicForwardsQuery");
td_->messages_manager_->get_channel_differences_if_needed(
std::move(info), PromiseCreator::lambda([actor_id = td_->messages_manager_actor_.get(),
promise = std::move(promise_)](Result<MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
send_closure(actor_id, &MessagesManager::on_get_message_public_forwards, info.total_count,
std::move(info.messages), info.next_rate, std::move(promise));
}
}));
}
void on_error(Status status) final {
td_->messages_manager_->on_get_dialog_error(dialog_id_, status, "GetMessagePublicForwardsQuery");
promise_.set_error(std::move(status));
}
};
class HidePromoDataQuery final : public Td::ResultHandler {
DialogId dialog_id_;
@ -10574,54 +10520,6 @@ void MessagesManager::on_get_recent_locations(DialogId dialog_id, int32 limit, i
promise.set_value(get_messages_object(total_count, dialog_id, result, true, "on_get_recent_locations"));
}
void MessagesManager::on_get_message_public_forwards(int32 total_count,
vector<tl_object_ptr<telegram_api::Message>> &&messages,
int32 next_rate,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
LOG(INFO) << "Receive " << messages.size() << " forwarded messages";
vector<td_api::object_ptr<td_api::message>> result;
int32 last_message_date = 0;
MessageId last_message_id;
DialogId last_dialog_id;
for (auto &message : messages) {
auto message_date = get_message_date(message);
auto message_id = MessageId::get_message_id(message, false);
auto dialog_id = DialogId::get_message_dialog_id(message);
if (message_date > 0 && message_id.is_valid() && dialog_id.is_valid()) {
last_message_date = message_date;
last_message_id = message_id;
last_dialog_id = dialog_id;
}
auto new_message_full_id = on_get_message(std::move(message), false, dialog_id.get_type() == DialogType::Channel,
false, "get message public forwards");
if (new_message_full_id != MessageFullId()) {
CHECK(dialog_id == new_message_full_id.get_dialog_id());
result.push_back(get_message_object(new_message_full_id, "on_get_message_public_forwards"));
CHECK(result.back() != nullptr);
} else {
total_count--;
}
}
if (total_count < static_cast<int32>(result.size())) {
LOG(ERROR) << "Receive " << result.size() << " valid messages out of " << total_count << " in " << messages.size()
<< " messages";
total_count = static_cast<int32>(result.size());
}
string next_offset;
if (!result.empty()) {
if (next_rate > 0) {
last_message_date = next_rate;
}
next_offset = PSTRING() << last_message_date << ',' << last_dialog_id.get() << ','
<< last_message_id.get_server_message_id().get();
}
promise.set_value(td_api::make_object<td_api::foundMessages>(total_count, std::move(result), next_offset));
}
void MessagesManager::delete_messages_from_updates(const vector<MessageId> &message_ids, bool is_permanent) {
FlatHashMap<DialogId, vector<int64>, DialogIdHash> deleted_message_ids;
FlatHashMap<DialogId, bool, DialogIdHash> need_update_dialog_pos;
@ -23993,59 +23891,6 @@ void MessagesManager::on_read_message_reactions(DialogId dialog_id, vector<Messa
}
}
void MessagesManager::get_message_public_forwards(MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
auto dc_id_promise = PromiseCreator::lambda([actor_id = actor_id(this), message_full_id, offset = std::move(offset),
limit, promise = std::move(promise)](Result<DcId> r_dc_id) mutable {
if (r_dc_id.is_error()) {
return promise.set_error(r_dc_id.move_as_error());
}
send_closure(actor_id, &MessagesManager::send_get_message_public_forwards_query, r_dc_id.move_as_ok(),
message_full_id, std::move(offset), limit, std::move(promise));
});
td_->contacts_manager_->get_channel_statistics_dc_id(message_full_id.get_dialog_id(), false,
std::move(dc_id_promise));
}
void MessagesManager::send_get_message_public_forwards_query(
DcId dc_id, MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
if (!can_get_message_statistics(message_full_id)) {
return promise.set_error(Status::Error(400, "Message forwards are inaccessible"));
}
if (limit <= 0) {
return promise.set_error(Status::Error(400, "Parameter limit must be positive"));
}
if (limit > MAX_SEARCH_MESSAGES) {
limit = MAX_SEARCH_MESSAGES;
}
int32 offset_date = std::numeric_limits<int32>::max();
DialogId offset_dialog_id;
ServerMessageId offset_server_message_id;
if (!offset.empty()) {
auto parts = full_split(offset, ',');
if (parts.size() != 3) {
return promise.set_error(Status::Error(400, "Invalid offset specified"));
}
auto r_offset_date = to_integer_safe<int32>(parts[0]);
auto r_offset_dialog_id = to_integer_safe<int64>(parts[1]);
auto r_offset_server_message_id = to_integer_safe<int32>(parts[2]);
if (r_offset_date.is_error() || r_offset_dialog_id.is_error() || r_offset_server_message_id.is_error()) {
return promise.set_error(Status::Error(400, "Invalid offset specified"));
}
offset_date = r_offset_date.ok();
offset_dialog_id = DialogId(r_offset_dialog_id.ok());
offset_server_message_id = ServerMessageId(r_offset_server_message_id.ok());
}
td_->create_handler<GetMessagePublicForwardsQuery>(std::move(promise))
->send(dc_id, message_full_id, offset_date, offset_dialog_id, offset_server_message_id, limit);
}
Result<int32> MessagesManager::get_message_schedule_date(
td_api::object_ptr<td_api::MessageSchedulingState> &&scheduling_state) {
if (scheduling_state == nullptr) {

View File

@ -171,6 +171,8 @@ class MessagesManager final : public Actor {
static bool is_invalid_poll_message(const telegram_api::Message *message);
static int32 get_message_date(const tl_object_ptr<telegram_api::Message> &message_ptr);
tl_object_ptr<telegram_api::InputPeer> get_input_peer(DialogId dialog_id, AccessRights access_rights) const;
static tl_object_ptr<telegram_api::InputPeer> get_input_peer_force(DialogId dialog_id);
@ -245,9 +247,6 @@ class MessagesManager final : public Actor {
vector<tl_object_ptr<telegram_api::Message>> &&messages,
Promise<td_api::object_ptr<td_api::messages>> &&promise);
void on_get_message_public_forwards(int32 total_count, vector<tl_object_ptr<telegram_api::Message>> &&messages,
int32 next_rate, Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
// if message is from_update, flags have_previous and have_next are ignored and must be both true
MessageFullId on_get_message(tl_object_ptr<telegram_api::Message> message_ptr, bool from_update,
bool is_channel_message, bool is_scheduled, const char *source);
@ -873,9 +872,6 @@ class MessagesManager final : public Actor {
void remove_message_reaction(MessageFullId message_full_id, ReactionType reaction_type, Promise<Unit> &&promise);
void get_message_public_forwards(MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
tl_object_ptr<td_api::message> get_dialog_message_by_date_object(int64 random_id);
td_api::object_ptr<td_api::message> get_dialog_event_log_message_object(
@ -1752,8 +1748,6 @@ class MessagesManager final : public Actor {
static constexpr bool DROP_SEND_MESSAGE_UPDATES = false;
static int32 get_message_date(const tl_object_ptr<telegram_api::Message> &message_ptr);
vector<UserId> get_message_user_ids(const Message *m) const;
static vector<ChannelId> get_message_channel_ids(const Message *m);
@ -3149,9 +3143,6 @@ class MessagesManager final : public Actor {
Status can_import_messages(DialogId dialog_id);
void send_get_message_public_forwards_query(DcId dc_id, MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void add_sponsored_dialog(const Dialog *d, DialogSource source);
void save_sponsored_dialog();

View File

@ -9,6 +9,7 @@
#include "td/telegram/ContactsManager.h"
#include "td/telegram/Global.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/MessagesInfo.h"
#include "td/telegram/MessagesManager.h"
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/Td.h"
@ -298,6 +299,60 @@ class LoadAsyncGraphQuery final : public Td::ResultHandler {
}
};
class GetMessagePublicForwardsQuery final : public Td::ResultHandler {
Promise<td_api::object_ptr<td_api::foundMessages>> promise_;
DialogId dialog_id_;
int32 limit_;
public:
explicit GetMessagePublicForwardsQuery(Promise<td_api::object_ptr<td_api::foundMessages>> &&promise)
: promise_(std::move(promise)) {
}
void send(DcId dc_id, MessageFullId message_full_id, int32 offset_date, DialogId offset_dialog_id,
ServerMessageId offset_message_id, int32 limit) {
dialog_id_ = message_full_id.get_dialog_id();
limit_ = limit;
auto input_peer = MessagesManager::get_input_peer_force(offset_dialog_id);
CHECK(input_peer != nullptr);
auto input_channel = td_->contacts_manager_->get_input_channel(dialog_id_.get_channel_id());
CHECK(input_channel != nullptr);
send_query(G()->net_query_creator().create(
telegram_api::stats_getMessagePublicForwards(
std::move(input_channel), message_full_id.get_message_id().get_server_message_id().get(), offset_date,
std::move(input_peer), offset_message_id.get(), limit),
{}, dc_id));
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::stats_getMessagePublicForwards>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
auto info = get_messages_info(td_, DialogId(), result_ptr.move_as_ok(), "GetMessagePublicForwardsQuery");
td_->messages_manager_->get_channel_differences_if_needed(
std::move(info), PromiseCreator::lambda([actor_id = td_->statistics_manager_actor_.get(),
promise = std::move(promise_)](Result<MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
send_closure(actor_id, &StatisticsManager::on_get_message_public_forwards, info.total_count,
std::move(info.messages), info.next_rate, std::move(promise));
}
}));
}
void on_error(Status status) final {
td_->messages_manager_->on_get_dialog_error(dialog_id_, status, "GetMessagePublicForwardsQuery");
promise_.set_error(std::move(status));
}
};
StatisticsManager::StatisticsManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
}
@ -380,4 +435,111 @@ void StatisticsManager::send_load_async_graph_query(DcId dc_id, string token, in
td_->create_handler<LoadAsyncGraphQuery>(std::move(promise))->send(token, x, dc_id);
}
void StatisticsManager::get_message_public_forwards(MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
if (limit <= 0) {
return promise.set_error(Status::Error(400, "Parameter limit must be positive"));
}
auto dc_id_promise = PromiseCreator::lambda([actor_id = actor_id(this), message_full_id, offset = std::move(offset),
limit, promise = std::move(promise)](Result<DcId> r_dc_id) mutable {
if (r_dc_id.is_error()) {
return promise.set_error(r_dc_id.move_as_error());
}
send_closure(actor_id, &StatisticsManager::send_get_message_public_forwards_query, r_dc_id.move_as_ok(),
message_full_id, std::move(offset), limit, std::move(promise));
});
td_->contacts_manager_->get_channel_statistics_dc_id(message_full_id.get_dialog_id(), false,
std::move(dc_id_promise));
}
void StatisticsManager::send_get_message_public_forwards_query(
DcId dc_id, MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
if (!td_->messages_manager_->have_message_force(message_full_id, "send_get_message_public_forwards_query")) {
return promise.set_error(Status::Error(400, "Message not found"));
}
if (!td_->messages_manager_->can_get_message_statistics(message_full_id)) {
return promise.set_error(Status::Error(400, "Message forwards are inaccessible"));
}
static constexpr int32 MAX_MESSAGE_FORWARDS = 100; // server side limit
if (limit > MAX_MESSAGE_FORWARDS) {
limit = MAX_MESSAGE_FORWARDS;
}
int32 offset_date = std::numeric_limits<int32>::max();
DialogId offset_dialog_id;
ServerMessageId offset_server_message_id;
if (!offset.empty()) {
auto parts = full_split(offset, ',');
if (parts.size() != 3) {
return promise.set_error(Status::Error(400, "Invalid offset specified"));
}
auto r_offset_date = to_integer_safe<int32>(parts[0]);
auto r_offset_dialog_id = to_integer_safe<int64>(parts[1]);
auto r_offset_server_message_id = to_integer_safe<int32>(parts[2]);
if (r_offset_date.is_error() || r_offset_dialog_id.is_error() || r_offset_server_message_id.is_error()) {
return promise.set_error(Status::Error(400, "Invalid offset specified"));
}
offset_date = r_offset_date.ok();
offset_dialog_id = DialogId(r_offset_dialog_id.ok());
offset_server_message_id = ServerMessageId(r_offset_server_message_id.ok());
}
td_->create_handler<GetMessagePublicForwardsQuery>(std::move(promise))
->send(dc_id, message_full_id, offset_date, offset_dialog_id, offset_server_message_id, limit);
}
void StatisticsManager::on_get_message_public_forwards(
int32 total_count, vector<telegram_api::object_ptr<telegram_api::Message>> &&messages, int32 next_rate,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
LOG(INFO) << "Receive " << messages.size() << " forwarded messages";
vector<td_api::object_ptr<td_api::message>> result;
int32 last_message_date = 0;
MessageId last_message_id;
DialogId last_dialog_id;
for (auto &message : messages) {
auto message_date = MessagesManager::get_message_date(message);
auto message_id = MessageId::get_message_id(message, false);
auto dialog_id = DialogId::get_message_dialog_id(message);
if (message_date > 0 && message_id.is_valid() && dialog_id.is_valid()) {
last_message_date = message_date;
last_message_id = message_id;
last_dialog_id = dialog_id;
}
auto new_message_full_id =
td_->messages_manager_->on_get_message(std::move(message), false, dialog_id.get_type() == DialogType::Channel,
false, "on_get_message_public_forwards");
if (new_message_full_id != MessageFullId()) {
CHECK(dialog_id == new_message_full_id.get_dialog_id());
result.push_back(
td_->messages_manager_->get_message_object(new_message_full_id, "on_get_message_public_forwards"));
CHECK(result.back() != nullptr);
} else {
total_count--;
}
}
if (total_count < static_cast<int32>(result.size())) {
LOG(ERROR) << "Receive " << result.size() << " valid messages out of " << total_count << " in " << messages.size()
<< " messages";
total_count = static_cast<int32>(result.size());
}
string next_offset;
if (!result.empty()) {
if (next_rate > 0) {
last_message_date = next_rate;
}
next_offset = PSTRING() << last_message_date << ',' << last_dialog_id.get() << ','
<< last_message_id.get_server_message_id().get();
}
promise.set_value(td_api::make_object<td_api::foundMessages>(total_count, std::move(result), next_offset));
}
} // namespace td

View File

@ -11,6 +11,7 @@
#include "td/telegram/MessageFullId.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/td_api.h"
#include "td/telegram/telegram_api.h"
#include "td/actor/actor.h"
@ -34,6 +35,13 @@ class StatisticsManager final : public Actor {
void load_statistics_graph(DialogId dialog_id, string token, int64 x,
Promise<td_api::object_ptr<td_api::StatisticalGraph>> &&promise);
void get_message_public_forwards(MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
void on_get_message_public_forwards(int32 total_count,
vector<telegram_api::object_ptr<telegram_api::Message>> &&messages,
int32 next_rate, Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
private:
void tear_down() final;
@ -46,6 +54,9 @@ class StatisticsManager final : public Actor {
void send_load_async_graph_query(DcId dc_id, string token, int64 x,
Promise<td_api::object_ptr<td_api::StatisticalGraph>> &&promise);
void send_get_message_public_forwards_query(DcId dc_id, MessageFullId message_full_id, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundMessages>> &&promise);
Td *td_;
ActorShared<> parent_;
};

View File

@ -5475,8 +5475,8 @@ void Td::on_request(uint64 id, td_api::getMessagePublicForwards &request) {
CHECK_IS_USER();
CLEAN_INPUT_STRING(request.offset_);
CREATE_REQUEST_PROMISE();
messages_manager_->get_message_public_forwards({DialogId(request.chat_id_), MessageId(request.message_id_)},
std::move(request.offset_), request.limit_, std::move(promise));
statistics_manager_->get_message_public_forwards({DialogId(request.chat_id_), MessageId(request.message_id_)},
std::move(request.offset_), request.limit_, std::move(promise));
}
void Td::on_request(uint64 id, const td_api::removeNotification &request) {