StorageManager: new query cancels older queries
GitOrigin-RevId: 42d33f212554dce388f7be3b0a69efd756e7b547
This commit is contained in:
parent
fb3d439dac
commit
ec28b6c48b
@ -54,23 +54,32 @@ void StorageManager::on_new_file(int64 size, int32 cnt) {
|
|||||||
save_fast_stat();
|
save_fast_stat();
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::get_storage_stats(int32 dialog_limit, Promise<FileStats> promise) {
|
void StorageManager::get_storage_stats(bool need_all_files, int32 dialog_limit, Promise<FileStats> promise) {
|
||||||
if (is_closed_) {
|
if (is_closed_) {
|
||||||
promise.set_error(Status::Error(500, "Request aborted"));
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (pending_storage_stats_.size() != 0) {
|
if (pending_storage_stats_.size() != 0) {
|
||||||
promise.set_error(Status::Error(400, "Another storage stats is active"));
|
if (stats_dialog_limit_ == dialog_limit && need_all_files == stats_need_all_files_) {
|
||||||
return;
|
pending_storage_stats_.emplace_back(std::move(promise));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//TODO group same queries
|
||||||
|
close_stats_worker();
|
||||||
|
}
|
||||||
|
if (pending_run_gc_.size() != 0) {
|
||||||
|
close_gc_worker();
|
||||||
}
|
}
|
||||||
stats_dialog_limit_ = dialog_limit;
|
stats_dialog_limit_ = dialog_limit;
|
||||||
|
stats_need_all_files_ = need_all_files;
|
||||||
pending_storage_stats_.emplace_back(std::move(promise));
|
pending_storage_stats_.emplace_back(std::move(promise));
|
||||||
|
|
||||||
create_stats_worker();
|
create_stats_worker();
|
||||||
send_closure(stats_worker_, &FileStatsWorker::get_stats, false /*need_all_files*/, stats_dialog_limit_ != 0,
|
send_closure(stats_worker_, &FileStatsWorker::get_stats, need_all_files, stats_dialog_limit_ != 0,
|
||||||
PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> file_stats) {
|
PromiseCreator::lambda(
|
||||||
send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), false);
|
[actor_id = actor_id(this), stats_generation = stats_generation_](Result<FileStats> file_stats) {
|
||||||
}));
|
send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), stats_generation);
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::get_storage_stats_fast(Promise<FileStatsFast> promise) {
|
void StorageManager::get_storage_stats_fast(Promise<FileStatsFast> promise) {
|
||||||
@ -98,27 +107,25 @@ void StorageManager::run_gc(FileGcParameters parameters, Promise<FileStats> prom
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (pending_run_gc_.size() != 0) {
|
if (pending_run_gc_.size() != 0) {
|
||||||
promise.set_error(Status::Error(400, "Another gc is active"));
|
close_gc_worker();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get_storage_stats(true /*need_all_file*/,
|
||||||
|
!gc_parameters_.owner_dialog_ids.empty() || !gc_parameters_.exclude_owner_dialog_ids.empty() ||
|
||||||
|
gc_parameters_.dialog_limit != 0 /*split_by_owner_dialog_id*/,
|
||||||
|
PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> file_stats) {
|
||||||
|
send_closure(actor_id, &StorageManager::on_all_files, std::move(file_stats), false);
|
||||||
|
}));
|
||||||
|
|
||||||
|
//NB: get_storage stats will cancel all gc queries
|
||||||
pending_run_gc_.emplace_back(std::move(promise));
|
pending_run_gc_.emplace_back(std::move(promise));
|
||||||
if (pending_run_gc_.size() > 1) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
gc_parameters_ = std::move(parameters);
|
gc_parameters_ = std::move(parameters);
|
||||||
|
|
||||||
create_stats_worker();
|
|
||||||
send_closure(stats_worker_, &FileStatsWorker::get_stats, true /*need_all_file*/,
|
|
||||||
!gc_parameters_.owner_dialog_ids.empty() || !gc_parameters_.exclude_owner_dialog_ids.empty() ||
|
|
||||||
gc_parameters_.dialog_limit != 0 /*split_by_owner_dialog_id*/,
|
|
||||||
PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> file_stats) {
|
|
||||||
send_closure(actor_id, &StorageManager::on_all_files, std::move(file_stats), false);
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::on_file_stats(Result<FileStats> r_file_stats, bool dummy) {
|
void StorageManager::on_file_stats(Result<FileStats> r_file_stats, uint32 generation) {
|
||||||
|
if (generation != stats_generation_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (r_file_stats.is_error()) {
|
if (r_file_stats.is_error()) {
|
||||||
auto promises = std::move(pending_storage_stats_);
|
auto promises = std::move(pending_storage_stats_);
|
||||||
for (auto &promise : promises) {
|
for (auto &promise : promises) {
|
||||||
@ -133,8 +140,9 @@ void StorageManager::on_file_stats(Result<FileStats> r_file_stats, bool dummy) {
|
|||||||
void StorageManager::create_stats_worker() {
|
void StorageManager::create_stats_worker() {
|
||||||
CHECK(!is_closed_);
|
CHECK(!is_closed_);
|
||||||
if (stats_worker_.empty()) {
|
if (stats_worker_.empty()) {
|
||||||
stats_worker_ = create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference(),
|
stats_worker_ =
|
||||||
cancellation_token_source_.get_cancellation_token());
|
create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference(),
|
||||||
|
stats_cancellation_token_source_.get_cancellation_token());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,7 +200,7 @@ void StorageManager::create_gc_worker() {
|
|||||||
CHECK(!is_closed_);
|
CHECK(!is_closed_);
|
||||||
if (gc_worker_.empty()) {
|
if (gc_worker_.empty()) {
|
||||||
gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference(),
|
gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference(),
|
||||||
cancellation_token_source_.get_cancellation_token());
|
gc_cancellation_token_source_.get_cancellation_token());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,11 +262,30 @@ void StorageManager::hangup_shared() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void StorageManager::close_stats_worker() {
|
||||||
|
for (auto &promise : pending_storage_stats_) {
|
||||||
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
|
}
|
||||||
|
pending_storage_stats_.clear();
|
||||||
|
stats_generation_++;
|
||||||
|
stats_worker_.reset();
|
||||||
|
stats_cancellation_token_source_.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageManager::close_gc_worker() {
|
||||||
|
for (auto &promise : pending_run_gc_) {
|
||||||
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
|
}
|
||||||
|
pending_run_gc_.clear();
|
||||||
|
gc_generation_++;
|
||||||
|
gc_worker_.reset();
|
||||||
|
gc_cancellation_token_source_.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
void StorageManager::hangup() {
|
void StorageManager::hangup() {
|
||||||
is_closed_ = true;
|
is_closed_ = true;
|
||||||
gc_worker_.reset();
|
close_stats_worker();
|
||||||
stats_worker_.reset();
|
close_gc_worker();
|
||||||
cancellation_token_source_.cancel();
|
|
||||||
hangup_shared();
|
hangup_shared();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,9 +329,14 @@ void StorageManager::timeout_expired() {
|
|||||||
if (next_gc_at_ == 0) {
|
if (next_gc_at_ == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!pending_run_gc_.empty() || !pending_storage_stats_.empty()) {
|
||||||
|
set_timeout_in(60);
|
||||||
|
return;
|
||||||
|
}
|
||||||
next_gc_at_ = 0;
|
next_gc_at_ = 0;
|
||||||
run_gc({}, PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> r_stats) {
|
run_gc({}, PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> r_stats) {
|
||||||
if (!r_stats.is_error() || r_stats.error().code() != 1) {
|
if (!r_stats.is_error() || r_stats.error().code() != 500) {
|
||||||
|
// do not save gc timestamp is request was cancelled
|
||||||
send_closure(actor_id, &StorageManager::save_last_gc_timestamp);
|
send_closure(actor_id, &StorageManager::save_last_gc_timestamp);
|
||||||
}
|
}
|
||||||
send_closure(actor_id, &StorageManager::schedule_next_gc);
|
send_closure(actor_id, &StorageManager::schedule_next_gc);
|
||||||
|
@ -32,7 +32,7 @@ struct DatabaseStats {
|
|||||||
class StorageManager : public Actor {
|
class StorageManager : public Actor {
|
||||||
public:
|
public:
|
||||||
StorageManager(ActorShared<> parent, int32 scheduler_id);
|
StorageManager(ActorShared<> parent, int32 scheduler_id);
|
||||||
void get_storage_stats(int32 dialog_limit, Promise<FileStats> promise);
|
void get_storage_stats(bool need_all_files, int32 dialog_limit, Promise<FileStats> promise);
|
||||||
void get_storage_stats_fast(Promise<FileStatsFast> promise);
|
void get_storage_stats_fast(Promise<FileStatsFast> promise);
|
||||||
void get_database_stats(Promise<DatabaseStats> promise);
|
void get_database_stats(Promise<DatabaseStats> promise);
|
||||||
void run_gc(FileGcParameters parameters, Promise<FileStats> promise);
|
void run_gc(FileGcParameters parameters, Promise<FileStats> promise);
|
||||||
@ -52,13 +52,16 @@ class StorageManager : public Actor {
|
|||||||
// get stats
|
// get stats
|
||||||
ActorOwn<FileStatsWorker> stats_worker_;
|
ActorOwn<FileStatsWorker> stats_worker_;
|
||||||
std::vector<Promise<FileStats>> pending_storage_stats_;
|
std::vector<Promise<FileStats>> pending_storage_stats_;
|
||||||
int32 stats_dialog_limit_ = 0;
|
uint32 stats_generation_{0};
|
||||||
|
int32 stats_dialog_limit_{0};
|
||||||
|
bool stats_need_all_files_{false};
|
||||||
|
|
||||||
FileTypeStat fast_stat_;
|
FileTypeStat fast_stat_;
|
||||||
|
|
||||||
CancellationTokenSource cancellation_token_source_;
|
CancellationTokenSource stats_cancellation_token_source_;
|
||||||
|
CancellationTokenSource gc_cancellation_token_source_;
|
||||||
|
|
||||||
void on_file_stats(Result<FileStats> r_file_stats, bool dummy);
|
void on_file_stats(Result<FileStats> r_file_stats, uint32 generation);
|
||||||
void create_stats_worker();
|
void create_stats_worker();
|
||||||
void send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> promises);
|
void send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> promises);
|
||||||
|
|
||||||
@ -80,6 +83,7 @@ class StorageManager : public Actor {
|
|||||||
// Gc
|
// Gc
|
||||||
ActorOwn<FileGcWorker> gc_worker_;
|
ActorOwn<FileGcWorker> gc_worker_;
|
||||||
std::vector<Promise<FileStats>> pending_run_gc_;
|
std::vector<Promise<FileStats>> pending_run_gc_;
|
||||||
|
uint32 gc_generation_{0};
|
||||||
FileGcParameters gc_parameters_;
|
FileGcParameters gc_parameters_;
|
||||||
|
|
||||||
uint32 last_gc_timestamp_ = 0;
|
uint32 last_gc_timestamp_ = 0;
|
||||||
@ -89,6 +93,9 @@ class StorageManager : public Actor {
|
|||||||
void create_gc_worker();
|
void create_gc_worker();
|
||||||
void on_gc_finished(Result<FileStats> r_file_stats, bool dummy);
|
void on_gc_finished(Result<FileStats> r_file_stats, bool dummy);
|
||||||
|
|
||||||
|
void close_stats_worker();
|
||||||
|
void close_gc_worker();
|
||||||
|
|
||||||
uint32 load_last_gc_timestamp();
|
uint32 load_last_gc_timestamp();
|
||||||
void save_last_gc_timestamp();
|
void save_last_gc_timestamp();
|
||||||
void schedule_next_gc();
|
void schedule_next_gc();
|
||||||
|
@ -5219,7 +5219,8 @@ void Td::on_request(uint64 id, td_api::getStorageStatistics &request) {
|
|||||||
promise.set_value(result.ok().as_td_api());
|
promise.set_value(result.ok().as_td_api());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
send_closure(storage_manager_, &StorageManager::get_storage_stats, request.chat_limit_, std::move(query_promise));
|
send_closure(storage_manager_, &StorageManager::get_storage_stats, false /*need_all_files*/, request.chat_limit_,
|
||||||
|
std::move(query_promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Td::on_request(uint64 id, td_api::getStorageStatisticsFast &request) {
|
void Td::on_request(uint64 id, td_api::getStorageStatisticsFast &request) {
|
||||||
|
Reference in New Issue
Block a user