From 99fa220ae505b377f7b73128f202cf91754a4afe Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Fri, 16 Aug 2019 14:57:41 +0300 Subject: [PATCH] NetQuery: thread safe dump_pending_network_queries GitOrigin-RevId: 43650ab11c3ca881ae7ee1f768353ce90f094da7 --- td/telegram/net/NetQuery.cpp | 10 ++++--- td/telegram/net/NetQuery.h | 58 ++++++++++++++++++++++-------------- td/telegram/net/Session.cpp | 22 ++++++++++---- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/td/telegram/net/NetQuery.cpp b/td/telegram/net/NetQuery.cpp index 90e595367..2e817e542 100644 --- a/td/telegram/net/NetQuery.cpp +++ b/td/telegram/net/NetQuery.cpp @@ -15,7 +15,7 @@ namespace td { -ListNode net_query_list_; +TsList net_query_list_; int32 NetQuery::get_my_id() { return G()->get_my_id(); @@ -70,14 +70,16 @@ void dump_pending_network_queries() { decltype(n) i = 0; bool was_gap = false; - for (auto end = &net_query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) { + auto guard = net_query_list_.lock(); + for (auto end = net_query_list_.end(), cur = net_query_list_.begin(); cur != end; cur = cur->get_next(), i++) { if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) { if (was_gap) { LOG(WARNING) << "..."; was_gap = false; } - auto nq = &static_cast(*cur); - LOG(WARNING) << tag("id", nq->my_id_) << *nq << tag("total_flood", format::as_time(nq->total_timeout)) << " " + auto nq = &cur->get_data_unsafe(); + LOG(WARNING) << tag("id", nq->my_id_) << *static_cast(cur) + << /*tag("total_flood", format::as_time(nq->total_timeout_)) <<*/ " " << tag("since start", format::as_time(Time::now_cached() - nq->start_timestamp_)) << tag("state", nq->debug_str_) << tag("since state", format::as_time(Time::now_cached() - nq->debug_timestamp_)) diff --git a/td/telegram/net/NetQuery.h b/td/telegram/net/NetQuery.h index 3ed0763e7..091d4b4ce 100644 --- a/td/telegram/net/NetQuery.h +++ b/td/telegram/net/NetQuery.h @@ -16,7 +16,6 @@ #include "td/utils/buffer.h" #include "td/utils/common.h" #include "td/utils/format.h" -#include "td/utils/List.h" #include "td/utils/logging.h" #include "td/utils/ObjectPool.h" #include "td/utils/Slice.h" @@ -24,6 +23,7 @@ #include "td/utils/StringBuilder.h" #include "td/utils/Time.h" #include "td/utils/tl_parsers.h" +#include "td/utils/TsList.h" #include #include @@ -40,9 +40,21 @@ class NetQueryCallback : public Actor { virtual void on_result_resendable(NetQueryPtr query, Promise promise); }; -extern ListNode net_query_list_; +struct NetQueryDebug { + double start_timestamp_ = 0; + int32 my_id_ = 0; + int32 debug_resend_cnt_ = 0; + string debug_str_ = "empty"; + double debug_timestamp_ = 0; + int32 debug_cnt_ = 0; + int32 debug_send_failed_cnt_ = 0; + int debug_ack = 0; + bool debug_unknown = false; +}; -class NetQuery : public ListNode { +extern TsList net_query_list_; + +class NetQuery : public TsListNode { public: NetQuery() = default; @@ -78,7 +90,10 @@ class NetQuery : public ListNode { void resend(DcId new_dc_id) { VLOG(net_query) << "Resend" << *this; - debug_resend_cnt_++; + { + auto guard = lock(); + get_data_unsafe().debug_resend_cnt_++; + } dc_id_ = new_dc_id; status_ = Status::OK(); state_ = State::Query; @@ -213,7 +228,10 @@ class NetQuery : public ListNode { } void clear() { - LOG_IF(ERROR, !is_ready()) << "Destroy not ready query " << *this << " " << tag("debug", debug_str_); + if (!is_ready()) { + auto guard = lock(); + LOG(ERROR) << "Destroy not ready query " << *this << " " << tag("debug", get_data_unsafe().debug_str_); + } // TODO: CHECK if net_query is lost here cancel_slot_.close(); *this = NetQuery(); @@ -228,15 +246,20 @@ class NetQuery : public ListNode { } void debug_send_failed() { - debug_send_failed_cnt_++; + auto guard = lock(); + get_data_unsafe().debug_send_failed_cnt_++; } void debug(string str, bool may_be_lost = false) { may_be_lost_ = may_be_lost; - debug_str_ = std::move(str); - debug_timestamp_ = Time::now(); - debug_cnt_++; - VLOG(net_query) << *this << " " << tag("debug", debug_str_); + { + auto guard = lock(); + auto &data = get_data_unsafe(); + data.debug_str_ = str; + data.debug_timestamp_ = Time::now(); + data.debug_cnt_++; + } + VLOG(net_query) << *this << " " << tag("debug", str); } void set_callback(ActorShared callback) { @@ -312,21 +335,12 @@ class NetQuery : public ListNode { double last_timeout = 0; bool need_resend_on_503 = true; bool may_be_lost_ = false; - string debug_str_ = "empty"; string source_; - double debug_timestamp_ = 0; - int32 debug_cnt_ = 0; - int32 debug_send_failed_cnt_ = 0; - int32 debug_resend_cnt_ = 0; - int debug_ack = 0; - bool debug_unknown = false; int32 dispatch_ttl = -1; Slot cancel_slot_; Promise<> quick_ack_promise_; int32 file_type_ = -1; - double start_timestamp_ = 0; - int32 my_id_ = 0; NetQueryCounter nq_counter_; NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag, @@ -343,10 +357,10 @@ class NetQuery : public ListNode { , tl_constructor_(tl_constructor) , total_timeout_limit(total_timeout_limit) , nq_counter_(true) { - my_id_ = get_my_id(); - start_timestamp_ = Time::now(); + get_data_unsafe().my_id_ = get_my_id(); + get_data_unsafe().start_timestamp_ = Time::now(); LOG(INFO) << *this; - // net_query_list_.put(this); + net_query_list_.put(this); } }; diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index c3b851b25..7c5663b48 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -617,7 +617,10 @@ void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container } VLOG(net_query) << "Ack " << tag("msg_id", id) << it->second.query; it->second.ack = true; - it->second.query->debug_ack |= type; + { + auto lock = it->second.query->lock(); + it->second.query->get_data_unsafe().debug_ack |= type; + } it->second.query->quick_ack_promise_.set_value(Unit()); if (!in_container) { cleanup_container(id, &it->second); @@ -652,7 +655,10 @@ void Session::cleanup_container(uint64 message_id, Query *query) { } void Session::mark_as_known(uint64 id, Query *query) { - query->query->debug_unknown = false; + { + auto lock = query->query->lock(); + query->query->get_data_unsafe().debug_unknown = false; + } if (!query->unknown) { return; } @@ -665,7 +671,10 @@ void Session::mark_as_known(uint64 id, Query *query) { } void Session::mark_as_unknown(uint64 id, Query *query) { - query->query->debug_unknown = true; + { + auto lock = query->query->lock(); + query->query->get_data_unsafe().debug_unknown = true; + } if (query->unknown) { return; } @@ -930,8 +939,11 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer net_query->set_message_id(message_id); net_query->cancel_slot_.clear_event(); LOG_CHECK(sent_queries_.find(message_id) == sent_queries_.end()) << message_id; - net_query->debug_unknown = false; - net_query->debug_ack = 0; + { + auto lock = net_query->lock(); + net_query->get_data_unsafe().debug_unknown = false; + net_query->get_data_unsafe().debug_ack = 0; + } if (!net_query->cancel_slot_.empty()) { LOG(DEBUG) << "Set event for net_query cancellation " << tag("message_id", format::as_hex(message_id)); net_query->cancel_slot_.set_event(EventCreator::raw(actor_id(), message_id));