Improve NetQuery public field names.
GitOrigin-RevId: cfa0dbd98c1b03e4e86d62fc6486fcf90e72ef25
This commit is contained in:
parent
1e6bdbb7a5
commit
c921bcc5ed
@ -582,7 +582,7 @@ void CallActor::try_send_request_query() {
|
||||
double timeout = call_receive_timeout_ms * 0.001;
|
||||
LOG(INFO) << "Set call timeout to " << timeout;
|
||||
set_timeout_in(timeout);
|
||||
query->total_timeout_limit = timeout;
|
||||
query->total_timeout_limit_ = max(timeout, 10.0);
|
||||
request_query_ref_ = query.get_weak();
|
||||
send_with_promise(std::move(query), PromiseCreator::lambda([actor_id = actor_id(this)](NetQueryPtr net_query) {
|
||||
send_closure(actor_id, &CallActor::on_request_query_result, std::move(net_query));
|
||||
|
@ -63,7 +63,7 @@ class GetBotCallbackAnswerQuery : public Td::ResultHandler {
|
||||
|
||||
auto net_query = G()->net_query_creator().create(telegram_api::messages_getBotCallbackAnswer(
|
||||
flags, false /*ignored*/, std::move(input_peer), message_id.get_server_message_id().get(), std::move(data)));
|
||||
net_query->need_resend_on_503 = false;
|
||||
net_query->need_resend_on_503_ = false;
|
||||
send_query(std::move(net_query));
|
||||
}
|
||||
|
||||
|
@ -506,9 +506,9 @@ ActorOwn<> get_full_config(DcOption option, Promise<FullConfig> promise, ActorSh
|
||||
false /*need_destroy_auth_key*/, mtproto::AuthKey(),
|
||||
std::vector<mtproto::ServerSalt>());
|
||||
auto query = G()->net_query_creator().create_unauth(telegram_api::help_getConfig(), DcId::empty());
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
query->set_callback(actor_shared(this));
|
||||
query->dispatch_ttl = 0;
|
||||
query->dispatch_ttl_ = 0;
|
||||
send_closure(session_, &Session::send, std::move(query));
|
||||
set_timeout_in(10);
|
||||
}
|
||||
@ -937,7 +937,7 @@ void ConfigManager::get_app_config(Promise<td_api::object_ptr<td_api::JsonValue>
|
||||
get_app_config_queries_.push_back(std::move(promise));
|
||||
if (get_app_config_queries_.size() == 1) {
|
||||
auto query = G()->net_query_creator().create_unauth(telegram_api::help_getAppConfig());
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, 1));
|
||||
}
|
||||
}
|
||||
@ -993,7 +993,7 @@ void ConfigManager::on_dc_options_update(DcOptions dc_options) {
|
||||
void ConfigManager::request_config_from_dc_impl(DcId dc_id) {
|
||||
config_sent_cnt_++;
|
||||
auto query = G()->net_query_creator().create_unauth(telegram_api::help_getConfig(), dc_id);
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, 0));
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,7 @@ class GetInlineBotResultsQuery : public Td::ResultHandler {
|
||||
flags, std::move(bot_input_user), std::move(input_peer),
|
||||
user_location.empty() ? nullptr : user_location.get_input_geo_point(), query, offset));
|
||||
auto result = net_query.get_weak();
|
||||
net_query->need_resend_on_503 = false;
|
||||
net_query->need_resend_on_503_ = false;
|
||||
send_query(std::move(net_query));
|
||||
return result;
|
||||
}
|
||||
|
@ -1476,7 +1476,7 @@ NetQueryPtr SecretChatActor::create_net_query(const logevent::OutboundSecretMess
|
||||
message.file.as_input_encrypted_file()));
|
||||
}
|
||||
if (!message.is_rewritable) {
|
||||
query->total_timeout_limit = 1000000000; // inf. We will re-sent it immediately anyway
|
||||
query->total_timeout_limit_ = 1000000000; // inf. We will re-sent it immediately anyway
|
||||
}
|
||||
if (message.is_external && context_->get_config_option_boolean("use_quick_ack")) {
|
||||
query->quick_ack_promise_ =
|
||||
|
@ -49,12 +49,12 @@ void SequenceDispatcher::check_timeout(Data &data) {
|
||||
if (data.state_ != State::Start) {
|
||||
return;
|
||||
}
|
||||
data.query_->total_timeout += data.total_timeout_;
|
||||
data.query_->total_timeout_ += data.total_timeout_;
|
||||
data.total_timeout_ = 0;
|
||||
if (data.query_->total_timeout > data.query_->total_timeout_limit) {
|
||||
if (data.query_->total_timeout_ > data.query_->total_timeout_limit_) {
|
||||
LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
|
||||
<< data.query_->total_timeout << " is greater than total_timeout_limit "
|
||||
<< data.query_->total_timeout_limit;
|
||||
<< data.query_->total_timeout_ << " is greater than total_timeout_limit "
|
||||
<< data.query_->total_timeout_limit_;
|
||||
data.query_->set_error(Status::Error(
|
||||
429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
|
||||
data.state_ = State::Dummy;
|
||||
@ -129,10 +129,10 @@ void SequenceDispatcher::on_result(NetQueryPtr query) {
|
||||
size_t pos = &data - &data_[0];
|
||||
CHECK(pos < data_.size());
|
||||
|
||||
if (query->last_timeout != 0) {
|
||||
if (query->last_timeout_ != 0) {
|
||||
for (auto i = pos + 1; i < data_.size(); i++) {
|
||||
data_[i].total_timeout_ += query->last_timeout;
|
||||
data_[i].last_timeout_ = query->last_timeout;
|
||||
data_[i].total_timeout_ += query->last_timeout_;
|
||||
data_[i].last_timeout_ = query->last_timeout_;
|
||||
check_timeout(data_[i]);
|
||||
}
|
||||
}
|
||||
@ -166,7 +166,7 @@ void SequenceDispatcher::loop() {
|
||||
invoke_after = data_[last_sent_i_].net_query_ref_;
|
||||
}
|
||||
data_[next_i_].query_->set_invoke_after(invoke_after);
|
||||
data_[next_i_].query_->last_timeout = 0;
|
||||
data_[next_i_].query_->last_timeout_ = 0;
|
||||
|
||||
VLOG(net_query) << "Send " << data_[next_i_].query_;
|
||||
|
||||
|
@ -173,7 +173,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) {
|
||||
auto id = UniqueId::next();
|
||||
auto query = G()->net_query_creator().create(id, telegram_api::auth_exportAuthorization(dc.dc_id.get_raw_id()),
|
||||
DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, dc.dc_id.get_raw_id()));
|
||||
dc.wait_id = id;
|
||||
dc.export_id = -1;
|
||||
@ -190,7 +190,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) {
|
||||
auto query = G()->net_query_creator().create(
|
||||
id, telegram_api::auth_importAuthorization(dc.export_id, std::move(dc.export_bytes)), dc.dc_id,
|
||||
NetQuery::Type::Common, NetQuery::AuthFlag::Off);
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, dc.dc_id.get_raw_id()));
|
||||
dc.wait_id = id;
|
||||
dc.state = DcInfo::State::BeforeOk;
|
||||
|
@ -83,7 +83,7 @@ void dump_pending_network_queries() {
|
||||
}
|
||||
const NetQueryDebug &debug = cur->get_data_unsafe();
|
||||
const NetQuery &nq = *static_cast<const NetQuery *>(cur);
|
||||
LOG(WARNING) << tag("user id", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout))
|
||||
LOG(WARNING) << tag("user id", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_))
|
||||
<< tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_))
|
||||
<< tag("state", debug.state_)
|
||||
<< tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_))
|
||||
|
@ -146,7 +146,7 @@ class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
void on_net_write(size_t size);
|
||||
void on_net_read(size_t size);
|
||||
|
||||
void set_error(Status status, string source = "");
|
||||
void set_error(Status status, string source = string());
|
||||
|
||||
void set_error_resend() {
|
||||
set_error_impl(Status::Error<Error::Resend>());
|
||||
@ -288,6 +288,7 @@ class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
GzipFlag gzip_flag_ = GzipFlag::Off;
|
||||
DcId dc_id_;
|
||||
|
||||
NetQueryCounter nq_counter_;
|
||||
Status status_;
|
||||
uint64 id_ = 0;
|
||||
BufferSlice query_;
|
||||
@ -296,6 +297,9 @@ class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
|
||||
NetQueryRef invoke_after_;
|
||||
uint32 session_rand_ = 0;
|
||||
|
||||
bool may_be_lost_ = false;
|
||||
|
||||
template <class T>
|
||||
struct movable_atomic : public std::atomic<T> {
|
||||
movable_atomic() = default;
|
||||
@ -313,35 +317,32 @@ class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
~movable_atomic() = default;
|
||||
};
|
||||
|
||||
static int32 get_my_id();
|
||||
|
||||
movable_atomic<uint64> session_id_{0};
|
||||
uint64 message_id_{0};
|
||||
|
||||
movable_atomic<int32> cancellation_token_{-1}; // == 0 if query is canceled
|
||||
ActorShared<NetQueryCallback> callback_;
|
||||
|
||||
void set_error_impl(Status status, string source = "") {
|
||||
void set_error_impl(Status status, string source = string()) {
|
||||
VLOG(net_query) << "Got error " << *this << " " << status;
|
||||
status_ = std::move(status);
|
||||
state_ = State::Error;
|
||||
source_ = std::move(source);
|
||||
}
|
||||
|
||||
public:
|
||||
double next_timeout = 1;
|
||||
double total_timeout = 0;
|
||||
double total_timeout_limit = 60;
|
||||
double last_timeout = 0;
|
||||
bool need_resend_on_503 = true;
|
||||
bool may_be_lost_ = false;
|
||||
string source_;
|
||||
int32 dispatch_ttl = -1;
|
||||
Slot cancel_slot_;
|
||||
Promise<> quick_ack_promise_;
|
||||
int32 file_type_ = -1;
|
||||
static int32 get_my_id();
|
||||
|
||||
NetQueryCounter nq_counter_;
|
||||
public:
|
||||
double next_timeout_ = 1; // for NetQueryDelayer
|
||||
double total_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
||||
double total_timeout_limit_ = 60; // for NetQueryDelayer/SequenceDispatcher and to be set by caller
|
||||
double last_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
||||
string source_; // for NetQueryDelayer/SequenceDispatcher
|
||||
bool need_resend_on_503_ = true; // for NetQueryDispatcher and to be set by caller
|
||||
int32 dispatch_ttl_ = -1; // for NetQueryDispatcher and to be set by caller
|
||||
Slot cancel_slot_; // for Session and to be set by caller
|
||||
Promise<> quick_ack_promise_; // for Session and to be set by caller
|
||||
int32 file_type_ = -1; // to be set by caller
|
||||
|
||||
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
|
||||
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit)
|
||||
@ -350,13 +351,13 @@ class NetQuery : public TsListNode<NetQueryDebug> {
|
||||
, auth_flag_(auth_flag)
|
||||
, gzip_flag_(gzip_flag)
|
||||
, dc_id_(dc_id)
|
||||
, nq_counter_(true)
|
||||
, status_()
|
||||
, id_(id)
|
||||
, query_(std::move(query))
|
||||
, answer_(std::move(answer))
|
||||
, tl_constructor_(tl_constructor)
|
||||
, total_timeout_limit(total_timeout_limit)
|
||||
, nq_counter_(true) {
|
||||
, total_timeout_limit_(total_timeout_limit) {
|
||||
get_data_unsafe().my_id_ = get_my_id();
|
||||
get_data_unsafe().start_timestamp_ = Time::now();
|
||||
LOG(INFO) << *this;
|
||||
|
@ -46,30 +46,30 @@ void NetQueryDelayer::delay(NetQueryPtr query) {
|
||||
}
|
||||
|
||||
if (timeout == 0) {
|
||||
timeout = query->next_timeout;
|
||||
timeout = query->next_timeout_;
|
||||
if (timeout < 60) {
|
||||
query->next_timeout *= 2;
|
||||
query->next_timeout_ *= 2;
|
||||
}
|
||||
} else {
|
||||
query->next_timeout = 1;
|
||||
query->next_timeout_ = 1;
|
||||
}
|
||||
query->total_timeout += timeout;
|
||||
query->last_timeout = timeout;
|
||||
query->total_timeout_ += timeout;
|
||||
query->last_timeout_ = timeout;
|
||||
|
||||
auto error = query->error().move_as_error();
|
||||
query->resend();
|
||||
|
||||
// Fix for infinity flood control
|
||||
if (!query->need_resend_on_503 && code == -503) {
|
||||
if (!query->need_resend_on_503_ && code == -503) {
|
||||
query->set_error(Status::Error(502, "Bad Gateway"));
|
||||
query->debug("DcManager: send to DcManager");
|
||||
G()->net_query_dispatcher().dispatch(std::move(query));
|
||||
return;
|
||||
}
|
||||
|
||||
if (query->total_timeout > query->total_timeout_limit) {
|
||||
if (query->total_timeout_ > query->total_timeout_limit_) {
|
||||
// TODO: support timeouts in DcAuth and GetConfig
|
||||
LOG(WARNING) << "Failed: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout)
|
||||
LOG(WARNING) << "Failed: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout_)
|
||||
<< " because of " << error << " from " << query->source_;
|
||||
// NB: code must differ from tdapi FLOOD_WAIT code
|
||||
query->set_error(
|
||||
@ -79,7 +79,7 @@ void NetQueryDelayer::delay(NetQueryPtr query) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(WARNING) << "Delay: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout)
|
||||
LOG(WARNING) << "Delay: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout_)
|
||||
<< " because of " << error << " from " << query->source_;
|
||||
query->debug(PSTRING() << "delay for " << format::as_time(timeout));
|
||||
auto id = container_.create(QuerySlot());
|
||||
|
@ -72,7 +72,7 @@ void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
|
||||
}
|
||||
|
||||
if (!net_query->is_ready()) {
|
||||
if (net_query->dispatch_ttl == 0) {
|
||||
if (net_query->dispatch_ttl_ == 0) {
|
||||
net_query->set_error(Status::Error("DispatchTtlError"));
|
||||
}
|
||||
}
|
||||
@ -89,8 +89,8 @@ void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
|
||||
return complete_net_query(std::move(net_query));
|
||||
}
|
||||
|
||||
if (net_query->dispatch_ttl > 0) {
|
||||
net_query->dispatch_ttl--;
|
||||
if (net_query->dispatch_ttl_ > 0) {
|
||||
net_query->dispatch_ttl_--;
|
||||
}
|
||||
|
||||
size_t dc_pos = static_cast<size_t>(dest_dc_id.get_raw_id() - 1);
|
||||
|
@ -69,7 +69,7 @@ void PublicRsaKeyWatchdog::loop() {
|
||||
flood_control_.add_event(static_cast<int32>(Time::now_cached()));
|
||||
has_query_ = true;
|
||||
auto query = G()->net_query_creator().create(telegram_api::help_getCdnConfig());
|
||||
query->total_timeout_limit = 60 * 60 * 24;
|
||||
query->total_timeout_limit_ = 60 * 60 * 24;
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this));
|
||||
}
|
||||
|
||||
|
@ -1118,7 +1118,7 @@ bool Session::connection_send_check_main_key(ConnectionInfo *info) {
|
||||
last_check_query_id_ = UniqueId::next(UniqueId::BindKey);
|
||||
NetQueryPtr query = G()->net_query_creator().create(last_check_query_id_, telegram_api::help_getNearestDc(),
|
||||
DcId::main(), NetQuery::Type::Common, NetQuery::AuthFlag::On);
|
||||
query->dispatch_ttl = 0;
|
||||
query->dispatch_ttl_ = 0;
|
||||
query->set_callback(actor_shared(this));
|
||||
connection_send_query(info, std::move(query));
|
||||
|
||||
@ -1155,7 +1155,7 @@ bool Session::connection_send_bind_key(ConnectionInfo *info) {
|
||||
last_bind_query_id_,
|
||||
telegram_api::auth_bindTempAuthKey(perm_auth_key_id, nonce, expires_at, std::move(encrypted)), DcId::main(),
|
||||
NetQuery::Type::Common, NetQuery::AuthFlag::On);
|
||||
query->dispatch_ttl = 0;
|
||||
query->dispatch_ttl_ = 0;
|
||||
query->set_callback(actor_shared(this));
|
||||
connection_send_query(info, std::move(query), message_id);
|
||||
|
||||
|
@ -39,7 +39,7 @@ SessionMultiProxy::SessionMultiProxy(int32 session_count, std::shared_ptr<AuthDa
|
||||
void SessionMultiProxy::send(NetQueryPtr query) {
|
||||
size_t pos = 0;
|
||||
// TODO temporary hack with total_timeout_limit
|
||||
if (query->auth_flag() == NetQuery::AuthFlag::On && query->total_timeout_limit > 7) {
|
||||
if (query->auth_flag() == NetQuery::AuthFlag::On && query->total_timeout_limit_ > 7) {
|
||||
if (query->session_rand()) {
|
||||
pos = query->session_rand() % sessions_.size();
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user