Actor: always_wait_for_maibox flag
GitOrigin-RevId: cb048967998ffc585133d6a58c77674a17766049
This commit is contained in:
parent
19ef0361f9
commit
15356c4402
@ -3792,6 +3792,8 @@ void Td::on_authorization_lost() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Td::start_up() {
|
void Td::start_up() {
|
||||||
|
always_wait_for_mailbox();
|
||||||
|
|
||||||
uint64 check_endianness = 0x0706050403020100;
|
uint64 check_endianness = 0x0706050403020100;
|
||||||
auto check_endianness_raw = reinterpret_cast<const unsigned char *>(&check_endianness);
|
auto check_endianness_raw = reinterpret_cast<const unsigned char *>(&check_endianness);
|
||||||
for (unsigned char c = 0; c < 8; c++) {
|
for (unsigned char c = 0; c < 8; c++) {
|
||||||
|
@ -77,6 +77,8 @@ class Actor : public ObserverBase {
|
|||||||
void set_context(std::shared_ptr<ActorContext> context);
|
void set_context(std::shared_ptr<ActorContext> context);
|
||||||
void set_tag(CSlice tag);
|
void set_tag(CSlice tag);
|
||||||
|
|
||||||
|
void always_wait_for_mailbox();
|
||||||
|
|
||||||
// for ActorInfo mostly
|
// for ActorInfo mostly
|
||||||
void init(ObjectPool<ActorInfo>::OwnerPtr &&info);
|
void init(ObjectPool<ActorInfo>::OwnerPtr &&info);
|
||||||
ActorInfo *get_info();
|
ActorInfo *get_info();
|
||||||
|
@ -146,4 +146,8 @@ inline Slice Actor::get_name() const {
|
|||||||
return info_->get_name();
|
return info_->get_name();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void Actor::always_wait_for_mailbox() {
|
||||||
|
info_->always_wait_for_mailbox();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -94,14 +94,17 @@ class ActorInfo
|
|||||||
vector<Event> mailbox_;
|
vector<Event> mailbox_;
|
||||||
|
|
||||||
bool is_lite() const;
|
bool is_lite() const;
|
||||||
|
bool can_flush_mailbox() const;
|
||||||
|
|
||||||
void set_wait_generation(uint32 wait_generation);
|
void set_wait_generation(uint32 wait_generation);
|
||||||
bool must_wait(uint32 wait_generation) const;
|
bool must_wait(uint32 wait_generation) const;
|
||||||
|
void always_wait_for_mailbox();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Deleter deleter_;
|
Deleter deleter_;
|
||||||
bool is_lite_;
|
bool is_lite_;
|
||||||
bool is_running_;
|
bool is_running_;
|
||||||
|
bool always_wait_for_mailbox_{false};
|
||||||
uint32 wait_generation_{0};
|
uint32 wait_generation_{0};
|
||||||
|
|
||||||
std::atomic<int32> sched_id_{0};
|
std::atomic<int32> sched_id_{0};
|
||||||
|
@ -57,7 +57,10 @@ inline void ActorInfo::set_wait_generation(uint32 wait_generation) {
|
|||||||
wait_generation_ = wait_generation;
|
wait_generation_ = wait_generation;
|
||||||
}
|
}
|
||||||
inline bool ActorInfo::must_wait(uint32 wait_generation) const {
|
inline bool ActorInfo::must_wait(uint32 wait_generation) const {
|
||||||
return wait_generation_ == wait_generation;
|
return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty());
|
||||||
|
}
|
||||||
|
inline void ActorInfo::always_wait_for_mailbox() {
|
||||||
|
always_wait_for_mailbox_ = true;
|
||||||
}
|
}
|
||||||
inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) {
|
inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) {
|
||||||
actor_ = actor_new_ptr;
|
actor_ = actor_new_ptr;
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include "td/actor/MultiPromise.h"
|
#include "td/actor/MultiPromise.h"
|
||||||
#include "td/actor/PromiseFuture.h"
|
#include "td/actor/PromiseFuture.h"
|
||||||
#include "td/actor/SleepActor.h"
|
#include "td/actor/SleepActor.h"
|
||||||
|
#include "td/actor/Timeout.h"
|
||||||
|
|
||||||
#include "td/utils/logging.h"
|
#include "td/utils/logging.h"
|
||||||
#include "td/utils/Observer.h"
|
#include "td/utils/Observer.h"
|
||||||
@ -583,3 +584,39 @@ TEST(Actors, stop_in_teardown) {
|
|||||||
}
|
}
|
||||||
scheduler.finish();
|
scheduler.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class AlwaysWaitForMailbox : public Actor {
|
||||||
|
public:
|
||||||
|
void start_up() override {
|
||||||
|
always_wait_for_mailbox();
|
||||||
|
create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
|
||||||
|
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||||
|
send_closure(actor_id, &AlwaysWaitForMailbox::g);
|
||||||
|
CHECK(!ptr->was_f_);
|
||||||
|
}))
|
||||||
|
.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
void f() {
|
||||||
|
was_f_ = true;
|
||||||
|
Scheduler::instance()->finish();
|
||||||
|
}
|
||||||
|
void g() {
|
||||||
|
send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Timeout timeout_;
|
||||||
|
bool was_f_{false};
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST(Actors, always_wait_for_mailbox) {
|
||||||
|
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
|
||||||
|
ConcurrentScheduler scheduler;
|
||||||
|
scheduler.init(0);
|
||||||
|
scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
|
||||||
|
scheduler.start();
|
||||||
|
while (scheduler.run_main(10)) {
|
||||||
|
}
|
||||||
|
scheduler.finish();
|
||||||
|
}
|
||||||
|
@ -51,7 +51,7 @@ struct UInt {
|
|||||||
|
|
||||||
template <size_t size>
|
template <size_t size>
|
||||||
inline bool operator==(const UInt<size> &a, const UInt<size> &b) {
|
inline bool operator==(const UInt<size> &a, const UInt<size> &b) {
|
||||||
return std::memcmp(a.raw, b.raw, sizeof(a)) == 0;
|
return std::memcmp(a.raw, b.raw, sizeof(a.raw)) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <size_t size>
|
template <size_t size>
|
||||||
|
Reference in New Issue
Block a user