Add AsyncFileLog.

This commit is contained in:
levlam 2022-10-09 13:04:57 +03:00
parent 7e1555531c
commit 78ba8fa983
4 changed files with 206 additions and 0 deletions

View File

@ -87,6 +87,7 @@ set(TDUTILS_SOURCE
${TDMIME_AUTO}
td/utils/AsyncFileLog.cpp
td/utils/base64.cpp
td/utils/BigNum.cpp
td/utils/buffer.cpp
@ -179,6 +180,7 @@ set(TDUTILS_SOURCE
td/utils/AesCtrByteFlow.h
td/utils/algorithm.h
td/utils/as.h
td/utils/AsyncFileLog.h
td/utils/AtomicRead.h
td/utils/base64.h
td/utils/benchmark.h

View File

@ -0,0 +1,135 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/AsyncFileLog.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
#include "td/utils/SliceBuilder.h"
namespace td {
Status AsyncFileLog::init(string path, int64 rotate_threshold) {
CHECK(path_.empty());
CHECK(!path.empty());
TRY_RESULT(fd, FileFd::open(path, FileFd::Create | FileFd::Write | FileFd::Append));
auto r_path = realpath(path, true);
if (r_path.is_error()) {
path_ = std::move(path);
} else {
path_ = r_path.move_as_ok();
}
TRY_RESULT(size, fd.get_size());
queue_ = td::make_unique<MpscPollableQueue<Query>>();
queue_->init();
logging_thread_ =
td::thread([queue = queue_.get(), fd = std::move(fd), path = path_, size, rotate_threshold]() mutable {
auto after_rotation = [&] {
ScopedDisableLog disable_log; // to ensure that nothing will be printed to the closed log
fd.close();
auto r_fd = FileFd::open(path, FileFd::Create | FileFd::Truncate | FileFd::Write);
if (r_fd.is_error()) {
process_fatal_error(PSLICE() << r_fd.error() << " in " << __FILE__ << " at " << __LINE__ << '\n');
}
fd = r_fd.move_as_ok();
size = 0;
};
auto append = [&](CSlice slice) {
if (size > rotate_threshold) {
auto status = rename(path, PSLICE() << path << ".old");
if (status.is_error()) {
process_fatal_error(PSLICE() << status.error() << " in " << __FILE__ << " at " << __LINE__ << '\n');
}
after_rotation();
}
while (!slice.empty()) {
auto r_size = fd.write(slice);
if (r_size.is_error()) {
process_fatal_error(PSLICE() << r_size.error() << " in " << __FILE__ << " at " << __LINE__ << '\n');
}
auto written = r_size.ok();
size += static_cast<int64>(written);
slice.remove_prefix(written);
}
};
while (true) {
int ready_count = queue->reader_wait_nonblock();
if (ready_count == 0) {
queue->reader_get_event_fd().wait(1000);
continue;
}
bool need_close = false;
while (ready_count-- > 0) {
Query query = queue->reader_get_unsafe();
switch (query.type_) {
case Query::Type::Log:
append(query.data_);
break;
case Query::Type::AfterRotation:
after_rotation();
break;
case Query::Type::Close:
need_close = true;
break;
default:
process_fatal_error("Invalid query type in AsyncFileLog");
}
}
queue->reader_flush();
if (need_close) {
fd.close();
break;
}
}
});
return Status::OK();
}
AsyncFileLog::~AsyncFileLog() {
if (queue_ == nullptr) {
return;
}
Query query;
query.type_ = Query::Type::Close;
queue_->writer_put(std::move(query));
logging_thread_.join();
}
vector<string> AsyncFileLog::get_file_paths() {
vector<string> result;
if (!path_.empty()) {
result.push_back(path_);
result.push_back(PSTRING() << path_ << ".old");
}
return result;
}
void AsyncFileLog::after_rotation() {
Query query;
query.type_ = Query::Type::AfterRotation;
if (queue_ == nullptr) {
process_fatal_error("AsyncFileLog is not inited");
}
queue_->writer_put(std::move(query));
}
void AsyncFileLog::do_append(int log_level, CSlice slice) {
Query query;
query.data_ = slice.str();
if (queue_ == nullptr) {
process_fatal_error("AsyncFileLog is not inited");
}
queue_->writer_put(std::move(query));
}
} // namespace td

View File

@ -0,0 +1,47 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/port/thread.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
namespace td {
class AsyncFileLog final : public LogInterface {
public:
AsyncFileLog() = default;
AsyncFileLog(const AsyncFileLog &) = delete;
AsyncFileLog &operator=(const AsyncFileLog &) = delete;
AsyncFileLog(AsyncFileLog &&) = delete;
AsyncFileLog &operator=(AsyncFileLog &&) = delete;
~AsyncFileLog();
Status init(string path, int64 rotate_threshold);
private:
struct Query {
enum class Type : int32 { Log, AfterRotation, Close };
Type type_ = Type::Log;
string data_;
};
string path_;
unique_ptr<MpscPollableQueue<Query>> queue_;
thread logging_thread_;
vector<string> get_file_paths() final;
void after_rotation() final;
void do_append(int log_level, CSlice slice) final;
};
} // namespace td

View File

@ -4,6 +4,7 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/AsyncFileLog.h"
#include "td/utils/benchmark.h"
#include "td/utils/CombinedLog.h"
#include "td/utils/FileLog.h"
@ -161,5 +162,26 @@ TEST(Log, Bench) {
};
return td::make_unique<FileLog>();
});
#if !TD_EVENTFD_UNSUPPORTED
bench_log("AsyncFileLog", [] {
class AsyncFileLog final : public td::LogInterface {
public:
AsyncFileLog() {
file_log_.init("tmplog", std::numeric_limits<td::int64>::max()).ensure();
}
void do_append(int log_level, td::CSlice slice) final {
static_cast<td::LogInterface &>(file_log_).do_append(log_level, slice);
}
std::vector<std::string> get_file_paths() final {
return static_cast<td::LogInterface &>(file_log_).get_file_paths();
}
private:
td::AsyncFileLog file_log_;
};
return td::make_unique<AsyncFileLog>();
});
#endif
}
#endif