tdlight/td/telegram/net/SessionMultiProxy.cpp
2024-01-01 03:07:21 +03:00

180 lines
5.5 KiB
C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/net/SessionMultiProxy.h"
#include "td/telegram/net/SessionProxy.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
namespace td {
SessionMultiProxy::~SessionMultiProxy() = default;
SessionMultiProxy::SessionMultiProxy(int32 session_count, std::shared_ptr<AuthDataShared> shared_auth_data,
bool is_primary, bool is_main, bool use_pfs, bool allow_media_only, bool is_media,
bool is_cdn)
: session_count_(session_count)
, auth_data_(std::move(shared_auth_data))
, is_primary_(is_primary)
, is_main_(is_main)
, use_pfs_(use_pfs)
, allow_media_only_(allow_media_only)
, is_media_(is_media)
, is_cdn_(is_cdn) {
if (allow_media_only_) {
CHECK(is_media_);
}
}
void SessionMultiProxy::send(NetQueryPtr query) {
size_t pos = 0;
if (query->auth_flag() == NetQuery::AuthFlag::On) {
size_t session_rand = query->session_rand();
if (session_rand) {
pos = session_rand % sessions_.size();
} else {
size_t equal_count = 1;
int min_query_count = sessions_[pos].query_count;
for (size_t i = 1; i < sessions_.size(); i++) {
if (sessions_[i].query_count < min_query_count) {
pos = i;
min_query_count = sessions_[pos].query_count;
equal_count = 1;
} else if (sessions_[i].query_count == min_query_count) {
equal_count++;
if (Random::fast_uint32() % equal_count == 0) {
pos = i;
}
}
}
}
}
// query->debug(PSTRING() << get_name() << ": send to proxy #" << pos);
sessions_[pos].query_count++;
send_closure(sessions_[pos].proxy, &SessionProxy::send, std::move(query));
}
void SessionMultiProxy::update_main_flag(bool is_main) {
LOG(INFO) << "Update is_main to " << is_main;
is_main_ = is_main;
for (auto &session : sessions_) {
send_closure(session.proxy, &SessionProxy::update_main_flag, is_main);
}
}
void SessionMultiProxy::destroy_auth_key() {
update_options(1, false, true);
}
void SessionMultiProxy::update_session_count(int32 session_count) {
update_options(session_count, use_pfs_, need_destroy_auth_key_);
}
void SessionMultiProxy::update_use_pfs(bool use_pfs) {
update_options(session_count_, use_pfs, need_destroy_auth_key_);
}
void SessionMultiProxy::update_options(int32 session_count, bool use_pfs, bool need_destroy_auth_key) {
if (need_destroy_auth_key_) {
LOG(INFO) << "Ignore session option changes while destroying auth key";
return;
}
bool is_changed = false;
session_count = clamp(session_count, 1, 100);
if (session_count != session_count_) {
session_count_ = session_count;
LOG(INFO) << "Update session_count to " << session_count_;
is_changed = true;
}
if (use_pfs != use_pfs_) {
bool old_pfs_flag = get_pfs_flag();
use_pfs_ = use_pfs;
if (old_pfs_flag != get_pfs_flag()) {
LOG(INFO) << "Update use_pfs to " << use_pfs_;
is_changed = true;
}
}
if (need_destroy_auth_key) {
need_destroy_auth_key_ = need_destroy_auth_key;
is_changed = true;
LOG(WARNING) << "Destroy auth key";
}
if (is_changed) {
init();
}
}
void SessionMultiProxy::update_mtproto_header() {
for (auto &session : sessions_) {
send_closure_later(session.proxy, &SessionProxy::update_mtproto_header);
}
}
void SessionMultiProxy::start_up() {
init();
}
bool SessionMultiProxy::get_pfs_flag() const {
return use_pfs_ && !is_cdn_;
}
void SessionMultiProxy::init() {
sessions_generation_++;
sessions_.clear();
if (is_main_ && session_count_ > 1) {
LOG(WARNING) << tag("session_count", session_count_);
}
for (int32 i = 0; i < session_count_; i++) {
string name = PSTRING() << "Session" << get_name().substr(Slice("SessionMulti").size())
<< format::cond(session_count_ > 1, format::concat("#", i));
SessionInfo info;
class Callback final : public SessionProxy::Callback {
public:
Callback(ActorId<SessionMultiProxy> parent, uint32 generation, int32 session_id)
: parent_(parent), generation_(generation), session_id_(session_id) {
}
void on_query_finished() final {
send_closure(parent_, &SessionMultiProxy::on_query_finished, generation_, session_id_);
}
private:
ActorId<SessionMultiProxy> parent_;
uint32 generation_;
int32 session_id_;
};
info.proxy =
create_actor<SessionProxy>(name, make_unique<Callback>(actor_id(this), sessions_generation_, i), auth_data_,
is_primary_, is_main_, allow_media_only_, is_media_, get_pfs_flag(),
session_count_ > 1 && is_primary_, is_cdn_, need_destroy_auth_key_ && i == 0);
sessions_.push_back(std::move(info));
}
}
void SessionMultiProxy::on_query_finished(uint32 generation, int session_id) {
if (generation != sessions_generation_) {
return;
}
CHECK(static_cast<size_t>(session_id) < sessions_.size());
auto &query_count = sessions_[session_id].query_count;
CHECK(query_count > 0);
query_count--;
}
} // namespace td