Minor fixes.

This commit is contained in:
levlam 2022-01-31 15:56:44 +03:00
parent ccd450789b
commit b1b945e239
19 changed files with 117 additions and 83 deletions

View File

@ -14,7 +14,9 @@
#include "td/mtproto/mtproto_api.h"
#include "td/utils/common.h"
#include "td/utils/misc.h"
#include "td/utils/Slice.h"
#include "td/utils/Span.h"
#include "td/utils/StorerBase.h"
#include "td/utils/Time.h"
@ -105,7 +107,7 @@ class CancelVectorImpl {
class InvokeAfter {
public:
explicit InvokeAfter(Span<uint64> ids): ids_(ids){
explicit InvokeAfter(Span<uint64> ids) : ids_(ids) {
}
template <class StorerT>
void store(StorerT &storer) const {
@ -120,11 +122,12 @@ class InvokeAfter {
// invokeAfterMsgs#3dc4b4f0 {X:Type} msg_ids:Vector<long> query:!X = X;
storer.store_int(static_cast<int32>(0x3dc4b4f0));
storer.store_int(static_cast<int32>(0x1cb5c415));
storer.store_int(narrow_cast<int>(ids_.size()));
storer.store_int(narrow_cast<int32>(ids_.size()));
for (auto id : ids_) {
storer.store_long(static_cast<int64>(id));
}
}
private:
Span<uint64> ids_;
};

View File

@ -774,7 +774,8 @@ Result<uint64> SessionConnection::send_query(BufferSlice buffer, bool gzip_flag,
if (to_send_.empty()) {
send_before(Time::now_cached() + QUERY_DELAY);
}
to_send_.push_back(MtprotoQuery{message_id, seq_no, std::move(buffer), gzip_flag, std::move(invoke_after_ids), use_quick_ack});
to_send_.push_back(
MtprotoQuery{message_id, seq_no, std::move(buffer), gzip_flag, std::move(invoke_after_ids), use_quick_ack});
VLOG(mtproto) << "Invoke query " << message_id << " of size " << to_send_.back().packet.size() << " with seq_no "
<< seq_no << " after " << invoke_after_ids << (use_quick_ack ? " with quick ack" : "");

View File

@ -134,7 +134,7 @@ Document DocumentsManager::on_get_document(RemoteDocument remote_document, Dialo
// video animation
video = nullptr;
}
} else if (sticker != nullptr && false) {
} else if (sticker != nullptr) {
// some stickers uploaded before release
type_attributes--;
video = nullptr;

View File

@ -6,17 +6,23 @@
//
#include "td/telegram/MessageReaction.h"
#include "td/telegram/AccessRights.h"
#include "td/telegram/ContactsManager.h"
#include "td/telegram/Global.h"
#include "td/telegram/MessageSender.h"
#include "td/telegram/MessagesManager.h"
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/Td.h"
#include "td/telegram/UpdatesManager.h"
#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/logging.h"
#include "td/utils/Status.h"
#include <algorithm>
#include <unordered_set>
#include <utility>
namespace td {
@ -365,7 +371,7 @@ void MessageReactions::update_from(const MessageReactions &old_reactions) {
}
}
void MessageReactions::sort(const std::unordered_map<string, size_t> &active_reaction_pos) {
void MessageReactions::sort_reactions(const std::unordered_map<string, size_t> &active_reaction_pos) {
std::sort(reactions_.begin(), reactions_.end(),
[&active_reaction_pos](const MessageReaction &lhs, const MessageReaction &rhs) {
if (lhs.get_choose_count() != rhs.get_choose_count()) {

View File

@ -147,7 +147,7 @@ struct MessageReactions {
void update_from(const MessageReactions &old_reactions);
void sort(const std::unordered_map<string, size_t> &active_reaction_pos);
void sort_reactions(const std::unordered_map<string, size_t> &active_reaction_pos);
static bool need_update_message_reactions(const MessageReactions *old_reactions,
const MessageReactions *new_reactions);

View File

@ -7104,7 +7104,7 @@ bool MessagesManager::update_message_interaction_info(DialogId dialog_id, Messag
if (m->reactions != nullptr) {
reactions->update_from(*m->reactions);
}
reactions->sort(active_reaction_pos_);
reactions->sort_reactions(active_reaction_pos_);
}
bool need_update_reactions =
has_reactions && MessageReactions::need_update_message_reactions(m->reactions.get(), reactions.get());
@ -24175,7 +24175,7 @@ void MessagesManager::set_message_reaction(FullMessageId full_message_id, string
}
m->reactions->reactions_.emplace_back(reaction, 1, true, std::move(recent_chooser_dialog_ids), Auto());
}
m->reactions->sort(active_reaction_pos_);
m->reactions->sort_reactions(active_reaction_pos_);
send_update_message_interaction_info(dialog_id, m);
on_message_changed(d, m, true, "set_message_reaction");
@ -35344,7 +35344,7 @@ bool MessagesManager::update_message(Dialog *d, Message *old_message, unique_ptr
need_send_update = true;
}
if (old_message->restriction_reasons != new_message->restriction_reasons) {
LOG(DEBUG) << "Message restriction_reasons has changed from " << old_message->restriction_reasons << " to "
LOG(DEBUG) << "Message restriction_reasons have changed from " << old_message->restriction_reasons << " to "
<< old_message->restriction_reasons;
old_message->restriction_reasons = std::move(new_message->restriction_reasons);
need_send_update = true;

View File

@ -24,9 +24,11 @@
#include "td/telegram/SuggestedAction.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdDb.h"
#include "td/telegram/telegram_api.h"
#include "td/telegram/TopDialogManager.h"
#include "td/utils/buffer.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"

View File

@ -11,12 +11,14 @@
#include "td/actor/PromiseFuture.h"
#include "td/utils/algorithm.h"
#include "td/utils/ChainScheduler.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include <limits>
@ -247,8 +249,8 @@ void SequenceDispatcher::close_silent() {
/*** MultiSequenceDispatcher ***/
void MultiSequenceDispatcherOld::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
td::Span<uint64> chains) {
CHECK(all_of(chains, [](auto chain_id) {return chain_id != 0;}));
Span<uint64> chains) {
CHECK(all_of(chains, [](auto chain_id) { return chain_id != 0; }));
CHECK(!chains.empty());
auto sequence_id = chains[0];
@ -280,8 +282,8 @@ void MultiSequenceDispatcherOld::ready_to_close() {
class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
public:
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, td::Span<uint64> chains) final {
CHECK(all_of(chains, [](auto chain_id) {return chain_id != 0;}));
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains) final {
CHECK(all_of(chains, [](auto chain_id) { return chain_id != 0; }));
if (!chains.empty()) {
query->set_session_rand(static_cast<uint32>(chains[0] >> 10));
}
@ -299,7 +301,7 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
NetQueryRef net_query_ref;
NetQueryPtr net_query;
ActorShared<NetQueryCallback> callback;
friend StringBuilder &operator << (StringBuilder &sb, const Node &node) {
friend StringBuilder &operator<<(StringBuilder &sb, const Node &node) {
return sb << node.net_query;
}
};
@ -307,7 +309,7 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
using TaskId = ChainScheduler<NetQueryPtr>::TaskId;
using ChainId = ChainScheduler<NetQueryPtr>::ChainId;
void on_result(NetQueryPtr query) override {
void on_result(NetQueryPtr query) final {
auto task_id = TaskId(get_link_token());
auto &node = *scheduler_.get_task_extra(task_id);
@ -322,7 +324,7 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
send_closure(node.callback, &NetQueryCallback::on_result_resendable, std::move(query), std::move(promise));
}
void on_resend(td::Result<NetQueryPtr> query) {
void on_resend(Result<NetQueryPtr> query) {
auto task_id = TaskId(get_link_token());
auto &node = *scheduler_.get_task_extra(task_id);
if (query.is_error()) {
@ -336,10 +338,11 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
loop();
}
void loop() override {
void loop() final {
flush_pending_queries();
}
void tear_down() override {
void tear_down() final {
// Leaves scheduler_ in an invalid state, but we are closing anyway
scheduler_.for_each([&](Node &node) {
if (node.net_query.empty()) {
@ -360,7 +363,7 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
CHECK(!node.net_query.empty());
auto query = std::move(node.net_query);
std::vector<NetQueryRef> parents;
vector<NetQueryRef> parents;
for (auto parent_id : task.parents) {
auto &parent_node = *scheduler_.get_task_extra(parent_id);
parents.push_back(parent_node.net_query_ref);
@ -368,11 +371,10 @@ class MultiSequenceDispatcherNewImpl final : public MultiSequenceDispatcherNew {
}
query->set_invoke_after(std::move(parents));
query->last_timeout_ = 0; // TODO: flood
query->last_timeout_ = 0; // TODO: flood
VLOG(net_query) << "Send " << query;
query->debug("send to Td::send_with_callback");
G()->net_query_dispatcher().dispatch_with_callback(std::move(query),
actor_shared(this, task.task_id));
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, task.task_id));
}
}
};

View File

@ -12,6 +12,8 @@
#include "td/utils/common.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/Span.h"
#include <limits>
#include <unordered_map>
@ -46,7 +48,7 @@ class SequenceDispatcher final : public NetQueryCallback {
ActorShared<Parent> parent_;
size_t id_offset_ = 1;
std::vector<Data> data_;
vector<Data> data_;
size_t finish_i_ = 0; // skip state_ == State::Finish
size_t next_i_ = 0;
size_t last_sent_i_ = std::numeric_limits<size_t>::max();
@ -76,7 +78,7 @@ class SequenceDispatcher final : public NetQueryCallback {
class MultiSequenceDispatcherOld final : public SequenceDispatcher::Parent {
public:
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains);
static ActorOwn<MultiSequenceDispatcherOld> create(td::Slice name) {
static ActorOwn<MultiSequenceDispatcherOld> create(Slice name) {
return create_actor<MultiSequenceDispatcherOld>(name);
}
@ -91,7 +93,7 @@ class MultiSequenceDispatcherOld final : public SequenceDispatcher::Parent {
};
using ChainId = uint64;
using ChainIds = std::vector<ChainId>;
using ChainIds = vector<ChainId>;
class MultiSequenceDispatcherNew : public NetQueryCallback {
public:
virtual void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback, Span<uint64> chains) = 0;

View File

@ -26,8 +26,6 @@
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include <limits>
namespace td {
class GetSponsoredMessagesQuery final : public Td::ResultHandler {

View File

@ -9,7 +9,6 @@
#include "td/telegram/StickersManager.h"
#include "td/telegram/files/FileId.hpp"
#include "td/telegram/Global.h"
#include "td/telegram/misc.h"
#include "td/telegram/Photo.hpp"
#include "td/telegram/StickerFormat.h"

View File

@ -427,7 +427,7 @@ class CliClient final : public Actor {
static char get_delimiter(Slice str) {
std::unordered_set<char> chars;
for (auto c : trim(str)) {
if (!is_alnum(c) && c != '-' && c != '.' && c != '/' && static_cast<uint8_t>(c) <= 127) {
if (!is_alnum(c) && c != '-' && c != '.' && c != '/' && static_cast<uint8>(c) <= 127) {
chars.insert(c);
}
}

View File

@ -883,7 +883,7 @@ string FileManager::get_file_name(FileType file_type, Slice path) {
}
break;
case FileType::Sticker:
if (extension != "webp" && extension != "tgs") {
if (extension != "webp" && extension != "tgs" && extension != "webm") {
return fix_file_extension(file_name, "sticker", "webp");
}
break;
@ -3221,7 +3221,7 @@ FileType FileManager::guess_file_type(const tl_object_ptr<td_api::InputFile> &fi
if (extension == "mp3" || extension == "mpeg3" || extension == "m4a") {
return FileType::Audio;
}
if (extension == "webp" || extension == "tgs") {
if (extension == "webp" || extension == "tgs" || extension == "webm") {
return FileType::Sticker;
}
if (extension == "gif") {

View File

@ -35,6 +35,7 @@
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Span.h"
#include "td/utils/Time.h"
#include "td/utils/Timer.h"
#include "td/utils/tl_parsers.h"
@ -1027,7 +1028,7 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
}
}
VLOG(net_query) << "Send query to connection " << net_query << " [msg_id:" << format::as_hex(message_id) << "]"
<< tag("invoke_after", td::transform(invoke_after_ids, [](auto id){return format::as_hex(id);}));
<< tag("invoke_after", transform(invoke_after_ids, [](auto id) { return format::as_hex(id); }));
net_query->set_message_id(message_id);
net_query->cancel_slot_.clear_event();
LOG_CHECK(sent_queries_.find(message_id) == sent_queries_.end()) << message_id;

View File

@ -184,6 +184,7 @@ set(TDUTILS_SOURCE
td/utils/BufferedUdp.h
td/utils/ByteFlow.h
td/utils/CancellationToken.h
td/utils/ChainScheduler.h
td/utils/ChangesProcessor.h
td/utils/check.h
td/utils/Closure.h

View File

@ -1,17 +1,22 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/algorithm.h"
#include "td/utils/common.h"
#include "td/utils/Container.h"
#include "td/utils/List.h"
#include "td/utils/optional.h"
#include "td/utils/Random.h"
#include "td/utils/Span.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/VectorQueue.h"
#include <functional>
#include <map>
#include <vector>
#include <set>
#include <numeric>
namespace td {
@ -25,7 +30,7 @@ class ChainScheduler {
public:
using TaskId = uint64;
using ChainId = uint64;
TaskId create_task(td::Span<ChainId> chains, ExtraT extra = {});
TaskId create_task(Span<ChainId> chains, ExtraT extra = {});
ExtraT *get_task_extra(TaskId task_id);
optional<ChainSchedulerTaskWithParents> start_next_task();
@ -36,9 +41,7 @@ class ChainScheduler {
template <class F>
void for_each(F &&f) {
tasks_.for_each([&f](auto, Task &task) {
f(task.extra) ;
});
tasks_.for_each([&f](auto, Task &task) { f(task.extra); });
}
private:
@ -101,7 +104,7 @@ class ChainScheduler {
};
struct Task {
enum class State { Pending, Active } state{State::Pending};
std::vector<TaskChainInfo> chains;
vector<TaskChainInfo> chains;
ExtraT extra;
};
std::map<ChainId, ChainInfo> chains_;
@ -193,8 +196,10 @@ class ChainScheduler {
}
}
};
template <class ExtraT>
typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span<ChainScheduler::ChainId> chains, ExtraT extra) {
typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span<ChainScheduler::ChainId> chains,
ExtraT extra) {
auto task_id = tasks_.create();
Task &task = *tasks_.get(task_id);
task.extra = std::move(extra);
@ -210,12 +215,13 @@ typename ChainScheduler<ExtraT>::TaskId ChainScheduler<ExtraT>::create_task(Span
for (TaskChainInfo &task_chain_info : task.chains) {
auto &chain = task_chain_info.chain_info->chain;
chain.add_task(&task_chain_info.chain_node);
task_chain_info.waiting_for_parent = bool(chain.get_parent(&task_chain_info.chain_node));
task_chain_info.waiting_for_parent = static_cast<bool>(chain.get_parent(&task_chain_info.chain_node));
}
try_start_task(task_id, &task);
return task_id;
}
template <class ExtraT>
ExtraT *ChainScheduler<ExtraT>::get_task_extra(ChainScheduler::TaskId task_id) { // may return nullptr
auto *task = tasks_.get(task_id);
@ -224,6 +230,7 @@ ExtraT *ChainScheduler<ExtraT>::get_task_extra(ChainScheduler::TaskId task_id) {
}
return &task->extra;
}
template <class ExtraT>
optional<ChainSchedulerTaskWithParents> ChainScheduler<ExtraT>::start_next_task() {
if (pending_tasks_.empty()) {
@ -243,6 +250,7 @@ optional<ChainSchedulerTaskWithParents> ChainScheduler<ExtraT>::start_next_task(
}
return res;
}
template <class ExtraT>
void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
auto *task = tasks_.get(task_id);
@ -256,6 +264,7 @@ void ChainScheduler<ExtraT>::finish_task(ChainScheduler::TaskId task_id) {
}
tasks_.erase(task_id);
}
template <class ExtraT>
void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
auto *task = tasks_.get(task_id);
@ -264,11 +273,12 @@ void ChainScheduler<ExtraT>::reset_task(ChainScheduler::TaskId task_id) {
for (TaskChainInfo &task_chain_info : task->chains) {
ChainInfo &chain_info = chains_[task_chain_info.chain_id];
task_chain_info.waiting_for_parent = bool(chain_info.chain.get_parent(&task_chain_info.chain_node));
task_chain_info.waiting_for_parent = static_cast<bool>(chain_info.chain.get_parent(&task_chain_info.chain_node));
}
try_start_task(task_id, task);
}
template <class ExtraT>
StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraT> &scheduler) {
// 1 print chains
@ -277,21 +287,22 @@ StringBuilder &operator<<(StringBuilder &sb, ChainScheduler<ExtraT> &scheduler)
sb << "ChainId{" << it.first << "} ";
sb << " active_cnt=" << it.second.active_tasks;
sb << " : ";
it.second.chain.foreach([&](auto task_id) {
sb << *scheduler.get_task_extra(task_id);
});
it.second.chain.foreach([&](auto task_id) { sb << *scheduler.get_task_extra(task_id); });
sb << "\n";
}
scheduler.tasks_.for_each([&](auto id, auto &task) {
sb << "Task: " << task.extra;
sb << " state =" << static_cast<int>(task.state);
for (auto& task_chain_info : task.chains) {
for (auto &task_chain_info : task.chains) {
if (task_chain_info.waiting_for_parent) {
sb << " wait " << *scheduler.get_task_extra(task_chain_info.chain_info->chain.get_parent(&task_chain_info.chain_node).value());
sb << " wait "
<< *scheduler.get_task_extra(
task_chain_info.chain_info->chain.get_parent(&task_chain_info.chain_node).value());
}
}
sb << "\n";
});
return sb;
}
} // namespace td

View File

@ -117,9 +117,10 @@ bool contains(const V &v, const T &value) {
}
return false;
}
template <class V, class F>
bool all_of(const V &v, F &&f) {
for (auto &x : v) {
for (const auto &x : v) {
if (!f(x)) {
return false;
}

View File

@ -1,13 +1,20 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/algorithm.h"
#include "td/utils/misc.h"
#include "td/utils/optional.h"
#include "td/utils/Span.h"
#include "td/utils/tests.h"
#include "td/utils/Random.h"
#include "td/utils/ChainScheduler.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/Span.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/tests.h"
#include <vector>
#include <memory>
#include <numeric>
TEST(ChainScheduler, Basic) {
@ -19,7 +26,7 @@ TEST(ChainScheduler, Basic) {
}
int j = 0;
while (j != 100) {
std::vector<TaskId> tasks;
td::vector<TaskId> tasks;
while (true) {
auto o_task_id = scheduler.start_next_task();
if (!o_task_id) {
@ -27,16 +34,16 @@ TEST(ChainScheduler, Basic) {
}
auto task_id = o_task_id.value().task_id;
auto extra = *scheduler.get_task_extra(task_id);
auto parents = td::transform(o_task_id.value().parents,
[&](auto parent) { return *scheduler.get_task_extra(parent); });
LOG(ERROR) << "start " << extra << parents;
auto parents =
td::transform(o_task_id.value().parents, [&](auto parent) { return *scheduler.get_task_extra(parent); });
LOG(INFO) << "Start " << extra << parents;
CHECK(extra == j);
j++;
tasks.push_back(task_id);
}
for (auto &task_id : tasks) {
auto extra = *scheduler.get_task_extra(task_id);
LOG(ERROR) << "finish " << extra;
LOG(INFO) << "Finish " << extra;
scheduler.finish_task(task_id);
}
}
@ -50,10 +57,11 @@ struct Query {
int id{};
TaskId task_id{};
bool is_ok{};
friend td::StringBuilder &operator << (td::StringBuilder &sb, const Query &q) {
friend td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &q) {
return sb << "Q{" << q.id << "}";
}
};
TEST(ChainScheduler, Stress) {
td::Random::Xorshift128plus rnd(123);
int max_query_id = 1000;
@ -62,12 +70,12 @@ TEST(ChainScheduler, Stress) {
struct QueryWithParents {
QueryPtr id;
std::vector<QueryPtr> parents;
td::vector<QueryPtr> parents;
};
std::vector<QueryWithParents> active_queries;
td::vector<QueryWithParents> active_queries;
td::ChainScheduler<QueryPtr> scheduler;
std::vector<std::vector<QueryPtr>> chains(ChainsN);
td::vector<td::vector<QueryPtr>> chains(ChainsN);
int inflight_queries{};
int current_query_id{};
bool done = false;
@ -86,7 +94,7 @@ TEST(ChainScheduler, Stress) {
auto query = std::make_shared<Query>();
query->id = query_id;
int chain_n = rnd.fast(1, ChainsN);
std::vector<ChainId> chain_ids(ChainsN);
td::vector<ChainId> chain_ids(ChainsN);
std::iota(chain_ids.begin(), chain_ids.end(), 0);
td::random_shuffle(td::as_mutable_span(chain_ids), rnd);
chain_ids.resize(chain_n);
@ -98,14 +106,14 @@ TEST(ChainScheduler, Stress) {
inflight_queries++;
};
auto check_parents_ok = [&] (const QueryWithParents &query_with_parents) -> bool {
auto check_parents_ok = [&](const QueryWithParents &query_with_parents) -> bool {
return td::all_of(query_with_parents.parents, [](auto &parent) { return parent->is_ok; });
};
auto to_query_ptr = [&](TaskId task_id) {
return *scheduler.get_task_extra(task_id);
};
auto flush_pending_queries = [&]{
auto flush_pending_queries = [&] {
while (true) {
auto o_task_with_parents = scheduler.start_next_task();
if (!o_task_with_parents) {
@ -118,11 +126,11 @@ TEST(ChainScheduler, Stress) {
active_queries.push_back(query_with_parents);
}
};
auto execute_one_query = [&]() {
auto execute_one_query = [&] {
if (active_queries.empty()) {
return;
}
auto it = active_queries.begin() + rnd.fast(0, (int)active_queries.size() - 1);
auto it = active_queries.begin() + rnd.fast(0, static_cast<int>(active_queries.size()) - 1);
auto query_with_parents = *it;
active_queries.erase(it);
@ -130,12 +138,12 @@ TEST(ChainScheduler, Stress) {
if (rnd.fast(0, 20) == 0) {
scheduler.finish_task(query->task_id);
inflight_queries--;
LOG(ERROR) << "Fail " << query->id;
LOG(INFO) << "Fail " << query->id;
} else if (check_parents_ok(query_with_parents)) {
query->is_ok = true;
scheduler.finish_task(query->task_id);
inflight_queries--;
LOG(ERROR) << "OK " << query->id;
LOG(INFO) << "OK " << query->id;
} else {
scheduler.reset_task(query->task_id);
}
@ -145,7 +153,7 @@ TEST(ChainScheduler, Stress) {
while (!done) {
steps.step(rnd);
flush_pending_queries();
// LOG(ERROR) << scheduler;
// LOG(INFO) << scheduler;
}
for (auto &chain : chains) {
int prev_ok = -1;
@ -153,13 +161,13 @@ TEST(ChainScheduler, Stress) {
int ok_cnt = 0;
for (auto &q : chain) {
if (q->is_ok) {
CHECK(prev_ok < q->id) ;
CHECK(prev_ok < q->id);
prev_ok = q->id;
ok_cnt++;
} else {
failed_cnt++;
}
}
LOG(ERROR) << "Chain ok " << ok_cnt << " failed " << failed_cnt;
LOG(INFO) << "Chain ok " << ok_cnt << " failed " << failed_cnt;
}
}

View File

@ -4,6 +4,7 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/ByteFlow.h"
#include "td/utils/common.h"
@ -16,8 +17,6 @@
#include "td/utils/tests.h"
#include "td/utils/Time.h"
#include <algorithm>
static void encode_decode(const td::string &s) {
auto r = td::gzencode(s, 2);
ASSERT_TRUE(!r.empty());
@ -128,7 +127,7 @@ TEST(Gzip, encode_decode_flow_big) {
td::clear_thread_locals();
auto start_mem = td::BufferAllocator::get_buffer_mem();
{
auto str = std::string(200000, 'a');
auto str = td::string(200000, 'a');
td::ChainBufferWriter input_writer;
auto input = input_writer.extract_reader();
td::ByteFlowSource source(&input);
@ -145,7 +144,7 @@ TEST(Gzip, encode_decode_flow_big) {
auto validate = [&](td::Slice chunk) {
CHECK(chunk.size() <= left_size);
left_size -= chunk.size();
ASSERT_TRUE(std::all_of(chunk.begin(), chunk.end(), [](auto c) { return c == 'a'; }));
ASSERT_TRUE(td::all_of(chunk, [](auto c) { return c == 'a'; }));
};
for (size_t i = 0; i < n; i++) {
@ -171,7 +170,7 @@ TEST(Gzip, encode_decode_flow_big) {
}
TEST(Gzip, decode_encode_flow_bomb) {
std::string gzip_bomb_str;
td::string gzip_bomb_str;
size_t N = 200;
{
td::ChainBufferWriter input_writer;
@ -181,7 +180,7 @@ TEST(Gzip, decode_encode_flow_bomb) {
td::ByteFlowSink sink;
source >> gzip_flow >> sink;
std::string s(1 << 16, 'a');
td::string s(1 << 16, 'a');
for (size_t i = 0; i < N; i++) {
input_writer.append(s);
source.wakeup();
@ -224,7 +223,7 @@ TEST(Gzip, decode_encode_flow_bomb) {
auto validate = [&](td::Slice chunk) {
CHECK(chunk.size() <= left_size);
left_size -= chunk.size();
ASSERT_TRUE(std::all_of(chunk.begin(), chunk.end(), [](auto c) { return c == 'a'; }));
ASSERT_TRUE(td::all_of(chunk, [](auto c) { return c == 'a'; }));
};
input_writer.append(gzip_bomb_str);