Merge remote-tracking branch 'upstream/master'

Changes how searchChatMessages works, may break some code
This commit is contained in:
Giuseppe Marino 2022-12-30 21:33:26 +01:00
commit 73c8efa528
No known key found for this signature in database
GPG Key ID: C26F7A532ADEC25E
14 changed files with 723 additions and 339 deletions

View File

@ -6,7 +6,7 @@ if (POLICY CMP0065)
cmake_policy(SET CMP0065 NEW)
endif()
project(TelegramBotApi VERSION 6.3.2 LANGUAGES CXX)
project(TelegramBotApi VERSION 6.4 LANGUAGES CXX)
if (POLICY CMP0069)
option(TELEGRAM_BOT_API_ENABLE_LTO "Use \"ON\" to enable Link Time Optimization.")

2
td

@ -1 +1 @@
Subproject commit 7eba19887ad834fd731b6b07b53c2426fe4beb59
Subproject commit 93c42f6d7c1209937431469f80427d48907f1b8d

File diff suppressed because it is too large Load Diff

View File

@ -22,13 +22,13 @@
#include "td/utils/Container.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/FlatHashSet.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/JsonBuilder.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/WaitFreeHashMap.h"
#include <functional>
#include <limits>
#include <memory>
#include <queue>
@ -68,10 +68,12 @@ class Client final : public WebhookActor::Callback {
static constexpr bool USE_MESSAGE_DATABASE = false;
static constexpr int64 GENERAL_MESSAGE_THREAD_ID = 1 << 20;
static constexpr int32 MAX_CERTIFICATE_FILE_SIZE = 3 << 20;
static constexpr int32 MAX_DOWNLOAD_FILE_SIZE = 20 << 20;
static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 1000; // some unreasonably big value
static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 250; // some unreasonably big value
static constexpr std::size_t MIN_PENDING_UPDATES_WARNING = 200;
@ -97,6 +99,7 @@ class Client final : public WebhookActor::Callback {
static constexpr int USER_ONLY_ERROR_CODE = 405;
static constexpr Slice USER_ONLY_ERROR_DESCRIPTION = "Method Not Allowed: You can only use this method as a user";
class JsonEmptyObject;
class JsonFile;
class JsonDatedFile;
class JsonDatedFiles;
@ -132,7 +135,6 @@ class Client final : public WebhookActor::Callback {
class JsonPollAnswer;
class JsonEntity;
class JsonVectorEntities;
class JsonCallbackGame;
class JsonWebAppInfo;
class JsonInlineKeyboardButton;
class JsonInlineKeyboard;
@ -156,7 +158,7 @@ class Client final : public WebhookActor::Callback {
class JsonChatMemberUpdated;
class JsonChatJoinRequest;
class JsonForumTopicCreated;
class JsonForumTopicIsClosedToggled;
class JsonForumTopicEdited;
class JsonForumTopicInfo;
class JsonGameHighScore;
class JsonAddress;
@ -168,10 +170,9 @@ class Client final : public WebhookActor::Callback {
class JsonWebAppData;
class JsonProximityAlertTriggered;
class JsonVideoChatScheduled;
class JsonVideoChatStarted;
class JsonVideoChatEnded;
class JsonInviteVideoChatParticipants;
class JsonChatSetTtl;
class JsonChatSetMessageAutoDeleteTime;
class JsonUpdateTypes;
class JsonWebhookInfo;
class JsonStickerSet;
@ -184,6 +185,7 @@ class Client final : public WebhookActor::Callback {
class JsonChats;
class JsonChatsNearby;
class JsonMessagesArray;
class JsonFoundMessages;
class JsonProxy;
class JsonProxiesArray;
//stop custom Json objects
@ -236,6 +238,7 @@ class Client final : public WebhookActor::Callback {
class TdOnJoinChatCallback;
class TdOnReturnChatCallback;
class TdOnReturnMessagesCallback;
class TdOnFoundMessagesCallback;
class TdOnGetCallbackQueryAnswerCallback;
class TdOnGetProxiesQueryCallback;
class TdOnAddProxyQueryCallback;
@ -359,15 +362,10 @@ class Client final : public WebhookActor::Callback {
void on_result(td::uint64 id, object_ptr<td_api::Object> result);
void on_update_authorization_state();
void log_out(bool is_api_id_invalid);
Slice get_logging_out_error_description() const;
void log_out(int32 error_code, Slice error_message);
void on_closed();
void finish_closing();
static int32 get_database_scheduler_id();
static int32 get_file_gc_scheduler_id();
void clear_tqueue();
bool allow_update_before_authorization(const td_api::Object *update) const;
@ -519,6 +517,8 @@ class Client final : public WebhookActor::Callback {
static td::Result<int64> get_user_id(const Query *query, Slice field_name = Slice("user_id"));
void decrease_yet_unsent_message_count(int64 chat_id, int32 count);
int64 extract_yet_unsent_message_query_id(int64 chat_id, int64 message_id, bool *is_reply_to_message_deleted);
// start custom helper methods
@ -614,6 +614,11 @@ class Client final : public WebhookActor::Callback {
Status process_reopen_forum_topic_query(PromisedQueryPtr &query);
Status process_delete_forum_topic_query(PromisedQueryPtr &query);
Status process_unpin_all_forum_topic_messages_query(PromisedQueryPtr &query);
Status process_edit_general_forum_topic_query(PromisedQueryPtr &query);
Status process_close_general_forum_topic_query(PromisedQueryPtr &query);
Status process_reopen_general_forum_topic_query(PromisedQueryPtr &query);
Status process_hide_general_forum_topic_query(PromisedQueryPtr &query);
Status process_unhide_general_forum_topic_query(PromisedQueryPtr &query);
Status process_get_chat_member_query(PromisedQueryPtr &query);
Status process_get_chat_administrators_query(PromisedQueryPtr &query);
Status process_get_chat_member_count_query(PromisedQueryPtr &query);
@ -719,10 +724,19 @@ class Client final : public WebhookActor::Callback {
void abort_long_poll(bool from_set_webhook);
void fail_query_closing(PromisedQueryPtr &&query) const;
void fail_query_closing(PromisedQueryPtr &&query);
void fail_query_conflict(Slice message, PromisedQueryPtr &&query);
struct ClosingError {
int code;
int retry_after;
Slice message;
};
ClosingError get_closing_error();
static int get_retry_after_time(Slice error_message);
static void fail_query_with_error(PromisedQueryPtr query, int32 error_code, Slice error_message,
Slice default_message = Slice());
@ -817,6 +831,8 @@ class Client final : public WebhookActor::Callback {
bool has_location = false;
bool join_to_send_messages = false;
bool join_by_request = false;
bool has_hidden_members = false;
bool has_aggressive_anti_spam_enabled = false;
// start custom properties
bool is_verified = false;
@ -832,6 +848,8 @@ class Client final : public WebhookActor::Callback {
void set_supergroup_slow_mode_delay(int64 supergroup_id, int32 slow_mode_delay);
void set_supergroup_linked_chat_id(int64 supergroup_id, int64 linked_chat_id);
void set_supergroup_location(int64 supergroup_id, object_ptr<td_api::chatLocation> location);
void set_supergroup_has_hidden_members(int64 supergroup_id, bool has_hidden_members);
void set_supergroup_has_aggressive_anti_spam_enabled(int64 supergroup_id, bool has_aggressive_anti_spam_enabled);
SupergroupInfo *add_supergroup_info(int64 supergroup_id);
const SupergroupInfo *get_supergroup_info(int64 supergroup_id) const;
@ -942,7 +960,8 @@ class Client final : public WebhookActor::Callback {
td::unique_ptr<MessageInfo> delete_message(int64 chat_id, int64 message_id, bool only_from_cache);
void add_new_message(object_ptr<td_api::message> &&message, bool is_edited);
void process_new_message_queue(int64 chat_id);
void process_new_message_queue(int64 chat_id, int state);
struct FullMessageId {
int64 chat_id;
@ -959,14 +978,14 @@ class Client final : public WebhookActor::Callback {
};
struct FullMessageIdHash {
std::size_t operator()(FullMessageId full_message_id) const {
return std::hash<td::int64>()(full_message_id.chat_id) * 2023654985u +
std::hash<td::int64>()(full_message_id.message_id);
td::uint32 operator()(FullMessageId full_message_id) const {
return td::Hash<td::int64>()(full_message_id.chat_id) * 2023654985u +
td::Hash<td::int64>()(full_message_id.message_id);
}
};
FullMessageId add_message(object_ptr<td_api::message> &&message, bool force_update_content = false);
const MessageInfo *get_message(int64 chat_id, int64 message_id) const;
const MessageInfo *get_message(int64 chat_id, int64 message_id, bool force_cache) const;
MessageInfo *get_message_editable(int64 chat_id, int64 message_id);
void update_message_content(int64 chat_id, int64 message_id, object_ptr<td_api::MessageContent> &&content);
@ -998,6 +1017,7 @@ class Client final : public WebhookActor::Callback {
const td::string &inline_message_id);
void add_new_callback_query(object_ptr<td_api::updateNewCallbackQuery> &&query);
void process_new_callback_query_queue(int64 user_id, int state);
void add_new_inline_callback_query(object_ptr<td_api::updateNewInlineCallbackQuery> &&query);
@ -1085,6 +1105,10 @@ class Client final : public WebhookActor::Callback {
int64 my_id_ = -1;
int32 authorization_date_ = -1;
double next_authorization_time_ = 0;
int32 prev_retry_after = 0;
td::string retry_after_error_message;
int64 group_anonymous_bot_user_id_ = 0;
int64 channel_bot_user_id_ = 0;
@ -1216,6 +1240,9 @@ class Client final : public WebhookActor::Callback {
double previous_get_updates_finish_time_ = 0;
double next_get_updates_conflict_time_ = 0;
int32 flood_limited_query_count_ = 0;
double next_flood_limit_warning_time_ = 0;
td::uint64 webhook_generation_ = 1;
UpdateType delayed_update_type_ = UpdateType::Size;

View File

@ -209,14 +209,14 @@ bool ClientManager::check_flood_limits(PromisedQueryPtr &query, bool is_user_log
flood_control.add_limit(60 * 60, 600); // 600 in an hour
}
}
auto now = static_cast<td::uint32>(td::Time::now());
td::uint32 wakeup_at = flood_control.get_wakeup_at();
auto now = td::Time::now();
auto wakeup_at = flood_control.get_wakeup_at();
if (wakeup_at > now) {
LOG(INFO) << "Failed to create Client from IP address " << ip_address;
query->set_retry_after_error(static_cast<int>(wakeup_at - now) + 1);
return false;
}
flood_control.add_event(static_cast<td::int32>(now));
flood_control.add_event(now);
}
return true;
}
@ -344,13 +344,13 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
if(as_json) {
jb_root("buffer_memory", JsonStatsSize(td::BufferAllocator::get_buffer_mem()));
jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connections_count()));
jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load()));
jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connection_count()));
jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load(std::memory_order_relaxed)));
jb_root("active_network_queries", td::JsonLong(td::get_pending_network_query_count(*parameters_->net_query_stats_)));
} else {
sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load() << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connection_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n';
sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n';
}
if(as_json) {
@ -370,9 +370,10 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
ServerBotInfo bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
auto active_file_upload_count = client_info->stat_.get_active_file_upload_count();
auto stats = client_info->stat_.as_json_ready_vector(now);
JsonStatsBotAdvanced bot(
std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, std::move(stats), parameters_->stats_hide_sensible_data_, now
std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, active_file_upload_count, std::move(stats), parameters_->stats_hide_sensible_data_, now
);
bots.push_back(bot);
}
@ -385,6 +386,7 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info();
auto active_request_count = client_info->stat_.get_active_request_count();
auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes();
auto active_file_upload_count = client_info->stat_.get_active_file_upload_count();
sb << '\n';
sb << "id\t" << bot_info.id_ << '\n';
sb << "uptime\t" << now - bot_info.start_time_ << '\n';
@ -398,6 +400,9 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
if (active_file_upload_bytes != 0) {
sb << "active_file_upload_bytes\t" << active_file_upload_bytes << '\n';
}
if (active_file_upload_count != 0) {
sb << "active_file_upload_count\t" << active_file_upload_count << '\n';
}
if (!bot_info.webhook_.empty()) {
if (!parameters_->stats_hide_sensible_data_) {
sb << "webhook\t" << bot_info.webhook_ << '\n';

View File

@ -53,6 +53,16 @@ struct SharedData {
}
return static_cast<td::int32>(result);
}
static td::int32 get_database_scheduler_id() {
// the same scheduler as for database in Td
return 1;
}
static td::int32 get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td
return 2;
}
};
struct ClientParameters {

View File

@ -46,7 +46,7 @@ class HttpServer final : public td::TcpListener::Callback {
set_timeout_at(wakeup_at);
return;
}
flood_control_.add_event(static_cast<td::int32>(now));
flood_control_.add_event(now);
LOG(INFO) << "Create tcp listener " << td::tag("address", ip_address_) << td::tag("port", port_);
listener_ = td::create_actor<td::TcpListener>(
PSLICE() << "TcpListener" << td::tag("address", ip_address_) << td::tag("port", port_), port_,

View File

@ -136,7 +136,7 @@ void Query::send_response_stat() const {
return;
}
send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Response>,
ServerBotStat::Response{state_ == State::OK, answer_.size(), files_size()}, now);
ServerBotStat::Response{state_ == State::OK, answer_.size(), file_count(), files_size()}, now);
}
} // namespace telegram_bot_api

View File

@ -196,6 +196,15 @@ double BotStatActor::get_score(double now) {
return result;
}
double BotStatActor::get_minute_update_count(double now) {
auto minute_stat = stat_[2].stat_duration(now);
double result = minute_stat.first.update_count_;
if (minute_stat.second != 0) {
result /= minute_stat.second;
}
return result;
}
td::int64 BotStatActor::get_active_request_count() const {
return active_request_count_;
}
@ -204,6 +213,10 @@ td::int64 BotStatActor::get_active_file_upload_bytes() const {
return active_file_upload_bytes_;
}
td::int64 BotStatActor::get_active_file_upload_count() const {
return active_file_upload_count_;
}
bool BotStatActor::is_active(double now) const {
return last_activity_timestamp_ > now - 86400;
}

View File

@ -116,6 +116,7 @@ struct ServerBotStat {
struct Response {
bool ok_;
size_t size_;
td::int64 file_count_;
td::int64 files_size_;
};
void on_event(const Response &response) {
@ -183,15 +184,20 @@ class BotStatActor final : public td::Actor {
td::vector<StatItem> as_vector(double now);
td::vector<ServerBotStat> as_json_ready_vector(double now);
td::string get_description() const;
td::vector<td::string> get_jsonable_description() const;
double get_score(double now);
double get_minute_update_count(double now);
td::int64 get_active_request_count() const;
td::int64 get_active_file_upload_bytes() const;
td::int64 get_active_file_upload_count() const;
bool is_active(double now) const;
static constexpr std::size_t SIZE = 4;
@ -203,12 +209,14 @@ class BotStatActor final : public td::Actor {
double last_activity_timestamp_ = -1e9;
td::int64 active_request_count_ = 0;
td::int64 active_file_upload_bytes_ = 0;
td::int64 active_file_upload_count_ = 0;
void on_event(const ServerBotStat::Update &update) {
}
void on_event(const ServerBotStat::Response &response) {
active_request_count_--;
active_file_upload_count_ -= response.file_count_;
active_file_upload_bytes_ -= response.files_size_;
CHECK(active_request_count_ >= 0);
CHECK(active_file_upload_bytes_ >= 0);
@ -216,6 +224,7 @@ class BotStatActor final : public td::Actor {
void on_event(const ServerBotStat::Request &request) {
active_request_count_++;
active_file_upload_count_ += request.file_count_;
active_file_upload_bytes_ += request.files_size_;
}
};

View File

@ -202,12 +202,13 @@ class JsonStatsBotAdvanced : public JsonStatsBot {
ServerBotInfo bot,
td::int64 active_request_count,
td::int64 active_file_upload_bytes,
td::int64 active_file_upload_count,
td::vector<ServerBotStat> stats,
const bool hide_sensible_data,
const double now)
: JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), active_request_count_(active_request_count),
active_file_upload_bytes_(active_file_upload_bytes), stats_(std::move(stats)),
hide_sensible_data_(hide_sensible_data), now_(now) {
active_file_upload_bytes_(active_file_upload_bytes), active_file_upload_count_(active_file_upload_count),
stats_(std::move(stats)), hide_sensible_data_(hide_sensible_data), now_(now) {
}
void store(td::JsonValueScope *scope) const {
auto object = scope->enter_object();
@ -240,6 +241,7 @@ class JsonStatsBotAdvanced : public JsonStatsBot {
ServerBotInfo bot_;
td::int64 active_request_count_;
td::int64 active_file_upload_bytes_;
td::int64 active_file_upload_count_;
td::vector<ServerBotStat> stats_;
const bool hide_sensible_data_;
const double now_;

View File

@ -11,11 +11,8 @@
#include "td/net/GetHostByNameActor.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpProxy.h"
#include "td/net/SslStream.h"
#include "td/net/TransparentProxy.h"
#include "td/actor/actor.h"
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
@ -32,13 +29,11 @@
#include "td/utils/Span.h"
#include "td/utils/Time.h"
#include <limits>
namespace telegram_bot_api {
static int VERBOSITY_NAME(webhook) = VERBOSITY_NAME(DEBUG);
std::atomic<td::uint64> WebhookActor::total_connections_count_{0};
std::atomic<td::uint64> WebhookActor::total_connection_count_{0};
WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url,
td::string cert_path, td::int32 max_connections, bool from_db_flag,
@ -52,8 +47,10 @@ WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_
, fix_ip_address_(fix_ip_address)
, from_db_flag_(from_db_flag)
, max_connections_(max_connections)
, secret_token_(std::move(secret_token)) {
, secret_token_(std::move(secret_token))
, slow_scheduler_id_(td::Scheduler::instance()->sched_count() - 2) {
CHECK(max_connections_ > 0);
CHECK(slow_scheduler_id_ > 0);
if (!cached_ip_address.empty()) {
auto r_ip_address = td::IPAddress::get_ip_address(cached_ip_address);
@ -74,6 +71,11 @@ WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_
<< ", max_connections = " << max_connections_;
}
WebhookActor::~WebhookActor() {
td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), update_map_, queue_updates_,
queues_);
}
void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) {
if (wakeup_at_ == 0 || wakeup_at < wakeup_at_) {
VLOG(webhook) << "Wake up in " << wakeup_at - td::Time::now() << " from " << source;
@ -114,8 +116,9 @@ void WebhookActor::on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address
return on_error(r_ip_address.move_as_error());
}
auto new_ip_address = r_ip_address.move_as_ok();
if (!check_ip_address(new_ip_address)) {
return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved"));
auto check_status = check_ip_address(new_ip_address);
if (check_status.is_error()) {
return on_error(std::move(check_status));
}
if (!(ip_address_ == new_ip_address)) {
VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address;
@ -128,24 +131,25 @@ void WebhookActor::on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address
VLOG(webhook) << "IP address was verified";
}
td::Status WebhookActor::create_connection() {
if (!ip_address_.is_valid()) {
VLOG(webhook) << "Can't create connection: IP address is not ready";
return td::Status::Error("IP address is not ready");
void WebhookActor::on_ssl_context_created(td::Result<td::SslCtx> r_ssl_ctx) {
if (r_ssl_ctx.is_error()) {
create_webhook_error("Can't create an SSL context", r_ssl_ctx.move_as_error(), true);
loop();
return;
}
ssl_ctx_ = r_ssl_ctx.move_as_ok();
VLOG(webhook) << "SSL context was created";
loop();
}
td::Status WebhookActor::create_connection() {
CHECK(ip_address_.is_valid());
if (parameters_->webhook_proxy_ip_address_.is_valid()) {
auto r_proxy_socket_fd = td::SocketFd::open(parameters_->webhook_proxy_ip_address_);
if (r_proxy_socket_fd.is_error()) {
td::Slice error_message = "Can't connect to the webhook proxy";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_proxy_socket_fd.error());
VLOG(webhook) << error;
on_webhook_error(error_message);
on_error(td::Status::Error(error_message));
return error;
return create_webhook_error("Can't connect to the webhook proxy", r_proxy_socket_fd.move_as_error(), false);
}
if (!was_checked_) {
TRY_STATUS(create_ssl_stream()); // check certificate
// verify webhook even we can't establish connection to the webhook
was_checked_ = true;
on_webhook_verified();
@ -188,29 +192,33 @@ td::Status WebhookActor::create_connection() {
auto r_fd = td::SocketFd::open(ip_address_);
if (r_fd.is_error()) {
td::Slice error_message = "Can't connect to the webhook";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_fd.error());
VLOG(webhook) << error;
on_webhook_error(error_message);
on_error(r_fd.move_as_error());
return error;
return create_webhook_error("Can't connect to the webhook", r_fd.move_as_error(), false);
}
return create_connection(td::BufferedFd<td::SocketFd>(r_fd.move_as_ok()));
}
td::Status WebhookActor::create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public) {
CHECK(result.is_error());
auto error = td::Status::Error(PSLICE() << error_message << ": " << result);
VLOG(webhook) << error;
if (is_public) {
on_webhook_error(PSLICE() << error_message << ": " << result.public_message());
} else {
on_webhook_error(error_message);
}
on_error(std::move(result));
return std::move(error);
}
td::Result<td::SslStream> WebhookActor::create_ssl_stream() {
if (url_.protocol_ == td::HttpUrl::Protocol::Http) {
return td::SslStream();
}
auto r_ssl_stream = td::SslStream::create(url_.host_, cert_path_, td::SslStream::VerifyPeer::On, !cert_path_.empty());
CHECK(ssl_ctx_);
auto r_ssl_stream = td::SslStream::create(url_.host_, ssl_ctx_, !cert_path_.empty());
if (r_ssl_stream.is_error()) {
td::Slice error_message = "Can't create an SSL connection";
auto error = td::Status::Error(PSLICE() << error_message << ": " << r_ssl_stream.error());
VLOG(webhook) << error;
on_webhook_error(PSLICE() << error_message << ": " << r_ssl_stream.error().public_message());
on_error(r_ssl_stream.move_as_error());
return std::move(error);
return create_webhook_error("Can't create an SSL connection", r_ssl_stream.move_as_error(), true);
}
return r_ssl_stream.move_as_ok();
}
@ -221,13 +229,13 @@ td::Status WebhookActor::create_connection(td::BufferedFd<td::SocketFd> fd) {
auto id = connections_.create(Connection());
auto *conn = connections_.get(id);
conn->actor_id_ = td::create_actor<td::HttpOutboundConnection>(
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), std::numeric_limits<size_t>::max(), 20, 60,
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id));
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 20, 60,
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id), slow_scheduler_id_);
conn->ip_generation_ = ip_generation_;
conn->event_id_ = {};
conn->id_ = id;
ready_connections_.put(conn->to_list_node());
total_connections_count_.fetch_add(1, std::memory_order_relaxed);
total_connection_count_.fetch_add(1, std::memory_order_relaxed);
if (!was_checked_) {
was_checked_ = true;
@ -251,6 +259,15 @@ void WebhookActor::on_socket_ready_async(td::Result<td::BufferedFd<td::SocketFd>
}
void WebhookActor::create_new_connections() {
if (!ip_address_.is_valid()) {
VLOG(webhook) << "Can't create new connections: IP address is not ready";
return;
}
if (url_.protocol_ != td::HttpUrl::Protocol::Http && !ssl_ctx_) {
VLOG(webhook) << "Can't create new connections: SSL context is not ready";
return;
}
size_t need_connections = queue_updates_.size();
if (need_connections > static_cast<size_t>(max_connections_)) {
need_connections = max_connections_;
@ -287,7 +304,7 @@ void WebhookActor::create_new_connections() {
<< td::tag("after", td::format::as_time(wakeup_at - now));
break;
}
flood->add_event(static_cast<td::int32>(now));
flood->add_event(now);
if (create_connection().is_error()) {
relax_wakeup_at(now + 1.0, "create_new_connections error");
return;
@ -652,7 +669,7 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
if (need_close || close_connection) {
VLOG(webhook) << "Close connection " << connection_id;
connections_.erase(connection_ptr->id_);
total_connections_count_.fetch_sub(1, std::memory_order_relaxed);
total_connection_count_.fetch_sub(1, std::memory_order_relaxed);
} else {
ready_connections_.put(connection_ptr->to_list_node());
}
@ -668,10 +685,10 @@ void WebhookActor::start_up() {
max_loaded_updates_ = max_connections_ * 2;
next_ip_address_resolve_time_ = last_success_time_ = td::Time::now() - 3600;
active_new_connection_flood_.add_limit(1, 10 * max_connections_);
active_new_connection_flood_.add_limit(5, 20 * max_connections_);
pending_new_connection_flood_.add_limit(1, 1);
active_new_connection_flood_.add_limit(1, 20);
pending_new_connection_flood_.add_limit(2, 1);
if (!parameters_->local_mode_) {
if (url_.protocol_ == td::HttpUrl::Protocol::Https || (parameters_->allow_http_ && url_.protocol_ == td::HttpUrl::Protocol::Http)) {
@ -682,15 +699,14 @@ void WebhookActor::start_up() {
} else {
CHECK(url_.protocol_ == td::HttpUrl::Protocol::Http);
VLOG(webhook) << "Can't create connection: HTTP is forbidden";
on_error(td::Status::Error("HTTPS url must be provided for webhook"));
on_error(td::Status::Error("An HTTPS URL must be provided for webhook"));
}
}
if (fix_ip_address_ && !stop_flag_) {
if (!ip_address_.is_valid()) {
on_error(td::Status::Error("Invalid IP address specified"));
} else if (!check_ip_address(ip_address_)) {
on_error(td::Status::Error(PSLICE() << "IP address " << ip_address_.get_ip_str() << " is reserved"));
auto check_status = check_ip_address(ip_address_);
if (check_status.is_error()) {
return on_error(std::move(check_status));
}
}
@ -699,6 +715,15 @@ void WebhookActor::start_up() {
on_webhook_verified();
}
if (url_.protocol_ != td::HttpUrl::Protocol::Http && !stop_flag_) {
// asynchronously create SSL context
td::Scheduler::instance()->run_on_scheduler(
SharedData::get_database_scheduler_id(), [actor_id = actor_id(this), cert_path = cert_path_](td::Unit) mutable {
send_closure(actor_id, &WebhookActor::on_ssl_context_created,
td::SslCtx::create(cert_path, td::SslCtx::VerifyPeer::On));
});
}
yield();
}
@ -720,7 +745,7 @@ void WebhookActor::close() {
}
void WebhookActor::tear_down() {
total_connections_count_.fetch_sub(connections_.size(), std::memory_order_relaxed);
total_connection_count_.fetch_sub(connections_.size(), std::memory_order_relaxed);
}
void WebhookActor::on_webhook_verified() {
@ -731,24 +756,26 @@ void WebhookActor::on_webhook_verified() {
send_closure(callback_, &Callback::webhook_verified, std::move(ip_address_str));
}
bool WebhookActor::check_ip_address(const td::IPAddress &addr) const {
td::Status WebhookActor::check_ip_address(const td::IPAddress &addr) const {
if (!addr.is_valid()) {
return false;
return td::Status::Error("Invalid IP address specified");
}
if (parameters_->local_mode_) {
// allow any valid IP address
return true;
return td::Status::OK();
}
if (!addr.is_ipv4()) {
VLOG(webhook) << "Bad IP address (not IPv4): " << addr;
return false;
return td::Status::Error("IPv6-only addresses are not allowed");
}
return !addr.is_reserved();
if (addr.is_reserved()) {
return td::Status::Error(PSLICE() << "IP address " << addr.get_ip_str() << " is reserved");
}
return td::Status::OK();
}
void WebhookActor::on_error(td::Status status) {
VLOG(webhook) << "Receive webhook error " << status;
if (!was_checked_) {
if (!was_checked_ && !stop_flag_) {
CHECK(!callback_.empty());
send_closure(std::move(callback_), &Callback::webhook_closed, std::move(status));
stop_flag_ = true;

View File

@ -12,6 +12,7 @@
#include "td/net/HttpOutboundConnection.h"
#include "td/net/HttpQuery.h"
#include "td/net/SslCtx.h"
#include "td/net/SslStream.h"
#include "td/actor/actor.h"
@ -21,6 +22,7 @@
#include "td/utils/Container.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/FloodControlFast.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/HttpUrl.h"
#include "td/utils/JsonBuilder.h"
#include "td/utils/List.h"
@ -31,7 +33,6 @@
#include "td/utils/VectorQueue.h"
#include <atomic>
#include <functional>
#include <memory>
#include <set>
#include <tuple>
@ -54,13 +55,18 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path,
td::int32 max_connections, bool from_db_flag, td::string cached_ip_address, bool fix_ip_address,
td::string secret_token, std::shared_ptr<const ClientParameters> parameters);
WebhookActor(const WebhookActor &) = delete;
WebhookActor &operator=(const WebhookActor &) = delete;
WebhookActor(WebhookActor &&) = delete;
WebhookActor &operator=(WebhookActor &&) = delete;
~WebhookActor();
void update();
void close();
static td::int64 get_total_connections_count() {
return total_connections_count_;
static td::int64 get_total_connection_count() {
return total_connection_count_;
}
private:
@ -69,14 +75,14 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
static constexpr int WEBHOOK_MAX_RESEND_TIMEOUT = 60;
static constexpr int WEBHOOK_DROP_TIMEOUT = 60 * 60 * 23;
static std::atomic<td::uint64> total_connections_count_;
static std::atomic<td::uint64> total_connection_count_;
td::ActorShared<Callback> callback_;
td::int64 tqueue_id_;
bool tqueue_empty_ = false;
std::size_t last_pending_update_count_ = MIN_PENDING_UPDATES_WARNING;
td::HttpUrl url_;
td::string cert_path_;
const td::string cert_path_;
std::shared_ptr<const ClientParameters> parameters_;
double last_error_time_ = 0;
@ -122,8 +128,8 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
td::TQueue::EventId tqueue_offset_;
std::size_t max_loaded_updates_ = 0;
struct EventIdHash {
std::size_t operator()(td::TQueue::EventId event_id) const {
return std::hash<td::int32>()(event_id.value());
td::uint32 operator()(td::TQueue::EventId event_id) const {
return td::Hash<td::int32>()(event_id.value());
}
};
td::FlatHashMap<td::TQueue::EventId, td::unique_ptr<Update>, EventIdHash> update_map_;
@ -133,6 +139,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
double first_error_410_time_ = 0;
td::SslCtx ssl_ctx_;
td::IPAddress ip_address_;
td::int32 ip_generation_ = 0;
double next_ip_address_resolve_time_ = 0;
@ -170,12 +177,17 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
double last_success_time_ = 0;
double wakeup_at_ = 0;
bool last_update_was_successful_ = true;
td::int32 slow_scheduler_id_ = -1;
void relax_wakeup_at(double wakeup_at, const char *source);
void resolve_ip_address();
void on_resolved_ip_address(td::Result<td::IPAddress> r_ip_address);
void on_ssl_context_created(td::Result<td::SslCtx> r_ssl_ctx);
td::Status create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public);
td::Result<td::SslStream> create_ssl_stream();
td::Status create_connection() TD_WARN_UNUSED_RESULT;
td::Status create_connection(td::BufferedFd<td::SocketFd> fd) TD_WARN_UNUSED_RESULT;
@ -202,7 +214,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
void start_up() final;
bool check_ip_address(const td::IPAddress &addr) const;
td::Status check_ip_address(const td::IPAddress &addr) const;
void on_error(td::Status status);
void on_connection_error(td::Status error) final;

View File

@ -67,6 +67,7 @@ static void quit_signal_handler(int sig) {
static td::MemoryLog<1 << 20> memory_log;
void print_log() {
td::LogGuard log_guard;
auto buf = memory_log.get_buffer();
auto pos = memory_log.get_pos();
size_t tail_length = buf.size() - pos;
@ -85,19 +86,30 @@ void print_log() {
static std::atomic_bool has_failed{false};
static std::atomic_flag need_dump_statistics;
static void dump_stacktrace_signal_handler(int sig) {
if (has_failed) {
return;
}
td::LogGuard log_guard;
if (LOG_TAG != nullptr && *LOG_TAG) {
td::signal_safe_write(td::Slice(LOG_TAG));
td::signal_safe_write(td::Slice("\n"), false);
}
td::Stacktrace::print_to_stderr();
need_dump_statistics.clear();
}
static void fail_signal_handler(int sig) {
has_failed = true;
td::signal_safe_write_signal_number(sig);
td::Stacktrace::PrintOptions options;
options.use_gdb = true;
td::Stacktrace::print_to_stderr(options);
{
td::LogGuard log_guard;
td::signal_safe_write_signal_number(sig);
td::Stacktrace::PrintOptions options;
options.use_gdb = true;
td::Stacktrace::print_to_stderr(options);
}
print_log();
_Exit(EXIT_FAILURE);
}
@ -130,6 +142,7 @@ int main(int argc, char *argv[]) {
need_reopen_log.test_and_set();
need_quit.test_and_set();
need_change_verbosity_level.test_and_set();
need_dump_statistics.test_and_set();
need_dump_log.test_and_set();
td::Stacktrace::init();
@ -152,7 +165,7 @@ int main(int argc, char *argv[]) {
auto start_time = td::Time::now();
auto shared_data = std::make_shared<SharedData>();
auto parameters = std::make_unique<ClientParameters>();
parameters->version_ = "6.3.2";
parameters->version_ = "6.4";
parameters->shared_data_ = shared_data;
parameters->start_time_ = start_time;
auto net_query_stats = td::create_net_query_stats();
@ -556,8 +569,7 @@ int main(int argc, char *argv[]) {
if (!need_dump_log.test_and_set()) {
print_log();
auto guard = sched.get_main_guard();
send_closure(client_manager, &ClientManager::dump_statistics);
need_dump_statistics.clear();
}
double now = td::Time::now();
@ -575,7 +587,7 @@ int main(int argc, char *argv[]) {
next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2;
}
if (now > last_dump_time + 300.0) {
if (!need_dump_statistics.test_and_set() || now > last_dump_time + 300.0) {
last_dump_time = now;
auto guard = sched.get_main_guard();
send_closure(client_manager, &ClientManager::dump_statistics);