//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/TopDialogManager.h"

#include "td/telegram/AccessRights.h"
#include "td/telegram/AuthManager.h"
#include "td/telegram/ContactsManager.h"
#include "td/telegram/DialogId.h"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/MessagesManager.h"
#include "td/telegram/misc.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/StateManager.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdDb.h"
#include "td/telegram/telegram_api.h"

#include "td/actor/PromiseFuture.h"

#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/tl_helpers.h"

#include <algorithm>
#include <cmath>
#include <iterator>

namespace td {

class GetTopPeersQuery final : public Td::ResultHandler {
  Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> promise_;

 public:
  explicit GetTopPeersQuery(Promise<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> &&promise)
      : promise_(std::move(promise)) {
  }

  void send(int64 hash) {
    int32 flags =
        telegram_api::contacts_getTopPeers::CORRESPONDENTS_MASK | telegram_api::contacts_getTopPeers::BOTS_PM_MASK |
        telegram_api::contacts_getTopPeers::BOTS_INLINE_MASK | telegram_api::contacts_getTopPeers::GROUPS_MASK |
        telegram_api::contacts_getTopPeers::CHANNELS_MASK | telegram_api::contacts_getTopPeers::PHONE_CALLS_MASK |
        telegram_api::contacts_getTopPeers::FORWARD_USERS_MASK | telegram_api::contacts_getTopPeers::FORWARD_CHATS_MASK;
    send_query(G()->net_query_creator().create(telegram_api::contacts_getTopPeers(
        flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
        false /*ignored*/, false /*ignored*/, false /*ignored*/, 0 /*offset*/, 100 /*limit*/, hash)));
  }

  void on_result(BufferSlice packet) final {
    auto result_ptr = fetch_result<telegram_api::contacts_getTopPeers>(packet);
    if (result_ptr.is_error()) {
      return on_error(result_ptr.move_as_error());
    }

    promise_.set_value(result_ptr.move_as_ok());
  }

  void on_error(Status status) final {
    promise_.set_error(std::move(status));
  }
};

class ToggleTopPeersQuery final : public Td::ResultHandler {
  Promise<Unit> promise_;

 public:
  explicit ToggleTopPeersQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
  }

  void send(bool is_enabled) {
    send_query(G()->net_query_creator().create(telegram_api::contacts_toggleTopPeers(is_enabled)));
  }

  void on_result(BufferSlice packet) final {
    auto result_ptr = fetch_result<telegram_api::contacts_toggleTopPeers>(packet);
    if (result_ptr.is_error()) {
      return on_error(result_ptr.move_as_error());
    }

    promise_.set_value(Unit());
  }

  void on_error(Status status) final {
    promise_.set_error(std::move(status));
  }
};

// Result handler for the contacts.resetTopPeerRating request.
// It has no promise: a successful result is dropped and errors are only reported or logged.
class ResetTopPeerRatingQuery final : public Td::ResultHandler {
  DialogId dialog_id_;

 public:
  // Sends the request for the given category and chat.
  // Nothing is sent if no input peer with read access can be obtained for the chat.
  void send(TopDialogCategory category, DialogId dialog_id) {
    auto input_peer = td_->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
    if (input_peer != nullptr) {
      // remembered for error reporting in on_error
      dialog_id_ = dialog_id;

      telegram_api::contacts_resetTopPeerRating request(get_input_top_peer_category(category),
                                                        std::move(input_peer));
      send_query(G()->net_query_creator().create(request));
    }
  }

  void on_result(BufferSlice packet) final {
    auto r_reset = fetch_result<telegram_api::contacts_resetTopPeerRating>(packet);
    if (r_reset.is_error()) {
      on_error(r_reset.move_as_error());
    }
    // a successful result carries nothing of interest
  }

