tdlight/tddb/td/db/binlog/Binlog.cpp
2023-01-16 12:47:37 +03:00

766 lines
23 KiB
C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/detail/BinlogEventsBuffer.h"
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
#include "td/utils/buffer.h"
#include "td/utils/format.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
#include "td/utils/port/PollFlags.h"
#include "td/utils/port/sleep.h"
#include "td/utils/port/Stat.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
namespace td {
namespace detail {
// Contents of the AesCtrEncryption service event: the public parameters which,
// together with the database key, are used to derive and verify the AES-CTR key.
struct AesCtrEncryptionEvent {
  // All sizes below are measured in bytes.
  static constexpr size_t min_salt_size() {
    return 16;  // 128 bits
  }
  static constexpr size_t default_salt_size() {
    return 32;  // 256 bits
  }
  static constexpr size_t key_size() {
    return 32;  // 256 bits
  }
  static constexpr size_t iv_size() {
    return 16;  // 128 bits
  }
  static constexpr size_t hash_size() {
    return 32;  // 256 bits
  }

  // Number of PBKDF2 iterations used for keys that are not raw.
  static constexpr size_t kdf_iteration_count() {
    return 60002;
  }
  // Number of PBKDF2 iterations used for raw keys.
  static constexpr size_t kdf_fast_iteration_count() {
    return 2;
  }

  BufferSlice key_salt_;  // salt used by generate_key()
  BufferSlice iv_;        // initialization vector of the encryption stream
  BufferSlice key_hash_;  // value to compare with generate_hash() of a candidate key

  // Derives the encryption key of key_size() bytes from db_key and key_salt_ using PBKDF2-SHA256.
  // db_key must be non-empty.
  BufferSlice generate_key(const DbKey &db_key) const {
    CHECK(!db_key.is_empty());
    const size_t rounds = db_key.is_raw_key() ? kdf_fast_iteration_count() : kdf_iteration_count();
    BufferSlice derived_key(key_size());
    pbkdf2_sha256(db_key.data(), key_salt_.as_slice(), narrow_cast<int>(rounds), derived_key.as_slice());
    return derived_key;
  }

  // Returns HMAC-SHA256 of a fixed message computed with the given key; used to check the key
  // without storing it.
  static BufferSlice generate_hash(Slice key) {
    BufferSlice digest(hash_size());
    hmac_sha256(key, "cucumbers everywhere", digest.as_slice());
    return digest;
  }

  // Serializes an (empty) flags field followed by salt, IV and key hash.
  template <class StorerT>
  void store(StorerT &storer) const {
    using td::store;
    BEGIN_STORE_FLAGS();
    END_STORE_FLAGS();
    store(key_salt_, storer);
    store(iv_, storer);
    store(key_hash_, storer);
  }

