661 lines
22 KiB
C++
661 lines
22 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/telegram/TopDialogManager.h"
|
|
|
|
#include "td/telegram/AccessRights.h"
|
|
#include "td/telegram/AuthManager.h"
|
|
#include "td/telegram/ConfigShared.h"
|
|
#include "td/telegram/ContactsManager.h"
|
|
#include "td/telegram/DialogId.h"
|
|
#include "td/telegram/Global.h"
|
|
#include "td/telegram/logevent/LogEvent.h"
|
|
#include "td/telegram/MessagesManager.h"
|
|
#include "td/telegram/misc.h"
|
|
#include "td/telegram/net/NetQuery.h"
|
|
#include "td/telegram/net/NetQueryDispatcher.h"
|
|
#include "td/telegram/StateManager.h"
|
|
#include "td/telegram/Td.h"
|
|
#include "td/telegram/TdDb.h"
|
|
#include "td/telegram/TdParameters.h"
|
|
#include "td/telegram/telegram_api.h"
|
|
|
|
#include "td/utils/algorithm.h"
|
|
#include "td/utils/buffer.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/port/Clocks.h"
|
|
#include "td/utils/ScopeGuard.h"
|
|
#include "td/utils/Slice.h"
|
|
#include "td/utils/SliceBuilder.h"
|
|
#include "td/utils/Status.h"
|
|
#include "td/utils/tl_helpers.h"
|
|
|
|
#include <algorithm>
|
|
#include <cmath>
|
|
#include <iterator>
|
|
|
|
namespace td {
|
|
|
|
class GetTopPeersQuery final : public Td::ResultHandler {
|
|
Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> promise_;
|
|
|
|
public:
|
|
explicit GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> &&promise)
|
|
: promise_(std::move(promise)) {
|
|
}
|
|
|
|
void send(int64 hash) {
|
|
int32 flags =
|
|
telegram_api::contacts_getTopPeers::CORRESPONDENTS_MASK | telegram_api::contacts_getTopPeers::BOTS_PM_MASK |
|
|
telegram_api::contacts_getTopPeers::BOTS_INLINE_MASK | telegram_api::contacts_getTopPeers::GROUPS_MASK |
|
|
telegram_api::contacts_getTopPeers::CHANNELS_MASK | telegram_api::contacts_getTopPeers::PHONE_CALLS_MASK |
|
|
telegram_api::contacts_getTopPeers::FORWARD_USERS_MASK | telegram_api::contacts_getTopPeers::FORWARD_CHATS_MASK;
|
|
send_query(G()->net_query_creator().create(telegram_api::contacts_getTopPeers(
|
|
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
|
|
false /*ignored*/, false /*ignored*/, false /*ignored*/, 0 /*offset*/, 100 /*limit*/, hash)));
|
|
}
|
|
|
|
void on_result(uint64 id, BufferSlice packet) final {
|
|
auto result_ptr = fetch_result<telegram_api::contacts_getTopPeers>(packet);
|
|
if (result_ptr.is_error()) {
|
|
return on_error(id, result_ptr.move_as_error());
|
|
}
|
|
|
|
promise_.set_value(result_ptr.move_as_ok());
|
|
}
|
|
|
|
void on_error(uint64 id, Status status) final {
|
|
promise_.set_error(std::move(status));
|
|
}
|
|
};
|
|
|
|
class ToggleTopPeersQuery final : public Td::ResultHandler {
|
|
Promise<Unit> promise_;
|
|
|
|
public:
|
|
explicit ToggleTopPeersQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
|
|
}
|
|
|
|
void send(bool is_enabled) {
|
|
send_query(G()->net_query_creator().create(telegram_api::contacts_toggleTopPeers(is_enabled)));
|
|
}
|
|
|
|
void on_result(uint64 id, BufferSlice packet) final {
|
|
auto result_ptr = fetch_result<telegram_api::contacts_toggleTopPeers>(packet);
|
|
if (result_ptr.is_error()) {
|
|
return on_error(id, result_ptr.move_as_error());
|
|
}
|
|
|
|
promise_.set_value(Unit());
|
|
}
|
|
|
|
void on_error(uint64 id, Status status) final {
|
|
promise_.set_error(std::move(status));
|
|
}
|
|
};
|
|
|
|
class ResetTopPeerRatingQuery final : public Td::ResultHandler {
|
|
DialogId dialog_id_;
|
|
|
|
public:
|
|
void send(TopDialogCategory category, DialogId dialog_id) {
|
|
auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
|
|
if (input_peer == nullptr) {
|
|
return;
|
|
}
|
|
|
|
dialog_id_ = dialog_id;
|
|
send_query(G()->net_query_creator().create(
|
|
telegram_api::contacts_resetTopPeerRating(get_input_top_peer_category(category), std::move(input_peer))));
|
|
}
|
|
|
|
void on_result(uint64 id, BufferSlice packet) final {
|
|
auto result_ptr = fetch_result<telegram_api::contacts_resetTopPeerRating>(packet);
|
|
if (result_ptr.is_error()) {
|
|
return on_error(id, result_ptr.move_as_error());
|
|
}
|
|
|
|
// ignore the result
|
|
}
|
|
|
|
void on_error(uint64 id, Status status) final {
|
|
if (!td->messages_manager_->on_get_dialog_error(dialog_id_, status, "ResetTopPeerRatingQuery")) {
|
|
LOG(INFO) << "Receive error for resetTopPeerRating: " << status;
|
|
}
|
|
}
|
|
};
|
|
|
|
void TopDialogManager::update_is_enabled(bool is_enabled) {
|
|
if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized() || td_->auth_manager_->is_bot()) {
|
|
return;
|
|
}
|
|
|
|
if (set_is_enabled(is_enabled)) {
|
|
G()->td_db()->get_binlog_pmc()->set("top_peers_enabled", is_enabled ? "1" : "0");
|
|
send_toggle_top_peers(is_enabled);
|
|
|
|
loop();
|
|
}
|
|
}
|
|
|
|
// Changes the in-memory enabled state and restarts the manager through try_start().
// Returns true if the state has changed, false if it already had the requested value.
bool TopDialogManager::set_is_enabled(bool is_enabled) {
  const bool is_changed = is_enabled_ != is_enabled;
  if (is_changed) {
    LOG(DEBUG) << "Change top chats is_enabled to " << is_enabled;
    is_enabled_ = is_enabled;
    try_start();
  }
  return is_changed;
}
|
|
|
|
void TopDialogManager::send_toggle_top_peers(bool is_enabled) {
|
|
if (G()->close_flag()) {
|
|
return;
|
|
}
|
|
|
|
if (have_toggle_top_peers_query_) {
|
|
have_pending_toggle_top_peers_query_ = true;
|
|
pending_toggle_top_peers_query_ = is_enabled;
|
|
return;
|
|
}
|
|
|
|
LOG(DEBUG) << "Send toggle top peers query to " << is_enabled;
|
|
have_toggle_top_peers_query_ = true;
|
|
|
|
auto promise = PromiseCreator::lambda([actor_id = actor_id(this), is_enabled](Result<Unit> result) {
|
|
send_closure(actor_id, &TopDialogManager::on_toggle_top_peers, is_enabled, std::move(result));
|
|
});
|
|
td_->create_handler<ToggleTopPeersQuery>(std::move(promise))->send(is_enabled);
|
|
}
|
|
|
|
void TopDialogManager::on_toggle_top_peers(bool is_enabled, Result<Unit> &&result) {
|
|
CHECK(have_toggle_top_peers_query_);
|
|
have_toggle_top_peers_query_ = false;
|
|
|
|
if (have_pending_toggle_top_peers_query_) {
|
|
have_pending_toggle_top_peers_query_ = false;
|
|
if (pending_toggle_top_peers_query_ != is_enabled) {
|
|
send_toggle_top_peers(pending_toggle_top_peers_query_);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (result.is_ok()) {
|
|
// everything is synchronized
|
|
G()->td_db()->get_binlog_pmc()->erase("top_peers_enabled");
|
|
} else {
|
|
// let's resend the query forever
|
|
send_toggle_top_peers(is_enabled);
|
|
}
|
|
}
|
|
|
|
// Registers a usage of the chat in the given category at the time date.
// The chat's rating is increased by rating_add(date, rating_timestamp) and the chat is moved
// towards the beginning of the list until the element before it compares less than it.
void TopDialogManager::on_dialog_used(TopDialogCategory category, DialogId dialog_id, int32 date) {
  if (!is_active_ || !is_enabled_) {
    return;
  }

  const auto category_pos = static_cast<size_t>(category);
  CHECK(category_pos < by_category_.size());
  auto &top_dialogs = by_category_[category_pos];
  auto &dialogs = top_dialogs.dialogs;

  top_dialogs.is_dirty = true;

  auto it = std::find_if(dialogs.begin(), dialogs.end(),
                         [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
  if (it == dialogs.end()) {
    // the chat wasn't in the list yet; append it with the default rating
    dialogs.emplace_back();
    dialogs.back().dialog_id = dialog_id;
    it = std::prev(dialogs.end());
  }

  const auto delta = rating_add(date, top_dialogs.rating_timestamp);
  it->rating += delta;

  // restore the order of the list
  for (; it != dialogs.begin(); --it) {
    auto before = std::prev(it);
    if (*before < *it) {
      break;
    }
    std::swap(*before, *it);
  }

  LOG(INFO) << "Update " << get_top_dialog_category_name(category) << " rating of " << dialog_id << " by " << delta;

  if (!first_unsync_change_) {
    // the first change since the last save starts the database sync delay
    first_unsync_change_ = Timestamp::now_cached();
  }
  loop();
}
|
|
|
|
// Removes the chat from the given top chat category.
// Fails with error 400 if the category is invalid or the chat is unknown. Succeeds without doing
// anything if the manager is inactive or disabled. Otherwise a rating reset is sent to the server
// and the chat is erased from the local list if it is there.
void TopDialogManager::remove_dialog(TopDialogCategory category, DialogId dialog_id, Promise<Unit> &&promise) {
  if (category == TopDialogCategory::Size) {
    return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
  }
  if (!td_->messages_manager_->have_dialog_force(dialog_id, "remove_dialog")) {
    return promise.set_error(Status::Error(400, "Chat not found"));
  }
  if (!is_active_ || !is_enabled_) {
    return promise.set_value(Unit());
  }

  // chats that aren't private are looked up in ForwardChats instead of ForwardUsers
  if (category == TopDialogCategory::ForwardUsers && dialog_id.get_type() != DialogType::User) {
    category = TopDialogCategory::ForwardChats;
  }

  const auto category_pos = static_cast<size_t>(category);
  CHECK(category_pos < by_category_.size());
  auto &top_dialogs = by_category_[category_pos];
  auto &dialogs = top_dialogs.dialogs;

  // the server is notified even if the chat isn't in the local list
  td_->create_handler<ResetTopPeerRatingQuery>()->send(category, dialog_id);

  auto it = std::find_if(dialogs.begin(), dialogs.end(),
                         [&](auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
  if (it != dialogs.end()) {
    top_dialogs.is_dirty = true;
    dialogs.erase(it);
    if (!first_unsync_change_) {
      first_unsync_change_ = Timestamp::now_cached();
    }
    loop();
  }

  promise.set_value(Unit());
}
|
|
|
|
// Queues a request for up to limit top chats of the given category.
// Fails with error 400 if the category is invalid, the limit isn't positive,
// or the manager is inactive or disabled.
void TopDialogManager::get_top_dialogs(TopDialogCategory category, int32 limit, Promise<vector<DialogId>> promise) {
  if (category == TopDialogCategory::Size) {
    return promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
  }
  if (limit <= 0) {
    return promise.set_error(Status::Error(400, "Limit must be positive"));
  }
  if (!is_active_) {
    return promise.set_error(Status::Error(400, "Not supported without chat info database"));
  }
  if (!is_enabled_) {
    return promise.set_error(Status::Error(400, "Top chats computation is disabled"));
  }

  // the request is processed by loop()
  GetTopDialogsQuery request;
  request.category = category;
  request.limit = static_cast<size_t>(limit);
  request.promise = std::move(promise);
  pending_get_top_dialogs_.push_back(std::move(request));

  loop();
}
|
|
|
|
void TopDialogManager::update_rating_e_decay() {
|
|
if (!is_active_) {
|
|
return;
|
|
}
|
|
rating_e_decay_ = narrow_cast<int32>(G()->shared_config().get_option_integer("rating_e_decay", rating_e_decay_));
|
|
}
|
|
|
|
template <class StorerT>
|
|
void store(const TopDialogManager::TopDialog &top_dialog, StorerT &storer) {
|
|
using ::td::store;
|
|
store(top_dialog.dialog_id, storer);
|
|
store(top_dialog.rating, storer);
|
|
}
|
|
|
|
template <class ParserT>
|
|
void parse(TopDialogManager::TopDialog &top_dialog, ParserT &parser) {
|
|
using ::td::parse;
|
|
parse(top_dialog.dialog_id, parser);
|
|
parse(top_dialog.rating, parser);
|
|
}
|
|
|
|
template <class StorerT>
|
|
void store(const TopDialogManager::TopDialogs &top_dialogs, StorerT &storer) {
|
|
using ::td::store;
|
|
store(top_dialogs.rating_timestamp, storer);
|
|
store(top_dialogs.dialogs, storer);
|
|
}
|
|
|
|
template <class ParserT>
|
|
void parse(TopDialogManager::TopDialogs &top_dialogs, ParserT &parser) {
|
|
using ::td::parse;
|
|
parse(top_dialogs.rating_timestamp, parser);
|
|
parse(top_dialogs.dialogs, parser);
|
|
}
|
|
|
|
double TopDialogManager::rating_add(double now, double rating_timestamp) const {
|
|
return std::exp((now - rating_timestamp) / rating_e_decay_);
|
|
}
|
|
|
|
double TopDialogManager::current_rating_add(double rating_timestamp) const {
|
|
return rating_add(G()->server_time_cached(), rating_timestamp);
|
|
}
|
|
|
|
void TopDialogManager::normalize_rating() {
|
|
for (auto &top_dialogs : by_category_) {
|
|
auto div_by = current_rating_add(top_dialogs.rating_timestamp);
|
|
top_dialogs.rating_timestamp = G()->server_time_cached();
|
|
for (auto &dialog : top_dialogs.dialogs) {
|
|
dialog.rating /= div_by;
|
|
}
|
|
top_dialogs.is_dirty = true;
|
|
}
|
|
db_sync_state_ = SyncState::None;
|
|
}
|
|
|
|
// Starts processing of a get_top_dialogs request: collects the candidate chats in rating order,
// asks MessagesManager to load them and continues in on_load_dialogs.
// For ForwardUsers the ForwardUsers and ForwardChats lists are merged into one ordered list.
void TopDialogManager::do_get_top_dialogs(GetTopDialogsQuery &&query) {
  vector<DialogId> dialog_ids;
  if (query.category == TopDialogCategory::ForwardUsers) {
    // merge ForwardUsers and ForwardChats
    auto &user_dialogs = by_category_[static_cast<size_t>(TopDialogCategory::ForwardUsers)].dialogs;
    auto &chat_dialogs = by_category_[static_cast<size_t>(TopDialogCategory::ForwardChats)].dialogs;
    dialog_ids.reserve(user_dialogs.size() + chat_dialogs.size());

    size_t user_i = 0;
    size_t chat_i = 0;
    while (user_i < user_dialogs.size() && chat_i < chat_dialogs.size()) {
      // on a tie the chat goes first
      if (user_dialogs[user_i] < chat_dialogs[chat_i]) {
        dialog_ids.push_back(user_dialogs[user_i++].dialog_id);
      } else {
        dialog_ids.push_back(chat_dialogs[chat_i++].dialog_id);
      }
    }
    // at most one of the lists still has elements
    while (user_i < user_dialogs.size()) {
      dialog_ids.push_back(user_dialogs[user_i++].dialog_id);
    }
    while (chat_i < chat_dialogs.size()) {
      dialog_ids.push_back(chat_dialogs[chat_i++].dialog_id);
    }
  } else {
    const auto category_pos = static_cast<size_t>(query.category);
    CHECK(category_pos < by_category_.size());
    dialog_ids = transform(by_category_[category_pos].dialogs, [](const auto &x) { return x.dialog_id; });
  }

  auto load_promise = PromiseCreator::lambda(
      [actor_id = actor_id(this), query = std::move(query)](Result<vector<DialogId>> r_dialog_ids) mutable {
        if (r_dialog_ids.is_error()) {
          return query.promise.set_error(r_dialog_ids.move_as_error());
        }
        send_closure(actor_id, &TopDialogManager::on_load_dialogs, std::move(query), r_dialog_ids.move_as_ok());
      });
  td_->messages_manager_->load_dialogs(std::move(dialog_ids), std::move(load_promise));
}
|
|
|
|
// Finishes a get_top_dialogs request after the candidate chats have been loaded.
// Deleted users and the current user are skipped. For the bot categories, users without bot data
// are skipped too, and BotInline also skips bots that have no username or aren't inline.
// The number of returned chats is bounded by the requested limit and MAX_TOP_DIALOGS_LIMIT.
void TopDialogManager::on_load_dialogs(GetTopDialogsQuery &&query, vector<DialogId> &&dialog_ids) {
  const auto max_count = std::min({query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()});

  // returns true if the chat can't be returned for the requested category
  const auto must_be_skipped = [&](DialogId dialog_id) {
    if (dialog_id.get_type() != DialogType::User) {
      return false;
    }

    auto user_id = dialog_id.get_user_id();
    if (td_->contacts_manager_->is_user_deleted(user_id)) {
      LOG(INFO) << "Skip deleted " << user_id;
      return true;
    }
    if (td_->contacts_manager_->get_my_id() == user_id) {
      LOG(INFO) << "Skip self " << user_id;
      return true;
    }

    const bool is_inline_category = query.category == TopDialogCategory::BotInline;
    if (is_inline_category || query.category == TopDialogCategory::BotPM) {
      auto r_bot_info = td_->contacts_manager_->get_bot_data(user_id);
      if (r_bot_info.is_error()) {
        LOG(INFO) << "Skip not a bot " << user_id;
        return true;
      }
      if (is_inline_category && (r_bot_info.ok().username.empty() || !r_bot_info.ok().is_inline)) {
        LOG(INFO) << "Skip not inline bot " << user_id;
        return true;
      }
    }
    return false;
  };

  vector<DialogId> found_dialog_ids;
  found_dialog_ids.reserve(max_count);
  for (auto dialog_id : dialog_ids) {
    if (must_be_skipped(dialog_id)) {
      continue;
    }

    found_dialog_ids.push_back(dialog_id);
    if (found_dialog_ids.size() == max_count) {
      break;
    }
  }

  query.promise.set_value(std::move(found_dialog_ids));
}
|
|
|
|
void TopDialogManager::do_get_top_peers() {
|
|
std::vector<uint64> ids;
|
|
for (auto &category : by_category_) {
|
|
for (auto &top_dialog : category.dialogs) {
|
|
auto dialog_id = top_dialog.dialog_id;
|
|
switch (dialog_id.get_type()) {
|
|
case DialogType::User:
|
|
ids.push_back(dialog_id.get_user_id().get());
|
|
break;
|
|
case DialogType::Chat:
|
|
ids.push_back(dialog_id.get_chat_id().get());
|
|
break;
|
|
case DialogType::Channel:
|
|
ids.push_back(dialog_id.get_channel_id().get());
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
auto promise = PromiseCreator::lambda(
|
|
[actor_id = actor_id(this)](Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
|
|
send_closure(actor_id, &TopDialogManager::on_get_top_peers, std::move(result));
|
|
});
|
|
td_->create_handler<GetTopPeersQuery>(std::move(promise))->send(get_vector_hash(ids));
|
|
}
|
|
|
|
// Handles the answer to the contacts.getTopPeers request sent by do_get_top_peers.
//
// Ratings are normalized in any case. On error the next attempt is scheduled
// SERVER_SYNC_RESEND_DELAY seconds from now. On success the local state is updated:
//  - contacts.topPeersNotModified: nothing changes;
//  - contacts.topPeersDisabled: top chats are disabled locally;
//  - contacts.topPeers: top chats are enabled locally and the lists of the received categories
//    are replaced with the server's lists.
// After a successful answer the synchronization time is saved in the binlog key "top_dialogs_ts".
void TopDialogManager::on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
  normalize_rating();  // once a day too

  if (result.is_error()) {
    // loop() computes the next synchronization time as last_server_sync_ + SERVER_SYNC_DELAY, so this
    // schedules the retry in SERVER_SYNC_RESEND_DELAY seconds
    last_server_sync_ = Timestamp::in(SERVER_SYNC_RESEND_DELAY - SERVER_SYNC_DELAY);
    // loop() arms the timer only in the state Ok and re-sends the query only from the state None;
    // if the state were left as Pending, the query would never be sent again
    server_sync_state_ = SyncState::Ok;
    loop();
    return;
  }

  last_server_sync_ = Timestamp::now();
  server_sync_state_ = SyncState::Ok;

  auto top_peers_parent = result.move_as_ok();
  LOG(DEBUG) << "Receive contacts_getTopPeers result: " << to_string(top_peers_parent);
  switch (top_peers_parent->get_id()) {
    case telegram_api::contacts_topPeersNotModified::ID:
      // nothing to do
      break;
    case telegram_api::contacts_topPeersDisabled::ID:
      G()->shared_config().set_option_boolean("disable_top_chats", true);
      set_is_enabled(false);  // apply immediately
      break;
    case telegram_api::contacts_topPeers::ID: {
      G()->shared_config().set_option_empty("disable_top_chats");
      set_is_enabled(true);  // apply immediately
      auto top_peers = move_tl_object_as<telegram_api::contacts_topPeers>(std::move(top_peers_parent));

      // users and chats must be registered before the chats referencing them are used
      td_->contacts_manager_->on_get_users(std::move(top_peers->users_), "on get top chats");
      td_->contacts_manager_->on_get_chats(std::move(top_peers->chats_), "on get top chats");
      for (auto &category : top_peers->categories_) {
        auto dialog_category = get_top_dialog_category(category->category_);
        auto pos = static_cast<size_t>(dialog_category);
        CHECK(pos < by_category_.size());
        auto &top_dialogs = by_category_[pos];

        // the server list completely replaces the local one
        top_dialogs.is_dirty = true;
        top_dialogs.dialogs.clear();
        for (auto &top_peer : category->peers_) {
          TopDialog top_dialog;
          top_dialog.dialog_id = DialogId(top_peer->peer_);
          top_dialog.rating = top_peer->rating_;
          top_dialogs.dialogs.push_back(std::move(top_dialog));
        }
      }
      db_sync_state_ = SyncState::None;
      break;
    }
    default:
      UNREACHABLE();
  }

  G()->td_db()->get_binlog_pmc()->set("top_dialogs_ts", to_string(static_cast<uint32>(Clocks::system())));
  loop();
}
|
|
|
|
void TopDialogManager::do_save_top_dialogs() {
|
|
LOG(INFO) << "Save top chats";
|
|
for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
|
|
auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
|
|
auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
|
|
|
|
auto &top_dialogs = by_category_[top_dialog_category_i];
|
|
if (!top_dialogs.is_dirty) {
|
|
continue;
|
|
}
|
|
top_dialogs.is_dirty = false;
|
|
|
|
G()->td_db()->get_binlog_pmc()->set(key, log_event_store(top_dialogs).as_slice().str());
|
|
}
|
|
db_sync_state_ = SyncState::Ok;
|
|
first_unsync_change_ = Timestamp();
|
|
}
|
|
|
|
// Start-up hook of the manager: all work is delegated to init().
void TopDialogManager::start_up() {
  init();
}
|
|
|
|
// Tear-down hook of the manager: resets the stored parent_ reference.
void TopDialogManager::tear_down() {
  parent_.reset();
}
|
|
|
|
void TopDialogManager::init() {
|
|
if (td_->auth_manager_ == nullptr || !td_->auth_manager_->is_authorized()) {
|
|
return;
|
|
}
|
|
|
|
is_active_ = G()->parameters().use_chat_info_db && !td_->auth_manager_->is_bot();
|
|
is_enabled_ = !G()->shared_config().get_option_boolean("disable_top_chats");
|
|
update_rating_e_decay();
|
|
|
|
string need_update_top_peers = G()->td_db()->get_binlog_pmc()->get("top_peers_enabled");
|
|
if (!need_update_top_peers.empty()) {
|
|
send_toggle_top_peers(need_update_top_peers[0] == '1');
|
|
}
|
|
|
|
try_start();
|
|
loop();
|
|
}
|
|
|
|
void TopDialogManager::try_start() {
|
|
was_first_sync_ = false;
|
|
first_unsync_change_ = Timestamp();
|
|
server_sync_state_ = SyncState::None;
|
|
last_server_sync_ = Timestamp();
|
|
CHECK(pending_get_top_dialogs_.empty());
|
|
|
|
LOG(DEBUG) << "Init is enabled: " << is_enabled_;
|
|
if (!is_active_) {
|
|
G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs");
|
|
return;
|
|
}
|
|
|
|
auto di_top_dialogs_ts = G()->td_db()->get_binlog_pmc()->get("top_dialogs_ts");
|
|
if (!di_top_dialogs_ts.empty()) {
|
|
last_server_sync_ = Timestamp::in(to_integer<uint32>(di_top_dialogs_ts) - Clocks::system());
|
|
if (last_server_sync_.is_in_past()) {
|
|
server_sync_state_ = SyncState::Ok;
|
|
}
|
|
}
|
|
|
|
if (is_enabled_) {
|
|
for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
|
|
auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
|
|
auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
|
|
auto value = G()->td_db()->get_binlog_pmc()->get(key);
|
|
|
|
auto &top_dialogs = by_category_[top_dialog_category_i];
|
|
top_dialogs.is_dirty = false;
|
|
if (value.empty()) {
|
|
continue;
|
|
}
|
|
log_event_parse(top_dialogs, value).ensure();
|
|
}
|
|
normalize_rating();
|
|
} else {
|
|
G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs#");
|
|
for (auto &top_dialogs : by_category_) {
|
|
top_dialogs.is_dirty = false;
|
|
top_dialogs.rating_timestamp = 0;
|
|
top_dialogs.dialogs.clear();
|
|
}
|
|
}
|
|
db_sync_state_ = SyncState::Ok;
|
|
|
|
send_closure(G()->state_manager(), &StateManager::wait_first_sync,
|
|
PromiseCreator::event(self_closure(this, &TopDialogManager::on_first_sync)));
|
|
}
|
|
|
|
void TopDialogManager::on_first_sync() {
|
|
was_first_sync_ = true;
|
|
if (!G()->close_flag() && td_->auth_manager_->is_bot()) {
|
|
is_active_ = false;
|
|
try_start();
|
|
}
|
|
loop();
|
|
}
|
|
|
|
void TopDialogManager::loop() {
|
|
if (!is_active_ || G()->close_flag()) {
|
|
return;
|
|
}
|
|
|
|
if (!pending_get_top_dialogs_.empty()) {
|
|
for (auto &query : pending_get_top_dialogs_) {
|
|
do_get_top_dialogs(std::move(query));
|
|
}
|
|
pending_get_top_dialogs_.clear();
|
|
}
|
|
|
|
// server sync
|
|
Timestamp server_sync_timeout;
|
|
if (server_sync_state_ == SyncState::Ok) {
|
|
server_sync_timeout = Timestamp::at(last_server_sync_.at() + SERVER_SYNC_DELAY);
|
|
if (server_sync_timeout.is_in_past()) {
|
|
server_sync_state_ = SyncState::None;
|
|
}
|
|
}
|
|
|
|
Timestamp wakeup_timeout;
|
|
if (server_sync_state_ == SyncState::Ok) {
|
|
wakeup_timeout.relax(server_sync_timeout);
|
|
} else if (server_sync_state_ == SyncState::None && was_first_sync_) {
|
|
server_sync_state_ = SyncState::Pending;
|
|
do_get_top_peers();
|
|
}
|
|
|
|
if (is_enabled_) {
|
|
// database sync
|
|
Timestamp db_sync_timeout;
|
|
if (db_sync_state_ == SyncState::Ok) {
|
|
if (first_unsync_change_) {
|
|
db_sync_timeout = Timestamp::at(first_unsync_change_.at() + DB_SYNC_DELAY);
|
|
if (db_sync_timeout.is_in_past()) {
|
|
db_sync_state_ = SyncState::None;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (db_sync_state_ == SyncState::Ok) {
|
|
wakeup_timeout.relax(db_sync_timeout);
|
|
} else if (db_sync_state_ == SyncState::None) {
|
|
if (server_sync_state_ == SyncState::Ok) {
|
|
do_save_top_dialogs();
|
|
}
|
|
}
|
|
}
|
|
|
|
if (wakeup_timeout) {
|
|
LOG(INFO) << "Wakeup in: " << wakeup_timeout.in();
|
|
set_timeout_at(wakeup_timeout.at());
|
|
} else {
|
|
LOG(INFO) << "Wakeup: never";
|
|
cancel_timeout();
|
|
}
|
|
}
|
|
|
|
} // namespace td
|