Use SemaphoreActor for global hanshakes flood limit

This commit is contained in:
Arseny Smirnov 2022-06-10 15:17:18 +02:00
parent 1b5798393d
commit ef3900a853

View File

@ -48,6 +48,50 @@ namespace td {
namespace detail {
class SemaphoreActor final : public Actor {
public:
explicit SemaphoreActor(size_t capacity) : capacity_(capacity) {
}
void execute(td::Promise<td::Promise<td::Unit>> promise) {
if (capacity_ == 0) {
pending_.push(std::move(promise));
} else {
start(std::move(promise));
}
}
private:
size_t capacity_;
VectorQueue<td::Promise<td::Promise<td::Unit>>> pending_;
void finish(td::Result<td::Unit> r) {
capacity_++;
if (capacity_ > 0 && !pending_.empty()) {
start(pending_.pop());
}
}
void start(td::Promise<td::Promise<td::Unit>> promise) {
CHECK(capacity_ > 0);
capacity_--;
promise.set_value(promise_send_closure(actor_id(this), &SemaphoreActor::finish));
}
};
struct Semaphore {
public:
explicit Semaphore(size_t capacity) {
semaphore_ = create_actor<SemaphoreActor>("semaphore", capacity).release();
}
void execute(td::Promise<td::Promise<td::Unit>> promise) {
send_closure(semaphore_, &SemaphoreActor::execute, std::move(promise));
}
private:
td::ActorId<SemaphoreActor> semaphore_;
};
class GenAuthKeyActor final : public Actor {
public:
GenAuthKeyActor(Slice name, unique_ptr<mtproto::AuthKeyHandshake> handshake,
@ -80,31 +124,26 @@ class GenAuthKeyActor final : public Actor {
CancellationTokenSource cancellation_token_source_;
ActorOwn<mtproto::HandshakeActor> child_;
FloodControlGlobal::Guard guard_;
Promise<Unit> finish_promise_;
FloodControlGlobal *get_handshake_flood() {
constexpr uint64 MAX_CONCURRENT_HANDSHAKES = 500;
static FloodControlGlobal flood{MAX_CONCURRENT_HANDSHAKES};
return &flood;
static TD_THREAD_LOCAL Semaphore *semaphore_;
Semaphore &get_handshake_semaphore() {
init_thread_local<Semaphore>(semaphore_, 50);
return *semaphore_;
}
void start_up() final {
// Bug in Android clang and MSVC
// std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>()));
get_handshake_semaphore().execute(promise_send_closure(actor_id(this), &GenAuthKeyActor::do_start_up));
}
// Will sleep a little if there are too many active handshakes now
//
// TODO: we may want to use a blocking wait - semaphore but for actors.
// (problem is - multiple schedulers may want to uses this semaphore)
guard_ = get_handshake_flood()->try_start();
if (guard_ == nullptr) {
// Set timeout because otherwise this actor will be recreated immediately.
// Sadly, it is still O(clients_count^2) time, because all clients will keep waking up.
// Still much better than creating new connection each time.
set_timeout_in(1);
return;
void do_start_up(Result<td::Promise<td::Unit>> r_finish_promise) {
if (r_finish_promise.is_error()) {
LOG(ERROR) << "Unexpected error: " << r_finish_promise.error();
} else {
finish_promise_ = r_finish_promise.move_as_ok();
}
callback_->request_raw_connection(
nullptr, PromiseCreator::cancellable_lambda(
cancellation_token_source_.get_cancellation_token(),
@ -138,14 +177,10 @@ class GenAuthKeyActor final : public Actor {
std::move(raw_connection), std::move(context_), 10, std::move(connection_promise_),
std::move(handshake_promise_));
}
void timeout_expired() override {
CHECK(guard_ == nullptr);
connection_promise_.set_error(Status::Error(1, "Handshake limit reached"));
handshake_promise_.set_value(std::move(handshake_));
}
};
TD_THREAD_LOCAL Semaphore *GenAuthKeyActor::semaphore_{};
} // namespace detail
void Session::PriorityQueue::push(NetQueryPtr query) {