From 872cf6e10d6021b6d2bdd9a7e63132f2f60b2ab8 Mon Sep 17 00:00:00 2001 From: levlam Date: Mon, 8 Oct 2018 21:18:06 +0300 Subject: [PATCH] Add local locking to FileFd::lock. GitOrigin-RevId: 4ad3e15f9952b1c68c879182a0f10dd5cad270f3 --- td/telegram/Td.cpp | 8 +++-- td/telegram/cli.cpp | 2 +- tddb/td/db/binlog/Binlog.cpp | 20 ++++++----- tddb/td/db/binlog/Binlog.h | 2 +- tdutils/td/utils/port/FileFd.cpp | 62 ++++++++++++++++++++++++++++++-- tdutils/td/utils/port/FileFd.h | 3 +- 6 files changed, 79 insertions(+), 18 deletions(-) diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 8658a0fd..e8749ae4 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -3854,11 +3854,13 @@ Status Td::init(DbKey key) { VLOG(td_init) << "Begin to init database"; TdDb::Events events; - TRY_RESULT(td_db, - TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events)); + auto r_td_db = TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events); + if (r_td_db.is_error()) { + return Status::Error(400, r_td_db.error().message()); + } LOG(INFO) << "Successfully inited database in " << tag("database_directory", parameters_.database_directory) << " and " << tag("files_directory", parameters_.files_directory); - G()->init(parameters_, actor_id(this), std::move(td_db)).ensure(); + G()->init(parameters_, actor_id(this), r_td_db.move_as_ok()).ensure(); // Init all managers and actors VLOG(td_init) << "Create StateManager"; diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index f4b4bc20..f17b1045 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -713,10 +713,10 @@ class CliClient final : public Actor { return; } + LOG(WARNING) << "Creating new TD " << name << " with generation " << generation_ + 1; class TdCallbackImpl : public TdCallback { public: TdCallbackImpl(CliClient *client, uint64 generation) : client_(client), generation_(generation) { - LOG(WARNING) << "Creating new TD with generation " << generation; } void on_result(uint64 id, tl_object_ptr result) override { client_->on_result(generation_, id, std::move(result)); diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index d6d4d52d..f7f142d2 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -170,9 +170,9 @@ Binlog::~Binlog() { close().ignore(); } -Result Binlog::open_binlog(CSlice path, int32 flags) { +Result Binlog::open_binlog(const string &path, int32 flags) { TRY_RESULT(fd, FileFd::open(path, flags)); - TRY_STATUS(fd.lock(FileFd::LockFlags::Write, 100)); + TRY_STATUS(fd.lock(FileFd::LockFlags::Write, path, 100)); return std::move(fd); } @@ -280,17 +280,17 @@ Status Binlog::close(bool need_sync) { if (fd_.empty()) { return Status::OK(); } - SCOPE_EXIT { - path_ = ""; - info_.is_opened = false; - fd_.close(); - need_sync_ = false; - }; if (need_sync) { sync(); } else { flush(); } + + fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure(); + fd_.close(); + path_.clear(); + info_.is_opened = false; + need_sync_ = false; return Status::OK(); } @@ -618,7 +618,7 @@ void Binlog::do_reindex() { LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error(); return; } - fd_.close(); + auto old_fd = std::move(fd_); // can't close fd_ now, because it will release file lock fd_ = BufferedFdBase(r_opened_file.move_as_ok()); buffer_writer_ = ChainBufferWriter(); @@ -639,7 +639,9 @@ void Binlog::do_reindex() { // finish_reindex auto status = unlink(path_); LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status; + old_fd.close(); // now we can close old file and release the system lock status = rename(new_path, path_); + FileFd::remove_local_lock(new_path); // now we can release local lock for temporary file LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status; auto finish_time = Clocks::monotonic(); diff --git a/tddb/td/db/binlog/Binlog.h b/tddb/td/db/binlog/Binlog.h index f71c8655..8d4e7047 100644 --- a/tddb/td/db/binlog/Binlog.h +++ b/tddb/td/db/binlog/Binlog.h @@ -132,7 +132,7 @@ class Binlog { static constexpr uint32 MAX_EVENT_SIZE = 65536; - Result open_binlog(CSlice path, int32 flags); + Result open_binlog(const string &path, int32 flags); size_t flush_events_buffer(bool force); void do_add_event(BinlogEvent &&event); void do_event(BinlogEvent &&event); diff --git a/tdutils/td/utils/port/FileFd.cpp b/tdutils/td/utils/port/FileFd.cpp index da1aa397..86680649 100644 --- a/tdutils/td/utils/port/FileFd.cpp +++ b/tdutils/td/utils/port/FileFd.cpp @@ -21,6 +21,8 @@ #include "td/utils/StringBuilder.h" #include +#include +#include #if TD_PORT_POSIX #include @@ -282,10 +284,48 @@ Result FileFd::pread(MutableSlice slice, int64 offset) const { return OS_ERROR(PSLICE() << "Pread from " << get_native_fd() << " at offset " << offset << " has failed"); } -Status FileFd::lock(FileFd::LockFlags flags, int32 max_tries) { +static std::mutex in_process_lock_mutex; +static std::unordered_set locked_files; + +Status FileFd::lock(const LockFlags flags, const string &path, int32 max_tries) { if (max_tries <= 0) { return Status::Error(0, "Can't lock file: wrong max_tries"); } + + bool need_local_unlock = false; + if (!path.empty()) { + if (flags == LockFlags::Unlock) { + need_local_unlock = true; + } else if (flags == LockFlags::Read) { + LOG(FATAL) << "Local locking in Read mode is unsupported"; + } else { + CHECK(flags == LockFlags::Write); + VLOG(fd) << "Trying to lock file \"" << path << '"'; + while (true) { + std::unique_lock lock(in_process_lock_mutex); + if (locked_files.find(path) != locked_files.end()) { + if (--max_tries > 0) { + usleep_for(100000); + continue; + } + + return Status::Error( + 0, PSLICE() << "Can't lock file \"" << path << "\", because it is already in use by current program"); + } + + VLOG(fd) << "Lock file \"" << path << '"'; + need_local_unlock = true; + locked_files.insert(path); + break; + } + } + } + SCOPE_EXIT { + if (need_local_unlock) { + remove_local_lock(path); + } + }; + #if TD_PORT_POSIX auto native_fd = get_native_fd().fd(); #elif TD_PORT_WINDOWS @@ -337,12 +377,28 @@ Status FileFd::lock(FileFd::LockFlags flags, int32 max_tries) { continue; } - return OS_ERROR("Can't lock file because it is already in use; check for another program instance running"); + return OS_ERROR(PSLICE() << "Can't lock file \"" << path + << "\", because it is already in use; check for another program instance running"); } return OS_ERROR("Can't lock file"); } - return Status::OK(); + + break; + } + + if (flags == LockFlags::Write) { + need_local_unlock = false; + } + return Status::OK(); +} + +void FileFd::remove_local_lock(const string &path) { + if (!path.empty()) { + VLOG(fd) << "Unlock file \"" << path << '"'; + std::unique_lock lock(in_process_lock_mutex); + auto erased = locked_files.erase(path); + CHECK(erased > 0); } } diff --git a/tdutils/td/utils/port/FileFd.h b/tdutils/td/utils/port/FileFd.h index 10241913..630ba9a4 100644 --- a/tdutils/td/utils/port/FileFd.h +++ b/tdutils/td/utils/port/FileFd.h @@ -41,7 +41,8 @@ class FileFd { Result pread(MutableSlice slice, int64 offset) const TD_WARN_UNUSED_RESULT; enum class LockFlags { Write, Read, Unlock }; - Status lock(LockFlags flags, int32 max_tries = 1) TD_WARN_UNUSED_RESULT; + Status lock(const LockFlags flags, const string &path, int32 max_tries) TD_WARN_UNUSED_RESULT; + static void remove_local_lock(const string &path); PollableFdInfo &get_poll_info(); const PollableFdInfo &get_poll_info() const;