From ec28b6c48bb6bda6fa2435aeaa30e3baf163daf3 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Fri, 2 Aug 2019 16:05:01 +0300 Subject: [PATCH] StorageManager: new query cancels older queries GitOrigin-RevId: 42d33f212554dce388f7be3b0a69efd756e7b547 --- td/telegram/StorageManager.cpp | 90 +++++++++++++++++++++++----------- td/telegram/StorageManager.h | 15 ++++-- td/telegram/Td.cpp | 3 +- 3 files changed, 74 insertions(+), 34 deletions(-) diff --git a/td/telegram/StorageManager.cpp b/td/telegram/StorageManager.cpp index b16254b1..fd0976e6 100644 --- a/td/telegram/StorageManager.cpp +++ b/td/telegram/StorageManager.cpp @@ -54,23 +54,32 @@ void StorageManager::on_new_file(int64 size, int32 cnt) { save_fast_stat(); } -void StorageManager::get_storage_stats(int32 dialog_limit, Promise promise) { +void StorageManager::get_storage_stats(bool need_all_files, int32 dialog_limit, Promise promise) { if (is_closed_) { promise.set_error(Status::Error(500, "Request aborted")); return; } if (pending_storage_stats_.size() != 0) { - promise.set_error(Status::Error(400, "Another storage stats is active")); - return; + if (stats_dialog_limit_ == dialog_limit && need_all_files == stats_need_all_files_) { + pending_storage_stats_.emplace_back(std::move(promise)); + return; + } + //TODO group same queries + close_stats_worker(); + } + if (pending_run_gc_.size() != 0) { + close_gc_worker(); } stats_dialog_limit_ = dialog_limit; + stats_need_all_files_ = need_all_files; pending_storage_stats_.emplace_back(std::move(promise)); create_stats_worker(); - send_closure(stats_worker_, &FileStatsWorker::get_stats, false /*need_all_files*/, stats_dialog_limit_ != 0, - PromiseCreator::lambda([actor_id = actor_id(this)](Result file_stats) { - send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), false); - })); + send_closure(stats_worker_, &FileStatsWorker::get_stats, need_all_files, stats_dialog_limit_ != 0, + PromiseCreator::lambda( + [actor_id = actor_id(this), stats_generation = stats_generation_](Result file_stats) { + send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), stats_generation); + })); } void StorageManager::get_storage_stats_fast(Promise promise) { @@ -98,27 +107,25 @@ void StorageManager::run_gc(FileGcParameters parameters, Promise prom return; } if (pending_run_gc_.size() != 0) { - promise.set_error(Status::Error(400, "Another gc is active")); - return; + close_gc_worker(); } + get_storage_stats(true /*need_all_file*/, + !gc_parameters_.owner_dialog_ids.empty() || !gc_parameters_.exclude_owner_dialog_ids.empty() || + gc_parameters_.dialog_limit != 0 /*split_by_owner_dialog_id*/, + PromiseCreator::lambda([actor_id = actor_id(this)](Result file_stats) { + send_closure(actor_id, &StorageManager::on_all_files, std::move(file_stats), false); + })); + + //NB: get_storage stats will cancel all gc queries pending_run_gc_.emplace_back(std::move(promise)); - if (pending_run_gc_.size() > 1) { - return; - } - gc_parameters_ = std::move(parameters); - - create_stats_worker(); - send_closure(stats_worker_, &FileStatsWorker::get_stats, true /*need_all_file*/, - !gc_parameters_.owner_dialog_ids.empty() || !gc_parameters_.exclude_owner_dialog_ids.empty() || - gc_parameters_.dialog_limit != 0 /*split_by_owner_dialog_id*/, - PromiseCreator::lambda([actor_id = actor_id(this)](Result file_stats) { - send_closure(actor_id, &StorageManager::on_all_files, std::move(file_stats), false); - })); } -void StorageManager::on_file_stats(Result r_file_stats, bool dummy) { +void StorageManager::on_file_stats(Result r_file_stats, uint32 generation) { + if (generation != stats_generation_) { + return; + } if (r_file_stats.is_error()) { auto promises = std::move(pending_storage_stats_); for (auto &promise : promises) { @@ -133,8 +140,9 @@ void StorageManager::on_file_stats(Result r_file_stats, bool dummy) { void StorageManager::create_stats_worker() { CHECK(!is_closed_); if (stats_worker_.empty()) { - stats_worker_ = create_actor_on_scheduler("FileStatsWorker", scheduler_id_, create_reference(), - cancellation_token_source_.get_cancellation_token()); + stats_worker_ = + create_actor_on_scheduler("FileStatsWorker", scheduler_id_, create_reference(), + stats_cancellation_token_source_.get_cancellation_token()); } } @@ -192,7 +200,7 @@ void StorageManager::create_gc_worker() { CHECK(!is_closed_); if (gc_worker_.empty()) { gc_worker_ = create_actor_on_scheduler("FileGcWorker", scheduler_id_, create_reference(), - cancellation_token_source_.get_cancellation_token()); + gc_cancellation_token_source_.get_cancellation_token()); } } @@ -254,11 +262,30 @@ void StorageManager::hangup_shared() { } } +void StorageManager::close_stats_worker() { + for (auto &promise : pending_storage_stats_) { + promise.set_error(Status::Error(500, "Request aborted")); + } + pending_storage_stats_.clear(); + stats_generation_++; + stats_worker_.reset(); + stats_cancellation_token_source_.cancel(); +} + +void StorageManager::close_gc_worker() { + for (auto &promise : pending_run_gc_) { + promise.set_error(Status::Error(500, "Request aborted")); + } + pending_run_gc_.clear(); + gc_generation_++; + gc_worker_.reset(); + gc_cancellation_token_source_.cancel(); +} + void StorageManager::hangup() { is_closed_ = true; - gc_worker_.reset(); - stats_worker_.reset(); - cancellation_token_source_.cancel(); + close_stats_worker(); + close_gc_worker(); hangup_shared(); } @@ -302,9 +329,14 @@ void StorageManager::timeout_expired() { if (next_gc_at_ == 0) { return; } + if (!pending_run_gc_.empty() || !pending_storage_stats_.empty()) { + set_timeout_in(60); + return; + } next_gc_at_ = 0; run_gc({}, PromiseCreator::lambda([actor_id = actor_id(this)](Result r_stats) { - if (!r_stats.is_error() || r_stats.error().code() != 1) { + if (!r_stats.is_error() || r_stats.error().code() != 500) { + // do not save gc timestamp is request was cancelled send_closure(actor_id, &StorageManager::save_last_gc_timestamp); } send_closure(actor_id, &StorageManager::schedule_next_gc); diff --git a/td/telegram/StorageManager.h b/td/telegram/StorageManager.h index e6d211f4..dc93b378 100644 --- a/td/telegram/StorageManager.h +++ b/td/telegram/StorageManager.h @@ -32,7 +32,7 @@ struct DatabaseStats { class StorageManager : public Actor { public: StorageManager(ActorShared<> parent, int32 scheduler_id); - void get_storage_stats(int32 dialog_limit, Promise promise); + void get_storage_stats(bool need_all_files, int32 dialog_limit, Promise promise); void get_storage_stats_fast(Promise promise); void get_database_stats(Promise promise); void run_gc(FileGcParameters parameters, Promise promise); @@ -52,13 +52,16 @@ class StorageManager : public Actor { // get stats ActorOwn stats_worker_; std::vector> pending_storage_stats_; - int32 stats_dialog_limit_ = 0; + uint32 stats_generation_{0}; + int32 stats_dialog_limit_{0}; + bool stats_need_all_files_{false}; FileTypeStat fast_stat_; - CancellationTokenSource cancellation_token_source_; + CancellationTokenSource stats_cancellation_token_source_; + CancellationTokenSource gc_cancellation_token_source_; - void on_file_stats(Result r_file_stats, bool dummy); + void on_file_stats(Result r_file_stats, uint32 generation); void create_stats_worker(); void send_stats(FileStats &&stats, int32 dialog_limit, std::vector> promises); @@ -80,6 +83,7 @@ class StorageManager : public Actor { // Gc ActorOwn gc_worker_; std::vector> pending_run_gc_; + uint32 gc_generation_{0}; FileGcParameters gc_parameters_; uint32 last_gc_timestamp_ = 0; @@ -89,6 +93,9 @@ class StorageManager : public Actor { void create_gc_worker(); void on_gc_finished(Result r_file_stats, bool dummy); + void close_stats_worker(); + void close_gc_worker(); + uint32 load_last_gc_timestamp(); void save_last_gc_timestamp(); void schedule_next_gc(); diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 87caa258..c3863f44 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -5219,7 +5219,8 @@ void Td::on_request(uint64 id, td_api::getStorageStatistics &request) { promise.set_value(result.ok().as_td_api()); } }); - send_closure(storage_manager_, &StorageManager::get_storage_stats, request.chat_limit_, std::move(query_promise)); + send_closure(storage_manager_, &StorageManager::get_storage_stats, false /*need_all_files*/, request.chat_limit_, + std::move(query_promise)); } void Td::on_request(uint64 id, td_api::getStorageStatisticsFast &request) {