Use int32 ro store last/next/total_timeout.
This commit is contained in:
parent
1ffb1ebb2f
commit
46e23a407f
@ -636,7 +636,9 @@ void CallActor::try_send_request_query() {
|
|||||||
auto timeout = static_cast<double>(call_receive_timeout_ms) * 0.001;
|
auto timeout = static_cast<double>(call_receive_timeout_ms) * 0.001;
|
||||||
LOG(INFO) << "Set call timeout to " << timeout;
|
LOG(INFO) << "Set call timeout to " << timeout;
|
||||||
set_timeout_in(timeout);
|
set_timeout_in(timeout);
|
||||||
query->total_timeout_limit_ = max(timeout, 10.0);
|
query->total_timeout_limit_ =
|
||||||
|
static_cast<int32>(clamp(call_receive_timeout_ms + 999, static_cast<int64>(10000), static_cast<int64>(100000))) /
|
||||||
|
1000;
|
||||||
request_query_ref_ = query.get_weak();
|
request_query_ref_ = query.get_weak();
|
||||||
send_with_promise(std::move(query),
|
send_with_promise(std::move(query),
|
||||||
PromiseCreator::lambda([actor_id = actor_id(this)](Result<NetQueryPtr> r_net_query) {
|
PromiseCreator::lambda([actor_id = actor_id(this)](Result<NetQueryPtr> r_net_query) {
|
||||||
|
@ -46,7 +46,7 @@ void SequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQu
|
|||||||
cancel_timeout();
|
cancel_timeout();
|
||||||
query->debug("Waiting at SequenceDispatcher");
|
query->debug("Waiting at SequenceDispatcher");
|
||||||
auto query_weak_ref = query.get_weak();
|
auto query_weak_ref = query.get_weak();
|
||||||
data_.push_back(Data{State::Start, std::move(query_weak_ref), std::move(query), std::move(callback), 0, 0.0, 0.0});
|
data_.push_back(Data{State::Start, std::move(query_weak_ref), std::move(query), std::move(callback), 0, 0, 0});
|
||||||
loop();
|
loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,8 +60,7 @@ void SequenceDispatcher::check_timeout(Data &data) {
|
|||||||
LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
|
LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
|
||||||
<< data.query_->total_timeout_ << " is greater than total_timeout_limit "
|
<< data.query_->total_timeout_ << " is greater than total_timeout_limit "
|
||||||
<< data.query_->total_timeout_limit_;
|
<< data.query_->total_timeout_limit_;
|
||||||
data.query_->set_error(Status::Error(
|
data.query_->set_error(Status::Error(429, PSLICE() << "Too Many Requests: retry after " << data.last_timeout_));
|
||||||
429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
|
|
||||||
data.state_ = State::Dummy;
|
data.state_ = State::Dummy;
|
||||||
try_resend_query(data, std::move(data.query_));
|
try_resend_query(data, std::move(data.query_));
|
||||||
}
|
}
|
||||||
@ -307,8 +306,8 @@ class MultiSequenceDispatcherImpl final : public MultiSequenceDispatcher {
|
|||||||
struct Node {
|
struct Node {
|
||||||
NetQueryRef net_query_ref;
|
NetQueryRef net_query_ref;
|
||||||
NetQueryPtr net_query;
|
NetQueryPtr net_query;
|
||||||
double total_timeout{0};
|
int32 total_timeout{0};
|
||||||
double last_timeout{0};
|
int32 last_timeout{0};
|
||||||
ActorShared<NetQueryCallback> callback;
|
ActorShared<NetQueryCallback> callback;
|
||||||
friend StringBuilder &operator<<(StringBuilder &sb, const Node &node) {
|
friend StringBuilder &operator<<(StringBuilder &sb, const Node &node) {
|
||||||
return sb << node.net_query;
|
return sb << node.net_query;
|
||||||
@ -329,8 +328,7 @@ class MultiSequenceDispatcherImpl final : public MultiSequenceDispatcher {
|
|||||||
LOG(WARNING) << "Fail " << net_query << " to " << net_query->source_ << " because total_timeout "
|
LOG(WARNING) << "Fail " << net_query << " to " << net_query->source_ << " because total_timeout "
|
||||||
<< net_query->total_timeout_ << " is greater than total_timeout_limit "
|
<< net_query->total_timeout_ << " is greater than total_timeout_limit "
|
||||||
<< net_query->total_timeout_limit_;
|
<< net_query->total_timeout_limit_;
|
||||||
net_query->set_error(Status::Error(
|
net_query->set_error(Status::Error(429, PSLICE() << "Too Many Requests: retry after " << node.last_timeout));
|
||||||
429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(node.last_timeout + 0.999)));
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -41,8 +41,8 @@ class SequenceDispatcher final : public NetQueryCallback {
|
|||||||
NetQueryPtr query_;
|
NetQueryPtr query_;
|
||||||
ActorShared<NetQueryCallback> callback_;
|
ActorShared<NetQueryCallback> callback_;
|
||||||
uint64 generation_;
|
uint64 generation_;
|
||||||
double total_timeout_;
|
int32 total_timeout_;
|
||||||
double last_timeout_;
|
int32 last_timeout_;
|
||||||
};
|
};
|
||||||
|
|
||||||
ActorShared<Parent> parent_;
|
ActorShared<Parent> parent_;
|
||||||
|
@ -346,19 +346,19 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
|
|||||||
static int32 tl_magic(const BufferSlice &buffer_slice);
|
static int32 tl_magic(const BufferSlice &buffer_slice);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
double next_timeout_ = 1; // for NetQueryDelayer
|
int32 next_timeout_ = 1; // for NetQueryDelayer
|
||||||
double total_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
int32 total_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
||||||
double total_timeout_limit_ = 60; // for NetQueryDelayer/SequenceDispatcher and to be set by caller
|
int32 total_timeout_limit_ = 60; // for NetQueryDelayer/SequenceDispatcher and to be set by caller
|
||||||
double last_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
int32 last_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
|
||||||
string source_; // for NetQueryDelayer/SequenceDispatcher
|
string source_; // for NetQueryDelayer/SequenceDispatcher
|
||||||
bool need_resend_on_503_ = true; // for NetQueryDispatcher and to be set by caller
|
int32 dispatch_ttl_ = -1; // for NetQueryDispatcher and to be set by caller
|
||||||
int32 dispatch_ttl_ = -1; // for NetQueryDispatcher and to be set by caller
|
int32 file_type_ = -1; // to be set by caller
|
||||||
Slot cancel_slot_; // for Session and to be set by caller
|
Slot cancel_slot_; // for Session and to be set by caller
|
||||||
Promise<> quick_ack_promise_; // for Session and to be set by caller
|
Promise<> quick_ack_promise_; // for Session and to be set by caller
|
||||||
int32 file_type_ = -1; // to be set by caller
|
bool need_resend_on_503_ = true; // for NetQueryDispatcher and to be set by caller
|
||||||
|
|
||||||
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
|
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
|
||||||
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit, NetQueryStats *stats)
|
GzipFlag gzip_flag, int32 tl_constructor, int32 total_timeout_limit, NetQueryStats *stats)
|
||||||
: state_(state)
|
: state_(state)
|
||||||
, type_(type)
|
, type_(type)
|
||||||
, auth_flag_(auth_flag)
|
, auth_flag_(auth_flag)
|
||||||
|
@ -61,7 +61,7 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double total_timeout_limit = 60;
|
int32 total_timeout_limit = 60;
|
||||||
if (!G()->close_flag()) {
|
if (!G()->close_flag()) {
|
||||||
auto td = G()->td();
|
auto td = G()->td();
|
||||||
if (!td.empty()) {
|
if (!td.empty()) {
|
||||||
|
@ -24,7 +24,7 @@ void NetQueryDelayer::delay(NetQueryPtr query) {
|
|||||||
query->is_ready();
|
query->is_ready();
|
||||||
CHECK(query->is_error());
|
CHECK(query->is_error());
|
||||||
auto code = query->error().code();
|
auto code = query->error().code();
|
||||||
double timeout = 0;
|
int32 timeout = 0;
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
// skip
|
// skip
|
||||||
} else if (code == 500) {
|
} else if (code == 500) {
|
||||||
@ -73,8 +73,7 @@ void NetQueryDelayer::delay(NetQueryPtr query) {
|
|||||||
LOG(WARNING) << "Failed: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout_)
|
LOG(WARNING) << "Failed: " << query << " " << tag("timeout", timeout) << tag("total_timeout", query->total_timeout_)
|
||||||
<< " because of " << error << " from " << query->source_;
|
<< " because of " << error << " from " << query->source_;
|
||||||
// NB: code must differ from tdapi FLOOD_WAIT code
|
// NB: code must differ from tdapi FLOOD_WAIT code
|
||||||
query->set_error(
|
query->set_error(Status::Error(429, PSLICE() << "Too Many Requests: retry after " << timeout));
|
||||||
Status::Error(429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(timeout + 0.999)));
|
|
||||||
query->debug("DcManager: send to DcManager");
|
query->debug("DcManager: send to DcManager");
|
||||||
G()->net_query_dispatcher().dispatch(std::move(query));
|
G()->net_query_dispatcher().dispatch(std::move(query));
|
||||||
return;
|
return;
|
||||||
|
Loading…
Reference in New Issue
Block a user