add some kind of global limit for concurrent hanshakes
This commit is contained in:
parent
1cace9e666
commit
7941672e79
|
@ -346,5 +346,26 @@ Status AuthKeyHandshake::on_message(Slice message, Callback *connection, AuthKey
|
|||
return status;
|
||||
}
|
||||
|
||||
void GlobalFloodControl::finish() {
|
||||
// TODO: instead of decrementing count, we may wake up some pending request
|
||||
active_count_--;
|
||||
}
|
||||
GlobalFloodControl::GlobalFloodControl(uint64_t limit) : limit_(limit) {
|
||||
}
|
||||
Result<GlobalFloodControl::Guard> GlobalFloodControl::try_start() {
|
||||
if (++active_count_ > limit_) {
|
||||
active_count_--;
|
||||
return td::Status::Error("Handshake limit reached");
|
||||
}
|
||||
return Guard(this);
|
||||
}
|
||||
GlobalFloodControl *GlobalFloodControl::get_handshake_flood() {
|
||||
constexpr uint64_t MAX_CONCURRENT_HANDSHAKES = 50;
|
||||
static GlobalFloodControl flood{MAX_CONCURRENT_HANDSHAKES};
|
||||
return &flood;
|
||||
}
|
||||
void GlobalFloodControl::Finish::operator()(GlobalFloodControl *ctrl) const {
|
||||
ctrl->finish();
|
||||
}
|
||||
} // namespace mtproto
|
||||
} // namespace td
|
||||
|
|
|
@ -25,11 +25,29 @@ namespace mtproto {
|
|||
|
||||
class DhCallback;
|
||||
|
||||
class GlobalFloodControl {
|
||||
public:
|
||||
explicit GlobalFloodControl(uint64_t limit);
|
||||
struct Finish {
|
||||
void operator()(GlobalFloodControl *ctrl) const;
|
||||
};
|
||||
using Guard = std::unique_ptr<GlobalFloodControl, Finish>;
|
||||
td::Result<Guard> try_start();
|
||||
static GlobalFloodControl *get_handshake_flood();
|
||||
|
||||
private:
|
||||
std::atomic<uint64_t> active_count_{0};
|
||||
uint64_t limit_{0};
|
||||
|
||||
void finish();
|
||||
};
|
||||
|
||||
class AuthKeyHandshakeContext {
|
||||
public:
|
||||
virtual ~AuthKeyHandshakeContext() = default;
|
||||
virtual DhCallback *get_dh_callback() = 0;
|
||||
virtual PublicRsaKeyInterface *get_public_rsa_key_interface() = 0;
|
||||
virtual td::Status try_start() = 0;
|
||||
};
|
||||
|
||||
class AuthKeyHandshake {
|
||||
|
|
|
@ -500,6 +500,9 @@ class TestProxyRequest final : public RequestOnceActor {
|
|||
mtproto::PublicRsaKeyInterface *get_public_rsa_key_interface() final {
|
||||
return &public_rsa_key;
|
||||
}
|
||||
td::Status try_start() final {
|
||||
return td::Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
PublicRsaKeyShared public_rsa_key{DcId::empty(), false};
|
||||
|
|
|
@ -79,11 +79,26 @@ class GenAuthKeyActor final : public Actor {
|
|||
CancellationTokenSource cancellation_token_source_;
|
||||
|
||||
ActorOwn<mtproto::HandshakeActor> child_;
|
||||
td::Status alarm_error_;
|
||||
|
||||
void start_up() final {
|
||||
// Bug in Android clang and MSVC
|
||||
// std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>()));
|
||||
|
||||
// Will sleep a little it there are too much active handshakes now
|
||||
//
|
||||
// TODO: we may want to use a blocking wait - semaphore but for actors.
|
||||
// (problem is - multiple schedulers may want to uses this semaphore)
|
||||
auto status = context_->try_start();
|
||||
if (status.is_error()) {
|
||||
alarm_error_ = std::move(status);
|
||||
// Set timeout because otherwise this actor will be recreated immediately.
|
||||
// Sadly, it is still O(clients_count^2) time, because all clients will keep waking up.
|
||||
// Still much better than creating new connection each time.
|
||||
set_timeout_in(5);
|
||||
return;
|
||||
}
|
||||
|
||||
callback_->request_raw_connection(
|
||||
nullptr, PromiseCreator::cancellable_lambda(
|
||||
cancellation_token_source_.get_cancellation_token(),
|
||||
|
@ -117,6 +132,12 @@ class GenAuthKeyActor final : public Actor {
|
|||
std::move(raw_connection), std::move(context_), 10, std::move(connection_promise_),
|
||||
std::move(handshake_promise_));
|
||||
}
|
||||
|
||||
void timeout_expired() override {
|
||||
CHECK(alarm_error_.is_error());
|
||||
connection_promise_.set_error(std::move(alarm_error_));
|
||||
handshake_promise_.set_value(std::move(handshake_));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
|
@ -1351,10 +1372,17 @@ void Session::create_gen_auth_key_actor(HandshakeId handshake_id) {
|
|||
return public_rsa_key_.get();
|
||||
}
|
||||
|
||||
td::Status try_start() {
|
||||
TRY_RESULT_ASSIGN(guard_, mtproto::GlobalFloodControl::get_handshake_flood()->try_start());
|
||||
return td::Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
mtproto::DhCallback *dh_callback_;
|
||||
std::shared_ptr<mtproto::PublicRsaKeyInterface> public_rsa_key_;
|
||||
mtproto::GlobalFloodControl::Guard guard_;
|
||||
};
|
||||
|
||||
info.actor_ = create_actor<detail::GenAuthKeyActor>(
|
||||
PSLICE() << get_name() << "::GenAuthKey", get_name(), std::move(info.handshake_),
|
||||
td::make_unique<AuthKeyHandshakeContext>(DhCache::instance(), shared_auth_data_->public_rsa_key()),
|
||||
|
|
Loading…
Reference in New Issue