  void on_error(Status status) final {
    const bool is_handled =
        td_->messages_manager_->on_get_dialog_error(dialog_id_, status, "ResetTopPeerRatingQuery");
    if (is_handled) {
      return;
    }
    LOG(INFO) << "Receive error for ResetTopPeerRatingQuery: " << status;
  }
};

// Stores the Td pointer and takes ownership of the reference to the parent actor;
// all other initialization is performed later from start_up().
TopDialogManager::TopDialogManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
}

// Applies a new is_enabled value for an authorized non-bot user.
// When the value actually changes, it is written to the binlog under "top_peers_enabled"
// and a toggle query is sent to the server.
void TopDialogManager::update_is_enabled(bool is_enabled) {
  const bool is_regular_user = td_->auth_manager_ != nullptr && td_->auth_manager_->is_authorized() &&
                               !td_->auth_manager_->is_bot();
  if (!is_regular_user) {
    return;
  }

  if (!set_is_enabled(is_enabled)) {
    // the value is unchanged, nothing to synchronize
    return;
  }

  G()->td_db()->get_binlog_pmc()->set("top_peers_enabled", is_enabled ? "1" : "0");
  send_toggle_top_peers(is_enabled);

  loop();
}

// Changes is_enabled_ and restarts the manager through try_start().
// Returns true if the value was changed and false if it was already equal to is_enabled.
bool TopDialogManager::set_is_enabled(bool is_enabled) {
  if (is_enabled_ != is_enabled) {
    LOG(DEBUG) << "Change top chats is_enabled to " << is_enabled;
    is_enabled_ = is_enabled;
    try_start();
    return true;
  }
  return false;
}

void TopDialogManager::send_toggle_top_peers(bool is_enabled) {
  if (G()->close_flag()) {
    return;
  }

  if (have_toggle_top_peers_query_) {
    have_pending_toggle_top_peers_query_ = true;
    pending_toggle_top_peers_query_ = is_enabled;
    return;
  }

  LOG(DEBUG) << "Send toggle top peers query to " << is_enabled;
  have_toggle_top_peers_query_ = true;

  auto promise = PromiseCreator::lambda([actor_id = actor_id(this), is_enabled](Result<Unit> result) {
    send_closure(actor_id, &TopDialogManager::on_toggle_top_peers, is_enabled, std::move(result));
  });
  td_->create_handler<ToggleTopPeersQuery>(std::move(promise))->send(is_enabled);
}

// Completion callback of the toggle query sent by send_toggle_top_peers().
// is_enabled is the value that was sent; result is the outcome of the query.
void TopDialogManager::on_toggle_top_peers(bool is_enabled, Result<Unit> &&result) {
  CHECK(have_toggle_top_peers_query_);
  have_toggle_top_peers_query_ = false;

  const bool had_pending_query = have_pending_toggle_top_peers_query_;
  have_pending_toggle_top_peers_query_ = false;
  if (had_pending_query && pending_toggle_top_peers_query_ != is_enabled) {
    // a different value was requested while the query was running; send it now
    send_toggle_top_peers(pending_toggle_top_peers_query_);
    return;
  }

  if (result.is_error()) {
    // let's resend the query forever
    send_toggle_top_peers(is_enabled);
  } else {
    // everything is synchronized
    G()->td_db()->get_binlog_pmc()->erase("top_peers_enabled");
  }
  loop();
}

