Compare commits

..

No commits in common. "master" and "v6.2-1" have entirely different histories.

27 changed files with 2557 additions and 5727 deletions

View File

@ -4,36 +4,15 @@ Language: Cpp
AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignArrayOfStructures: None
AlignConsecutiveAssignments:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: true
AlignConsecutiveBitFields:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignConsecutiveDeclarations:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignConsecutiveMacros:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignConsecutiveMacros: None
AlignConsecutiveAssignments: None
AlignConsecutiveBitFields: None
AlignConsecutiveDeclarations: None
AlignEscapedNewlines: Left
AlignOperands: Align
AlignTrailingComments:
Kind: Always
OverEmptyLines: 0
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: true
AllowAllConstructorInitializersOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
@ -46,8 +25,6 @@ AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: Yes
# AttributeMacros:
# - __capability
BinPackArguments: true
BinPackParameters: true
BitFieldColonSpacing: Both
@ -56,12 +33,12 @@ BraceWrapping:
AfterClass: false
AfterControlStatement: Never
AfterEnum: false
AfterExternBlock: false
AfterFunction: false
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
BeforeLambdaBody: false
@ -70,23 +47,24 @@ BraceWrapping:
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakAfterAttributes: Never
# BreakAfterJavaFieldAnnotations: false
BreakArrays: true
BreakBeforeBinaryOperators: None
BreakBeforeBraces: Attach
BreakBeforeConceptDeclarations: Always
BreakBeforeInlineASMColon: OnlyMultiline
BreakBeforeTernaryOperators: true
BreakConstructorInitializers: BeforeComma # BeforeColon
BreakBeforeConceptDeclarations: true
BreakBeforeInheritanceComma: true # false
BreakInheritanceList: BeforeComma # BeforeColon
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: true # false
BreakConstructorInitializers: BeforeComma # BeforeColon
# BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: true
ColumnLimit: 120 # 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: true
DisableFormat: false
EmptyLineAfterAccessModifier: Never
@ -105,21 +83,14 @@ IndentCaseLabels: true
IndentExternBlock: AfterExternBlock
IndentGotoLabels: true
IndentPPDirectives: None
IndentRequiresClause: true
IndentRequires: false
IndentWidth: 2
IndentWrappedFunctionNames: false
InsertBraces: false
InsertNewlineAtEOF: false
# InsertTrailingCommas: None
IntegerLiteralSeparator:
Binary: 0
Decimal: 0
Hex: 0
# JavaScriptQuotes: Leave
# JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: false
LambdaBodyIndentation: Signature
LineEnding: DeriveLF
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
@ -129,31 +100,23 @@ NamespaceIndentation: None
# ObjCBreakBeforeNestedBlockParam: true
# ObjCSpaceAfterProperty: false
# ObjCSpaceBeforeProtocolList: true
PackConstructorInitializers: NextLine
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakOpenParenthesis: 0
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyIndentedWhitespace: 0
PenaltyReturnTypeOnItsOwnLine: 200
PointerAlignment: Right # Left
PointerAlignment: Right
PPIndentWidth: -1
QualifierAlignment: Leave
ReferenceAlignment: Pointer
ReflowComments: false # true
RemoveBracesLLVM: false
RemoveSemicolon: false
RequiresClausePosition: OwnLine
RequiresExpressionIndentation: OuterScope
SeparateDefinitionBlocks: Leave
ShortNamespaceLines: 0 # 1
SortIncludes: CaseInsensitive # CaseSensitive
# SortJavaStaticImport: Before
SortUsingDeclarations: Lexicographic # LexicographicNumeric
SortUsingDeclarations: false # true
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
@ -164,16 +127,6 @@ SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceBeforeParensOptions:
AfterControlStatements: true
AfterForeachMacros: true
AfterFunctionDefinitionName: false
AfterFunctionDeclarationName: false
AfterIfMacros: true
AfterOverloadedOperator: false
AfterRequiresInClause: false
AfterRequiresInExpression: false
BeforeNonEmptyParentheses: false
SpaceBeforeRangeBasedForLoopColon: true
SpaceBeforeSquareBrackets: false
SpaceInEmptyBlock: false
@ -190,5 +143,6 @@ SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Auto
TabWidth: 100 # 8
UseCRLF: false
UseTab: Never
...

View File

@ -29,13 +29,13 @@ jobs:
# Strip git ref prefix from version
VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
# Strip "v" prefix from tag name
[[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//')
# Use Docker `latest` tag convention
[ "$VERSION" == "master" ] && VERSION=latest
# Convert IMAGE_TAG, HASH_VERSION and VERSION to lowercase (repository name must be lowercase)
IMAGE_TAG=$(echo "$IMAGE_TAG" | awk '{print tolower($0)}')
IMAGE_TAG_DH=$(echo "$IMAGE_TAG_DH" | awk '{print tolower($0)}')
@ -69,10 +69,10 @@ jobs:
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v1
- name: Cache Docker layers
uses: actions/cache@v3
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ env.SAFE_ARCH }}-${{ github.sha }}
@ -80,7 +80,7 @@ jobs:
${{ runner.os }}-buildx-${{ env.SAFE_ARCH }}-
- name: Login to ghcr registry
uses: docker/login-action@v2
uses: docker/login-action@v1
if: ${{ github.event_name != 'pull_request' }}
with:
registry: ghcr.io
@ -88,7 +88,7 @@ jobs:
password: ${{ secrets.GH_ACCESS_TOKEN }}
- name: Login to Docker Hub registry
uses: docker/login-action@v2
uses: docker/login-action@v1
if: ${{ github.event_name != 'pull_request' }}
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
@ -158,14 +158,14 @@ jobs:
docker load --input image_linuxppc64le/linuxppc64le.tar
- name: Login to ghcr registry
uses: docker/login-action@v2
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ secrets.GH_USERNAME }}
password: ${{ secrets.GH_ACCESS_TOKEN }}
- name: Login to Docker Hub registry
uses: docker/login-action@v2
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN }}
@ -180,7 +180,7 @@ jobs:
--amend ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}-linuxarmv7 \
--amend ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}-linuxarm64 \
--amend ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}-linuxppc64le
#docker manifest push ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}
docker manifest push ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}
# Tag images as VERSION (like 'latest')
docker tag ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}-linux386 ${{ env.IMAGE_TAG }}:${{ env.VERSION }}-linux386
@ -197,7 +197,7 @@ jobs:
--amend ${{ env.IMAGE_TAG }}:${{ env.VERSION }}-linuxarmv7 \
--amend ${{ env.IMAGE_TAG }}:${{ env.VERSION }}-linuxarm64 \
--amend ${{ env.IMAGE_TAG }}:${{ env.VERSION }}-linuxppc64le
#docker manifest push ${{ env.IMAGE_TAG }}:${{ env.VERSION }}
docker manifest push ${{ env.IMAGE_TAG }}:${{ env.VERSION }}
# -- Push to Docker Hub
docker tag ${{ env.IMAGE_TAG }}:${{ env.HASH_VERSION }}-linux386 ${{ env.IMAGE_TAG_DH }}:${{ env.VERSION }}-linux386

View File

