NetQuery: thread-safe dump_pending_network_queries
GitOrigin-RevId: 43650ab11c3ca881ae7ee1f768353ce90f094da7
This commit is contained in:
parent
2a51b98453
commit
99fa220ae5
|
@ -15,7 +15,7 @@
|
|||
|
||||
namespace td {
|
||||
|
||||
ListNode net_query_list_;
|
||||
TsList<NetQueryDebug> net_query_list_;
|
||||
|
||||
int32 NetQuery::get_my_id() {
|
||||
return G()->get_my_id();
|
||||
|
@ -70,14 +70,16 @@ void dump_pending_network_queries() {
|
|||
|
||||
decltype(n) i = 0;
|
||||
bool was_gap = false;
|
||||
for (auto end = &net_query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
|
||||
auto guard = net_query_list_.lock();
|
||||
for (auto end = net_query_list_.end(), cur = net_query_list_.begin(); cur != end; cur = cur->get_next(), i++) {
|
||||
if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) {
|
||||
if (was_gap) {
|
||||
LOG(WARNING) << "...";
|
||||
was_gap = false;
|
||||
}
|
||||
auto nq = &static_cast<NetQuery &>(*cur);
|
||||
LOG(WARNING) << tag("id", nq->my_id_) << *nq << tag("total_flood", format::as_time(nq->total_timeout)) << " "
|
||||
auto nq = &cur->get_data_unsafe();
|
||||
LOG(WARNING) << tag("id", nq->my_id_) << *static_cast<NetQuery *>(cur)
|
||||
<< /*tag("total_flood", format::as_time(nq->total_timeout_)) <<*/ " "
|
||||
<< tag("since start", format::as_time(Time::now_cached() - nq->start_timestamp_))
|
||||
<< tag("state", nq->debug_str_)
|
||||
<< tag("since state", format::as_time(Time::now_cached() - nq->debug_timestamp_))
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#include "td/utils/buffer.h"
|
||||
#include "td/utils/common.h"
|
||||
#include "td/utils/format.h"
|
||||
#include "td/utils/List.h"
|
||||
#include "td/utils/logging.h"
|
||||
#include "td/utils/ObjectPool.h"
|
||||
#include "td/utils/Slice.h"
|
||||
|
@ -24,6 +23,7 @@
|
|||
#include "td/utils/StringBuilder.h"
|
||||
#include "td/utils/Time.h"
|
||||
#include "td/utils/tl_parsers.h"
|
||||
#include "td/utils/TsList.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
@ -40,9 +40,21 @@ class NetQueryCallback : public Actor {
|
|||
virtual void on_result_resendable(NetQueryPtr query, Promise<NetQueryPtr> promise);
|
||||
};
|
||||
|
||||
extern ListNode net_query_list_;
|
||||
// Debug-only bookkeeping for a NetQuery, stored as the payload of its
// TsListNode<NetQueryDebug> so that dump_pending_network_queries() can read it
// through get_data_unsafe() while holding the net_query_list_ lock.
// In the visible hunks, writers take lock() before touching these fields via
// get_data_unsafe(); the NetQuery constructor writes my_id_ and
// start_timestamp_ before the node is put into net_query_list_.
struct NetQueryDebug {
|
||||
double start_timestamp_ = 0;  // set to Time::now() in the NetQuery constructor; dumped as "since start"
|
||||
int32 my_id_ = 0;  // set from get_my_id() in the NetQuery constructor; dumped as "id"
|
||||
int32 debug_resend_cnt_ = 0;  // incremented by NetQuery::resend()
|
||||
string debug_str_ = "empty";  // last string passed to NetQuery::debug(); dumped as "state"
|
||||
double debug_timestamp_ = 0;  // Time::now() at the last NetQuery::debug() call; dumped as "since state"
|
||||
int32 debug_cnt_ = 0;  // incremented on every NetQuery::debug() call
|
||||
int32 debug_send_failed_cnt_ = 0;  // incremented by NetQuery::debug_send_failed()
|
||||
int debug_ack = 0;  // OR-accumulated with `type` in Session::on_message_ack_impl_inner(); reset to 0 in Session::connection_send_query()
|
||||
bool debug_unknown = false;  // set by Session::mark_as_unknown(), cleared by Session::mark_as_known() and Session::connection_send_query()
|
||||
};
|
||||
|
||||
class NetQuery : public ListNode {
|
||||
extern TsList<NetQueryDebug> net_query_list_;
|
||||
|
||||
class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
public:
|
||||
NetQuery() = default;
|
||||
|
||||
|
@ -78,7 +90,10 @@ class NetQuery : public ListNode {
|
|||
|
||||
void resend(DcId new_dc_id) {
|
||||
VLOG(net_query) << "Resend" << *this;
|
||||
debug_resend_cnt_++;
|
||||
{
|
||||
auto guard = lock();
|
||||
get_data_unsafe().debug_resend_cnt_++;
|
||||
}
|
||||
dc_id_ = new_dc_id;
|
||||
status_ = Status::OK();
|
||||
state_ = State::Query;
|
||||
|
@ -213,7 +228,10 @@ class NetQuery : public ListNode {
|
|||
}
|
||||
|
||||
void clear() {
|
||||
LOG_IF(ERROR, !is_ready()) << "Destroy not ready query " << *this << " " << tag("debug", debug_str_);
|
||||
if (!is_ready()) {
|
||||
auto guard = lock();
|
||||
LOG(ERROR) << "Destroy not ready query " << *this << " " << tag("debug", get_data_unsafe().debug_str_);
|
||||
}
|
||||
// TODO: CHECK if net_query is lost here
|
||||
cancel_slot_.close();
|
||||
*this = NetQuery();
|
||||
|
@ -228,15 +246,20 @@ class NetQuery : public ListNode {
|
|||
}
|
||||
|
||||
void debug_send_failed() {
|
||||
debug_send_failed_cnt_++;
|
||||
auto guard = lock();
|
||||
get_data_unsafe().debug_send_failed_cnt_++;
|
||||
}
|
||||
|
||||
void debug(string str, bool may_be_lost = false) {
|
||||
may_be_lost_ = may_be_lost;
|
||||
debug_str_ = std::move(str);
|
||||
debug_timestamp_ = Time::now();
|
||||
debug_cnt_++;
|
||||
VLOG(net_query) << *this << " " << tag("debug", debug_str_);
|
||||
{
|
||||
auto guard = lock();
|
||||
auto &data = get_data_unsafe();
|
||||
data.debug_str_ = str;
|
||||
data.debug_timestamp_ = Time::now();
|
||||
data.debug_cnt_++;
|
||||
}
|
||||
VLOG(net_query) << *this << " " << tag("debug", str);
|
||||
}
|
||||
|
||||
void set_callback(ActorShared<NetQueryCallback> callback) {
|
||||
|
@ -312,21 +335,12 @@ class NetQuery : public ListNode {
|
|||
double last_timeout = 0;
|
||||
bool need_resend_on_503 = true;
|
||||
bool may_be_lost_ = false;
|
||||
string debug_str_ = "empty";
|
||||
string source_;
|
||||
double debug_timestamp_ = 0;
|
||||
int32 debug_cnt_ = 0;
|
||||
int32 debug_send_failed_cnt_ = 0;
|
||||
int32 debug_resend_cnt_ = 0;
|
||||
int debug_ack = 0;
|
||||
bool debug_unknown = false;
|
||||
int32 dispatch_ttl = -1;
|
||||
Slot cancel_slot_;
|
||||
Promise<> quick_ack_promise_;
|
||||
int32 file_type_ = -1;
|
||||
|
||||
double start_timestamp_ = 0;
|
||||
int32 my_id_ = 0;
|
||||
NetQueryCounter nq_counter_;
|
||||
|
||||
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
|
||||
|
@ -343,10 +357,10 @@ class NetQuery : public ListNode {
|
|||
, tl_constructor_(tl_constructor)
|
||||
, total_timeout_limit(total_timeout_limit)
|
||||
, nq_counter_(true) {
|
||||
my_id_ = get_my_id();
|
||||
start_timestamp_ = Time::now();
|
||||
get_data_unsafe().my_id_ = get_my_id();
|
||||
get_data_unsafe().start_timestamp_ = Time::now();
|
||||
LOG(INFO) << *this;
|
||||
// net_query_list_.put(this);
|
||||
net_query_list_.put(this);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -617,7 +617,10 @@ void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container
|
|||
}
|
||||
VLOG(net_query) << "Ack " << tag("msg_id", id) << it->second.query;
|
||||
it->second.ack = true;
|
||||
it->second.query->debug_ack |= type;
|
||||
{
|
||||
auto lock = it->second.query->lock();
|
||||
it->second.query->get_data_unsafe().debug_ack |= type;
|
||||
}
|
||||
it->second.query->quick_ack_promise_.set_value(Unit());
|
||||
if (!in_container) {
|
||||
cleanup_container(id, &it->second);
|
||||
|
@ -652,7 +655,10 @@ void Session::cleanup_container(uint64 message_id, Query *query) {
|
|||
}
|
||||
|
||||
void Session::mark_as_known(uint64 id, Query *query) {
|
||||
query->query->debug_unknown = false;
|
||||
{
|
||||
auto lock = query->query->lock();
|
||||
query->query->get_data_unsafe().debug_unknown = false;
|
||||
}
|
||||
if (!query->unknown) {
|
||||
return;
|
||||
}
|
||||
|
@ -665,7 +671,10 @@ void Session::mark_as_known(uint64 id, Query *query) {
|
|||
}
|
||||
|
||||
void Session::mark_as_unknown(uint64 id, Query *query) {
|
||||
query->query->debug_unknown = true;
|
||||
{
|
||||
auto lock = query->query->lock();
|
||||
query->query->get_data_unsafe().debug_unknown = true;
|
||||
}
|
||||
if (query->unknown) {
|
||||
return;
|
||||
}
|
||||
|
@ -930,8 +939,11 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
|
|||
net_query->set_message_id(message_id);
|
||||
net_query->cancel_slot_.clear_event();
|
||||
LOG_CHECK(sent_queries_.find(message_id) == sent_queries_.end()) << message_id;
|
||||
net_query->debug_unknown = false;
|
||||
net_query->debug_ack = 0;
|
||||
{
|
||||
auto lock = net_query->lock();
|
||||
net_query->get_data_unsafe().debug_unknown = false;
|
||||
net_query->get_data_unsafe().debug_ack = 0;
|
||||
}
|
||||
if (!net_query->cancel_slot_.empty()) {
|
||||
LOG(DEBUG) << "Set event for net_query cancellation " << tag("message_id", format::as_hex(message_id));
|
||||
net_query->cancel_slot_.set_event(EventCreator::raw(actor_id(), message_id));
|
||||
|
|
Loading…
Reference in New Issue
Block a user