// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once #include "td/db/binlog/Binlog.h" #include "td/db/binlog/BinlogEvent.h" #include "td/db/DbKey.h" #include "td/db/KeyValueSyncInterface.h" #include "td/utils/algorithm.h" #include "td/utils/buffer.h" #include "td/utils/common.h" #include "td/utils/FlatHashMap.h" #include "td/utils/HashTableUtils.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/RwMutex.h" #include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include "td/utils/StorerBase.h" #include "td/utils/tl_parsers.h" #include "td/utils/tl_storers.h" #include #include #include namespace td { template class BinlogKeyValue final : public KeyValueSyncInterface { public: static constexpr int32 MAGIC = 0x2a280000; struct Event final : public Storer { Event() = default; Event(Slice key, Slice value) : key(key), value(value) { } Slice key; Slice value; template void store(StorerT &&storer) const { storer.store_string(key); storer.store_string(value); } template void parse(ParserT &&parser) { key = parser.template fetch_string(); value = parser.template fetch_string(); } size_t size() const final { TlStorerCalcLength storer; store(storer); return storer.get_length(); } size_t store(uint8 *ptr) const final { TlStorerUnsafe storer(ptr); store(storer); return static_cast(storer.get_buf() - ptr); } }; int32 get_magic() const { return magic_; } Status init(string name, DbKey db_key = DbKey::empty(), int scheduler_id = -1, int32 override_magic = 0) TD_WARN_UNUSED_RESULT { close(); if (override_magic) { magic_ = override_magic; } binlog_ = std::make_shared(); TRY_STATUS(binlog_->init( name, [&](const BinlogEvent &binlog_event) { Event event; event.parse(TlParser(binlog_event.get_data())); if (event.key.empty()) { LOG(ERROR) << "Have event with empty key"; return; } map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); }, std::move(db_key), DbKey::empty(), scheduler_id)); return Status::OK(); } void external_init_begin(int32 override_magic = 0) { close(); if (override_magic) { magic_ = override_magic; } } template void external_init_handle(BinlogKeyValue &&other) { map_ = std::move(other.map_); } void external_init_handle(const BinlogEvent &binlog_event) { Event event; event.parse(TlParser(binlog_event.get_data())); if (event.key.empty()) { LOG(ERROR) << "Have external event with empty key"; return; } map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); } void external_init_finish(std::shared_ptr binlog) { binlog_ = std::move(binlog); } void close() { *this = BinlogKeyValue(); } void close(Promise<> promise) final { binlog_->close(std::move(promise)); } SeqNo set(string key, string value) final { auto lock = rw_mutex_.lock_write().move_as_ok(); uint64 old_event_id = 0; CHECK(!key.empty()); auto it_ok = map_.emplace(key, std::make_pair(value, 0)); if (!it_ok.second) { if (it_ok.first->second.first == value) { return 0; } VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to " << hex_encode(value); old_event_id = it_ok.first->second.second; it_ok.first->second.first = value; } else { VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value); } bool rewrite = false; uint64 event_id; auto seq_no = binlog_->next_event_id(); if (old_event_id != 0) { rewrite = true; event_id = old_event_id; } else { event_id = seq_no; it_ok.first->second.second = event_id; } lock.reset(); add_event(seq_no, BinlogEvent::create_raw(event_id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value})); return seq_no; } SeqNo erase(const string &key) final { auto lock = rw_mutex_.lock_write().move_as_ok(); auto it = map_.find(key); if (it == map_.end()) { return 0; } VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first); uint64 event_id = it->second.second; map_.erase(it); auto seq_no = binlog_->next_event_id(); lock.reset(); add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer())); return seq_no; } SeqNo erase_batch(vector keys) final { auto lock = rw_mutex_.lock_write().move_as_ok(); vector log_event_ids; for (auto &key : keys) { auto it = map_.find(key); if (it != map_.end()) { log_event_ids.push_back(it->second.second); map_.erase(it); } } if (log_event_ids.empty()) { return 0; } VLOG(binlog) << "Remove value of keys " << keys; return binlog_->erase_batch(std::move(log_event_ids)); } void add_event(uint64 seq_no, BufferSlice &&event) { binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event)); } bool isset(const string &key) final { auto lock = rw_mutex_.lock_read().move_as_ok(); return map_.count(key) > 0; } string get(const string &key) final { auto lock = rw_mutex_.lock_read().move_as_ok(); auto it = map_.find(key); if (it == map_.end()) { return string(); } VLOG(binlog) << "Get value of key " << key << ", which is " << hex_encode(it->second.first); return it->second.first; } void force_sync(Promise<> &&promise) final { binlog_->force_sync(std::move(promise)); } void lazy_sync(Promise<> &&promise) { binlog_->lazy_sync(std::move(promise)); } void for_each(std::function func) final { auto lock = rw_mutex_.lock_write().move_as_ok(); for (const auto &kv : map_) { func(kv.first, kv.second.first); } } std::unordered_map> prefix_get(Slice prefix) final { auto lock = rw_mutex_.lock_write().move_as_ok(); std::unordered_map> res; for (const auto &kv : map_) { if (begins_with(kv.first, prefix)) { res.emplace(kv.first.substr(prefix.size()), kv.second.first); } } return res; } FlatHashMap get_all() final { auto lock = rw_mutex_.lock_write().move_as_ok(); FlatHashMap res; res.reserve(map_.size()); for (const auto &kv : map_) { res.emplace(kv.first, kv.second.first); } return res; } void erase_by_prefix(Slice prefix) final { auto lock = rw_mutex_.lock_write().move_as_ok(); vector event_ids; table_remove_if(map_, [&](const auto &it) { if (begins_with(it.first, prefix)) { event_ids.push_back(it.second.second); return true; } return false; }); auto seq_no = binlog_->next_event_id(narrow_cast(event_ids.size())); lock.reset(); for (auto event_id : event_ids) { add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer())); seq_no++; } } template friend class BinlogKeyValue; static Status destroy(Slice name) { return Binlog::destroy(name); } private: FlatHashMap> map_; std::shared_ptr binlog_; RwMutex rw_mutex_; int32 magic_ = MAGIC; }; template <> inline void BinlogKeyValue::add_event(uint64 seq_no, BufferSlice &&event) { binlog_->add_raw_event(std::move(event), BinlogDebugInfo{__FILE__, __LINE__}); } template <> inline void BinlogKeyValue::force_sync(Promise<> &&promise) { binlog_->sync(); promise.set_value(Unit()); } template <> inline void BinlogKeyValue::lazy_sync(Promise<> &&promise) { force_sync(std::move(promise)); } } // namespace td