Collect statistics about received updates.

This commit is contained in:
levlam 2023-01-25 17:48:04 +03:00
parent 30cdbe69b6
commit 2325c5041b
8 changed files with 68 additions and 10 deletions

View File

@ -420,7 +420,7 @@ static ActorOwn<> get_full_config(DcOption option, Promise<tl_object_ptr<telegra
void on_server_salt_updated(std::vector<mtproto::ServerSalt> server_salts) final {
// nop
}
void on_update(BufferSlice &&update) final {
void on_update(BufferSlice &&update, uint64 auth_key_id) final {
// nop
}
void on_result(NetQueryPtr net_query) final {
@ -1356,7 +1356,7 @@ void ConfigManager::process_config(tl_object_ptr<telegram_api::config> config) {
}
if (is_from_main_dc) {
options.set_option_integer("webfile_dc_id", config->webfile_dc_id_);
if ((config->flags_ & telegram_api::config::TMP_SESSIONS_MASK) != 0) {
if ((config->flags_ & telegram_api::config::TMP_SESSIONS_MASK) != 0 && config->tmp_sessions_ > 1) {
options.set_option_integer("session_count", config->tmp_sessions_);
} else {
options.set_option_empty("session_count");

View File

@ -3064,7 +3064,7 @@ std::shared_ptr<Td::ResultHandler> Td::extract_handler(uint64 id) {
return result;
}
void Td::on_update(BufferSlice &&update) {
void Td::on_update(BufferSlice &&update, uint64 auth_key_id) {
if (close_flag_ > 1) {
return;
}
@ -3076,6 +3076,7 @@ void Td::on_update(BufferSlice &&update) {
LOG(ERROR) << "Failed to fetch update: " << parser.get_error() << format::as_hex_dump<4>(update.as_slice());
updates_manager_->schedule_get_difference("failed to fetch update");
} else {
updates_manager_->on_update_from_auth_key_id(auth_key_id);
updates_manager_->on_get_updates(std::move(ptr), Promise<Unit>());
if (auth_manager_->is_bot() && auth_manager_->is_authorized()) {
alarm_timeout_.set_timeout_in(PING_SERVER_ALARM_ID,

View File

@ -118,7 +118,7 @@ class Td final : public Actor {
void schedule_get_promo_data(int32 expires_in);
void on_update(BufferSlice &&update);
void on_update(BufferSlice &&update, uint64 auth_key_id);
void on_result(NetQueryPtr query);

View File

@ -303,7 +303,22 @@ void UpdatesManager::fill_gap(void *td, const char *source) {
auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
if (source != nullptr && !updates_manager->running_get_difference_) {
LOG(WARNING) << "Filling gap in " << source << " by running getDifference";
auto auth_key_id = updates_manager->get_most_unused_auth_key_id();
uint64 update_count = 0;
double active_time = 0.0;
double delay_time = 0.0;
if (auth_key_id != 0) {
auto now = Time::now();
auto &info = updates_manager->session_infos_[auth_key_id];
update_count = info.update_count;
active_time = now - info.first_update_time;
delay_time = now - info.last_update_time;
}
LOG(WARNING) << "Filling gap in " << source
<< " by running getDifference. Receive no updates from session with auth key " << auth_key_id
<< " for " << delay_time << " seconds, active for " << active_time << " seconds and having "
<< update_count << " received updates";
}
updates_manager->get_difference("fill_gap");
@ -1479,6 +1494,37 @@ void UpdatesManager::init_state() {
get_difference("init_state");
}
uint64 UpdatesManager::get_most_unused_auth_key_id() {
uint64 min_auth_key_id = 0;
double min_time = Time::now();
for (auto &it : session_infos_) {
if (it.second.last_update_time < min_time) {
min_time = it.second.last_update_time;
min_auth_key_id = it.first;
}
}
return min_auth_key_id;
}
void UpdatesManager::on_update_from_auth_key_id(uint64 auth_key_id) {
if (auth_key_id == 0) {
return;
}
auto &info = session_infos_[auth_key_id];
auto now = Time::now();
info.last_update_time = now;
if (info.update_count++ == 0) {
info.first_update_time = now;
while (session_infos_.size() >
static_cast<size_t>(max(narrow_cast<int32>(G()->get_option_integer("session_count")), 1))) {
auto unused_auth_key_id = get_most_unused_auth_key_id();
LOG(INFO) << "Delete statistics for auth key " << unused_auth_key_id;
session_infos_.erase(unused_auth_key_id);
}
}
}
void UpdatesManager::ping_server() {
if (is_ping_sent_) {
return;

View File

@ -130,6 +130,8 @@ class UpdatesManager final : public Actor {
void schedule_get_difference(const char *source);
void on_update_from_auth_key_id(uint64 auth_key_id);
void ping_server();
bool running_get_difference() const {
@ -251,6 +253,13 @@ class UpdatesManager final : public Actor {
FlatHashMap<int64, TranscribedAudioHandler> pending_audio_transcriptions_;
MultiTimeout pending_audio_transcription_timeout_{"PendingAudioTranscriptionTimeout"};
struct SessionInfo {
uint64 update_count = 0;
double first_update_time = 0.0;
double last_update_time = 0.0;
};
FlatHashMap<uint64, SessionInfo> session_infos_;
void start_up() final;
void tear_down() final;
@ -365,6 +374,8 @@ class UpdatesManager final : public Actor {
void try_reload_data();
uint64 get_most_unused_auth_key_id();
static vector<int32> get_update_ids(const telegram_api::Updates *updates_ptr);
static bool have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates);

View File

@ -704,7 +704,7 @@ void Session::on_session_created(uint64 unique_id, uint64 first_message_id) {
BufferSlice packet(4);
as<int32>(packet.as_slice().begin()) = telegram_api::updatesTooLong::ID;
last_activity_timestamp_ = Time::now();
callback_->on_update(std::move(packet));
callback_->on_update(std::move(packet), auth_data_.get_auth_key().id());
}
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
@ -858,7 +858,7 @@ Status Session::on_update(BufferSlice packet) {
last_success_timestamp_ = Time::now();
}
last_activity_timestamp_ = Time::now();
callback_->on_update(std::move(packet));
callback_->on_update(std::move(packet), auth_data_.get_auth_key().id());
return Status::OK();
}

View File

@ -62,7 +62,7 @@ class Session final
Promise<unique_ptr<mtproto::RawConnection>>) = 0;
virtual void on_tmp_auth_key_updated(mtproto::AuthKey auth_key) = 0;
virtual void on_server_salt_updated(vector<mtproto::ServerSalt> server_salts) = 0;
virtual void on_update(BufferSlice &&update) = 0;
virtual void on_update(BufferSlice &&update, uint64 auth_key_id) = 0;
virtual void on_result(NetQueryPtr net_query) = 0;
};

View File

@ -58,8 +58,8 @@ class SessionCallback final : public Session::Callback {
send_closure(parent_, &SessionProxy::on_server_salt_updated, std::move(server_salts));
}
void on_update(BufferSlice &&update) final {
send_closure_later(G()->td(), &Td::on_update, std::move(update));
void on_update(BufferSlice &&update, uint64 auth_key_id) final {
send_closure_later(G()->td(), &Td::on_update, std::move(update), auth_key_id);
}
void on_result(NetQueryPtr query) final {