// Registers a usage of the chat in the given category at the given date.
// The chat is appended with a default-constructed TopDialog if it isn't in the category yet,
// its rating is increased by rating_add(date, rating_timestamp) and the chat is moved towards
// the beginning of the list until the list is ordered by TopDialog::operator< again.
// Does nothing if the manager is inactive or disabled.
void TopDialogManager::on_dialog_used(TopDialogCategory category, DialogId dialog_id, int32 date) {
  if (!(is_active_ && is_enabled_)) {
    return;
  }
  auto category_index = static_cast<size_t>(category);
  CHECK(category_index < by_category_.size());
  auto &category_dialogs = by_category_[category_index];
  auto &dialogs = category_dialogs.dialogs;

  category_dialogs.is_dirty = true;

  // locate the chat; index == dialogs.size() means that it isn't in the list yet
  size_t index = 0;
  while (index < dialogs.size() && !(dialogs[index].dialog_id == dialog_id)) {
    index++;
  }
  if (index == dialogs.size()) {
    TopDialog new_dialog;
    new_dialog.dialog_id = dialog_id;
    dialogs.push_back(new_dialog);
  }

  auto delta = rating_add(date, category_dialogs.rating_timestamp);
  dialogs[index].rating += delta;

  // the rating has only grown, so the entry can only need to move towards the front
  while (index > 0 && !(dialogs[index - 1] < dialogs[index])) {
    std::swap(dialogs[index - 1], dialogs[index]);
    index--;
  }

  LOG(INFO) << "Update " << get_top_dialog_category_name(category) << " rating of " << dialog_id << " by " << delta;

  if (!first_unsync_change_) {
    // remember when the first change not yet saved to the database happened
    first_unsync_change_ = Timestamp::now_cached();
  }
  loop();
}

// Removes the chat from the given top chat category.
// Fails with error 400 if the category is TopDialogCategory::Size or the chat is unknown.
// Succeeds without doing anything if the manager is inactive or disabled.
// Otherwise a ResetTopPeerRatingQuery is sent and the chat is erased from the local list if it is there.
void TopDialogManager::remove_dialog(TopDialogCategory category, DialogId dialog_id, Promise<Unit> &&promise) {
  if (category == TopDialogCategory::Size) {
    promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
    return;
  }
  if (!td_->messages_manager_->have_dialog_force(dialog_id, "remove_dialog")) {
    promise.set_error(Status::Error(400, "Chat not found"));
    return;
  }
  if (!(is_active_ && is_enabled_)) {
    promise.set_value(Unit());
    return;
  }

  // non-user chats of the ForwardUsers category are kept in ForwardChats
  const bool is_user = dialog_id.get_type() == DialogType::User;
  if (category == TopDialogCategory::ForwardUsers && !is_user) {
    category = TopDialogCategory::ForwardChats;
  }

  auto category_index = static_cast<size_t>(category);
  CHECK(category_index < by_category_.size());
  auto &category_dialogs = by_category_[category_index];
  auto &dialogs = category_dialogs.dialogs;

  td_->create_handler<ResetTopPeerRatingQuery>()->send(category, dialog_id);

  auto found_it = std::find_if(dialogs.begin(), dialogs.end(),
                               [dialog_id](const auto &top_dialog) { return top_dialog.dialog_id == dialog_id; });
  if (found_it != dialogs.end()) {
    category_dialogs.is_dirty = true;
    dialogs.erase(found_it);
    if (!first_unsync_change_) {
      first_unsync_change_ = Timestamp::now_cached();
    }
    loop();
  }
  promise.set_value(Unit());
}

// Queues a request for at most limit top chats of the given category and runs loop() to process it.
// The promise fails with error 400 if the category is TopDialogCategory::Size, the limit isn't positive,
// the manager is inactive or top chats are disabled.
void TopDialogManager::get_top_dialogs(TopDialogCategory category, int32 limit,
                                       Promise<td_api::object_ptr<td_api::chats>> &&promise) {
  if (category == TopDialogCategory::Size) {
    promise.set_error(Status::Error(400, "Top chat category must be non-empty"));
    return;
  }
  if (limit <= 0) {
    promise.set_error(Status::Error(400, "Limit must be positive"));
    return;
  }
  if (!is_active_) {
    promise.set_error(Status::Error(400, "Not supported without chat info database"));
    return;
  }
  if (!is_enabled_) {
    promise.set_error(Status::Error(400, "Top chats computation is disabled"));
    return;
  }

  GetTopDialogsQuery pending_query;
  pending_query.category = category;
  pending_query.limit = static_cast<size_t>(limit);
  pending_query.promise = std::move(promise);
  pending_get_top_dialogs_.push_back(std::move(pending_query));

  loop();
}

