SequenceDispatcher: distribute food limits to net queries in the same chain and with the same tl_constructor

This commit is contained in:
Arseny Smirnov 2022-02-01 16:01:17 +03:00
parent 24766fdad8
commit da4d6318fc
2 changed files with 118 additions and 40 deletions

View File

@ -311,26 +311,49 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
using TaskId = ChainScheduler<NetQueryPtr>::TaskId;
using ChainId = ChainScheduler<NetQueryPtr>::ChainId;
bool check_timeout(TaskId task_id) {
auto &node = *scheduler_.get_task_extra(task_id);
NetQueryPtr &net_query = node.net_query;
if (net_query->is_ready()) {
return false;
}
net_query->total_timeout_ += node.total_timeout;
node.total_timeout = 0;
if (net_query->total_timeout_ > net_query->total_timeout_limit_) {
LOG(WARNING) << "Fail " << net_query << " to " << net_query->source_ << " because total_timeout "
<< net_query->total_timeout_ << " is greater than total_timeout_limit "
<< net_query->total_timeout_limit_;
net_query->set_error(Status::Error(
429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(node.last_timeout + 0.999)));
return true;
}
return false;
}
void on_result(NetQueryPtr query) final {
auto task_id = TaskId(get_link_token());
auto &node = *scheduler_.get_task_extra(task_id);
// if (query->last_timeout_ != 0) {
// for (auto i = pos + 1; i < data_.size(); i++) {
// data_[i].total_timeout_ += query->last_timeout_;
// data_[i].last_timeout_ = query->last_timeout_;
// check_timeout(data_[i]);
// if (data.query_->total_timeout_ > data.query_->total_timeout_limit_) {
// LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
// << data.query_->total_timeout_ << " is greater than total_timeout_limit "
// << data.query_->total_timeout_limit_;
// data.query_->set_error(Status::Error(
// 429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
// data.state_ = State::Dummy;
// try_resend_query(data, std::move(data.query_));
// }
// }
// }
if (query->last_timeout_ != 0) {
std::vector<TaskId> to_check_timeout;
auto tl_constructor = query->tl_constructor();
scheduler_.for_each_dependent(task_id, [&](TaskId child_task_id) {
auto &child_node = *scheduler_.get_task_extra(child_task_id);
if (node.net_query_ref->tl_constructor() == tl_constructor) {
node.total_timeout += query->last_timeout_;
node.last_timeout = query->last_timeout_;
to_check_timeout.push_back(child_task_id);
}
});
for (auto task_id : to_check_timeout) {
if (check_timeout(task_id)) {
scheduler_.pause_task(task_id);
try_resend(task_id);
}
}
}
if (query->is_error() && (query->error().code() == NetQuery::ResendInvokeAfter ||
(query->error().code() == 400 && (query->error().message() == "MSG_WAIT_FAILED" ||
@ -339,20 +362,31 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
query->resend();
return on_resend(std::move(query));
}
auto promise = promise_send_closure(actor_shared(this, task_id), &MultiSequenceDispatcherNewImpl::on_resend);
send_closure(node.callback, &NetQueryCallback::on_result_resendable, std::move(query), std::move(promise));
node.net_query = std::move(query);
try_resend(task_id);
}
void on_resend(Result<NetQueryPtr> query) {
void try_resend(TaskId task_id) {
auto &node = *scheduler_.get_task_extra(task_id);
auto promise = promise_send_closure(actor_shared(this, task_id), &MultiSequenceDispatcherNewImpl::on_resend);
send_closure(node.callback, &NetQueryCallback::on_result_resendable, std::move(node.net_query), std::move(promise));
}
void on_resend(Result<NetQueryPtr> r_query) {
auto task_id = TaskId(get_link_token());
auto &node = *scheduler_.get_task_extra(task_id);
if (query.is_error()) {
if (r_query.is_error()) {
scheduler_.finish_task(task_id);
} else {
node.net_query = query.move_as_ok();
node.net_query = r_query.move_as_ok();
node.net_query->debug("Waiting at SequenceDispatcher");
node.net_query_ref = node.net_query.get_weak();
scheduler_.reset_task(task_id);
if (check_timeout(task_id)) {
scheduler_.pause_task(task_id);
try_resend(task_id);
} else {
scheduler_.reset_task(task_id);
}
}
loop();
}
@ -390,7 +424,7 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
}
query->set_invoke_after(std::move(parents));
query->last_timeout_ = 0; // TODO: flood
query->last_timeout_ = 0;
VLOG(net_query) << "Send " << query;
query->debug("send to Td::send_with_callback");
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, task.task_id));

View File

@ -36,6 +36,7 @@ class ChainScheduler : public ChainSchedulerBase {
ExtraT *get_task_extra(TaskId task_id);
optional<TaskWithParents> start_next_task();
void pause_task(TaskId task_id);
void finish_task(TaskId task_id);
void reset_task(TaskId task_id);
template <class ExtraTT>
@ -45,6 +46,29 @@ class ChainScheduler : public ChainSchedulerBase {
void for_each(F &&f) {
tasks_.for_each([&f](auto, Task &task) { f(task.extra); });
}
template <class F>
void for_each_dependent(TaskId task_id, F &&f) {
auto *task = tasks_.get(task_id);
CHECK(task);
std::set<TaskId> visited;
bool check_for_collisions = task->chains.size() > 1;
for (TaskChainInfo &task_chain_info : task->chains) {
ChainInfo &chain_info = *task_chain_info.chain_info;
chain_info.chain.foreach_child(&task_chain_info.chain_node, [&](auto task_id, auto) {
if (check_for_collisions) {
auto it_ok = visited.insert(task_id);
if (!it_ok.second) {
return;
}
}
f(task_id);
});
auto o_child = chain_info.chain.get_child(&task_chain_info.chain_node);
if (o_child) {
f(o_child.value());
}
}
}
private:
struct ChainNode : ListNode {
@ -92,6 +116,12 @@ class ChainScheduler : public ChainSchedulerBase {
f(node.task_id, node.generation);
}
}
void foreach_child(ListNode *node, std::function<void(TaskId, uint64)> f) const {
for (auto it = node; it != head_.end(); it = it->get_next()) {
auto &node = static_cast<const ChainNode &>(*it);
f(node.task_id, node.generation);
}
}
private:
ListNode head_;
@ -107,7 +137,7 @@ class ChainScheduler : public ChainSchedulerBase {
ChainInfo *chain_info{};
};
struct Task {
enum class State { Pending, Active } state{State::Pending};
enum class State { Pending, Active, Paused } state{State::Pending};
vector<TaskChainInfo> chains;
ExtraT extra;
};
@ -116,7 +146,9 @@ class ChainScheduler : public ChainSchedulerBase {
Container<Task> tasks_;
VectorQueue<TaskId> pending_tasks_;
void try_start_task(TaskId task_id, Task *task) {
void try_start_task(TaskId task_id) {
auto *task = tasks_.get(task_id);
CHECK(task);
if (task->state != Task::State::Pending) {
return;
}
@ -148,7 +180,7 @@ class ChainScheduler : public ChainSchedulerBase {
pending_tasks_.push(task_id);
for_each_child(task, [&](auto task_id) {
try_start_task(task_id, tasks_.get(task_id));
try_start_task(task_id);
});
}
@ -163,8 +195,10 @@ class ChainScheduler : public ChainSchedulerBase {
}
}
void inactivate_task(TaskId task_id, Task *task) {
LOG(ERROR) << "inactivate " << task_id;
void inactivate_task(TaskId task_id, bool failed) {
LOG(ERROR) << "inactivate " << task_id << " " << (failed ? "failed" : "finished");
auto *task = tasks_.get(task_id);
CHECK(task);
bool was_active = task->state == Task::State::Active;
task->state = Task::State::Pending;
for (TaskChainInfo &task_chain_info : task->chains) {
@ -172,6 +206,9 @@ class ChainScheduler : public ChainSchedulerBase {
if (was_active) {
chain_info.active_tasks--;
}
if (was_active && failed) {
chain_info.generation = std::max(chain_info.generation, task_chain_info.chain_node.generation + 1);
}
auto it = limited_tasks_.find(task_chain_info.chain_id);
if (it != limited_tasks_.end()) {
@ -181,6 +218,7 @@ class ChainScheduler : public ChainSchedulerBase {
try_start_task_later(limited_task_id);
}
}
auto o_first = chain_info.chain.get_first();
if (o_first) {
auto first_task_id = o_first.unwrap();
@ -201,12 +239,13 @@ class ChainScheduler : public ChainSchedulerBase {
std::vector<TaskId> to_start;
void try_start_task_later(TaskId task_id) {
LOG(ERROR) << "try start later " << task_id;
to_start.push_back(task_id);
}
void flush_try_start_task() {
auto moved_to_start = std::move(to_start);
for (auto task_id : moved_to_start) {
try_start_task(task_id, tasks_.get(task_id));
try_start_task(task_id);
}
CHECK(to_start.empty());
}
@ -224,6 +263,7 @@ typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span
task_chain_info.chain_id = chain_id;
task_chain_info.chain_info = &chain_info;
task_chain_info.chain_node.task_id = task_id;
task_chain_info.chain_node.generation = 0;
return task_chain_info;
});
@ -232,10 +272,11 @@ typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span
chain_info.chain.add_task(&task_chain_info.chain_node);
}
try_start_task(task_id, &task);
try_start_task(task_id);
return task_id;
}
// TODO: return reference
template <class ExtraT>
ExtraT *ChainScheduler<ExtraT>::get_task_extra(ChainScheduler::TaskId task_id) { // may return nullptr
auto *task = tasks_.get(task_id);
@ -269,9 +310,10 @@ template <class ExtraT>
void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
auto *task = tasks_.get(task_id);
CHECK(task);
CHECK(to_start.empty());
inactivate_task(task_id, task);
inactivate_task(task_id, false);
for_each_child(task, [&](auto task_id) {
try_start_task_later(task_id);
});
@ -280,26 +322,28 @@ void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
finish_chain_task(task_chain_info);
}
auto task_copy = std::move(*task);
tasks_.erase(task_id);
flush_try_start_task();
}
template <class ExtraT>
void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
CHECK(to_start.empty());
auto *task = tasks_.get(task_id);
CHECK(task);
inactivate_task(task_id, task);
for (TaskChainInfo &task_chain_info : task->chains) {
ChainInfo &chain_info = *task_chain_info.chain_info;
chain_info.generation = td::max(chain_info.generation, task_chain_info.chain_node.generation + 1);
}
inactivate_task(task_id, true);
try_start_task_later(task_id);
flush_try_start_task();
}
template <class ExtraT>
void ChainScheduler<ExtraT>::pause_task(TaskId task_id) {
auto *task = tasks_.get(task_id);
CHECK(task);
inactivate_task(task_id, true);
task->state = Task::State::Paused;
}
template <class ExtraT>
StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraT> &scheduler) {
// 1 print chains