Add cache of channel participants for bots-administrators.

This commit is contained in:
levlam 2021-06-12 23:53:14 +03:00
parent 6f51690275
commit 1e8724127a
2 changed files with 143 additions and 8 deletions

View File

@ -3453,6 +3453,9 @@ ContactsManager::ContactsManager(Td *td, ActorShared<> parent) : td_(td), parent
invite_link_info_expire_timeout_.set_callback(on_invite_link_info_expire_timeout_callback);
invite_link_info_expire_timeout_.set_callback_data(static_cast<void *>(this));
channel_participant_cache_timeout_.set_callback(on_channel_participant_cache_timeout_callback);
channel_participant_cache_timeout_.set_callback_data(static_cast<void *>(this));
}
ContactsManager::~ContactsManager() = default;
@ -3614,6 +3617,44 @@ void ContactsManager::on_invite_link_info_expire_timeout(DialogId dialog_id) {
remove_dialog_access_by_invite_link(dialog_id);
}
void ContactsManager::on_channel_participant_cache_timeout_callback(void *contacts_manager_ptr, int64 channel_id_long) {
if (G()->close_flag()) {
return;
}
auto contacts_manager = static_cast<ContactsManager *>(contacts_manager_ptr);
send_closure_later(contacts_manager->actor_id(contacts_manager),
&ContactsManager::on_channel_participant_cache_timeout,
ChannelId(narrow_cast<int32>(channel_id_long)));
}
void ContactsManager::on_channel_participant_cache_timeout(ChannelId channel_id) {
if (G()->close_flag()) {
return;
}
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return;
}
auto &participants = channel_participants_it->second.participants_;
auto min_access_date = G()->unix_time() - CHANNEL_PARTICIPANT_CACHE_TIME;
for (auto it = participants.begin(); it != participants.end();) {
if (it->second.last_access_date_ < min_access_date) {
it = participants.erase(it);
} else {
++it;
}
}
if (participants.empty()) {
channel_participants_.erase(channel_participants_it);
} else {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
}
template <class StorerT>
void ContactsManager::BotInfo::store(StorerT &storer) const {
using td::store;
@ -11749,6 +11790,11 @@ void ContactsManager::on_get_channel_participants(
on_update_channel_bot_user_ids(channel_id, std::move(bot_user_ids));
}
}
if (have_channel_participant_cache(channel_id)) {
for (const auto &participant : result) {
add_channel_participant_to_cache(channel_id, participant, false);
}
}
if (participant_count != -1 || administrator_count != -1) {
auto channel_full = get_channel_full_force(channel_id, "on_get_channel_participants_success");
@ -11803,6 +11849,46 @@ void ContactsManager::on_get_channel_participants(
promise.set_value(DialogParticipants{total_count, std::move(result)});
}
bool ContactsManager::have_channel_participant_cache(ChannelId channel_id) const {
if (!td_->auth_manager_->is_bot()) {
return false;
}
auto c = get_channel(channel_id);
return c != nullptr && c->status.is_administrator();
}
void ContactsManager::add_channel_participant_to_cache(ChannelId channel_id,
const DialogParticipant &dialog_participant,
bool allow_replace) {
auto &participants = channel_participants_[channel_id];
if (participants.participants_.empty()) {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
auto &participant_info = participants.participants_[dialog_participant.dialog_id];
if (participant_info.last_access_date_ > 0 && !allow_replace) {
return;
}
participant_info.participant_ = dialog_participant;
participant_info.last_access_date_ = G()->unix_time();
}
const DialogParticipant *ContactsManager::get_channel_participant_from_cache(ChannelId channel_id,
DialogId participant_dialog_id) const {
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return nullptr;
}
auto &participants = channel_participants_it->second.participants_;
CHECK(!participants.empty());
auto it = participants.find(participant_dialog_id);
if (it != participants.end()) {
it->second.last_access_date_ = G()->unix_time();
return &it->second.participant_;
}
return nullptr;
}
bool ContactsManager::speculative_add_count(int32 &count, int32 delta_count, int32 min_count) {
auto new_count = count + delta_count;
if (new_count < min_count) {
@ -13103,6 +13189,9 @@ void ContactsManager::on_channel_status_changed(const Channel *c, ChannelId chan
send_closure_later(G()->messages_manager(), &MessagesManager::on_update_dialog_group_call_rights,
DialogId(channel_id));
}
if (td_->auth_manager_->is_bot() && old_status.is_administrator() && !new_status.is_administrator()) {
channel_participants_.erase(channel_id);
}
// must not load ChannelFull, because must not change the Channel
CHECK(have_channel_full == (get_channel_full(channel_id) != nullptr));
@ -13408,6 +13497,13 @@ void ContactsManager::on_update_channel_participant(ChannelId channel_id, UserId
return;
}
if (old_dialog_participant.dialog_id == DialogId(get_my_id()) && old_dialog_participant.status.is_administrator() &&
!new_dialog_participant.status.is_administrator()) {
channel_participants_.erase(channel_id);
} else if (have_channel_participant_cache(channel_id)) {
add_channel_participant_to_cache(channel_id, new_dialog_participant, true);
}
send_update_chat_member(DialogId(channel_id), user_id, date, invite_link, old_dialog_participant,
new_dialog_participant);
}
@ -14930,6 +15026,14 @@ DialogParticipant ContactsManager::get_channel_participant(ChannelId channel_id,
return DialogParticipant();
}
if (have_channel_participant_cache(channel_id)) {
auto *participant = get_channel_participant_from_cache(channel_id, participant_dialog_id);
if (participant != nullptr) {
promise.set_value(Unit());
return *participant;
}
}
if (!td_->auth_manager_->is_bot() && participant_dialog_id.get_type() == DialogType::User &&
is_user_bot(participant_dialog_id.get_user_id())) {
auto user_id = participant_dialog_id.get_user_id();
@ -14953,24 +15057,26 @@ DialogParticipant ContactsManager::get_channel_participant(ChannelId channel_id,
LOG(DEBUG) << "Get info about " << participant_dialog_id << " membership in the " << channel_id << " with random_id "
<< random_id;
auto on_result_promise = PromiseCreator::lambda([actor_id = actor_id(this), random_id, promise = std::move(promise)](
Result<DialogParticipant> r_dialog_participant) mutable {
send_closure(actor_id, &ContactsManager::on_get_channel_participant, random_id, std::move(r_dialog_participant),
std::move(promise));
});
auto on_result_promise =
PromiseCreator::lambda([actor_id = actor_id(this), channel_id, random_id,
promise = std::move(promise)](Result<DialogParticipant> r_dialog_participant) mutable {
send_closure(actor_id, &ContactsManager::on_get_channel_participant, channel_id, random_id,
std::move(r_dialog_participant), std::move(promise));
});
td_->create_handler<GetChannelParticipantQuery>(std::move(on_result_promise))
->send(channel_id, participant_dialog_id, std::move(input_peer));
return DialogParticipant();
}
void ContactsManager::on_get_channel_participant(int64 random_id, Result<DialogParticipant> r_dialog_participant,
void ContactsManager::on_get_channel_participant(ChannelId channel_id, int64 random_id,
Result<DialogParticipant> r_dialog_participant,
Promise<Unit> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
LOG(INFO) << "Receive a member of a channel with random_id " << random_id;
LOG(INFO) << "Receive a member of a channel " << channel_id << " with random_id " << random_id;
auto it = received_channel_participant_.find(random_id);
CHECK(it != received_channel_participant_.end());
@ -14980,6 +15086,9 @@ void ContactsManager::on_get_channel_participant(int64 random_id, Result<DialogP
promise.set_error(r_dialog_participant.move_as_error());
} else {
it->second = r_dialog_participant.move_as_ok();
if (have_channel_participant_cache(channel_id)) {
add_channel_participant_to_cache(channel_id, it->second, false);
}
promise.set_value(Unit());
}
}

View File

@ -979,6 +979,8 @@ class ContactsManager : public Actor {
static constexpr size_t MAX_BIO_LENGTH = 70; // server side limit
static constexpr int32 MAX_GET_CHANNEL_PARTICIPANTS = 200; // server side limit
static constexpr int32 CHANNEL_PARTICIPANT_CACHE_TIME = 1800; // some reasonable limit
static constexpr int32 USER_FLAG_HAS_ACCESS_HASH = 1 << 0;
static constexpr int32 USER_FLAG_HAS_FIRST_NAME = 1 << 1;
static constexpr int32 USER_FLAG_HAS_LAST_NAME = 1 << 2;
@ -1493,7 +1495,7 @@ class ContactsManager : public Actor {
void delete_chat_participant(ChatId chat_id, UserId user_id, bool revoke_messages, Promise<Unit> &&promise);
void on_get_channel_participant(int64 random_id, Result<DialogParticipant> r_dialog_participant,
void on_get_channel_participant(ChannelId channel_id, int64 random_id, Result<DialogParticipant> r_dialog_participant,
Promise<Unit> &&promise);
void search_chat_participants(ChatId chat_id, const string &query, int32 limit, DialogParticipantsFilter filter,
@ -1511,6 +1513,14 @@ class ContactsManager : public Actor {
tl_object_ptr<telegram_api::channels_channelParticipants> &&channel_participants,
Promise<DialogParticipants> &&promise);
bool have_channel_participant_cache(ChannelId channel_id) const;
void add_channel_participant_to_cache(ChannelId channel_id, const DialogParticipant &dialog_participant,
bool allow_replace);
const DialogParticipant *get_channel_participant_from_cache(ChannelId channel_id,
DialogId participant_dialog_id) const;
void change_channel_participant_status_impl(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus status, DialogParticipantStatus old_status,
Promise<Unit> &&promise);
@ -1553,6 +1563,8 @@ class ContactsManager : public Actor {
static void on_invite_link_info_expire_timeout_callback(void *contacts_manager_ptr, int64 dialog_id_long);
static void on_channel_participant_cache_timeout_callback(void *contacts_manager_ptr, int64 channel_id_long);
void on_user_online_timeout(UserId user_id);
void on_channel_unban_timeout(ChannelId channel_id);
@ -1563,6 +1575,8 @@ class ContactsManager : public Actor {
void on_invite_link_info_expire_timeout(DialogId dialog_id);
void on_channel_participant_cache_timeout(ChannelId channel_id);
void tear_down() override;
Td *td_;
@ -1667,6 +1681,17 @@ class ContactsManager : public Actor {
std::unordered_map<ChannelId, vector<DialogParticipant>, ChannelIdHash> cached_channel_participants_;
// bot-administrators only
struct ChannelParticipantInfo {
DialogParticipant participant_;
mutable int32 last_access_date_ = 0;
};
struct ChannelParticipants {
std::unordered_map<DialogId, ChannelParticipantInfo, DialogIdHash> participants_;
};
std::unordered_map<ChannelId, ChannelParticipants, ChannelIdHash> channel_participants_;
bool are_contacts_loaded_ = false;
int32 next_contacts_sync_date_ = 0;
Hints contacts_hints_; // search contacts by first name, last name and username
@ -1710,6 +1735,7 @@ class ContactsManager : public Actor {
MultiTimeout user_nearby_timeout_{"UserNearbyTimeout"};
MultiTimeout slow_mode_delay_timeout_{"SlowModeDelayTimeout"};
MultiTimeout invite_link_info_expire_timeout_{"InviteLinkInfoExpireTimeout"};
MultiTimeout channel_participant_cache_timeout_{"ChannelParticipantCacheTimeout"};
};
} // namespace td