Client: set unique tag for each Td actor
GitOrigin-RevId: 7ee500c44703a395aae9151969b6bb8e3aee5ebd
This commit is contained in:
parent
3193d5c2d9
commit
eaf48c36a1
@ -116,10 +116,18 @@ class MultiTd : public Actor {
|
|||||||
CHECK(td.empty());
|
CHECK(td.empty());
|
||||||
|
|
||||||
string name = "Td";
|
string name = "Td";
|
||||||
if (td_id != 0) {
|
class TdActorContext : public ActorContext {
|
||||||
name += PSTRING() << '#' << td_id;
|
public:
|
||||||
|
TdActorContext(std::string tag) : tag_(std::move(tag)) {
|
||||||
}
|
}
|
||||||
td = create_actor<Td>(name, std::move(callback));
|
std::string tag_;
|
||||||
|
};
|
||||||
|
auto context = std::make_shared<TdActorContext>(to_string(td_id));
|
||||||
|
auto old_context = set_context(context);
|
||||||
|
auto old_tag = set_tag(context->tag_);
|
||||||
|
td = create_actor<Td>("Td", std::move(callback));
|
||||||
|
set_context(old_context);
|
||||||
|
set_tag(old_tag);
|
||||||
}
|
}
|
||||||
void send(int32 td_id, Client::Request request) {
|
void send(int32 td_id, Client::Request request) {
|
||||||
auto &td = tds_[td_id];
|
auto &td = tds_[td_id];
|
||||||
@ -176,6 +184,7 @@ class MultiImpl {
|
|||||||
MultiImpl &operator=(MultiImpl &&) = delete;
|
MultiImpl &operator=(MultiImpl &&) = delete;
|
||||||
|
|
||||||
int32 create_id() {
|
int32 create_id() {
|
||||||
|
static std::atomic<int32> id_{0};
|
||||||
return id_.fetch_add(1) + 1;
|
return id_.fetch_add(1) + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,7 +216,6 @@ class MultiImpl {
|
|||||||
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
||||||
thread scheduler_thread_;
|
thread scheduler_thread_;
|
||||||
ActorOwn<MultiTd> multi_td_;
|
ActorOwn<MultiTd> multi_td_;
|
||||||
std::atomic<int32> id_{0};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class Client::Impl final {
|
class Client::Impl final {
|
||||||
|
@ -74,8 +74,8 @@ class Actor : public ObserverBase {
|
|||||||
void do_migrate(int32 sched_id);
|
void do_migrate(int32 sched_id);
|
||||||
|
|
||||||
uint64 get_link_token();
|
uint64 get_link_token();
|
||||||
void set_context(std::shared_ptr<ActorContext> context);
|
std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
|
||||||
void set_tag(CSlice tag);
|
CSlice set_tag(CSlice tag);
|
||||||
|
|
||||||
void always_wait_for_mailbox();
|
void always_wait_for_mailbox();
|
||||||
|
|
||||||
|
@ -85,12 +85,18 @@ std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorT
|
|||||||
inline uint64 Actor::get_link_token() {
|
inline uint64 Actor::get_link_token() {
|
||||||
return Scheduler::instance()->get_link_token(this);
|
return Scheduler::instance()->get_link_token(this);
|
||||||
}
|
}
|
||||||
inline void Actor::set_context(std::shared_ptr<ActorContext> context) {
|
inline std::shared_ptr<ActorContext> Actor::set_context(std::shared_ptr<ActorContext> context) {
|
||||||
info_->set_context(std::move(context));
|
return info_->set_context(std::move(context));
|
||||||
}
|
}
|
||||||
inline void Actor::set_tag(CSlice tag) {
|
inline CSlice Actor::set_tag(CSlice tag) {
|
||||||
info_->get_context()->tag_ = tag.c_str();
|
auto &tag_ref = info_->get_context()->tag_;
|
||||||
|
CSlice old_tag;
|
||||||
|
if (tag_ref) {
|
||||||
|
old_tag = CSlice(tag_ref);
|
||||||
|
}
|
||||||
|
tag_ref = tag.c_str();
|
||||||
Scheduler::on_context_updated();
|
Scheduler::on_context_updated();
|
||||||
|
return old_tag;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) {
|
inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) {
|
||||||
|
@ -74,7 +74,7 @@ class ActorInfo
|
|||||||
Actor *get_actor_unsafe();
|
Actor *get_actor_unsafe();
|
||||||
const Actor *get_actor_unsafe() const;
|
const Actor *get_actor_unsafe() const;
|
||||||
|
|
||||||
void set_context(std::shared_ptr<ActorContext> context);
|
std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
|
||||||
ActorContext *get_context();
|
ActorContext *get_context();
|
||||||
const ActorContext *get_context() const;
|
const ActorContext *get_context() const;
|
||||||
CSlice get_name() const;
|
CSlice get_name() const;
|
||||||
|
@ -143,13 +143,14 @@ inline const Actor *ActorInfo::get_actor_unsafe() const {
|
|||||||
return actor_;
|
return actor_;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void ActorInfo::set_context(std::shared_ptr<ActorContext> context) {
|
inline std::shared_ptr<ActorContext> ActorInfo::set_context(std::shared_ptr<ActorContext> context) {
|
||||||
CHECK(is_running());
|
CHECK(is_running());
|
||||||
context->this_ptr_ = context;
|
context->this_ptr_ = context;
|
||||||
context->tag_ = Scheduler::context()->tag_;
|
context->tag_ = Scheduler::context()->tag_;
|
||||||
context_ = std::move(context);
|
std::swap(context_, context);
|
||||||
Scheduler::context() = context_.get();
|
Scheduler::context() = context_.get();
|
||||||
Scheduler::on_context_updated();
|
Scheduler::on_context_updated();
|
||||||
|
return context;
|
||||||
}
|
}
|
||||||
inline const ActorContext *ActorInfo::get_context() const {
|
inline const ActorContext *ActorInfo::get_context() const {
|
||||||
return context_.get();
|
return context_.get();
|
||||||
|
Reference in New Issue
Block a user