Move some NetQuery function implementations to cpp.

This commit is contained in:
levlam 2024-05-15 15:05:36 +03:00
parent f95132c5bd
commit 29cd56cab0
2 changed files with 78 additions and 63 deletions

View File

@ -59,6 +59,45 @@ NetQuery::NetQuery(uint64 id, BufferSlice &&query, DcId dc_id, Type type, AuthFl
} }
} }
void NetQuery::clear() {
if (!is_ready()) {
auto guard = lock();
LOG(ERROR) << "Destroy not ready query " << *this << " " << tag("state", get_data_unsafe().state_);
}
// TODO: CHECK if net_query is lost here
cancel_slot_.close();
*this = NetQuery();
}
void NetQuery::resend(DcId new_dc_id) {
VLOG(net_query) << "Resend " << *this;
{
auto guard = lock();
get_data_unsafe().resend_count_++;
}
dc_id_ = new_dc_id;
status_ = Status::OK();
state_ = State::Query;
}
bool NetQuery::update_is_ready() {
if (state_ == State::Query) {
if (cancellation_token_.load(std::memory_order_relaxed) == 0 || cancel_slot_.was_signal()) {
set_error_canceled();
return true;
}
return false;
}
return true;
}
void NetQuery::set_ok(BufferSlice slice) {
VLOG(net_query) << "Receive answer " << *this;
CHECK(state_ == State::Query);
answer_ = std::move(slice);
state_ = State::OK;
}
void NetQuery::on_net_write(size_t size) { void NetQuery::on_net_write(size_t size) {
const auto &callbacks = G()->get_net_stats_file_callbacks(); const auto &callbacks = G()->get_net_stats_file_callbacks();
if (static_cast<size_t>(file_type_) < callbacks.size()) { if (static_cast<size_t>(file_type_) < callbacks.size()) {
@ -101,4 +140,35 @@ void NetQuery::set_error(Status status, string source) {
set_error_impl(std::move(status), std::move(source)); set_error_impl(std::move(status), std::move(source));
} }
void NetQuery::set_error_impl(Status status, string source) {
VLOG(net_query) << "Receive error " << *this << " " << status;
status_ = std::move(status);
state_ = State::Error;
source_ = std::move(source);
}
StringBuilder &operator<<(StringBuilder &stream, const NetQuery &net_query) {
stream << "[Query:";
stream << tag("id", net_query.id());
stream << tag("tl", format::as_hex(net_query.tl_constructor()));
auto message_id = net_query.message_id();
if (message_id != 0) {
stream << tag("msg_id", format::as_hex(message_id));
}
if (net_query.is_error()) {
stream << net_query.error();
} else if (net_query.is_ok()) {
stream << tag("result_tl", format::as_hex(net_query.ok_tl_constructor()));
}
stream << ']';
return stream;
}
StringBuilder &operator<<(StringBuilder &stream, const NetQueryPtr &net_query_ptr) {
if (net_query_ptr.empty()) {
return stream << "[Query: null]";
}
return stream << *net_query_ptr;
}
} // namespace td } // namespace td

View File

@ -80,16 +80,7 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
return tl_constructor_; return tl_constructor_;
} }
void resend(DcId new_dc_id) { void resend(DcId new_dc_id);
VLOG(net_query) << "Resend " << *this;
{
auto guard = lock();
get_data_unsafe().resend_count_++;
}
dc_id_ = new_dc_id;
status_ = Status::OK();
state_ = State::Query;
}
void resend() { void resend() {
resend(dc_id_); resend(dc_id_);
@ -121,12 +112,7 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
return status; return status;
} }
void set_ok(BufferSlice slice) { void set_ok(BufferSlice slice);
VLOG(net_query) << "Receive answer " << *this;
CHECK(state_ == State::Query);
answer_ = std::move(slice);
state_ = State::OK;
}
void on_net_write(size_t size); void on_net_write(size_t size);
void on_net_read(size_t size); void on_net_read(size_t size);
@ -145,16 +131,7 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
set_error_impl(Status::Error<Error::ResendInvokeAfter>()); set_error_impl(Status::Error<Error::ResendInvokeAfter>());
} }
bool update_is_ready() { bool update_is_ready();
if (state_ == State::Query) {
if (cancellation_token_.load(std::memory_order_relaxed) == 0 || cancel_slot_.was_signal()) {
set_error_canceled();
return true;
}
return false;
}
return true;
}
bool is_ready() const { bool is_ready() const {
return state_ != State::Query; return state_ != State::Query;
@ -208,15 +185,8 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
cancellation_token_.store(cancellation_token, std::memory_order_relaxed); cancellation_token_.store(cancellation_token, std::memory_order_relaxed);
} }
void clear() { void clear();
if (!is_ready()) {
auto guard = lock();
LOG(ERROR) << "Destroy not ready query " << *this << " " << tag("state", get_data_unsafe().state_);
}
// TODO: CHECK if net_query is lost here
cancel_slot_.close();
*this = NetQuery();
}
bool empty() const { bool empty() const {
return state_ == State::Empty || !nq_counter_ || may_be_lost_; return state_ == State::Empty || !nq_counter_ || may_be_lost_;
} }
@ -312,12 +282,7 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
movable_atomic<int32> cancellation_token_{-1}; // == 0 if query is canceled movable_atomic<int32> cancellation_token_{-1}; // == 0 if query is canceled
ActorShared<NetQueryCallback> callback_; ActorShared<NetQueryCallback> callback_;
void set_error_impl(Status status, string source = string()) { void set_error_impl(Status status, string source = string());
VLOG(net_query) << "Receive error " << *this << " " << status;
status_ = std::move(status);
state_ = State::Error;
source_ = std::move(source);
}
static int32 tl_magic(const BufferSlice &buffer_slice); static int32 tl_magic(const BufferSlice &buffer_slice);
@ -337,29 +302,9 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
int32 tl_constructor, int32 total_timeout_limit, NetQueryStats *stats, vector<ChainId> chain_ids); int32 tl_constructor, int32 total_timeout_limit, NetQueryStats *stats, vector<ChainId> chain_ids);
}; };
inline StringBuilder &operator<<(StringBuilder &stream, const NetQuery &net_query) { StringBuilder &operator<<(StringBuilder &stream, const NetQuery &net_query);
stream << "[Query:";
stream << tag("id", net_query.id());
stream << tag("tl", format::as_hex(net_query.tl_constructor()));
auto message_id = net_query.message_id();
if (message_id != 0) {
stream << tag("msg_id", format::as_hex(message_id));
}
if (net_query.is_error()) {
stream << net_query.error();
} else if (net_query.is_ok()) {
stream << tag("result_tl", format::as_hex(net_query.ok_tl_constructor()));
}
stream << ']';
return stream;
}
inline StringBuilder &operator<<(StringBuilder &stream, const NetQueryPtr &net_query_ptr) { StringBuilder &operator<<(StringBuilder &stream, const NetQueryPtr &net_query_ptr);
if (net_query_ptr.empty()) {
return stream << "[Query: null]";
}
return stream << *net_query_ptr;
}
inline void cancel_query(NetQueryRef &ref) { inline void cancel_query(NetQueryRef &ref) {
if (ref.empty()) { if (ref.empty()) {