Move channel participant cache to DialogParticipantManager.

This commit is contained in:
levlam 2024-01-08 18:03:04 +03:00
parent bba173ea7d
commit 50b5a6e9bf
4 changed files with 165 additions and 162 deletions

View File

@ -2399,7 +2399,7 @@ class EditChannelAdminQuery final : public Td::ResultHandler {
LOG(INFO) << "Receive result for EditChannelAdminQuery: " << to_string(ptr);
td_->contacts_manager_->invalidate_channel_full(channel_id_, false, "EditChannelAdminQuery");
td_->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
td_->contacts_manager_->on_set_channel_participant_status(channel_id_, DialogId(user_id_), status_);
td_->dialog_participant_manager_->on_set_channel_participant_status(channel_id_, DialogId(user_id_), status_);
}
void on_error(Status status) final {
@ -2445,7 +2445,7 @@ class EditChannelBannedQuery final : public Td::ResultHandler {
LOG(INFO) << "Receive result for EditChannelBannedQuery: " << to_string(ptr);
td_->contacts_manager_->invalidate_channel_full(channel_id_, false, "EditChannelBannedQuery");
td_->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
td_->contacts_manager_->on_set_channel_participant_status(channel_id_, participant_dialog_id_, status_);
td_->dialog_participant_manager_->on_set_channel_participant_status(channel_id_, participant_dialog_id_, status_);
}
void on_error(Status status) final {
@ -3233,9 +3233,6 @@ ContactsManager::ContactsManager(Td *td, ActorShared<> parent) : td_(td), parent
slow_mode_delay_timeout_.set_callback(on_slow_mode_delay_timeout_callback);
slow_mode_delay_timeout_.set_callback_data(static_cast<void *>(this));
channel_participant_cache_timeout_.set_callback(on_channel_participant_cache_timeout_callback);
channel_participant_cache_timeout_.set_callback_data(static_cast<void *>(this));
get_user_queries_.set_merge_function([this](vector<int64> query_ids, Promise<Unit> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
auto input_users = transform(query_ids, [this](int64 query_id) { return get_input_user_force(UserId(query_id)); });
@ -3267,7 +3264,7 @@ ContactsManager::~ContactsManager() {
G()->get_gc_scheduler_id(), loaded_from_database_users_, unavailable_user_fulls_, loaded_from_database_chats_,
unavailable_chat_fulls_, loaded_from_database_channels_, unavailable_channel_fulls_,
loaded_from_database_secret_chats_, user_online_member_dialogs_, cached_channel_participants_,
resolved_phone_numbers_, channel_participants_, all_imported_contacts_, linked_channel_ids_, restricted_user_ids_,
resolved_phone_numbers_, all_imported_contacts_, linked_channel_ids_, restricted_user_ids_,
restricted_channel_ids_);
}
@ -3456,38 +3453,6 @@ void ContactsManager::on_slow_mode_delay_timeout(ChannelId channel_id) {
on_update_channel_slow_mode_next_send_date(channel_id, 0);
}
void ContactsManager::on_channel_participant_cache_timeout_callback(void *contacts_manager_ptr, int64 channel_id_long) {
if (G()->close_flag()) {
return;
}
auto contacts_manager = static_cast<ContactsManager *>(contacts_manager_ptr);
send_closure_later(contacts_manager->actor_id(contacts_manager),
&ContactsManager::on_channel_participant_cache_timeout, ChannelId(channel_id_long));
}
void ContactsManager::on_channel_participant_cache_timeout(ChannelId channel_id) {
if (G()->close_flag()) {
return;
}
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return;
}
auto &participants = channel_participants_it->second.participants_;
auto min_access_date = G()->unix_time() - CHANNEL_PARTICIPANT_CACHE_TIME;
table_remove_if(participants,
[min_access_date](const auto &it) { return it.second.last_access_date_ < min_access_date; });
if (participants.empty()) {
channel_participants_.erase(channel_participants_it);
} else {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
}
template <class StorerT>
void ContactsManager::User::store(StorerT &storer) const {
using td::store;
@ -8651,18 +8616,6 @@ void ContactsManager::restrict_channel_participant(ChannelId channel_id, DialogI
->send(channel_id, participant_dialog_id, std::move(input_peer), new_status);
}
void ContactsManager::on_set_channel_participant_status(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus status) {
if (G()->close_flag() || participant_dialog_id == DialogId(get_my_id())) {
return;
}
status.update_restrictions();
if (have_channel_participant_cache(channel_id)) {
update_channel_participant_status_cache(channel_id, participant_dialog_id, std::move(status));
}
}
ChannelId ContactsManager::migrate_chat_to_megagroup(ChatId chat_id, Promise<Unit> &promise) {
auto c = get_chat(chat_id);
if (c == nullptr) {
@ -14349,9 +14302,9 @@ void ContactsManager::on_get_channel_participants(
on_update_channel_bot_user_ids(channel_id, std::move(bot_user_ids));
}
}
if (have_channel_participant_cache(channel_id)) {
if (td_->dialog_participant_manager_->have_channel_participant_cache(channel_id)) {
for (const auto &participant : result) {
add_channel_participant_to_cache(channel_id, participant, false);
td_->dialog_participant_manager_->add_channel_participant_to_cache(channel_id, participant, false);
}
}
@ -14417,73 +14370,6 @@ void ContactsManager::on_get_channel_participants(
promise.set_value(DialogParticipants{total_count, std::move(result)});
}
bool ContactsManager::have_channel_participant_cache(ChannelId channel_id) const {
if (!td_->auth_manager_->is_bot()) {
return false;
}
auto c = get_channel(channel_id);
return c != nullptr && c->status.is_administrator();
}
void ContactsManager::add_channel_participant_to_cache(ChannelId channel_id,
const DialogParticipant &dialog_participant,
bool allow_replace) {
CHECK(channel_id.is_valid());
CHECK(dialog_participant.is_valid());
auto &participants = channel_participants_[channel_id];
if (participants.participants_.empty()) {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
auto &participant_info = participants.participants_[dialog_participant.dialog_id_];
if (participant_info.last_access_date_ > 0 && !allow_replace) {
return;
}
participant_info.participant_ = dialog_participant;
participant_info.last_access_date_ = G()->unix_time();
}
void ContactsManager::update_channel_participant_status_cache(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus &&dialog_participant_status) {
CHECK(channel_id.is_valid());
CHECK(participant_dialog_id.is_valid());
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return;
}
auto &participants = channel_participants_it->second;
auto it = participants.participants_.find(participant_dialog_id);
if (it == participants.participants_.end()) {
return;
}
auto &participant_info = it->second;
LOG(INFO) << "Update cached status of " << participant_dialog_id << " in " << channel_id << " from "
<< participant_info.participant_.status_ << " to " << dialog_participant_status;
participant_info.participant_.status_ = std::move(dialog_participant_status);
participant_info.last_access_date_ = G()->unix_time();
}
void ContactsManager::drop_channel_participant_cache(ChannelId channel_id) {
channel_participants_.erase(channel_id);
}
const DialogParticipant *ContactsManager::get_channel_participant_from_cache(ChannelId channel_id,
DialogId participant_dialog_id) {
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return nullptr;
}
auto &participants = channel_participants_it->second.participants_;
CHECK(!participants.empty());
auto it = participants.find(participant_dialog_id);
if (it != participants.end()) {
it->second.participant_.status_.update_restrictions();
it->second.last_access_date_ = G()->unix_time();
return &it->second.participant_;
}
return nullptr;
}
bool ContactsManager::speculative_add_count(int32 &count, int32 delta_count, int32 min_count) {
auto new_count = count + delta_count;
if (new_count < min_count) {
@ -15817,7 +15703,7 @@ void ContactsManager::on_channel_status_changed(Channel *c, ChannelId channel_id
}
bool is_bot = td_->auth_manager_->is_bot();
if (is_bot && old_status.is_administrator() && !new_status.is_administrator()) {
channel_participants_.erase(channel_id);
td_->dialog_participant_manager_->drop_channel_participant_cache(channel_id);
}
if (is_bot && old_status.is_member() && !new_status.is_member() && !G()->use_message_database()) {
send_closure_later(G()->messages_manager(), &MessagesManager::on_dialog_deleted, DialogId(channel_id),

View File

@ -268,15 +268,6 @@ class ContactsManager final : public Actor {
void on_update_channel_default_permissions(ChannelId channel_id, RestrictedRights default_permissions);
void on_update_channel_administrator_count(ChannelId channel_id, int32 administrator_count);
bool have_channel_participant_cache(ChannelId channel_id) const;
void add_channel_participant_to_cache(ChannelId channel_id, const DialogParticipant &dialog_participant,
bool allow_replace);
const DialogParticipant *get_channel_participant_from_cache(ChannelId channel_id, DialogId participant_dialog_id);
void drop_channel_participant_cache(ChannelId channel_id);
int32 on_update_peer_located(vector<tl_object_ptr<telegram_api::PeerLocated>> &&peers, bool from_update);
void on_update_bot_commands(DialogId dialog_id, UserId bot_user_id,
@ -669,9 +660,6 @@ class ContactsManager final : public Actor {
void ban_dialog_participant(DialogId dialog_id, DialogId participant_dialog_id, int32 banned_until_date,
bool revoke_messages, Promise<Unit> &&promise);
void on_set_channel_participant_status(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus status);
void get_chat_participant(ChatId chat_id, UserId user_id, Promise<DialogParticipant> &&promise);
void search_dialog_participants(DialogId dialog_id, const string &query, int32 limit, DialogParticipantFilter filter,
@ -1186,7 +1174,6 @@ class ContactsManager final : public Actor {
static constexpr size_t MAX_DESCRIPTION_LENGTH = 255; // server side limit for chat/channel description
static constexpr int32 MAX_GET_CHANNEL_PARTICIPANTS = 200; // server side limit
static constexpr int32 CHANNEL_PARTICIPANT_CACHE_TIME = 1800; // some reasonable limit
static constexpr int32 MAX_ACTIVE_STORY_ID_RELOAD_TIME = 3600; // some reasonable limit
static constexpr int32 CHANNEL_RECOMMENDATIONS_CACHE_TIME = 86400; // some reasonable limit
@ -1833,9 +1820,6 @@ class ContactsManager final : public Actor {
tl_object_ptr<telegram_api::channels_channelParticipants> &&channel_participants,
Promise<DialogParticipants> &&promise);
void update_channel_participant_status_cache(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus &&dialog_participant_status);
void set_chat_participant_status(ChatId chat_id, UserId user_id, DialogParticipantStatus status,
Promise<Unit> &&promise);
@ -1874,8 +1858,6 @@ class ContactsManager final : public Actor {
static void on_slow_mode_delay_timeout_callback(void *contacts_manager_ptr, int64 channel_id_long);
static void on_channel_participant_cache_timeout_callback(void *contacts_manager_ptr, int64 channel_id_long);
void on_user_online_timeout(UserId user_id);
void on_user_emoji_status_timeout(UserId user_id);
@ -1888,8 +1870,6 @@ class ContactsManager final : public Actor {
void on_slow_mode_delay_timeout(ChannelId channel_id);
void on_channel_participant_cache_timeout(ChannelId channel_id);
void start_up() final;
void tear_down() final;
@ -2019,17 +1999,6 @@ class ContactsManager final : public Actor {
FlatHashMap<UserId, FlatHashSet<MessageFullId, MessageFullIdHash>, UserIdHash> user_messages_;
FlatHashMap<ChannelId, FlatHashSet<MessageFullId, MessageFullIdHash>, ChannelIdHash> channel_messages_;
// bot-administrators only
struct ChannelParticipantInfo {
DialogParticipant participant_;
int32 last_access_date_ = 0;
};
struct ChannelParticipants {
FlatHashMap<DialogId, ChannelParticipantInfo, DialogIdHash> participants_;
};
FlatHashMap<ChannelId, ChannelParticipants, ChannelIdHash> channel_participants_;
bool are_contacts_loaded_ = false;
int32 next_contacts_sync_date_ = 0;
Hints contacts_hints_; // search contacts by first name, last name and usernames
@ -2074,7 +2043,6 @@ class ContactsManager final : public Actor {
MultiTimeout channel_unban_timeout_{"ChannelUnbanTimeout"};
MultiTimeout user_nearby_timeout_{"UserNearbyTimeout"};
MultiTimeout slow_mode_delay_timeout_{"SlowModeDelayTimeout"};
MultiTimeout channel_participant_cache_timeout_{"ChannelParticipantCacheTimeout"};
};
} // namespace td

View File

@ -368,10 +368,14 @@ class GetChannelParticipantQuery final : public Td::ResultHandler {
DialogParticipantManager::DialogParticipantManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
update_dialog_online_member_count_timeout_.set_callback(on_update_dialog_online_member_count_timeout_callback);
update_dialog_online_member_count_timeout_.set_callback_data(static_cast<void *>(this));
channel_participant_cache_timeout_.set_callback(on_channel_participant_cache_timeout_callback);
channel_participant_cache_timeout_.set_callback_data(static_cast<void *>(this));
}
DialogParticipantManager::~DialogParticipantManager() {
Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), dialog_administrators_);
Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), dialog_administrators_,
channel_participants_);
}
void DialogParticipantManager::tear_down() {
@ -972,9 +976,9 @@ void DialogParticipantManager::on_update_channel_participant(
if (old_dialog_participant.dialog_id_ == td_->dialog_manager_->get_my_dialog_id() &&
old_dialog_participant.status_.is_administrator() && !new_dialog_participant.status_.is_administrator()) {
td_->contacts_manager_->drop_channel_participant_cache(channel_id);
} else if (td_->contacts_manager_->have_channel_participant_cache(channel_id)) {
td_->contacts_manager_->add_channel_participant_to_cache(channel_id, new_dialog_participant, true);
drop_channel_participant_cache(channel_id);
} else if (have_channel_participant_cache(channel_id)) {
add_channel_participant_to_cache(channel_id, new_dialog_participant, true);
}
auto channel_status = td_->contacts_manager_->get_channel_status(channel_id);
@ -1093,8 +1097,8 @@ void DialogParticipantManager::get_channel_participant(ChannelId channel_id, Dia
return promise.set_error(Status::Error(400, "Member not found"));
}
if (td_->contacts_manager_->have_channel_participant_cache(channel_id)) {
auto *participant = td_->contacts_manager_->get_channel_participant_from_cache(channel_id, participant_dialog_id);
if (have_channel_participant_cache(channel_id)) {
auto *participant = get_channel_participant_from_cache(channel_id, participant_dialog_id);
if (participant != nullptr) {
return promise.set_value(DialogParticipant{*participant});
}
@ -1121,10 +1125,121 @@ void DialogParticipantManager::finish_get_channel_participant(ChannelId channel_
LOG(INFO) << "Receive " << dialog_participant.dialog_id_ << " as a member of a channel " << channel_id;
dialog_participant.status_.update_restrictions();
if (td_->contacts_manager_->have_channel_participant_cache(channel_id)) {
td_->contacts_manager_->add_channel_participant_to_cache(channel_id, dialog_participant, false);
if (have_channel_participant_cache(channel_id)) {
add_channel_participant_to_cache(channel_id, dialog_participant, false);
}
promise.set_value(std::move(dialog_participant));
}
void DialogParticipantManager::on_set_channel_participant_status(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus status) {
if (G()->close_flag() || participant_dialog_id == td_->dialog_manager_->get_my_dialog_id()) {
return;
}
status.update_restrictions();
if (have_channel_participant_cache(channel_id)) {
update_channel_participant_status_cache(channel_id, participant_dialog_id, std::move(status));
}
}
void DialogParticipantManager::on_channel_participant_cache_timeout_callback(void *dialog_participant_manager_ptr,
int64 channel_id_long) {
if (G()->close_flag()) {
return;
}
auto dialog_participant_manager = static_cast<DialogParticipantManager *>(dialog_participant_manager_ptr);
send_closure_later(dialog_participant_manager->actor_id(dialog_participant_manager),
&DialogParticipantManager::on_channel_participant_cache_timeout, ChannelId(channel_id_long));
}
void DialogParticipantManager::on_channel_participant_cache_timeout(ChannelId channel_id) {
if (G()->close_flag()) {
return;
}
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return;
}
auto &participants = channel_participants_it->second.participants_;
auto min_access_date = G()->unix_time() - CHANNEL_PARTICIPANT_CACHE_TIME;
table_remove_if(participants,
[min_access_date](const auto &it) { return it.second.last_access_date_ < min_access_date; });
if (participants.empty()) {
channel_participants_.erase(channel_participants_it);
} else {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
}
bool DialogParticipantManager::have_channel_participant_cache(ChannelId channel_id) const {
if (!td_->auth_manager_->is_bot()) {
return false;
}
return td_->contacts_manager_->get_channel_status(channel_id).is_administrator();
}
void DialogParticipantManager::add_channel_participant_to_cache(ChannelId channel_id,
const DialogParticipant &dialog_participant,
bool allow_replace) {
CHECK(channel_id.is_valid());
CHECK(dialog_participant.is_valid());
auto &participants = channel_participants_[channel_id];
if (participants.participants_.empty()) {
channel_participant_cache_timeout_.set_timeout_in(channel_id.get(), CHANNEL_PARTICIPANT_CACHE_TIME);
}
auto &participant_info = participants.participants_[dialog_participant.dialog_id_];
if (participant_info.last_access_date_ > 0 && !allow_replace) {
return;
}
participant_info.participant_ = dialog_participant;
participant_info.last_access_date_ = G()->unix_time();
}
void DialogParticipantManager::update_channel_participant_status_cache(
ChannelId channel_id, DialogId participant_dialog_id, DialogParticipantStatus &&dialog_participant_status) {
CHECK(channel_id.is_valid());
CHECK(participant_dialog_id.is_valid());
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return;
}
auto &participants = channel_participants_it->second;
auto it = participants.participants_.find(participant_dialog_id);
if (it == participants.participants_.end()) {
return;
}
auto &participant_info = it->second;
LOG(INFO) << "Update cached status of " << participant_dialog_id << " in " << channel_id << " from "
<< participant_info.participant_.status_ << " to " << dialog_participant_status;
participant_info.participant_.status_ = std::move(dialog_participant_status);
participant_info.last_access_date_ = G()->unix_time();
}
void DialogParticipantManager::drop_channel_participant_cache(ChannelId channel_id) {
channel_participants_.erase(channel_id);
}
const DialogParticipant *DialogParticipantManager::get_channel_participant_from_cache(ChannelId channel_id,
DialogId participant_dialog_id) {
auto channel_participants_it = channel_participants_.find(channel_id);
if (channel_participants_it == channel_participants_.end()) {
return nullptr;
}
auto &participants = channel_participants_it->second.participants_;
CHECK(!participants.empty());
auto it = participants.find(participant_dialog_id);
if (it != participants.end()) {
it->second.participant_.status_.update_restrictions();
it->second.last_access_date_ = G()->unix_time();
return &it->second.participant_;
}
return nullptr;
}
} // namespace td

View File

@ -85,6 +85,16 @@ class DialogParticipantManager final : public Actor {
void get_channel_participant(ChannelId channel_id, DialogId participant_dialog_id,
Promise<DialogParticipant> &&promise);
void on_set_channel_participant_status(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus status);
bool have_channel_participant_cache(ChannelId channel_id) const;
void add_channel_participant_to_cache(ChannelId channel_id, const DialogParticipant &dialog_participant,
bool allow_replace);
void drop_channel_participant_cache(ChannelId channel_id);
void get_current_state(vector<td_api::object_ptr<td_api::Update>> &updates) const;
private:
@ -92,6 +102,8 @@ class DialogParticipantManager final : public Actor {
static constexpr int32 ONLINE_MEMBER_COUNT_UPDATE_TIME = 5 * 60;
static constexpr int32 CHANNEL_PARTICIPANT_CACHE_TIME = 1800; // some reasonable limit
static void on_update_dialog_online_member_count_timeout_callback(void *dialog_participant_manager_ptr,
int64 dialog_id_int);
@ -133,6 +145,16 @@ class DialogParticipantManager final : public Actor {
void finish_get_channel_participant(ChannelId channel_id, DialogParticipant &&dialog_participant,
Promise<DialogParticipant> &&promise);
void update_channel_participant_status_cache(ChannelId channel_id, DialogId participant_dialog_id,
DialogParticipantStatus &&dialog_participant_status);
const DialogParticipant *get_channel_participant_from_cache(ChannelId channel_id, DialogId participant_dialog_id);
static void on_channel_participant_cache_timeout_callback(void *dialog_participant_manager_ptr,
int64 channel_id_long);
void on_channel_participant_cache_timeout(ChannelId channel_id);
struct OnlineMemberCountInfo {
int32 online_member_count = 0;
double update_time = 0;
@ -140,10 +162,22 @@ class DialogParticipantManager final : public Actor {
};
FlatHashMap<DialogId, OnlineMemberCountInfo, DialogIdHash> dialog_online_member_counts_;
MultiTimeout update_dialog_online_member_count_timeout_{"UpdateDialogOnlineMemberCountTimeout"};
FlatHashMap<DialogId, vector<DialogAdministrator>, DialogIdHash> dialog_administrators_;
// bot-administrators only
struct ChannelParticipantInfo {
DialogParticipant participant_;
int32 last_access_date_ = 0;
};
struct ChannelParticipants {
FlatHashMap<DialogId, ChannelParticipantInfo, DialogIdHash> participants_;
};
FlatHashMap<ChannelId, ChannelParticipants, ChannelIdHash> channel_participants_;
MultiTimeout update_dialog_online_member_count_timeout_{"UpdateDialogOnlineMemberCountTimeout"};
MultiTimeout channel_participant_cache_timeout_{"ChannelParticipantCacheTimeout"};
Td *td_;
ActorShared<> parent_;
};