// Reloads rating_e_decay_ from the "rating_e_decay" option, using the current value as the default.
// Does nothing while the manager is inactive.
void TopDialogManager::update_rating_e_decay() {
  if (is_active_) {
    rating_e_decay_ = narrow_cast<int32>(G()->get_option_integer("rating_e_decay", rating_e_decay_));
  }
}

// Serializes a TopDialog: the chat identifier followed by the rating.
// The field order must match the corresponding parse function.
template <class StorerT>
void store(const TopDialogManager::TopDialog &top_dialog, StorerT &storer) {
  using ::td::store;
  store(top_dialog.dialog_id, storer);
  store(top_dialog.rating, storer);
}

// Deserializes a TopDialog: the chat identifier followed by the rating,
// in the same order in which the corresponding store function writes them.
template <class ParserT>
void parse(TopDialogManager::TopDialog &top_dialog, ParserT &parser) {
  using ::td::parse;
  parse(top_dialog.dialog_id, parser);
  parse(top_dialog.rating, parser);
}

// Serializes a TopDialogs category: the rating timestamp followed by the list of chats.
// The is_dirty flag is not stored.
template <class StorerT>
void store(const TopDialogManager::TopDialogs &top_dialogs, StorerT &storer) {
  using ::td::store;
  store(top_dialogs.rating_timestamp, storer);
  store(top_dialogs.dialogs, storer);
}

// Deserializes a TopDialogs category: the rating timestamp followed by the list of chats,
// in the same order in which the corresponding store function writes them.
template <class ParserT>
void parse(TopDialogManager::TopDialogs &top_dialogs, ParserT &parser) {
  using ::td::parse;
  parse(top_dialogs.rating_timestamp, parser);
  parse(top_dialogs.dialogs, parser);
}

// Returns the rating increment for an event that happened at time now, relative to rating_timestamp:
// exp((now - rating_timestamp) / rating_e_decay_).
double TopDialogManager::rating_add(double now, double rating_timestamp) const {
  const double elapsed = now - rating_timestamp;
  return std::exp(elapsed / rating_e_decay_);
}

// Returns rating_add() evaluated at the cached server time.
double TopDialogManager::current_rating_add(double rating_timestamp) const {
  const double server_now = G()->server_time_cached();
  return rating_add(server_now, rating_timestamp);
}

void TopDialogManager::normalize_rating() {
  for (auto &top_dialogs : by_category_) {
    auto div_by = current_rating_add(top_dialogs.rating_timestamp);
    top_dialogs.rating_timestamp = G()->server_time_cached();
    for (auto &dialog : top_dialogs.dialogs) {
      dialog.rating /= div_by;
    }
    top_dialogs.is_dirty = true;
  }
  db_sync_state_ = SyncState::None;
}

