tdlight/td/telegram/TopDialogManager.cpp

653 lines
22 KiB
C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/TopDialogManager.h"
#include "td/telegram/AuthManager.h"
#include "td/telegram/ConfigShared.h"
#include "td/telegram/ContactsManager.h"
#include "td/telegram/DialogId.h"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/MessagesManager.h"
#include "td/telegram/misc.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/StateManager.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdDb.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/tl_helpers.h"
#include "td/telegram/telegram_api.h"
#include <algorithm>
#include <cmath>
#include <iterator>
namespace td {
static CSlice top_dialog_category_name(TopDialogCategory category) {
switch (category) {
case TopDialogCategory::Correspondent:
return CSlice("correspondent");
case TopDialogCategory::BotPM:
return CSlice("bot_pm");
case TopDialogCategory::BotInline:
return CSlice("bot_inline");
case TopDialogCategory::Group:
return CSlice("group");
case TopDialogCategory::Channel:
return CSlice("channel");
case TopDialogCategory::Call:
return CSlice("call");
case TopDialogCategory::ForwardUsers:
return CSlice("forward_users");
case TopDialogCategory::ForwardChats:
return CSlice("forward_chats");
default:
UNREACHABLE();
}
}
static TopDialogCategory get_top_dialog_category(const telegram_api::TopPeerCategory &category) {
switch (category.get_id()) {
case telegram_api::topPeerCategoryCorrespondents::ID:
return TopDialogCategory::Correspondent;
case telegram_api::topPeerCategoryBotsPM::ID:
return TopDialogCategory::BotPM;
case telegram_api::topPeerCategoryBotsInline::ID:
return TopDialogCategory::BotInline;
case telegram_api::topPeerCategoryGroups::ID:
return TopDialogCategory::Group;
case telegram_api::topPeerCategoryChannels::ID:
return TopDialogCategory::Channel;
case telegram_api::topPeerCategoryPhoneCalls::ID:
return TopDialogCategory::Call;
case telegram_api::topPeerCategoryForwardUsers::ID:
return TopDialogCategory::ForwardUsers;
case telegram_api::topPeerCategoryForwardChats::ID:
return TopDialogCategory::ForwardChats;
default:
UNREACHABLE();
}
}
static tl_object_ptr<telegram_api::TopPeerCategory> get_top_peer_category(TopDialogCategory category) {
switch (category) {
case TopDialogCategory::Correspondent:
return make_tl_object<telegram_api::topPeerCategoryCorrespondents>();
case TopDialogCategory::BotPM:
return make_tl_object<telegram_api::topPeerCategoryBotsPM>();
case TopDialogCategory::BotInline:
return make_tl_object<telegram_api::topPeerCategoryBotsInline>();
case TopDialogCategory::Group:
return make_tl_object<telegram_api::topPeerCategoryGroups>();
case TopDialogCategory::Channel:
return make_tl_object<telegram_api::topPeerCategoryChannels>();
case TopDialogCategory::Call:
return make_tl_object<telegram_api::topPeerCategoryPhoneCalls>();
case TopDialogCategory::ForwardUsers:
return make_tl_object<telegram_api::topPeerCategoryForwardUsers>();
case TopDialogCategory::ForwardChats:
return make_tl_object<telegram_api::topPeerCategoryForwardChats>();
default:
UNREACHABLE();
}
}
void TopDialogManager::update_is_enabled(bool is_enabled) {
auto auth_manager = G()->td().get_actor_unsafe()->auth_manager_.get();
if (auth_manager == nullptr || !auth_manager->is_authorized() || auth_manager->is_bot()) {
return;
}
if (set_is_enabled(is_enabled)) {
G()->td_db()->get_binlog_pmc()->set("top_peers_enabled", is_enabled ? "1" : "0");
send_toggle_top_peers(is_enabled);
loop();
}
}
bool TopDialogManager::set_is_enabled(bool is_enabled) {
if (is_enabled_ == is_enabled) {
return false;
}
LOG(DEBUG) << "Change top chats is_enabled to " << is_enabled;
is_enabled_ = is_enabled;
init();
return true;
}
void TopDialogManager::send_toggle_top_peers(bool is_enabled) {
if (have_toggle_top_peers_query_) {
have_pending_toggle_top_peers_query_ = true;
pending_toggle_top_peers_query_ = is_enabled;
return;
}
LOG(DEBUG) << "Send toggle top peers query to " << is_enabled;
have_toggle_top_peers_query_ = true;
toggle_top_peers_query_is_enabled_ = is_enabled;
auto net_query = G()->net_query_creator().create(telegram_api::contacts_toggleTopPeers(is_enabled));
G()->net_query_dispatcher().dispatch_with_callback(std::move(net_query), actor_shared(this, 2));
}
void TopDialogManager::on_dialog_used(TopDialogCategory category, DialogId dialog_id, int32 date) {
if (!is_active_ || !is_enabled_) {
return;
}
auto pos = static_cast<size_t>(category);
CHECK(pos < by_category_.size());
auto &top_dialogs = by_category_[pos];
top_dialogs.is_dirty = true;
auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
[&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
if (it == top_dialogs.dialogs.end()) {
TopDialog top_dialog;
top_dialog.dialog_id = dialog_id;
top_dialogs.dialogs.push_back(top_dialog);
it = top_dialogs.dialogs.end() - 1;
}
auto delta = rating_add(date, top_dialogs.rating_timestamp);
it->rating += delta;
while (it != top_dialogs.dialogs.begin()) {
auto next = std::prev(it);
if (*next < *it) {
break;
}
std::swap(*next, *it);
it = next;
}
LOG(INFO) << "Update " << top_dialog_category_name(category) << " rating of " << dialog_id << " by " << delta;
if (!first_unsync_change_) {
first_unsync_change_ = Timestamp::now_cached();
}
loop();
}
void TopDialogManager::remove_dialog(TopDialogCategory category, DialogId dialog_id,
tl_object_ptr<telegram_api::InputPeer> input_peer) {
if (!is_active_ || !is_enabled_) {
return;
}
CHECK(dialog_id.is_valid());
if (category == TopDialogCategory::ForwardUsers && dialog_id.get_type() != DialogType::User) {
category = TopDialogCategory::ForwardChats;
}
auto pos = static_cast<size_t>(category);
CHECK(pos < by_category_.size());
auto &top_dialogs = by_category_[pos];
LOG(INFO) << "Remove " << top_dialog_category_name(category) << " rating of " << dialog_id;
if (input_peer != nullptr) {
auto query = telegram_api::contacts_resetTopPeerRating(get_top_peer_category(category), std::move(input_peer));
auto net_query = G()->net_query_creator().create(query);
G()->net_query_dispatcher().dispatch_with_callback(std::move(net_query), actor_shared(this, 1));
}
auto it = std::find_if(top_dialogs.dialogs.begin(), top_dialogs.dialogs.end(),
[&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
if (it == top_dialogs.dialogs.end()) {
return;
}
top_dialogs.is_dirty = true;
top_dialogs.dialogs.erase(it);
if (!first_unsync_change_) {
first_unsync_change_ = Timestamp::now_cached();
}
loop();
}
void TopDialogManager::get_top_dialogs(TopDialogCategory category, size_t limit, Promise<vector<DialogId>> promise) {
if (!is_active_) {
promise.set_error(Status::Error(400, "Not supported without chat info database"));
return;
}
if (!is_enabled_) {
promise.set_error(Status::Error(400, "Top chats computation is disabled"));
return;
}
GetTopDialogsQuery query;
query.category = category;
query.limit = limit;
query.promise = std::move(promise);
pending_get_top_dialogs_.push_back(std::move(query));
loop();
}
void TopDialogManager::update_rating_e_decay() {
if (!is_active_) {
return;
}
rating_e_decay_ = G()->shared_config().get_option_integer("rating_e_decay", rating_e_decay_);
}
template <class StorerT>
void store(const TopDialogManager::TopDialog &top_dialog, StorerT &storer) {
using ::td::store;
store(top_dialog.dialog_id, storer);
store(top_dialog.rating, storer);
}
template <class ParserT>
void parse(TopDialogManager::TopDialog &top_dialog, ParserT &parser) {
using ::td::parse;
parse(top_dialog.dialog_id, parser);
parse(top_dialog.rating, parser);
}
template <class StorerT>
void store(const TopDialogManager::TopDialogs &top_dialogs, StorerT &storer) {
using ::td::store;
store(top_dialogs.rating_timestamp, storer);
store(top_dialogs.dialogs, storer);
}
template <class ParserT>
void parse(TopDialogManager::TopDialogs &top_dialogs, ParserT &parser) {
using ::td::parse;
parse(top_dialogs.rating_timestamp, parser);
parse(top_dialogs.dialogs, parser);
}
double TopDialogManager::rating_add(double now, double rating_timestamp) const {
return std::exp((now - rating_timestamp) / rating_e_decay_);
}
double TopDialogManager::current_rating_add(double rating_timestamp) const {
return rating_add(G()->server_time_cached(), rating_timestamp);
}
void TopDialogManager::normalize_rating() {
for (auto &top_dialogs : by_category_) {
auto div_by = current_rating_add(top_dialogs.rating_timestamp);
top_dialogs.rating_timestamp = G()->server_time_cached();
for (auto &dialog : top_dialogs.dialogs) {
dialog.rating /= div_by;
}
top_dialogs.is_dirty = true;
}
db_sync_state_ = SyncState::None;
}
void TopDialogManager::do_get_top_dialogs(GetTopDialogsQuery &&query) {
vector<DialogId> dialog_ids;
if (query.category != TopDialogCategory::ForwardUsers) {
auto pos = static_cast<size_t>(query.category);
CHECK(pos < by_category_.size());
dialog_ids = transform(by_category_[pos].dialogs, [](const auto &x) { return x.dialog_id; });
} else {
// merge ForwardUsers and ForwardChats
auto &users = by_category_[static_cast<size_t>(TopDialogCategory::ForwardUsers)];
auto &chats = by_category_[static_cast<size_t>(TopDialogCategory::ForwardChats)];
size_t users_pos = 0;
size_t chats_pos = 0;
while (users_pos < users.dialogs.size() || chats_pos < chats.dialogs.size()) {
if (chats_pos == chats.dialogs.size() ||
(users_pos < users.dialogs.size() && users.dialogs[users_pos] < chats.dialogs[chats_pos])) {
dialog_ids.push_back(users.dialogs[users_pos++].dialog_id);
} else {
dialog_ids.push_back(chats.dialogs[chats_pos++].dialog_id);
}
}
}
auto limit = std::min({query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()});
auto promise = PromiseCreator::lambda([query = std::move(query), dialog_ids, limit](Result<Unit>) mutable {
vector<DialogId> result;
result.reserve(limit);
for (auto dialog_id : dialog_ids) {
if (dialog_id.get_type() == DialogType::User) {
auto user_id = dialog_id.get_user_id();
if (G()->td().get_actor_unsafe()->contacts_manager_->is_user_deleted(user_id)) {
LOG(INFO) << "Skip deleted " << user_id;
continue;
}
if (G()->td().get_actor_unsafe()->contacts_manager_->get_my_id() == user_id) {
LOG(INFO) << "Skip self " << user_id;
continue;
}
if (query.category == TopDialogCategory::BotInline || query.category == TopDialogCategory::BotPM) {
auto r_bot_info = G()->td().get_actor_unsafe()->contacts_manager_->get_bot_data(user_id);
if (r_bot_info.is_error()) {
LOG(INFO) << "Skip not a bot " << user_id;
continue;
}
if (query.category == TopDialogCategory::BotInline &&
(r_bot_info.ok().username.empty() || !r_bot_info.ok().is_inline)) {
LOG(INFO) << "Skip not inline bot " << user_id;
continue;
}
}
}
result.push_back(dialog_id);
if (result.size() == limit) {
break;
}
}
query.promise.set_value(std::move(result));
});
send_closure(G()->messages_manager(), &MessagesManager::load_dialogs, std::move(dialog_ids), std::move(promise));
}
void TopDialogManager::do_get_top_peers() {
LOG(INFO) << "Send get top peers request";
using telegram_api::contacts_getTopPeers;
std::vector<uint32> ids;
for (auto &category : by_category_) {
for (auto &top_dialog : category.dialogs) {
auto dialog_id = top_dialog.dialog_id;
switch (dialog_id.get_type()) {
case DialogType::Channel:
ids.push_back(dialog_id.get_channel_id().get());
break;
case DialogType::User:
ids.push_back(dialog_id.get_user_id().get());
break;
case DialogType::Chat:
ids.push_back(dialog_id.get_chat_id().get());
break;
default:
break;
}
}
}
int32 hash = get_vector_hash(ids);
int32 flags = contacts_getTopPeers::CORRESPONDENTS_MASK | contacts_getTopPeers::BOTS_PM_MASK |
contacts_getTopPeers::BOTS_INLINE_MASK | contacts_getTopPeers::GROUPS_MASK |
contacts_getTopPeers::CHANNELS_MASK | contacts_getTopPeers::PHONE_CALLS_MASK |
contacts_getTopPeers::FORWARD_USERS_MASK | contacts_getTopPeers::FORWARD_CHATS_MASK;
contacts_getTopPeers query{flags,
true /*correspondents*/,
true /*bot_pm*/,
true /*bot_inline */,
true /*phone_calls*/,
true /*groups*/,
true /*channels*/,
true /*forward_users*/,
true /*forward_chats*/,
0 /*offset*/,
100 /*limit*/,
hash};
auto net_query = G()->net_query_creator().create(query);
G()->net_query_dispatcher().dispatch_with_callback(std::move(net_query), actor_shared(this));
}
void TopDialogManager::on_result(NetQueryPtr net_query) {
auto query_type = get_link_token();
if (query_type == 2) { // toggleTopPeers
CHECK(have_toggle_top_peers_query_);
have_toggle_top_peers_query_ = false;
if (have_pending_toggle_top_peers_query_) {
have_pending_toggle_top_peers_query_ = false;
if (pending_toggle_top_peers_query_ != toggle_top_peers_query_is_enabled_) {
send_toggle_top_peers(pending_toggle_top_peers_query_);
return;
}
}
auto r_result = fetch_result<telegram_api::contacts_toggleTopPeers>(std::move(net_query));
if (r_result.is_ok()) {
// everything is synchronized
G()->td_db()->get_binlog_pmc()->erase("top_peers_enabled");
} else {
// let's resend the query forever
if (!G()->close_flag()) {
send_toggle_top_peers(toggle_top_peers_query_is_enabled_);
}
}
return;
}
if (query_type == 1) { // resetTopPeerRating
// ignore result
return;
}
SCOPE_EXIT {
loop();
};
normalize_rating(); // once a day too
auto r_top_peers = fetch_result<telegram_api::contacts_getTopPeers>(std::move(net_query));
if (r_top_peers.is_error()) {
last_server_sync_ = Timestamp::in(SERVER_SYNC_RESEND_DELAY - SERVER_SYNC_DELAY);
return;
}
last_server_sync_ = Timestamp::now();
server_sync_state_ = SyncState::Ok;
SCOPE_EXIT {
G()->td_db()->get_binlog_pmc()->set("top_dialogs_ts", to_string(static_cast<uint32>(Clocks::system())));
};
auto top_peers_parent = r_top_peers.move_as_ok();
LOG(DEBUG) << "Receive contacts_getTopPeers result: " << to_string(top_peers_parent);
switch (top_peers_parent->get_id()) {
case telegram_api::contacts_topPeersNotModified::ID:
// nothing to do
return;
case telegram_api::contacts_topPeersDisabled::ID:
G()->shared_config().set_option_boolean("disable_top_chats", true);
set_is_enabled(false); // apply immediately
return;
case telegram_api::contacts_topPeers::ID: {
G()->shared_config().set_option_empty("disable_top_chats");
set_is_enabled(true); // apply immediately
auto top_peers = move_tl_object_as<telegram_api::contacts_topPeers>(std::move(top_peers_parent));
send_closure(G()->contacts_manager(), &ContactsManager::on_get_users, std::move(top_peers->users_),
"on get top chats");
send_closure(G()->contacts_manager(), &ContactsManager::on_get_chats, std::move(top_peers->chats_),
"on get top chats");
for (auto &category : top_peers->categories_) {
auto dialog_category = get_top_dialog_category(*category->category_);
auto pos = static_cast<size_t>(dialog_category);
CHECK(pos < by_category_.size());
auto &top_dialogs = by_category_[pos];
top_dialogs.is_dirty = true;
top_dialogs.dialogs.clear();
for (auto &top_peer : category->peers_) {
TopDialog top_dialog;
top_dialog.dialog_id = DialogId(top_peer->peer_);
top_dialog.rating = top_peer->rating_;
top_dialogs.dialogs.push_back(std::move(top_dialog));
}
}
db_sync_state_ = SyncState::None;
break;
}
default:
UNREACHABLE();
}
}
void TopDialogManager::do_save_top_dialogs() {
LOG(INFO) << "Save top chats";
for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
auto key = PSTRING() << "top_dialogs#" << top_dialog_category_name(top_dialog_category);
auto &top_dialogs = by_category_[top_dialog_category_i];
if (!top_dialogs.is_dirty) {
continue;
}
top_dialogs.is_dirty = false;
G()->td_db()->get_binlog_pmc()->set(key, log_event_store(top_dialogs).as_slice().str());
}
db_sync_state_ = SyncState::Ok;
first_unsync_change_ = Timestamp();
}
void TopDialogManager::start_up() {
do_start_up();
}
void TopDialogManager::do_start_up() {
auto auth_manager = G()->td().get_actor_unsafe()->auth_manager_.get();
if (auth_manager == nullptr || !auth_manager->is_authorized()) {
return;
}
is_active_ = G()->parameters().use_chat_info_db && !auth_manager->is_bot();
is_enabled_ = !G()->shared_config().get_option_boolean("disable_top_chats");
update_rating_e_decay();
string need_update_top_peers = G()->td_db()->get_binlog_pmc()->get("top_peers_enabled");
if (!need_update_top_peers.empty()) {
send_toggle_top_peers(need_update_top_peers[0] == '1');
}
init();
loop();
}
void TopDialogManager::init() {
was_first_sync_ = false;
first_unsync_change_ = Timestamp();
server_sync_state_ = SyncState::None;
last_server_sync_ = Timestamp();
CHECK(pending_get_top_dialogs_.empty());
LOG(DEBUG) << "Init is enabled: " << is_enabled_;
if (!is_active_) {
G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs");
return;
}
auto di_top_dialogs_ts = G()->td_db()->get_binlog_pmc()->get("top_dialogs_ts");
if (!di_top_dialogs_ts.empty()) {
last_server_sync_ = Timestamp::in(to_integer<uint32>(di_top_dialogs_ts) - Clocks::system());
if (last_server_sync_.is_in_past()) {
server_sync_state_ = SyncState::Ok;
}
}
if (is_enabled_) {
for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
auto key = PSTRING() << "top_dialogs#" << top_dialog_category_name(top_dialog_category);
auto value = G()->td_db()->get_binlog_pmc()->get(key);
auto &top_dialogs = by_category_[top_dialog_category_i];
top_dialogs.is_dirty = false;
if (value.empty()) {
continue;
}
log_event_parse(top_dialogs, value).ensure();
}
normalize_rating();
} else {
G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs#");
for (auto &top_dialogs : by_category_) {
top_dialogs.is_dirty = false;
top_dialogs.rating_timestamp = 0;
top_dialogs.dialogs.clear();
}
}
db_sync_state_ = SyncState::Ok;
send_closure(G()->state_manager(), &StateManager::wait_first_sync,
PromiseCreator::event(self_closure(this, &TopDialogManager::on_first_sync)));
}
void TopDialogManager::on_first_sync() {
was_first_sync_ = true;
if (!G()->close_flag() && G()->td().get_actor_unsafe()->auth_manager_->is_bot()) {
is_active_ = false;
init();
}
loop();
}
void TopDialogManager::loop() {
if (!is_active_ || G()->close_flag()) {
return;
}
if (!pending_get_top_dialogs_.empty()) {
for (auto &query : pending_get_top_dialogs_) {
do_get_top_dialogs(std::move(query));
}
pending_get_top_dialogs_.clear();
}
// server sync
Timestamp server_sync_timeout;
if (server_sync_state_ == SyncState::Ok) {
server_sync_timeout = Timestamp::at(last_server_sync_.at() + SERVER_SYNC_DELAY);
if (server_sync_timeout.is_in_past()) {
server_sync_state_ = SyncState::None;
}
}
Timestamp wakeup_timeout;
if (server_sync_state_ == SyncState::Ok) {
wakeup_timeout.relax(server_sync_timeout);
} else if (server_sync_state_ == SyncState::None && was_first_sync_) {
server_sync_state_ = SyncState::Pending;
do_get_top_peers();
}
if (is_enabled_) {
// database sync
Timestamp db_sync_timeout;
if (db_sync_state_ == SyncState::Ok) {
if (first_unsync_change_) {
db_sync_timeout = Timestamp::at(first_unsync_change_.at() + DB_SYNC_DELAY);
if (db_sync_timeout.is_in_past()) {
db_sync_state_ = SyncState::None;
}
}
}
if (db_sync_state_ == SyncState::Ok) {
wakeup_timeout.relax(db_sync_timeout);
} else if (db_sync_state_ == SyncState::None) {
if (server_sync_state_ == SyncState::Ok) {
do_save_top_dialogs();
}
}
}
if (wakeup_timeout) {
LOG(INFO) << "Wakeup in: " << wakeup_timeout.in();
set_timeout_at(wakeup_timeout.at());
} else {
LOG(INFO) << "Wakeup: never";
cancel_timeout();
}
}
} // namespace td