Minor improvements.

This commit is contained in:
levlam 2022-06-09 16:38:38 +03:00
parent 7941672e79
commit a30ac1c277
5 changed files with 35 additions and 19 deletions

View File

@ -346,26 +346,33 @@ Status AuthKeyHandshake::on_message(Slice message, Callback *connection, AuthKey
return status; return status;
} }
GlobalFloodControl::GlobalFloodControl(uint64 limit) : limit_(limit) {
}
void GlobalFloodControl::finish() { void GlobalFloodControl::finish() {
// TODO: instead of decrementing count, we may wake up some pending request // TODO: instead of decrementing count, we can wake up some pending request
active_count_--; auto old_value = active_count_.fetch_sub(1, std::memory_order_relaxed);
} CHECK(old_value > 0);
GlobalFloodControl::GlobalFloodControl(uint64_t limit) : limit_(limit) {
} }
Result<GlobalFloodControl::Guard> GlobalFloodControl::try_start() { Result<GlobalFloodControl::Guard> GlobalFloodControl::try_start() {
if (++active_count_ > limit_) { auto old_value = active_count_.fetch_add(1, std::memory_order_relaxed);
active_count_--; if (old_value >= limit_) {
return td::Status::Error("Handshake limit reached"); finish();
return Status::Error("Handshake limit reached");
} }
return Guard(this); return Guard(this);
} }
GlobalFloodControl *GlobalFloodControl::get_handshake_flood() { GlobalFloodControl *GlobalFloodControl::get_handshake_flood() {
constexpr uint64_t MAX_CONCURRENT_HANDSHAKES = 50; constexpr uint64 MAX_CONCURRENT_HANDSHAKES = 250;
static GlobalFloodControl flood{MAX_CONCURRENT_HANDSHAKES}; static GlobalFloodControl flood{MAX_CONCURRENT_HANDSHAKES};
return &flood; return &flood;
} }
void GlobalFloodControl::Finish::operator()(GlobalFloodControl *ctrl) const { void GlobalFloodControl::Finish::operator()(GlobalFloodControl *ctrl) const {
ctrl->finish(); ctrl->finish();
} }
} // namespace mtproto } // namespace mtproto
} // namespace td } // namespace td

View File

@ -10,11 +10,15 @@
#include "td/mtproto/RSA.h" #include "td/mtproto/RSA.h"
#include "td/utils/buffer.h" #include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Slice.h" #include "td/utils/Slice.h"
#include "td/utils/Status.h" #include "td/utils/Status.h"
#include "td/utils/StorerBase.h" #include "td/utils/StorerBase.h"
#include "td/utils/UInt.h" #include "td/utils/UInt.h"
#include <atomic>
#include <memory>
namespace td { namespace td {
namespace mtproto_api { namespace mtproto_api {
@ -27,17 +31,19 @@ class DhCallback;
class GlobalFloodControl { class GlobalFloodControl {
public: public:
explicit GlobalFloodControl(uint64_t limit); explicit GlobalFloodControl(uint64 limit);
struct Finish { struct Finish {
void operator()(GlobalFloodControl *ctrl) const; void operator()(GlobalFloodControl *ctrl) const;
}; };
using Guard = std::unique_ptr<GlobalFloodControl, Finish>; using Guard = std::unique_ptr<GlobalFloodControl, Finish>;
td::Result<Guard> try_start(); Result<Guard> try_start();
static GlobalFloodControl *get_handshake_flood(); static GlobalFloodControl *get_handshake_flood();
private: private:
std::atomic<uint64_t> active_count_{0}; std::atomic<uint64> active_count_{0};
uint64_t limit_{0}; uint64 limit_{0};
void finish(); void finish();
}; };
@ -47,7 +53,7 @@ class AuthKeyHandshakeContext {
virtual ~AuthKeyHandshakeContext() = default; virtual ~AuthKeyHandshakeContext() = default;
virtual DhCallback *get_dh_callback() = 0; virtual DhCallback *get_dh_callback() = 0;
virtual PublicRsaKeyInterface *get_public_rsa_key_interface() = 0; virtual PublicRsaKeyInterface *get_public_rsa_key_interface() = 0;
virtual td::Status try_start() = 0; virtual Status try_start() = 0;
}; };
class AuthKeyHandshake { class AuthKeyHandshake {

View File

@ -500,8 +500,8 @@ class TestProxyRequest final : public RequestOnceActor {
mtproto::PublicRsaKeyInterface *get_public_rsa_key_interface() final { mtproto::PublicRsaKeyInterface *get_public_rsa_key_interface() final {
return &public_rsa_key; return &public_rsa_key;
} }
td::Status try_start() final { Status try_start() final {
return td::Status::OK(); return Status::OK();
} }
private: private:

View File

@ -79,13 +79,13 @@ class GenAuthKeyActor final : public Actor {
CancellationTokenSource cancellation_token_source_; CancellationTokenSource cancellation_token_source_;
ActorOwn<mtproto::HandshakeActor> child_; ActorOwn<mtproto::HandshakeActor> child_;
td::Status alarm_error_; Status alarm_error_;
void start_up() final { void start_up() final {
// Bug in Android clang and MSVC // Bug in Android clang and MSVC
// std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>())); // std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>()));
// Will sleep a little it there are too much active handshakes now // Will sleep a little if there are too many active handshakes now
// //
// TODO: we may want to use a blocking wait - semaphore but for actors. // TODO: we may want to use a blocking wait - semaphore but for actors.
// (problem is - multiple schedulers may want to uses this semaphore) // (problem is - multiple schedulers may want to uses this semaphore)
@ -1372,9 +1372,9 @@ void Session::create_gen_auth_key_actor(HandshakeId handshake_id) {
return public_rsa_key_.get(); return public_rsa_key_.get();
} }
td::Status try_start() { Status try_start() final {
TRY_RESULT_ASSIGN(guard_, mtproto::GlobalFloodControl::get_handshake_flood()->try_start()); TRY_RESULT_ASSIGN(guard_, mtproto::GlobalFloodControl::get_handshake_flood()->try_start());
return td::Status::OK(); return Status::OK();
} }
private: private:

View File

@ -305,6 +305,9 @@ class HandshakeContext final : public td::mtproto::AuthKeyHandshakeContext {
td::mtproto::PublicRsaKeyInterface *get_public_rsa_key_interface() final { td::mtproto::PublicRsaKeyInterface *get_public_rsa_key_interface() final {
return &public_rsa_key; return &public_rsa_key;
} }
td::Status try_start() final {
return td::Status::OK();
}
private: private:
td::PublicRsaKeyShared public_rsa_key{td::DcId::empty(), true}; td::PublicRsaKeyShared public_rsa_key{td::DcId::empty(), true};