// Starts processing of a queued get_top_dialogs request.
// Collects the chat identifiers of the requested category in their stored order and asks MessagesManager
// to load them; the request is continued in on_load_dialogs(). For ForwardUsers the ForwardUsers and
// ForwardChats lists are merged using TopDialog::operator<.
void TopDialogManager::do_get_top_dialogs(GetTopDialogsQuery &&query) {
  vector<DialogId> dialog_ids;
  if (query.category == TopDialogCategory::ForwardUsers) {
    // merge ForwardUsers and ForwardChats
    const auto &users = by_category_[static_cast<size_t>(TopDialogCategory::ForwardUsers)].dialogs;
    const auto &chats = by_category_[static_cast<size_t>(TopDialogCategory::ForwardChats)].dialogs;
    auto user_it = users.begin();
    auto chat_it = chats.begin();
    while (user_it != users.end() || chat_it != chats.end()) {
      const bool take_user = chat_it == chats.end() || (user_it != users.end() && *user_it < *chat_it);
      if (take_user) {
        dialog_ids.push_back(user_it->dialog_id);
        ++user_it;
      } else {
        dialog_ids.push_back(chat_it->dialog_id);
        ++chat_it;
      }
    }
  } else {
    auto category_index = static_cast<size_t>(query.category);
    CHECK(category_index < by_category_.size());
    dialog_ids = transform(by_category_[category_index].dialogs,
                           [](const auto &top_dialog) { return top_dialog.dialog_id; });
  }

  auto on_loaded = PromiseCreator::lambda(
      [manager_id = actor_id(this), query = std::move(query)](Result<vector<DialogId>> r_loaded_ids) mutable {
        if (r_loaded_ids.is_ok()) {
          send_closure(manager_id, &TopDialogManager::on_load_dialogs, std::move(query), r_loaded_ids.move_as_ok());
        } else {
          query.promise.set_error(r_loaded_ids.move_as_error());
        }
      });
  send_closure(td_->messages_manager_actor_, &MessagesManager::load_dialogs, std::move(dialog_ids),
               std::move(on_loaded));
}

// Finishes a get_top_dialogs request after the candidate chats were loaded.
// Keeps the order of dialog_ids, drops unsuitable users and returns at most
// min(query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()) chats through query.promise.
void TopDialogManager::on_load_dialogs(GetTopDialogsQuery &&query, vector<DialogId> &&dialog_ids) {
  const auto max_count = std::min({query.limit, MAX_TOP_DIALOGS_LIMIT, dialog_ids.size()});
  const bool need_inline_bot = query.category == TopDialogCategory::BotInline;
  const bool need_bot = need_inline_bot || query.category == TopDialogCategory::BotPM;

  vector<DialogId> filtered_dialog_ids;
  filtered_dialog_ids.reserve(max_count);
  for (auto dialog_id : dialog_ids) {
    if (dialog_id.get_type() == DialogType::User) {
      auto user_id = dialog_id.get_user_id();
      if (td_->contacts_manager_->is_user_deleted(user_id)) {
        LOG(INFO) << "Skip deleted " << user_id;
        continue;
      }
      if (td_->contacts_manager_->get_my_id() == user_id) {
        LOG(INFO) << "Skip self " << user_id;
        continue;
      }
      if (need_bot) {
        auto r_bot_data = td_->contacts_manager_->get_bot_data(user_id);
        if (r_bot_data.is_error()) {
          LOG(INFO) << "Skip not a bot " << user_id;
          continue;
        }
        if (need_inline_bot) {
          // an inline bot must have a username and support inline queries
          const auto &bot_data = r_bot_data.ok();
          if (bot_data.username.empty() || !bot_data.is_inline) {
            LOG(INFO) << "Skip not inline bot " << user_id;
            continue;
          }
        }
      }
    }

    filtered_dialog_ids.push_back(dialog_id);
    if (filtered_dialog_ids.size() == max_count) {
      break;
    }
  }

  auto chats = td_->messages_manager_->get_chats_object(-1, std::move(filtered_dialog_ids),
                                                        "TopDialogManager::on_load_dialogs");
  query.promise.set_value(std::move(chats));
}

void TopDialogManager::do_get_top_peers() {
  std::vector<uint64> peer_ids;
  for (auto &category : by_category_) {
    for (auto &top_dialog : category.dialogs) {
      auto dialog_id = top_dialog.dialog_id;
      switch (dialog_id.get_type()) {
        case DialogType::User:
          peer_ids.push_back(dialog_id.get_user_id().get());
          break;
        case DialogType::Chat:
          peer_ids.push_back(dialog_id.get_chat_id().get());
          break;
        case DialogType::Channel:
          peer_ids.push_back(dialog_id.get_channel_id().get());
          break;
        default:
          break;
      }
    }
  }
  auto promise = PromiseCreator::lambda(
      [actor_id = actor_id(this)](Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
        send_closure(actor_id, &TopDialogManager::on_get_top_peers, std::move(result));
      });
  td_->create_handler<GetTopPeersQuery>(std::move(promise))->send(get_vector_hash(peer_ids));
}

