tdactor: remove actor2
GitOrigin-RevId: bcf3e8687088353cb83b32bfcddc6f2a4acf14d8
This commit is contained in:
parent
4594885e61
commit
5999b98fa7
@ -7,8 +7,6 @@ set(TDACTOR_SOURCE
|
|||||||
td/actor/MultiPromise.cpp
|
td/actor/MultiPromise.cpp
|
||||||
td/actor/Timeout.cpp
|
td/actor/Timeout.cpp
|
||||||
|
|
||||||
td/actor/impl2/Scheduler.cpp
|
|
||||||
|
|
||||||
td/actor/impl/Actor-decl.h
|
td/actor/impl/Actor-decl.h
|
||||||
td/actor/impl/Actor.h
|
td/actor/impl/Actor.h
|
||||||
td/actor/impl/ActorId-decl.h
|
td/actor/impl/ActorId-decl.h
|
||||||
@ -29,16 +27,9 @@ set(TDACTOR_SOURCE
|
|||||||
td/actor/SleepActor.h
|
td/actor/SleepActor.h
|
||||||
td/actor/Timeout.h
|
td/actor/Timeout.h
|
||||||
td/actor/actor.h
|
td/actor/actor.h
|
||||||
|
|
||||||
td/actor/impl2/ActorLocker.h
|
|
||||||
td/actor/impl2/ActorSignals.h
|
|
||||||
td/actor/impl2/ActorState.h
|
|
||||||
td/actor/impl2/Scheduler.h
|
|
||||||
td/actor/impl2/SchedulerId.h
|
|
||||||
)
|
)
|
||||||
|
|
||||||
set(TDACTOR_TEST_SOURCE
|
set(TDACTOR_TEST_SOURCE
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_impl2.cpp
|
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp
|
||||||
|
@ -1,117 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "td/actor/impl2/ActorSignals.h"
|
|
||||||
#include "td/actor/impl2/ActorState.h"
|
|
||||||
|
|
||||||
#include "td/utils/logging.h"
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
|
|
||||||
namespace td {
|
|
||||||
namespace actor2 {
|
|
||||||
class ActorLocker {
|
|
||||||
public:
|
|
||||||
struct Options {
|
|
||||||
Options() {
|
|
||||||
}
|
|
||||||
bool can_execute_paused = false;
|
|
||||||
bool is_shared = true;
|
|
||||||
Options &with_can_execute_paused(bool new_can_execute_paused) {
|
|
||||||
can_execute_paused = new_can_execute_paused;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
Options &with_is_shared(bool new_is_shared) {
|
|
||||||
is_shared = new_is_shared;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
explicit ActorLocker(ActorState *state, Options options = {})
|
|
||||||
: state_(state), flags_(state->get_flags_unsafe()), new_flags_{}, options_{options} {
|
|
||||||
}
|
|
||||||
bool try_lock() {
|
|
||||||
CHECK(!own_lock());
|
|
||||||
while (!can_try_add_signals()) {
|
|
||||||
new_flags_ = flags_;
|
|
||||||
new_flags_.set_locked(true);
|
|
||||||
new_flags_.clear_signals();
|
|
||||||
if (state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel)) {
|
|
||||||
own_lock_ = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
bool try_unlock(ActorState::Flags flags) {
|
|
||||||
CHECK(!flags.is_locked());
|
|
||||||
CHECK(own_lock());
|
|
||||||
// can't unlock with signals set
|
|
||||||
//CHECK(!flags.has_signals());
|
|
||||||
|
|
||||||
flags_ = flags;
|
|
||||||
//try unlock
|
|
||||||
if (state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)) {
|
|
||||||
own_lock_ = false;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// read all signals
|
|
||||||
flags.set_locked(true);
|
|
||||||
flags.clear_signals();
|
|
||||||
do {
|
|
||||||
flags_.add_signals(new_flags_.get_signals());
|
|
||||||
} while (!state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel));
|
|
||||||
new_flags_ = flags;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool try_add_signals(ActorSignals signals) {
|
|
||||||
CHECK(!own_lock());
|
|
||||||
CHECK(can_try_add_signals());
|
|
||||||
new_flags_ = flags_;
|
|
||||||
new_flags_.add_signals(signals);
|
|
||||||
return state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel);
|
|
||||||
}
|
|
||||||
bool add_signals(ActorSignals signals) {
|
|
||||||
CHECK(!own_lock());
|
|
||||||
while (true) {
|
|
||||||
if (can_try_add_signals()) {
|
|
||||||
if (try_add_signals(signals)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (try_lock()) {
|
|
||||||
flags_.add_signals(signals);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bool own_lock() const {
|
|
||||||
return own_lock_;
|
|
||||||
}
|
|
||||||
ActorState::Flags flags() const {
|
|
||||||
return flags_;
|
|
||||||
}
|
|
||||||
bool can_execute() const {
|
|
||||||
return flags_.is_shared() == options_.is_shared && (options_.can_execute_paused || !flags_.is_pause());
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
ActorState *state_{nullptr};
|
|
||||||
ActorState::Flags flags_;
|
|
||||||
ActorState::Flags new_flags_;
|
|
||||||
bool own_lock_{false};
|
|
||||||
Options options_;
|
|
||||||
|
|
||||||
bool can_try_add_signals() const {
|
|
||||||
return flags_.is_locked() || (flags_.is_in_queue() && !can_execute());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} // namespace actor2
|
|
||||||
} // namespace td
|
|
@ -1,84 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "td/utils/common.h"
|
|
||||||
|
|
||||||
namespace td {
|
|
||||||
namespace actor2 {
|
|
||||||
class ActorSignals {
|
|
||||||
public:
|
|
||||||
ActorSignals() = default;
|
|
||||||
uint32 raw() const {
|
|
||||||
return raw_;
|
|
||||||
}
|
|
||||||
bool empty() const {
|
|
||||||
return raw_ == 0;
|
|
||||||
}
|
|
||||||
bool has_signal(uint32 signal) const {
|
|
||||||
return (raw_ & (1u << signal)) != 0;
|
|
||||||
}
|
|
||||||
void add_signal(uint32 signal) {
|
|
||||||
raw_ |= (1u << signal);
|
|
||||||
}
|
|
||||||
void add_signals(ActorSignals signals) {
|
|
||||||
raw_ |= signals.raw();
|
|
||||||
}
|
|
||||||
void clear_signal(uint32 signal) {
|
|
||||||
raw_ &= ~(1u << signal);
|
|
||||||
}
|
|
||||||
uint32 first_signal() {
|
|
||||||
if (!raw_) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#if TD_MSVC
|
|
||||||
int res = 0;
|
|
||||||
int bit = 1;
|
|
||||||
while ((raw_ & bit) == 0) {
|
|
||||||
res++;
|
|
||||||
bit <<= 1;
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
#else
|
|
||||||
return __builtin_ctz(raw_);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
enum Signal : uint32 {
|
|
||||||
// Signals in order of priority
|
|
||||||
Wakeup = 1,
|
|
||||||
Alarm = 2,
|
|
||||||
Kill = 3, // immediate kill
|
|
||||||
Io = 4, // move to io thread
|
|
||||||
Cpu = 5, // move to cpu thread
|
|
||||||
StartUp = 6,
|
|
||||||
TearDown = 7,
|
|
||||||
// Two signals for mpmc queue logic
|
|
||||||
//
|
|
||||||
// PopSignal is set after actor is popped from queue
|
|
||||||
// When processed it should set InQueue and Pause flags to false.
|
|
||||||
//
|
|
||||||
// MessagesSignal is set after new messages was added to actor
|
|
||||||
// If owner of actor wish to delay message handling, she should set InQueue flag to true and
|
|
||||||
// add actor into mpmc queue.
|
|
||||||
Pop = 8, // got popped from queue
|
|
||||||
Message = 9, // got new message
|
|
||||||
};
|
|
||||||
|
|
||||||
static ActorSignals one(uint32 signal) {
|
|
||||||
ActorSignals res;
|
|
||||||
res.add_signal(signal);
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint32 raw_{0};
|
|
||||||
friend class ActorState;
|
|
||||||
explicit ActorSignals(uint32 raw) : raw_(raw) {
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} // namespace actor2
|
|
||||||
} // namespace td
|
|
@ -1,166 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "td/actor/impl2/ActorSignals.h"
|
|
||||||
#include "td/actor/impl2/SchedulerId.h"
|
|
||||||
|
|
||||||
#include "td/utils/common.h"
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
|
|
||||||
namespace td {
|
|
||||||
namespace actor2 {
|
|
||||||
class ActorState {
|
|
||||||
public:
|
|
||||||
class Flags {
|
|
||||||
public:
|
|
||||||
Flags() = default;
|
|
||||||
uint32 raw() const {
|
|
||||||
return raw_;
|
|
||||||
}
|
|
||||||
uint32 &raw_ref() {
|
|
||||||
return raw_;
|
|
||||||
}
|
|
||||||
SchedulerId get_scheduler_id() const {
|
|
||||||
return SchedulerId{static_cast<uint8>(raw_ & SchedulerMask)};
|
|
||||||
}
|
|
||||||
void set_scheduler_id(SchedulerId id) {
|
|
||||||
raw_ = (raw_ & ~SchedulerMask) | id.value();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_shared() const {
|
|
||||||
return check_flag(SharedFlag);
|
|
||||||
}
|
|
||||||
void set_shared(bool shared) {
|
|
||||||
set_flag(SharedFlag, shared);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_locked() const {
|
|
||||||
return check_flag(LockFlag);
|
|
||||||
}
|
|
||||||
void set_locked(bool locked) {
|
|
||||||
set_flag(LockFlag, locked);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_migrate() const {
|
|
||||||
return check_flag(MigrateFlag);
|
|
||||||
}
|
|
||||||
void set_migrate(bool migrate) {
|
|
||||||
set_flag(MigrateFlag, migrate);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_pause() const {
|
|
||||||
return check_flag(PauseFlag);
|
|
||||||
}
|
|
||||||
void set_pause(bool pause) {
|
|
||||||
set_flag(PauseFlag, pause);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_closed() const {
|
|
||||||
return check_flag(ClosedFlag);
|
|
||||||
}
|
|
||||||
void set_closed(bool closed) {
|
|
||||||
set_flag(ClosedFlag, closed);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_in_queue() const {
|
|
||||||
return check_flag(InQueueFlag);
|
|
||||||
}
|
|
||||||
void set_in_queue(bool in_queue) {
|
|
||||||
set_flag(InQueueFlag, in_queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool has_signals() const {
|
|
||||||
return check_flag(SignalMask);
|
|
||||||
}
|
|
||||||
void clear_signals() {
|
|
||||||
set_flag(SignalMask, false);
|
|
||||||
}
|
|
||||||
void set_signals(ActorSignals signals) {
|
|
||||||
raw_ = (raw_ & ~SignalMask) | (signals.raw() << SignalOffset);
|
|
||||||
}
|
|
||||||
void add_signals(ActorSignals signals) {
|
|
||||||
raw_ = raw_ | (signals.raw() << SignalOffset);
|
|
||||||
}
|
|
||||||
ActorSignals get_signals() const {
|
|
||||||
return ActorSignals{(raw_ & SignalMask) >> SignalOffset};
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint32 raw_{0};
|
|
||||||
|
|
||||||
friend class ActorState;
|
|
||||||
Flags(uint32 raw) : raw_(raw) {
|
|
||||||
}
|
|
||||||
|
|
||||||
bool check_flag(uint32 mask) const {
|
|
||||||
return (raw_ & mask) != 0;
|
|
||||||
}
|
|
||||||
void set_flag(uint32 mask, bool flag) {
|
|
||||||
raw_ = (raw_ & ~mask) | (flag * mask);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Flags get_flags_unsafe() {
|
|
||||||
return Flags(state_.load(std::memory_order_relaxed));
|
|
||||||
}
|
|
||||||
void set_flags_unsafe(Flags flags) {
|
|
||||||
state_.store(flags.raw(), std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
friend class ActorLocker;
|
|
||||||
std::atomic<uint32> state_{0};
|
|
||||||
enum : uint32 {
|
|
||||||
SchedulerMask = 255,
|
|
||||||
|
|
||||||
// Actors can be shared or not.
|
|
||||||
// If actor is shared, than any thread may try to lock it
|
|
||||||
// If actor is not shared, than it is owned by its scheduler, and only
|
|
||||||
// its scheduler is allowed to access it
|
|
||||||
// This flag may NOT change during the lifetime of an actor
|
|
||||||
SharedFlag = 1 << 9,
|
|
||||||
|
|
||||||
// Only shared actors need lock
|
|
||||||
// Lock if somebody is going to unlock it eventually.
|
|
||||||
// For example actor is locked, when some scheduler is executing its mailbox
|
|
||||||
// Or it is locked when it is in Mpmc queue, so someone will pop it eventually.
|
|
||||||
LockFlag = 1 << 10,
|
|
||||||
|
|
||||||
// While actor is migrating from one scheduler to another no one is allowed to change it
|
|
||||||
// Could not be set for shared actors.
|
|
||||||
MigrateFlag = 1 << 11,
|
|
||||||
|
|
||||||
// While set all messages are delayed
|
|
||||||
// Dropped from flush_maibox
|
|
||||||
// PauseFlag => InQueueFlag
|
|
||||||
PauseFlag = 1 << 12,
|
|
||||||
|
|
||||||
ClosedFlag = 1 << 13,
|
|
||||||
|
|
||||||
InQueueFlag = 1 << 14,
|
|
||||||
|
|
||||||
// Signals
|
|
||||||
SignalOffset = 15,
|
|
||||||
Signal = 1 << SignalOffset,
|
|
||||||
WakeupSignalFlag = Signal << ActorSignals::Wakeup,
|
|
||||||
AlarmSignalFlag = Signal << ActorSignals::Alarm,
|
|
||||||
KillSignalFlag = Signal << ActorSignals::Kill, // immediate kill
|
|
||||||
IoSignalFlag = Signal << ActorSignals::Io, // move to io thread
|
|
||||||
CpuSignalFlag = Signal << ActorSignals::Cpu, // move to cpu thread
|
|
||||||
StartUpSignalFlag = Signal << ActorSignals::StartUp,
|
|
||||||
TearDownSignalFlag = Signal << ActorSignals::TearDown,
|
|
||||||
MessageSignalFlag = Signal << ActorSignals::Message,
|
|
||||||
PopSignalFlag = Signal << ActorSignals::Pop,
|
|
||||||
|
|
||||||
SignalMask = WakeupSignalFlag | AlarmSignalFlag | KillSignalFlag | IoSignalFlag | CpuSignalFlag |
|
|
||||||
StartUpSignalFlag | TearDownSignalFlag | MessageSignalFlag | PopSignalFlag
|
|
||||||
};
|
|
||||||
};
|
|
||||||
} // namespace actor2
|
|
||||||
} // namespace td
|
|
@ -1,11 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#include "td/actor/impl2/Scheduler.h"
|
|
||||||
|
|
||||||
namespace td {
|
|
||||||
namespace actor2 {}
|
|
||||||
} // namespace td
|
|
File diff suppressed because it is too large
Load Diff
@ -1,32 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "td/utils/common.h"
|
|
||||||
#include "td/utils/logging.h"
|
|
||||||
|
|
||||||
namespace td {
|
|
||||||
namespace actor2 {
|
|
||||||
class SchedulerId {
|
|
||||||
public:
|
|
||||||
SchedulerId() : id_(-1) {
|
|
||||||
}
|
|
||||||
explicit SchedulerId(uint8 id) : id_(id) {
|
|
||||||
}
|
|
||||||
bool is_valid() const {
|
|
||||||
return id_ >= 0;
|
|
||||||
}
|
|
||||||
uint8 value() const {
|
|
||||||
CHECK(is_valid());
|
|
||||||
return static_cast<uint8>(id_);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
int32 id_{0};
|
|
||||||
};
|
|
||||||
} // namespace actor2
|
|
||||||
} // namespace td
|
|
@ -1,534 +0,0 @@
|
|||||||
//
|
|
||||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
||||||
//
|
|
||||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
||||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
||||||
//
|
|
||||||
#include "td/actor/impl2/ActorLocker.h"
|
|
||||||
#include "td/actor/impl2/Scheduler.h"
|
|
||||||
|
|
||||||
#include "td/utils/format.h"
|
|
||||||
#include "td/utils/logging.h"
|
|
||||||
#include "td/utils/port/thread.h"
|
|
||||||
#include "td/utils/Slice.h"
|
|
||||||
#include "td/utils/StringBuilder.h"
|
|
||||||
#include "td/utils/tests.h"
|
|
||||||
#include "td/utils/Time.h"
|
|
||||||
|
|
||||||
#include <array>
|
|
||||||
#include <atomic>
|
|
||||||
#include <deque>
|
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
using td::actor2::ActorLocker;
|
|
||||||
using td::actor2::ActorSignals;
|
|
||||||
using td::actor2::ActorState;
|
|
||||||
using td::actor2::SchedulerId;
|
|
||||||
|
|
||||||
TEST(Actor2, signals) {
|
|
||||||
ActorSignals signals;
|
|
||||||
signals.add_signal(ActorSignals::Wakeup);
|
|
||||||
signals.add_signal(ActorSignals::Cpu);
|
|
||||||
signals.add_signal(ActorSignals::Kill);
|
|
||||||
signals.clear_signal(ActorSignals::Cpu);
|
|
||||||
|
|
||||||
bool was_kill = false;
|
|
||||||
bool was_wakeup = false;
|
|
||||||
while (!signals.empty()) {
|
|
||||||
auto s = signals.first_signal();
|
|
||||||
if (s == ActorSignals::Kill) {
|
|
||||||
was_kill = true;
|
|
||||||
} else if (s == ActorSignals::Wakeup) {
|
|
||||||
was_wakeup = true;
|
|
||||||
} else {
|
|
||||||
UNREACHABLE();
|
|
||||||
}
|
|
||||||
signals.clear_signal(s);
|
|
||||||
}
|
|
||||||
CHECK(was_kill && was_wakeup);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actors2, flags) {
|
|
||||||
ActorState::Flags flags;
|
|
||||||
CHECK(!flags.is_locked());
|
|
||||||
flags.set_locked(true);
|
|
||||||
CHECK(flags.is_locked());
|
|
||||||
flags.set_locked(false);
|
|
||||||
CHECK(!flags.is_locked());
|
|
||||||
flags.set_pause(true);
|
|
||||||
|
|
||||||
flags.set_scheduler_id(SchedulerId{123});
|
|
||||||
|
|
||||||
auto signals = flags.get_signals();
|
|
||||||
CHECK(signals.empty());
|
|
||||||
signals.add_signal(ActorSignals::Cpu);
|
|
||||||
signals.add_signal(ActorSignals::Kill);
|
|
||||||
CHECK(signals.has_signal(ActorSignals::Cpu));
|
|
||||||
CHECK(signals.has_signal(ActorSignals::Kill));
|
|
||||||
flags.set_signals(signals);
|
|
||||||
CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
|
|
||||||
|
|
||||||
auto wakeup = ActorSignals{};
|
|
||||||
wakeup.add_signal(ActorSignals::Wakeup);
|
|
||||||
|
|
||||||
flags.add_signals(wakeup);
|
|
||||||
signals.add_signal(ActorSignals::Wakeup);
|
|
||||||
CHECK(flags.get_signals().raw() == signals.raw());
|
|
||||||
|
|
||||||
flags.clear_signals();
|
|
||||||
CHECK(flags.get_signals().empty());
|
|
||||||
|
|
||||||
CHECK(flags.get_scheduler_id().value() == 123);
|
|
||||||
CHECK(flags.is_pause());
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actor2, locker) {
|
|
||||||
ActorState state;
|
|
||||||
|
|
||||||
ActorSignals kill_signal;
|
|
||||||
kill_signal.add_signal(ActorSignals::Kill);
|
|
||||||
|
|
||||||
ActorSignals wakeup_signal;
|
|
||||||
kill_signal.add_signal(ActorSignals::Wakeup);
|
|
||||||
|
|
||||||
ActorSignals cpu_signal;
|
|
||||||
kill_signal.add_signal(ActorSignals::Cpu);
|
|
||||||
|
|
||||||
{
|
|
||||||
ActorLocker lockerA(&state);
|
|
||||||
ActorLocker lockerB(&state);
|
|
||||||
ActorLocker lockerC(&state);
|
|
||||||
|
|
||||||
CHECK(lockerA.try_lock());
|
|
||||||
CHECK(lockerA.own_lock());
|
|
||||||
auto flagsA = lockerA.flags();
|
|
||||||
CHECK(lockerA.try_unlock(flagsA));
|
|
||||||
CHECK(!lockerA.own_lock());
|
|
||||||
|
|
||||||
CHECK(lockerA.try_lock());
|
|
||||||
CHECK(!lockerB.try_lock());
|
|
||||||
CHECK(!lockerC.try_lock());
|
|
||||||
|
|
||||||
CHECK(lockerB.try_add_signals(kill_signal));
|
|
||||||
CHECK(!lockerC.try_add_signals(wakeup_signal));
|
|
||||||
CHECK(lockerC.try_add_signals(wakeup_signal));
|
|
||||||
CHECK(!lockerC.add_signals(cpu_signal));
|
|
||||||
CHECK(!lockerA.flags().has_signals());
|
|
||||||
CHECK(!lockerA.try_unlock(lockerA.flags()));
|
|
||||||
{
|
|
||||||
auto flags = lockerA.flags();
|
|
||||||
auto signals = flags.get_signals();
|
|
||||||
bool was_kill = false;
|
|
||||||
bool was_wakeup = false;
|
|
||||||
bool was_cpu = false;
|
|
||||||
while (!signals.empty()) {
|
|
||||||
auto s = signals.first_signal();
|
|
||||||
if (s == ActorSignals::Kill) {
|
|
||||||
was_kill = true;
|
|
||||||
} else if (s == ActorSignals::Wakeup) {
|
|
||||||
was_wakeup = true;
|
|
||||||
} else if (s == ActorSignals::Cpu) {
|
|
||||||
was_cpu = true;
|
|
||||||
} else {
|
|
||||||
UNREACHABLE();
|
|
||||||
}
|
|
||||||
signals.clear_signal(s);
|
|
||||||
}
|
|
||||||
CHECK(was_kill && was_wakeup && was_cpu);
|
|
||||||
flags.clear_signals();
|
|
||||||
CHECK(lockerA.try_unlock(flags));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
ActorLocker lockerB(&state);
|
|
||||||
CHECK(lockerB.try_lock());
|
|
||||||
CHECK(lockerB.try_unlock(lockerB.flags()));
|
|
||||||
CHECK(lockerB.add_signals(kill_signal));
|
|
||||||
CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
|
|
||||||
auto flags = lockerB.flags();
|
|
||||||
flags.clear_signals();
|
|
||||||
ActorLocker lockerA(&state);
|
|
||||||
CHECK(!lockerA.add_signals(kill_signal));
|
|
||||||
CHECK(!lockerB.try_unlock(flags));
|
|
||||||
CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal!
|
|
||||||
CHECK(!lockerB.try_unlock(flags));
|
|
||||||
CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
|
|
||||||
CHECK(lockerB.try_unlock(flags));
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
ActorLocker lockerA(&state);
|
|
||||||
CHECK(lockerA.try_lock());
|
|
||||||
auto flags = lockerA.flags();
|
|
||||||
flags.set_pause(true);
|
|
||||||
CHECK(lockerA.try_unlock(flags));
|
|
||||||
//We have to lock, though we can't execute.
|
|
||||||
CHECK(lockerA.add_signals(wakeup_signal));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if !TD_THREAD_UNSUPPORTED
|
|
||||||
TEST(Actor2, locker_stress) {
|
|
||||||
ActorState state;
|
|
||||||
|
|
||||||
constexpr size_t threads_n = 5;
|
|
||||||
auto stage = [&](std::atomic<int> &value, int need) {
|
|
||||||
value.fetch_add(1, std::memory_order_release);
|
|
||||||
while (value.load(std::memory_order_acquire) < need) {
|
|
||||||
td::this_thread::yield();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Node {
|
|
||||||
std::atomic<td::uint32> request{0};
|
|
||||||
td::uint32 response = 0;
|
|
||||||
char pad[64];
|
|
||||||
};
|
|
||||||
std::array<Node, threads_n> nodes;
|
|
||||||
auto do_work = [&]() {
|
|
||||||
for (auto &node : nodes) {
|
|
||||||
auto query = node.request.load(std::memory_order_acquire);
|
|
||||||
if (query) {
|
|
||||||
node.response = query * query;
|
|
||||||
node.request.store(0, std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
std::atomic<int> begin{0};
|
|
||||||
std::atomic<int> ready{0};
|
|
||||||
std::atomic<int> check{0};
|
|
||||||
std::vector<td::thread> threads;
|
|
||||||
for (size_t i = 0; i < threads_n; i++) {
|
|
||||||
threads.push_back(td::thread([&, id = i] {
|
|
||||||
for (size_t i = 1; i < 1000000; i++) {
|
|
||||||
ActorLocker locker(&state);
|
|
||||||
auto need = static_cast<int>(threads_n * i);
|
|
||||||
auto query = static_cast<td::uint32>(id + need);
|
|
||||||
stage(begin, need);
|
|
||||||
nodes[id].request = 0;
|
|
||||||
nodes[id].response = 0;
|
|
||||||
stage(ready, need);
|
|
||||||
if (locker.try_lock()) {
|
|
||||||
nodes[id].response = query * query;
|
|
||||||
} else {
|
|
||||||
auto cpu = ActorSignals::one(ActorSignals::Cpu);
|
|
||||||
nodes[id].request.store(query, std::memory_order_release);
|
|
||||||
locker.add_signals(cpu);
|
|
||||||
}
|
|
||||||
while (locker.own_lock()) {
|
|
||||||
auto flags = locker.flags();
|
|
||||||
auto signals = flags.get_signals();
|
|
||||||
if (!signals.empty()) {
|
|
||||||
do_work();
|
|
||||||
}
|
|
||||||
flags.clear_signals();
|
|
||||||
locker.try_unlock(flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
stage(check, need);
|
|
||||||
if (id == 0) {
|
|
||||||
CHECK(locker.add_signals(ActorSignals{}));
|
|
||||||
CHECK(!locker.flags().has_signals());
|
|
||||||
CHECK(locker.try_unlock(locker.flags()));
|
|
||||||
for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
|
|
||||||
CHECK(nodes[thread_id].response ==
|
|
||||||
static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
|
|
||||||
<< td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
|
|
||||||
<< nodes[thread_id].request.load();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
for (auto &thread : threads) {
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace {
|
|
||||||
const size_t BUF_SIZE = 1024 * 1024;
|
|
||||||
char buf[BUF_SIZE];
|
|
||||||
td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST(Actor2, executor_simple) {
|
|
||||||
using namespace td;
|
|
||||||
using namespace td::actor2;
|
|
||||||
struct Dispatcher : public SchedulerDispatcher {
|
|
||||||
void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
|
|
||||||
queue.push_back(std::move(ptr));
|
|
||||||
}
|
|
||||||
void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
|
|
||||||
UNREACHABLE();
|
|
||||||
}
|
|
||||||
SchedulerId get_scheduler_id() const override {
|
|
||||||
return SchedulerId{0};
|
|
||||||
}
|
|
||||||
std::deque<ActorInfoPtr> queue;
|
|
||||||
};
|
|
||||||
Dispatcher dispatcher;
|
|
||||||
|
|
||||||
class TestActor : public Actor {
|
|
||||||
public:
|
|
||||||
void close() {
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
void start_up() override {
|
|
||||||
sb << "StartUp";
|
|
||||||
}
|
|
||||||
void tear_down() override {
|
|
||||||
sb << "TearDown";
|
|
||||||
}
|
|
||||||
};
|
|
||||||
ActorInfoCreator actor_info_creator;
|
|
||||||
auto actor = actor_info_creator.create(
|
|
||||||
std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
|
|
||||||
dispatcher.add_to_queue(actor, SchedulerId{0}, false);
|
|
||||||
|
|
||||||
{
|
|
||||||
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
|
|
||||||
CHECK(executor.can_send());
|
|
||||||
CHECK(executor.can_send_immediate());
|
|
||||||
CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
|
|
||||||
sb.clear();
|
|
||||||
executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
|
|
||||||
CHECK(sb.as_cslice() == "A") << sb.as_cslice();
|
|
||||||
sb.clear();
|
|
||||||
auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
|
|
||||||
big_message.set_big();
|
|
||||||
executor.send(std::move(big_message));
|
|
||||||
CHECK(sb.as_cslice() == "") << sb.as_cslice();
|
|
||||||
executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
|
|
||||||
CHECK(sb.as_cslice() == "") << sb.as_cslice();
|
|
||||||
}
|
|
||||||
CHECK(dispatcher.queue.size() == 1);
|
|
||||||
{ ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
|
|
||||||
CHECK(dispatcher.queue.size() == 1);
|
|
||||||
dispatcher.queue.clear();
|
|
||||||
CHECK(sb.as_cslice() == "bigA") << sb.as_cslice();
|
|
||||||
sb.clear();
|
|
||||||
{
|
|
||||||
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
|
|
||||||
executor.send(
|
|
||||||
ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
|
|
||||||
}
|
|
||||||
CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
|
|
||||||
sb.clear();
|
|
||||||
CHECK(!actor->has_actor());
|
|
||||||
{
|
|
||||||
ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
|
|
||||||
executor.send(
|
|
||||||
ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
|
|
||||||
}
|
|
||||||
CHECK(dispatcher.queue.empty());
|
|
||||||
CHECK(sb.as_cslice() == "");
|
|
||||||
}
|
|
||||||
|
|
||||||
using namespace td::actor2;
|
|
||||||
using td::uint32;
|
|
||||||
static std::atomic<int> cnt;
|
|
||||||
class Worker : public Actor {
|
|
||||||
public:
|
|
||||||
void query(uint32 x, ActorInfoPtr master);
|
|
||||||
void close() {
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
class Master : public Actor {
|
|
||||||
public:
|
|
||||||
void on_result(uint32 x, uint32 y) {
|
|
||||||
loop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint32 l = 0;
|
|
||||||
uint32 r = 100000;
|
|
||||||
ActorInfoPtr worker;
|
|
||||||
void start_up() override {
|
|
||||||
worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
|
|
||||||
loop();
|
|
||||||
}
|
|
||||||
void loop() override {
|
|
||||||
l++;
|
|
||||||
if (l == r) {
|
|
||||||
if (!--cnt) {
|
|
||||||
SchedulerContext::get()->stop();
|
|
||||||
}
|
|
||||||
detail::send_closure(*worker, &Worker::close);
|
|
||||||
stop();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
detail::send_lambda(*worker,
|
|
||||||
[x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
void Worker::query(uint32 x, ActorInfoPtr master) {
|
|
||||||
auto y = x;
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
y = y * y;
|
|
||||||
}
|
|
||||||
detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actor2, scheduler_simple) {
|
|
||||||
auto group_info = std::make_shared<SchedulerGroupInfo>(1);
|
|
||||||
Scheduler scheduler{group_info, SchedulerId{0}, 2};
|
|
||||||
scheduler.start();
|
|
||||||
scheduler.run_in_context([] {
|
|
||||||
cnt = 10;
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
detail::create_actor<Master>(ActorOptions().with_name("Master"));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
while (scheduler.run(1000)) {
|
|
||||||
}
|
|
||||||
Scheduler::close_scheduler_group(*group_info);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actor2, actor_id_simple) {
|
|
||||||
auto group_info = std::make_shared<SchedulerGroupInfo>(1);
|
|
||||||
Scheduler scheduler{group_info, SchedulerId{0}, 2};
|
|
||||||
sb.clear();
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
scheduler.run_in_context([] {
|
|
||||||
class A : public Actor {
|
|
||||||
public:
|
|
||||||
A(int value) : value_(value) {
|
|
||||||
sb << "A" << value_;
|
|
||||||
}
|
|
||||||
void hello() {
|
|
||||||
sb << "hello";
|
|
||||||
}
|
|
||||||
~A() {
|
|
||||||
sb << "~A";
|
|
||||||
if (--cnt <= 0) {
|
|
||||||
SchedulerContext::get()->stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
int value_;
|
|
||||||
};
|
|
||||||
cnt = 1;
|
|
||||||
auto id = create_actor<A>("A", 123);
|
|
||||||
CHECK(sb.as_cslice() == "A123");
|
|
||||||
sb.clear();
|
|
||||||
send_closure(id, &A::hello);
|
|
||||||
});
|
|
||||||
while (scheduler.run(1000)) {
|
|
||||||
}
|
|
||||||
CHECK(sb.as_cslice() == "hello~A");
|
|
||||||
Scheduler::close_scheduler_group(*group_info);
|
|
||||||
sb.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actor2, actor_creation) {
|
|
||||||
auto group_info = std::make_shared<SchedulerGroupInfo>(1);
|
|
||||||
Scheduler scheduler{group_info, SchedulerId{0}, 1};
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
scheduler.run_in_context([]() mutable {
|
|
||||||
class B;
|
|
||||||
class A : public Actor {
|
|
||||||
public:
|
|
||||||
void f() {
|
|
||||||
check();
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
void start_up() override {
|
|
||||||
check();
|
|
||||||
create_actor<B>("Simple", actor_id(this)).release();
|
|
||||||
}
|
|
||||||
|
|
||||||
void check() {
|
|
||||||
auto &context = *SchedulerContext::get();
|
|
||||||
CHECK(context.has_poll());
|
|
||||||
context.get_poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
void tear_down() override {
|
|
||||||
if (--cnt <= 0) {
|
|
||||||
SchedulerContext::get()->stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
class B : public Actor {
|
|
||||||
public:
|
|
||||||
B(ActorId<A> a) : a_(a) {
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
void start_up() override {
|
|
||||||
auto &context = *SchedulerContext::get();
|
|
||||||
CHECK(!context.has_poll());
|
|
||||||
send_closure(a_, &A::f);
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
void tear_down() override {
|
|
||||||
if (--cnt <= 0) {
|
|
||||||
SchedulerContext::get()->stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ActorId<A> a_;
|
|
||||||
};
|
|
||||||
cnt = 2;
|
|
||||||
create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
|
|
||||||
});
|
|
||||||
while (scheduler.run(1000)) {
|
|
||||||
}
|
|
||||||
scheduler.stop();
|
|
||||||
Scheduler::close_scheduler_group(*group_info);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(Actor2, actor_timeout_simple) {
|
|
||||||
auto group_info = std::make_shared<SchedulerGroupInfo>(1);
|
|
||||||
Scheduler scheduler{group_info, SchedulerId{0}, 2};
|
|
||||||
sb.clear();
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
scheduler.run_in_context([] {
|
|
||||||
class A : public Actor {
|
|
||||||
public:
|
|
||||||
void start_up() override {
|
|
||||||
set_timeout();
|
|
||||||
}
|
|
||||||
void alarm() override {
|
|
||||||
double diff = td::Time::now() - expected_timeout_;
|
|
||||||
CHECK(-0.001 < diff && diff < 0.1) << diff;
|
|
||||||
if (cnt_-- > 0) {
|
|
||||||
set_timeout();
|
|
||||||
} else {
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void tear_down() override {
|
|
||||||
SchedulerContext::get()->stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
double expected_timeout_;
|
|
||||||
int cnt_ = 5;
|
|
||||||
void set_timeout() {
|
|
||||||
auto wakeup_timestamp = td::Timestamp::in(0.1);
|
|
||||||
expected_timeout_ = wakeup_timestamp.at();
|
|
||||||
alarm_timestamp() = wakeup_timestamp;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release();
|
|
||||||
});
|
|
||||||
while (scheduler.run(1000)) {
|
|
||||||
}
|
|
||||||
Scheduler::close_scheduler_group(*group_info);
|
|
||||||
sb.clear();
|
|
||||||
}
|
|
||||||
#endif //!TD_THREAD_UNSUPPORTED
|
|
Reference in New Issue
Block a user