From e523a5b2f6cb1b659b8531f98fd9b6c822023029 Mon Sep 17 00:00:00 2001 From: levlam Date: Mon, 10 Jul 2023 13:18:32 +0300 Subject: [PATCH] Add story database. --- CMakeLists.txt | 2 + td/telegram/StoryDb.cpp | 379 ++++++++++++++++++++++++++++++++++++++++ td/telegram/StoryDb.h | 90 ++++++++++ td/telegram/TdDb.cpp | 41 ++++- td/telegram/TdDb.h | 9 + 5 files changed, 516 insertions(+), 5 deletions(-) create mode 100644 td/telegram/StoryDb.cpp create mode 100644 td/telegram/StoryDb.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5372af823..e480ddc83 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -471,6 +471,7 @@ set(TDLIB_SOURCE td/telegram/StorageManager.cpp td/telegram/StoryContent.cpp td/telegram/StoryContentType.cpp + td/telegram/StoryDb.cpp td/telegram/StoryInteractionInfo.cpp td/telegram/StoryManager.cpp td/telegram/SuggestedAction.cpp @@ -765,6 +766,7 @@ set(TDLIB_SOURCE td/telegram/StorageManager.h td/telegram/StoryContent.h td/telegram/StoryContentType.h + td/telegram/StoryDb.h td/telegram/StoryFullId.h td/telegram/StoryId.h td/telegram/StoryInteractionInfo.h diff --git a/td/telegram/StoryDb.cpp b/td/telegram/StoryDb.cpp new file mode 100644 index 000000000..59151ae25 --- /dev/null +++ b/td/telegram/StoryDb.cpp @@ -0,0 +1,379 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/StoryDb.h" + +#include "td/telegram/logevent/LogEvent.h" +#include "td/telegram/UserId.h" +#include "td/telegram/Version.h" + +#include "td/db/SqliteConnectionSafe.h" +#include "td/db/SqliteDb.h" +#include "td/db/SqliteStatement.h" + +#include "td/actor/actor.h" +#include "td/actor/SchedulerLocalStorage.h" + +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/StackAllocator.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/Time.h" +#include "td/utils/tl_helpers.h" +#include "td/utils/unicode.h" +#include "td/utils/utf8.h" + +#include +#include +#include +#include +#include +#include + +namespace td { + +// NB: must happen inside a transaction +Status init_story_db(SqliteDb &db, int32 version) { + LOG(INFO) << "Init story database " << tag("version", version); + + // Check if database exists + TRY_RESULT(has_table, db.has_table("stories")); + if (!has_table) { + version = 0; + } else if (version < static_cast(DbVersion::DialogDbCreated) || version > current_db_version()) { + TRY_STATUS(drop_story_db(db, version)); + version = 0; + } + + if (version == 0) { + LOG(INFO) << "Create new story database"; + TRY_STATUS( + db.exec("CREATE TABLE IF NOT EXISTS stories (dialog_id INT8, story_id INT4, expires_at INT4, notification_id " + "INT4, data BLOB, PRIMARY KEY (dialog_id, story_id))")); + + TRY_STATUS(db.exec("CREATE INDEX IF NOT EXISTS story_by_ttl ON stories (expires_at) WHERE expires_at IS NOT NULL")); + + TRY_STATUS( + db.exec("CREATE INDEX IF NOT EXISTS story_by_notification_id ON stories (dialog_id, notification_id) WHERE " + "notification_id IS NOT NULL")); + + version = current_db_version(); + } + return Status::OK(); +} + +// NB: must happen inside a transaction +Status drop_story_db(SqliteDb &db, int32 version) { + LOG(WARNING) << "Drop story database " << tag("version", version) << tag("current_db_version", current_db_version()); + return db.exec("DROP TABLE IF EXISTS stories"); +} + +class StoryDbImpl final : public StoryDbSyncInterface { + public: + explicit StoryDbImpl(SqliteDb db) : db_(std::move(db)) { + init().ensure(); + } + + Status init() { + TRY_RESULT_ASSIGN(add_story_stmt_, db_.get_statement("INSERT OR REPLACE INTO stories VALUES(?1, ?2, ?3, ?4, ?5)")); + TRY_RESULT_ASSIGN(delete_story_stmt_, + db_.get_statement("DELETE FROM stories WHERE dialog_id = ?1 AND story_id = ?2")); + TRY_RESULT_ASSIGN(get_story_stmt_, + db_.get_statement("SELECT data FROM stories WHERE dialog_id = ?1 AND story_id = ?2")); + + TRY_RESULT_ASSIGN(get_expiring_stories_stmt_, + db_.get_statement("SELECT data FROM stories WHERE expires_at <= ?1 LIMIT ?2")); + + TRY_RESULT_ASSIGN(get_stories_from_notification_id_stmt_, + db_.get_statement("SELECT data FROM stories WHERE dialog_id = ?1 AND " + "notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3")); + + return Status::OK(); + } + + void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data) final { + LOG(INFO) << "Add " << story_full_id << " to database"; + auto dialog_id = story_full_id.get_dialog_id(); + auto story_id = story_full_id.get_story_id(); + LOG_CHECK(dialog_id.is_valid()) << dialog_id << ' ' << story_id << ' ' << story_full_id; + CHECK(story_id.is_valid()); + SCOPE_EXIT { + add_story_stmt_.reset(); + }; + + add_story_stmt_.bind_int64(1, dialog_id.get()).ensure(); + add_story_stmt_.bind_int32(2, story_id.get()).ensure(); + if (expires_at != 0) { + add_story_stmt_.bind_int32(3, expires_at).ensure(); + } else { + add_story_stmt_.bind_null(3).ensure(); + } + if (notification_id.is_valid()) { + add_story_stmt_.bind_int32(4, notification_id.get()).ensure(); + } else { + add_story_stmt_.bind_null(4).ensure(); + } + add_story_stmt_.bind_blob(5, data.as_slice()).ensure(); + + add_story_stmt_.step().ensure(); + } + + void delete_story(StoryFullId story_full_id) final { + LOG(INFO) << "Delete " << story_full_id << " from database"; + auto dialog_id = story_full_id.get_dialog_id(); + auto story_id = story_full_id.get_story_id(); + CHECK(dialog_id.is_valid()); + CHECK(story_id.is_valid()); + SCOPE_EXIT { + delete_story_stmt_.reset(); + }; + delete_story_stmt_.bind_int64(1, dialog_id.get()).ensure(); + delete_story_stmt_.bind_int32(2, story_id.get()).ensure(); + delete_story_stmt_.step().ensure(); + } + + Result get_story(StoryFullId story_full_id) final { + auto dialog_id = story_full_id.get_dialog_id(); + auto story_id = story_full_id.get_story_id(); + CHECK(dialog_id.is_valid()); + CHECK(story_id.is_valid()); + SCOPE_EXIT { + get_story_stmt_.reset(); + }; + + get_story_stmt_.bind_int64(1, dialog_id.get()).ensure(); + get_story_stmt_.bind_int32(2, story_id.get()).ensure(); + get_story_stmt_.step().ensure(); + if (!get_story_stmt_.has_row()) { + return Status::Error("Not found"); + } + return BufferSlice(get_story_stmt_.view_blob(0)); + } + + vector get_expiring_stories(int32 expires_till, int32 limit) final { + SCOPE_EXIT { + get_expiring_stories_stmt_.reset(); + }; + + vector stories; + get_expiring_stories_stmt_.bind_int32(1, expires_till).ensure(); + get_expiring_stories_stmt_.bind_int32(2, limit).ensure(); + get_expiring_stories_stmt_.step().ensure(); + + while (get_expiring_stories_stmt_.has_row()) { + stories.emplace_back(get_expiring_stories_stmt_.view_blob(0)); + get_expiring_stories_stmt_.step().ensure(); + } + + return stories; + } + + vector get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, + int32 limit) final { + auto &stmt = get_stories_from_notification_id_stmt_; + SCOPE_EXIT { + stmt.reset(); + }; + stmt.bind_int64(1, dialog_id.get()).ensure(); + stmt.bind_int32(2, from_notification_id.get()).ensure(); + stmt.bind_int32(3, limit).ensure(); + + vector result; + stmt.step().ensure(); + while (stmt.has_row()) { + result.emplace_back(stmt.view_blob(0)); + stmt.step().ensure(); + } + return result; + } + + Status begin_write_transaction() final { + return db_.begin_write_transaction(); + } + Status commit_transaction() final { + return db_.commit_transaction(); + } + + private: + SqliteDb db_; + + SqliteStatement add_story_stmt_; + SqliteStatement delete_story_stmt_; + SqliteStatement get_story_stmt_; + SqliteStatement get_expiring_stories_stmt_; + SqliteStatement get_stories_from_notification_id_stmt_; +}; + +std::shared_ptr create_story_db_sync( + std::shared_ptr sqlite_connection) { + class StoryDbSyncSafe final : public StoryDbSyncSafeInterface { + public: + explicit StoryDbSyncSafe(std::shared_ptr sqlite_connection) + : lsls_db_([safe_connection = std::move(sqlite_connection)] { + return make_unique(safe_connection->get().clone()); + }) { + } + StoryDbSyncInterface &get() final { + return *lsls_db_.get(); + } + + private: + LazySchedulerLocalStorage> lsls_db_; + }; + return std::make_shared(std::move(sqlite_connection)); +} + +class StoryDbAsync final : public StoryDbAsyncInterface { + public: + StoryDbAsync(std::shared_ptr sync_db, int32 scheduler_id) { + impl_ = create_actor_on_scheduler("StoryDbActor", scheduler_id, std::move(sync_db)); + } + + void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data, + Promise promise) final { + send_closure_later(impl_, &Impl::add_story, story_full_id, expires_at, notification_id, std::move(data), + std::move(promise)); + } + + void delete_story(StoryFullId story_full_id, Promise promise) final { + send_closure_later(impl_, &Impl::delete_story, story_full_id, std::move(promise)); + } + + void get_story(StoryFullId story_full_id, Promise promise) final { + send_closure_later(impl_, &Impl::get_story, story_full_id, std::move(promise)); + } + + void get_expiring_stories(int32 expires_till, int32 limit, Promise> promise) final { + send_closure_later(impl_, &Impl::get_expiring_stories, expires_till, limit, std::move(promise)); + } + + void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, + Promise> promise) final { + send_closure_later(impl_, &Impl::get_stories_from_notification_id, dialog_id, from_notification_id, limit, + std::move(promise)); + } + + void close(Promise promise) final { + send_closure_later(impl_, &Impl::close, std::move(promise)); + } + + void force_flush() final { + send_closure_later(impl_, &Impl::force_flush); + } + + private: + class Impl final : public Actor { + public: + explicit Impl(std::shared_ptr sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) { + } + void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data, + Promise promise) { + add_write_query([this, story_full_id, expires_at, notification_id, data = std::move(data), + promise = std::move(promise)](Unit) mutable { + sync_db_->add_story(story_full_id, expires_at, notification_id, std::move(data)); + on_write_result(std::move(promise)); + }); + } + + void delete_story(StoryFullId story_full_id, Promise promise) { + add_write_query([this, story_full_id, promise = std::move(promise)](Unit) mutable { + sync_db_->delete_story(story_full_id); + on_write_result(std::move(promise)); + }); + } + + void on_write_result(Promise &&promise) { + // We are inside a transaction and don't know how to handle errors + finished_writes_.push_back(std::move(promise)); + } + + void get_story(StoryFullId story_full_id, Promise promise) { + add_read_query(); + promise.set_result(sync_db_->get_story(story_full_id)); + } + + void get_expiring_stories(int32 expires_till, int32 limit, Promise> promise) { + add_read_query(); + promise.set_value(sync_db_->get_expiring_stories(expires_till, limit)); + } + + void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, + Promise> promise) { + add_read_query(); + promise.set_value(sync_db_->get_stories_from_notification_id(dialog_id, from_notification_id, limit)); + } + + void close(Promise promise) { + do_flush(); + sync_db_safe_.reset(); + sync_db_ = nullptr; + promise.set_value(Unit()); + stop(); + } + + void force_flush() { + do_flush(); + LOG(INFO) << "StoryDb flushed"; + } + + private: + std::shared_ptr sync_db_safe_; + StoryDbSyncInterface *sync_db_ = nullptr; + + static constexpr size_t MAX_PENDING_QUERIES_COUNT{50}; + static constexpr double MAX_PENDING_QUERIES_DELAY{0.01}; + + //NB: order is important, destructor of pending_writes_ will change finished_writes_ + vector> finished_writes_; + vector> pending_writes_; // TODO use Action + double wakeup_at_ = 0; + + template + void add_write_query(F &&f) { + pending_writes_.push_back(PromiseCreator::lambda(std::forward(f))); + if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) { + do_flush(); + wakeup_at_ = 0; + } else if (wakeup_at_ == 0) { + wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY; + } + if (wakeup_at_ != 0) { + set_timeout_at(wakeup_at_); + } + } + void add_read_query() { + do_flush(); + } + void do_flush() { + if (pending_writes_.empty()) { + return; + } + sync_db_->begin_write_transaction().ensure(); + set_promises(pending_writes_); + sync_db_->commit_transaction().ensure(); + set_promises(finished_writes_); + cancel_timeout(); + } + void timeout_expired() final { + do_flush(); + } + + void start_up() final { + sync_db_ = &sync_db_safe_->get(); + } + }; + ActorOwn impl_; +}; + +std::shared_ptr create_story_db_async(std::shared_ptr sync_db, + int32 scheduler_id) { + return std::make_shared(std::move(sync_db), scheduler_id); +} + +} // namespace td diff --git a/td/telegram/StoryDb.h b/td/telegram/StoryDb.h new file mode 100644 index 000000000..e93124cd0 --- /dev/null +++ b/td/telegram/StoryDb.h @@ -0,0 +1,90 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/telegram/DialogId.h" +#include "td/telegram/NotificationId.h" +#include "td/telegram/StoryFullId.h" + +#include "td/utils/buffer.h" +#include "td/utils/common.h" +#include "td/utils/Promise.h" +#include "td/utils/Status.h" + +#include +#include + +namespace td { + +class SqliteConnectionSafe; +class SqliteDb; + +class StoryDbSyncInterface { + public: + StoryDbSyncInterface() = default; + StoryDbSyncInterface(const StoryDbSyncInterface &) = delete; + StoryDbSyncInterface &operator=(const StoryDbSyncInterface &) = delete; + virtual ~StoryDbSyncInterface() = default; + + virtual void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, + BufferSlice data) = 0; + + virtual void delete_story(StoryFullId story_full_id) = 0; + + virtual Result get_story(StoryFullId story_full_id) = 0; + + virtual vector get_expiring_stories(int32 expires_till, int32 limit) = 0; + + virtual vector get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, + int32 limit) = 0; + + virtual Status begin_write_transaction() = 0; + virtual Status commit_transaction() = 0; +}; + +class StoryDbSyncSafeInterface { + public: + StoryDbSyncSafeInterface() = default; + StoryDbSyncSafeInterface(const StoryDbSyncSafeInterface &) = delete; + StoryDbSyncSafeInterface &operator=(const StoryDbSyncSafeInterface &) = delete; + virtual ~StoryDbSyncSafeInterface() = default; + + virtual StoryDbSyncInterface &get() = 0; +}; + +class StoryDbAsyncInterface { + public: + StoryDbAsyncInterface() = default; + StoryDbAsyncInterface(const StoryDbAsyncInterface &) = delete; + StoryDbAsyncInterface &operator=(const StoryDbAsyncInterface &) = delete; + virtual ~StoryDbAsyncInterface() = default; + + virtual void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data, + Promise promise) = 0; + + virtual void delete_story(StoryFullId story_full_id, Promise promise) = 0; + + virtual void get_story(StoryFullId story_full_id, Promise promise) = 0; + + virtual void get_expiring_stories(int32 expires_till, int32 limit, Promise> promise) = 0; + + virtual void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, + Promise> promise) = 0; + + virtual void close(Promise promise) = 0; + virtual void force_flush() = 0; +}; + +Status init_story_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT; +Status drop_story_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT; + +std::shared_ptr create_story_db_sync(std::shared_ptr sqlite_connection); + +std::shared_ptr create_story_db_async(std::shared_ptr sync_db, + int32 scheduler_id = -1); + +} // namespace td diff --git a/td/telegram/TdDb.cpp b/td/telegram/TdDb.cpp index b5ad5dab2..fd60f7735 100644 --- a/td/telegram/TdDb.cpp +++ b/td/telegram/TdDb.cpp @@ -13,6 +13,7 @@ #include "td/telegram/logevent/LogEvent.h" #include "td/telegram/MessageDb.h" #include "td/telegram/MessageThreadDb.h" +#include "td/telegram/StoryDb.h" #include "td/telegram/Td.h" #include "td/telegram/Version.h" @@ -233,6 +234,14 @@ DialogDbAsyncInterface *TdDb::get_dialog_db_async() { return dialog_db_async_.get(); } +StoryDbSyncInterface *TdDb::get_story_db_sync() { + return &story_db_sync_safe_->get(); +} + +StoryDbAsyncInterface *TdDb::get_story_db_async() { + return story_db_async_.get(); +} + void TdDb::flush_all() { LOG(INFO) << "Flush all databases"; if (message_db_async_) { @@ -244,6 +253,9 @@ void TdDb::flush_all() { if (dialog_db_async_) { dialog_db_async_->force_flush(); } + if (story_db_async_) { + story_db_async_->force_flush(); + } binlog_->force_flush(); } @@ -299,6 +311,11 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) { dialog_db_async_->close(mpas.get_promise()); } + story_db_sync_safe_.reset(); + if (story_db_async_) { + story_db_async_->close(mpas.get_promise()); + } + // binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy CHECK(binlog_pmc_.unique()); binlog_pmc_.reset(); @@ -325,10 +342,11 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D const string sql_database_path = get_sqlite_path(parameters); bool use_sqlite = parameters.use_file_database_; - bool use_file_database_ = parameters.use_file_database_; + bool use_file_database = parameters.use_file_database_; bool use_dialog_db = parameters.use_message_database_; bool use_message_thread_db = parameters.use_message_database_ && false; - bool use_message_database_ = parameters.use_message_database_; + bool use_message_database = parameters.use_message_database_; + bool use_story_database = parameters.use_message_database_; was_dialog_db_created_ = false; @@ -370,14 +388,21 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D } // init MessageDb - if (use_message_database_) { + if (use_message_database) { TRY_STATUS(init_message_db(db, user_version)); } else { TRY_STATUS(drop_message_db(db, user_version)); } + // init StoryDb + if (use_story_database) { + TRY_STATUS(init_story_db(db, user_version)); + } else { + TRY_STATUS(drop_story_db(db, user_version)); + } + // init FileDb - if (use_file_database_) { + if (use_file_database) { TRY_STATUS(init_file_db(db, user_version)); } else { TRY_STATUS(drop_file_db(db, user_version)); @@ -432,11 +457,16 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D message_thread_db_async_ = create_message_thread_db_async(message_thread_db_sync_safe_); } - if (use_message_database_) { + if (use_message_database) { message_db_sync_safe_ = create_message_db_sync(sql_connection_); message_db_async_ = create_message_db_async(message_db_sync_safe_); } + if (use_story_database) { + story_db_sync_safe_ = create_story_db_sync(sql_connection_); + story_db_async_ = create_story_db_async(story_db_sync_safe_); + } + return Status::OK(); } @@ -644,6 +674,7 @@ Result TdDb::get_stats() { << mask << "'", PSLICE() << table << ":" << mask); }; + TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM stories WHERE 1", "stories")); TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM messages WHERE 1", "messages")); TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM dialogs WHERE 1", "dialogs")); TRY_STATUS(run_kv_query("%", "common")); diff --git a/td/telegram/TdDb.h b/td/telegram/TdDb.h index 75d74e8b8..b38adbaeb 100644 --- a/td/telegram/TdDb.h +++ b/td/telegram/TdDb.h @@ -38,6 +38,9 @@ class SqliteConnectionSafe; class SqliteKeyValueSafe; class SqliteKeyValueAsyncInterface; class SqliteKeyValue; +class StoryDbSyncInterface; +class StoryDbSyncSafeInterface; +class StoryDbAsyncInterface; class TdDb { public: @@ -141,6 +144,9 @@ class TdDb { DialogDbSyncInterface *get_dialog_db_sync(); DialogDbAsyncInterface *get_dialog_db_async(); + StoryDbSyncInterface *get_story_db_sync(); + StoryDbAsyncInterface *get_story_db_async(); + void change_key(DbKey key, Promise<> promise); void with_db_path(const std::function &callback); @@ -168,6 +174,9 @@ class TdDb { std::shared_ptr dialog_db_sync_safe_; std::shared_ptr dialog_db_async_; + std::shared_ptr story_db_sync_safe_; + std::shared_ptr story_db_async_; + std::shared_ptr> binlog_pmc_; std::shared_ptr> config_pmc_; std::shared_ptr binlog_;