// Handles the answer to the contacts.getTopPeers request.
//
// Ratings are normalized first, regardless of the outcome.
// On error a resend is scheduled in SERVER_SYNC_RESEND_DELAY seconds.
// On success:
//  - contacts_topPeersNotModified: the local lists are kept;
//  - contacts_topPeersDisabled: the "disable_top_chats" option is set and the manager is disabled;
//  - contacts_topPeers: the option is cleared, the manager is enabled, received users and chats are
//    registered and every received category replaces the corresponding local list.
// After a successful answer the current system time is saved to the binlog under "top_dialogs_ts".
void TopDialogManager::on_get_top_peers(Result<telegram_api::object_ptr<telegram_api::contacts_TopPeers>> result) {
  normalize_rating();  // once a day too

  if (result.is_error()) {
    // loop() computes the next server synchronization time as last_server_sync_ + SERVER_SYNC_DELAY,
    // so this value makes it fire in SERVER_SYNC_RESEND_DELAY seconds
    last_server_sync_ = Timestamp::in(SERVER_SYNC_RESEND_DELAY - SERVER_SYNC_DELAY);
    // loop() uses last_server_sync_ only in the Ok state; if the state were left as Pending,
    // the resend would never be scheduled and saving to the database would stay blocked
    server_sync_state_ = SyncState::Ok;
    loop();
    return;
  }

  last_server_sync_ = Timestamp::now();
  server_sync_state_ = SyncState::Ok;

  auto top_peers_parent = result.move_as_ok();
  LOG(DEBUG) << "Receive contacts_getTopPeers result: " << to_string(top_peers_parent);
  switch (top_peers_parent->get_id()) {
    case telegram_api::contacts_topPeersNotModified::ID:
      // nothing to do
      break;
    case telegram_api::contacts_topPeersDisabled::ID:
      G()->set_option_boolean("disable_top_chats", true);
      set_is_enabled(false);  // apply immediately
      break;
    case telegram_api::contacts_topPeers::ID: {
      G()->set_option_empty("disable_top_chats");
      set_is_enabled(true);  // apply immediately
      auto top_peers = move_tl_object_as<telegram_api::contacts_topPeers>(std::move(top_peers_parent));

      td_->contacts_manager_->on_get_users(std::move(top_peers->users_), "on get top chats");
      td_->contacts_manager_->on_get_chats(std::move(top_peers->chats_), "on get top chats");
      for (auto &category : top_peers->categories_) {
        auto dialog_category = get_top_dialog_category(category->category_);
        auto pos = static_cast<size_t>(dialog_category);
        CHECK(pos < by_category_.size());
        auto &top_dialogs = by_category_[pos];

        // the server list fully replaces the local one
        top_dialogs.is_dirty = true;
        top_dialogs.dialogs.clear();
        for (auto &top_peer : category->peers_) {
          TopDialog top_dialog;
          top_dialog.dialog_id = DialogId(top_peer->peer_);
          top_dialog.rating = top_peer->rating_;
          top_dialogs.dialogs.push_back(std::move(top_dialog));
        }
      }
      db_sync_state_ = SyncState::None;
      break;
    }
    default:
      UNREACHABLE();
  }

  G()->td_db()->get_binlog_pmc()->set("top_dialogs_ts", to_string(static_cast<uint32>(Clocks::system())));
  loop();
}

