From 30e1697157da17c28e5b1653141c47c28b5b19d6 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Wed, 1 May 2019 16:15:54 +0200 Subject: [PATCH] FileGcWorker and FileStatsWorker cancellation GitOrigin-RevId: 6332bd0800b32a6ca3089223be3995fd06a3118b --- example/web/tdweb/src/index.js | 3 ++ td/telegram/StorageManager.cpp | 21 +++++++- td/telegram/StorageManager.h | 4 ++ td/telegram/cli.cpp | 1 + td/telegram/files/FileDb.cpp | 1 + td/telegram/files/FileGcWorker.cpp | 7 +++ td/telegram/files/FileGcWorker.h | 4 +- td/telegram/files/FileStatsWorker.cpp | 73 ++++++++++++++------------- td/telegram/files/FileStatsWorker.h | 6 ++- tddb/td/db/SqliteKeyValue.h | 9 +++- 10 files changed, 89 insertions(+), 40 deletions(-) diff --git a/example/web/tdweb/src/index.js b/example/web/tdweb/src/index.js index 7da270cc..56cdcb45 100644 --- a/example/web/tdweb/src/index.js +++ b/example/web/tdweb/src/index.js @@ -535,6 +535,9 @@ class FileManager { request.onsuccess = event => { const blob = event.target.result; if (blob) { + if (blob.size == 0) { + log.error('Got empty blob from db ', query.key); + } query.resolve({ data: blob, transaction_id: transaction_id }); } else { query.reject(); diff --git a/td/telegram/StorageManager.cpp b/td/telegram/StorageManager.cpp index 45979190..d29c6cd5 100644 --- a/td/telegram/StorageManager.cpp +++ b/td/telegram/StorageManager.cpp @@ -55,6 +55,10 @@ void StorageManager::on_new_file(int64 size, int32 cnt) { } void StorageManager::get_storage_stats(int32 dialog_limit, Promise promise) { + if (is_closed_) { + promise.set_error(Status::Error(500, "Request aborted")); + return; + } if (pending_storage_stats_.size() != 0) { promise.set_error(Status::Error(400, "Another storage stats is active")); return; @@ -89,6 +93,10 @@ void StorageManager::update_use_storage_optimizer() { } void StorageManager::run_gc(FileGcParameters parameters, Promise promise) { + if (is_closed_) { + promise.set_error(Status::Error(500, "Request aborted")); + return; + } if 
(pending_run_gc_.size() != 0) { promise.set_error(Status::Error(400, "Another gc is active")); return; @@ -123,8 +131,10 @@ void StorageManager::on_file_stats(Result r_file_stats, bool dummy) { } void StorageManager::create_stats_worker() { + CHECK(!is_closed_); if (stats_worker_.empty()) { - stats_worker_ = create_actor_on_scheduler("FileStatsWorker", scheduler_id_, create_reference()); + stats_worker_ = create_actor_on_scheduler("FileStatsWorker", scheduler_id_, create_reference(), + cancellation_token_source_.get_cancellation_token()); } } @@ -181,8 +191,10 @@ int64 StorageManager::get_log_size() { } void StorageManager::create_gc_worker() { + CHECK(!is_closed_); if (gc_worker_.empty()) { - gc_worker_ = create_actor_on_scheduler("FileGcWorker", scheduler_id_, create_reference()); + gc_worker_ = create_actor_on_scheduler("FileGcWorker", scheduler_id_, create_reference(), + cancellation_token_source_.get_cancellation_token()); } } @@ -231,6 +243,7 @@ void StorageManager::send_stats(FileStats &&stats, int32 dialog_limit, std::vect } ActorShared<> StorageManager::create_reference() { + ref_cnt_++; return actor_shared(this, 1); } @@ -242,6 +255,10 @@ void StorageManager::hangup_shared() { } void StorageManager::hangup() { + is_closed_ = true; + gc_worker_.reset(); + stats_worker_.reset(); + cancellation_token_source_.cancel(); hangup_shared(); } diff --git a/td/telegram/StorageManager.h b/td/telegram/StorageManager.h index 2df9506e..ad9dd03d 100644 --- a/td/telegram/StorageManager.h +++ b/td/telegram/StorageManager.h @@ -15,6 +15,7 @@ #include "td/telegram/td_api.h" #include "td/utils/common.h" +#include "td/utils/CancellationToken.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" @@ -55,6 +56,8 @@ class StorageManager : public Actor { FileTypeStat fast_stat_; + CancellationTokenSource cancellation_token_source_; + void on_file_stats(Result r_file_stats, bool dummy); void create_stats_worker(); void send_stats(FileStats &&stats, int32 dialog_limit, 
std::vector> promises); @@ -68,6 +71,7 @@ class StorageManager : public Actor { // RefCnt int32 ref_cnt_{1}; + bool is_closed_{false}; ActorShared<> create_reference(); void start_up() override; void hangup_shared() override; diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index c7b81e4a..b48f807c 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -2022,6 +2022,7 @@ class CliClient final : public Actor { } else if (op == "storage") { auto chat_limit = to_integer(args); send_request(td_api::make_object(chat_limit)); + //quit(); } else if (op == "storage_fast") { send_request(td_api::make_object()); } else if (op == "database") { diff --git a/td/telegram/files/FileDb.cpp b/td/telegram/files/FileDb.cpp index 3b98c77b..e01c2c91 100644 --- a/td/telegram/files/FileDb.cpp +++ b/td/telegram/files/FileDb.cpp @@ -318,6 +318,7 @@ Status fix_file_remote_location_key_bug(SqliteDb &db) { } LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(key)); kv.erase(key); + return true; }); return Status::OK(); } diff --git a/td/telegram/files/FileGcWorker.cpp b/td/telegram/files/FileGcWorker.cpp index 900010e5..d2c4e565 100644 --- a/td/telegram/files/FileGcWorker.cpp +++ b/td/telegram/files/FileGcWorker.cpp @@ -91,6 +91,9 @@ void FileGcWorker::run_gc(const FileGcParameters &parameters, std::vector(info.file_type)]) { type_immunity_ignored_cnt++; new_stats.add(FullFileInfo(info)); @@ -142,6 +145,10 @@ void FileGcWorker::run_gc(const FileGcParameters &parameters, std::vector 0 || remove_size > 0)) { + if (token_) { + promise.set_error(Status::Error(500, "Request aborted")); + return; + } if (remove_count > 0) { remove_by_count_cnt++; } else { diff --git a/td/telegram/files/FileGcWorker.h b/td/telegram/files/FileGcWorker.h index 39dca139..1e99459f 100644 --- a/td/telegram/files/FileGcWorker.h +++ b/td/telegram/files/FileGcWorker.h @@ -12,6 +12,7 @@ #include "td/telegram/files/FileGcParameters.h" #include "td/telegram/files/FileStats.h" +#include "td/utils/CancellationToken.h" 
#include "td/utils/logging.h" namespace td { @@ -20,12 +21,13 @@ extern int VERBOSITY_NAME(file_gc); class FileGcWorker : public Actor { public: - explicit FileGcWorker(ActorShared<> parent) : parent_(std::move(parent)) { + FileGcWorker(ActorShared<> parent, CancellationToken token) : parent_(std::move(parent)), token_(std::move(token)) { } void run_gc(const FileGcParameters &parameters, std::vector files, Promise promise); private: ActorShared<> parent_; + CancellationToken token_; void do_remove_file(const FullFileInfo &info); }; diff --git a/td/telegram/files/FileStatsWorker.cpp b/td/telegram/files/FileStatsWorker.cpp index a6f0769b..0f8164ac 100644 --- a/td/telegram/files/FileStatsWorker.cpp +++ b/td/telegram/files/FileStatsWorker.cpp @@ -47,18 +47,21 @@ struct DbFileInfo { // long and blocking template -void scan_db(CallbackT &&callback) { +void scan_db(CancellationToken &token, CallbackT &&callback) { G()->td_db()->get_file_db_shared()->pmc().get_by_range("file0", "file:", [&](Slice key, Slice value) { + if (token) { + return false; + } // skip reference to other data if (value.substr(0, 2) == "@@") { - return; + return true; } TlParser parser(value); FileData data; data.parse(parser, false); if (parser.get_status().is_error()) { LOG(ERROR) << "Invalid FileData in the database " << tag("value", format::escaped(value)); - return; + return true; } DbFileInfo info; if (data.local_.type() == LocalFileLocation::Type::Full) { @@ -68,7 +71,7 @@ void scan_db(CallbackT &&callback) { info.file_type = data.local_.partial().file_type_; info.path = data.local_.partial().path_; } else { - return; + return true; } PathView path_view(info.path); if (path_view.is_relative()) { @@ -79,9 +82,10 @@ void scan_db(CallbackT &&callback) { info.size = data.size_; if (info.size == 0 && data.local_.type() == LocalFileLocation::Type::Full) { LOG(ERROR) << "Unknown size in the database"; - return; + return true; } callback(info); + return true; }); } @@ -95,39 +99,40 @@ struct FsFileInfo { 
// long and blocking template -void scan_fs(CallbackT &&callback) { +void scan_fs(CancellationToken &token, CallbackT &&callback) { for (int32 i = 0; i < file_type_size; i++) { auto file_type = static_cast(i); if (file_type == FileType::SecureRaw) { continue; } auto files_dir = get_files_dir(file_type); - td::walk_path(files_dir, - [&](CSlice path, bool is_dir) { - if (is_dir) { - // TODO: skip subdirs - return; - } - auto r_stat = stat(path); - if (r_stat.is_error()) { - LOG(WARNING) << "Stat in files gc failed: " << r_stat.error(); - return; - } - auto stat = r_stat.move_as_ok(); - if (ends_with(path, "/.nomedia") && stat.size_ == 0) { - // skip .nomedia file - return; - } + td::walk_path(files_dir, [&](CSlice path, WalkPath::Type type) { + if (token) { + return WalkPath::Action::Abort; + } + if (type != WalkPath::Type::NotDir) { + return WalkPath::Action::Continue; + } + auto r_stat = stat(path); + if (r_stat.is_error()) { + LOG(WARNING) << "Stat in files gc failed: " << r_stat.error(); + return WalkPath::Action::Continue; + } + auto stat = r_stat.move_as_ok(); + if (ends_with(path, "/.nomedia") && stat.size_ == 0) { + // skip .nomedia file + return WalkPath::Action::Continue; + } - FsFileInfo info; - info.path = path.str(); - info.size = stat.size_; - info.file_type = file_type; - info.atime_nsec = stat.atime_nsec_; - info.mtime_nsec = stat.mtime_nsec_; - callback(info); - }) - .ignore(); + FsFileInfo info; + info.path = path.str(); + info.size = stat.size_; + info.file_type = file_type; + info.atime_nsec = stat.atime_nsec_; + info.mtime_nsec = stat.mtime_nsec_; + callback(info); + return WalkPath::Action::Continue; + }).ignore(); } } } // namespace @@ -140,7 +145,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_ FileStats file_stats; file_stats.need_all_files = need_all_files; auto start = Time::now(); - scan_fs([&](FsFileInfo &fs_info) { + scan_fs(token_, [&](FsFileInfo &fs_info) { FullFileInfo info; info.file_type = 
fs_info.file_type; info.path = std::move(fs_info.path); @@ -156,7 +161,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_ auto start = Time::now(); std::vector full_infos; - scan_fs([&](FsFileInfo &fs_info) { + scan_fs(token_, [&](FsFileInfo &fs_info) { FullFileInfo info; info.file_type = fs_info.file_type; info.path = std::move(fs_info.path); @@ -175,7 +180,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_ hash_to_pos[std::hash()(full_info.path)] = pos; pos++; } - scan_db([&](DbFileInfo &db_info) { + scan_db(token_, [&](DbFileInfo &db_info) { auto it = hash_to_pos.find(std::hash()(db_info.path)); if (it == hash_to_pos.end()) { return; diff --git a/td/telegram/files/FileStatsWorker.h b/td/telegram/files/FileStatsWorker.h index e44f96d9..65d906dd 100644 --- a/td/telegram/files/FileStatsWorker.h +++ b/td/telegram/files/FileStatsWorker.h @@ -11,16 +11,20 @@ #include "td/telegram/files/FileStats.h" +#include "td/utils/CancellationToken.h" + namespace td { class FileStatsWorker : public Actor { public: - explicit FileStatsWorker(ActorShared<> parent) : parent_(std::move(parent)) { + FileStatsWorker(ActorShared<> parent, CancellationToken token) + : parent_(std::move(parent)), token_(std::move(token)) { } void get_stats(bool need_all_files, bool split_by_owner_dialog_id, Promise promise); private: ActorShared<> parent_; + CancellationToken token_; }; } // namespace td diff --git a/tddb/td/db/SqliteKeyValue.h b/tddb/td/db/SqliteKeyValue.h index 84a034e2..d027d422 100644 --- a/tddb/td/db/SqliteKeyValue.h +++ b/tddb/td/db/SqliteKeyValue.h @@ -64,7 +64,10 @@ class SqliteKeyValue { std::unordered_map get_all() { std::unordered_map res; - get_by_prefix("", [&](Slice key, Slice value) { res.emplace(key.str(), value.str()); }); + get_by_prefix("", [&](Slice key, Slice value) { + res.emplace(key.str(), value.str()); + return true; + }); return res; } @@ -95,7 +98,9 @@ class SqliteKeyValue { auto guard = 
stmt->guard(); stmt->step().ensure(); while (stmt->has_row()) { - callback(stmt->view_blob(0), stmt->view_blob(1)); + if (!callback(stmt->view_blob(0), stmt->view_blob(1))) { + return; + } stmt->step().ensure(); } }