@ -6,7 +6,7 @@ if (POLICY CMP0065)
cmake_policy(SET CMP0065 NEW)
endif()
project(TelegramBotApi VERSION 7.0 LANGUAGES CXX)
project(TelegramBotApi VERSION 6.2 LANGUAGES CXX)
if (POLICY CMP0069)
option(TELEGRAM_BOT_API_ENABLE_LTO "Use \"ON\" to enable Link Time Optimization.")
@ -73,9 +73,6 @@ if (CLANG OR GCC)
elseif (APPLE)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-no_pie")
endif()
include(AddCXXCompilerFlag)
add_cxx_compiler_flag("-static-libstdc++")
add_cxx_compiler_flag("-static-libgcc")
endif()
endif()
@ -88,7 +85,6 @@ set(TELEGRAM_BOT_API_SOURCE
telegram-bot-api/HttpStatConnection.cpp
telegram-bot-api/Query.cpp
telegram-bot-api/Stats.cpp
telegram-bot-api/Watchdog.cpp
telegram-bot-api/WebhookActor.cpp
telegram-bot-api/Client.h
@ -99,7 +95,6 @@ set(TELEGRAM_BOT_API_SOURCE
telegram-bot-api/HttpStatConnection.h
telegram-bot-api/Query.h
telegram-bot-api/Stats.h
telegram-bot-api/Watchdog.h
telegram-bot-api/WebhookActor.h
)

View File

@ -54,6 +54,17 @@ Get the member list of a supergroup or channel
###### Returns `ChatMember`
##### Method `deleteMessages`
Delete all the messages with message_id in range between `start` and `end`.
The `start` parameter MUST be less than the `end` parameter
Both `start` and `end` must be positive non zero numbers
The method will always return `true` as a result, even if the messages cannot be deleted
This method does not work on private chat or normal groups
It is not suggested to delete more than 200 messages per call
**NOTE**
The maximum number of messages to be deleted in a single batch is determined by the `max-batch-operations` parameter and is 10000 by default
###### Parameters
- `chat_id` Chat id
- `start` First message id to delete
@ -111,17 +122,6 @@ _For Docker containers, `$TELEGRAM_VERBOSITY` can be set._
##### Method `getChat`
The command `getChat` will also try to resolve the username online, if it can't be found locally
##### Method `deleteMessages`
The command `deleteMessages` can also delete all the messages with message_id in range between `start` and `end`.
The `start` parameter MUST be less than the `end` parameter
Both `start` and `end` must be positive non-zero numbers
The method will always return `true` as a result, even if the messages cannot be deleted
This method does not work on private chat or normal groups
It is not suggested to delete more than 200 messages per call
**NOTE**
The maximum number of messages to be deleted in a single batch is determined by the `max-batch-operations` parameter and is 10000 by default
##### Object `Message`
The `Message` object now has two new fields:
- `views`: how many views has the message (usually the views are shown only for channel messages)

View File

@ -454,7 +454,7 @@ function onOptionsChanged() {
pre_text.push('Note that building requires a lot of memory, so you may need to increase allowed per-process memory usage in /etc/login.conf or build from root.');
}
if (os_netbsd) {
pre_text.push('Note that the following instruction is for NetBSD 8+ and default SH shell.');
pre_text.push('Note that the following instruction is for NetBSD 8.0 and default SH shell.');
}
var terminal_name = (function () {
@ -586,8 +586,8 @@ function onOptionsChanged() {
if (!use_root) {
commands.push('su -');
}
commands.push('export PKG_PATH=http://cdn.netbsd.org/pub/pkgsrc/packages/NetBSD/$(uname -p)/$(uname -r)/All');
var packages = 'git gperf cmake openssl gcc12-libs mozilla-rootcerts-openssl';
commands.push('export PKG_PATH=ftp://ftp.netbsd.org/pub/pkgsrc/packages/NetBSD/i386/8.0_2019Q2/All');
var packages = 'git gperf cmake openssl gcc5-libs';
commands.push('pkg_add ' + packages);
if (!use_root) {
commands.push('exit');

2
td

@ -1 +1 @@
Subproject commit 27c3eaeb4964bd5f18d8488e354abde1a4383e49
Subproject commit d9cfcf88fe4ad06dae1716ce8f66bbeb7f9491d9

View File

@ -547,23 +547,16 @@ paths:
/deleteMessages:
post:
tags:
- modified
- added
description: |-
Use this method to delete multiple messages simultaneously.
This method can delete a set of message ids, or a range of message ids.
If you specify "message_ids", this method tries to delete the specified set of ids:
If some of the specified messages can't be found, they are skipped.
Returns True on success.
Delete all the messages with message_id in range between start and end.
The start parameter MUST be less than the end parameter
Both start and end must be positive non zero numbers
The method will always return true as a result, even if the messages cannot be deleted
This method does not work on private chat or normal groups It is not suggested to delete more than 200 messages per call.
If you specify "start" and "end", this method deletes all the messages with message_id in range between start and end:
The start parameter MUST be less than the end parameter
Both start and end must be positive non zero numbers
The method will always return true as a result, even if the messages cannot be deleted
This method does not work on private chat or normal groups It is not suggested to delete more than 200 messages per call.
*NOTE*
The maximum number of messages to be deleted in a single batch is determined by the max-batch-operations parameter and is 10000 by default.
*NOTE*
The maximum number of messages to be deleted in a single batch is determined by the max-batch-operations parameter and is 10000 by default.
requestBody:
content:
application/x-www-form-urlencoded:
@ -575,10 +568,6 @@ paths:
anyOf:
- type: integer
- type: string
message_ids:
type: array
items:
type: integer
start:
description: First message id to delete
type: integer
@ -587,6 +576,8 @@ paths:
type: integer
required:
- chat_id
- start
- end
multipart/form-data:
schema:
type: object
@ -596,10 +587,6 @@ paths:
anyOf:
- type: integer
- type: string
message_ids:
type: array
items:
type: integer
start:
description: First message id to delete
type: integer
@ -608,6 +595,8 @@ paths:
type: integer
required:
- chat_id
- start
- end
application/json:
schema:
type: object
@ -617,10 +606,6 @@ paths:
anyOf:
- type: integer
- type: string
message_ids:
type: array
items:
type: integer
start:
description: First message id to delete
type: integer
@ -629,10 +614,12 @@ paths:
type: integer
required:
- chat_id
- start
- end
required: true
responses:
'200':
description: 'Request was successful, the result is returned.'
description: ''
content:
application/json:
schema:

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,7 @@
//
#include "telegram-bot-api/ClientManager.h"
#include "telegram-bot-api/Client.h"
#include "telegram-bot-api/ClientParameters.h"
#include "telegram-bot-api/WebhookActor.h"
#include "telegram-bot-api/StatsJson.h"
@ -30,7 +31,6 @@
#include "td/utils/Parser.h"
#include "td/utils/port/IPAddress.h"
#include "td/utils/port/Stat.h"
#include "td/utils/port/thread.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/StackAllocator.h"
@ -39,10 +39,7 @@
#include "td/utils/Random.h"
#include "td/utils/base64.h"
#include "memprof/memprof.h"
#include <algorithm>
#include <atomic>
#include <map>
#include <tuple>
namespace telegram_bot_api {
@ -54,8 +51,6 @@ void ClientManager::close(td::Promise<td::Unit> &&promise) {
}
close_flag_ = true;
watchdog_id_.reset();
dump_statistics();
auto ids = clients_.ids();
for (auto id : ids) {
auto *client_info = clients_.get(id);
@ -177,7 +172,13 @@ void ClientManager::user_login(PromisedQueryPtr query) {
}
bool ClientManager::check_flood_limits(PromisedQueryPtr &query, bool is_user_login) {
td::string ip_address = query->get_peer_ip_address();
td::string ip_address;
if (query->peer_address().is_valid() && !query->peer_address().is_reserved()) { // external connection
ip_address = query->peer_address().get_ip_str().str();
} else {
// invalid peer address or connection from the local network
ip_address = query->get_header("x-real-ip").str();
}
if (!ip_address.empty()) {
td::IPAddress tmp;
tmp.init_host_port(ip_address, 0).ignore();
@ -186,7 +187,7 @@ bool ClientManager::check_flood_limits(PromisedQueryPtr &query, bool is_user_log
ip_address = tmp.get_ip_str().str();
}
}
LOG(DEBUG) << "Receive incoming query for new bot " << query->token() << " from " << ip_address;
LOG(DEBUG) << "Receive incoming query for new bot " << query->token() << " from " << query->peer_address();
if (!ip_address.empty()) {
LOG(DEBUG) << "Check Client creation flood control for IP address " << ip_address;
if (is_user_login) {
@ -203,51 +204,18 @@ bool ClientManager::check_flood_limits(PromisedQueryPtr &query, bool is_user_log
flood_control.add_limit(60 * 60, 600); // 600 in an hour
}
}
auto now = td::Time::now();
auto wakeup_at = flood_control.get_wakeup_at();
auto now = static_cast<td::uint32>(td::Time::now());
td::uint32 wakeup_at = flood_control.get_wakeup_at();
if (wakeup_at > now) {
LOG(INFO) << "Failed to create Client from IP address " << ip_address;
query->set_retry_after_error(static_cast<int>(wakeup_at - now) + 1);
return false;
}
flood_control.add_event(now);
flood_control.add_event(static_cast<td::int32>(now));
}
return true;
}
ClientManager::TopClients ClientManager::get_top_clients(std::size_t max_count, td::Slice token_filter) {
auto now = td::Time::now();
TopClients result;
td::vector<std::pair<td::int64, td::uint64>> top_client_ids;
for (auto id : clients_.ids()) {
auto *client_info = clients_.get(id);
CHECK(client_info);
if (client_info->stat_.is_active(now)) {
result.active_count++;
}
if (!td::begins_with(client_info->token_, token_filter)) {
continue;
}
auto score = static_cast<td::int64>(client_info->stat_.get_score(now) * -1e9);
if (score == 0 && top_client_ids.size() >= max_count) {
continue;
}
top_client_ids.emplace_back(score, id);
}
if (top_client_ids.size() < max_count) {
max_count = top_client_ids.size();
}
std::partial_sort(top_client_ids.begin(), top_client_ids.begin() + max_count, top_client_ids.end());
result.top_client_ids.reserve(max_count);
for (std::size_t i = 0; i < max_count; i++) {
result.top_client_ids.push_back(top_client_ids[i].second);
}
return result;
}
void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
td::vector<std::pair<td::string, td::string>> args,
bool as_json) {
@ -293,10 +261,32 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
}
auto now = td::Time::now();
auto top_clients = get_top_clients(50, id_filter);
td::int32 active_bot_count = 0;
std::multimap<td::int64, td::uint64> top_bot_ids;
for (auto id : clients_.ids()) {
auto *client_info = clients_.get(id);
CHECK(client_info);
if (client_info->stat_.is_active(now)) {
active_bot_count++;
}
if (!td::begins_with(client_info->token_, id_filter)) {
continue;
}
auto stats = client_info->stat_.as_vector(now);
double score = 0.0;
for (auto &stat : stats) {
if (stat.key_ == "update_count" || stat.key_ == "request_count") {
score -= td::to_double(stat.value_);
}
}
top_bot_ids.emplace(static_cast<td::int64>(score * 1e9), id);
}
if(!as_json) {
sb << BotStatActor::get_description() << '\n';
sb << stat_.get_description() << '\n';
}
if (id_filter.empty()) {
if(as_json) {
@ -310,9 +300,9 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
sb << "bot_count\t" << clients_.size() << '\n';
}
if(as_json) {
jb_root("active_bot_count", td::JsonInt(top_clients.active_count));
jb_root("active_bot_count", td::JsonInt(active_bot_count));
} else {
sb << "active_bot_count\t" << top_clients.active_count << '\n';
sb << "active_bot_count\t" << active_bot_count << '\n';
}
auto r_mem_stat = td::mem_stat();
if (r_mem_stat.is_ok()) {
@ -346,13 +336,13 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
if(as_json) {
jb_root("buffer_memory", JsonStatsSize(td::BufferAllocator::get_buffer_mem()));
jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connection_count()));
jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load(std::memory_order_relaxed)));
jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connections_count()));
jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load()));
jb_root("active_network_queries", td::JsonLong(td::get_pending_network_query_count(*parameters_->net_query_stats_)));
} else {
sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connection_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load() << '\n';
sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n';
}
if(as_json) {
@ -366,29 +356,24 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
if(as_json) {
td::vector<JsonStatsBotAdvanced> bots;
for (auto top_client_id : top_clients.top_client_ids) {
auto client_info = clients_.get(top_client_id);
for (std::pair<td::int64, td::uint64> top_bot_id : top_bot_ids) {
auto client_info = clients_.get(top_bot_id.second);
CHECK(client_info);
ServerBotInfo bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
auto active_file_upload_count = client_info->stat_.get_active_file_upload_count();
ServerBotInfo bot_info = client_info->client_->get_actor_unsafe()->get_bot_info();
auto stats = client_info->stat_.as_json_ready_vector(now);
JsonStatsBotAdvanced bot(
std::move(top_client_id), std::move(bot_info), active_request_count, active_file_upload_bytes, active_file_upload_count, std::move(stats), parameters_->stats_hide_sensible_data_, now
std::move(top_bot_id), std::move(bot_info), std::move(stats), parameters_->stats_hide_sensible_data_, now
);
bots.push_back(bot);
}
auto bot_count = bots.size();
jb_root("bots", JsonStatsBots(std::move(bots), bot_count > 100));
} else {
for (auto top_client_id : top_clients.top_client_ids) {
auto *client_info = clients_.get(top_client_id);
for (auto top_bot_id : top_bot_ids) {
auto *client_info = clients_.get(top_bot_id.second);
CHECK(client_info);
auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
auto active_file_upload_count = client_info->stat_.get_active_file_upload_count();
auto bot_info = client_info->client_->get_actor_unsafe()->get_bot_info();
sb << '\n';
sb << "id\t" << bot_info.id_ << '\n';
sb << "uptime\t" << now - bot_info.start_time_ << '\n';
@ -396,33 +381,18 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
sb << "token\t" << bot_info.token_ << '\n';
}
sb << "username\t" << bot_info.username_ << '\n';
if (active_request_count != 0) {
sb << "active_request_count\t" << active_request_count << '\n';
}
if (active_file_upload_bytes != 0) {
sb << "active_file_upload_bytes\t" << active_file_upload_bytes << '\n';
}
if (active_file_upload_count != 0) {
sb << "active_file_upload_count\t" << active_file_upload_count << '\n';
}
if (!bot_info.webhook_.empty()) {
if (!parameters_->stats_hide_sensible_data_) {
sb << "webhook\t" << bot_info.webhook_ << '\n';
} else {
sb << "webhook enabled" << '\n';
}
if (bot_info.has_webhook_certificate_) {
sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n';
}
if (bot_info.webhook_max_connections_ != parameters_->default_max_webhook_connections_) {
sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n';
}
if (!parameters_->stats_hide_sensible_data_) {
sb << "webhook\t" << bot_info.webhook_ << '\n';
} else if (bot_info.webhook_.empty()) {
sb << "webhook disabled" << '\n';
} else {
sb << "webhook enabled" << '\n';
}
sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n';
sb << "head_update_id\t" << bot_info.head_update_id_ << '\n';
if (bot_info.pending_update_count_ != 0) {
sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n';
sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n';
}
sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n';
sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n';
sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n';
auto stats = client_info->stat_.as_vector(now);
for (auto &stat : stats) {
@ -448,6 +418,11 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) {
}
void ClientManager::start_up() {
//NB: the same scheduler as for database in Td
auto current_scheduler_id = td::Scheduler::instance()->sched_id();
auto scheduler_count = td::Scheduler::instance()->sched_count();
auto scheduler_id = td::min(current_scheduler_id + 1, scheduler_count - 1);
// init tqueue
{
auto load_start_time = td::Time::now();
@ -475,8 +450,7 @@ void ClientManager::start_up() {
}
}
auto concurrent_binlog =
std::make_shared<td::ConcurrentBinlog>(std::move(binlog), SharedData::get_binlog_scheduler_id());
auto concurrent_binlog = std::make_shared<td::ConcurrentBinlog>(std::move(binlog), scheduler_id);
auto concurrent_tqueue_binlog = td::make_unique<td::TQueueBinlog<td::BinlogInterface>>();
concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog));
tqueue->set_callback(std::move(concurrent_tqueue_binlog));
@ -485,24 +459,23 @@ void ClientManager::start_up() {
LOG(WARNING) << "Loaded " << loaded_event_count << " TQueue events in " << (td::Time::now() - load_start_time)
<< " seconds";
next_tqueue_gc_time_ = td::Time::now() + 600;
}
// init webhook_db and user_db
auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(),
SharedData::get_binlog_scheduler_id());
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status;
scheduler_id);
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status.error();
parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db);
auto concurrent_user_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
status = concurrent_user_db->init(parameters_->working_directory_ + "user_db.binlog", td::DbKey::empty(), SharedData::get_binlog_scheduler_id());
status = concurrent_user_db->init(parameters_->working_directory_ + "user_db.binlog", td::DbKey::empty(), scheduler_id);
LOG_IF(FATAL, status.is_error()) << "Can't open user_db.binlog " << status.error();
parameters_->shared_data_->user_db_ = std::move(concurrent_user_db);
auto &webhook_db = *parameters_->shared_data_->webhook_db_;
auto &user_db = *parameters_->shared_data_->user_db_;
for (const auto &key_value : webhook_db.get_all()) {
for (auto key_value : webhook_db.get_all()) {
if (!token_range_(td::to_integer<td::uint64>(key_value.first))) {
LOG(WARNING) << "DROP WEBHOOK: " << key_value.first << " ---> " << key_value.second;
webhook_db.erase(key_value.first);
@ -513,10 +486,6 @@ void ClientManager::start_up() {
send_closure_later(actor_id(this), &ClientManager::send, std::move(query));
}
// launch watchdog
watchdog_id_ = td::create_actor_on_scheduler<Watchdog>("ManagerWatchdog", SharedData::get_watchdog_scheduler_id(),
td::this_thread::get_id(), WATCHDOG_TIMEOUT);
set_timeout_in(600.0);
}
PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info,
@ -525,7 +494,7 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool
td::vector<td::BufferSlice> containers;
auto add_string = [&containers](td::Slice str) {
containers.emplace_back(str);
return containers.back().as_mutable_slice();
return containers.back().as_slice();
};
token = add_string(token);
@ -578,92 +547,6 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool
return PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise<td::unique_ptr<Query>>()));
}
void ClientManager::dump_statistics() {
if (is_memprof_on()) {
LOG(WARNING) << "Memory dump:";
td::vector<AllocInfo> v;
dump_alloc([&](const AllocInfo &info) { v.push_back(info); });
std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; });
size_t total_size = 0;
size_t other_size = 0;
int count = 0;
for (auto &info : v) {
if (count++ < 50) {
LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace);
} else {
other_size += info.size;
}
total_size += info.size;
}
LOG(WARNING) << td::tag("other", td::format::as_size(other_size));
LOG(WARNING) << td::tag("total size", td::format::as_size(total_size));
LOG(WARNING) << td::tag("total traces", get_ht_size());
LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate());
}
auto r_mem_stat = td::mem_stat();
if (r_mem_stat.is_ok()) {
auto mem_stat = r_mem_stat.move_as_ok();
LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_));
LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_));
LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_));
LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_));
}
LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem()));
LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size()));
const auto &shared_data = parameters_->shared_data_;
auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed);
auto query_count = shared_data->query_count_.load(std::memory_order_relaxed);
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
td::uint64 i = 0;
bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
LOG(WARNING) << static_cast<const Query &>(*cur);
} else {
was_gap = true;
}
}
td::dump_pending_network_queries(*parameters_->net_query_stats_);
auto now = td::Time::now();
auto top_clients = get_top_clients(10, {});
for (auto top_client_id : top_clients.top_client_ids) {
auto *client_info = clients_.get(top_client_id);
CHECK(client_info);
auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
td::string update_count;
td::string request_count;
auto replace_tabs = [](td::string &str) {
for (auto &c : str) {
if (c == '\t') {
c = ' ';
}
}
};
auto stats = client_info->stat_.as_vector(now);
for (auto &stat : stats) {
if (stat.key_ == "update_count") {
replace_tabs(stat.value_);
update_count = std::move(stat.value_);
}
if (stat.key_ == "request_count") {
replace_tabs(stat.value_);
request_count = std::move(stat.value_);
}
}
LOG(WARNING) << td::tag("id", bot_info.id_) << td::tag("update_count", update_count)
<< td::tag("request_count", request_count);
}
}
void ClientManager::raw_event(const td::Event::Raw &event) {
auto id = get_link_token();
auto *info = clients_.get(id);
@ -680,28 +563,6 @@ void ClientManager::raw_event(const td::Event::Raw &event) {
}
}
void ClientManager::timeout_expired() {
send_closure(watchdog_id_, &Watchdog::kick);
set_timeout_in(WATCHDOG_TIMEOUT / 10);
double now = td::Time::now();
if (now > next_tqueue_gc_time_) {
auto unix_time = parameters_->shared_data_->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time;
td::int64 deleted_events;
bool is_finished;
std::tie(deleted_events, is_finished) = parameters_->shared_data_->tqueue_->run_gc(unix_time);
LOG(INFO) << "TQueue GC deleted " << deleted_events << " events";
next_tqueue_gc_time_ = td::Time::now() + (is_finished ? 60.0 : 1.0);
tqueue_deleted_events_ += deleted_events;
if (tqueue_deleted_events_ > last_tqueue_deleted_events_ + 10000) {
LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events_ << " events since the start";
last_tqueue_deleted_events_ = tqueue_deleted_events_;
}
}
}
void ClientManager::hangup_shared() {
auto id = get_link_token();
auto *info = clients_.get(id);
@ -739,6 +600,4 @@ void ClientManager::finish_close() {
stop();
}
constexpr double ClientManager::WATCHDOG_TIMEOUT;
} // namespace telegram_bot_api

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -9,7 +9,6 @@
#include "telegram-bot-api/Client.h"
#include "telegram-bot-api/Query.h"
#include "telegram-bot-api/Stats.h"
#include "telegram-bot-api/Watchdog.h"
#include "td/actor/actor.h"
@ -42,8 +41,6 @@ class ClientManager final : public td::Actor {
: parameters_(std::move(parameters)), token_range_(token_range) {
}
void dump_statistics();
void send(PromisedQueryPtr query);
void user_login(PromisedQueryPtr query);
@ -74,27 +71,13 @@ class ClientManager final : public td::Actor {
bool close_flag_ = false;
td::vector<td::Promise<td::Unit>> close_promises_;
td::ActorOwn<Watchdog> watchdog_id_;
double next_tqueue_gc_time_ = 0.0;
td::int64 tqueue_deleted_events_ = 0;
td::int64 last_tqueue_deleted_events_ = 0;
static constexpr double WATCHDOG_TIMEOUT = 0.25;
static td::int64 get_tqueue_id(td::int64 user_id, bool is_test_dc);
static PromisedQueryPtr get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info,
std::shared_ptr<SharedData> shared_data);
struct TopClients {
td::int32 active_count = 0;
td::vector<td::uint64> top_client_ids;
};
TopClients get_top_clients(std::size_t max_count, td::Slice token_filter);
void start_up() final;
void raw_event(const td::Event::Raw &event) final;
void timeout_expired() final;
void hangup_shared() final;
void close_db();
void finish_close();

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -29,10 +29,10 @@ namespace telegram_bot_api {
struct SharedData {
std::atomic<td::uint64> query_count_{0};
std::atomic<size_t> query_list_size_{0};
std::atomic<int> next_verbosity_level_{-1};
// not thread-safe, must be used from a single thread
// not thread-safe
size_t query_list_size_ = 0;
td::ListNode query_list_;
td::unique_ptr<td::KeyValueSyncInterface> webhook_db_;
td::unique_ptr<td::KeyValueSyncInterface> user_db_;
@ -53,55 +53,6 @@ struct SharedData {
}
return static_cast<td::int32>(result);
}
static td::int32 get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td
return 2;
}
static td::int32 get_client_scheduler_id() {
// the thread for ClientManager and all Clients
return 4;
}
static td::int32 get_watchdog_scheduler_id() {
// the thread for watchdogs
return 5;
}
static td::int32 get_slow_incoming_http_scheduler_id() {
// the thread for slow incoming HTTP connections
return 6;
}
static td::int32 get_slow_outgoing_http_scheduler_id() {
// the thread for slow outgoing HTTP connections
return 7;
}
static td::int32 get_dns_resolver_scheduler_id() {
// the thread for DNS resolving
return 8;
}
static td::int32 get_binlog_scheduler_id() {
// the thread for TQueue and webhook binlogs
return 9;
}
static td::int32 get_webhook_certificate_scheduler_id() {
// the thread for webhook certificate processing
return 10;
}
static td::int32 get_statistics_thread_id() {
// the thread for CPU usage updating
return 11;
}
static td::int32 get_thread_count() {
return 12;
}
};
struct ClientParameters {
@ -111,7 +62,7 @@ struct ClientParameters {
bool local_mode_ = false;
bool allow_http_ = false;
bool use_relative_path_ = false;
bool no_file_limit_ = false;
bool no_file_limit_ = true;
bool allow_users_ = false;
bool allow_users_registration_ = false;
bool stats_hide_sensible_data_ = false;

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -21,7 +21,7 @@ namespace telegram_bot_api {
void HttpConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
td::ActorOwn<td::HttpInboundConnection> connection) {
CHECK(connection_.empty());
CHECK(connection_->empty());
connection_ = std::move(connection);
LOG(DEBUG) << "Handle " << *http_query;

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

View File

@ -1,20 +1,17 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "telegram-bot-api/ClientParameters.h"
#include "td/net/HttpInboundConnection.h"
#include "td/net/TcpListener.h"
#include "td/actor/actor.h"
#include "td/utils/BufferedFd.h"
#include "td/utils/common.h"
#include "td/utils/FloodControlFast.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
@ -49,8 +46,8 @@ class HttpServer final : public td::TcpListener::Callback {
set_timeout_at(wakeup_at);
return;
}
flood_control_.add_event(now);
LOG(INFO) << "Create TCP listener " << td::tag("address", ip_address_) << td::tag("port", port_);
flood_control_.add_event(static_cast<td::int32>(now));
LOG(INFO) << "Create tcp listener " << td::tag("address", ip_address_) << td::tag("port", port_);
listener_ = td::create_actor<td::TcpListener>(
PSLICE() << "TcpListener" << td::tag("address", ip_address_) << td::tag("port", port_), port_,
actor_shared(this, 1), ip_address_);
@ -63,8 +60,13 @@ class HttpServer final : public td::TcpListener::Callback {
}
void accept(td::SocketFd fd) final {
auto scheduler_count = td::Scheduler::instance()->sched_count();
auto scheduler_id = scheduler_count - 1;
if (scheduler_id > 0) {
scheduler_id--;
}
td::create_actor<td::HttpInboundConnection>("HttpInboundConnection", td::BufferedFd<td::SocketFd>(std::move(fd)), 0,
50, 500, creator_(), SharedData::get_slow_incoming_http_scheduler_id())
20, 500, creator_(), scheduler_id)
.release();
}

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -15,7 +15,7 @@ namespace telegram_bot_api {
void HttpStatConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
td::ActorOwn<td::HttpInboundConnection> connection) {
CHECK(connection_.empty());
CHECK(connection_->empty());
connection_ = std::move(connection);
td::Parser url_path_parser(http_query->url_path_);
as_json_ = url_path_parser.try_skip("/json");

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -26,10 +26,10 @@ td::FlatHashMap<td::string, td::unique_ptr<td::VirtuallyJsonable>> empty_paramet
Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_user, bool is_test_dc, td::MutableSlice method,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files,
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_ip_address, bool is_internal)
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address, bool is_internal)
: state_(State::Query)
, shared_data_(shared_data)
, peer_ip_address_(peer_ip_address)
, peer_address_(peer_address)
, container_(std::move(container))
, token_(token)
, is_user_(is_user)
@ -44,25 +44,16 @@ Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_u
}
td::to_lower_inplace(method_);
start_timestamp_ = td::Time::now();
LOG(INFO) << "Query " << this << ": " << *this;
LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this;
if (shared_data_) {
shared_data_->query_count_.fetch_add(1, std::memory_order_relaxed);
shared_data_->query_count_++;
if (method_ != "getupdates") {
shared_data_->query_list_size_.fetch_add(1, std::memory_order_relaxed);
shared_data_->query_list_size_++;
shared_data_->query_list_.put(this);
}
}
}
td::string Query::get_peer_ip_address() const {
if (peer_ip_address_.is_valid() && !peer_ip_address_.is_reserved()) { // external connection
return peer_ip_address_.get_ip_str().str();
} else {
// invalid peer IP address or connection from the local network
return get_header("x-real-ip").str();
}
}
td::int64 Query::query_size() const {
return std::accumulate(
container_.begin(), container_.end(), td::int64{0},
@ -86,7 +77,7 @@ void Query::set_stat_actor(td::ActorId<BotStatActor> stat_actor) {
void Query::set_ok(td::BufferSlice result) {
CHECK(state_ == State::Query);
LOG(INFO) << "Query " << this << ": " << td::tag("method", method_) << td::tag("text", result.as_slice());
LOG(INFO) << "QUERY: got ok " << td::tag("ptr", this) << td::tag("text", result.as_slice());
answer_ = std::move(result);
state_ = State::OK;
http_status_code_ = 200;
@ -94,7 +85,7 @@ void Query::set_ok(td::BufferSlice result) {
}
void Query::set_error(int http_status_code, td::BufferSlice result) {
LOG(INFO) << "Query " << this << ": " << td::tag("method", method_) << td::tag("code", http_status_code)
LOG(INFO) << "QUERY: got error " << td::tag("ptr", this) << td::tag("code", http_status_code)
<< td::tag("text", result.as_slice());
CHECK(state_ == State::Query);
answer_ = std::move(result);
@ -116,25 +107,9 @@ td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &query) {
auto padded_time =
td::lpad(PSTRING() << td::format::as_time(td::Time::now_cached() - query.start_timestamp()), 10, ' ');
sb << "[bot" << td::rpad(query.token().str(), 46, ' ') << "][time:" << padded_time << ']'
<< td::tag("method", td::lpad(query.method().str(), 25, ' '));
<< td::tag("method", td::lpad(query.method().str(), 20, ' '));
if (!query.args().empty()) {
sb << '{';
for (const auto &arg : query.args()) {
sb << '[';
if (arg.first.size() > 128) {
sb << '<' << arg.first.size() << '>' << td::oneline(arg.first.substr(0, 128)) << "...";
} else {
sb << td::oneline(arg.first);
}
sb << ':';
if (arg.second.size() > 4096) {
sb << '<' << arg.second.size() << '>' << td::oneline(arg.second.substr(0, 4096)) << "...";
} else {
sb << td::oneline(arg.second);
}
sb << ']';
}
sb << '}';
sb << td::oneline(PSLICE() << query.args());
}
if (!query.files().empty()) {
sb << query.files();
@ -161,7 +136,7 @@ void Query::send_response_stat() const {
return;
}
send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Response>,
ServerBotStat::Response{state_ == State::OK, answer_.size(), file_count(), files_size()}, now);
ServerBotStat::Response{state_ == State::OK, answer_.size()}, now);
}
} // namespace telegram_bot_api

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -23,7 +23,6 @@
#include "td/utils/StringBuilder.h"
#include <algorithm>
#include <atomic>
#include <memory>
#include <utility>
@ -38,53 +37,44 @@ class Query final : public td::ListNode {
td::Slice token() const {
return token_;
}
bool is_user() const {
return is_user_;
}
bool is_test_dc() const {
return is_test_dc_;
}
td::Slice method() const {
return method_;
}
bool has_arg(td::Slice key) const {
auto it = std::find_if(args_.begin(), args_.end(),
[&key](const std::pair<td::MutableSlice, td::MutableSlice> &s) { return s.first == key; });
return it != args_.end();
}
td::MutableSlice arg(td::Slice key) const {
auto it = std::find_if(args_.begin(), args_.end(),
[&key](const std::pair<td::MutableSlice, td::MutableSlice> &s) { return s.first == key; });
return it == args_.end() ? td::MutableSlice() : it->second;
}
const td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &args() const {
return args_;
}
td::Slice get_header(td::Slice key) const {
auto it = std::find_if(headers_.begin(), headers_.end(),
[&key](const std::pair<td::MutableSlice, td::MutableSlice> &s) { return s.first == key; });
return it == headers_.end() ? td::Slice() : it->second;
}
const td::HttpFile *file(td::Slice key) const {
auto it = std::find_if(files_.begin(), files_.end(), [&key](const td::HttpFile &f) { return f.field_name == key; });
return it == files_.end() ? nullptr : &*it;
}
const td::vector<td::HttpFile> &files() const {
return files_;
}
td::int64 files_size() const;
td::string get_peer_ip_address() const;
const td::IPAddress &peer_address() const {
return peer_address_;
}
td::BufferSlice &answer() {
return answer_;
@ -115,19 +105,17 @@ class Query final : public td::ListNode {
Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_user, bool is_test_dc, td::MutableSlice method,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files,
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_ip_address, bool is_internal);
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address, bool is_internal);
Query(const Query &) = delete;
Query &operator=(const Query &) = delete;
Query(Query &&) = delete;
Query &operator=(Query &&) = delete;
~Query() {
if (shared_data_) {
shared_data_->query_count_.fetch_sub(1, std::memory_order_relaxed);
shared_data_->query_count_--;
if (!empty()) {
shared_data_->query_list_size_.fetch_sub(1, std::memory_order_relaxed);
shared_data_->query_list_size_--;
}
td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), container_, args_,
headers_, files_, answer_);
}
}
@ -141,7 +129,7 @@ class Query final : public td::ListNode {
State state_;
std::shared_ptr<SharedData> shared_data_;
double start_timestamp_;
td::IPAddress peer_ip_address_;
td::IPAddress peer_address_;
td::ActorId<BotStatActor> stat_actor_;
// request
@ -167,6 +155,8 @@ class Query final : public td::ListNode {
td::int64 query_size() const;
td::int64 files_size() const;
td::int64 files_max_size() const;
void send_request_stat() const;

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -7,7 +7,6 @@
#include "telegram-bot-api/Stats.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/port/thread.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/StringBuilder.h"
@ -20,24 +19,17 @@ ServerCpuStat::ServerCpuStat() {
}
}
void ServerCpuStat::update(double now) {
auto r_cpu_stat = td::cpu_stat();
if (r_cpu_stat.is_error()) {
return;
void ServerCpuStat::add_event(const td::CpuStat &cpu_stat, double now) {
std::lock_guard<std::mutex> guard(mutex_);
for (auto &stat : stat_) {
stat.add_event(cpu_stat, now);
}
auto &cpu_stat = instance();
std::lock_guard<std::mutex> guard(cpu_stat.mutex_);
for (auto &stat : cpu_stat.stat_) {
stat.add_event(r_cpu_stat.ok(), now);
}
LOG(WARNING) << "CPU usage: " << cpu_stat.stat_[1].get_stat(now).as_vector()[0].value_;
}
td::string ServerCpuStat::get_description() {
td::string ServerCpuStat::get_description() const {
td::string res = "DURATION";
for (auto &descr : DESCR) {
res += '\t';
res += "\t";
res += descr;
}
return res;
@ -45,7 +37,7 @@ td::string ServerCpuStat::get_description() {
static td::string to_percentage(td::uint64 ticks, td::uint64 total_ticks) {
static double multiplier = 100.0 * (td::thread::hardware_concurrency() ? td::thread::hardware_concurrency() : 1);
return PSTRING() << (static_cast<double>(ticks) / static_cast<double>(total_ticks) * multiplier) << '%';
return PSTRING() << (static_cast<double>(ticks) / static_cast<double>(total_ticks) * multiplier) << "%";
}
td::vector<StatItem> CpuStat::as_vector() const {
@ -175,7 +167,7 @@ td::vector<ServerBotStat> BotStatActor::as_json_ready_vector(double now) {
return res;
}
td::string BotStatActor::get_description() {
td::string BotStatActor::get_description() const {
td::string res = "DURATION";
for (auto &descr : DESCR) {
res += "\t";
@ -193,43 +185,6 @@ td::vector<td::string> BotStatActor::get_jsonable_description() const {
}
double BotStatActor::get_score(double now) {
auto minute_stat = stat_[2].stat_duration(now);
double minute_score = minute_stat.first.request_count_ + minute_stat.first.update_count_;
if (minute_stat.second != 0) {
minute_score /= minute_stat.second;
}
auto all_time_stat = stat_[0].stat_duration(now);
double all_time_score = 0.01 * (all_time_stat.first.request_count_ + all_time_stat.first.update_count_);
if (all_time_stat.second != 0) {
all_time_score /= all_time_stat.second;
}
auto active_request_score = static_cast<double>(td::max(get_active_request_count() - 10, static_cast<td::int64>(0)));
auto active_file_upload_score = static_cast<double>(get_active_file_upload_bytes()) * 1e-8;
return minute_score + all_time_score + active_request_score + active_file_upload_score;
}
double BotStatActor::get_minute_update_count(double now) {
auto minute_stat = stat_[2].stat_duration(now);
double result = minute_stat.first.update_count_;
if (minute_stat.second != 0) {
result /= minute_stat.second;
}
return result;
}
td::int64 BotStatActor::get_active_request_count() const {
return active_request_count_;
}
td::int64 BotStatActor::get_active_file_upload_bytes() const {
return active_file_upload_bytes_;
}
td::int64 BotStatActor::get_active_file_upload_count() const {
return active_file_upload_count_;
}
bool BotStatActor::is_active(double now) const {
return last_activity_timestamp_ > now - 86400;
}

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -9,6 +9,7 @@
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/port/Stat.h"
#include "td/utils/Time.h"
#include "td/utils/TimedStat.h"
@ -48,10 +49,16 @@ class ServerCpuStat {
static ServerCpuStat stat;
return stat;
}
static void update(double now) {
auto r_event = td::cpu_stat();
if (r_event.is_error()) {
return;
}
instance().add_event(r_event.ok(), now);
LOG(WARNING) << "CPU usage: " << instance().stat_[1].get_stat(now).as_vector()[0].value_;
}
static void update(double now);
static td::string get_description();
td::string get_description() const;
td::vector<StatItem> as_vector(double now);
td::vector<td::vector<StatItem>> as_json_ready_vector(double now);
@ -65,6 +72,8 @@ class ServerCpuStat {
td::TimedStat<CpuStat> stat_[SIZE];
ServerCpuStat();
void add_event(const td::CpuStat &stat, double now);
};
class ServerBotInfo {
@ -107,17 +116,15 @@ struct ServerBotStat {
struct Response {
bool ok_;
size_t size_;
td::int64 file_count_;
td::int64 files_size_;
};
void on_event(const Response &response) {
void on_event(const Response &answer) {
response_count_++;
if (response.ok_) {
if (answer.ok_) {
response_count_ok_++;
} else {
response_count_error_++;
}
response_bytes_ += static_cast<double>(response.size_);
response_bytes_ += static_cast<double>(answer.size_);
}
struct Request {
@ -148,7 +155,7 @@ class BotStatActor final : public td::Actor {
}
BotStatActor(const BotStatActor &) = delete;
BotStatActor &operator=(const BotStatActor &) = delete;
BotStatActor &operator=(const BotStatActor &other) = delete;
BotStatActor(BotStatActor &&) = default;
BotStatActor &operator=(BotStatActor &&other) noexcept {
if (!empty()) {
@ -167,7 +174,6 @@ class BotStatActor final : public td::Actor {
for (auto &stat : stat_) {
stat.add_event(event, now);
}
on_event(event);
if (!parent_.empty()) {
send_closure(parent_, &BotStatActor::add_event<EventT>, event, now);
}
@ -175,19 +181,8 @@ class BotStatActor final : public td::Actor {
td::vector<StatItem> as_vector(double now);
td::vector<ServerBotStat> as_json_ready_vector(double now);
td::string get_description() const;
td::vector<td::string> get_jsonable_description() const;
static td::string get_description();
double get_score(double now);
double get_minute_update_count(double now);
td::int64 get_active_request_count() const;
td::int64 get_active_file_upload_bytes() const;
td::int64 get_active_file_upload_count() const;
bool is_active(double now) const;
@ -198,26 +193,6 @@ class BotStatActor final : public td::Actor {
td::TimedStat<ServerBotStat> stat_[SIZE];
td::ActorId<BotStatActor> parent_;
double last_activity_timestamp_ = -1e9;
td::int64 active_request_count_ = 0;
td::int64 active_file_upload_bytes_ = 0;
td::int64 active_file_upload_count_ = 0;
void on_event(const ServerBotStat::Update &update) {
}
void on_event(const ServerBotStat::Response &response) {
active_request_count_--;
active_file_upload_count_ -= response.file_count_;
active_file_upload_bytes_ -= response.files_size_;
CHECK(active_request_count_ >= 0);
CHECK(active_file_upload_bytes_ >= 0);
}
void on_event(const ServerBotStat::Request &request) {
active_request_count_++;
active_file_upload_count_ += request.file_count_;
active_file_upload_bytes_ += request.files_size_;
}
};
} // namespace telegram_bot_api

View File

@ -123,15 +123,16 @@ class JsonStatsCpu : public td::Jsonable {
class JsonStatsBot : public td::Jsonable {
public:
explicit JsonStatsBot(td::uint64 client_id) : client_id_(client_id) {
explicit JsonStatsBot(std::pair<td::int64, td::uint64> score_id_pair) : score_id_pair_(std::move(score_id_pair)) {
}
void store(td::JsonValueScope *scope) const {
auto object = scope->enter_object();
object("client_id", td::JsonLong(client_id_));
object("score", td::JsonLong(score_id_pair_.first));
object("internal_id", td::JsonLong(score_id_pair_.second));
}
protected:
const td::uint64 client_id_;
const std::pair<td::int64, td::uint64> score_id_pair_;
};
class JsonStatsBotStatDouble : public td::Jsonable {
@ -197,23 +198,20 @@ class JsonStatsBotStats : public td::Jsonable {
class JsonStatsBotAdvanced : public JsonStatsBot {
public:
explicit JsonStatsBotAdvanced(td::uint64 client_id,
explicit JsonStatsBotAdvanced(std::pair<td::int64, td::uint64> score_id_pair,
ServerBotInfo bot,
td::int64 active_request_count,
td::int64 active_file_upload_bytes,
td::int64 active_file_upload_count,
td::vector<ServerBotStat> stats,
const bool hide_sensible_data,
const double now)
: JsonStatsBot(client_id), bot_(std::move(bot)), active_request_count_(active_request_count),
active_file_upload_bytes_(active_file_upload_bytes), active_file_upload_count_(active_file_upload_count),
stats_(std::move(stats)), hide_sensible_data_(hide_sensible_data), now_(now) {
: JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), stats_(std::move(stats)),
hide_sensible_data_(hide_sensible_data), now_(now) {
}
void store(td::JsonValueScope *scope) const {
auto object = scope->enter_object();
object("id", td::JsonLong(td::to_integer<td::int64>(bot_.id_)));
object("uptime", now_ - bot_.start_time_);
object("client_id", td::JsonLong(client_id_));
object("score", td::JsonLong(score_id_pair_.first));
object("internal_id", td::JsonLong(score_id_pair_.second));
if (!hide_sensible_data_) {
object("token", td::JsonString(bot_.token_));
}
@ -237,9 +235,6 @@ class JsonStatsBotAdvanced : public JsonStatsBot {
}
private:
ServerBotInfo bot_;
td::int64 active_request_count_;
td::int64 active_file_upload_bytes_;
td::int64 active_file_upload_count_;
td::vector<ServerBotStat> stats_;
const bool hide_sensible_data_;
const double now_;

View File

@ -1,28 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "telegram-bot-api/Watchdog.h"
#include "td/utils/logging.h"
#include "td/utils/Time.h"
namespace telegram_bot_api {
void Watchdog::kick() {
auto now = td::Time::now();
if (now >= last_kick_time_ + timeout_ && last_kick_time_ > 0 && GET_VERBOSITY_LEVEL() >= VERBOSITY_NAME(ERROR)) {
LOG(ERROR) << get_name() << " timeout expired after " << now - last_kick_time_ << " seconds";
td::thread::send_real_time_signal(main_thread_id_, 2);
}
last_kick_time_ = now;
set_timeout_in(timeout_);
}
void Watchdog::timeout_expired() {
kick();
}
} // namespace telegram_bot_api

View File

@ -1,31 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/actor.h"
#include "td/utils/port/thread.h"
namespace telegram_bot_api {
class Watchdog final : public td::Actor {
public:
Watchdog(td::thread::id main_thread_id, double timeout) : main_thread_id_(main_thread_id), timeout_(timeout) {
// watchdog is disabled until it is kicked for the first time
}
void kick();
private:
void timeout_expired() final;
td::thread::id main_thread_id_;
double timeout_;
double last_kick_time_ = 0.0;
};
} // namespace telegram_bot_api

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -11,8 +11,11 @@
#include "td/net/GetHostByNameActor.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpProxy.h"
#include "td/net/SslStream.h"
#include "td/net/TransparentProxy.h"
#include "td/actor/actor.h"
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
@ -29,11 +32,13 @@
#include "td/utils/Span.h"
#include "td/utils/Time.h"
#include <limits>
namespace telegram_bot_api {
static int VERBOSITY_NAME(webhook) = VERBOSITY_NAME(DEBUG);
std::atomic<td::uint64> WebhookActor::total_connection_count_{0};
std::atomic<td::uint64> WebhookActor::total_connections_count_{0};
WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url,
td::string cert_path, td::int32 max_connections, bool from_db_flag,
@ -69,11 +74,6 @@ WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_
<< ", max_connections = " << max_connections_;
}
WebhookActor::~WebhookActor() {
td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), update_map_, queue_updates_,
queues_, ssl_ctx_);
}
void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) {
if (wakeup_at_ == 0 || wakeup_at < wakeup_at_) {
VLOG(webhook) << "Wake up in " << wakeup_at - td::Time::now() << " from " << source;
@ -114,9 +114,8 @@ void WebhookActor::on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address
return on_error(r_ip_address.move_as_error());
}
auto new_ip_address = r_ip_address.move_as_ok();
auto check_status = check_ip_address(new_ip_address);
if (check_status.is_error()) {
return on_error(std::move(check_status));
if (!check_ip_address(new_ip_address)) {
return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved"));
}
if (!(ip_address_ == new_ip_address)) {
VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address;
@ -129,25 +128,24 @@ void WebhookActor::on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address
VLOG(webhook) << "IP address was verified";
}
void WebhookActor::on_ssl_context_created(td::Result<td::SslCtx> r_ssl_ctx) {
if (r_ssl_ctx.is_error()) {
create_webhook_error("Can't create an SSL context", r_ssl_ctx.move_as_error(), true);
loop();
return;
}
ssl_ctx_ = r_ssl_ctx.move_as_ok();
VLOG(webhook) << "SSL context was created";
loop();
}
td::Status WebhookActor::create_connection() {
CHECK(ip_address_.is_valid());
if (!ip_address_.is_valid()) {
VLOG(webhook) << "Can't create connection: IP address is not ready";
return td::Status::Error("IP address is not ready");
}
if (parameters_->webhook_proxy_ip_address_.is_valid()) {
auto r_proxy_socket_fd = td::SocketFd::open(parameters_->webhook_proxy_ip_address_);
if (r_proxy_socket_fd.is_error()) {
return create_webhook_error("Can't connect to the webhook proxy", r_proxy_socket_fd.move_as_error(), false);
td::Slice error_message = "Can't connect to the webhook proxy";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_proxy_socket_fd.error());
VLOG(webhook) << error;
on_webhook_error(error_message);
on_error(td::Status::Error(error_message));
return error;
}
if (!was_checked_) {
TRY_STATUS(create_ssl_stream()); // check certificate
// verify webhook even we can't establish connection to the webhook
was_checked_ = true;
on_webhook_verified();
@ -190,33 +188,29 @@ td::Status WebhookActor::create_connection() {
auto r_fd = td::SocketFd::open(ip_address_);
if (r_fd.is_error()) {
return create_webhook_error("Can't connect to the webhook", r_fd.move_as_error(), false);
td::Slice error_message = "Can't connect to the webhook";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_fd.error());
VLOG(webhook) << error;
on_webhook_error(error_message);
on_error(r_fd.move_as_error());
return error;
}
return create_connection(td::BufferedFd<td::SocketFd>(r_fd.move_as_ok()));
}
td::Status WebhookActor::create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public) {
CHECK(result.is_error());
auto error = td::Status::Error(PSLICE() << error_message << ": " << result);
VLOG(webhook) << error;
if (is_public) {
on_webhook_error(PSLICE() << error_message << ": " << result.public_message());
} else {
on_webhook_error(error_message);
}
on_error(std::move(result));
return error;
}
td::Result<td::SslStream> WebhookActor::create_ssl_stream() {
if (url_.protocol_ == td::HttpUrl::Protocol::Http) {
return td::SslStream();
}
CHECK(ssl_ctx_);
auto r_ssl_stream = td::SslStream::create(url_.host_, ssl_ctx_, !cert_path_.empty());
auto r_ssl_stream = td::SslStream::create(url_.host_, cert_path_, td::SslStream::VerifyPeer::On, !cert_path_.empty());
if (r_ssl_stream.is_error()) {
return create_webhook_error("Can't create an SSL connection", r_ssl_stream.move_as_error(), true);
td::Slice error_message = "Can't create an SSL connection";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_ssl_stream.error());
VLOG(webhook) << error;
on_webhook_error(PSLICE() << error_message << ": " << r_ssl_stream.error().public_message());
on_error(r_ssl_stream.move_as_error());
return std::move(error);
}
return r_ssl_stream.move_as_ok();
}
@ -227,14 +221,13 @@ td::Status WebhookActor::create_connection(td::BufferedFd<td::SocketFd> fd) {
auto id = connections_.create(Connection());
auto *conn = connections_.get(id);
conn->actor_id_ = td::create_actor<td::HttpOutboundConnection>(
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 50, 60,
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id),
SharedData::get_slow_outgoing_http_scheduler_id());
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), std::numeric_limits<size_t>::max(), 20, 60,
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id));
conn->ip_generation_ = ip_generation_;
conn->event_id_ = {};
conn->id_ = id;
ready_connections_.put(conn->to_list_node());
total_connection_count_.fetch_add(1, std::memory_order_relaxed);
total_connections_count_.fetch_add(1, std::memory_order_relaxed);
if (!was_checked_) {
was_checked_ = true;
@ -258,15 +251,6 @@ void WebhookActor::on_socket_ready_async(td::Result<td::BufferedFd<td::SocketFd>
}
void WebhookActor::create_new_connections() {
if (!ip_address_.is_valid()) {
VLOG(webhook) << "Can't create new connections: IP address is not ready";
return;
}
if (url_.protocol_ != td::HttpUrl::Protocol::Http && !ssl_ctx_) {
VLOG(webhook) << "Can't create new connections: SSL context is not ready";
return;
}
size_t need_connections = queue_updates_.size();
if (need_connections > static_cast<size_t>(max_connections_)) {
need_connections = max_connections_;
@ -303,7 +287,7 @@ void WebhookActor::create_new_connections() {
<< td::tag("after", td::format::as_time(wakeup_at - now));
break;
}
flood->add_event(now);
flood->add_event(static_cast<td::int32>(now));
if (create_connection().is_error()) {
relax_wakeup_at(now + 1.0, "create_new_connections error");
return;
@ -398,7 +382,7 @@ void WebhookActor::load_updates() {
CHECK(update.id.is_valid());
auto &dest_ptr = update_map_[update.id];
if (dest_ptr != nullptr) {
LOG(ERROR) << "Receive duplicate event " << update.id << " from TQueue";
LOG(ERROR) << "Receive duplicated event " << update.id << " from TQueue";
continue;
}
dest_ptr = td::make_unique<Update>();
@ -503,8 +487,7 @@ void WebhookActor::on_update_error(td::TQueue::EventId event_id, td::Slice error
int next_delay = update.delay_;
int next_effective_delay = retry_after;
if (retry_after == 0 && update.fail_count_ > 0) {
auto max_timeout = td::Random::fast(WEBHOOK_MAX_RESEND_TIMEOUT, WEBHOOK_MAX_RESEND_TIMEOUT * 2);
next_delay = td::min(max_timeout, next_delay * 2);
next_delay = td::min(WEBHOOK_MAX_RESEND_TIMEOUT, next_delay * 2);
next_effective_delay = next_delay;
}
if (parameters_->shared_data_->get_unix_time(now) + next_effective_delay > update.expires_at_) {
@ -589,11 +572,6 @@ void WebhookActor::send_updates() {
}
void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
SCOPE_EXIT {
bool dummy = false;
td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), response, dummy);
};
auto connection_id = get_link_token();
if (response) {
VLOG(webhook) << "Got response from connection " << connection_id;
@ -622,12 +600,10 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
if (!method.empty() && method != "deletewebhook" && method != "setwebhook" && method != "close" &&
method != "logout" && !td::begins_with(method, "get")) {
VLOG(webhook) << "Receive request " << method << " in response to webhook";
response->container_.emplace_back(PSLICE() << (tqueue_id_ & ((static_cast<td::int64>(1) << 54) - 1)));
auto token = response->container_.back().as_slice();
auto query = td::make_unique<Query>(
std::move(response->container_), token, tqueue_id_ >= (static_cast<td::int64>(1) << 54), false,
td::MutableSlice(), std::move(response->args_), std::move(response->headers_),
std::move(response->files_), parameters_->shared_data_, response->peer_address_, false);
auto query = td::make_unique<Query>(std::move(response->container_), td::MutableSlice(), false, false,
td::MutableSlice(), std::move(response->args_),
std::move(response->headers_), std::move(response->files_),
parameters_->shared_data_, response->peer_address_, false);
auto promised_query = PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise<td::unique_ptr<Query>>()));
send_closure(callback_, &Callback::send, std::move(promised_query));
}
@ -676,7 +652,7 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
if (need_close || close_connection) {
VLOG(webhook) << "Close connection " << connection_id;
connections_.erase(connection_ptr->id_);
total_connection_count_.fetch_sub(1, std::memory_order_relaxed);
total_connections_count_.fetch_sub(1, std::memory_order_relaxed);
} else {
ready_connections_.put(connection_ptr->to_list_node());
}
@ -691,16 +667,11 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
void WebhookActor::start_up() {
max_loaded_updates_ = max_connections_ * 2;
last_success_time_ = td::Time::now() - 2 * IP_ADDRESS_CACHE_TIME;
if (from_db_flag_) {
next_ip_address_resolve_time_ = td::Time::now() + td::Random::fast(0, IP_ADDRESS_CACHE_TIME);
} else {
next_ip_address_resolve_time_ = last_success_time_;
}
next_ip_address_resolve_time_ = last_success_time_ = td::Time::now() - 3600;
active_new_connection_flood_.add_limit(1, 10 * max_connections_);
active_new_connection_flood_.add_limit(5, 20 * max_connections_);
active_new_connection_flood_.add_limit(0.5, 10);
pending_new_connection_flood_.add_limit(2, 1);
pending_new_connection_flood_.add_limit(1, 1);
if (!parameters_->local_mode_) {
if (url_.protocol_ == td::HttpUrl::Protocol::Https || (parameters_->allow_http_ && url_.protocol_ == td::HttpUrl::Protocol::Http)) {
@ -711,14 +682,15 @@ void WebhookActor::start_up() {
} else {
CHECK(url_.protocol_ == td::HttpUrl::Protocol::Http);
VLOG(webhook) << "Can't create connection: HTTP is forbidden";
on_error(td::Status::Error("An HTTPS URL must be provided for webhook"));
on_error(td::Status::Error("HTTPS url must be provided for webhook"));
}
}
if (fix_ip_address_ && !stop_flag_) {
auto check_status = check_ip_address(ip_address_);
if (check_status.is_error()) {
return on_error(std::move(check_status));
if (!ip_address_.is_valid()) {
on_error(td::Status::Error("Invalid IP address specified"));
} else if (!check_ip_address(ip_address_)) {
on_error(td::Status::Error(PSLICE() << "IP address " << ip_address_.get_ip_str() << " is reserved"));
}
}
@ -727,16 +699,6 @@ void WebhookActor::start_up() {
on_webhook_verified();
}
if (url_.protocol_ != td::HttpUrl::Protocol::Http && !stop_flag_) {
// asynchronously create SSL context
td::Scheduler::instance()->run_on_scheduler(SharedData::get_webhook_certificate_scheduler_id(),
[actor_id = actor_id(this), cert_path = cert_path_](td::Unit) mutable {
send_closure(
actor_id, &WebhookActor::on_ssl_context_created,
td::SslCtx::create(cert_path, td::SslCtx::VerifyPeer::On));
});
}
yield();
}
@ -758,7 +720,7 @@ void WebhookActor::close() {
}
void WebhookActor::tear_down() {
total_connection_count_.fetch_sub(connections_.size(), std::memory_order_relaxed);
total_connections_count_.fetch_sub(connections_.size(), std::memory_order_relaxed);
}
void WebhookActor::on_webhook_verified() {
@ -769,26 +731,24 @@ void WebhookActor::on_webhook_verified() {
send_closure(callback_, &Callback::webhook_verified, std::move(ip_address_str));
}
td::Status WebhookActor::check_ip_address(const td::IPAddress &addr) const {
bool WebhookActor::check_ip_address(const td::IPAddress &addr) const {
if (!addr.is_valid()) {
return td::Status::Error("Invalid IP address specified");
return false;
}
if (parameters_->local_mode_) {
return td::Status::OK();
// allow any valid IP address
return true;
}
if (!addr.is_ipv4()) {
VLOG(webhook) << "Bad IP address (not IPv4): " << addr;
return td::Status::Error("IPv6-only addresses are not allowed");
return false;
}
if (addr.is_reserved()) {
return td::Status::Error(PSLICE() << "IP address " << addr.get_ip_str() << " is reserved");
}
return td::Status::OK();
return !addr.is_reserved();
}
void WebhookActor::on_error(td::Status status) {
VLOG(webhook) << "Receive webhook error " << status;
if (!was_checked_ && !stop_flag_) {
if (!was_checked_) {
CHECK(!callback_.empty());
send_closure(std::move(callback_), &Callback::webhook_closed, std::move(status));
stop_flag_ = true;

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -12,7 +12,6 @@
#include "td/net/HttpOutboundConnection.h"
#include "td/net/HttpQuery.h"
#include "td/net/SslCtx.h"
#include "td/net/SslStream.h"
#include "td/actor/actor.h"
@ -22,7 +21,6 @@
#include "td/utils/Container.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/FloodControlFast.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/HttpUrl.h"
#include "td/utils/JsonBuilder.h"
#include "td/utils/List.h"
@ -33,6 +31,7 @@
#include "td/utils/VectorQueue.h"
#include <atomic>
#include <functional>
#include <memory>
#include <set>
#include <tuple>
@ -55,18 +54,13 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path,
td::int32 max_connections, bool from_db_flag, td::string cached_ip_address, bool fix_ip_address,
td::string secret_token, std::shared_ptr<const ClientParameters> parameters);
WebhookActor(const WebhookActor &) = delete;
WebhookActor &operator=(const WebhookActor &) = delete;
WebhookActor(WebhookActor &&) = delete;
WebhookActor &operator=(WebhookActor &&) = delete;
~WebhookActor();
void update();
void close();
static td::int64 get_total_connection_count() {
return total_connection_count_;
static td::int64 get_total_connections_count() {
return total_connections_count_;
}
private:
@ -75,14 +69,14 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
static constexpr int WEBHOOK_MAX_RESEND_TIMEOUT = 60;
static constexpr int WEBHOOK_DROP_TIMEOUT = 60 * 60 * 23;
static std::atomic<td::uint64> total_connection_count_;
static std::atomic<td::uint64> total_connections_count_;
td::ActorShared<Callback> callback_;
td::int64 tqueue_id_;
bool tqueue_empty_ = false;
std::size_t last_pending_update_count_ = MIN_PENDING_UPDATES_WARNING;
td::HttpUrl url_;
const td::string cert_path_;
td::string cert_path_;
std::shared_ptr<const ClientParameters> parameters_;
double last_error_time_ = 0;
@ -128,8 +122,8 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
td::TQueue::EventId tqueue_offset_;
std::size_t max_loaded_updates_ = 0;
struct EventIdHash {
td::uint32 operator()(td::TQueue::EventId event_id) const {
return td::Hash<td::int32>()(event_id.value());
std::size_t operator()(td::TQueue::EventId event_id) const {
return std::hash<td::int32>()(event_id.value());
}
};
td::FlatHashMap<td::TQueue::EventId, td::unique_ptr<Update>, EventIdHash> update_map_;
@ -139,7 +133,6 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
double first_error_410_time_ = 0;
td::SslCtx ssl_ctx_;
td::IPAddress ip_address_;
td::int32 ip_generation_ = 0;
double next_ip_address_resolve_time_ = 0;
@ -183,10 +176,6 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
void resolve_ip_address();
void on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address);
void on_ssl_context_created(td::Result<td::SslCtx> r_ssl_ctx);
td::Status create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public);
td::Result<td::SslStream> create_ssl_stream();
td::Status create_connection() TD_WARN_UNUSED_RESULT;
td::Status create_connection(td::BufferedFd<td::SocketFd> fd) TD_WARN_UNUSED_RESULT;
@ -213,7 +202,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
void start_up() final;
td::Status check_ip_address(const td::IPAddress &addr) const;
bool check_ip_address(const td::IPAddress &addr) const;
void on_error(td::Status status);
void on_connection_error(td::Status error) final;

View File

@ -1,5 +1,5 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@ -9,10 +9,13 @@
#include "telegram-bot-api/HttpConnection.h"
#include "telegram-bot-api/HttpServer.h"
#include "telegram-bot-api/HttpStatConnection.h"
#include "telegram-bot-api/Query.h"
#include "telegram-bot-api/Stats.h"
#include "telegram-bot-api/Watchdog.h"
#include "td/telegram/ClientActor.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/TQueue.h"
#include "td/net/GetHostByNameActor.h"
#include "td/net/HttpInboundConnection.h"
@ -20,11 +23,13 @@
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/AsyncFileLog.h"
#include "td/utils/buffer.h"
#include "td/utils/CombinedLog.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/FileLog.h"
#include "td/utils/format.h"
//#include "td/utils/GitInfo.h"
#include "td/utils/logging.h"
#include "td/utils/MemoryLog.h"
@ -37,14 +42,18 @@
#include "td/utils/port/rlimit.h"
#include "td/utils/port/signals.h"
#include "td/utils/port/stacktrace.h"
#include "td/utils/port/thread.h"
#include "td/utils/port/Stat.h"
#include "td/utils/port/user.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/TsLog.h"
#include "memprof/memprof.h"
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <memory>
@ -67,49 +76,24 @@ static void quit_signal_handler(int sig) {
static td::MemoryLog<1 << 20> memory_log;
void print_log() {
td::LogGuard log_guard;
auto buf = memory_log.get_buffer();
auto pos = memory_log.get_pos();
size_t tail_length = buf.size() - pos;
while (tail_length > 0 && buf[pos + tail_length - 1] == ' ') {
tail_length--;
}
if (tail_length + 100 >= buf.size() - pos) {
tail_length = buf.size() - pos;
}
td::signal_safe_write("------- Log dump -------\n");
td::signal_safe_write(buf.substr(pos, tail_length), false);
td::signal_safe_write(buf.substr(pos), false);
td::signal_safe_write(buf.substr(0, pos), false);
td::signal_safe_write("\n", false);
td::signal_safe_write("------------------------\n");
}
static std::atomic_bool has_failed{false};
static std::atomic_flag need_dump_statistics;
static void dump_stacktrace_signal_handler(int sig) {
if (has_failed) {
return;
}
td::LogGuard log_guard;
if (LOG_TAG != nullptr && *LOG_TAG) {
td::signal_safe_write(td::Slice(LOG_TAG));
td::signal_safe_write(td::Slice("\n"), false);
}
td::Stacktrace::print_to_stderr();
need_dump_statistics.clear();
}
static void fail_signal_handler(int sig) {
has_failed = true;
{
td::LogGuard log_guard;
td::signal_safe_write_signal_number(sig);
td::Stacktrace::PrintOptions options;
options.use_gdb = true;
td::Stacktrace::print_to_stderr(options);
}
td::signal_safe_write_signal_number(sig);
td::Stacktrace::PrintOptions options;
options.use_gdb = true;
td::Stacktrace::print_to_stderr(options);
print_log();
_Exit(EXIT_FAILURE);
}
@ -123,9 +107,6 @@ static void change_verbosity_level_signal_handler(int sig) {
static std::atomic_flag need_dump_log;
static void dump_log_signal_handler(int sig) {
if (has_failed) {
return;
}
need_dump_log.clear();
}
@ -134,6 +115,61 @@ static void sigsegv_signal_handler(int signum, void *addr) {
fail_signal_handler(signum);
}
static void dump_statistics(const std::shared_ptr<SharedData> &shared_data,
const std::shared_ptr<td::NetQueryStats> &net_query_stats) {
if (is_memprof_on()) {
LOG(WARNING) << "Memory dump:";
td::vector<AllocInfo> v;
dump_alloc([&](const AllocInfo &info) { v.push_back(info); });
std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; });
size_t total_size = 0;
size_t other_size = 0;
int count = 0;
for (auto &info : v) {
if (count++ < 50) {
LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace);
} else {
other_size += info.size;
}
total_size += info.size;
}
LOG(WARNING) << td::tag("other", td::format::as_size(other_size));
LOG(WARNING) << td::tag("total size", td::format::as_size(total_size));
LOG(WARNING) << td::tag("total traces", get_ht_size());
LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate());
}
auto r_mem_stat = td::mem_stat();
if (r_mem_stat.is_ok()) {
auto mem_stat = r_mem_stat.move_as_ok();
LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_));
LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_));
LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_));
LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_));
}
LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem()));
LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size()));
auto query_list_size = shared_data->query_list_size_;
auto query_count = shared_data->query_count_.load();
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
td::uint64 i = 0;
bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
LOG(WARNING) << static_cast<const Query &>(*cur);
} else {
was_gap = true;
}
}
td::dump_pending_network_queries(*net_query_stats);
}
int main(int argc, char *argv[]) {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(FATAL));
td::ExitGuard exit_guard;
@ -142,7 +178,6 @@ int main(int argc, char *argv[]) {
need_reopen_log.test_and_set();
need_quit.test_and_set();
need_change_verbosity_level.test_and_set();
need_dump_statistics.test_and_set();
need_dump_log.test_and_set();
td::Stacktrace::init();
@ -156,16 +191,16 @@ int main(int argc, char *argv[]) {
td::set_signal_handler(td::SignalType::Other, fail_signal_handler).ensure();
td::set_extended_signal_handler(td::SignalType::Error, sigsegv_signal_handler).ensure();
td::set_real_time_signal_handler(0, change_verbosity_level_signal_handler).ensure();
td::set_real_time_signal_handler(1, dump_log_signal_handler).ensure();
td::set_real_time_signal_handler(2, dump_stacktrace_signal_handler).ensure();
td::set_runtime_signal_handler(0, change_verbosity_level_signal_handler).ensure();
td::set_runtime_signal_handler(1, dump_log_signal_handler).ensure();
td::set_runtime_signal_handler(2, dump_stacktrace_signal_handler).ensure();
td::init_openssl_threads();
auto start_time = td::Time::now();
auto shared_data = std::make_shared<SharedData>();
auto parameters = std::make_unique<ClientParameters>();
parameters->version_ = "7.0";
parameters->version_ = "6.2";
parameters->shared_data_ = shared_data;
parameters->start_time_ = start_time;
auto net_query_stats = td::create_net_query_stats();
@ -187,8 +222,6 @@ int main(int argc, char *argv[]) {
td::string username;
td::string groupname;
td::uint64 max_connections = 0;
td::uint64 cpu_affinity = 0;
td::uint64 main_thread_affinity = 0;
ClientManager::TokenRange token_range{0, 1};
parameters->api_id_ = [](auto x) -> td::int32 {
@ -286,17 +319,6 @@ int main(int argc, char *argv[]) {
options.add_option('g', "groupname", "effective group name to switch to", td::OptionParser::parse_string(groupname));
options.add_checked_option('c', "max-connections", "maximum number of open file descriptors",
td::OptionParser::parse_integer(max_connections));
#if TD_HAVE_THREAD_AFFINITY
options.add_checked_option('\0', "cpu-affinity", "CPU affinity as 64-bit mask (defaults to all available CPUs)",
td::OptionParser::parse_integer(cpu_affinity));
options.add_checked_option(
'\0', "main-thread-affinity",
"CPU affinity of the main thread as 64-bit mask (defaults to the value of the option --cpu-affinity)",
td::OptionParser::parse_integer(main_thread_affinity));
#else
(void)cpu_affinity;
(void)main_thread_affinity;
#endif
options.add_checked_option('\0', "max-batch-operations", PSLICE() << "maximum number of batch operations (default: " << parameters->max_batch_operations << ")",
@ -304,7 +326,7 @@ int main(int argc, char *argv[]) {
options.add_checked_option('\0', "file-expiration-time",
PSLICE() << "downloaded files expire after this amount of seconds of not being used (defaults to " << parameters->file_expiration_timeout_seconds_ << ")",
td::OptionParser::parse_integer(parameters->file_expiration_timeout_seconds_));
options.add_checked_option('\0', "proxy",
"HTTP proxy server for outgoing webhook requests in the format http://host:port",
[&](td::Slice address) {
@ -353,29 +375,10 @@ int main(int argc, char *argv[]) {
log.set_second(&memory_log);
td::log_interface = &log;
td::AsyncFileLog file_log;
td::FileLog file_log;
td::TsLog ts_log(&file_log);
auto init_status = [&] {
#if TD_HAVE_THREAD_AFFINITY
if (main_thread_affinity == 0) {
main_thread_affinity = cpu_affinity;
}
if (main_thread_affinity != 0) {
auto initial_mask = td::thread::get_affinity_mask(td::this_thread::get_id());
if (initial_mask == 0) {
return td::Status::Error("Failed to get current thread affinity");
}
if (cpu_affinity != 0) {
TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), cpu_affinity),
"Can't set CPU affinity mask: ");
} else {
cpu_affinity = initial_mask;
}
TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), main_thread_affinity),
"Can't set main thread CPU affinity mask: ");
}
#endif
if (max_connections != 0) {
TRY_STATUS_PREFIX(td::set_resource_limit(td::ResourceLimitType::NoFile, max_connections),
"Can't set file descriptor limit: ");
@ -389,7 +392,7 @@ int main(int argc, char *argv[]) {
TRY_RESULT_PREFIX_ASSIGN(working_directory, td::realpath(working_directory, true),
"Invalid working directory specified: ");
if (working_directory.empty()) {
return td::Status::Error("Empty path specified as working directory");
return td::Status::Error("Working directory can't be empty");
}
if (working_directory.back() != TD_DIR_SLASH) {
working_directory += TD_DIR_SLASH;
@ -445,13 +448,13 @@ int main(int argc, char *argv[]) {
log_file_path = working_directory + log_file_path;
}
TRY_STATUS_PREFIX(file_log.init(log_file_path, log_max_file_size), "Can't open log file: ");
log.set_first(&file_log);
log.set_first(&ts_log);
}
return td::Status::OK();
}();
if (init_status.is_error()) {
LOG(PLAIN) << init_status.message();
LOG(PLAIN) << init_status.error().message();
LOG(PLAIN) << options;
return 1;
}
@ -474,55 +477,50 @@ int main(int argc, char *argv[]) {
// LOG(WARNING) << "Bot API server with commit " << td::GitInfo::commit() << ' '
// << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started";
LOG(WARNING) << "TDLight Bot API " << parameters->version_ << " server started";
LOG(WARNING) << "Bot API " << parameters->version_ << " server started";
td::ConcurrentScheduler sched(SharedData::get_thread_count() - 1, cpu_affinity);
const int threads_n = 5; // +3 for Td, one for slow HTTP connections and one for DNS resolving
td::ConcurrentScheduler sched;
sched.init(threads_n);
td::GetHostByNameActor::Options get_host_by_name_options;
get_host_by_name_options.scheduler_id = SharedData::get_dns_resolver_scheduler_id();
get_host_by_name_options.scheduler_id = threads_n;
parameters->get_host_by_name_actor_id_ =
sched.create_actor_unsafe<td::GetHostByNameActor>(0, "GetHostByName", std::move(get_host_by_name_options))
.release();
auto client_manager = sched
.create_actor_unsafe<ClientManager>(SharedData::get_client_scheduler_id(), "ClientManager",
std::move(parameters), token_range)
.release();
auto client_manager =
sched.create_actor_unsafe<ClientManager>(0, "ClientManager", std::move(parameters), token_range).release();
sched
.create_actor_unsafe<HttpServer>(
SharedData::get_client_scheduler_id(), "HttpServer", http_ip_address, http_port,
0, "HttpServer", http_ip_address, http_port,
[client_manager, shared_data] {
return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data));
})
.release();
if (http_stat_port != 0) {
sched
.create_actor_unsafe<HttpServer>(
SharedData::get_client_scheduler_id(), "HttpStatsServer", http_stat_ip_address, http_stat_port,
0, "HttpStatsServer", http_stat_ip_address, http_stat_port,
[client_manager] {
return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager));
})
.release();
}
constexpr double WATCHDOG_TIMEOUT = 0.25;
auto watchdog_id = sched.create_actor_unsafe<Watchdog>(SharedData::get_watchdog_scheduler_id(), "Watchdog",
td::this_thread::get_id(), WATCHDOG_TIMEOUT);
sched.start();
double next_watchdog_kick_time = start_time;
double next_cron_time = start_time;
double last_dump_time = start_time - 1000.0;
double last_tqueue_gc_time = start_time - 1000.0;
td::int64 tqueue_deleted_events = 0;
td::int64 last_tqueue_deleted_events = 0;
bool close_flag = false;
std::atomic_bool can_quit{false};
ServerCpuStat::instance(); // create ServerCpuStat instance
while (true) {
sched.run_main(td::min(next_cron_time, next_watchdog_kick_time) - td::Time::now());
sched.run_main(next_cron_time - td::Time::now());
if (!need_reopen_log.test_and_set()) {
td::log_interface->after_rotation();
@ -535,9 +533,9 @@ int main(int argc, char *argv[]) {
}
LOG(WARNING) << "Stopping engine with uptime " << (td::Time::now() - start_time) << " seconds by a signal";
dump_statistics(shared_data, net_query_stats);
close_flag = true;
auto guard = sched.get_main_guard();
watchdog_id.reset();
send_closure(client_manager, &ClientManager::close, td::PromiseCreator::lambda([&can_quit](td::Unit) {
can_quit.store(true);
td::Scheduler::instance()->yield();
@ -564,7 +562,7 @@ int main(int argc, char *argv[]) {
if (!need_dump_log.test_and_set()) {
print_log();
need_dump_statistics.clear();
dump_statistics(shared_data, net_query_stats);
}
double now = td::Time::now();
@ -573,28 +571,32 @@ int main(int argc, char *argv[]) {
next_cron_time = now;
}
next_cron_time += 1.0;
auto guard = sched.get_main_guard();
td::Scheduler::instance()->run_on_scheduler(SharedData::get_statistics_thread_id(),
[](td::Unit) { ServerCpuStat::update(td::Time::now()); });
ServerCpuStat::update(now);
}
if (now >= start_time + 600) {
if (now > last_tqueue_gc_time + 60.0) {
auto unix_time = shared_data->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time;
last_tqueue_gc_time = now;
auto guard = sched.get_main_guard();
send_closure(watchdog_id, &Watchdog::kick);
next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 10;
auto deleted_events = shared_data->tqueue_->run_gc(unix_time);
LOG(INFO) << "TQueue GC deleted " << deleted_events << " events";
tqueue_deleted_events += deleted_events;
if (tqueue_deleted_events > last_tqueue_deleted_events + 10000) {
LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events << " events since the start";
last_tqueue_deleted_events = tqueue_deleted_events;
}
}
if (!need_dump_statistics.test_and_set() || now > last_dump_time + 300.0) {
if (now > last_dump_time + 300.0) {
last_dump_time = now;
auto guard = sched.get_main_guard();
send_closure(client_manager, &ClientManager::dump_statistics);
dump_statistics(shared_data, net_query_stats);
}
}
LOG(WARNING) << "--------------------FINISH ENGINE--------------------";
if (net_query_stats.use_count() != 1) {
LOG(ERROR) << "NetQueryStats have leaked";
}
CHECK(net_query_stats.use_count() == 1);
net_query_stats = nullptr;
sched.finish();
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(FATAL));