void TopDialogManager::do_save_top_dialogs() {
  LOG(INFO) << "Save top chats";
  for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
    auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
    auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);

    auto &top_dialogs = by_category_[top_dialog_category_i];
    if (!top_dialogs.is_dirty) {
      continue;
    }
    top_dialogs.is_dirty = false;

    G()->td_db()->get_binlog_pmc()->set(key, log_event_store(top_dialogs).as_slice().str());
  }
  db_sync_state_ = SyncState::Ok;
  first_unsync_change_ = Timestamp();
}

// Start-up hook of the manager: performs the initialization by calling init().
void TopDialogManager::start_up() {
  init();
}

// Tear-down hook of the manager: releases the reference to the parent actor received in the constructor.
void TopDialogManager::tear_down() {
  parent_.reset();
}

// Initializes the manager for an authorized user.
// The manager is active only if the chat info database is used and the user isn't a bot; it is enabled
// unless the "disable_top_chats" option is set. A toggle request stored in the binlog under
// "top_peers_enabled" is sent to the server again.
void TopDialogManager::init() {
  const bool is_authorized = td_->auth_manager_ != nullptr && td_->auth_manager_->is_authorized();
  if (!is_authorized) {
    return;
  }

  is_active_ = G()->use_chat_info_database() && !td_->auth_manager_->is_bot();
  is_enabled_ = !G()->get_option_boolean("disable_top_chats");
  update_rating_e_decay();

  // a non-empty value means that the last change of is_enabled wasn't confirmed by the server
  const auto unsent_toggle = G()->td_db()->get_binlog_pmc()->get("top_peers_enabled");
  if (!unsent_toggle.empty()) {
    send_toggle_top_peers(unsent_toggle[0] == '1');
  }

  try_start();
  loop();
}

// (Re)initializes the synchronization state and the in-memory top chat lists.
// Called from init(), set_is_enabled() and on_first_sync().
// Must not be called while there are queued get_top_dialogs requests.
void TopDialogManager::try_start() {
  // forget everything known about previous synchronizations
  was_first_sync_ = false;
  first_unsync_change_ = Timestamp();
  server_sync_state_ = SyncState::None;
  last_server_sync_ = Timestamp();
  CHECK(pending_get_top_dialogs_.empty());

  LOG(DEBUG) << "Init is enabled: " << is_enabled_;
  if (!is_active_) {
    // an inactive manager keeps nothing in the binlog: neither the lists nor "top_dialogs_ts"
    G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs");
    return;
  }

  auto di_top_dialogs_ts = G()->td_db()->get_binlog_pmc()->get("top_dialogs_ts");
  if (!di_top_dialogs_ts.empty()) {
    // convert the saved system time of the last server synchronization to a Timestamp relative to now
    last_server_sync_ = Timestamp::in(to_integer<uint32>(di_top_dialogs_ts) - Clocks::system());
    if (last_server_sync_.is_in_past()) {
      // the saved time is plausible; loop() decides when the next synchronization is due
      server_sync_state_ = SyncState::Ok;
    }
  }

  if (is_enabled_) {
    // load every category saved under "top_dialogs#<category name>"
    for (size_t top_dialog_category_i = 0; top_dialog_category_i < by_category_.size(); top_dialog_category_i++) {
      auto top_dialog_category = TopDialogCategory(top_dialog_category_i);
      auto key = PSTRING() << "top_dialogs#" << get_top_dialog_category_name(top_dialog_category);
      auto value = G()->td_db()->get_binlog_pmc()->get(key);

      auto &top_dialogs = by_category_[top_dialog_category_i];
      top_dialogs.is_dirty = false;
      if (value.empty()) {
        continue;
      }
      // a value that can't be parsed is treated as a fatal error
      log_event_parse(top_dialogs, value).ensure();
    }
    // re-base the loaded ratings to the current time; this also marks all categories as dirty
    normalize_rating();
  } else {
    // top chats are disabled: drop the saved lists and the in-memory state
    G()->td_db()->get_binlog_pmc()->erase_by_prefix("top_dialogs#");
    for (auto &top_dialogs : by_category_) {
      top_dialogs.is_dirty = false;
      top_dialogs.rating_timestamp = 0;
      top_dialogs.dialogs.clear();
    }
  }
  // overrides the None state set by normalize_rating(), so nothing is saved immediately
  db_sync_state_ = SyncState::Ok;

  // on_first_sync() is called through the promise passed to StateManager::wait_first_sync
  send_closure(G()->state_manager(), &StateManager::wait_first_sync,
               create_event_promise(self_closure(this, &TopDialogManager::on_first_sync)));
}