  // Reads the fields in the same order in which store() writes them.
  template <class ParserT>
  void parse(ParserT &&parser) {
    using td::parse;
    BEGIN_PARSE_FLAGS();
    END_PARSE_FLAGS();
    parse(key_salt_, parser);
    parse(iv_, parser);
    parse(key_hash_, parser);
  }
};
class BinlogReader {
public:
explicit BinlogReader(ChainBufferReader *input) : input_(input) {
}
void set_input(ChainBufferReader *input, bool is_encrypted, int64 expected_size) {
input_ = input;
is_encrypted_ = is_encrypted;
expected_size_ = expected_size;
}
ChainBufferReader *input() {
return input_;
}
int64 offset() const {
return offset_;
}
Result<size_t> read_next(BinlogEvent *event) {
if (state_ == State::ReadLength) {
if (input_->size() < 4) {
return 4;
}
auto it = input_->clone();
char buf[4];
it.advance(4, MutableSlice(buf, 4));
size_ = static_cast<size_t>(TlParser(Slice(buf, 4)).fetch_int());
if (size_ > BinlogEvent::MAX_SIZE) {
return Status::Error(PSLICE() << "Too big event " << tag("size", size_));
}
if (size_ < BinlogEvent::MIN_SIZE) {
return Status::Error(PSLICE() << "Too small event " << tag("size", size_));
}
if (size_ % 4 != 0) {
return Status::Error(-2, PSLICE() << "Event of size " << size_ << " at offset " << offset() << " out of "
<< expected_size_ << ' ' << tag("is_encrypted", is_encrypted_)
<< format::as_hex_dump<4>(Slice(input_->prepare_read().truncate(28))));
}
state_ = State::ReadEvent;
}
if (input_->size() < size_) {
return size_;
}
event->debug_info_ = BinlogDebugInfo{__FILE__, __LINE__};
TRY_STATUS(event->init(input_->cut_head(size_).move_as_buffer_slice()));
offset_ += size_;
event->offset_ = offset_;
state_ = State::ReadLength;
return 0;
}
private:
ChainBufferReader *input_;
enum class State { ReadLength, ReadEvent };
State state_ = State::ReadLength;
size_t size_{0};
int64 offset_{0};
int64 expected_size_{0};
bool is_encrypted_{false};
};
// Returns the size reported by stat() for the file at path, or 0 if stat() fails.
static int64 file_size(CSlice path) {
  auto r_stat = stat(path);
  return r_stat.is_error() ? 0 : r_stat.ok().size_;
}
} // namespace detail
// Verbosity level used by the VLOG(binlog) messages in this file: 8 levels above DEBUG.
int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8;
// A default-constructed Binlog has no file attached; init() opens one.
Binlog::Binlog() = default;
// Closes the binlog if it is still opened; the resulting status is deliberately ignored,
// because a destructor has no way to report it.
Binlog::~Binlog() {
  close().ignore();
}
// Opens the file at path with the given flags and takes a write lock on it.
// Returns the opened file, or the error of whichever step failed.
Result<FileFd> Binlog::open_binlog(const string &path, int32 flags) {
  auto r_fd = FileFd::open(path, flags);
  if (r_fd.is_error()) {
    return r_fd.move_as_error();
  }
  auto fd = r_fd.move_as_ok();

  // NOTE(review): the last argument (100) looks like the maximum number of lock attempts - confirm
  auto lock_status = fd.lock(FileFd::LockFlags::Write, path, 100);
  if (lock_status.is_error()) {
    return lock_status.move_as_error();
  }
  return std::move(fd);
}
// Opens (creating if needed) the binlog at path, loads all its events and replays them
// through callback; debug_callback, if set, is called for every event as it is read.
// db_key is the key the binlog must end up encrypted with; old_db_key is tried if db_key
// doesn't match the stored key hash. The dummy parameter is unused.
// Returns Error::WrongPassword if none of the keys match, or the error of opening/loading the file.
Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key, int32 dummy,
                    const Callback &debug_callback) {
  // drop the previously opened binlog, if any
  close().ignore();

  db_key_ = std::move(db_key);
  old_db_key_ = std::move(old_db_key);

  processor_ = make_unique<detail::BinlogEventsProcessor>();
  // Turn off BinlogEventsBuffer
  // events_buffer_ = make_unique<detail::BinlogEventsBuffer>();

  // try to restore binlog from regenerated version
  const bool is_main_file_missing = stat(path).is_error();
  if (is_main_file_missing) {
    rename(PSLICE() << path << ".new", path).ignore();
  }

  info_ = BinlogInfo();
  // the file is checked again, because the rename above could have restored it
  info_.was_created = stat(path).is_error();

  TRY_RESULT(fd, open_binlog(path, FileFd::Flags::Read | FileFd::Flags::Write | FileFd::Flags::Create));
  fd_ = BufferedFdBase<FileFd>(std::move(fd));
  fd_size_ = 0;
  path_ = std::move(path);

  auto load_status = load_binlog(callback, debug_callback);
  if (load_status.is_error()) {
    close().ignore();
    return load_status;
  }

  info_.last_event_id = processor_->last_event_id();
  last_event_id_ = processor_->last_event_id();

  if (info_.wrong_password) {
    close().ignore();
    return Status::Error(static_cast<int>(Error::WrongPassword), "Wrong password");
  }

  // The file must be rewritten if the requested key isn't the one it is encrypted with,
  // or if no key is requested, but the file is encrypted.
  const bool is_key_unused = !db_key_.is_empty() && !db_key_used_;
  const bool is_encryption_unwanted = db_key_.is_empty() && encryption_type_ != EncryptionType::None;
  if (is_key_unused || is_encryption_unwanted) {
    aes_ctr_key_salt_ = BufferSlice();
    do_reindex();
  }

  info_.is_opened = true;
  return Status::OK();
}
// Adds a new event to the binlog (directly or through the events buffer, if it is enabled)
// and regenerates the file when it has become much bigger than the events it still needs.
// The size of the event must be a multiple of 4.
void Binlog::add_event(BinlogEvent &&event) {
  if (event.size_ % 4 != 0) {
    LOG(FATAL) << "Trying to add event with bad size " << event.public_to_string();
  }

  if (events_buffer_) {
    events_buffer_->add_event(std::move(event));
  } else {
    do_add_event(std::move(event));
  }
  lazy_flush();

  if (state_ != State::Run) {
    return;
  }

  auto fd_size = fd_size_;
  if (events_buffer_) {
    fd_size += events_buffer_->size();
  }

  // Reindex if the file is bigger than min_size and more than about rate times bigger
  // than the total size of the events kept by the processor.
  struct Threshold {
    int64 min_size;
    int rate;
  };
  const Threshold thresholds[] = {{50000, 5}, {100000, 4}, {300000, 3}, {500000, 2}};

  bool need_reindex = false;
  for (const auto &threshold : thresholds) {
    if (fd_size > threshold.min_size && fd_size / threshold.rate > processor_->total_raw_events_size()) {
      need_reindex = true;
      break;
    }
  }

  if (need_reindex) {
    LOG(INFO) << tag("fd_size", format::as_size(fd_size))
              << tag("total events size", format::as_size(processor_->total_raw_events_size()));
    do_reindex();
  }
}
// Passes the buffered events to do_add_event() if the buffer asks for it or if force is true.
// Returns the size of the data left in the events buffer: 0 if there is no buffer or it was flushed.
size_t Binlog::flush_events_buffer(bool force) {
  if (!events_buffer_) {
    return 0;
  }

  const bool should_flush = force || events_buffer_->need_flush();
  if (!should_flush) {
    return events_buffer_->size();
  }

  // the flush must not be re-entered from do_add_event
  CHECK(!in_flush_events_buffer_);
  in_flush_events_buffer_ = true;
  events_buffer_->flush([this](BinlogEvent &&buffered_event) { do_add_event(std::move(buffered_event)); });
  in_flush_events_buffer_ = false;
  return 0;
}
// Events with the Partial flag are postponed (with the flag cleared) until the first event
// without the flag arrives; then all postponed events are processed in order, followed by that event.
void Binlog::do_add_event(BinlogEvent &&event) {
  const bool is_partial = (event.flags_ & BinlogEvent::Flags::Partial) != 0;
  if (is_partial) {
    event.flags_ &= ~BinlogEvent::Flags::Partial;
    pending_events_.emplace_back(std::move(event));
    return;
  }

  for (auto &postponed_event : pending_events_) {
    do_event(std::move(postponed_event));
  }
  pending_events_.clear();
  do_event(std::move(event));
}
// Writes out all pending data (additionally syncing the file if need_sync is true),
// releases the file lock and closes the file. Does nothing if no file is opened.
// Always returns OK.
Status Binlog::close(bool need_sync) {
  if (fd_.empty()) {
    return Status::OK();
  }

  if (!need_sync) {
    flush();
  } else {
    sync();
  }

  fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure();
  fd_.close();

  need_sync_ = false;
  info_.is_opened = false;
  path_.clear();
  return Status::OK();
}
// Closes the binlog and reports the result through promise.
void Binlog::close(Promise<> promise) {
  auto close_status = close();
  if (close_status.is_error()) {
    promise.set_error(std::move(close_status));
    return;
  }
  promise.set_value(Unit());
}
// Re-encrypts the binlog with new_db_key by rewriting the whole file.
void Binlog::change_key(DbKey new_db_key) {
  // dropping the cached salt forces reset_encryption() to generate a new salt and to derive a new key
  aes_ctr_key_salt_ = BufferSlice();
  db_key_ = std::move(new_db_key);
  do_reindex();
}
// Closes the binlog without syncing and deletes its files.
// Returns the status of the close; the result of the deletion is ignored.
Status Binlog::close_and_destroy() {
  // close() clears path_, so it must be copied beforehand
  string binlog_path = path_;
  Status close_status = close(false);
  destroy(binlog_path).ignore();
  return close_status;
}
// Deletes the binlog at path together with its regenerated ".new" version.
// Failures to delete are ignored; always returns OK.
Status Binlog::destroy(Slice path) {
  // delete regenerated version first to avoid it becoming main version
  const Slice suffixes[] = {Slice(".new"), Slice()};
  for (const auto &suffix : suffixes) {
    unlink(PSLICE() << path << suffix).ignore();
  }
  return Status::OK();
}
// Processes a single event:
//  1. in the Run and Reindex states validates the event and appends it to the output buffer;
//  2. if it is an AesCtrEncryption service event, (re)initializes the encryption state from it;
//  3. in all states except Reindex registers the event in processor_;
//  4. accounts the event in fd_events_ and fd_size_.
void Binlog::do_event(BinlogEvent &&event) {
  // remembered up front, because the event is moved into processor_ below
  auto event_size = event.raw_event_.size();
  if (state_ == State::Run || state_ == State::Reindex) {
    // never write an invalid event to the file
    auto validate_status = event.validate();
    if (validate_status.is_error()) {
      LOG(FATAL) << "Failed to validate binlog event " << validate_status << " "
                 << format::as_hex_dump<4>(Slice(event.raw_event_.as_slice().truncate(28)));
    }
    VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ")
                 << event.public_to_string();
    // The event is only queued here; the data reaches the file (and gets encrypted) during flush.
    switch (encryption_type_) {
      case EncryptionType::None: {
        buffer_writer_.append(event.raw_event_.clone());
        break;
      }
      case EncryptionType::AesCtr: {
        buffer_writer_.append(event.raw_event_.as_slice());
        break;
      }
    }
  }
  // Only negative types are checked for being a service event.
  if (event.type_ < 0) {
    if (event.type_ == BinlogEvent::ServiceTypes::AesCtrEncryption) {
      detail::AesCtrEncryptionEvent encryption_event;
      encryption_event.parse(TlParser(event.data_));
      // Choose the key: reuse the already derived key if the salt is the same,
      // otherwise derive it from db_key_. The key stays empty if there is no db_key_.
      BufferSlice key;
      if (aes_ctr_key_salt_.as_slice() == encryption_event.key_salt_.as_slice()) {
        key = BufferSlice(as_slice(aes_ctr_key_));
      } else if (!db_key_.is_empty()) {
        key = encryption_event.generate_key(db_key_);
      }
      if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() !=
          encryption_event.key_hash_.as_slice()) {
        // The key doesn't match the stored hash. This is expected only while loading a file,
        // because events written by reset_encryption() carry the hash of the key in use.
        CHECK(state_ == State::Load);
        // Fall back to the old key; db_key_used_ is intentionally left unchanged in this case.
        if (!old_db_key_.is_empty()) {
          key = encryption_event.generate_key(old_db_key_);
          if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() !=
              encryption_event.key_hash_.as_slice()) {
            info_.wrong_password = true;
          }
        } else {
          info_.wrong_password = true;
        }
      } else {
        db_key_used_ = true;
      }
      // Switch to the encryption described by the event, even if the password was wrong;
      // callers check info_.wrong_password afterwards.
      encryption_type_ = EncryptionType::AesCtr;
      aes_ctr_key_salt_ = encryption_event.key_salt_.copy();
      update_encryption(key.as_slice(), encryption_event.iv_.as_slice());
      if (state_ == State::Load) {
        // everything after this event must be decrypted while reading
        update_read_encryption();
        LOG(INFO) << "Load: init encryption";
      } else {
        CHECK(state_ == State::Reindex);
        // write out the event itself before the following data starts to be encrypted
        flush();
        update_write_encryption();
        //LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption";
      }
    }
  }
  // During reindex the events come from processor_ itself, so they must not be added again.
  if (state_ != State::Reindex) {
    auto status = processor_->add_event(std::move(event));
    if (status.is_error()) {
      auto old_size = detail::file_size(path_);
      // dump of the file content starting from the failed event, for the fatal error message
      auto data = debug_get_binlog_data(fd_size_, old_size);
      if (state_ == State::Load) {
        // drop the failed event and everything after it from the file
        fd_.seek(fd_size_).ensure();
        fd_.truncate_to_current_position(fd_size_).ensure();
        // an empty dump means that nothing meaningful was dropped, so loading can silently continue
        if (data.empty()) {
          return;
        }
      }
      LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << fd_size_
                 << " in state " << static_cast<int32>(state_) << " due to error: " << status << " after reading "
                 << data;
    }
  }
  fd_events_++;
  fd_size_ += event_size;
}
// Flushes all pending data and, if something was written since the last sync,
// syncs the file. A failure to sync is fatal.
void Binlog::sync() {
  flush();
  if (!need_sync_) {
    return;
  }

  auto sync_status = fd_.sync();
  LOG_IF(FATAL, sync_status.is_error()) << "Failed to sync binlog: " << sync_status;
  need_sync_ = false;
}
// Writes all buffered events to the file. Does nothing while the binlog is being loaded.
// A failure to write everything is fatal.
void Binlog::flush() {
  if (state_ == State::Load) {
    return;
  }

  flush_events_buffer(true);

  // NB: encryption happens during flush
  if (byte_flow_flag_) {
    byte_flow_source_.wakeup();
  }

  auto r_written = fd_.flush_write();
  r_written.ensure();
  if (r_written.ok() > 0) {
    // there is new data in the file which isn't synced yet
    need_sync_ = true;
  }

  need_flush_since_ = 0;
  LOG_IF(FATAL, fd_.need_flush_write()) << "Failed to flush binlog";
}
void Binlog::lazy_flush() {
size_t events_buffer_size = flush_events_buffer(false /*force*/);
buffer_reader_.sync_with_writer();
auto size = buffer_reader_.size() + events_buffer_size;
if (size > (1 << 14)) {
flush();
} else if (size > 0 && need_flush_since_ == 0) {
need_flush_since_ = Time::now_cached();
}
}
// Points the binlog reader at the right input for the current encryption type: the raw buffer
// for an unencrypted binlog, or the output of the AES-CTR byte flow for an encrypted one.
// Must be called only while a reader is registered in binlog_reader_ptr_.
void Binlog::update_read_encryption() {
  CHECK(binlog_reader_ptr_);
  if (encryption_type_ == EncryptionType::None) {
    auto r_file_size = fd_.get_size();
    r_file_size.ensure();
    binlog_reader_ptr_->set_input(&buffer_reader_, false, r_file_size.ok());
    byte_flow_flag_ = false;
  } else if (encryption_type_ == EncryptionType::AesCtr) {
    // rebuild the chain: buffer_reader_ -> AES-CTR -> sink
    byte_flow_source_ = ByteFlowSource(&buffer_reader_);
    aes_xcode_byte_flow_ = AesCtrByteFlow();
    aes_xcode_byte_flow_.init(std::move(aes_ctr_state_));
    byte_flow_sink_ = ByteFlowSink();
    byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_;
    byte_flow_flag_ = true;

    auto r_file_size = fd_.get_size();
    r_file_size.ensure();
    binlog_reader_ptr_->set_input(byte_flow_sink_.get_output(), true, r_file_size.ok());
  }
}
// Chooses where the file takes the data to write from, according to the current encryption type:
// directly from the buffer, or from the output of the AES-CTR byte flow.
void Binlog::update_write_encryption() {
  if (encryption_type_ == EncryptionType::None) {
    fd_.set_output_reader(&buffer_reader_);
    byte_flow_flag_ = false;
  } else if (encryption_type_ == EncryptionType::AesCtr) {
    // rebuild the chain: buffer_reader_ -> AES-CTR -> sink
    byte_flow_source_ = ByteFlowSource(&buffer_reader_);
    aes_xcode_byte_flow_ = AesCtrByteFlow();
    aes_xcode_byte_flow_.init(std::move(aes_ctr_state_));
    byte_flow_sink_ = ByteFlowSink();
    byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_;
    byte_flow_flag_ = true;
    fd_.set_output_reader(byte_flow_sink_.get_output());
  }
}
// Reads the whole binlog file, passes every read event to do_add_event() (and to debug_callback,
// if it is set), then replays the events kept by processor_ through callback and switches
// the binlog to State::Run.
//
// Returns an error if reading the file or getting its size fails. A wrong password isn't an error
// here: the function returns OK with info_.wrong_password set, leaving the binlog in State::Load.
Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callback) {
  state_ = State::Load;
  buffer_writer_ = ChainBufferWriter();
  buffer_reader_ = buffer_writer_.extract_reader();
  fd_.set_input_writer(&buffer_writer_);
  detail::BinlogReader reader{nullptr};
  binlog_reader_ptr_ = &reader;
  // reader lives on the stack of this function, so the member pointer to it
  // must be reset on every exit path, including the early returns below
  SCOPE_EXIT {
    binlog_reader_ptr_ = nullptr;
  };
  update_read_encryption();
  fd_.get_poll_info().add_flags(PollFlags::Read());
  info_.wrong_password = false;
  while (true) {
    BinlogEvent event;
    auto r_need_size = reader.read_next(&event);
    if (r_need_size.is_error()) {
      // error code -2 means that the length of the event isn't a multiple of 4
      if (r_need_size.error().code() == -2) {
        auto old_size = detail::file_size(path_);
        auto offset = reader.offset();
        auto data = debug_get_binlog_data(offset, old_size);
        // drop the broken tail of the file
        fd_.seek(offset).ensure();
        fd_.truncate_to_current_position(offset).ensure();
        // an empty dump means that nothing meaningful was dropped
        if (data.empty()) {
          break;
        }
        LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << offset
                   << " due to error: " << r_need_size.error() << " after reading " << data;
      }
      LOG(ERROR) << r_need_size.error();
      break;
    }
    auto need_size = r_need_size.move_as_ok();
    // LOG(ERROR) << "Need size = " << need_size;
    if (need_size == 0) {
      // a whole event has been read
      if (debug_callback) {
        debug_callback(event);
      }
      do_add_event(std::move(event));
      if (info_.wrong_password) {
        // the rest of the file can't be decrypted
        return Status::OK();
      }
    } else {
      // more data is needed: read at least 4096 bytes at once
      TRY_STATUS(fd_.flush_read(max(need_size, static_cast<size_t>(4096))));
      buffer_reader_.sync_with_writer();
      if (byte_flow_flag_) {
        byte_flow_source_.wakeup();
      }
      if (reader.input()->size() < need_size) {
        // end of the file
        break;
      }
    }
  }
  auto offset = processor_->offset();
  processor_->for_each([&](BinlogEvent &event) {
    VLOG(binlog) << "Replay binlog event: " << event.public_to_string();
    if (callback) {
      callback(event);
    }
  });
  TRY_RESULT(fd_size, fd_.get_size());
  if (offset != fd_size) {
    // the file has a tail which wasn't accepted by the processor; cut it off
    LOG(ERROR) << "Truncate " << tag("path", path_) << tag("old_size", fd_size) << tag("new_size", offset);
    fd_.seek(offset).ensure();
    fd_.truncate_to_current_position(offset).ensure();
    db_key_used_ = false;  // force reindex
  }
  LOG_CHECK(fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset;
  binlog_reader_ptr_ = nullptr;
  state_ = State::Run;
  buffer_writer_ = ChainBufferWriter();
  buffer_reader_ = buffer_writer_.extract_reader();
  // reuse aes_ctr_state_
  if (encryption_type_ == EncryptionType::AesCtr) {
    aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state();
  }
  update_write_encryption();
  return Status::OK();
}
// Stores the key in aes_ctr_key_ and initializes aes_ctr_state_ with the key and the IV.
// key and iv are copied into fixed-size buffers and must not be longer than these buffers.
void Binlog::update_encryption(Slice key, Slice iv) {
  as_slice(aes_ctr_key_).copy_from(key);
  // The IV comes from the binlog file and may be shorter than 128 bits; zero-initialize the buffer,
  // so that the bytes not covered by iv are well-defined instead of indeterminate.
  UInt128 aes_ctr_iv{};
  as_slice(aes_ctr_iv).copy_from(iv);
  aes_ctr_state_.init(as_slice(aes_ctr_key_), as_slice(aes_ctr_iv));
}
// Starts the encryption of a binlog file which is being written from scratch: turns the
// encryption off if there is no database key, otherwise writes an AesCtrEncryption service event
// with a new random IV. The salt and the derived key are reused if a salt is known,
// otherwise a new random salt is generated and the key is derived from db_key_.
void Binlog::reset_encryption() {
  if (db_key_.is_empty()) {
    encryption_type_ = EncryptionType::None;
    return;
  }

  using EncryptionEvent = detail::AesCtrEncryptionEvent;
  const bool reuse_salt = !aes_ctr_key_salt_.empty();

  EncryptionEvent event;
  if (reuse_salt) {
    event.key_salt_ = aes_ctr_key_salt_.clone();
  } else {
    event.key_salt_ = BufferSlice(EncryptionEvent::default_salt_size());
    Random::secure_bytes(event.key_salt_.as_slice());
  }

  event.iv_ = BufferSlice(EncryptionEvent::iv_size());
  Random::secure_bytes(event.iv_.as_slice());

  BufferSlice key;
  if (reuse_salt) {
    // the salt is the same, so the already derived key is still valid
    key = BufferSlice(as_slice(aes_ctr_key_));
  } else {
    key = event.generate_key(db_key_);
  }
  event.key_hash_ = EncryptionEvent::generate_hash(key.as_slice());

  do_event(BinlogEvent(
      BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)),
      BinlogDebugInfo{__FILE__, __LINE__}));
}
// Regenerates the binlog file: writes all events kept by processor_ to "<path>.new"
// (re-initializing the encryption on the way) and then replaces the old file with the new one.
// Must be called in State::Run; the state is State::Reindex during the call.
// If the new file can't be opened, the error is logged and the binlog is left unchanged;
// failures to unlink or rename the files are fatal.
void Binlog::do_reindex() {
  flush_events_buffer(true);
  // start reindex
  CHECK(state_ == State::Run);
  state_ = State::Reindex;
  SCOPE_EXIT {
    state_ = State::Run;
  };
  // statistics for the log message at the end
  auto start_time = Clocks::monotonic();
  auto start_size = detail::file_size(path_);
  auto start_events = fd_events_;
  string new_path = path_ + ".new";
  auto r_opened_file = open_binlog(new_path, FileFd::Flags::Write | FileFd::Flags::Create | FileFd::Truncate);
  if (r_opened_file.is_error()) {
    LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error();
    return;
  }
  auto old_fd = std::move(fd_);  // can't close fd_ now, because it will release file lock
  fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok());
  buffer_writer_ = ChainBufferWriter();
  buffer_reader_ = buffer_writer_.extract_reader();
  // the new file starts unencrypted; reset_encryption() below turns the encryption on if it is needed
  encryption_type_ = EncryptionType::None;
  update_write_encryption();
  // reindex
  fd_size_ = 0;
  fd_events_ = 0;
  reset_encryption();
  processor_->for_each([&](BinlogEvent &event) {
    event.realloc();
    do_event(std::move(event));  // NB: no move actually happens
  });
  need_sync_ = start_size != 0;  // must sync creation of the file if it is non-empty
  sync();
  // finish_reindex
  auto status = unlink(path_);
  LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status;
  old_fd.close();  // now we can close old file and release the system lock
  status = rename(new_path, path_);
  FileFd::remove_local_lock(new_path);  // now we can release local lock for temporary file
  LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status;
  auto finish_time = Clocks::monotonic();
  auto finish_size = fd_size_;
  auto finish_events = fd_events_;
  // Check that the renamed file is visible and has the expected size. stat() is retried up to
  // 10 times with a growing pause between the attempts; a failure of the last attempt is fatal.
  for (int left_tries = 10; left_tries > 0; left_tries--) {
    auto r_stat = stat(path_);
    if (r_stat.is_error()) {
      if (left_tries != 1) {
        usleep_for(200000 / left_tries);
        continue;
      }
      LOG(FATAL) << "Failed to rename binlog of size " << fd_size_ << " to " << path_ << ": " << r_stat.error()
                 << ". Temp file size is " << detail::file_size(new_path) << ", new size " << detail::file_size(path_);
    }
    LOG_CHECK(fd_size_ == r_stat.ok().size_) << fd_size_ << ' ' << r_stat.ok().size_ << ' '
                                             << detail::file_size(new_path) << ' ' << fd_events_ << ' ' << path_;
    break;
  }
  // +1 avoids division by zero for an empty resulting file
  auto ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1);
  // Log the statistics; regeneration of a file bigger than 10 MiB, or one for which
  // finish_time - start_time exceeds 1, is reported as a warning.
  [&](Slice msg) {
    if (start_size > (10 << 20) || finish_time - start_time > 1) {
      LOG(WARNING) << "Slow " << msg;
    } else {
      LOG(INFO) << msg;
    }
  }(PSLICE() << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
             << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
             << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events));
  // continue writing to the new file with fresh buffers
  buffer_writer_ = ChainBufferWriter();
  buffer_reader_ = buffer_writer_.extract_reader();
  // reuse aes_ctr_state_
  if (encryption_type_ == EncryptionType::AesCtr) {
    aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state();
  }
  update_write_encryption();
}
// Returns a hex dump of at most 512 bytes of the binlog file starting from begin_offset and
// ending no later than end_offset; the result is used only in diagnostic messages.
// Returns an empty string if the range is empty, or if the binlog is encrypted and all read bytes
// are zero. If the range is invalid or the file can't be read, a description of the problem
// is returned instead of a dump.
string Binlog::debug_get_binlog_data(int64 begin_offset, int64 end_offset) {
  if (begin_offset > end_offset) {
    return "Begin offset is bigger than end_offset";
  }
  if (begin_offset == end_offset) {
    return string();
  }

  constexpr int64 MAX_DATA_LENGTH = 512;
  if (end_offset - begin_offset > MAX_DATA_LENGTH) {
    end_offset = begin_offset + MAX_DATA_LENGTH;
  }

  auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
  if (r_fd.is_error()) {
    return PSTRING() << "Failed to open binlog: " << r_fd.error();
  }
  auto fd = r_fd.move_as_ok();

  // the write lock held by fd_ is released for the time of reading and taken back on exit
  fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ignore();
  SCOPE_EXIT {
    fd_.lock(FileFd::LockFlags::Write, path_, 1).ensure();
  };

  auto expected_data_length = narrow_cast<size_t>(end_offset - begin_offset);
  string data(expected_data_length, '\0');
  auto r_data_size = fd.pread(data, begin_offset);
  if (r_data_size.is_error()) {
    return PSTRING() << "Failed to read binlog: " << r_data_size.error();
  }

  if (r_data_size.ok() < expected_data_length) {
    // the file is shorter than expected
    data.resize(r_data_size.ok());
    data = PSTRING() << format::as_hex_dump<4>(Slice(data)) << " | with " << expected_data_length - r_data_size.ok()
                     << " missed bytes";
  } else {
    if (encryption_type_ == EncryptionType::AesCtr) {
      bool is_zero = true;
      for (char c : data) {
        if (c != '\0') {
          is_zero = false;
          break;  // one non-zero byte is enough
        }
      }
      // very often we have '\0' bytes written to disk instead of a real log event
      // this is clearly impossible content for a real encrypted log event, so just ignore it
      if (is_zero) {
        return string();
      }
    }
    data = PSTRING() << format::as_hex_dump<4>(Slice(data));
  }
  return data;
}
} // namespace td