Explicitly specify namespace td in tdactor tests.
This commit is contained in:
parent
e78a5fbecf
commit
49282f35a5
@ -13,31 +13,29 @@
|
||||
#include "td/utils/Random.h"
|
||||
#include "td/utils/tests.h"
|
||||
|
||||
using namespace td;
|
||||
|
||||
TEST(MultiTimeout, bug) {
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 0;
|
||||
sched.init(threads_n);
|
||||
|
||||
sched.start();
|
||||
unique_ptr<MultiTimeout> multi_timeout;
|
||||
td::unique_ptr<td::MultiTimeout> multi_timeout;
|
||||
struct Data {
|
||||
MultiTimeout *multi_timeout;
|
||||
td::MultiTimeout *multi_timeout;
|
||||
};
|
||||
Data data;
|
||||
|
||||
{
|
||||
auto guard = sched.get_main_guard();
|
||||
multi_timeout = make_unique<MultiTimeout>("MultiTimeout");
|
||||
multi_timeout = td::make_unique<td::MultiTimeout>("MultiTimeout");
|
||||
data.multi_timeout = multi_timeout.get();
|
||||
multi_timeout->set_callback([](void *void_data, int64 key) {
|
||||
multi_timeout->set_callback([](void *void_data, td::int64 key) {
|
||||
auto &data = *static_cast<Data *>(void_data);
|
||||
if (key == 1) {
|
||||
data.multi_timeout->cancel_timeout(key + 1);
|
||||
data.multi_timeout->set_timeout_in(key + 2, 1);
|
||||
} else {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
});
|
||||
multi_timeout->set_callback_data(&data);
|
||||
@ -51,9 +49,9 @@ TEST(MultiTimeout, bug) {
|
||||
sched.finish();
|
||||
}
|
||||
|
||||
class TimeoutManager final : public Actor {
|
||||
class TimeoutManager final : public td::Actor {
|
||||
public:
|
||||
static int32 count;
|
||||
static td::int32 count;
|
||||
|
||||
TimeoutManager() {
|
||||
count++;
|
||||
@ -70,7 +68,7 @@ class TimeoutManager final : public Actor {
|
||||
LOG(INFO) << "Destroy TimeoutManager";
|
||||
}
|
||||
|
||||
static void on_test_timeout_callback(void *timeout_manager_ptr, int64 id) {
|
||||
static void on_test_timeout_callback(void *timeout_manager_ptr, td::int64 id) {
|
||||
CHECK(count >= 0);
|
||||
if (count == 0) {
|
||||
LOG(ERROR) << "Receive timeout after manager was closed";
|
||||
@ -84,21 +82,21 @@ class TimeoutManager final : public Actor {
|
||||
void test_timeout() {
|
||||
CHECK(count > 0);
|
||||
// we must yield scheduler, so run_main breaks immediately, if timeouts are handled immediately
|
||||
Scheduler::instance()->yield();
|
||||
td::Scheduler::instance()->yield();
|
||||
}
|
||||
|
||||
MultiTimeout test_timeout_{"TestTimeout"};
|
||||
td::MultiTimeout test_timeout_{"TestTimeout"};
|
||||
};
|
||||
|
||||
int32 TimeoutManager::count;
|
||||
td::int32 TimeoutManager::count;
|
||||
|
||||
TEST(MultiTimeout, Destroy) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 0;
|
||||
sched.init(threads_n);
|
||||
|
||||
ActorOwn<TimeoutManager> timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager");
|
||||
auto timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager");
|
||||
TimeoutManager *manager = timeout_manager.get().get_actor_unsafe();
|
||||
sched.start();
|
||||
int cnt = 100;
|
||||
@ -107,12 +105,12 @@ TEST(MultiTimeout, Destroy) {
|
||||
cnt--;
|
||||
if (cnt > 0) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
manager->test_timeout_.set_timeout_in(Random::fast(0, 1000000000), Random::fast(2, 5) / 1000.0);
|
||||
manager->test_timeout_.set_timeout_in(td::Random::fast(0, 1000000000), td::Random::fast(2, 5) / 1000.0);
|
||||
}
|
||||
} else if (cnt == 0) {
|
||||
timeout_manager.reset();
|
||||
} else if (cnt == -10) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
}
|
||||
sched.finish();
|
||||
|
@ -19,18 +19,16 @@
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
using namespace td;
|
||||
|
||||
namespace {
|
||||
|
||||
template <class ContainerT>
|
||||
static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
|
||||
CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
|
||||
return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)];
|
||||
return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)];
|
||||
}
|
||||
|
||||
static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
|
||||
uint32 res = 1;
|
||||
static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) {
|
||||
td::uint32 res = 1;
|
||||
while (p) {
|
||||
if (p & 1) {
|
||||
res *= x;
|
||||
@ -41,18 +39,18 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
|
||||
return res;
|
||||
}
|
||||
|
||||
static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) {
|
||||
uint32 res = 1;
|
||||
for (uint32 i = 0; i < p; i++) {
|
||||
static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) {
|
||||
td::uint32 res = 1;
|
||||
for (td::uint32 i = 0; i < p; i++) {
|
||||
res *= x;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
struct Query {
|
||||
uint32 query_id{};
|
||||
uint32 result{};
|
||||
std::vector<int> todo;
|
||||
td::uint32 query_id{};
|
||||
td::uint32 result{};
|
||||
td::vector<int> todo;
|
||||
Query() = default;
|
||||
Query(const Query &) = delete;
|
||||
Query &operator=(const Query &) = delete;
|
||||
@ -72,25 +70,25 @@ struct Query {
|
||||
}
|
||||
};
|
||||
|
||||
static uint32 fast_calc(Query &q) {
|
||||
uint32 result = q.result;
|
||||
static td::uint32 fast_calc(Query &q) {
|
||||
td::uint32 result = q.result;
|
||||
for (auto x : q.todo) {
|
||||
result = fast_pow_mod_uint32(result, x);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
class Worker final : public Actor {
|
||||
class Worker final : public td::Actor {
|
||||
public:
|
||||
explicit Worker(int threads_n) : threads_n_(threads_n) {
|
||||
}
|
||||
void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) {
|
||||
uint32 result = slow_pow_mod_uint32(x, p);
|
||||
void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) {
|
||||
td::uint32 result = slow_pow_mod_uint32(x, p);
|
||||
promise.set_value(std::move(result));
|
||||
|
||||
(void)threads_n_;
|
||||
// if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
|
||||
// migrate(Random::fast(2, threads_n));
|
||||
// if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
|
||||
// migrate(td::Random::fast(2, threads_n));
|
||||
//}
|
||||
}
|
||||
|
||||
@ -98,7 +96,7 @@ class Worker final : public Actor {
|
||||
int threads_n_;
|
||||
};
|
||||
|
||||
class QueryActor final : public Actor {
|
||||
class QueryActor final : public td::Actor {
|
||||
public:
|
||||
class Callback {
|
||||
public:
|
||||
@ -115,39 +113,39 @@ class QueryActor final : public Actor {
|
||||
explicit QueryActor(int threads_n) : threads_n_(threads_n) {
|
||||
}
|
||||
|
||||
void set_callback(unique_ptr<Callback> callback) {
|
||||
void set_callback(td::unique_ptr<Callback> callback) {
|
||||
callback_ = std::move(callback);
|
||||
}
|
||||
void set_workers(std::vector<ActorId<Worker>> workers) {
|
||||
void set_workers(td::vector<td::ActorId<Worker>> workers) {
|
||||
workers_ = std::move(workers);
|
||||
}
|
||||
|
||||
void query(Query &&query) {
|
||||
uint32 x = query.result;
|
||||
uint32 p = query.next_pow();
|
||||
if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
|
||||
td::uint32 x = query.result;
|
||||
td::uint32 p = query.next_pow();
|
||||
if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
|
||||
query.result = slow_pow_mod_uint32(x, p);
|
||||
callback_->on_result(std::move(query));
|
||||
} else {
|
||||
auto future = Random::fast(0, 3) == 0
|
||||
? send_promise<ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
|
||||
: send_promise<ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
|
||||
auto future = td::Random::fast(0, 3) == 0
|
||||
? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
|
||||
: td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
|
||||
if (future.is_ready()) {
|
||||
query.result = future.move_as_ok();
|
||||
callback_->on_result(std::move(query));
|
||||
} else {
|
||||
future.set_event(EventCreator::raw(actor_id(), query.query_id));
|
||||
future.set_event(td::EventCreator::raw(actor_id(), query.query_id));
|
||||
auto query_id = query.query_id;
|
||||
pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query)));
|
||||
}
|
||||
}
|
||||
if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
|
||||
migrate(Random::fast(2, threads_n_));
|
||||
if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
|
||||
migrate(td::Random::fast(2, threads_n_));
|
||||
}
|
||||
}
|
||||
|
||||
void raw_event(const Event::Raw &event) final {
|
||||
uint32 id = event.u32;
|
||||
void raw_event(const td::Event::Raw &event) final {
|
||||
td::uint32 id = event.u32;
|
||||
auto it = pending_.find(id);
|
||||
auto future = std::move(it->second.first);
|
||||
auto query = std::move(it->second.second);
|
||||
@ -162,7 +160,7 @@ class QueryActor final : public Actor {
|
||||
stop();
|
||||
}
|
||||
|
||||
void on_start_migrate(int32 sched_id) final {
|
||||
void on_start_migrate(td::int32 sched_id) final {
|
||||
for (auto &it : pending_) {
|
||||
start_migrate(it.second.first, sched_id);
|
||||
}
|
||||
@ -174,13 +172,13 @@ class QueryActor final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
unique_ptr<Callback> callback_;
|
||||
std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_;
|
||||
std::vector<ActorId<Worker>> workers_;
|
||||
td::unique_ptr<Callback> callback_;
|
||||
std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, Query>> pending_;
|
||||
td::vector<td::ActorId<Worker>> workers_;
|
||||
int threads_n_;
|
||||
};
|
||||
|
||||
class MainQueryActor final : public Actor {
|
||||
class MainQueryActor final : public td::Actor {
|
||||
class QueryActorCallback final : public QueryActor::Callback {
|
||||
public:
|
||||
void on_result(Query &&query) final {
|
||||
@ -193,13 +191,13 @@ class MainQueryActor final : public Actor {
|
||||
void on_closed() final {
|
||||
send_closure(parent_id_, &MainQueryActor::on_closed);
|
||||
}
|
||||
QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver)
|
||||
QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver)
|
||||
: parent_id_(parent_id), next_solver_(next_solver) {
|
||||
}
|
||||
|
||||
private:
|
||||
ActorId<MainQueryActor> parent_id_;
|
||||
ActorId<QueryActor> next_solver_;
|
||||
td::ActorId<MainQueryActor> parent_id_;
|
||||
td::ActorId<QueryActor> next_solver_;
|
||||
};
|
||||
|
||||
const int ACTORS_CNT = 10;
|
||||
@ -212,22 +210,22 @@ class MainQueryActor final : public Actor {
|
||||
void start_up() final {
|
||||
actors_.resize(ACTORS_CNT);
|
||||
for (auto &actor : actors_) {
|
||||
auto actor_ptr = make_unique<QueryActor>(threads_n_);
|
||||
actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0)
|
||||
auto actor_ptr = td::make_unique<QueryActor>(threads_n_);
|
||||
actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
|
||||
.release();
|
||||
}
|
||||
|
||||
workers_.resize(WORKERS_CNT);
|
||||
for (auto &worker : workers_) {
|
||||
auto actor_ptr = make_unique<Worker>(threads_n_);
|
||||
worker =
|
||||
register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
|
||||
auto actor_ptr = td::make_unique<Worker>(threads_n_);
|
||||
worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
|
||||
.release();
|
||||
}
|
||||
|
||||
for (int i = 0; i < ACTORS_CNT; i++) {
|
||||
ref_cnt_++;
|
||||
send_closure(actors_[i], &QueryActor::set_callback,
|
||||
make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
|
||||
td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
|
||||
send_closure(actors_[i], &QueryActor::set_workers, workers_);
|
||||
}
|
||||
yield();
|
||||
@ -252,14 +250,14 @@ class MainQueryActor final : public Actor {
|
||||
void on_closed() {
|
||||
ref_cnt_--;
|
||||
if (ref_cnt_ == 0) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
}
|
||||
|
||||
void wakeup() final {
|
||||
int cnt = 100000;
|
||||
while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
|
||||
if (Random::fast_bool()) {
|
||||
if (td::Random::fast_bool()) {
|
||||
send_closure(rand_elem(actors_), &QueryActor::query, create_query());
|
||||
} else {
|
||||
send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
|
||||
@ -276,9 +274,9 @@ class MainQueryActor final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<uint32, uint32> expected_;
|
||||
std::vector<ActorId<QueryActor>> actors_;
|
||||
std::vector<ActorId<Worker>> workers_;
|
||||
std::map<td::uint32, td::uint32> expected_;
|
||||
td::vector<td::ActorId<QueryActor>> actors_;
|
||||
td::vector<td::ActorId<Worker>> workers_;
|
||||
int out_cnt_ = 0;
|
||||
int in_cnt_ = 0;
|
||||
int query_id_ = 1;
|
||||
@ -286,46 +284,47 @@ class MainQueryActor final : public Actor {
|
||||
int threads_n_;
|
||||
};
|
||||
|
||||
class SimpleActor final : public Actor {
|
||||
class SimpleActor final : public td::Actor {
|
||||
public:
|
||||
explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) {
|
||||
explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) {
|
||||
}
|
||||
void start_up() final {
|
||||
auto actor_ptr = make_unique<Worker>(threads_n_);
|
||||
auto actor_ptr = td::make_unique<Worker>(threads_n_);
|
||||
worker_ =
|
||||
register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
|
||||
register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release();
|
||||
yield();
|
||||
}
|
||||
|
||||
void wakeup() final {
|
||||
if (q_ == 100000) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
stop();
|
||||
return;
|
||||
}
|
||||
q_++;
|
||||
p_ = Random::fast_bool() ? 1 : 10000;
|
||||
auto future = Random::fast(0, 3) == 0 ? send_promise<ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
|
||||
: send_promise<ActorSendType::Later>(worker_, &Worker::query, q_, p_);
|
||||
p_ = td::Random::fast_bool() ? 1 : 10000;
|
||||
auto future = td::Random::fast(0, 3) == 0
|
||||
? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
|
||||
: td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
|
||||
if (future.is_ready()) {
|
||||
auto result = future.move_as_ok();
|
||||
CHECK(result == fast_pow_mod_uint32(q_, p_));
|
||||
yield();
|
||||
} else {
|
||||
future.set_event(EventCreator::raw(actor_id(), nullptr));
|
||||
future.set_event(td::EventCreator::raw(actor_id(), nullptr));
|
||||
future_ = std::move(future);
|
||||
}
|
||||
// if (threads_n_ > 1 && Random::fast(0, 2) == 0) {
|
||||
// migrate(Random::fast(1, threads_n));
|
||||
// if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) {
|
||||
// migrate(td::Random::fast(1, threads_n));
|
||||
//}
|
||||
}
|
||||
void raw_event(const Event::Raw &event) final {
|
||||
void raw_event(const td::Event::Raw &event) final {
|
||||
auto result = future_.move_as_ok();
|
||||
CHECK(result == fast_pow_mod_uint32(q_, p_));
|
||||
yield();
|
||||
}
|
||||
|
||||
void on_start_migrate(int32 sched_id) final {
|
||||
void on_start_migrate(td::int32 sched_id) final {
|
||||
start_migrate(future_, sched_id);
|
||||
}
|
||||
void on_finish_migrate() final {
|
||||
@ -333,25 +332,26 @@ class SimpleActor final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
int32 threads_n_;
|
||||
ActorId<Worker> worker_;
|
||||
FutureActor<uint32> future_;
|
||||
uint32 q_ = 1;
|
||||
uint32 p_ = 0;
|
||||
td::int32 threads_n_;
|
||||
td::ActorId<Worker> worker_;
|
||||
td::FutureActor<td::uint32> future_;
|
||||
td::uint32 q_ = 1;
|
||||
td::uint32 p_ = 0;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
class SendToDead final : public Actor {
|
||||
class SendToDead final : public td::Actor {
|
||||
public:
|
||||
class Parent final : public Actor {
|
||||
class Parent final : public td::Actor {
|
||||
public:
|
||||
explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
|
||||
explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
|
||||
}
|
||||
void start_up() final {
|
||||
set_timeout_in(Random::fast_uint32() % 3 * 0.001);
|
||||
set_timeout_in(td::Random::fast_uint32() % 3 * 0.001);
|
||||
if (ttl_ != 0) {
|
||||
child_ = create_actor_on_scheduler<Parent>(
|
||||
"Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(this), ttl_ - 1);
|
||||
child_ = td::create_actor_on_scheduler<Parent>(
|
||||
"Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this),
|
||||
ttl_ - 1);
|
||||
}
|
||||
}
|
||||
void timeout_expired() final {
|
||||
@ -359,29 +359,30 @@ class SendToDead final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
ActorOwn<Parent> child_;
|
||||
ActorShared<> parent_;
|
||||
td::ActorOwn<Parent> child_;
|
||||
td::ActorShared<> parent_;
|
||||
int ttl_;
|
||||
};
|
||||
|
||||
void start_up() final {
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(),
|
||||
create_reference(), 4)
|
||||
td::create_actor_on_scheduler<Parent>(
|
||||
"Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4)
|
||||
.release();
|
||||
}
|
||||
}
|
||||
|
||||
ActorShared<> create_reference() {
|
||||
td::ActorShared<> create_reference() {
|
||||
ref_cnt_++;
|
||||
return actor_shared(this);
|
||||
}
|
||||
|
||||
void hangup_shared() final {
|
||||
ref_cnt_--;
|
||||
if (ref_cnt_ == 0) {
|
||||
ttl_--;
|
||||
if (ttl_ <= 0) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
stop();
|
||||
} else {
|
||||
start_up();
|
||||
@ -389,14 +390,14 @@ class SendToDead final : public Actor {
|
||||
}
|
||||
}
|
||||
|
||||
uint32 ttl_{50};
|
||||
uint32 ref_cnt_{0};
|
||||
td::uint32 ttl_{50};
|
||||
td::uint32 ref_cnt_{0};
|
||||
};
|
||||
|
||||
TEST(Actors, send_to_dead) {
|
||||
//TODO: fix CHECK(storage_count_.load() == 0)
|
||||
return;
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 5;
|
||||
sched.init(threads_n);
|
||||
|
||||
@ -409,9 +410,7 @@ TEST(Actors, send_to_dead) {
|
||||
}
|
||||
|
||||
TEST(Actors, main_simple) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 3;
|
||||
sched.init(threads_n);
|
||||
|
||||
@ -424,9 +423,7 @@ TEST(Actors, main_simple) {
|
||||
}
|
||||
|
||||
TEST(Actors, main) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 9;
|
||||
sched.init(threads_n);
|
||||
|
||||
@ -438,23 +435,21 @@ TEST(Actors, main) {
|
||||
sched.finish();
|
||||
}
|
||||
|
||||
class DoAfterStop final : public Actor {
|
||||
class DoAfterStop final : public td::Actor {
|
||||
public:
|
||||
void loop() final {
|
||||
ptr = make_unique<int>(10);
|
||||
ptr = td::make_unique<int>(10);
|
||||
stop();
|
||||
CHECK(*ptr == 10);
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
|
||||
private:
|
||||
unique_ptr<int> ptr;
|
||||
td::unique_ptr<int> ptr;
|
||||
};
|
||||
|
||||
TEST(Actors, do_after_stop) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 0;
|
||||
sched.init(threads_n);
|
||||
|
||||
@ -466,9 +461,9 @@ TEST(Actors, do_after_stop) {
|
||||
sched.finish();
|
||||
}
|
||||
|
||||
class XContext final : public ActorContext {
|
||||
class XContext final : public td::ActorContext {
|
||||
public:
|
||||
int32 get_id() const final {
|
||||
td::int32 get_id() const final {
|
||||
return 123456789;
|
||||
}
|
||||
|
||||
@ -481,12 +476,12 @@ class XContext final : public ActorContext {
|
||||
int x = 1234;
|
||||
};
|
||||
|
||||
class WithXContext final : public Actor {
|
||||
class WithXContext final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
auto old_context = set_context(std::make_shared<XContext>());
|
||||
}
|
||||
void f(unique_ptr<Guard> guard) {
|
||||
void f(td::unique_ptr<td::Guard> guard) {
|
||||
}
|
||||
void close() {
|
||||
stop();
|
||||
@ -494,25 +489,23 @@ class WithXContext final : public Actor {
|
||||
};
|
||||
|
||||
static void check_context() {
|
||||
auto ptr = static_cast<XContext *>(Scheduler::context());
|
||||
CHECK(ptr);
|
||||
auto ptr = static_cast<XContext *>(td::Scheduler::context());
|
||||
CHECK(ptr != nullptr);
|
||||
ptr->validate();
|
||||
}
|
||||
|
||||
TEST(Actors, context_during_destruction) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
int threads_n = 0;
|
||||
sched.init(threads_n);
|
||||
|
||||
{
|
||||
auto guard = sched.get_main_guard();
|
||||
auto with_context = create_actor<WithXContext>("WithXContext").release();
|
||||
send_closure(with_context, &WithXContext::f, create_lambda_guard([] { check_context(); }));
|
||||
auto with_context = td::create_actor<WithXContext>("WithXContext").release();
|
||||
send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
|
||||
send_closure_later(with_context, &WithXContext::close);
|
||||
send_closure(with_context, &WithXContext::f, create_lambda_guard([] { check_context(); }));
|
||||
send_closure(with_context, &WithXContext::f, create_lambda_guard([] { Scheduler::instance()->finish(); }));
|
||||
send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
|
||||
send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); }));
|
||||
}
|
||||
sched.start();
|
||||
while (sched.run_main(10)) {
|
||||
|
@ -28,40 +28,38 @@
|
||||
#include <tuple>
|
||||
|
||||
namespace {
|
||||
using namespace td;
|
||||
|
||||
static const size_t BUF_SIZE = 1024 * 1024;
|
||||
static char buf[BUF_SIZE];
|
||||
static char buf2[BUF_SIZE];
|
||||
static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1));
|
||||
static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1));
|
||||
static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
|
||||
static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1));
|
||||
|
||||
static auto create_queue() {
|
||||
auto res = std::make_shared<MpscPollableQueue<EventFull>>();
|
||||
static std::shared_ptr<td::MpscPollableQueue<td::EventFull>> create_queue() {
|
||||
auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>();
|
||||
res->init();
|
||||
return res;
|
||||
}
|
||||
|
||||
TEST(Actors, SendLater) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
sb.clear();
|
||||
Scheduler scheduler;
|
||||
td::Scheduler scheduler;
|
||||
scheduler.init(0, {create_queue()}, nullptr);
|
||||
|
||||
auto guard = scheduler.get_guard();
|
||||
class Worker final : public Actor {
|
||||
class Worker final : public td::Actor {
|
||||
public:
|
||||
void f() {
|
||||
sb << "A";
|
||||
}
|
||||
};
|
||||
auto id = create_actor<Worker>("Worker");
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
send_closure(id, &Worker::f);
|
||||
send_closure_later(id, &Worker::f);
|
||||
send_closure(id, &Worker::f);
|
||||
auto id = td::create_actor<Worker>("Worker");
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
td::send_closure(id, &Worker::f);
|
||||
td::send_closure_later(id, &Worker::f);
|
||||
td::send_closure(id, &Worker::f);
|
||||
ASSERT_STREQ("A", sb.as_cslice().c_str());
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
ASSERT_STREQ("AAA", sb.as_cslice().c_str());
|
||||
}
|
||||
|
||||
@ -87,7 +85,7 @@ class X {
|
||||
~X() = default;
|
||||
};
|
||||
|
||||
class XReceiver final : public Actor {
|
||||
class XReceiver final : public td::Actor {
|
||||
public:
|
||||
void by_const_ref(const X &) {
|
||||
sb << "[by_const_ref]";
|
||||
@ -101,13 +99,12 @@ class XReceiver final : public Actor {
|
||||
};
|
||||
|
||||
TEST(Actors, simple_pass_event_arguments) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
Scheduler scheduler;
|
||||
td::Scheduler scheduler;
|
||||
scheduler.init(0, {create_queue()}, nullptr);
|
||||
|
||||
auto guard = scheduler.get_guard();
|
||||
auto id = create_actor<XReceiver>("XR").release();
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
auto id = td::create_actor<XReceiver>("XR").release();
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
|
||||
X x;
|
||||
|
||||
@ -122,47 +119,47 @@ TEST(Actors, simple_pass_event_arguments) {
|
||||
|
||||
// Tmp-->ConstRef
|
||||
sb.clear();
|
||||
send_closure(id, &XReceiver::by_const_ref, X());
|
||||
td::send_closure(id, &XReceiver::by_const_ref, X());
|
||||
ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Tmp-->ConstRef (Delayed)
|
||||
sb.clear();
|
||||
send_closure_later(id, &XReceiver::by_const_ref, X());
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
td::send_closure_later(id, &XReceiver::by_const_ref, X());
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
// LOG(ERROR) << sb.as_cslice();
|
||||
ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Tmp-->LvalueRef
|
||||
sb.clear();
|
||||
send_closure(id, &XReceiver::by_lvalue_ref, X());
|
||||
td::send_closure(id, &XReceiver::by_lvalue_ref, X());
|
||||
ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Tmp-->LvalueRef (Delayed)
|
||||
sb.clear();
|
||||
send_closure_later(id, &XReceiver::by_lvalue_ref, X());
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
td::send_closure_later(id, &XReceiver::by_lvalue_ref, X());
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Tmp-->Value
|
||||
sb.clear();
|
||||
send_closure(id, &XReceiver::by_value, X());
|
||||
td::send_closure(id, &XReceiver::by_value, X());
|
||||
ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str());
|
||||
|
||||
// Tmp-->Value (Delayed)
|
||||
sb.clear();
|
||||
send_closure_later(id, &XReceiver::by_value, X());
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
td::send_closure_later(id, &XReceiver::by_value, X());
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
|
||||
|
||||
// Var-->ConstRef
|
||||
sb.clear();
|
||||
send_closure(id, &XReceiver::by_const_ref, x);
|
||||
td::send_closure(id, &XReceiver::by_const_ref, x);
|
||||
ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Var-->ConstRef (Delayed)
|
||||
sb.clear();
|
||||
send_closure_later(id, &XReceiver::by_const_ref, x);
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
td::send_closure_later(id, &XReceiver::by_const_ref, x);
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
|
||||
|
||||
// Var-->LvalueRef
|
||||
@ -171,17 +168,17 @@ TEST(Actors, simple_pass_event_arguments) {
|
||||
|
||||
// Var-->Value
|
||||
sb.clear();
|
||||
send_closure(id, &XReceiver::by_value, x);
|
||||
td::send_closure(id, &XReceiver::by_value, x);
|
||||
ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str());
|
||||
|
||||
// Var-->Value (Delayed)
|
||||
sb.clear();
|
||||
send_closure_later(id, &XReceiver::by_value, x);
|
||||
scheduler.run_no_guard(Timestamp::in(1));
|
||||
td::send_closure_later(id, &XReceiver::by_value, x);
|
||||
scheduler.run_no_guard(td::Timestamp::in(1));
|
||||
ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
|
||||
}
|
||||
|
||||
class PrintChar final : public Actor {
|
||||
class PrintChar final : public td::Actor {
|
||||
public:
|
||||
PrintChar(char c, int cnt) : char_(c), cnt_(cnt) {
|
||||
}
|
||||
@ -208,19 +205,18 @@ class PrintChar final : public Actor {
|
||||
// Yield must add actor to the end of queue
|
||||
//
|
||||
TEST(Actors, simple_hand_yield) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
Scheduler scheduler;
|
||||
td::Scheduler scheduler;
|
||||
scheduler.init(0, {create_queue()}, nullptr);
|
||||
sb.clear();
|
||||
int cnt = 1000;
|
||||
{
|
||||
auto guard = scheduler.get_guard();
|
||||
create_actor<PrintChar>("PrintA", 'A', cnt).release();
|
||||
create_actor<PrintChar>("PrintB", 'B', cnt).release();
|
||||
create_actor<PrintChar>("PrintC", 'C', cnt).release();
|
||||
td::create_actor<PrintChar>("PrintA", 'A', cnt).release();
|
||||
td::create_actor<PrintChar>("PrintB", 'B', cnt).release();
|
||||
td::create_actor<PrintChar>("PrintC", 'C', cnt).release();
|
||||
}
|
||||
scheduler.run(Timestamp::in(1));
|
||||
std::string expected;
|
||||
scheduler.run(td::Timestamp::in(1));
|
||||
td::string expected;
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
expected += "ABC";
|
||||
}
|
||||
@ -229,7 +225,7 @@ TEST(Actors, simple_hand_yield) {
|
||||
|
||||
class Ball {
|
||||
public:
|
||||
friend void start_migrate(Ball &ball, int32 sched_id) {
|
||||
friend void start_migrate(Ball &ball, td::int32 sched_id) {
|
||||
sb << "start";
|
||||
}
|
||||
friend void finish_migrate(Ball &ball) {
|
||||
@ -237,30 +233,30 @@ class Ball {
|
||||
}
|
||||
};
|
||||
|
||||
class Pong final : public Actor {
|
||||
class Pong final : public td::Actor {
|
||||
public:
|
||||
void pong(Ball ball) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
};
|
||||
|
||||
class Ping final : public Actor {
|
||||
class Ping final : public td::Actor {
|
||||
public:
|
||||
explicit Ping(ActorId<Pong> pong) : pong_(pong) {
|
||||
explicit Ping(td::ActorId<Pong> pong) : pong_(pong) {
|
||||
}
|
||||
void start_up() final {
|
||||
send_closure(pong_, &Pong::pong, Ball());
|
||||
td::send_closure(pong_, &Pong::pong, Ball());
|
||||
}
|
||||
|
||||
private:
|
||||
ActorId<Pong> pong_;
|
||||
td::ActorId<Pong> pong_;
|
||||
};
|
||||
|
||||
TEST(Actors, simple_migrate) {
|
||||
sb.clear();
|
||||
sb2.clear();
|
||||
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(2);
|
||||
auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release();
|
||||
scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release();
|
||||
@ -277,7 +273,7 @@ TEST(Actors, simple_migrate) {
|
||||
#endif
|
||||
}
|
||||
|
||||
class OpenClose final : public Actor {
|
||||
class OpenClose final : public td::Actor {
|
||||
public:
|
||||
explicit OpenClose(int cnt) : cnt_(cnt) {
|
||||
}
|
||||
@ -285,17 +281,17 @@ class OpenClose final : public Actor {
|
||||
yield();
|
||||
}
|
||||
void wakeup() final {
|
||||
ObserverBase *observer = reinterpret_cast<ObserverBase *>(123);
|
||||
auto observer = reinterpret_cast<td::ObserverBase *>(123);
|
||||
if (cnt_ > 0) {
|
||||
auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create);
|
||||
auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create);
|
||||
LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error();
|
||||
auto file_fd = r_file_fd.move_as_ok();
|
||||
{ PollableFd pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); }
|
||||
{ auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); }
|
||||
file_fd.close();
|
||||
cnt_--;
|
||||
yield();
|
||||
} else {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
}
|
||||
|
||||
@ -304,8 +300,7 @@ class OpenClose final : public Actor {
|
||||
};
|
||||
|
||||
TEST(Actors, open_close) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(2);
|
||||
int cnt = 1000000;
|
||||
#if TD_WINDOWS || TD_ANDROID
|
||||
@ -321,18 +316,18 @@ TEST(Actors, open_close) {
|
||||
}
|
||||
|
||||
namespace {
|
||||
class MsgActor : public Actor {
|
||||
class MsgActor : public td::Actor {
|
||||
public:
|
||||
virtual void msg() = 0;
|
||||
};
|
||||
|
||||
class Slave final : public Actor {
|
||||
class Slave final : public td::Actor {
|
||||
public:
|
||||
ActorId<MsgActor> msg;
|
||||
explicit Slave(ActorId<MsgActor> msg) : msg(msg) {
|
||||
td::ActorId<MsgActor> msg;
|
||||
explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) {
|
||||
}
|
||||
void hangup() final {
|
||||
send_closure(msg, &MsgActor::msg);
|
||||
td::send_closure(msg, &MsgActor::msg);
|
||||
}
|
||||
};
|
||||
|
||||
@ -340,10 +335,10 @@ class MasterActor final : public MsgActor {
|
||||
public:
|
||||
void loop() final {
|
||||
alive_ = true;
|
||||
slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this)));
|
||||
slave = td::create_actor<Slave>("slave", static_cast<td::ActorId<MsgActor>>(actor_id(this)));
|
||||
stop();
|
||||
}
|
||||
ActorOwn<Slave> slave;
|
||||
td::ActorOwn<Slave> slave;
|
||||
|
||||
MasterActor() = default;
|
||||
MasterActor(const MasterActor &) = delete;
|
||||
@ -356,26 +351,25 @@ class MasterActor final : public MsgActor {
|
||||
void msg() final {
|
||||
CHECK(alive_ == 123456789);
|
||||
}
|
||||
uint64 alive_ = 123456789;
|
||||
td::uint64 alive_ = 123456789;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
TEST(Actors, call_after_destruct) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
Scheduler scheduler;
|
||||
td::Scheduler scheduler;
|
||||
scheduler.init(0, {create_queue()}, nullptr);
|
||||
{
|
||||
auto guard = scheduler.get_guard();
|
||||
create_actor<MasterActor>("Master").release();
|
||||
td::create_actor<MasterActor>("Master").release();
|
||||
}
|
||||
scheduler.run(Timestamp::in(1));
|
||||
scheduler.run(td::Timestamp::in(1));
|
||||
}
|
||||
|
||||
class LinkTokenSlave final : public Actor {
|
||||
class LinkTokenSlave final : public td::Actor {
|
||||
public:
|
||||
explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) {
|
||||
explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
|
||||
}
|
||||
void add(uint64 link_token) {
|
||||
void add(td::uint64 link_token) {
|
||||
CHECK(link_token == get_link_token());
|
||||
}
|
||||
void close() {
|
||||
@ -383,42 +377,43 @@ class LinkTokenSlave final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
ActorShared<> parent_;
|
||||
td::ActorShared<> parent_;
|
||||
};
|
||||
|
||||
class LinkTokenMasterActor final : public Actor {
|
||||
class LinkTokenMasterActor final : public td::Actor {
|
||||
public:
|
||||
explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) {
|
||||
}
|
||||
void start_up() final {
|
||||
child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
|
||||
child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
|
||||
yield();
|
||||
}
|
||||
void loop() final {
|
||||
for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) {
|
||||
auto token = static_cast<uint64>(cnt_) + 1;
|
||||
auto token = static_cast<td::uint64>(cnt_) + 1;
|
||||
switch (i % 4) {
|
||||
case 0: {
|
||||
send_closure(ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
|
||||
td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
send_closure_later(ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
|
||||
td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
|
||||
break;
|
||||
}
|
||||
case 2: {
|
||||
EventCreator::closure(ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token).try_emit();
|
||||
td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
|
||||
.try_emit();
|
||||
break;
|
||||
}
|
||||
case 3: {
|
||||
EventCreator::closure(ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
|
||||
td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
|
||||
.try_emit_later();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cnt_ == 0) {
|
||||
send_closure(child_, &LinkTokenSlave::close);
|
||||
td::send_closure(child_, &LinkTokenSlave::close);
|
||||
} else {
|
||||
yield();
|
||||
}
|
||||
@ -426,18 +421,17 @@ class LinkTokenMasterActor final : public Actor {
|
||||
|
||||
void hangup_shared() final {
|
||||
CHECK(get_link_token() == 123);
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
stop();
|
||||
}
|
||||
|
||||
private:
|
||||
int cnt_;
|
||||
ActorId<LinkTokenSlave> child_;
|
||||
td::ActorId<LinkTokenSlave> child_;
|
||||
};
|
||||
|
||||
TEST(Actors, link_token) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
auto cnt = 100000;
|
||||
scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release();
|
||||
@ -449,25 +443,25 @@ TEST(Actors, link_token) {
|
||||
|
||||
TEST(Actors, promise) {
|
||||
int value = -1;
|
||||
Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; });
|
||||
p1.set_error(Status::Error("Test error"));
|
||||
td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; });
|
||||
p1.set_error(td::Status::Error("Test error"));
|
||||
ASSERT_EQ(0, value);
|
||||
Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; });
|
||||
p2.set_error(Status::Error("Test error"));
|
||||
td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; });
|
||||
p2.set_error(td::Status::Error("Test error"));
|
||||
ASSERT_EQ(1, value);
|
||||
}
|
||||
|
||||
class LaterSlave final : public Actor {
|
||||
class LaterSlave final : public td::Actor {
|
||||
public:
|
||||
explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) {
|
||||
explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
|
||||
}
|
||||
|
||||
private:
|
||||
ActorShared<> parent_;
|
||||
td::ActorShared<> parent_;
|
||||
|
||||
void hangup() final {
|
||||
sb << "A";
|
||||
send_closure(actor_id(this), &LaterSlave::finish);
|
||||
td::send_closure(actor_id(this), &LaterSlave::finish);
|
||||
}
|
||||
void finish() {
|
||||
sb << "B";
|
||||
@ -475,12 +469,12 @@ class LaterSlave final : public Actor {
|
||||
}
|
||||
};
|
||||
|
||||
class LaterMasterActor final : public Actor {
|
||||
class LaterMasterActor final : public td::Actor {
|
||||
int cnt_ = 3;
|
||||
std::vector<ActorOwn<LaterSlave>> children_;
|
||||
td::vector<td::ActorOwn<LaterSlave>> children_;
|
||||
void start_up() final {
|
||||
for (int i = 0; i < cnt_; i++) {
|
||||
children_.push_back(create_actor<LaterSlave>("B", actor_shared(this)));
|
||||
children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this)));
|
||||
}
|
||||
yield();
|
||||
}
|
||||
@ -489,16 +483,15 @@ class LaterMasterActor final : public Actor {
|
||||
}
|
||||
void hangup_shared() final {
|
||||
if (!--cnt_) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
stop();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
TEST(Actors, later) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
sb.clear();
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release();
|
||||
scheduler.start();
|
||||
@ -508,38 +501,36 @@ TEST(Actors, later) {
|
||||
ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB");
|
||||
}
|
||||
|
||||
class MultiPromise2 final : public Actor {
|
||||
class MultiPromise2 final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
auto promise = PromiseCreator::lambda([](Result<Unit> result) {
|
||||
auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
|
||||
result.ensure();
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
});
|
||||
|
||||
MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"};
|
||||
td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"};
|
||||
multi_promise.add_promise(std::move(promise));
|
||||
for (int i = 0; i < 10; i++) {
|
||||
create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
|
||||
td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class MultiPromise1 final : public Actor {
|
||||
class MultiPromise1 final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
auto promise = PromiseCreator::lambda([](Result<Unit> result) {
|
||||
auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
|
||||
CHECK(result.is_error());
|
||||
create_actor<MultiPromise2>("B").release();
|
||||
td::create_actor<MultiPromise2>("B").release();
|
||||
});
|
||||
MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"};
|
||||
td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"};
|
||||
multi_promise.add_promise(std::move(promise));
|
||||
}
|
||||
};
|
||||
|
||||
TEST(Actors, MultiPromise) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
sb.clear();
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release();
|
||||
scheduler.start();
|
||||
@ -548,22 +539,20 @@ TEST(Actors, MultiPromise) {
|
||||
scheduler.finish();
|
||||
}
|
||||
|
||||
class FastPromise final : public Actor {
|
||||
class FastPromise final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
PromiseFuture<int> pf;
|
||||
td::PromiseFuture<int> pf;
|
||||
auto promise = pf.move_promise();
|
||||
auto future = pf.move_future();
|
||||
promise.set_value(123);
|
||||
CHECK(future.move_as_ok() == 123);
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
};
|
||||
|
||||
TEST(Actors, FastPromise) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
sb.clear();
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<FastPromise>(0, "A").release();
|
||||
scheduler.start();
|
||||
@ -572,20 +561,18 @@ TEST(Actors, FastPromise) {
|
||||
scheduler.finish();
|
||||
}
|
||||
|
||||
class StopInTeardown final : public Actor {
|
||||
class StopInTeardown final : public td::Actor {
|
||||
void loop() final {
|
||||
stop();
|
||||
}
|
||||
void tear_down() final {
|
||||
stop();
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
};
|
||||
|
||||
TEST(Actors, stop_in_teardown) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
sb.clear();
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release();
|
||||
scheduler.start();
|
||||
@ -594,34 +581,33 @@ TEST(Actors, stop_in_teardown) {
|
||||
scheduler.finish();
|
||||
}
|
||||
|
||||
class AlwaysWaitForMailbox final : public Actor {
|
||||
class AlwaysWaitForMailbox final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
always_wait_for_mailbox();
|
||||
create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
|
||||
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||
CHECK(!ptr->was_f_);
|
||||
}))
|
||||
td::create_actor<td::SleepActor>("Sleep", 0.1,
|
||||
td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) {
|
||||
td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||
td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||
CHECK(!ptr->was_f_);
|
||||
}))
|
||||
.release();
|
||||
}
|
||||
|
||||
void f() {
|
||||
was_f_ = true;
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
void g() {
|
||||
send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
|
||||
td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
|
||||
}
|
||||
|
||||
private:
|
||||
Timeout timeout_;
|
||||
bool was_f_{false};
|
||||
};
|
||||
|
||||
TEST(Actors, always_wait_for_mailbox) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
|
||||
scheduler.start();
|
||||
@ -632,17 +618,16 @@ TEST(Actors, always_wait_for_mailbox) {
|
||||
|
||||
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
|
||||
TEST(Actors, send_from_other_threads) {
|
||||
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(1);
|
||||
int thread_n = 10;
|
||||
class Listener final : public Actor {
|
||||
class Listener final : public td::Actor {
|
||||
public:
|
||||
explicit Listener(int cnt) : cnt_(cnt) {
|
||||
}
|
||||
void dec() {
|
||||
if (--cnt_ == 0) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}
|
||||
}
|
||||
|
||||
@ -652,11 +637,11 @@ TEST(Actors, send_from_other_threads) {
|
||||
|
||||
auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release();
|
||||
scheduler.start();
|
||||
std::vector<td::thread> threads(thread_n);
|
||||
td::vector<td::thread> threads(thread_n);
|
||||
for (auto &thread : threads) {
|
||||
thread = td::thread([&A, &scheduler] {
|
||||
auto guard = scheduler.get_send_guard();
|
||||
send_closure(A, &Listener::dec);
|
||||
td::send_closure(A, &Listener::dec);
|
||||
});
|
||||
}
|
||||
while (scheduler.run_main(10)) {
|
||||
@ -668,7 +653,7 @@ TEST(Actors, send_from_other_threads) {
|
||||
}
|
||||
#endif
|
||||
|
||||
class DelayedCall final : public Actor {
|
||||
class DelayedCall final : public td::Actor {
|
||||
public:
|
||||
void on_called(int *order) {
|
||||
CHECK(*order == 0);
|
||||
@ -676,18 +661,18 @@ class DelayedCall final : public Actor {
|
||||
}
|
||||
};
|
||||
|
||||
class MultiPromiseSendClosureLaterTest final : public Actor {
|
||||
class MultiPromiseSendClosureLaterTest final : public td::Actor {
|
||||
public:
|
||||
void start_up() final {
|
||||
delayed_call_ = create_actor<DelayedCall>("DelayedCall").release();
|
||||
mpa_.add_promise(PromiseCreator::lambda([this](Unit) {
|
||||
delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release();
|
||||
mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) {
|
||||
CHECK(order_ == 1);
|
||||
order_++;
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
}));
|
||||
auto lock = mpa_.get_promise();
|
||||
send_closure_later(delayed_call_, &DelayedCall::on_called, &order_);
|
||||
lock.set_value(Unit());
|
||||
td::send_closure_later(delayed_call_, &DelayedCall::on_called, &order_);
|
||||
lock.set_value(td::Unit());
|
||||
}
|
||||
|
||||
void tear_down() final {
|
||||
@ -696,12 +681,12 @@ class MultiPromiseSendClosureLaterTest final : public Actor {
|
||||
|
||||
private:
|
||||
int order_ = 0;
|
||||
MultiPromiseActor mpa_{"MultiPromiseActor"};
|
||||
ActorId<DelayedCall> delayed_call_;
|
||||
td::MultiPromiseActor mpa_{"MultiPromiseActor"};
|
||||
td::ActorId<DelayedCall> delayed_call_;
|
||||
};
|
||||
|
||||
TEST(Actors, MultiPromiseSendClosureLater) {
|
||||
ConcurrentScheduler scheduler;
|
||||
td::ConcurrentScheduler scheduler;
|
||||
scheduler.init(0);
|
||||
scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release();
|
||||
scheduler.start();
|
||||
|
@ -12,9 +12,7 @@
|
||||
|
||||
namespace {
|
||||
|
||||
using namespace td;
|
||||
|
||||
class PowerWorker final : public Actor {
|
||||
class PowerWorker final : public td::Actor {
|
||||
public:
|
||||
class Callback {
|
||||
public:
|
||||
@ -27,12 +25,12 @@ class PowerWorker final : public Actor {
|
||||
virtual void on_ready(int query, int res) = 0;
|
||||
virtual void on_closed() = 0;
|
||||
};
|
||||
void set_callback(unique_ptr<Callback> callback) {
|
||||
void set_callback(td::unique_ptr<Callback> callback) {
|
||||
callback_ = std::move(callback);
|
||||
}
|
||||
void task(uint32 x, uint32 p) {
|
||||
uint32 res = 1;
|
||||
for (uint32 i = 0; i < p; i++) {
|
||||
void task(td::uint32 x, td::uint32 p) {
|
||||
td::uint32 res = 1;
|
||||
for (td::uint32 i = 0; i < p; i++) {
|
||||
res *= x;
|
||||
}
|
||||
callback_->on_ready(x, res);
|
||||
@ -43,12 +41,12 @@ class PowerWorker final : public Actor {
|
||||
}
|
||||
|
||||
private:
|
||||
unique_ptr<Callback> callback_;
|
||||
td::unique_ptr<Callback> callback_;
|
||||
};
|
||||
|
||||
class Manager final : public Actor {
|
||||
class Manager final : public td::Actor {
|
||||
public:
|
||||
Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers)
|
||||
Manager(int queries_n, int query_size, td::vector<td::ActorId<PowerWorker>> workers)
|
||||
: workers_(std::move(workers))
|
||||
, ref_cnt_(static_cast<int>(workers_.size()))
|
||||
, left_query_(queries_n)
|
||||
@ -57,17 +55,17 @@ class Manager final : public Actor {
|
||||
|
||||
class Callback final : public PowerWorker::Callback {
|
||||
public:
|
||||
Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
|
||||
Callback(td::ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
|
||||
}
|
||||
void on_ready(int query, int result) final {
|
||||
send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
|
||||
td::send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
|
||||
}
|
||||
void on_closed() final {
|
||||
send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
|
||||
td::send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
|
||||
}
|
||||
|
||||
private:
|
||||
ActorId<Manager> actor_id_;
|
||||
td::ActorId<Manager> actor_id_;
|
||||
int worker_id_;
|
||||
};
|
||||
|
||||
@ -75,9 +73,9 @@ class Manager final : public Actor {
|
||||
int i = 0;
|
||||
for (auto &worker : workers_) {
|
||||
ref_cnt_++;
|
||||
send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i));
|
||||
td::send_closure_later(worker, &PowerWorker::set_callback, td::make_unique<Callback>(actor_id(this), i));
|
||||
i++;
|
||||
send_closure_later(worker, &PowerWorker::task, 3, query_size_);
|
||||
td::send_closure_later(worker, &PowerWorker::task, 3, query_size_);
|
||||
left_query_--;
|
||||
}
|
||||
}
|
||||
@ -85,10 +83,10 @@ class Manager final : public Actor {
|
||||
void on_ready(int worker_id, int query, int res) {
|
||||
ref_cnt_--;
|
||||
if (left_query_ == 0) {
|
||||
send_closure(workers_[worker_id], &PowerWorker::close);
|
||||
td::send_closure(workers_[worker_id], &PowerWorker::close);
|
||||
} else {
|
||||
ref_cnt_++;
|
||||
send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
|
||||
td::send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
|
||||
left_query_--;
|
||||
}
|
||||
}
|
||||
@ -96,23 +94,23 @@ class Manager final : public Actor {
|
||||
void on_closed(int worker_id) {
|
||||
ref_cnt_--;
|
||||
if (ref_cnt_ == 0) {
|
||||
Scheduler::instance()->finish();
|
||||
td::Scheduler::instance()->finish();
|
||||
stop();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<ActorId<PowerWorker>> workers_;
|
||||
td::vector<td::ActorId<PowerWorker>> workers_;
|
||||
int ref_cnt_;
|
||||
int left_query_;
|
||||
int query_size_;
|
||||
};
|
||||
|
||||
static void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
|
||||
ConcurrentScheduler sched;
|
||||
td::ConcurrentScheduler sched;
|
||||
sched.init(threads_n);
|
||||
|
||||
std::vector<ActorId<PowerWorker>> workers;
|
||||
td::vector<td::ActorId<PowerWorker>> workers;
|
||||
for (int i = 0; i < workers_n; i++) {
|
||||
int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0;
|
||||
workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, PSLICE() << "worker" << i).release());
|
||||
|
Loading…
Reference in New Issue
Block a user