tdlight/tddb/td/db/BinlogKeyValue.h
2024-02-02 15:09:17 +03:00

302 lines
8.6 KiB
C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/DbKey.h"
#include "td/db/KeyValueSyncInterface.h"
#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/RwMutex.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StorerBase.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
namespace td {
template <class BinlogT>
class BinlogKeyValue final : public KeyValueSyncInterface {
public:
static constexpr int32 MAGIC = 0x2a280000;
struct Event final : public Storer {
Event() = default;
Event(Slice key, Slice value) : key(key), value(value) {
}
Slice key;
Slice value;
template <class StorerT>
void store(StorerT &&storer) const {
storer.store_string(key);
storer.store_string(value);
}
template <class ParserT>
void parse(ParserT &&parser) {
key = parser.template fetch_string<Slice>();
value = parser.template fetch_string<Slice>();
}
size_t size() const final {
TlStorerCalcLength storer;
store(storer);
return storer.get_length();
}
size_t store(uint8 *ptr) const final {
TlStorerUnsafe storer(ptr);
store(storer);
return static_cast<size_t>(storer.get_buf() - ptr);
}
};
int32 get_magic() const {
return magic_;
}
Status init(string name, DbKey db_key = DbKey::empty(), int scheduler_id = -1,
int32 override_magic = 0) TD_WARN_UNUSED_RESULT {
close();
if (override_magic) {
magic_ = override_magic;
}
binlog_ = std::make_shared<BinlogT>();
TRY_STATUS(binlog_->init(
name,
[&](const BinlogEvent &binlog_event) {
Event event;
event.parse(TlParser(binlog_event.get_data()));
if (event.key.empty()) {
LOG(ERROR) << "Have event with empty key";
return;
}
map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
},
std::move(db_key), DbKey::empty(), scheduler_id));
return Status::OK();
}
void external_init_begin(int32 override_magic = 0) {
close();
if (override_magic) {
magic_ = override_magic;
}
}
template <class OtherBinlogT>
void external_init_handle(BinlogKeyValue<OtherBinlogT> &&other) {
map_ = std::move(other.map_);
}
void external_init_handle(const BinlogEvent &binlog_event) {
Event event;
event.parse(TlParser(binlog_event.get_data()));
if (event.key.empty()) {
LOG(ERROR) << "Have external event with empty key";
return;
}
map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
}
void external_init_finish(std::shared_ptr<BinlogT> binlog) {
binlog_ = std::move(binlog);
}
void close() {
*this = BinlogKeyValue();
}
void close(Promise<> promise) final {
binlog_->close(std::move(promise));
}
SeqNo set(string key, string value) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
uint64 old_event_id = 0;
CHECK(!key.empty());
auto it_ok = map_.emplace(key, std::make_pair(value, 0));
if (!it_ok.second) {
if (it_ok.first->second.first == value) {
return 0;
}
VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to "
<< hex_encode(value);
old_event_id = it_ok.first->second.second;
it_ok.first->second.first = value;
} else {
VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value);
}
bool rewrite = false;
uint64 event_id;
auto seq_no = binlog_->next_event_id();
if (old_event_id != 0) {
rewrite = true;
event_id = old_event_id;
} else {
event_id = seq_no;
it_ok.first->second.second = event_id;
}
lock.reset();
add_event(seq_no,
BinlogEvent::create_raw(event_id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value}));
return seq_no;
}
SeqNo erase(const string &key) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return 0;
}
VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first);
uint64 event_id = it->second.second;
map_.erase(it);
auto seq_no = binlog_->next_event_id();
lock.reset();
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
return seq_no;
}
SeqNo erase_batch(vector<string> keys) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> log_event_ids;
for (auto &key : keys) {
auto it = map_.find(key);
if (it != map_.end()) {
log_event_ids.push_back(it->second.second);
map_.erase(it);
}
}
if (log_event_ids.empty()) {
return 0;
}
VLOG(binlog) << "Remove value of keys " << keys;
return binlog_->erase_batch(std::move(log_event_ids));
}
void add_event(uint64 seq_no, BufferSlice &&event) {
binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event));
}
bool isset(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
return map_.count(key) > 0;
}
string get(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return string();
}
VLOG(binlog) << "Get value of key " << key << ", which is " << hex_encode(it->second.first);
return it->second.first;
}
void force_sync(Promise<> &&promise, const char *source) final {
binlog_->force_sync(std::move(promise), source);
}
void lazy_sync(Promise<> &&promise) {
binlog_->lazy_sync(std::move(promise));
}
void for_each(std::function<void(Slice, Slice)> func) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
for (const auto &kv : map_) {
func(kv.first, kv.second.first);
}
}
std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
std::unordered_map<string, string, Hash<string>> res;
for (const auto &kv : map_) {
if (begins_with(kv.first, prefix)) {
res.emplace(kv.first.substr(prefix.size()), kv.second.first);
}
}
return res;
}
FlatHashMap<string, string> get_all() final {
auto lock = rw_mutex_.lock_write().move_as_ok();
FlatHashMap<string, string> res;
res.reserve(map_.size());
for (const auto &kv : map_) {
res.emplace(kv.first, kv.second.first);
}
return res;
}
void erase_by_prefix(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> event_ids;
table_remove_if(map_, [&](const auto &it) {
if (begins_with(it.first, prefix)) {
event_ids.push_back(it.second.second);
return true;
}
return false;
});
auto seq_no = binlog_->next_event_id(narrow_cast<int32>(event_ids.size()));
lock.reset();
for (auto event_id : event_ids) {
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
seq_no++;
}
}
template <class T>
friend class BinlogKeyValue;
static Status destroy(Slice name) {
return Binlog::destroy(name);
}
private:
FlatHashMap<string, std::pair<string, uint64>> map_;
std::shared_ptr<BinlogT> binlog_;
RwMutex rw_mutex_;
int32 magic_ = MAGIC;
};
template <>
inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event) {
binlog_->add_raw_event(std::move(event), BinlogDebugInfo{__FILE__, __LINE__});
}
template <>
inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise, const char *source) {
binlog_->sync(source);
promise.set_value(Unit());
}
template <>
inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) {
force_sync(std::move(promise), "lazy_sync");
}
} // namespace td