Improve ChainScheduler.
This commit is contained in:
parent
634767d986
commit
0b33522821
@ -405,7 +405,7 @@ class MultiSequenceDispatcherImpl final : public MultiSequenceDispatcher {
|
||||
|
||||
void tear_down() final {
|
||||
// Leaves scheduler_ in an invalid state, but we are closing anyway
|
||||
scheduler_.for_each([&](Node &node) {
|
||||
scheduler_.for_each([](Node &node) {
|
||||
if (node.net_query.empty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -17,8 +17,8 @@
|
||||
#include "td/utils/VectorQueue.h"
|
||||
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace td {
|
||||
|
||||
@ -34,41 +34,38 @@ class ChainScheduler final : public ChainSchedulerBase {
|
||||
public:
|
||||
using TaskId = uint64;
|
||||
using ChainId = uint64;
|
||||
|
||||
TaskId create_task(Span<ChainId> chains, ExtraT extra = {});
|
||||
|
||||
ExtraT *get_task_extra(TaskId task_id);
|
||||
|
||||
optional<TaskWithParents> start_next_task();
|
||||
|
||||
void pause_task(TaskId task_id);
|
||||
|
||||
void finish_task(TaskId task_id);
|
||||
|
||||
void reset_task(TaskId task_id);
|
||||
template <class ExtraTT>
|
||||
friend StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraTT> &scheduler);
|
||||
|
||||
template <class F>
|
||||
void for_each(F &&f) {
|
||||
tasks_.for_each([&f](auto, Task &task) { f(task.extra); });
|
||||
}
|
||||
|
||||
template <class F>
|
||||
void for_each_dependent(TaskId task_id, F &&f) {
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
std::set<TaskId> visited;
|
||||
CHECK(task != nullptr);
|
||||
std::unordered_set<TaskId> visited;
|
||||
bool check_for_collisions = task->chains.size() > 1;
|
||||
for (TaskChainInfo &task_chain_info : task->chains) {
|
||||
ChainInfo &chain_info = *task_chain_info.chain_info;
|
||||
chain_info.chain.foreach_child(&task_chain_info.chain_node, [&](auto task_id, auto) {
|
||||
if (check_for_collisions) {
|
||||
auto it_ok = visited.insert(task_id);
|
||||
if (!it_ok.second) {
|
||||
return;
|
||||
}
|
||||
chain_info.chain.foreach_child(&task_chain_info.chain_node, [&](TaskId task_id, uint64) {
|
||||
if (check_for_collisions && !visited.insert(task_id).second) {
|
||||
return;
|
||||
}
|
||||
f(task_id);
|
||||
});
|
||||
auto o_child = chain_info.chain.get_child(&task_chain_info.chain_node);
|
||||
if (o_child) {
|
||||
f(o_child.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,14 +140,14 @@ class ChainScheduler final : public ChainSchedulerBase {
|
||||
vector<TaskChainInfo> chains;
|
||||
ExtraT extra;
|
||||
};
|
||||
std::map<ChainId, ChainInfo> chains_;
|
||||
std::map<ChainId, TaskId> limited_tasks_;
|
||||
std::unordered_map<ChainId, ChainInfo> chains_;
|
||||
std::unordered_map<ChainId, TaskId> limited_tasks_;
|
||||
Container<Task> tasks_;
|
||||
VectorQueue<TaskId> pending_tasks_;
|
||||
|
||||
void try_start_task(TaskId task_id) {
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
if (task->state != Task::State::Pending) {
|
||||
return;
|
||||
}
|
||||
@ -198,7 +195,7 @@ class ChainScheduler final : public ChainSchedulerBase {
|
||||
void inactivate_task(TaskId task_id, bool failed) {
|
||||
LOG(DEBUG) << "Inactivate " << task_id << " " << (failed ? "failed" : "finished");
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
bool was_active = task->state == Task::State::Active;
|
||||
task->state = Task::State::Pending;
|
||||
for (TaskChainInfo &task_chain_info : task->chains) {
|
||||
@ -251,6 +248,9 @@ class ChainScheduler final : public ChainSchedulerBase {
|
||||
}
|
||||
CHECK(to_start_.empty());
|
||||
}
|
||||
|
||||
template <class ExtraTT>
|
||||
friend StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraTT> &scheduler);
|
||||
};
|
||||
|
||||
template <class ExtraT>
|
||||
@ -282,7 +282,7 @@ typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span
|
||||
template <class ExtraT>
|
||||
ExtraT *ChainScheduler<ExtraT>::get_task_extra(ChainScheduler::TaskId task_id) { // may return nullptr
|
||||
auto *task = tasks_.get(task_id);
|
||||
if (!task) {
|
||||
if (task == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
return &task->extra;
|
||||
@ -297,7 +297,7 @@ optional<ChainSchedulerBase::TaskWithParents> ChainScheduler<ExtraT>::start_next
|
||||
TaskWithParents res;
|
||||
res.task_id = task_id;
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
for (TaskChainInfo &task_chain_info : task->chains) {
|
||||
Chain &chain = task_chain_info.chain_info->chain;
|
||||
auto o_parent = chain.get_parent(&task_chain_info.chain_node);
|
||||
@ -311,7 +311,7 @@ optional<ChainSchedulerBase::TaskWithParents> ChainScheduler<ExtraT>::start_next
|
||||
template <class ExtraT>
|
||||
void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
CHECK(to_start_.empty());
|
||||
|
||||
inactivate_task(task_id, false);
|
||||
@ -330,7 +330,7 @@ template <class ExtraT>
|
||||
void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
|
||||
CHECK(to_start_.empty());
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
inactivate_task(task_id, true);
|
||||
try_start_task_later(task_id);
|
||||
flush_try_start_task();
|
||||
@ -339,7 +339,7 @@ void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
|
||||
template <class ExtraT>
|
||||
void ChainScheduler<ExtraT>::pause_task(TaskId task_id) {
|
||||
auto *task = tasks_.get(task_id);
|
||||
CHECK(task);
|
||||
CHECK(task != nullptr);
|
||||
inactivate_task(task_id, true);
|
||||
task->state = Task::State::Paused;
|
||||
flush_try_start_task();
|
||||
@ -348,26 +348,26 @@ void ChainScheduler<ExtraT>::pause_task(TaskId task_id) {
|
||||
template <class ExtraT>
|
||||
StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraT> &scheduler) {
|
||||
// 1 print chains
|
||||
sb << "\n";
|
||||
sb << '\n';
|
||||
for (auto &it : scheduler.chains_) {
|
||||
sb << "ChainId{" << it.first << "} ";
|
||||
sb << " active_cnt=" << it.second.active_tasks;
|
||||
sb << " g=" << it.second.generation;
|
||||
sb << " :";
|
||||
sb << "ChainId{" << it.first << "}";
|
||||
sb << " active_cnt = " << it.second.active_tasks;
|
||||
sb << " g = " << it.second.generation;
|
||||
sb << ':';
|
||||
it.second.chain.foreach(
|
||||
[&](auto task_id, auto generation) { sb << " " << *scheduler.get_task_extra(task_id) << ":" << generation; });
|
||||
sb << "\n";
|
||||
[&](auto task_id, auto generation) { sb << ' ' << *scheduler.get_task_extra(task_id) << ':' << generation; });
|
||||
sb << '\n';
|
||||
}
|
||||
scheduler.tasks_.for_each([&](auto id, auto &task) {
|
||||
sb << "Task: " << task.extra;
|
||||
sb << " state =" << static_cast<int>(task.state);
|
||||
sb << " state = " << static_cast<int>(task.state);
|
||||
for (auto &task_chain_info : task.chains) {
|
||||
sb << " g=" << task_chain_info.chain_node.generation;
|
||||
sb << " g = " << task_chain_info.chain_node.generation;
|
||||
if (task_chain_info.chain_info->generation != task_chain_info.chain_node.generation) {
|
||||
sb << " chain_g=" << task_chain_info.chain_info->generation;
|
||||
sb << " chain_g = " << task_chain_info.chain_info->generation;
|
||||
}
|
||||
}
|
||||
sb << "\n";
|
||||
sb << '\n';
|
||||
});
|
||||
return sb;
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ TEST(ChainScheduler, Stress) {
|
||||
query->skipped = true;
|
||||
scheduler.finish_task(task_id);
|
||||
inflight_queries--;
|
||||
LOG(ERROR) << "Skip " << query->id;
|
||||
LOG(INFO) << "Skip " << query->id;
|
||||
};
|
||||
auto execute_one_query = [&] {
|
||||
if (active_queries.empty()) {
|
||||
@ -225,7 +225,7 @@ TEST(ChainScheduler, Stress) {
|
||||
LOG(INFO) << "OK " << query->id;
|
||||
} else {
|
||||
scheduler.reset_task(query->task_id);
|
||||
LOG(ERROR) << "Reset " << query->id;
|
||||
LOG(INFO) << "Reset " << query->id;
|
||||
}
|
||||
};
|
||||
|
||||
@ -235,8 +235,8 @@ TEST(ChainScheduler, Stress) {
|
||||
flush_pending_queries();
|
||||
// LOG(INFO) << scheduler;
|
||||
}
|
||||
LOG(ERROR) << "Sent queries count " << sent_cnt;
|
||||
LOG(ERROR) << "Total queries " << current_query_id;
|
||||
LOG(INFO) << "Sent queries count " << sent_cnt;
|
||||
LOG(INFO) << "Total queries " << current_query_id;
|
||||
for (auto &chain : chains) {
|
||||
int prev_ok = -1;
|
||||
int failed_cnt = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user