NetQueryStats

GitOrigin-RevId: 5a04b322e12e2838b1d7f812fcf06bff4eefbace
This commit is contained in:
Arseny Smirnov 2020-07-30 17:28:56 +03:00
parent d7eadd77c1
commit cacabaf6d1
17 changed files with 186 additions and 77 deletions

View File

@ -456,6 +456,7 @@ set(TDLIB_SOURCE
td/telegram/net/NetQueryCreator.cpp
td/telegram/net/NetQueryDelayer.cpp
td/telegram/net/NetQueryDispatcher.cpp
td/telegram/net/NetQueryStats.cpp
td/telegram/net/NetStatsManager.cpp
td/telegram/net/Proxy.cpp
td/telegram/net/PublicRsaKeyShared.cpp
@ -628,6 +629,7 @@ set(TDLIB_SOURCE
td/telegram/net/NetQueryCreator.h
td/telegram/net/NetQueryDelayer.h
td/telegram/net/NetQueryDispatcher.h
td/telegram/net/NetQueryStats.h
td/telegram/net/NetStatsManager.h
td/telegram/net/NetType.h
td/telegram/net/Proxy.h

View File

@ -30,6 +30,8 @@ namespace td {
class MultiTd : public Actor {
public:
explicit MultiTd(Td::Options options) : options_(std::move(options)) {
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto &td = tds_[td_id];
CHECK(td.empty());
@ -47,7 +49,7 @@ class MultiTd : public Actor {
auto context = std::make_shared<TdActorContext>(to_string(td_id));
auto old_context = set_context(context);
auto old_tag = set_tag(context->tag_);
td = create_actor<Td>("Td", std::move(callback));
td = create_actor<Td>("Td", std::move(callback), options_);
set_context(old_context);
set_tag(old_tag);
}
@ -64,6 +66,7 @@ class MultiTd : public Actor {
}
private:
Td::Options options_;
std::unordered_map<int32, ActorOwn<Td>> tds_;
};
@ -112,6 +115,7 @@ class TdReceiver {
class MultiClient::Impl final {
public:
Impl() {
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
receiver_ = make_unique<TdReceiver>();
@ -120,7 +124,8 @@ class MultiClient::Impl final {
ClientId create_client() {
auto client_id = ++client_id_;
tds_[client_id] = concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id));
tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id), options_);
return client_id;
}
@ -185,6 +190,7 @@ class MultiClient::Impl final {
std::vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0};
Td::Options options_;
std::unordered_map<int32, ActorOwn<Td>> tds_;
};
@ -270,14 +276,16 @@ class TdReceiver {
class MultiImpl {
public:
MultiImpl() {
MultiImpl(std::shared_ptr<NetQueryStats> net_query_stats) {
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
concurrent_scheduler_->init(3);
concurrent_scheduler_->start();
{
auto guard = concurrent_scheduler_->get_main_guard();
multi_td_ = create_actor<MultiTd>("MultiTd");
Td::Options options;
options.net_query_stats = std::move(net_query_stats);
multi_td_ = create_actor<MultiTd>("MultiTd", std::move(options));
}
scheduler_thread_ = thread([concurrent_scheduler = concurrent_scheduler_] {
@ -343,15 +351,20 @@ class MultiImplPool {
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
auto res = impl.lock();
if (!res) {
res = std::make_shared<MultiImpl>();
res = std::make_shared<MultiImpl>(net_query_stats_);
impl = res;
}
return res;
}
std::shared_ptr<NetQueryStats> get_net_query_stats() const {
return net_query_stats_;
}
private:
std::mutex mutex_;
std::vector<std::weak_ptr<MultiImpl>> impls_;
std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>();
};
class MultiClient::Impl final {

View File

@ -9,12 +9,15 @@
#include "td/telegram/td_api.h"
#include "td/telegram/net/NetQueryCounter.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/Td.h"
namespace td {
ClientActor::ClientActor(unique_ptr<TdCallback> callback) {
td_ = create_actor<Td>("Td", std::move(callback));
ClientActor::ClientActor(unique_ptr<TdCallback> callback, Options options) {
Td::Options td_options;
td_options.net_query_stats = options.net_query_stats;
td_ = create_actor<Td>("Td", std::move(callback), std::move(td_options));
}
void ClientActor::request(uint64 id, td_api::object_ptr<td_api::Function> request) {
@ -31,8 +34,16 @@ td_api::object_ptr<td_api::Object> ClientActor::execute(td_api::object_ptr<td_ap
return Td::static_request(std::move(request));
}
uint64 get_pending_network_query_count() {
return NetQueryCounter::get_count();
std::shared_ptr<NetQueryStats> create_net_query_stats() {
return std::make_shared<NetQueryStats>();
}
void dump_pending_network_queries(NetQueryStats &stats) {
stats.dump_pending_network_queries();
}
uint64 get_pending_network_query_count(NetQueryStats &stats) {
return stats.get_count();
}
} // namespace td

View File

@ -20,6 +20,7 @@
namespace td {
class Td;
class NetQueryStats;
/**
* This is a low-level Actor interface for interaction with TDLib. The interface is a lot more flexible than
@ -27,11 +28,15 @@ class Td;
*/
class ClientActor : public Actor {
public:
struct Options {
std::shared_ptr<NetQueryStats> net_query_stats;
};
/**
* Creates a ClientActor using the specified callback.
* \param[in] callback Callback for outgoing notifications from TDLib.
*/
explicit ClientActor(unique_ptr<TdCallback> callback);
explicit ClientActor(unique_ptr<TdCallback> callback, Options options = {});
/**
* Sends one request to TDLib. The answer will be received via callback.
@ -70,16 +75,18 @@ class ClientActor : public Actor {
ActorOwn<Td> td_;
};
std::shared_ptr<NetQueryStats> create_net_query_stats();
/**
* Dumps information about all pending network queries to the internal TDLib log.
* This is useful for library debugging.
*/
void dump_pending_network_queries();
void dump_pending_network_queries(NetQueryStats &stats);
/**
* Returns the current number of pending network queries. Useful for library debugging.
* \return Number of currently pending network queries.
*/
uint64 get_pending_network_query_count();
uint64 get_pending_network_query_count(NetQueryStats &stats);
} // namespace td

View File

@ -219,6 +219,10 @@ bool Global::ignore_backgrond_updates() const {
shared_config_->get_option_boolean("ignore_background_updates");
}
void Global::set_net_query_stats(std::shared_ptr<NetQueryStats> net_query_stats) {
net_query_creator_.set_create_func([=] { return td::make_unique<NetQueryCreator>(net_query_stats); });
}
void Global::set_net_query_dispatcher(unique_ptr<NetQueryDispatcher> net_query_dispatcher) {
net_query_dispatcher_ = std::move(net_query_dispatcher);
}

View File

@ -102,9 +102,10 @@ class Global : public ActorContext {
bool ignore_backgrond_updates() const;
NetQueryCreator &net_query_creator() {
return net_query_creator_.get();
return *net_query_creator_.get();
}
void set_net_query_stats(std::shared_ptr<NetQueryStats> net_query_stats);
void set_net_query_dispatcher(unique_ptr<NetQueryDispatcher> net_query_dispatcher);
NetQueryDispatcher &net_query_dispatcher() {
@ -422,7 +423,7 @@ class Global : public ActorContext {
ActorId<StateManager> state_manager_;
SchedulerLocalStorage<NetQueryCreator> net_query_creator_;
LazySchedulerLocalStorage<td::unique_ptr<NetQueryCreator>> net_query_creator_;
unique_ptr<NetQueryDispatcher> net_query_dispatcher_;
unique_ptr<ConfigShared> shared_config_;

View File

@ -3005,7 +3005,8 @@ class SetBackgroundRequest : public RequestActor<> {
}
};
Td::Td(unique_ptr<TdCallback> callback) : callback_(std::move(callback)) {
Td::Td(unique_ptr<TdCallback> callback, Options options)
: callback_(std::move(callback)), td_options_(std::move(options)) {
}
Td::~Td() = default;
@ -3740,6 +3741,7 @@ void Td::start_up() {
VLOG(td_init) << "Create Global";
set_context(std::make_shared<Global>());
G()->set_net_query_stats(td_options_.net_query_stats);
inc_request_actor_refcnt(); // guard
inc_actor_refcnt(); // guard

View File

@ -14,6 +14,7 @@
#include "td/telegram/TdParameters.h"
#include "td/telegram/TermsOfService.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/td_api.h"
#include "td/telegram/telegram_api.h"
@ -93,7 +94,11 @@ class Td final : public NetQueryCallback {
Td &operator=(Td &&) = delete;
~Td() override;
explicit Td(unique_ptr<TdCallback> callback);
struct Options {
std::shared_ptr<td::NetQueryStats> net_query_stats;
};
Td(unique_ptr<TdCallback> callback, Options options);
void request(uint64 id, tl_object_ptr<td_api::Function> function);
@ -260,6 +265,7 @@ class Td final : public NetQueryCallback {
TdParameters parameters_;
unique_ptr<TdCallback> callback_;
Options td_options_;
StateManager::State connection_state_;

View File

@ -846,7 +846,10 @@ class CliClient final : public Actor {
uint64 generation_;
};
td_client_ = create_actor<ClientActor>(name, make_unique<TdCallbackImpl>(this, ++generation_));
ClientActor::Options options;
options.net_query_stats = net_query_stats_;
td_client_ = create_actor<ClientActor>(name, make_unique<TdCallbackImpl>(this, ++generation_), std::move(options));
}
void init_td() {
@ -4164,7 +4167,7 @@ class CliClient final : public Actor {
} else if (op == "q" || op == "Quit") {
quit();
} else if (op == "dnq" || op == "DumpNetQueries") {
dump_pending_network_queries();
dump_pending_network_queries(*net_query_stats_);
} else if (op == "fatal") {
LOG(FATAL) << "Fatal!";
} else if (op == "unreachable") {
@ -4289,6 +4292,7 @@ class CliClient final : public Actor {
ConcurrentScheduler *scheduler_{nullptr};
bool use_test_dc_ = false;
std::shared_ptr<NetQueryStats> net_query_stats_ = create_net_query_stats();
ActorOwn<ClientActor> td_client_;
std::queue<string> cmd_queue_;
bool close_flag_ = false;

View File

@ -62,43 +62,4 @@ void NetQuery::set_error(Status status, string source) {
set_error_impl(std::move(status), std::move(source));
}
TsList<NetQueryDebug> &NetQuery::get_net_query_list() {
static auto init_mutex = [] {
TsList<NetQueryDebug>::lock().unlock(); // initialize mutex before any NetQuery
return true;
}();
CHECK(init_mutex);
static TsList<NetQueryDebug> net_query_list;
return net_query_list;
}
void dump_pending_network_queries() {
auto n = NetQueryCounter::get_count();
LOG(WARNING) << tag("pending net queries", n);
decltype(n) i = 0;
bool was_gap = false;
auto &net_query_list = NetQuery::get_net_query_list();
auto guard = net_query_list.lock();
for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) {
if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
const NetQueryDebug &debug = cur->get_data_unsafe();
const NetQuery &nq = *static_cast<const NetQuery *>(cur);
LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_))
<< tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_))
<< tag("state", debug.state_)
<< tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_))
<< tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_)
<< tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_)
<< tag("unknown", debug.unknown_state_);
} else {
was_gap = true;
}
}
}
} // namespace td

View File

@ -8,6 +8,7 @@
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryCounter.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
@ -40,18 +41,6 @@ class NetQueryCallback : public Actor {
virtual void on_result_resendable(NetQueryPtr query, Promise<NetQueryPtr> promise);
};
struct NetQueryDebug {
double start_timestamp_ = 0;
int32 my_id_ = 0;
int32 resend_count_ = 0;
string state_ = "empty";
double state_timestamp_ = 0;
int32 state_change_count_ = 0;
int32 send_failed_count_ = 0;
int ack_state_ = 0;
bool unknown_state_ = false;
};
class NetQuery : public TsListNode<NetQueryDebug> {
public:
NetQuery() = default;
@ -279,8 +268,6 @@ class NetQuery : public TsListNode<NetQueryDebug> {
static int32 tl_magic(const BufferSlice &buffer_slice);
static TsList<NetQueryDebug> &get_net_query_list();
private:
State state_ = State::Empty;
Type type_ = Type::Common;
@ -345,13 +332,12 @@ class NetQuery : public TsListNode<NetQueryDebug> {
int32 file_type_ = -1; // to be set by caller
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit)
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit, NetQueryStats *stats)
: state_(state)
, type_(type)
, auth_flag_(auth_flag)
, gzip_flag_(gzip_flag)
, dc_id_(dc_id)
, nq_counter_(true)
, status_()
, id_(id)
, query_(std::move(query))
@ -361,7 +347,9 @@ class NetQuery : public TsListNode<NetQueryDebug> {
get_data_unsafe().my_id_ = get_my_id();
get_data_unsafe().start_timestamp_ = Time::now();
LOG(INFO) << *this;
get_net_query_list().put(this);
if (stats) {
nq_counter_ = stats->register_query(this);
}
}
};

View File

@ -68,7 +68,7 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun
}
}
auto query = object_pool_.create(NetQuery::State::Query, id, std::move(slice), BufferSlice(), dc_id, type, auth_flag,
gzip_flag, tl_constructor, total_timeout_limit);
gzip_flag, tl_constructor, total_timeout_limit, net_query_stats_.get());
query->set_cancellation_token(query.generation());
return query;
}

View File

@ -21,7 +21,8 @@ class Function;
class NetQueryCreator {
public:
NetQueryCreator() {
explicit NetQueryCreator(std::shared_ptr<NetQueryStats> net_query_stats = {}) {
net_query_stats_ = std::move(net_query_stats);
object_pool_.set_check_empty(true);
}
@ -31,7 +32,8 @@ class NetQueryCreator {
NetQueryPtr create_update(BufferSlice &&buffer) {
return object_pool_.create(NetQuery::State::OK, 0, BufferSlice(), std::move(buffer), DcId::main(),
NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0);
NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0,
net_query_stats_.get());
}
NetQueryPtr create(const telegram_api::Function &function, DcId dc_id = DcId::main(),
@ -45,6 +47,7 @@ class NetQueryCreator {
NetQuery::AuthFlag auth_flag);
private:
std::shared_ptr<NetQueryStats> net_query_stats_;
ObjectPool<NetQuery> object_pool_;
};

View File

@ -0,0 +1,50 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/net/NetQuery.h"
#include "td/utils/logging.h"
#include "td/utils/format.h"
namespace td {
uint64 NetQueryStats::get_count() const {
return count_;
}
void NetQueryStats::dump_pending_network_queries() {
auto n = get_count();
LOG(WARNING) << tag("pending net queries", n);
if (!use_list_) {
return;
}
decltype(n) i = 0;
bool was_gap = false;
auto &net_query_list = list_;
auto guard = net_query_list.lock();
for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) {
if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
const NetQueryDebug &debug = cur->get_data_unsafe();
const NetQuery &nq = *static_cast<const NetQuery *>(cur);
LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_))
<< tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_))
<< tag("state", debug.state_)
<< tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_))
<< tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_)
<< tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_)
<< tag("unknown", debug.unknown_state_);
} else {
was_gap = true;
}
}
}
} // namespace td

