FileGcWorker and FileStatsWorker cancellation
GitOrigin-RevId: 6332bd0800b32a6ca3089223be3995fd06a3118b
This commit is contained in:
parent
e3e54b7a53
commit
30e1697157
@ -535,6 +535,9 @@ class FileManager {
|
|||||||
request.onsuccess = event => {
|
request.onsuccess = event => {
|
||||||
const blob = event.target.result;
|
const blob = event.target.result;
|
||||||
if (blob) {
|
if (blob) {
|
||||||
|
if (blob.size == 0) {
|
||||||
|
log.error('Got empty blob from db ', query.key);
|
||||||
|
}
|
||||||
query.resolve({ data: blob, transaction_id: transaction_id });
|
query.resolve({ data: blob, transaction_id: transaction_id });
|
||||||
} else {
|
} else {
|
||||||
query.reject();
|
query.reject();
|
||||||
|
@ -55,6 +55,10 @@ void StorageManager::on_new_file(int64 size, int32 cnt) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::get_storage_stats(int32 dialog_limit, Promise<FileStats> promise) {
|
void StorageManager::get_storage_stats(int32 dialog_limit, Promise<FileStats> promise) {
|
||||||
|
if (is_closed_) {
|
||||||
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (pending_storage_stats_.size() != 0) {
|
if (pending_storage_stats_.size() != 0) {
|
||||||
promise.set_error(Status::Error(400, "Another storage stats is active"));
|
promise.set_error(Status::Error(400, "Another storage stats is active"));
|
||||||
return;
|
return;
|
||||||
@ -89,6 +93,10 @@ void StorageManager::update_use_storage_optimizer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::run_gc(FileGcParameters parameters, Promise<FileStats> promise) {
|
void StorageManager::run_gc(FileGcParameters parameters, Promise<FileStats> promise) {
|
||||||
|
if (is_closed_) {
|
||||||
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (pending_run_gc_.size() != 0) {
|
if (pending_run_gc_.size() != 0) {
|
||||||
promise.set_error(Status::Error(400, "Another gc is active"));
|
promise.set_error(Status::Error(400, "Another gc is active"));
|
||||||
return;
|
return;
|
||||||
@ -123,8 +131,10 @@ void StorageManager::on_file_stats(Result<FileStats> r_file_stats, bool dummy) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::create_stats_worker() {
|
void StorageManager::create_stats_worker() {
|
||||||
|
CHECK(!is_closed_);
|
||||||
if (stats_worker_.empty()) {
|
if (stats_worker_.empty()) {
|
||||||
stats_worker_ = create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference());
|
stats_worker_ = create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference(),
|
||||||
|
cancellation_token_source_.get_cancellation_token());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,8 +191,10 @@ int64 StorageManager::get_log_size() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::create_gc_worker() {
|
void StorageManager::create_gc_worker() {
|
||||||
|
CHECK(!is_closed_);
|
||||||
if (gc_worker_.empty()) {
|
if (gc_worker_.empty()) {
|
||||||
gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference());
|
gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference(),
|
||||||
|
cancellation_token_source_.get_cancellation_token());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,6 +243,7 @@ void StorageManager::send_stats(FileStats &&stats, int32 dialog_limit, std::vect
|
|||||||
}
|
}
|
||||||
|
|
||||||
ActorShared<> StorageManager::create_reference() {
|
ActorShared<> StorageManager::create_reference() {
|
||||||
|
ref_cnt_++;
|
||||||
return actor_shared(this, 1);
|
return actor_shared(this, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,6 +255,10 @@ void StorageManager::hangup_shared() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void StorageManager::hangup() {
|
void StorageManager::hangup() {
|
||||||
|
is_closed_ = true;
|
||||||
|
gc_worker_.reset();
|
||||||
|
stats_worker_.reset();
|
||||||
|
cancellation_token_source_.cancel();
|
||||||
hangup_shared();
|
hangup_shared();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include "td/telegram/td_api.h"
|
#include "td/telegram/td_api.h"
|
||||||
|
|
||||||
#include "td/utils/common.h"
|
#include "td/utils/common.h"
|
||||||
|
#include "td/utils/CancellationToken.h"
|
||||||
#include "td/utils/Slice.h"
|
#include "td/utils/Slice.h"
|
||||||
#include "td/utils/Status.h"
|
#include "td/utils/Status.h"
|
||||||
|
|
||||||
@ -55,6 +56,8 @@ class StorageManager : public Actor {
|
|||||||
|
|
||||||
FileTypeStat fast_stat_;
|
FileTypeStat fast_stat_;
|
||||||
|
|
||||||
|
CancellationTokenSource cancellation_token_source_;
|
||||||
|
|
||||||
void on_file_stats(Result<FileStats> r_file_stats, bool dummy);
|
void on_file_stats(Result<FileStats> r_file_stats, bool dummy);
|
||||||
void create_stats_worker();
|
void create_stats_worker();
|
||||||
void send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> promises);
|
void send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> promises);
|
||||||
@ -68,6 +71,7 @@ class StorageManager : public Actor {
|
|||||||
|
|
||||||
// RefCnt
|
// RefCnt
|
||||||
int32 ref_cnt_{1};
|
int32 ref_cnt_{1};
|
||||||
|
bool is_closed_{false};
|
||||||
ActorShared<> create_reference();
|
ActorShared<> create_reference();
|
||||||
void start_up() override;
|
void start_up() override;
|
||||||
void hangup_shared() override;
|
void hangup_shared() override;
|
||||||
|
@ -2022,6 +2022,7 @@ class CliClient final : public Actor {
|
|||||||
} else if (op == "storage") {
|
} else if (op == "storage") {
|
||||||
auto chat_limit = to_integer<int32>(args);
|
auto chat_limit = to_integer<int32>(args);
|
||||||
send_request(td_api::make_object<td_api::getStorageStatistics>(chat_limit));
|
send_request(td_api::make_object<td_api::getStorageStatistics>(chat_limit));
|
||||||
|
//quit();
|
||||||
} else if (op == "storage_fast") {
|
} else if (op == "storage_fast") {
|
||||||
send_request(td_api::make_object<td_api::getStorageStatisticsFast>());
|
send_request(td_api::make_object<td_api::getStorageStatisticsFast>());
|
||||||
} else if (op == "database") {
|
} else if (op == "database") {
|
||||||
|
@ -318,6 +318,7 @@ Status fix_file_remote_location_key_bug(SqliteDb &db) {
|
|||||||
}
|
}
|
||||||
LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(key));
|
LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(key));
|
||||||
kv.erase(key);
|
kv.erase(key);
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,9 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
|||||||
std::remove_if(
|
std::remove_if(
|
||||||
files.begin(), files.end(),
|
files.begin(), files.end(),
|
||||||
[&](const FullFileInfo &info) {
|
[&](const FullFileInfo &info) {
|
||||||
|
if (token_) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (immune_types[narrow_cast<size_t>(info.file_type)]) {
|
if (immune_types[narrow_cast<size_t>(info.file_type)]) {
|
||||||
type_immunity_ignored_cnt++;
|
type_immunity_ignored_cnt++;
|
||||||
new_stats.add(FullFileInfo(info));
|
new_stats.add(FullFileInfo(info));
|
||||||
@ -142,6 +145,10 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
|||||||
|
|
||||||
size_t pos = 0;
|
size_t pos = 0;
|
||||||
while (pos < files.size() && (remove_count > 0 || remove_size > 0)) {
|
while (pos < files.size() && (remove_count > 0 || remove_size > 0)) {
|
||||||
|
if (token_) {
|
||||||
|
promise.set_error(Status::Error(500, "Request aborted"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (remove_count > 0) {
|
if (remove_count > 0) {
|
||||||
remove_by_count_cnt++;
|
remove_by_count_cnt++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#include "td/telegram/files/FileGcParameters.h"
|
#include "td/telegram/files/FileGcParameters.h"
|
||||||
#include "td/telegram/files/FileStats.h"
|
#include "td/telegram/files/FileStats.h"
|
||||||
|
|
||||||
|
#include "td/utils/CancellationToken.h"
|
||||||
#include "td/utils/logging.h"
|
#include "td/utils/logging.h"
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
@ -20,12 +21,13 @@ extern int VERBOSITY_NAME(file_gc);
|
|||||||
|
|
||||||
class FileGcWorker : public Actor {
|
class FileGcWorker : public Actor {
|
||||||
public:
|
public:
|
||||||
explicit FileGcWorker(ActorShared<> parent) : parent_(std::move(parent)) {
|
FileGcWorker(ActorShared<> parent, CancellationToken token) : parent_(std::move(parent)), token_(std::move(token)) {
|
||||||
}
|
}
|
||||||
void run_gc(const FileGcParameters ¶meters, std::vector<FullFileInfo> files, Promise<FileStats> promise);
|
void run_gc(const FileGcParameters ¶meters, std::vector<FullFileInfo> files, Promise<FileStats> promise);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ActorShared<> parent_;
|
ActorShared<> parent_;
|
||||||
|
CancellationToken token_;
|
||||||
void do_remove_file(const FullFileInfo &info);
|
void do_remove_file(const FullFileInfo &info);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -47,18 +47,21 @@ struct DbFileInfo {
|
|||||||
|
|
||||||
// long and blocking
|
// long and blocking
|
||||||
template <class CallbackT>
|
template <class CallbackT>
|
||||||
void scan_db(CallbackT &&callback) {
|
void scan_db(CancellationToken &token, CallbackT &&callback) {
|
||||||
G()->td_db()->get_file_db_shared()->pmc().get_by_range("file0", "file:", [&](Slice key, Slice value) {
|
G()->td_db()->get_file_db_shared()->pmc().get_by_range("file0", "file:", [&](Slice key, Slice value) {
|
||||||
|
if (token) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// skip reference to other data
|
// skip reference to other data
|
||||||
if (value.substr(0, 2) == "@@") {
|
if (value.substr(0, 2) == "@@") {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
TlParser parser(value);
|
TlParser parser(value);
|
||||||
FileData data;
|
FileData data;
|
||||||
data.parse(parser, false);
|
data.parse(parser, false);
|
||||||
if (parser.get_status().is_error()) {
|
if (parser.get_status().is_error()) {
|
||||||
LOG(ERROR) << "Invalid FileData in the database " << tag("value", format::escaped(value));
|
LOG(ERROR) << "Invalid FileData in the database " << tag("value", format::escaped(value));
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
DbFileInfo info;
|
DbFileInfo info;
|
||||||
if (data.local_.type() == LocalFileLocation::Type::Full) {
|
if (data.local_.type() == LocalFileLocation::Type::Full) {
|
||||||
@ -68,7 +71,7 @@ void scan_db(CallbackT &&callback) {
|
|||||||
info.file_type = data.local_.partial().file_type_;
|
info.file_type = data.local_.partial().file_type_;
|
||||||
info.path = data.local_.partial().path_;
|
info.path = data.local_.partial().path_;
|
||||||
} else {
|
} else {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
PathView path_view(info.path);
|
PathView path_view(info.path);
|
||||||
if (path_view.is_relative()) {
|
if (path_view.is_relative()) {
|
||||||
@ -79,9 +82,10 @@ void scan_db(CallbackT &&callback) {
|
|||||||
info.size = data.size_;
|
info.size = data.size_;
|
||||||
if (info.size == 0 && data.local_.type() == LocalFileLocation::Type::Full) {
|
if (info.size == 0 && data.local_.type() == LocalFileLocation::Type::Full) {
|
||||||
LOG(ERROR) << "Unknown size in the database";
|
LOG(ERROR) << "Unknown size in the database";
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
callback(info);
|
callback(info);
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,28 +99,29 @@ struct FsFileInfo {
|
|||||||
|
|
||||||
// long and blocking
|
// long and blocking
|
||||||
template <class CallbackT>
|
template <class CallbackT>
|
||||||
void scan_fs(CallbackT &&callback) {
|
void scan_fs(CancellationToken &token, CallbackT &&callback) {
|
||||||
for (int32 i = 0; i < file_type_size; i++) {
|
for (int32 i = 0; i < file_type_size; i++) {
|
||||||
auto file_type = static_cast<FileType>(i);
|
auto file_type = static_cast<FileType>(i);
|
||||||
if (file_type == FileType::SecureRaw) {
|
if (file_type == FileType::SecureRaw) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
auto files_dir = get_files_dir(file_type);
|
auto files_dir = get_files_dir(file_type);
|
||||||
td::walk_path(files_dir,
|
td::walk_path(files_dir, [&](CSlice path, WalkPath::Type type) {
|
||||||
[&](CSlice path, bool is_dir) {
|
if (token) {
|
||||||
if (is_dir) {
|
return WalkPath::Action::Abort;
|
||||||
// TODO: skip subdirs
|
}
|
||||||
return;
|
if (type != WalkPath::Type::NotDir) {
|
||||||
|
return WalkPath::Action::Continue;
|
||||||
}
|
}
|
||||||
auto r_stat = stat(path);
|
auto r_stat = stat(path);
|
||||||
if (r_stat.is_error()) {
|
if (r_stat.is_error()) {
|
||||||
LOG(WARNING) << "Stat in files gc failed: " << r_stat.error();
|
LOG(WARNING) << "Stat in files gc failed: " << r_stat.error();
|
||||||
return;
|
return WalkPath::Action::Continue;
|
||||||
}
|
}
|
||||||
auto stat = r_stat.move_as_ok();
|
auto stat = r_stat.move_as_ok();
|
||||||
if (ends_with(path, "/.nomedia") && stat.size_ == 0) {
|
if (ends_with(path, "/.nomedia") && stat.size_ == 0) {
|
||||||
// skip .nomedia file
|
// skip .nomedia file
|
||||||
return;
|
return WalkPath::Action::Continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
FsFileInfo info;
|
FsFileInfo info;
|
||||||
@ -126,8 +131,8 @@ void scan_fs(CallbackT &&callback) {
|
|||||||
info.atime_nsec = stat.atime_nsec_;
|
info.atime_nsec = stat.atime_nsec_;
|
||||||
info.mtime_nsec = stat.mtime_nsec_;
|
info.mtime_nsec = stat.mtime_nsec_;
|
||||||
callback(info);
|
callback(info);
|
||||||
})
|
return WalkPath::Action::Continue;
|
||||||
.ignore();
|
}).ignore();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
@ -140,7 +145,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_
|
|||||||
FileStats file_stats;
|
FileStats file_stats;
|
||||||
file_stats.need_all_files = need_all_files;
|
file_stats.need_all_files = need_all_files;
|
||||||
auto start = Time::now();
|
auto start = Time::now();
|
||||||
scan_fs([&](FsFileInfo &fs_info) {
|
scan_fs(token_, [&](FsFileInfo &fs_info) {
|
||||||
FullFileInfo info;
|
FullFileInfo info;
|
||||||
info.file_type = fs_info.file_type;
|
info.file_type = fs_info.file_type;
|
||||||
info.path = std::move(fs_info.path);
|
info.path = std::move(fs_info.path);
|
||||||
@ -156,7 +161,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_
|
|||||||
auto start = Time::now();
|
auto start = Time::now();
|
||||||
|
|
||||||
std::vector<FullFileInfo> full_infos;
|
std::vector<FullFileInfo> full_infos;
|
||||||
scan_fs([&](FsFileInfo &fs_info) {
|
scan_fs(token_, [&](FsFileInfo &fs_info) {
|
||||||
FullFileInfo info;
|
FullFileInfo info;
|
||||||
info.file_type = fs_info.file_type;
|
info.file_type = fs_info.file_type;
|
||||||
info.path = std::move(fs_info.path);
|
info.path = std::move(fs_info.path);
|
||||||
@ -175,7 +180,7 @@ void FileStatsWorker::get_stats(bool need_all_files, bool split_by_owner_dialog_
|
|||||||
hash_to_pos[std::hash<std::string>()(full_info.path)] = pos;
|
hash_to_pos[std::hash<std::string>()(full_info.path)] = pos;
|
||||||
pos++;
|
pos++;
|
||||||
}
|
}
|
||||||
scan_db([&](DbFileInfo &db_info) {
|
scan_db(token_, [&](DbFileInfo &db_info) {
|
||||||
auto it = hash_to_pos.find(std::hash<std::string>()(db_info.path));
|
auto it = hash_to_pos.find(std::hash<std::string>()(db_info.path));
|
||||||
if (it == hash_to_pos.end()) {
|
if (it == hash_to_pos.end()) {
|
||||||
return;
|
return;
|
||||||
|
@ -11,16 +11,20 @@
|
|||||||
|
|
||||||
#include "td/telegram/files/FileStats.h"
|
#include "td/telegram/files/FileStats.h"
|
||||||
|
|
||||||
|
#include "td/utils/CancellationToken.h"
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
class FileStatsWorker : public Actor {
|
class FileStatsWorker : public Actor {
|
||||||
public:
|
public:
|
||||||
explicit FileStatsWorker(ActorShared<> parent) : parent_(std::move(parent)) {
|
FileStatsWorker(ActorShared<> parent, CancellationToken token)
|
||||||
|
: parent_(std::move(parent)), token_(std::move(token)) {
|
||||||
}
|
}
|
||||||
void get_stats(bool need_all_files, bool split_by_owner_dialog_id, Promise<FileStats> promise);
|
void get_stats(bool need_all_files, bool split_by_owner_dialog_id, Promise<FileStats> promise);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ActorShared<> parent_;
|
ActorShared<> parent_;
|
||||||
|
CancellationToken token_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -64,7 +64,10 @@ class SqliteKeyValue {
|
|||||||
|
|
||||||
std::unordered_map<string, string> get_all() {
|
std::unordered_map<string, string> get_all() {
|
||||||
std::unordered_map<string, string> res;
|
std::unordered_map<string, string> res;
|
||||||
get_by_prefix("", [&](Slice key, Slice value) { res.emplace(key.str(), value.str()); });
|
get_by_prefix("", [&](Slice key, Slice value) {
|
||||||
|
res.emplace(key.str(), value.str());
|
||||||
|
return true;
|
||||||
|
});
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +98,9 @@ class SqliteKeyValue {
|
|||||||
auto guard = stmt->guard();
|
auto guard = stmt->guard();
|
||||||
stmt->step().ensure();
|
stmt->step().ensure();
|
||||||
while (stmt->has_row()) {
|
while (stmt->has_row()) {
|
||||||
callback(stmt->view_blob(0), stmt->view_blob(1));
|
if (!callback(stmt->view_blob(0), stmt->view_blob(1))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
stmt->step().ensure();
|
stmt->step().ensure();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user