Add local locking to FileFd::lock.
GitOrigin-RevId: 4ad3e15f9952b1c68c879182a0f10dd5cad270f3
This commit is contained in:
parent
286c5040c1
commit
872cf6e10d
@ -3854,11 +3854,13 @@ Status Td::init(DbKey key) {
|
|||||||
|
|
||||||
VLOG(td_init) << "Begin to init database";
|
VLOG(td_init) << "Begin to init database";
|
||||||
TdDb::Events events;
|
TdDb::Events events;
|
||||||
TRY_RESULT(td_db,
|
auto r_td_db = TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events);
|
||||||
TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events));
|
if (r_td_db.is_error()) {
|
||||||
|
return Status::Error(400, r_td_db.error().message());
|
||||||
|
}
|
||||||
LOG(INFO) << "Successfully inited database in " << tag("database_directory", parameters_.database_directory)
|
LOG(INFO) << "Successfully inited database in " << tag("database_directory", parameters_.database_directory)
|
||||||
<< " and " << tag("files_directory", parameters_.files_directory);
|
<< " and " << tag("files_directory", parameters_.files_directory);
|
||||||
G()->init(parameters_, actor_id(this), std::move(td_db)).ensure();
|
G()->init(parameters_, actor_id(this), r_td_db.move_as_ok()).ensure();
|
||||||
|
|
||||||
// Init all managers and actors
|
// Init all managers and actors
|
||||||
VLOG(td_init) << "Create StateManager";
|
VLOG(td_init) << "Create StateManager";
|
||||||
|
@ -713,10 +713,10 @@ class CliClient final : public Actor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG(WARNING) << "Creating new TD " << name << " with generation " << generation_ + 1;
|
||||||
class TdCallbackImpl : public TdCallback {
|
class TdCallbackImpl : public TdCallback {
|
||||||
public:
|
public:
|
||||||
TdCallbackImpl(CliClient *client, uint64 generation) : client_(client), generation_(generation) {
|
TdCallbackImpl(CliClient *client, uint64 generation) : client_(client), generation_(generation) {
|
||||||
LOG(WARNING) << "Creating new TD with generation " << generation;
|
|
||||||
}
|
}
|
||||||
void on_result(uint64 id, tl_object_ptr<td_api::Object> result) override {
|
void on_result(uint64 id, tl_object_ptr<td_api::Object> result) override {
|
||||||
client_->on_result(generation_, id, std::move(result));
|
client_->on_result(generation_, id, std::move(result));
|
||||||
|
@ -170,9 +170,9 @@ Binlog::~Binlog() {
|
|||||||
close().ignore();
|
close().ignore();
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<FileFd> Binlog::open_binlog(CSlice path, int32 flags) {
|
Result<FileFd> Binlog::open_binlog(const string &path, int32 flags) {
|
||||||
TRY_RESULT(fd, FileFd::open(path, flags));
|
TRY_RESULT(fd, FileFd::open(path, flags));
|
||||||
TRY_STATUS(fd.lock(FileFd::LockFlags::Write, 100));
|
TRY_STATUS(fd.lock(FileFd::LockFlags::Write, path, 100));
|
||||||
return std::move(fd);
|
return std::move(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,17 +280,17 @@ Status Binlog::close(bool need_sync) {
|
|||||||
if (fd_.empty()) {
|
if (fd_.empty()) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
SCOPE_EXIT {
|
|
||||||
path_ = "";
|
|
||||||
info_.is_opened = false;
|
|
||||||
fd_.close();
|
|
||||||
need_sync_ = false;
|
|
||||||
};
|
|
||||||
if (need_sync) {
|
if (need_sync) {
|
||||||
sync();
|
sync();
|
||||||
} else {
|
} else {
|
||||||
flush();
|
flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure();
|
||||||
|
fd_.close();
|
||||||
|
path_.clear();
|
||||||
|
info_.is_opened = false;
|
||||||
|
need_sync_ = false;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -618,7 +618,7 @@ void Binlog::do_reindex() {
|
|||||||
LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error();
|
LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
fd_.close();
|
auto old_fd = std::move(fd_); // can't close fd_ now, because it will release file lock
|
||||||
fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok());
|
fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok());
|
||||||
|
|
||||||
buffer_writer_ = ChainBufferWriter();
|
buffer_writer_ = ChainBufferWriter();
|
||||||
@ -639,7 +639,9 @@ void Binlog::do_reindex() {
|
|||||||
// finish_reindex
|
// finish_reindex
|
||||||
auto status = unlink(path_);
|
auto status = unlink(path_);
|
||||||
LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status;
|
LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status;
|
||||||
|
old_fd.close(); // now we can close old file and release the system lock
|
||||||
status = rename(new_path, path_);
|
status = rename(new_path, path_);
|
||||||
|
FileFd::remove_local_lock(new_path); // now we can release local lock for temporary file
|
||||||
LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status;
|
LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status;
|
||||||
|
|
||||||
auto finish_time = Clocks::monotonic();
|
auto finish_time = Clocks::monotonic();
|
||||||
|
@ -132,7 +132,7 @@ class Binlog {
|
|||||||
|
|
||||||
static constexpr uint32 MAX_EVENT_SIZE = 65536;
|
static constexpr uint32 MAX_EVENT_SIZE = 65536;
|
||||||
|
|
||||||
Result<FileFd> open_binlog(CSlice path, int32 flags);
|
Result<FileFd> open_binlog(const string &path, int32 flags);
|
||||||
size_t flush_events_buffer(bool force);
|
size_t flush_events_buffer(bool force);
|
||||||
void do_add_event(BinlogEvent &&event);
|
void do_add_event(BinlogEvent &&event);
|
||||||
void do_event(BinlogEvent &&event);
|
void do_event(BinlogEvent &&event);
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
#include "td/utils/StringBuilder.h"
|
#include "td/utils/StringBuilder.h"
|
||||||
|
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
#include <mutex>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
#if TD_PORT_POSIX
|
#if TD_PORT_POSIX
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
@ -282,10 +284,48 @@ Result<size_t> FileFd::pread(MutableSlice slice, int64 offset) const {
|
|||||||
return OS_ERROR(PSLICE() << "Pread from " << get_native_fd() << " at offset " << offset << " has failed");
|
return OS_ERROR(PSLICE() << "Pread from " << get_native_fd() << " at offset " << offset << " has failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
Status FileFd::lock(FileFd::LockFlags flags, int32 max_tries) {
|
static std::mutex in_process_lock_mutex;
|
||||||
|
static std::unordered_set<string> locked_files;
|
||||||
|
|
||||||
|
Status FileFd::lock(const LockFlags flags, const string &path, int32 max_tries) {
|
||||||
if (max_tries <= 0) {
|
if (max_tries <= 0) {
|
||||||
return Status::Error(0, "Can't lock file: wrong max_tries");
|
return Status::Error(0, "Can't lock file: wrong max_tries");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool need_local_unlock = false;
|
||||||
|
if (!path.empty()) {
|
||||||
|
if (flags == LockFlags::Unlock) {
|
||||||
|
need_local_unlock = true;
|
||||||
|
} else if (flags == LockFlags::Read) {
|
||||||
|
LOG(FATAL) << "Local locking in Read mode is unsupported";
|
||||||
|
} else {
|
||||||
|
CHECK(flags == LockFlags::Write);
|
||||||
|
VLOG(fd) << "Trying to lock file \"" << path << '"';
|
||||||
|
while (true) {
|
||||||
|
std::unique_lock<std::mutex> lock(in_process_lock_mutex);
|
||||||
|
if (locked_files.find(path) != locked_files.end()) {
|
||||||
|
if (--max_tries > 0) {
|
||||||
|
usleep_for(100000);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::Error(
|
||||||
|
0, PSLICE() << "Can't lock file \"" << path << "\", because it is already in use by current program");
|
||||||
|
}
|
||||||
|
|
||||||
|
VLOG(fd) << "Lock file \"" << path << '"';
|
||||||
|
need_local_unlock = true;
|
||||||
|
locked_files.insert(path);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SCOPE_EXIT {
|
||||||
|
if (need_local_unlock) {
|
||||||
|
remove_local_lock(path);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
#if TD_PORT_POSIX
|
#if TD_PORT_POSIX
|
||||||
auto native_fd = get_native_fd().fd();
|
auto native_fd = get_native_fd().fd();
|
||||||
#elif TD_PORT_WINDOWS
|
#elif TD_PORT_WINDOWS
|
||||||
@ -337,12 +377,28 @@ Status FileFd::lock(FileFd::LockFlags flags, int32 max_tries) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
return OS_ERROR("Can't lock file because it is already in use; check for another program instance running");
|
return OS_ERROR(PSLICE() << "Can't lock file \"" << path
|
||||||
|
<< "\", because it is already in use; check for another program instance running");
|
||||||
}
|
}
|
||||||
|
|
||||||
return OS_ERROR("Can't lock file");
|
return OS_ERROR("Can't lock file");
|
||||||
}
|
}
|
||||||
return Status::OK();
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flags == LockFlags::Write) {
|
||||||
|
need_local_unlock = false;
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FileFd::remove_local_lock(const string &path) {
|
||||||
|
if (!path.empty()) {
|
||||||
|
VLOG(fd) << "Unlock file \"" << path << '"';
|
||||||
|
std::unique_lock<std::mutex> lock(in_process_lock_mutex);
|
||||||
|
auto erased = locked_files.erase(path);
|
||||||
|
CHECK(erased > 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +41,8 @@ class FileFd {
|
|||||||
Result<size_t> pread(MutableSlice slice, int64 offset) const TD_WARN_UNUSED_RESULT;
|
Result<size_t> pread(MutableSlice slice, int64 offset) const TD_WARN_UNUSED_RESULT;
|
||||||
|
|
||||||
enum class LockFlags { Write, Read, Unlock };
|
enum class LockFlags { Write, Read, Unlock };
|
||||||
Status lock(LockFlags flags, int32 max_tries = 1) TD_WARN_UNUSED_RESULT;
|
Status lock(const LockFlags flags, const string &path, int32 max_tries) TD_WARN_UNUSED_RESULT;
|
||||||
|
static void remove_local_lock(const string &path);
|
||||||
|
|
||||||
PollableFdInfo &get_poll_info();
|
PollableFdInfo &get_poll_info();
|
||||||
const PollableFdInfo &get_poll_info() const;
|
const PollableFdInfo &get_poll_info() const;
|
||||||
|
Loading…
Reference in New Issue
Block a user