9cd334cc4b
GitOrigin-RevId: 947ad46a22b8dafe4b5f107e6c6e07a7c92ec7cf
623 lines
15 KiB
C++
623 lines
15 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/utils/tests.h"
|
|
|
|
#include "td/actor/actor.h"
|
|
#include "td/actor/MultiPromise.h"
|
|
#include "td/actor/PromiseFuture.h"
|
|
#include "td/actor/SleepActor.h"
|
|
#include "td/actor/Timeout.h"
|
|
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/Observer.h"
|
|
#include "td/utils/port/FileFd.h"
|
|
#include "td/utils/Slice.h"
|
|
#include "td/utils/Status.h"
|
|
#include "td/utils/StringBuilder.h"
|
|
|
|
#include <tuple>
|
|
|
|
REGISTER_TESTS(actors_simple)
|
|
|
|
namespace {
|
|
using namespace td;
|
|
|
|
static const size_t BUF_SIZE = 1024 * 1024;
|
|
static char buf[BUF_SIZE];
|
|
static char buf2[BUF_SIZE];
|
|
static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1));
|
|
static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1));
|
|
|
|
TEST(Actors, SendLater) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
sb.clear();
|
|
Scheduler scheduler;
|
|
scheduler.init();
|
|
|
|
auto guard = scheduler.get_guard();
|
|
class Worker : public Actor {
|
|
public:
|
|
void f() {
|
|
sb << "A";
|
|
}
|
|
};
|
|
auto id = create_actor<Worker>("Worker");
|
|
scheduler.run_no_guard(0);
|
|
send_closure(id, &Worker::f);
|
|
send_closure_later(id, &Worker::f);
|
|
send_closure(id, &Worker::f);
|
|
ASSERT_STREQ("A", sb.as_cslice().c_str());
|
|
scheduler.run_no_guard(0);
|
|
ASSERT_STREQ("AAA", sb.as_cslice().c_str());
|
|
}
|
|
|
|
class X {
|
|
public:
|
|
X() {
|
|
sb << "[cnstr_default]";
|
|
}
|
|
X(const X &) {
|
|
sb << "[cnstr_copy]";
|
|
}
|
|
X(X &&) {
|
|
sb << "[cnstr_move]";
|
|
}
|
|
X &operator=(const X &) {
|
|
sb << "[set_copy]";
|
|
return *this;
|
|
}
|
|
X &operator=(X &&) {
|
|
sb << "[set_move]";
|
|
return *this;
|
|
}
|
|
~X() = default;
|
|
};
|
|
|
|
class XReceiver final : public Actor {
|
|
public:
|
|
void by_const_ref(const X &) {
|
|
sb << "[by_const_ref]";
|
|
}
|
|
void by_lvalue_ref(const X &) {
|
|
sb << "[by_lvalue_ref]";
|
|
}
|
|
void by_value(X) {
|
|
sb << "[by_value]";
|
|
}
|
|
};
|
|
|
|
TEST(Actors, simple_pass_event_arguments) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
Scheduler scheduler;
|
|
scheduler.init();
|
|
|
|
auto guard = scheduler.get_guard();
|
|
auto id = create_actor<XReceiver>("XR").release();
|
|
scheduler.run_no_guard(0);
|
|
|
|
X x;
|
|
|
|
// check tuple
|
|
// std::tuple<X> tx;
|
|
// sb.clear();
|
|
// std::tuple<X> ty(std::move(tx));
|
|
// tx = std::move(ty);
|
|
// ASSERT_STREQ("[cnstr_move]", sb.as_cslice().c_str());
|
|
|
|
// Send temporary object
|
|
|
|
// Tmp-->ConstRef
|
|
sb.clear();
|
|
send_closure(id, &XReceiver::by_const_ref, X());
|
|
ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str());
|
|
|
|
// Tmp-->ConstRef (Delayed)
|
|
sb.clear();
|
|
send_closure_later(id, &XReceiver::by_const_ref, X());
|
|
scheduler.run_no_guard(0);
|
|
// LOG(ERROR) << sb.as_cslice();
|
|
ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
|
|
|
|
// Tmp-->LvalueRef
|
|
sb.clear();
|
|
send_closure(id, &XReceiver::by_lvalue_ref, X());
|
|
ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str());
|
|
|
|
// Tmp-->LvalueRef (Delayed)
|
|
sb.clear();
|
|
send_closure_later(id, &XReceiver::by_lvalue_ref, X());
|
|
scheduler.run_no_guard(0);
|
|
ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
|
|
|
|
// Tmp-->Value
|
|
sb.clear();
|
|
send_closure(id, &XReceiver::by_value, X());
|
|
ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str());
|
|
|
|
// Tmp-->Value (Delayed)
|
|
sb.clear();
|
|
send_closure_later(id, &XReceiver::by_value, X());
|
|
scheduler.run_no_guard(0);
|
|
ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
|
|
|
|
// Var-->ConstRef
|
|
sb.clear();
|
|
send_closure(id, &XReceiver::by_const_ref, x);
|
|
ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str());
|
|
|
|
// Var-->ConstRef (Delayed)
|
|
sb.clear();
|
|
send_closure_later(id, &XReceiver::by_const_ref, x);
|
|
scheduler.run_no_guard(0);
|
|
ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
|
|
|
|
// Var-->LvalueRef
|
|
// Var-->LvalueRef (Delayed)
|
|
// CE or stange behaviour
|
|
|
|
// Var-->Value
|
|
sb.clear();
|
|
send_closure(id, &XReceiver::by_value, x);
|
|
ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str());
|
|
|
|
// Var-->Value (Delayed)
|
|
sb.clear();
|
|
send_closure_later(id, &XReceiver::by_value, x);
|
|
scheduler.run_no_guard(0);
|
|
ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
|
|
}
|
|
|
|
class PrintChar final : public Actor {
|
|
public:
|
|
PrintChar(char c, int cnt) : char_(c), cnt_(cnt) {
|
|
}
|
|
void start_up() override {
|
|
yield();
|
|
}
|
|
void wakeup() override {
|
|
if (cnt_ == 0) {
|
|
stop();
|
|
} else {
|
|
sb << char_;
|
|
cnt_--;
|
|
yield();
|
|
}
|
|
}
|
|
|
|
private:
|
|
char char_;
|
|
int cnt_;
|
|
};
|
|
} // namespace
|
|
|
|
//
|
|
// Yield must add actor to the end of queue
|
|
//
|
|
TEST(Actors, simple_hand_yield) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
Scheduler scheduler;
|
|
scheduler.init();
|
|
sb.clear();
|
|
int cnt = 1000;
|
|
{
|
|
auto guard = scheduler.get_guard();
|
|
create_actor<PrintChar>("PrintA", 'A', cnt).release();
|
|
create_actor<PrintChar>("PrintB", 'B', cnt).release();
|
|
create_actor<PrintChar>("PrintC", 'C', cnt).release();
|
|
}
|
|
scheduler.run(0);
|
|
std::string expected;
|
|
for (int i = 0; i < cnt; i++) {
|
|
expected += "ABC";
|
|
}
|
|
ASSERT_STREQ(expected.c_str(), sb.as_cslice().c_str());
|
|
}
|
|
|
|
class Ball {
|
|
public:
|
|
friend void start_migrate(Ball &ball, int32 sched_id) {
|
|
sb << "start";
|
|
}
|
|
friend void finish_migrate(Ball &ball) {
|
|
sb2 << "finish";
|
|
}
|
|
};
|
|
|
|
class Pong final : public Actor {
|
|
public:
|
|
void pong(Ball ball) {
|
|
Scheduler::instance()->finish();
|
|
}
|
|
};
|
|
|
|
class Ping final : public Actor {
|
|
public:
|
|
explicit Ping(ActorId<Pong> pong) : pong_(pong) {
|
|
}
|
|
void start_up() override {
|
|
send_closure(pong_, &Pong::pong, Ball());
|
|
}
|
|
|
|
private:
|
|
ActorId<Pong> pong_;
|
|
};
|
|
|
|
TEST(Actors, simple_migrate) {
|
|
sb.clear();
|
|
sb2.clear();
|
|
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(2);
|
|
auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release();
|
|
scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
|
ASSERT_STREQ("", sb.as_cslice().c_str());
|
|
ASSERT_STREQ("", sb2.as_cslice().c_str());
|
|
#else
|
|
ASSERT_STREQ("start", sb.as_cslice().c_str());
|
|
ASSERT_STREQ("finish", sb2.as_cslice().c_str());
|
|
#endif
|
|
}
|
|
|
|
class OpenClose final : public Actor {
|
|
public:
|
|
explicit OpenClose(int cnt) : cnt_(cnt) {
|
|
}
|
|
void start_up() override {
|
|
yield();
|
|
}
|
|
void wakeup() override {
|
|
ObserverBase *observer = reinterpret_cast<ObserverBase *>(123);
|
|
if (cnt_ > 0) {
|
|
auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create);
|
|
CHECK(r_file_fd.is_ok()) << r_file_fd.error();
|
|
auto file_fd = r_file_fd.move_as_ok();
|
|
// LOG(ERROR) << file_fd.get_native_fd();
|
|
file_fd.get_fd().set_observer(observer);
|
|
file_fd.close();
|
|
cnt_--;
|
|
yield();
|
|
} else {
|
|
Scheduler::instance()->finish();
|
|
}
|
|
}
|
|
|
|
private:
|
|
int cnt_;
|
|
};
|
|
|
|
TEST(Actors, open_close) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(2);
|
|
int cnt = 1000000;
|
|
#if TD_WINDOWS || TD_ANDROID
|
|
// TODO(perf) optimize
|
|
cnt = 100;
|
|
#endif
|
|
scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release();
|
|
scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|
|
|
|
namespace {
|
|
class MsgActor : public Actor {
|
|
public:
|
|
virtual void msg() = 0;
|
|
};
|
|
|
|
class Slave : public Actor {
|
|
public:
|
|
ActorId<MsgActor> msg;
|
|
explicit Slave(ActorId<MsgActor> msg) : msg(msg) {
|
|
}
|
|
void hangup() override {
|
|
send_closure(msg, &MsgActor::msg);
|
|
}
|
|
};
|
|
|
|
class MasterActor : public MsgActor {
|
|
public:
|
|
void loop() override {
|
|
alive_ = true;
|
|
slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this)));
|
|
stop();
|
|
}
|
|
ActorOwn<Slave> slave;
|
|
|
|
MasterActor() = default;
|
|
MasterActor(const MasterActor &) = delete;
|
|
MasterActor &operator=(const MasterActor &) = delete;
|
|
MasterActor(MasterActor &&) = delete;
|
|
MasterActor &operator=(MasterActor &&) = delete;
|
|
~MasterActor() override {
|
|
alive_ = 987654321;
|
|
}
|
|
void msg() override {
|
|
CHECK(alive_ == 123456789);
|
|
}
|
|
uint64 alive_ = 123456789;
|
|
};
|
|
} // namespace
|
|
|
|
TEST(Actors, call_after_destruct) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
Scheduler scheduler;
|
|
scheduler.init();
|
|
{
|
|
auto guard = scheduler.get_guard();
|
|
create_actor<MasterActor>("Master").release();
|
|
}
|
|
scheduler.run(0);
|
|
}
|
|
|
|
class LinkTokenSlave : public Actor {
|
|
public:
|
|
explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) {
|
|
}
|
|
void add(uint64 link_token) {
|
|
CHECK(link_token == get_link_token());
|
|
}
|
|
void close() {
|
|
stop();
|
|
}
|
|
|
|
private:
|
|
ActorShared<> parent_;
|
|
};
|
|
|
|
class LinkTokenMasterActor : public Actor {
|
|
public:
|
|
explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) {
|
|
}
|
|
void start_up() override {
|
|
child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
|
|
yield();
|
|
}
|
|
void loop() override {
|
|
for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) {
|
|
switch (i % 4) {
|
|
case 0: {
|
|
send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
|
|
break;
|
|
}
|
|
case 1: {
|
|
send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
|
|
break;
|
|
}
|
|
case 2: {
|
|
EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
|
|
.try_emit();
|
|
break;
|
|
}
|
|
case 3: {
|
|
EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
|
|
.try_emit_later();
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (cnt_ == 0) {
|
|
send_closure(child_, &LinkTokenSlave::close);
|
|
} else {
|
|
yield();
|
|
}
|
|
}
|
|
|
|
void hangup_shared() override {
|
|
CHECK(get_link_token() == 123);
|
|
Scheduler::instance()->finish();
|
|
stop();
|
|
}
|
|
|
|
private:
|
|
int cnt_;
|
|
ActorId<LinkTokenSlave> child_;
|
|
};
|
|
|
|
TEST(Actors, link_token) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
auto cnt = 100000;
|
|
scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|
|
|
|
TEST(Actors, promise) {
|
|
int value = -1;
|
|
Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; });
|
|
p1.set_error(Status::Error("Test error"));
|
|
ASSERT_EQ(0, value);
|
|
Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; });
|
|
p2.set_error(Status::Error("Test error"));
|
|
ASSERT_EQ(1, value);
|
|
}
|
|
|
|
class LaterSlave : public Actor {
|
|
public:
|
|
explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) {
|
|
}
|
|
|
|
private:
|
|
ActorShared<> parent_;
|
|
|
|
void hangup() override {
|
|
sb << "A";
|
|
send_closure(actor_id(this), &LaterSlave::finish);
|
|
}
|
|
void finish() {
|
|
sb << "B";
|
|
stop();
|
|
}
|
|
};
|
|
|
|
class LaterMasterActor : public Actor {
|
|
int cnt_ = 3;
|
|
std::vector<ActorOwn<LaterSlave>> children_;
|
|
void start_up() override {
|
|
for (int i = 0; i < cnt_; i++) {
|
|
children_.push_back(create_actor<LaterSlave>("B", actor_shared()));
|
|
}
|
|
yield();
|
|
}
|
|
void loop() override {
|
|
children_.clear();
|
|
}
|
|
void hangup_shared() override {
|
|
if (!--cnt_) {
|
|
Scheduler::instance()->finish();
|
|
stop();
|
|
}
|
|
}
|
|
};
|
|
|
|
TEST(Actors, later) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
sb.clear();
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB");
|
|
}
|
|
|
|
class MultiPromise2 : public Actor {
|
|
public:
|
|
void start_up() override {
|
|
auto promise = PromiseCreator::lambda([](Result<Unit> result) {
|
|
result.ensure();
|
|
Scheduler::instance()->finish();
|
|
});
|
|
|
|
MultiPromiseActorSafe multi_promise;
|
|
multi_promise.add_promise(std::move(promise));
|
|
for (int i = 0; i < 10; i++) {
|
|
create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
|
|
}
|
|
}
|
|
};
|
|
|
|
class MultiPromise1 : public Actor {
|
|
public:
|
|
void start_up() override {
|
|
auto promise = PromiseCreator::lambda([](Result<Unit> result) {
|
|
CHECK(result.is_error());
|
|
create_actor<MultiPromise2>("B").release();
|
|
});
|
|
MultiPromiseActorSafe multi_promise;
|
|
multi_promise.add_promise(std::move(promise));
|
|
}
|
|
};
|
|
|
|
TEST(Actors, MultiPromise) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
sb.clear();
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|
|
|
|
class FastPromise : public Actor {
|
|
public:
|
|
void start_up() override {
|
|
PromiseFuture<int> pf;
|
|
auto promise = pf.move_promise();
|
|
auto future = pf.move_future();
|
|
promise.set_value(123);
|
|
CHECK(future.move_as_ok() == 123);
|
|
Scheduler::instance()->finish();
|
|
}
|
|
};
|
|
|
|
TEST(Actors, FastPromise) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
sb.clear();
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
scheduler.create_actor_unsafe<FastPromise>(0, "A").release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|
|
|
|
class StopInTeardown : public Actor {
|
|
void loop() override {
|
|
stop();
|
|
}
|
|
void tear_down() override {
|
|
stop();
|
|
Scheduler::instance()->finish();
|
|
}
|
|
};
|
|
|
|
TEST(Actors, stop_in_teardown) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
sb.clear();
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|
|
|
|
class AlwaysWaitForMailbox : public Actor {
|
|
public:
|
|
void start_up() override {
|
|
always_wait_for_mailbox();
|
|
create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
|
|
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
|
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
|
CHECK(!ptr->was_f_);
|
|
}))
|
|
.release();
|
|
}
|
|
|
|
void f() {
|
|
was_f_ = true;
|
|
Scheduler::instance()->finish();
|
|
}
|
|
void g() {
|
|
send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
|
|
}
|
|
|
|
private:
|
|
Timeout timeout_;
|
|
bool was_f_{false};
|
|
};
|
|
|
|
TEST(Actors, always_wait_for_mailbox) {
|
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
|
ConcurrentScheduler scheduler;
|
|
scheduler.init(0);
|
|
scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
|
|
scheduler.start();
|
|
while (scheduler.run_main(10)) {
|
|
}
|
|
scheduler.finish();
|
|
}
|