Merge remote-tracking branch 'td/master'
This commit is contained in:
commit
2bb228c967
@ -10,6 +10,7 @@
|
||||
#include "td/mtproto/RSA.h"
|
||||
|
||||
#include "td/utils/buffer.h"
|
||||
#include "td/utils/common.h"
|
||||
#include "td/utils/Slice.h"
|
||||
#include "td/utils/Status.h"
|
||||
#include "td/utils/StorerBase.h"
|
||||
|
@ -10909,7 +10909,9 @@ void MessagesManager::delete_sent_message_on_server(DialogId dialog_id, MessageI
|
||||
bool need_update_dialog_pos = false;
|
||||
auto m = delete_message(d, message_id, true, &need_update_dialog_pos, "delete_sent_message_on_server");
|
||||
CHECK(m == nullptr);
|
||||
CHECK(need_update_dialog_pos == false);
|
||||
if (need_update_dialog_pos) { // last_clear_history_message_id might be removed
|
||||
update_dialog_pos(d, "delete_sent_message_on_server");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3038,11 +3038,21 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
|
||||
}
|
||||
|
||||
VLOG(td_requests) << "Receive request " << id << ": " << to_string(function);
|
||||
int32 function_id = function->get_id();
|
||||
if (is_synchronous_request(function_id)) {
|
||||
if (is_synchronous_request(function->get_id())) {
|
||||
// send response synchronously
|
||||
return send_result(id, static_request(std::move(function)));
|
||||
}
|
||||
|
||||
run_request(id, std::move(function));
|
||||
}
|
||||
|
||||
void Td::run_request(uint64 id, tl_object_ptr<td_api::Function> function) {
|
||||
if (init_request_id_ > 0) {
|
||||
pending_init_requests_.emplace_back(id, std::move(function));
|
||||
return;
|
||||
}
|
||||
|
||||
int32 function_id = function->get_id();
|
||||
if (state_ != State::Run) {
|
||||
switch (function_id) {
|
||||
case td_api::getAuthorizationState::ID:
|
||||
@ -3088,11 +3098,11 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
|
||||
switch (function_id) {
|
||||
case td_api::checkDatabaseEncryptionKey::ID: {
|
||||
auto check_key = move_tl_object_as<td_api::checkDatabaseEncryptionKey>(function);
|
||||
return answer_ok_query(id, init(as_db_key(std::move(check_key->encryption_key_), parameters_.use_custom_db_format)));
|
||||
return start_init(id, std::move(check_key->encryption_key_));
|
||||
}
|
||||
case td_api::setDatabaseEncryptionKey::ID: {
|
||||
auto set_key = move_tl_object_as<td_api::setDatabaseEncryptionKey>(function);
|
||||
return answer_ok_query(id, init(as_db_key(std::move(set_key->new_encryption_key_), parameters_.use_custom_db_format)));
|
||||
return start_init(id, std::move(set_key->new_encryption_key_));
|
||||
}
|
||||
case td_api::destroy::ID:
|
||||
// need to send response synchronously before actual destroying
|
||||
@ -3655,29 +3665,34 @@ void Td::complete_pending_preauthentication_requests(const T &func) {
|
||||
}
|
||||
}
|
||||
|
||||
Status Td::init(DbKey key) {
|
||||
void Td::start_init(uint64 id, string &&key) {
|
||||
VLOG(td_init) << "Begin to init database";
|
||||
init_request_id_ = id;
|
||||
|
||||
auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result<TdDb::OpenedDatabase> r_opened_database) {
|
||||
send_closure(actor_id, &Td::init, std::move(r_opened_database));
|
||||
});
|
||||
auto current_scheduler_id = Scheduler::instance()->sched_id();
|
||||
auto scheduler_count = Scheduler::instance()->sched_count();
|
||||
#ifdef __linux__
|
||||
#if defined(__GLIBC__) && !defined(__UCLIBC__) && !defined(__MUSL__)
|
||||
mallopt(M_ARENA_MAX, 1);
|
||||
// Force mmap to be able to free the memory after an instance has been closed
|
||||
mallopt(M_MMAP_THRESHOLD, 0);
|
||||
#endif
|
||||
#endif
|
||||
TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, as_db_key(std::move(key), parameters_.use_custom_db_format),
|
||||
std::move(promise));
|
||||
}
|
||||
|
||||
VLOG(td_init) << "Begin to init database";
|
||||
TdDb::Events events;
|
||||
auto r_td_db = TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events);
|
||||
if (r_td_db.is_error()) {
|
||||
LOG(WARNING) << "Failed to open database: " << r_td_db.error();
|
||||
return Status::Error(400, r_td_db.error().message());
|
||||
void Td::init(Result<TdDb::OpenedDatabase> r_opened_database) {
|
||||
CHECK(init_request_id_ != 0);
|
||||
if (r_opened_database.is_error()) {
|
||||
LOG(WARNING) << "Failed to open database: " << r_opened_database.error();
|
||||
send_closure(actor_id(this), &Td::send_error, init_request_id_,
|
||||
Status::Error(400, r_opened_database.error().message()));
|
||||
return finish_init();
|
||||
}
|
||||
|
||||
LOG(INFO) << "Successfully inited database in " << tag("database_directory", parameters_.database_directory)
|
||||
<< " and " << tag("files_directory", parameters_.files_directory);
|
||||
VLOG(td_init) << "Successfully inited database";
|
||||
|
||||
G()->init(parameters_, actor_id(this), r_td_db.move_as_ok()).ensure();
|
||||
auto events = r_opened_database.move_as_ok();
|
||||
G()->init(parameters_, actor_id(this), std::move(events.database)).ensure();
|
||||
|
||||
init_options_and_network();
|
||||
|
||||
@ -3720,6 +3735,9 @@ Status Td::init(DbKey key) {
|
||||
|
||||
G()->set_my_id(G()->shared_config().get_option_integer("my_id"));
|
||||
|
||||
auto current_scheduler_id = Scheduler::instance()->sched_id();
|
||||
auto scheduler_count = Scheduler::instance()->sched_count();
|
||||
|
||||
storage_manager_ = create_actor<StorageManager>("StorageManager", create_reference(),
|
||||
min(current_scheduler_id + 2, scheduler_count - 1));
|
||||
G()->set_storage_manager(storage_manager_.get());
|
||||
@ -3801,7 +3819,25 @@ Status Td::init(DbKey key) {
|
||||
VLOG(td_init) << "Finish initialization";
|
||||
|
||||
state_ = State::Run;
|
||||
return Status::OK();
|
||||
|
||||
send_closure(actor_id(this), &Td::send_result, init_request_id_, td_api::make_object<td_api::ok>());
|
||||
return finish_init();
|
||||
}
|
||||
|
||||
void Td::finish_init() {
|
||||
CHECK(init_request_id_ > 0);
|
||||
init_request_id_ = 0;
|
||||
|
||||
if (pending_init_requests_.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
VLOG(td_init) << "Continue to execute " << pending_init_requests_.size() << " pending requests";
|
||||
auto requests = std::move(pending_init_requests_);
|
||||
for (auto &request : requests) {
|
||||
run_request(request.first, std::move(request.second));
|
||||
}
|
||||
CHECK(pending_init_requests_.size() < requests.size());
|
||||
}
|
||||
|
||||
void Td::init_options_and_network() {
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "td/telegram/net/NetQueryStats.h"
|
||||
#include "td/telegram/td_api.h"
|
||||
#include "td/telegram/TdCallback.h"
|
||||
#include "td/telegram/TdDb.h"
|
||||
#include "td/telegram/TdParameters.h"
|
||||
#include "td/telegram/telegram_api.h"
|
||||
#include "td/telegram/TermsOfService.h"
|
||||
@ -264,6 +265,8 @@ class Td final : public Actor {
|
||||
|
||||
void on_connection_state_changed(ConnectionState new_state);
|
||||
|
||||
void run_request(uint64 id, tl_object_ptr<td_api::Function> function);
|
||||
|
||||
void send_result(uint64 id, tl_object_ptr<td_api::Object> object);
|
||||
void send_error(uint64 id, Status error);
|
||||
void send_error_impl(uint64 id, tl_object_ptr<td_api::error> error);
|
||||
@ -300,6 +303,7 @@ class Td final : public Actor {
|
||||
int close_flag_ = 0;
|
||||
|
||||
enum class State : int32 { WaitParameters, Decrypt, Run, Close } state_ = State::WaitParameters;
|
||||
uint64 init_request_id_ = 0;
|
||||
bool is_database_encrypted_ = false;
|
||||
|
||||
FlatHashMap<uint64, std::shared_ptr<ResultHandler>> result_handlers_;
|
||||
@ -325,6 +329,8 @@ class Td final : public Actor {
|
||||
|
||||
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_preauthentication_requests_;
|
||||
|
||||
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_init_requests_;
|
||||
|
||||
template <class T>
|
||||
void complete_pending_preauthentication_requests(const T &func);
|
||||
|
||||
@ -1405,14 +1411,27 @@ class Td final : public Actor {
|
||||
static td_api::object_ptr<td_api::Object> do_static_request(td_api::testReturnError &request);
|
||||
|
||||
static DbKey as_db_key(string key, bool custom_db);
|
||||
Status init(DbKey key) TD_WARN_UNUSED_RESULT;
|
||||
|
||||
void start_init(uint64 id, string &&key);
|
||||
|
||||
void init(Result<TdDb::OpenedDatabase> r_opened_database);
|
||||
|
||||
void init_options_and_network();
|
||||
|
||||
void init_connection_creator();
|
||||
|
||||
void init_file_manager();
|
||||
|
||||
void init_managers();
|
||||
|
||||
void finish_init();
|
||||
|
||||
void clear();
|
||||
|
||||
void close_impl(bool destroy_flag);
|
||||
|
||||
static Status fix_parameters(TdParameters ¶meters) TD_WARN_UNUSED_RESULT;
|
||||
|
||||
Status set_parameters(td_api::object_ptr<td_api::tdlibParameters> parameters) TD_WARN_UNUSED_RESULT;
|
||||
|
||||
static td_api::object_ptr<td_api::error> make_error(int32 code, CSlice error) {
|
||||
|
@ -64,7 +64,7 @@ Result<TdDb::EncryptionInfo> check_encryption(string path) {
|
||||
}
|
||||
|
||||
Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_pmc, BinlogKeyValue<Binlog> &config_pmc,
|
||||
TdDb::Events &events, DbKey key) {
|
||||
TdDb::OpenedDatabase &events, DbKey key) {
|
||||
auto callback = [&](const BinlogEvent &event) {
|
||||
switch (event.type_) {
|
||||
case LogEvent::HandlerType::SecretChats:
|
||||
@ -396,7 +396,21 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters ¶meters, con
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TdDb::init(int32 scheduler_id, const TdParameters ¶meters, DbKey key, Events &events) {
|
||||
void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
|
||||
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
|
||||
class Worker final : public Actor {
|
||||
public:
|
||||
void open(TdParameters &¶meters, DbKey &&key, Promise<OpenedDatabase> &&promise) {
|
||||
TdDb::open(-1, std::move(parameters), std::move(key), std::move(promise));
|
||||
stop();
|
||||
}
|
||||
};
|
||||
send_closure(create_actor_on_scheduler<Worker>("worker", scheduler_id), &Worker::open, std::move(parameters),
|
||||
std::move(key), std::move(promise));
|
||||
return;
|
||||
}
|
||||
OpenedDatabase result;
|
||||
|
||||
// Init pmc
|
||||
Binlog *binlog_ptr = nullptr;
|
||||
auto binlog = std::shared_ptr<Binlog>(new Binlog, [&](Binlog *ptr) { binlog_ptr = ptr; });
|
||||
@ -408,7 +422,8 @@ Status TdDb::init(int32 scheduler_id, const TdParameters ¶meters, DbKey key,
|
||||
|
||||
bool encrypt_binlog = !key.is_empty();
|
||||
VLOG(td_init) << "Start binlog loading";
|
||||
TRY_STATUS(init_binlog(*binlog, get_binlog_path(parameters), *binlog_pmc, *config_pmc, events, std::move(key)));
|
||||
TRY_STATUS_PROMISE(
|
||||
promise, init_binlog(*binlog, get_binlog_path(parameters), *binlog_pmc, *config_pmc, result, std::move(key)));
|
||||
VLOG(td_init) << "Finish binlog loading";
|
||||
|
||||
binlog_pmc->external_init_finish(binlog);
|
||||
@ -436,15 +451,16 @@ Status TdDb::init(int32 scheduler_id, const TdParameters ¶meters, DbKey key,
|
||||
}
|
||||
}
|
||||
VLOG(td_init) << "Start to init database";
|
||||
auto init_sqlite_status = init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
|
||||
auto db = make_unique<TdDb>();
|
||||
auto init_sqlite_status = db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
|
||||
VLOG(td_init) << "Finish to init database";
|
||||
if (init_sqlite_status.is_error()) {
|
||||
LOG(ERROR) << "Destroy bad SQLite database because of " << init_sqlite_status;
|
||||
if (sql_connection_ != nullptr) {
|
||||
sql_connection_->get().close();
|
||||
if (db->sql_connection_ != nullptr) {
|
||||
db->sql_connection_->get().close();
|
||||
}
|
||||
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
|
||||
TRY_STATUS(init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
|
||||
TRY_STATUS_PROMISE(promise, db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
|
||||
}
|
||||
if (drop_sqlite_key) {
|
||||
binlog_pmc->erase("sqlite_key");
|
||||
@ -474,22 +490,18 @@ Status TdDb::init(int32 scheduler_id, const TdParameters ¶meters, DbKey key,
|
||||
VLOG(td_init) << "Init concurrent_config_pmc";
|
||||
concurrent_config_pmc->external_init_finish(concurrent_binlog);
|
||||
|
||||
binlog_pmc_ = std::move(concurrent_binlog_pmc);
|
||||
config_pmc_ = std::move(concurrent_config_pmc);
|
||||
binlog_ = std::move(concurrent_binlog);
|
||||
db->binlog_pmc_ = std::move(concurrent_binlog_pmc);
|
||||
db->config_pmc_ = std::move(concurrent_config_pmc);
|
||||
db->binlog_ = std::move(concurrent_binlog);
|
||||
|
||||
return Status::OK();
|
||||
result.database = std::move(db);
|
||||
|
||||
promise.set_value(std::move(result));
|
||||
}
|
||||
|
||||
TdDb::TdDb() = default;
|
||||
TdDb::~TdDb() = default;
|
||||
|
||||
Result<unique_ptr<TdDb>> TdDb::open(int32 scheduler_id, const TdParameters ¶meters, DbKey key, Events &events) {
|
||||
auto db = make_unique<TdDb>();
|
||||
TRY_STATUS(db->init(scheduler_id, parameters, std::move(key), events));
|
||||
return std::move(db);
|
||||
}
|
||||
|
||||
Result<TdDb::EncryptionInfo> TdDb::check_encryption(const TdParameters ¶meters) {
|
||||
return ::td::check_encryption(get_binlog_path(parameters));
|
||||
}
|
||||
|
@ -48,7 +48,9 @@ class TdDb {
|
||||
TdDb &operator=(TdDb &&) = delete;
|
||||
~TdDb();
|
||||
|
||||
struct Events {
|
||||
struct OpenedDatabase {
|
||||
unique_ptr<TdDb> database;
|
||||
|
||||
vector<BinlogEvent> to_secret_chats_manager;
|
||||
vector<BinlogEvent> user_events;
|
||||
vector<BinlogEvent> chat_events;
|
||||
@ -60,7 +62,7 @@ class TdDb {
|
||||
vector<BinlogEvent> to_notification_manager;
|
||||
vector<BinlogEvent> to_notification_settings_manager;
|
||||
};
|
||||
static Result<unique_ptr<TdDb>> open(int32 scheduler_id, const TdParameters ¶meters, DbKey key, Events &events);
|
||||
static void open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise);
|
||||
|
||||
struct EncryptionInfo {
|
||||
bool is_encrypted{false};
|
||||
@ -121,7 +123,6 @@ class TdDb {
|
||||
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
|
||||
std::shared_ptr<ConcurrentBinlog> binlog_;
|
||||
|
||||
Status init(int32 scheduler_id, const TdParameters ¶meters, DbKey key, Events &events);
|
||||
Status init_sqlite(int32 scheduler_id, const TdParameters ¶meters, const DbKey &key, const DbKey &old_key,
|
||||
BinlogKeyValue<Binlog> &binlog_pmc);
|
||||
|
||||
|
@ -39,6 +39,7 @@
|
||||
#include "td/utils/Timer.h"
|
||||
#include "td/utils/tl_parsers.h"
|
||||
#include "td/utils/utf8.h"
|
||||
#include "td/utils/VectorQueue.h"
|
||||
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
@ -47,6 +48,51 @@ namespace td {
|
||||
|
||||
namespace detail {
|
||||
|
||||
class SemaphoreActor final : public Actor {
|
||||
public:
|
||||
explicit SemaphoreActor(size_t capacity) : capacity_(capacity) {
|
||||
}
|
||||
|
||||
void execute(Promise<Promise<Unit>> promise) {
|
||||
if (capacity_ == 0) {
|
||||
pending_.push(std::move(promise));
|
||||
} else {
|
||||
start(std::move(promise));
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
size_t capacity_;
|
||||
VectorQueue<Promise<Promise<Unit>>> pending_;
|
||||
|
||||
void finish(Result<Unit>) {
|
||||
capacity_++;
|
||||
if (!pending_.empty()) {
|
||||
start(pending_.pop());
|
||||
}
|
||||
}
|
||||
|
||||
void start(Promise<Promise<Unit>> promise) {
|
||||
CHECK(capacity_ > 0);
|
||||
capacity_--;
|
||||
promise.set_value(promise_send_closure(actor_id(this), &SemaphoreActor::finish));
|
||||
}
|
||||
};
|
||||
|
||||
struct Semaphore {
|
||||
public:
|
||||
explicit Semaphore(size_t capacity) {
|
||||
semaphore_ = create_actor<SemaphoreActor>("semaphore", capacity).release();
|
||||
}
|
||||
|
||||
void execute(Promise<Promise<Unit>> promise) {
|
||||
send_closure(semaphore_, &SemaphoreActor::execute, std::move(promise));
|
||||
}
|
||||
|
||||
private:
|
||||
ActorId<SemaphoreActor> semaphore_;
|
||||
};
|
||||
|
||||
class GenAuthKeyActor final : public Actor {
|
||||
public:
|
||||
GenAuthKeyActor(Slice name, unique_ptr<mtproto::AuthKeyHandshake> handshake,
|
||||
@ -79,11 +125,26 @@ class GenAuthKeyActor final : public Actor {
|
||||
CancellationTokenSource cancellation_token_source_;
|
||||
|
||||
ActorOwn<mtproto::HandshakeActor> child_;
|
||||
Promise<Unit> finish_promise_;
|
||||
|
||||
static TD_THREAD_LOCAL Semaphore *semaphore_;
|
||||
Semaphore &get_handshake_semaphore() {
|
||||
init_thread_local<Semaphore>(semaphore_, 50);
|
||||
return *semaphore_;
|
||||
}
|
||||
|
||||
void start_up() final {
|
||||
// Bug in Android clang and MSVC
|
||||
// std::tuple<Result<int>> b(std::forward_as_tuple(Result<int>()));
|
||||
get_handshake_semaphore().execute(promise_send_closure(actor_id(this), &GenAuthKeyActor::do_start_up));
|
||||
}
|
||||
|
||||
void do_start_up(Result<Promise<Unit>> r_finish_promise) {
|
||||
if (r_finish_promise.is_error()) {
|
||||
LOG(ERROR) << "Unexpected error: " << r_finish_promise.error();
|
||||
} else {
|
||||
finish_promise_ = r_finish_promise.move_as_ok();
|
||||
}
|
||||
callback_->request_raw_connection(
|
||||
nullptr, PromiseCreator::cancellable_lambda(
|
||||
cancellation_token_source_.get_cancellation_token(),
|
||||
@ -119,6 +180,8 @@ class GenAuthKeyActor final : public Actor {
|
||||
}
|
||||
};
|
||||
|
||||
TD_THREAD_LOCAL Semaphore *GenAuthKeyActor::semaphore_{};
|
||||
|
||||
} // namespace detail
|
||||
|
||||
void Session::PriorityQueue::push(NetQueryPtr query) {
|
||||
@ -1294,7 +1357,7 @@ void Session::on_handshake_ready(Result<unique_ptr<mtproto::AuthKeyHandshake>> r
|
||||
} else {
|
||||
auto handshake = r_handshake.move_as_ok();
|
||||
if (!handshake->is_ready_for_finish()) {
|
||||
LOG(WARNING) << "Handshake is not yet ready";
|
||||
LOG(INFO) << "Handshake is not yet ready";
|
||||
info.handshake_ = std::move(handshake);
|
||||
} else {
|
||||
if (is_main) {
|
||||
@ -1355,6 +1418,7 @@ void Session::create_gen_auth_key_actor(HandshakeId handshake_id) {
|
||||
mtproto::DhCallback *dh_callback_;
|
||||
std::shared_ptr<mtproto::PublicRsaKeyInterface> public_rsa_key_;
|
||||
};
|
||||
|
||||
info.actor_ = create_actor<detail::GenAuthKeyActor>(
|
||||
PSLICE() << get_name() << "::GenAuthKey", get_name(), std::move(info.handshake_),
|
||||
td::make_unique<AuthKeyHandshakeContext>(DhCache::instance(), shared_auth_data_->public_rsa_key()),
|
||||
|
@ -99,6 +99,7 @@ set(TDUTILS_SOURCE
|
||||
td/utils/filesystem.cpp
|
||||
td/utils/find_boundary.cpp
|
||||
td/utils/FlatHashTable.cpp
|
||||
td/utils/FloodControlGlobal.cpp
|
||||
td/utils/Gzip.cpp
|
||||
td/utils/GzipByteFlow.cpp
|
||||
td/utils/Hints.cpp
|
||||
@ -213,6 +214,7 @@ set(TDUTILS_SOURCE
|
||||
td/utils/FlatHashSet.h
|
||||
td/utils/FlatHashTable.h
|
||||
td/utils/FloodControlFast.h
|
||||
td/utils/FloodControlGlobal.h
|
||||
td/utils/FloodControlStrict.h
|
||||
td/utils/format.h
|
||||
td/utils/Gzip.h
|
||||
|
32
tdutils/td/utils/FloodControlGlobal.cpp
Normal file
32
tdutils/td/utils/FloodControlGlobal.cpp
Normal file
@ -0,0 +1,32 @@
|
||||
//
|
||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
#include "td/utils/FloodControlGlobal.h"
|
||||
|
||||
namespace td {
|
||||
|
||||
FloodControlGlobal::FloodControlGlobal(uint64 limit) : limit_(limit) {
|
||||
}
|
||||
|
||||
void FloodControlGlobal::finish() {
|
||||
auto old_value = active_count_.fetch_sub(1, std::memory_order_relaxed);
|
||||
CHECK(old_value > 0);
|
||||
}
|
||||
|
||||
FloodControlGlobal::Guard FloodControlGlobal::try_start() {
|
||||
auto old_value = active_count_.fetch_add(1, std::memory_order_relaxed);
|
||||
if (old_value >= limit_) {
|
||||
finish();
|
||||
return nullptr;
|
||||
}
|
||||
return Guard(this);
|
||||
}
|
||||
|
||||
void FloodControlGlobal::Finish::operator()(FloodControlGlobal *ctrl) const {
|
||||
ctrl->finish();
|
||||
}
|
||||
|
||||
} // namespace td
|
35
tdutils/td/utils/FloodControlGlobal.h
Normal file
35
tdutils/td/utils/FloodControlGlobal.h
Normal file
@ -0,0 +1,35 @@
|
||||
//
|
||||
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#include "td/utils/common.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
namespace td {
|
||||
|
||||
// Restricts the total number of events
|
||||
class FloodControlGlobal {
|
||||
public:
|
||||
explicit FloodControlGlobal(uint64 limit);
|
||||
|
||||
struct Finish {
|
||||
void operator()(FloodControlGlobal *ctrl) const;
|
||||
};
|
||||
using Guard = std::unique_ptr<FloodControlGlobal, Finish>;
|
||||
|
||||
Guard try_start();
|
||||
|
||||
private:
|
||||
std::atomic<uint64> active_count_{0};
|
||||
uint64 limit_{0};
|
||||
|
||||
void finish();
|
||||
};
|
||||
|
||||
} // namespace td
|
Loading…
x
Reference in New Issue
Block a user