Minor handhake semaphore improvements.
This commit is contained in:
parent
caf84b01b7
commit
f93001b0f3
@ -39,6 +39,7 @@
|
|||||||
#include "td/utils/Timer.h"
|
#include "td/utils/Timer.h"
|
||||||
#include "td/utils/tl_parsers.h"
|
#include "td/utils/tl_parsers.h"
|
||||||
#include "td/utils/utf8.h"
|
#include "td/utils/utf8.h"
|
||||||
|
#include "td/utils/VectorQueue.h"
|
||||||
|
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
@ -52,7 +53,7 @@ class SemaphoreActor final : public Actor {
|
|||||||
explicit SemaphoreActor(size_t capacity) : capacity_(capacity) {
|
explicit SemaphoreActor(size_t capacity) : capacity_(capacity) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void execute(td::Promise<td::Promise<td::Unit>> promise) {
|
void execute(Promise<Promise<Unit>> promise) {
|
||||||
if (capacity_ == 0) {
|
if (capacity_ == 0) {
|
||||||
pending_.push(std::move(promise));
|
pending_.push(std::move(promise));
|
||||||
} else {
|
} else {
|
||||||
@ -62,15 +63,16 @@ class SemaphoreActor final : public Actor {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
size_t capacity_;
|
size_t capacity_;
|
||||||
VectorQueue<td::Promise<td::Promise<td::Unit>>> pending_;
|
VectorQueue<Promise<Promise<Unit>>> pending_;
|
||||||
void finish(td::Result<td::Unit> r) {
|
|
||||||
|
void finish(Result<Unit>) {
|
||||||
capacity_++;
|
capacity_++;
|
||||||
if (capacity_ > 0 && !pending_.empty()) {
|
if (!pending_.empty()) {
|
||||||
start(pending_.pop());
|
start(pending_.pop());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void start(td::Promise<td::Promise<td::Unit>> promise) {
|
void start(Promise<Promise<Unit>> promise) {
|
||||||
CHECK(capacity_ > 0);
|
CHECK(capacity_ > 0);
|
||||||
capacity_--;
|
capacity_--;
|
||||||
promise.set_value(promise_send_closure(actor_id(this), &SemaphoreActor::finish));
|
promise.set_value(promise_send_closure(actor_id(this), &SemaphoreActor::finish));
|
||||||
@ -83,12 +85,12 @@ struct Semaphore {
|
|||||||
semaphore_ = create_actor<SemaphoreActor>("semaphore", capacity).release();
|
semaphore_ = create_actor<SemaphoreActor>("semaphore", capacity).release();
|
||||||
}
|
}
|
||||||
|
|
||||||
void execute(td::Promise<td::Promise<td::Unit>> promise) {
|
void execute(Promise<Promise<Unit>> promise) {
|
||||||
send_closure(semaphore_, &SemaphoreActor::execute, std::move(promise));
|
send_closure(semaphore_, &SemaphoreActor::execute, std::move(promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
td::ActorId<SemaphoreActor> semaphore_;
|
ActorId<SemaphoreActor> semaphore_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class GenAuthKeyActor final : public Actor {
|
class GenAuthKeyActor final : public Actor {
|
||||||
@ -137,7 +139,7 @@ class GenAuthKeyActor final : public Actor {
|
|||||||
get_handshake_semaphore().execute(promise_send_closure(actor_id(this), &GenAuthKeyActor::do_start_up));
|
get_handshake_semaphore().execute(promise_send_closure(actor_id(this), &GenAuthKeyActor::do_start_up));
|
||||||
}
|
}
|
||||||
|
|
||||||
void do_start_up(Result<td::Promise<td::Unit>> r_finish_promise) {
|
void do_start_up(Result<Promise<Unit>> r_finish_promise) {
|
||||||
if (r_finish_promise.is_error()) {
|
if (r_finish_promise.is_error()) {
|
||||||
LOG(ERROR) << "Unexpected error: " << r_finish_promise.error();
|
LOG(ERROR) << "Unexpected error: " << r_finish_promise.error();
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user