// Called through the promise passed to StateManager::wait_first_sync in try_start().
// Allows loop() to send requests to the server. If the current user turns out to be a bot,
// the manager is deactivated and restarted instead.
void TopDialogManager::on_first_sync() {
  was_first_sync_ = true;

  const bool is_bot = !G()->close_flag() && td_->auth_manager_->is_bot();
  if (is_bot) {
    is_active_ = false;
    try_start();
  }

  loop();
}

// Main state machine of the manager. It
//  1. starts processing of all queued get_top_dialogs requests;
//  2. requests top peers from the server when the previous synchronization is older than SERVER_SYNC_DELAY
//     or there was no synchronization yet;
//  3. saves changed lists to the database DB_SYNC_DELAY after the first unsaved change;
//  4. sets or cancels the actor timeout according to the pending deadlines.
// Does nothing if the manager is inactive or the close flag is set.
void TopDialogManager::loop() {
  if (!is_active_ || G()->close_flag()) {
    return;
  }

  if (!pending_get_top_dialogs_.empty()) {
    for (auto &query : pending_get_top_dialogs_) {
      do_get_top_dialogs(std::move(query));
    }
    // all elements were moved from; drop them
    pending_get_top_dialogs_.clear();
  }

  // server sync
  Timestamp server_sync_timeout;
  if (server_sync_state_ == SyncState::Ok) {
    server_sync_timeout = Timestamp::at(last_server_sync_.at() + SERVER_SYNC_DELAY);
    if (server_sync_timeout.is_in_past()) {
      // the last synchronization is too old, a new one is needed
      server_sync_state_ = SyncState::None;
    }
  }

  Timestamp wakeup_timeout;
  if (server_sync_state_ == SyncState::Ok) {
    // wake up when the next server synchronization is due
    wakeup_timeout.relax(server_sync_timeout);
  } else if (server_sync_state_ == SyncState::None && was_first_sync_) {
    // the request is sent only after on_first_sync(); Pending prevents sending it twice
    server_sync_state_ = SyncState::Pending;
    do_get_top_peers();
  }

  if (is_enabled_) {
    // database sync
    Timestamp db_sync_timeout;
    if (db_sync_state_ == SyncState::Ok) {
      if (first_unsync_change_) {
        // there are unsaved changes; they must be saved DB_SYNC_DELAY after the first of them
        db_sync_timeout = Timestamp::at(first_unsync_change_.at() + DB_SYNC_DELAY);
        if (db_sync_timeout.is_in_past()) {
          db_sync_state_ = SyncState::None;
        }
      }
    }

    if (db_sync_state_ == SyncState::Ok) {
      // db_sync_timeout stays default-constructed if there are no unsaved changes
      // NOTE(review): presumably relax() ignores an empty timestamp and keeps the earlier deadline - confirm
      wakeup_timeout.relax(db_sync_timeout);
    } else if (db_sync_state_ == SyncState::None) {
      // the lists are saved only while the server synchronization state is Ok
      if (server_sync_state_ == SyncState::Ok) {
        do_save_top_dialogs();
      }
    }
  }

  if (wakeup_timeout) {
    LOG(INFO) << "Wakeup in: " << wakeup_timeout.in();
    set_timeout_at(wakeup_timeout.at());
  } else {
    LOG(INFO) << "Wakeup: never";
    cancel_timeout();
  }
}

}  // namespace td