Minor improvements.

This commit is contained in:
levlam 2022-02-01 18:25:02 +03:00
parent 7c9d698023
commit 950876b496
3 changed files with 42 additions and 40 deletions

View File

@ -347,10 +347,10 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
}
});
for (auto task_id : to_check_timeout) {
if (check_timeout(task_id)) {
scheduler_.pause_task(task_id);
try_resend(task_id);
for (auto dependent_task_id : to_check_timeout) {
if (check_timeout(dependent_task_id)) {
scheduler_.pause_task(dependent_task_id);
try_resend(dependent_task_id);
}
}
}

View File

@ -10,6 +10,7 @@
#include "td/utils/common.h"
#include "td/utils/Container.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
#include "td/utils/optional.h"
#include "td/utils/Span.h"
#include "td/utils/StringBuilder.h"
@ -29,7 +30,7 @@ struct ChainSchedulerBase {
};
template <class ExtraT = Unit>
class ChainScheduler : public ChainSchedulerBase {
class ChainScheduler final : public ChainSchedulerBase {
public:
using TaskId = uint64;
using ChainId = uint64;
@ -117,8 +118,8 @@ class ChainScheduler : public ChainSchedulerBase {
f(node.task_id, node.generation);
}
}
void foreach_child(ListNode *node, std::function<void(TaskId, uint64)> f) const {
for (auto it = node; it != head_.end(); it = it->get_next()) {
void foreach_child(ListNode *start_node, std::function<void(TaskId, uint64)> f) const {
for (auto it = start_node; it != head_.end(); it = it->get_next()) {
auto &node = static_cast<const ChainNode &>(*it);
f(node.task_id, node.generation);
}
@ -180,9 +181,7 @@ class ChainScheduler : public ChainSchedulerBase {
task->state = Task::State::Active;
pending_tasks_.push(task_id);
for_each_child(task, [&](auto task_id) {
try_start_task(task_id);
});
for_each_child(task, [&](auto task_id) { try_start_task(task_id); });
}
template <class F>
@ -197,7 +196,7 @@ class ChainScheduler : public ChainSchedulerBase {
}
void inactivate_task(TaskId task_id, bool failed) {
LOG(ERROR) << "inactivate " << task_id << " " << (failed ? "failed" : "finished");
LOG(DEBUG) << "Inactivate " << task_id << " " << (failed ? "failed" : "finished");
auto *task = tasks_.get(task_id);
CHECK(task);
bool was_active = task->state == Task::State::Active;
@ -238,17 +237,19 @@ class ChainScheduler : public ChainSchedulerBase {
}
}
std::vector<TaskId> to_start;
vector<TaskId> to_start_;
void try_start_task_later(TaskId task_id) {
LOG(ERROR) << "try start later " << task_id;
to_start.push_back(task_id);
LOG(DEBUG) << "Try to start later " << task_id;
to_start_.push_back(task_id);
}
void flush_try_start_task() {
auto moved_to_start = std::move(to_start);
auto moved_to_start = std::move(to_start_);
for (auto task_id : moved_to_start) {
try_start_task(task_id);
}
CHECK(to_start.empty());
CHECK(to_start_.empty());
}
};
@ -311,13 +312,11 @@ template <class ExtraT>
void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
auto *task = tasks_.get(task_id);
CHECK(task);
CHECK(to_start.empty());
CHECK(to_start_.empty());
inactivate_task(task_id, false);
for_each_child(task, [&](auto task_id) {
try_start_task_later(task_id);
});
for_each_child(task, [&](auto task_id) { try_start_task_later(task_id); });
for (TaskChainInfo &task_chain_info : task->chains) {
finish_chain_task(task_chain_info);
@ -329,7 +328,7 @@ void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
template <class ExtraT>
void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
CHECK(to_start.empty());
CHECK(to_start_.empty());
auto *task = tasks_.get(task_id);
CHECK(task);
inactivate_task(task_id, true);
@ -355,9 +354,8 @@ StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraT> &scheduler)
sb << " active_cnt=" << it.second.active_tasks;
sb << " g=" << it.second.generation;
sb << " :";
it.second.chain.foreach([&](auto task_id, auto generation) {
sb << " " << *scheduler.get_task_extra(task_id) << ":" << generation;
});
it.second.chain.foreach(
[&](auto task_id, auto generation) { sb << " " << *scheduler.get_task_extra(task_id) << ":" << generation; });
sb << "\n";
}
scheduler.tasks_.for_each([&](auto id, auto &task) {

View File

@ -60,7 +60,7 @@ TEST(ChainScheduler, SendAfterRestart) {
scheduler.reset_task(first_task_id);
auto third_task_id = scheduler.create_task( chains, 3);
scheduler.create_task(chains, 3);
ASSERT_EQ(first_task_id, scheduler.start_next_task().unwrap().task_id);
ASSERT_TRUE(!scheduler.start_next_task());
@ -102,18 +102,22 @@ struct Query;
using QueryPtr = std::shared_ptr<Query>;
using ChainId = td::ChainScheduler<QueryPtr>::ChainId;
using TaskId = td::ChainScheduler<QueryPtr>::TaskId;
struct Query {
int id{};
TaskId task_id{};
bool is_ok{};
bool skipped{};
friend td::StringBuilder &operator << (td::StringBuilder &sb, const Query &q) {
};
static td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &q) {
return sb << "Q{" << q.id << "}";
}
};
td::StringBuilder &operator << (td::StringBuilder &sb, const QueryPtr &query_ptr) {
static td::StringBuilder &operator<<(td::StringBuilder &sb, const QueryPtr &query_ptr) {
return sb << *query_ptr;
}
TEST(ChainScheduler, Stress) {
td::Random::Xorshift128plus rnd(123);
int max_query_id = 100000;
@ -184,11 +188,11 @@ TEST(ChainScheduler, Stress) {
sent_cnt++;
}
};
auto skip_one_query = [&]() {
auto skip_one_query = [&] {
if (pending_queries.empty()) {
return;
}
auto it = pending_queries.begin() + rnd.fast(0, (int)pending_queries.size() - 1);
auto it = pending_queries.begin() + rnd.fast(0, static_cast<int>(pending_queries.size()) - 1);
auto task_id = *it;
pending_queries.erase(it);
td::remove_if(active_queries, [&](auto &q) { return q.task_id == task_id; });