Merge Version 6.3.2

master
Giuseppe Marino 2 months ago
commit d6950e5d80
No known key found for this signature in database
GPG Key ID: C26F7A532ADEC25E

@ -6,7 +6,7 @@ if (POLICY CMP0065)
cmake_policy(SET CMP0065 NEW)
endif()
project(TelegramBotApi VERSION 6.2 LANGUAGES CXX)
project(TelegramBotApi VERSION 6.3.2 LANGUAGES CXX)
if (POLICY CMP0069)
option(TELEGRAM_BOT_API_ENABLE_LTO "Use \"ON\" to enable Link Time Optimization.")
@ -85,6 +85,7 @@ set(TELEGRAM_BOT_API_SOURCE
telegram-bot-api/HttpStatConnection.cpp
telegram-bot-api/Query.cpp
telegram-bot-api/Stats.cpp
telegram-bot-api/Watchdog.cpp
telegram-bot-api/WebhookActor.cpp
telegram-bot-api/Client.h
@ -95,6 +96,7 @@ set(TELEGRAM_BOT_API_SOURCE
telegram-bot-api/HttpStatConnection.h
telegram-bot-api/Query.h
telegram-bot-api/Stats.h
telegram-bot-api/Watchdog.h
telegram-bot-api/WebhookActor.h
)

@ -454,7 +454,7 @@ function onOptionsChanged() {
pre_text.push('Note that building requires a lot of memory, so you may need to increase allowed per-process memory usage in /etc/login.conf or build from root.');
}
if (os_netbsd) {
pre_text.push('Note that the following instruction is for NetBSD 8.0 and default SH shell.');
pre_text.push('Note that the following instruction is for NetBSD 8+ and default SH shell.');
}
var terminal_name = (function () {
@ -586,8 +586,8 @@ function onOptionsChanged() {
if (!use_root) {
commands.push('su -');
}
commands.push('export PKG_PATH=ftp://ftp.netbsd.org/pub/pkgsrc/packages/NetBSD/i386/8.0_2019Q2/All');
var packages = 'git gperf cmake openssl gcc5-libs';
commands.push('export PKG_PATH=http://cdn.netbsd.org/pub/pkgsrc/packages/NetBSD/$(uname -p)/$(uname -r)/All');
var packages = 'git gperf cmake openssl gcc12-libs mozilla-rootcerts-openssl';
commands.push('pkg_add ' + packages);
if (!use_root) {
commands.push('exit');

2
td

@ -1 +1 @@
Subproject commit d9cfcf88fe4ad06dae1716ce8f66bbeb7f9491d9
Subproject commit 7eba19887ad834fd731b6b07b53c2426fe4beb59

File diff suppressed because it is too large Load Diff

@ -26,6 +26,7 @@
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/WaitFreeHashMap.h"
#include <functional>
#include <limits>
@ -154,6 +155,9 @@ class Client final : public WebhookActor::Callback {
class JsonChatMembers;
class JsonChatMemberUpdated;
class JsonChatJoinRequest;
class JsonForumTopicCreated;
class JsonForumTopicIsClosedToggled;
class JsonForumTopicInfo;
class JsonGameHighScore;
class JsonAddress;
class JsonOrderInfo;
@ -200,6 +204,7 @@ class Client final : public WebhookActor::Callback {
class TdOnGetEditedMessageCallback;
class TdOnGetCallbackQueryMessageCallback;
class TdOnGetStickerSetCallback;
class TdOnGetForumTopicInfoCallback;
class TdOnGetMenuButtonCallback;
class TdOnGetMyCommandsCallback;
class TdOnGetMyDefaultAdministratorRightsCallback;
@ -281,6 +286,8 @@ class Client final : public WebhookActor::Callback {
template <class OnSuccess>
class TdOnCheckMessageCallback;
template <class OnSuccess>
class TdOnCheckMessageThreadCallback;
template <class OnSuccess>
class TdOnCheckRemoteFileIdCallback;
template <class OnSuccess>
class TdOnGetChatMemberCallback;
@ -327,6 +334,10 @@ class Client final : public WebhookActor::Callback {
void check_message(Slice chat_id_str, int64 message_id, bool allow_empty, AccessRights access_rights,
Slice message_type, PromisedQueryPtr query, OnSuccess on_success);
template <class OnSuccess>
void check_message_thread(int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, PromisedQueryPtr query,
OnSuccess on_success);
template <class OnSuccess>
void resolve_sticker_set(const td::string &sticker_set_name, PromisedQueryPtr query, OnSuccess on_success);
@ -480,8 +491,7 @@ class Client final : public WebhookActor::Callback {
td::Result<object_ptr<td_api::InputMessageContent>> get_input_media(const Query *query, td::JsonValue &&input_media,
bool for_album) const;
td::Result<object_ptr<td_api::InputMessageContent>> get_input_media(const Query *query, Slice field_name,
bool for_album) const;
td::Result<object_ptr<td_api::InputMessageContent>> get_input_media(const Query *query, Slice field_name) const;
td::Result<td::vector<object_ptr<td_api::InputMessageContent>>> get_input_message_contents(const Query *query,
Slice field_name) const;
@ -489,7 +499,7 @@ class Client final : public WebhookActor::Callback {
td::Result<td::vector<object_ptr<td_api::InputMessageContent>>> get_input_message_contents(
const Query *query, td::JsonValue &&value) const;
static td::Result<object_ptr<td_api::inputMessageInvoice>> get_input_message_invoice(const Query *query);
td::Result<object_ptr<td_api::inputMessageInvoice>> get_input_message_invoice(const Query *query) const;
static object_ptr<td_api::messageSendOptions> get_message_send_options(bool disable_notification,
bool protect_content,
@ -534,6 +544,8 @@ class Client final : public WebhookActor::Callback {
static bool init_methods();
static bool is_local_method(Slice method);
void on_cmd(PromisedQueryPtr query);
Status process_get_me_query(PromisedQueryPtr &query);
@ -595,6 +607,13 @@ class Client final : public WebhookActor::Callback {
Status process_unpin_all_chat_messages_query(PromisedQueryPtr &query);
Status process_set_chat_sticker_set_query(PromisedQueryPtr &query);
Status process_delete_chat_sticker_set_query(PromisedQueryPtr &query);
Status process_get_forum_topic_icon_stickers_query(PromisedQueryPtr &query);
Status process_create_forum_topic_query(PromisedQueryPtr &query);
Status process_edit_forum_topic_query(PromisedQueryPtr &query);
Status process_close_forum_topic_query(PromisedQueryPtr &query);
Status process_reopen_forum_topic_query(PromisedQueryPtr &query);
Status process_delete_forum_topic_query(PromisedQueryPtr &query);
Status process_unpin_all_forum_topic_messages_query(PromisedQueryPtr &query);
Status process_get_chat_member_query(PromisedQueryPtr &query);
Status process_get_chat_administrators_query(PromisedQueryPtr &query);
Status process_get_chat_member_count_query(PromisedQueryPtr &query);
@ -672,6 +691,8 @@ class Client final : public WebhookActor::Callback {
int32 get_webhook_max_connections(const Query *query) const;
static bool get_webhook_fix_ip_address(const Query *query);
void do_set_webhook(PromisedQueryPtr query, bool was_deleted);
void on_webhook_certificate_copied(Status status);
void finish_set_webhook(PromisedQueryPtr query);
void save_webhook() const;
td::string get_webhook_certificate_path() const;
@ -698,6 +719,8 @@ class Client final : public WebhookActor::Callback {
void abort_long_poll(bool from_set_webhook);
void fail_query_closing(PromisedQueryPtr &&query) const;
void fail_query_conflict(Slice message, PromisedQueryPtr &&query);
static void fail_query_with_error(PromisedQueryPtr query, int32 error_code, Slice error_message,
@ -725,8 +748,10 @@ class Client final : public WebhookActor::Callback {
td::string first_name;
td::string last_name;
td::string username;
td::vector<td::string> active_usernames;
td::string editable_username;
td::string language_code;
int64 emoji_status_custom_emoji_id;
object_ptr<td_api::chatPhoto> photo;
td::string bio;
@ -775,7 +800,8 @@ class Client final : public WebhookActor::Callback {
const GroupInfo *get_group_info(int64 group_id) const;
struct SupergroupInfo {
td::string username;
td::vector<td::string> active_usernames;
td::string editable_username;
object_ptr<td_api::chatPhoto> photo;
td::string description;
td::string invite_link;
@ -786,6 +812,7 @@ class Client final : public WebhookActor::Callback {
object_ptr<td_api::chatLocation> location;
object_ptr<td_api::ChatMemberStatus> status;
bool is_supergroup = false;
bool is_forum = false;
bool can_set_sticker_set = false;
bool has_location = false;
bool join_to_send_messages = false;
@ -836,6 +863,7 @@ class Client final : public WebhookActor::Callback {
int64 sender_user_id = 0;
int64 sender_chat_id = 0;
int64 chat_id = 0;
int64 message_thread_id = 0;
int32 date = 0;
int32 edit_date = 0;
int64 initial_chat_id = 0;
@ -847,7 +875,6 @@ class Client final : public WebhookActor::Callback {
td::string initial_sender_name;
td::string author_signature;
int64 reply_to_message_id = 0;
int64 message_thread_id = 0;
int64 media_album_id = 0;
int64 via_bot_user_id = 0;
object_ptr<td_api::MessageContent> content;
@ -863,6 +890,7 @@ class Client final : public WebhookActor::Callback {
bool can_be_saved = false;
bool is_automatic_forward = false;
bool is_topic_message = false;
mutable bool is_reply_to_message_deleted = false;
mutable bool is_content_changed = false;
};
@ -889,7 +917,7 @@ class Client final : public WebhookActor::Callback {
bool have_sticker_set_name(int64 sticker_set_id) const;
Slice get_sticker_set_name(int64 sticker_set_id) const;
td::string get_sticker_set_name(int64 sticker_set_id) const;
int64 choose_added_member_id(const td_api::messageChatAddMembers *message_add_members) const;
@ -910,7 +938,8 @@ class Client final : public WebhookActor::Callback {
static void json_store_user_status(td::JsonObjectScope &object, const td_api::UserStatus *userStatus);
void remove_replies_to_message(int64 chat_id, int64 reply_to_message_id, bool only_from_cache);
void delete_message(int64 chat_id, int64 message_id, bool only_from_cache);
td::unique_ptr<MessageInfo> delete_message(int64 chat_id, int64 message_id, bool only_from_cache);
void add_new_message(object_ptr<td_api::message> &&message, bool is_edited);
void process_new_message_queue(int64 chat_id);
@ -1063,11 +1092,11 @@ class Client final : public WebhookActor::Callback {
static td::FlatHashMap<td::string, Status (Client::*)(PromisedQueryPtr &query)> methods_;
td::FlatHashMap<FullMessageId, td::unique_ptr<MessageInfo>, FullMessageIdHash> messages_; // message cache
td::FlatHashMap<int64, td::unique_ptr<UserInfo>> users_; // user info cache
td::FlatHashMap<int64, td::unique_ptr<GroupInfo>> groups_; // group info cache
td::FlatHashMap<int64, td::unique_ptr<SupergroupInfo>> supergroups_; // supergroup info cache
td::FlatHashMap<int64, td::unique_ptr<ChatInfo>> chats_; // chat info cache
td::WaitFreeHashMap<FullMessageId, td::unique_ptr<MessageInfo>, FullMessageIdHash> messages_;
td::WaitFreeHashMap<int64, td::unique_ptr<UserInfo>> users_;
td::WaitFreeHashMap<int64, td::unique_ptr<GroupInfo>> groups_;
td::WaitFreeHashMap<int64, td::unique_ptr<SupergroupInfo>> supergroups_;
td::WaitFreeHashMap<int64, td::unique_ptr<ChatInfo>> chats_;
td::FlatHashMap<FullMessageId, td::FlatHashSet<int64>, FullMessageIdHash>
reply_message_ids_; // message -> replies to it
@ -1118,7 +1147,7 @@ class Client final : public WebhookActor::Callback {
};
td::FlatHashMap<int64, NewCallbackQueryQueue> new_callback_query_queues_; // sender_user_id -> queue
td::FlatHashMap<int64, td::string> sticker_set_names_;
td::WaitFreeHashMap<int64, td::string> sticker_set_names_;
int64 cur_temp_bot_user_id_ = 1;
td::FlatHashMap<td::string, int64> bot_user_ids_;
@ -1162,6 +1191,7 @@ class Client final : public WebhookActor::Callback {
WebhookQueryType webhook_query_type_ = WebhookQueryType::Cancel;
td::ActorOwn<WebhookActor> webhook_id_;
PromisedQueryPtr webhook_set_query_;
PromisedQueryPtr active_webhook_set_query_;
td::string webhook_url_;
double webhook_set_time_ = 0;
int32 webhook_max_connections_ = 0;
@ -1188,6 +1218,13 @@ class Client final : public WebhookActor::Callback {
td::uint64 webhook_generation_ = 1;
UpdateType delayed_update_type_ = UpdateType::Size;
int64 delayed_chat_id_ = 0;
int32 delayed_min_date_ = 0;
int32 delayed_max_date_ = 0;
int32 delayed_max_time_ = 0;
size_t delayed_update_count_ = 0;
std::shared_ptr<const ClientParameters> parameters_;
td::ActorId<BotStatActor> stat_actor_;

@ -6,7 +6,6 @@
//
#include "telegram-bot-api/ClientManager.h"
#include "telegram-bot-api/Client.h"
#include "telegram-bot-api/ClientParameters.h"
#include "telegram-bot-api/WebhookActor.h"
#include "telegram-bot-api/StatsJson.h"
@ -31,6 +30,7 @@
#include "td/utils/Parser.h"
#include "td/utils/port/IPAddress.h"
#include "td/utils/port/Stat.h"
#include "td/utils/port/thread.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/StackAllocator.h"
@ -39,7 +39,10 @@
#include "td/utils/Random.h"
#include "td/utils/base64.h"
#include <map>
#include "memprof/memprof.h"
#include <algorithm>
#include <atomic>
#include <tuple>
namespace telegram_bot_api {
@ -51,6 +54,8 @@ void ClientManager::close(td::Promise<td::Unit> &&promise) {
}
close_flag_ = true;
watchdog_id_.reset();
dump_statistics();
auto ids = clients_.ids();
for (auto id : ids) {
auto *client_info = clients_.get(id);
@ -262,7 +267,8 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
auto now = td::Time::now();
td::int32 active_bot_count = 0;
std::multimap<td::int64, td::uint64> top_bot_ids;
td::vector<std::pair<td::int64, td::uint64>> top_bot_ids;
size_t max_bots = 50;
for (auto id : clients_.ids()) {
auto *client_info = clients_.get(id);
CHECK(client_info);
@ -275,15 +281,17 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
continue;
}
auto stats = client_info->stat_.as_vector(now);
double score = 0.0;
for (auto &stat : stats) {
if (stat.key_ == "update_count" || stat.key_ == "request_count") {
score -= td::to_double(stat.value_);
}
auto score = static_cast<td::int64>(client_info->stat_.get_score(now) * -1e9);
if (score == 0 && top_bot_ids.size() >= max_bots) {
continue;
}
top_bot_ids.emplace(static_cast<td::int64>(score * 1e9), id);
top_bot_ids.emplace_back(score, id);
}
if (top_bot_ids.size() < max_bots) {
max_bots = top_bot_ids.size();
}
std::partial_sort(top_bot_ids.begin(), top_bot_ids.begin() + max_bots, top_bot_ids.end());
top_bot_ids.resize(max_bots);
if(!as_json) {
sb << stat_.get_description() << '\n';
@ -359,10 +367,12 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
for (std::pair<td::int64, td::uint64> top_bot_id : top_bot_ids) {
auto client_info = clients_.get(top_bot_id.second);
CHECK(client_info);
ServerBotInfo bot_info = client_info->client_->get_actor_unsafe()->get_bot_info();
ServerBotInfo bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
auto stats = client_info->stat_.as_json_ready_vector(now);
JsonStatsBotAdvanced bot(
std::move(top_bot_id), std::move(bot_info), std::move(stats), parameters_->stats_hide_sensible_data_, now
std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, std::move(stats), parameters_->stats_hide_sensible_data_, now
);
bots.push_back(bot);
}
@ -372,8 +382,9 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
for (auto top_bot_id : top_bot_ids) {
auto *client_info = clients_.get(top_bot_id.second);
CHECK(client_info);
auto bot_info = client_info->client_->get_actor_unsafe()->get_bot_info();
auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
sb << '\n';
sb << "id\t" << bot_info.id_ << '\n';
sb << "uptime\t" << now - bot_info.start_time_ << '\n';
@ -381,18 +392,30 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
sb << "token\t" << bot_info.token_ << '\n';
}
sb << "username\t" << bot_info.username_ << '\n';
if (!parameters_->stats_hide_sensible_data_) {
sb << "webhook\t" << bot_info.webhook_ << '\n';
} else if (bot_info.webhook_.empty()) {
sb << "webhook disabled" << '\n';
} else {
sb << "webhook enabled" << '\n';
if (active_request_count != 0) {
sb << "active_request_count\t" << active_request_count << '\n';
}
if (active_file_upload_bytes != 0) {
sb << "active_file_upload_bytes\t" << active_file_upload_bytes << '\n';
}
if (!bot_info.webhook_.empty()) {
if (!parameters_->stats_hide_sensible_data_) {
sb << "webhook\t" << bot_info.webhook_ << '\n';
} else {
sb << "webhook enabled" << '\n';
}
if (bot_info.has_webhook_certificate_) {
sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n';
}
if (bot_info.webhook_max_connections_ != parameters_->default_max_webhook_connections_) {
sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n';
}
}
sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n';
sb << "head_update_id\t" << bot_info.head_update_id_ << '\n';
sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n';
sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n';
sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n';
if (bot_info.pending_update_count_ != 0) {
sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n';
sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n';
}
auto stats = client_info->stat_.as_vector(now);
for (auto &stat : stats) {
@ -419,9 +442,7 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) {
void ClientManager::start_up() {
//NB: the same scheduler as for database in Td
auto current_scheduler_id = td::Scheduler::instance()->sched_id();
auto scheduler_count = td::Scheduler::instance()->sched_count();
auto scheduler_id = td::min(current_scheduler_id + 1, scheduler_count - 1);
auto scheduler_id = 1;
// init tqueue
{
@ -459,13 +480,14 @@ void ClientManager::start_up() {
LOG(WARNING) << "Loaded " << loaded_event_count << " TQueue events in " << (td::Time::now() - load_start_time)
<< " seconds";
next_tqueue_gc_time_ = td::Time::now() + 600;
}
// init webhook_db and user_db
auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(),
scheduler_id);
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status.error();
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status;
parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db);
auto concurrent_user_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
@ -486,6 +508,10 @@ void ClientManager::start_up() {
send_closure_later(actor_id(this), &ClientManager::send, std::move(query));
}
// launch watchdog
watchdog_id_ = td::create_actor_on_scheduler<Watchdog>(
"ManagerWatchdog", td::Scheduler::instance()->sched_count() - 3, td::this_thread::get_id(), WATCHDOG_TIMEOUT);
set_timeout_in(600.0);
}
PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info,
@ -547,6 +573,61 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool
return PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise<td::unique_ptr<Query>>()));
}
void ClientManager::dump_statistics() {
if (is_memprof_on()) {
LOG(WARNING) << "Memory dump:";
td::vector<AllocInfo> v;
dump_alloc([&](const AllocInfo &info) { v.push_back(info); });
std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; });
size_t total_size = 0;
size_t other_size = 0;
int count = 0;
for (auto &info : v) {
if (count++ < 50) {
LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace);
} else {
other_size += info.size;
}
total_size += info.size;
}
LOG(WARNING) << td::tag("other", td::format::as_size(other_size));
LOG(WARNING) << td::tag("total size", td::format::as_size(total_size));
LOG(WARNING) << td::tag("total traces", get_ht_size());
LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate());
}
auto r_mem_stat = td::mem_stat();
if (r_mem_stat.is_ok()) {
auto mem_stat = r_mem_stat.move_as_ok();
LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_));
LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_));
LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_));
LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_));
}
LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem()));
LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size()));
const auto &shared_data = parameters_->shared_data_;
auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed);
auto query_count = shared_data->query_count_.load(std::memory_order_relaxed);
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
td::uint64 i = 0;
bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
LOG(WARNING) << static_cast<const Query &>(*cur);
} else {
was_gap = true;
}
}
td::dump_pending_network_queries(*parameters_->net_query_stats_);
}
void ClientManager::raw_event(const td::Event::Raw &event) {
auto id = get_link_token();
auto *info = clients_.get(id);
@ -563,6 +644,28 @@ void ClientManager::raw_event(const td::Event::Raw &event) {
}
}
void ClientManager::timeout_expired() {
send_closure(watchdog_id_, &Watchdog::kick);
set_timeout_in(WATCHDOG_TIMEOUT / 2);
double now = td::Time::now();
if (now > next_tqueue_gc_time_) {
auto unix_time = parameters_->shared_data_->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time;
td::int64 deleted_events;
bool is_finished;
std::tie(deleted_events, is_finished) = parameters_->shared_data_->tqueue_->run_gc(unix_time);
LOG(INFO) << "TQueue GC deleted " << deleted_events << " events";
next_tqueue_gc_time_ = td::Time::now() + (is_finished ? 60.0 : 1.0);
tqueue_deleted_events_ += deleted_events;
if (tqueue_deleted_events_ > last_tqueue_deleted_events_ + 10000) {
LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events_ << " events since the start";
last_tqueue_deleted_events_ = tqueue_deleted_events_;
}
}
}
void ClientManager::hangup_shared() {
auto id = get_link_token();
auto *info = clients_.get(id);
@ -600,4 +703,6 @@ void ClientManager::finish_close() {
stop();
}
constexpr double ClientManager::WATCHDOG_TIMEOUT;
} // namespace telegram_bot_api

@ -9,6 +9,7 @@
#include "telegram-bot-api/Client.h"
#include "telegram-bot-api/Query.h"
#include "telegram-bot-api/Stats.h"
#include "telegram-bot-api/Watchdog.h"
#include "td/actor/actor.h"
@ -41,6 +42,8 @@ class ClientManager final : public td::Actor {
: parameters_(std::move(parameters)), token_range_(token_range) {
}
void dump_statistics();
void send(PromisedQueryPtr query);
void user_login(PromisedQueryPtr query);
@ -71,6 +74,13 @@ class ClientManager final : public td::Actor {
bool close_flag_ = false;
td::vector<td::Promise<td::Unit>> close_promises_;
td::ActorOwn<Watchdog> watchdog_id_;
double next_tqueue_gc_time_ = 0.0;
td::int64 tqueue_deleted_events_ = 0;
td::int64 last_tqueue_deleted_events_ = 0;
static constexpr double WATCHDOG_TIMEOUT = 0.5;
static td::int64 get_tqueue_id(td::int64 user_id, bool is_test_dc);
static PromisedQueryPtr get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info,
@ -78,6 +88,7 @@ class ClientManager final : public td::Actor {
void start_up() final;
void raw_event(const td::Event::Raw &event) final;
void timeout_expired() final;
void hangup_shared() final;
void close_db();
void finish_close();

@ -29,10 +29,10 @@ namespace telegram_bot_api {
struct SharedData {
std::atomic<td::uint64> query_count_{0};
std::atomic<size_t> query_list_size_{0};
std::atomic<int> next_verbosity_level_{-1};
// not thread-safe
size_t query_list_size_ = 0;
// not thread-safe, must be used from a single thread
td::ListNode query_list_;
td::unique_ptr<td::KeyValueSyncInterface> webhook_db_;
td::unique_ptr<td::KeyValueSyncInterface> user_db_;

@ -21,7 +21,7 @@ namespace telegram_bot_api {
void HttpConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
td::ActorOwn<td::HttpInboundConnection> connection) {
CHECK(connection_->empty());
CHECK(connection_.empty());
connection_ = std::move(connection);
LOG(DEBUG) << "Handle " << *http_query;

@ -15,7 +15,7 @@ namespace telegram_bot_api {
void HttpStatConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
td::ActorOwn<td::HttpInboundConnection> connection) {
CHECK(connection_->empty());
CHECK(connection_.empty());
connection_ = std::move(connection);
td::Parser url_path_parser(http_query->url_path_);
as_json_ = url_path_parser.try_skip("/json");

@ -46,9 +46,9 @@ Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_u
start_timestamp_ = td::Time::now();
LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this;
if (shared_data_) {
shared_data_->query_count_++;
shared_data_->query_count_.fetch_add(1, std::memory_order_relaxed);
if (method_ != "getupdates") {
shared_data_->query_list_size_++;
shared_data_->query_list_size_.fetch_add(1, std::memory_order_relaxed);
shared_data_->query_list_.put(this);
}
}
@ -136,7 +136,7 @@ void Query::send_response_stat() const {
return;
}
send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Response>,
ServerBotStat::Response{state_ == State::OK, answer_.size()}, now);
ServerBotStat::Response{state_ == State::OK, answer_.size(), files_size()}, now);
}
} // namespace telegram_bot_api

@ -23,6 +23,7 @@
#include "td/utils/StringBuilder.h"
#include <algorithm>
#include <atomic>
#include <memory>
#include <utility>
@ -112,9 +113,9 @@ class Query final : public td::ListNode {
Query &operator=(Query &&) = delete;
~Query() {
if (shared_data_) {
shared_data_->query_count_--;
shared_data_->query_count_.fetch_sub(1, std::memory_order_relaxed);
if (!empty()) {
shared_data_->query_list_size_--;
shared_data_->query_list_size_.fetch_sub(1, std::memory_order_relaxed);
}
}
}

@ -185,6 +185,25 @@ td::vector<td::string> BotStatActor::get_jsonable_description() const {
}
double BotStatActor::get_score(double now) {
auto minute_stat = stat_[2].stat_duration(now);
double result = minute_stat.first.request_count_ + minute_stat.first.update_count_;
if (minute_stat.second != 0) {
result /= minute_stat.second;
}
result += td::max(static_cast<double>(get_active_request_count() - 10), static_cast<double>(0));
result += static_cast<double>(get_active_file_upload_bytes()) * 1e-8;
return result;
}
td::int64 BotStatActor::get_active_request_count() const {
return active_request_count_;
}
td::int64 BotStatActor::get_active_file_upload_bytes() const {
return active_file_upload_bytes_;
}
bool BotStatActor::is_active(double now) const {
return last_activity_timestamp_ > now - 86400;
}

@ -116,15 +116,16 @@ struct ServerBotStat {
struct Response {
bool ok_;
size_t size_;
td::int64 files_size_;
};
void on_event(const Response &answer) {
void on_event(const Response &response) {
response_count_++;
if (answer.ok_) {
if (response.ok_) {
response_count_ok_++;
} else {
response_count_error_++;
}
response_bytes_ += static_cast<double>(answer.size_);
response_bytes_ += static_cast<double>(response.size_);
}
struct Request {
@ -174,6 +175,7 @@ class BotStatActor final : public td::Actor {
for (auto &stat : stat_) {
stat.add_event(event, now);
}
on_event(event);
if (!parent_.empty()) {
send_closure(parent_, &BotStatActor::add_event<EventT>, event, now);
}
@ -184,6 +186,12 @@ class BotStatActor final : public td::Actor {
td::string get_description() const;
td::vector<td::string> get_jsonable_description() const;
double get_score(double now);
td::int64 get_active_request_count() const;
td::int64 get_active_file_upload_bytes() const;
bool is_active(double now) const;
static constexpr std::size_t SIZE = 4;
@ -193,6 +201,23 @@ class BotStatActor final : public td::Actor {
td::TimedStat<ServerBotStat> stat_[SIZE];
td::ActorId<BotStatActor> parent_;
double last_activity_timestamp_ = -1e9;
td::int64 active_request_count_ = 0;
td::int64 active_file_upload_bytes_ = 0;
void on_event(const ServerBotStat::Update &update) {
}
void on_event(const ServerBotStat::Response &response) {
active_request_count_--;
active_file_upload_bytes_ -= response.files_size_;
CHECK(active_request_count_ >= 0);
CHECK(active_file_upload_bytes_ >= 0);
}
void on_event(const ServerBotStat::Request &request) {
active_request_count_++;
active_file_upload_bytes_ += request.files_size_;
}
};
} // namespace telegram_bot_api

@ -200,10 +200,13 @@ class JsonStatsBotAdvanced : public JsonStatsBot {
public:
explicit JsonStatsBotAdvanced(std::pair<td::int64, td::uint64> score_id_pair,
ServerBotInfo bot,
td::int64 active_request_count,
td::int64 active_file_upload_bytes,
td::vector<ServerBotStat> stats,
const bool hide_sensible_data,
const double now)
: JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), stats_(std::move(stats)),
: JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), active_request_count_(active_request_count),
active_file_upload_bytes_(active_file_upload_bytes), stats_(std::move(stats)),
hide_sensible_data_(hide_sensible_data), now_(now) {
}
void store(td::JsonValueScope *scope) const {
@ -235,6 +238,8 @@ class JsonStatsBotAdvanced : public JsonStatsBot {
}
private:
ServerBotInfo bot_;
td::int64 active_request_count_;
td::int64 active_file_upload_bytes_;
td::vector<ServerBotStat> stats_;
const bool hide_sensible_data_;
const double now_;

@ -0,0 +1,28 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "telegram-bot-api/Watchdog.h"
#include "td/utils/logging.h"
#include "td/utils/Time.h"
namespace telegram_bot_api {
void Watchdog::kick() {
auto now = td::Time::now();
if (now >= last_kick_time_ + timeout_ && last_kick_time_ > 0) {
LOG(ERROR) << get_name() << " timeout expired after " << now - last_kick_time_ << " seconds";
td::thread::send_real_time_signal(main_thread_id_, 2);
}
last_kick_time_ = now;
set_timeout_in(timeout_);
}
void Watchdog::timeout_expired() {
kick();
}
} // namespace telegram_bot_api

@ -0,0 +1,31 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/actor.h"
#include "td/utils/port/thread.h"
namespace telegram_bot_api {
class Watchdog final : public td::Actor {
public:
Watchdog(td::thread::id main_thread_id, double timeout) : main_thread_id_(main_thread_id), timeout_(timeout) {
// watchdog is disabled until it is kicked for the first time
}
void kick();
private:
void timeout_expired() final;
td::thread::id main_thread_id_;
double timeout_;
double last_kick_time_ = 0.0;
};
} // namespace telegram_bot_api

@ -9,13 +9,10 @@
#include "telegram-bot-api/HttpConnection.h"
#include "telegram-bot-api/HttpServer.h"
#include "telegram-bot-api/HttpStatConnection.h"
#include "telegram-bot-api/Query.h"
#include "telegram-bot-api/Stats.h"
#include "td/telegram/ClientActor.h"
#include "telegram-bot-api/Watchdog.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/TQueue.h"
#include "td/net/GetHostByNameActor.h"
#include "td/net/HttpInboundConnection.h"
@ -23,13 +20,11 @@
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/buffer.h"
#include "td/utils/AsyncFileLog.h"
#include "td/utils/CombinedLog.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/FileLog.h"
#include "td/utils/format.h"
//#include "td/utils/GitInfo.h"
#include "td/utils/logging.h"
#include "td/utils/MemoryLog.h"
@ -42,18 +37,14 @@
#include "td/utils/port/rlimit.h"
#include "td/utils/port/signals.h"
#include "td/utils/port/stacktrace.h"
#include "td/utils/port/Stat.h"
#include "td/utils/port/thread.h"
#include "td/utils/port/user.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/TsLog.h"
#include "memprof/memprof.h"
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <memory>
@ -78,18 +69,31 @@ static td::MemoryLog<1 << 20> memory_log;
void print_log() {
auto buf = memory_log.get_buffer();
auto pos = memory_log.get_pos();
size_t tail_length = buf.size() - pos;
while (tail_length > 0 && buf[pos + tail_length - 1] == ' ') {
tail_length--;
}
if (tail_length + 100 >= buf.size() - pos) {
tail_length = buf.size() - pos;
}
td::signal_safe_write("------- Log dump -------\n");
td::signal_safe_write(buf.substr(pos), false);
td::signal_safe_write(buf.substr(pos, tail_length), false);
td::signal_safe_write(buf.substr(0, pos), false);
td::signal_safe_write("\n", false);
td::signal_safe_write("------------------------\n");
}
static std::atomic_bool has_failed{false};
static void dump_stacktrace_signal_handler(int sig) {
if (has_failed) {
return;
}
td::Stacktrace::print_to_stderr();
}
static void fail_signal_handler(int sig) {
has_failed = true;
td::signal_safe_write_signal_number(sig);
td::Stacktrace::PrintOptions options;
options.use_gdb = true;
@ -107,6 +111,9 @@ static void change_verbosity_level_signal_handler(int sig) {
static std::atomic_flag need_dump_log;
static void dump_log_signal_handler(int sig) {
if (has_failed) {
return;
}
need_dump_log.clear();
}
@ -115,61 +122,6 @@ static void sigsegv_signal_handler(int signum, void *addr) {
fail_signal_handler(signum);
}
static void dump_statistics(const std::shared_ptr<SharedData> &shared_data,
const std::shared_ptr<td::NetQueryStats> &net_query_stats) {
if (is_memprof_on()) {
LOG(WARNING) << "Memory dump:";
td::vector<AllocInfo> v;
dump_alloc([&](const AllocInfo &info) { v.push_back(info); });
std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; });
size_t total_size = 0;
size_t other_size = 0;
int count = 0;
for (auto &info : v) {
if (count++ < 50) {
LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace);
} else {
other_size += info.size;
}
total_size += info.size;
}
LOG(WARNING) << td::tag("other", td::format::as_size(other_size));
LOG(WARNING) << td::tag("total size", td::format::as_size(total_size));
LOG(WARNING) << td::tag("total traces", get_ht_size());
LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate());
}
auto r_mem_stat = td::mem_stat();
if (r_mem_stat.is_ok()) {
auto mem_stat = r_mem_stat.move_as_ok();
LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_));
LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_));
LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_));
LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_));
}
LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem()));
LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size()));
auto query_list_size = shared_data->query_list_size_;
auto query_count = shared_data->query_count_.load();
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
td::uint64 i = 0;
bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
LOG(WARNING) << static_cast<const Query &>(*cur);
} else {
was_gap = true;
}
}
td::dump_pending_network_queries(*net_query_stats);
}
int main(int argc, char *argv[]) {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(FATAL));
td::ExitGuard exit_guard;
@ -191,16 +143,16 @@ int main(int argc, char *argv[]) {
td::set_signal_handler(td::SignalType::Other, fail_signal_handler).ensure();
td::set_extended_signal_handler(td::SignalType::Error, sigsegv_signal_handler).ensure();
td::set_runtime_signal_handler(0, change_verbosity_level_signal_handler).ensure();
td::set_runtime_signal_handler(1, dump_log_signal_handler).ensure();
td::set_runtime_signal_handler(2, dump_stacktrace_signal_handler).ensure();
td::set_real_time_signal_handler(0, change_verbosity_level_signal_handler).ensure();
td::set_real_time_signal_handler(1, dump_log_signal_handler).ensure();
td::set_real_time_signal_handler(2, dump_stacktrace_signal_handler).ensure();
td::init_openssl_threads();
auto start_time = td::Time::now();
auto shared_data = std::make_shared<SharedData>();
auto parameters = std::make_unique<ClientParameters>();
parameters->version_ = "6.2";
parameters->version_ = "6.3.2";
parameters->shared_data_ = shared_data;
parameters->start_time_ = start_time;
auto net_query_stats = td::create_net_query_stats();
@ -222,6 +174,8 @@ int main(int argc, char *argv[]) {
td::string username;
td::string groupname;
td::uint64 max_connections = 0;
td::uint64 cpu_affinity = 0;
td::uint64 main_thread_affinity = 0;
ClientManager::TokenRange token_range{0, 1};
parameters->api_id_ = [](auto x) -> td::int32 {
@ -319,6 +273,17 @@ int main(int argc, char *argv[]) {
options.add_option('g', "groupname", "effective group name to switch to", td::OptionParser::parse_string(groupname));
options.add_checked_option('c', "max-connections", "maximum number of open file descriptors",
td::OptionParser::parse_integer(max_connections));
#if TD_HAVE_THREAD_AFFINITY
options.add_checked_option('\0', "cpu-affinity", "CPU affinity as 64-bit mask (defaults to all available CPUs)",
td::OptionParser::parse_integer(cpu_affinity));
options.add_checked_option(
'\0', "main-thread-affinity",
"CPU affinity of the main thread as 64-bit mask (defaults to the value of the option --cpu-affinity)",
td::OptionParser::parse_integer(main_thread_affinity));
#else
(void)cpu_affinity;
(void)main_thread_affinity;
#endif
options.add_checked_option('\0', "max-batch-operations", PSLICE() << "maximum number of batch operations (default: " << parameters->max_batch_operations << ")",
@ -375,10 +340,29 @@ int main(int argc, char *argv[]) {
log.set_second(&memory_log);
td::log_interface = &log;
td::FileLog file_log;
td::TsLog ts_log(&file_log);
td::AsyncFileLog file_log;
auto init_status = [&] {
#if TD_HAVE_THREAD_AFFINITY
if (main_thread_affinity == 0) {
main_thread_affinity = cpu_affinity;
}
if (main_thread_affinity != 0) {
auto initial_mask = td::thread::get_affinity_mask(td::this_thread::get_id());
if (initial_mask == 0) {
return td::Status::Error("Failed to get current thread affinity");
}
if (cpu_affinity != 0) {
TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), cpu_affinity),
"Can't set CPU affinity mask: ");
} else {
cpu_affinity = initial_mask;
}
TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), main_thread_affinity),
"Can't set main thread CPU affinity mask: ");
}
#endif
if (max_connections != 0) {
TRY_STATUS_PREFIX(td::set_resource_limit(td::ResourceLimitType::NoFile, max_connections),
"Can't set file descriptor limit: ");
@ -392,7 +376,7 @@ int main(int argc, char *argv[]) {
TRY_RESULT_PREFIX_ASSIGN(working_directory, td::realpath(working_directory, true),
"Invalid working directory specified: ");
if (working_directory.empty()) {
return td::Status::Error("Working directory can't be empty");
return td::Status::Error("Empty path specified as working directory");
}
if (working_directory.back() != TD_DIR_SLASH) {
working_directory += TD_DIR_SLASH;
@ -448,13 +432,13 @@ int main(int argc, char *argv[]) {
log_file_path = working_directory + log_file_path;
}
TRY_STATUS_PREFIX(file_log.init(log_file_path, log_max_file_size), "Can't open log file: ");
log.set_first(&ts_log);
log.set_first(&file_log);
}
return td::Status::OK();
}();
if (init_status.is_error()) {
LOG(PLAIN) << init_status.error().message();
LOG(PLAIN) << init_status.message();
LOG(PLAIN) << options;
return 1;
}
@ -479,48 +463,58 @@ int main(int argc, char *argv[]) {
// << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started";
LOG(WARNING) << "Bot API " << parameters->version_ << " server started";
const int threads_n = 5; // +3 for Td, one for slow HTTP connections and one for DNS resolving
td::ConcurrentScheduler sched;
sched.init(threads_n);
// +3 threads for Td
// one thread for ClientManager and all Clients
// one thread for watchdogs
// one thread for slow HTTP connections
// one thread for DNS resolving
const int thread_count = 7;
td::ConcurrentScheduler sched(thread_count, cpu_affinity);
td::GetHostByNameActor::Options get_host_by_name_options;
get_host_by_name_options.scheduler_id = threads_n;
get_host_by_name_options.scheduler_id = thread_count;
parameters->get_host_by_name_actor_id_ =
sched.create_actor_unsafe<td::GetHostByNameActor>(0, "GetHostByName", std::move(get_host_by_name_options))
.release();
auto client_manager =
sched.create_actor_unsafe<ClientManager>(0, "ClientManager", std::move(parameters), token_range).release();
sched.create_actor_unsafe<ClientManager>(thread_count - 3, "ClientManager", std::move(parameters), token_range)
.release();
sched
.create_actor_unsafe<HttpServer>(
0, "HttpServer", http_ip_address, http_port,
thread_count - 3, "HttpServer", http_ip_address, http_port,
[client_manager, shared_data] {
return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data));
})
.release();
if (http_stat_port != 0) {
sched
.create_actor_unsafe<HttpServer>(
0, "HttpStatsServer", http_stat_ip_address, http_stat_port,
thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port,
[client_manager] {
return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager));
})
.release();
}
constexpr double WATCHDOG_TIMEOUT = 0.5;
auto watchdog_id =
sched.create_actor_unsafe<Watchdog>(thread_count - 2, "Watchdog", td::this_thread::get_id(), WATCHDOG_TIMEOUT);