View File

@ -0,0 +1,50 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/net/NetQueryCounter.h"
#include "td/utils/int_types.h"
#include "td/utils/TsList.h"
#include <atomic>
namespace td {
struct NetQueryDebug {
double start_timestamp_ = 0;
int32 my_id_ = 0;
int32 resend_count_ = 0;
string state_ = "empty";
double state_timestamp_ = 0;
int32 state_change_count_ = 0;
int32 send_failed_count_ = 0;
int ack_state_ = 0;
bool unknown_state_ = false;
};
class NetQueryStats {
public:
[[deprecated]] static NetQueryStats &get_default_stats() {
static NetQueryStats res;
return res;
}
NetQueryCounter register_query(TsListNode<NetQueryDebug> *query) {
if (use_list_.load(std::memory_order_relaxed)) {
list_.put(query);
}
return NetQueryCounter(&count_);
}
uint64 get_count() const;
void dump_pending_network_queries();
private:
NetQueryCounter::Counter count_;
std::atomic<bool> use_list_{true};
TsList<NetQueryDebug> list_;
};
} // namespace td

View File

@ -46,6 +46,10 @@ class LazySchedulerLocalStorage {
LazySchedulerLocalStorage() = default;
explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) {
}
void set_create_func(std::function<T()> create_func) {
CHECK(!create_func_);
create_func_ = create_func;
}
T &get() {
auto &optional_value_ = sls_optional_value_.get();

View File

@ -202,6 +202,9 @@ class TsList : public TsListNode<DataT> {
template <class DataT>
std::unique_lock<std::mutex> TsListNode<DataT>::lock() {
if (parent == nullptr) {
return {};
}
CHECK(parent != nullptr);
return parent->lock();
}