Add story database.
This commit is contained in:
parent
9d4087b101
commit
e523a5b2f6
@ -471,6 +471,7 @@ set(TDLIB_SOURCE
|
|||||||
td/telegram/StorageManager.cpp
|
td/telegram/StorageManager.cpp
|
||||||
td/telegram/StoryContent.cpp
|
td/telegram/StoryContent.cpp
|
||||||
td/telegram/StoryContentType.cpp
|
td/telegram/StoryContentType.cpp
|
||||||
|
td/telegram/StoryDb.cpp
|
||||||
td/telegram/StoryInteractionInfo.cpp
|
td/telegram/StoryInteractionInfo.cpp
|
||||||
td/telegram/StoryManager.cpp
|
td/telegram/StoryManager.cpp
|
||||||
td/telegram/SuggestedAction.cpp
|
td/telegram/SuggestedAction.cpp
|
||||||
@ -765,6 +766,7 @@ set(TDLIB_SOURCE
|
|||||||
td/telegram/StorageManager.h
|
td/telegram/StorageManager.h
|
||||||
td/telegram/StoryContent.h
|
td/telegram/StoryContent.h
|
||||||
td/telegram/StoryContentType.h
|
td/telegram/StoryContentType.h
|
||||||
|
td/telegram/StoryDb.h
|
||||||
td/telegram/StoryFullId.h
|
td/telegram/StoryFullId.h
|
||||||
td/telegram/StoryId.h
|
td/telegram/StoryId.h
|
||||||
td/telegram/StoryInteractionInfo.h
|
td/telegram/StoryInteractionInfo.h
|
||||||
|
379
td/telegram/StoryDb.cpp
Normal file
379
td/telegram/StoryDb.cpp
Normal file
@ -0,0 +1,379 @@
|
|||||||
|
//
|
||||||
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||||
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
#include "td/telegram/StoryDb.h"
|
||||||
|
|
||||||
|
#include "td/telegram/logevent/LogEvent.h"
|
||||||
|
#include "td/telegram/UserId.h"
|
||||||
|
#include "td/telegram/Version.h"
|
||||||
|
|
||||||
|
#include "td/db/SqliteConnectionSafe.h"
|
||||||
|
#include "td/db/SqliteDb.h"
|
||||||
|
#include "td/db/SqliteStatement.h"
|
||||||
|
|
||||||
|
#include "td/actor/actor.h"
|
||||||
|
#include "td/actor/SchedulerLocalStorage.h"
|
||||||
|
|
||||||
|
#include "td/utils/format.h"
|
||||||
|
#include "td/utils/logging.h"
|
||||||
|
#include "td/utils/ScopeGuard.h"
|
||||||
|
#include "td/utils/Slice.h"
|
||||||
|
#include "td/utils/SliceBuilder.h"
|
||||||
|
#include "td/utils/StackAllocator.h"
|
||||||
|
#include "td/utils/StringBuilder.h"
|
||||||
|
#include "td/utils/Time.h"
|
||||||
|
#include "td/utils/tl_helpers.h"
|
||||||
|
#include "td/utils/unicode.h"
|
||||||
|
#include "td/utils/utf8.h"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <array>
|
||||||
|
#include <iterator>
|
||||||
|
#include <limits>
|
||||||
|
#include <tuple>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
namespace td {
|
||||||
|
|
||||||
|
// NB: must happen inside a transaction
|
||||||
|
Status init_story_db(SqliteDb &db, int32 version) {
|
||||||
|
LOG(INFO) << "Init story database " << tag("version", version);
|
||||||
|
|
||||||
|
// Check if database exists
|
||||||
|
TRY_RESULT(has_table, db.has_table("stories"));
|
||||||
|
if (!has_table) {
|
||||||
|
version = 0;
|
||||||
|
} else if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
|
||||||
|
TRY_STATUS(drop_story_db(db, version));
|
||||||
|
version = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (version == 0) {
|
||||||
|
LOG(INFO) << "Create new story database";
|
||||||
|
TRY_STATUS(
|
||||||
|
db.exec("CREATE TABLE IF NOT EXISTS stories (dialog_id INT8, story_id INT4, expires_at INT4, notification_id "
|
||||||
|
"INT4, data BLOB, PRIMARY KEY (dialog_id, story_id))"));
|
||||||
|
|
||||||
|
TRY_STATUS(db.exec("CREATE INDEX IF NOT EXISTS story_by_ttl ON stories (expires_at) WHERE expires_at IS NOT NULL"));
|
||||||
|
|
||||||
|
TRY_STATUS(
|
||||||
|
db.exec("CREATE INDEX IF NOT EXISTS story_by_notification_id ON stories (dialog_id, notification_id) WHERE "
|
||||||
|
"notification_id IS NOT NULL"));
|
||||||
|
|
||||||
|
version = current_db_version();
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
// NB: must happen inside a transaction
|
||||||
|
Status drop_story_db(SqliteDb &db, int32 version) {
|
||||||
|
LOG(WARNING) << "Drop story database " << tag("version", version) << tag("current_db_version", current_db_version());
|
||||||
|
return db.exec("DROP TABLE IF EXISTS stories");
|
||||||
|
}
|
||||||
|
|
||||||
|
class StoryDbImpl final : public StoryDbSyncInterface {
|
||||||
|
public:
|
||||||
|
explicit StoryDbImpl(SqliteDb db) : db_(std::move(db)) {
|
||||||
|
init().ensure();
|
||||||
|
}
|
||||||
|
|
||||||
|
Status init() {
|
||||||
|
TRY_RESULT_ASSIGN(add_story_stmt_, db_.get_statement("INSERT OR REPLACE INTO stories VALUES(?1, ?2, ?3, ?4, ?5)"));
|
||||||
|
TRY_RESULT_ASSIGN(delete_story_stmt_,
|
||||||
|
db_.get_statement("DELETE FROM stories WHERE dialog_id = ?1 AND story_id = ?2"));
|
||||||
|
TRY_RESULT_ASSIGN(get_story_stmt_,
|
||||||
|
db_.get_statement("SELECT data FROM stories WHERE dialog_id = ?1 AND story_id = ?2"));
|
||||||
|
|
||||||
|
TRY_RESULT_ASSIGN(get_expiring_stories_stmt_,
|
||||||
|
db_.get_statement("SELECT data FROM stories WHERE expires_at <= ?1 LIMIT ?2"));
|
||||||
|
|
||||||
|
TRY_RESULT_ASSIGN(get_stories_from_notification_id_stmt_,
|
||||||
|
db_.get_statement("SELECT data FROM stories WHERE dialog_id = ?1 AND "
|
||||||
|
"notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data) final {
|
||||||
|
LOG(INFO) << "Add " << story_full_id << " to database";
|
||||||
|
auto dialog_id = story_full_id.get_dialog_id();
|
||||||
|
auto story_id = story_full_id.get_story_id();
|
||||||
|
LOG_CHECK(dialog_id.is_valid()) << dialog_id << ' ' << story_id << ' ' << story_full_id;
|
||||||
|
CHECK(story_id.is_valid());
|
||||||
|
SCOPE_EXIT {
|
||||||
|
add_story_stmt_.reset();
|
||||||
|
};
|
||||||
|
|
||||||
|
add_story_stmt_.bind_int64(1, dialog_id.get()).ensure();
|
||||||
|
add_story_stmt_.bind_int32(2, story_id.get()).ensure();
|
||||||
|
if (expires_at != 0) {
|
||||||
|
add_story_stmt_.bind_int32(3, expires_at).ensure();
|
||||||
|
} else {
|
||||||
|
add_story_stmt_.bind_null(3).ensure();
|
||||||
|
}
|
||||||
|
if (notification_id.is_valid()) {
|
||||||
|
add_story_stmt_.bind_int32(4, notification_id.get()).ensure();
|
||||||
|
} else {
|
||||||
|
add_story_stmt_.bind_null(4).ensure();
|
||||||
|
}
|
||||||
|
add_story_stmt_.bind_blob(5, data.as_slice()).ensure();
|
||||||
|
|
||||||
|
add_story_stmt_.step().ensure();
|
||||||
|
}
|
||||||
|
|
||||||
|
void delete_story(StoryFullId story_full_id) final {
|
||||||
|
LOG(INFO) << "Delete " << story_full_id << " from database";
|
||||||
|
auto dialog_id = story_full_id.get_dialog_id();
|
||||||
|
auto story_id = story_full_id.get_story_id();
|
||||||
|
CHECK(dialog_id.is_valid());
|
||||||
|
CHECK(story_id.is_valid());
|
||||||
|
SCOPE_EXIT {
|
||||||
|
delete_story_stmt_.reset();
|
||||||
|
};
|
||||||
|
delete_story_stmt_.bind_int64(1, dialog_id.get()).ensure();
|
||||||
|
delete_story_stmt_.bind_int32(2, story_id.get()).ensure();
|
||||||
|
delete_story_stmt_.step().ensure();
|
||||||
|
}
|
||||||
|
|
||||||
|
Result<BufferSlice> get_story(StoryFullId story_full_id) final {
|
||||||
|
auto dialog_id = story_full_id.get_dialog_id();
|
||||||
|
auto story_id = story_full_id.get_story_id();
|
||||||
|
CHECK(dialog_id.is_valid());
|
||||||
|
CHECK(story_id.is_valid());
|
||||||
|
SCOPE_EXIT {
|
||||||
|
get_story_stmt_.reset();
|
||||||
|
};
|
||||||
|
|
||||||
|
get_story_stmt_.bind_int64(1, dialog_id.get()).ensure();
|
||||||
|
get_story_stmt_.bind_int32(2, story_id.get()).ensure();
|
||||||
|
get_story_stmt_.step().ensure();
|
||||||
|
if (!get_story_stmt_.has_row()) {
|
||||||
|
return Status::Error("Not found");
|
||||||
|
}
|
||||||
|
return BufferSlice(get_story_stmt_.view_blob(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
vector<BufferSlice> get_expiring_stories(int32 expires_till, int32 limit) final {
|
||||||
|
SCOPE_EXIT {
|
||||||
|
get_expiring_stories_stmt_.reset();
|
||||||
|
};
|
||||||
|
|
||||||
|
vector<BufferSlice> stories;
|
||||||
|
get_expiring_stories_stmt_.bind_int32(1, expires_till).ensure();
|
||||||
|
get_expiring_stories_stmt_.bind_int32(2, limit).ensure();
|
||||||
|
get_expiring_stories_stmt_.step().ensure();
|
||||||
|
|
||||||
|
while (get_expiring_stories_stmt_.has_row()) {
|
||||||
|
stories.emplace_back(get_expiring_stories_stmt_.view_blob(0));
|
||||||
|
get_expiring_stories_stmt_.step().ensure();
|
||||||
|
}
|
||||||
|
|
||||||
|
return stories;
|
||||||
|
}
|
||||||
|
|
||||||
|
vector<BufferSlice> get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id,
|
||||||
|
int32 limit) final {
|
||||||
|
auto &stmt = get_stories_from_notification_id_stmt_;
|
||||||
|
SCOPE_EXIT {
|
||||||
|
stmt.reset();
|
||||||
|
};
|
||||||
|
stmt.bind_int64(1, dialog_id.get()).ensure();
|
||||||
|
stmt.bind_int32(2, from_notification_id.get()).ensure();
|
||||||
|
stmt.bind_int32(3, limit).ensure();
|
||||||
|
|
||||||
|
vector<BufferSlice> result;
|
||||||
|
stmt.step().ensure();
|
||||||
|
while (stmt.has_row()) {
|
||||||
|
result.emplace_back(stmt.view_blob(0));
|
||||||
|
stmt.step().ensure();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status begin_write_transaction() final {
|
||||||
|
return db_.begin_write_transaction();
|
||||||
|
}
|
||||||
|
Status commit_transaction() final {
|
||||||
|
return db_.commit_transaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SqliteDb db_;
|
||||||
|
|
||||||
|
SqliteStatement add_story_stmt_;
|
||||||
|
SqliteStatement delete_story_stmt_;
|
||||||
|
SqliteStatement get_story_stmt_;
|
||||||
|
SqliteStatement get_expiring_stories_stmt_;
|
||||||
|
SqliteStatement get_stories_from_notification_id_stmt_;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<StoryDbSyncSafeInterface> create_story_db_sync(
|
||||||
|
std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
|
||||||
|
class StoryDbSyncSafe final : public StoryDbSyncSafeInterface {
|
||||||
|
public:
|
||||||
|
explicit StoryDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
|
||||||
|
: lsls_db_([safe_connection = std::move(sqlite_connection)] {
|
||||||
|
return make_unique<StoryDbImpl>(safe_connection->get().clone());
|
||||||
|
}) {
|
||||||
|
}
|
||||||
|
StoryDbSyncInterface &get() final {
|
||||||
|
return *lsls_db_.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
LazySchedulerLocalStorage<unique_ptr<StoryDbSyncInterface>> lsls_db_;
|
||||||
|
};
|
||||||
|
return std::make_shared<StoryDbSyncSafe>(std::move(sqlite_connection));
|
||||||
|
}
|
||||||
|
|
||||||
|
class StoryDbAsync final : public StoryDbAsyncInterface {
|
||||||
|
public:
|
||||||
|
StoryDbAsync(std::shared_ptr<StoryDbSyncSafeInterface> sync_db, int32 scheduler_id) {
|
||||||
|
impl_ = create_actor_on_scheduler<Impl>("StoryDbActor", scheduler_id, std::move(sync_db));
|
||||||
|
}
|
||||||
|
|
||||||
|
void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data,
|
||||||
|
Promise<Unit> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::add_story, story_full_id, expires_at, notification_id, std::move(data),
|
||||||
|
std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void delete_story(StoryFullId story_full_id, Promise<Unit> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::delete_story, story_full_id, std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_story(StoryFullId story_full_id, Promise<BufferSlice> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::get_story, story_full_id, std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_expiring_stories(int32 expires_till, int32 limit, Promise<vector<BufferSlice>> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::get_expiring_stories, expires_till, limit, std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
|
||||||
|
Promise<vector<BufferSlice>> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::get_stories_from_notification_id, dialog_id, from_notification_id, limit,
|
||||||
|
std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void close(Promise<Unit> promise) final {
|
||||||
|
send_closure_later(impl_, &Impl::close, std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void force_flush() final {
|
||||||
|
send_closure_later(impl_, &Impl::force_flush);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
class Impl final : public Actor {
|
||||||
|
public:
|
||||||
|
explicit Impl(std::shared_ptr<StoryDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
|
||||||
|
}
|
||||||
|
void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data,
|
||||||
|
Promise<Unit> promise) {
|
||||||
|
add_write_query([this, story_full_id, expires_at, notification_id, data = std::move(data),
|
||||||
|
promise = std::move(promise)](Unit) mutable {
|
||||||
|
sync_db_->add_story(story_full_id, expires_at, notification_id, std::move(data));
|
||||||
|
on_write_result(std::move(promise));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void delete_story(StoryFullId story_full_id, Promise<Unit> promise) {
|
||||||
|
add_write_query([this, story_full_id, promise = std::move(promise)](Unit) mutable {
|
||||||
|
sync_db_->delete_story(story_full_id);
|
||||||
|
on_write_result(std::move(promise));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_write_result(Promise<Unit> &&promise) {
|
||||||
|
// We are inside a transaction and don't know how to handle errors
|
||||||
|
finished_writes_.push_back(std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_story(StoryFullId story_full_id, Promise<BufferSlice> promise) {
|
||||||
|
add_read_query();
|
||||||
|
promise.set_result(sync_db_->get_story(story_full_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_expiring_stories(int32 expires_till, int32 limit, Promise<vector<BufferSlice>> promise) {
|
||||||
|
add_read_query();
|
||||||
|
promise.set_value(sync_db_->get_expiring_stories(expires_till, limit));
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
|
||||||
|
Promise<vector<BufferSlice>> promise) {
|
||||||
|
add_read_query();
|
||||||
|
promise.set_value(sync_db_->get_stories_from_notification_id(dialog_id, from_notification_id, limit));
|
||||||
|
}
|
||||||
|
|
||||||
|
void close(Promise<Unit> promise) {
|
||||||
|
do_flush();
|
||||||
|
sync_db_safe_.reset();
|
||||||
|
sync_db_ = nullptr;
|
||||||
|
promise.set_value(Unit());
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void force_flush() {
|
||||||
|
do_flush();
|
||||||
|
LOG(INFO) << "StoryDb flushed";
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<StoryDbSyncSafeInterface> sync_db_safe_;
|
||||||
|
StoryDbSyncInterface *sync_db_ = nullptr;
|
||||||
|
|
||||||
|
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
|
||||||
|
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
|
||||||
|
|
||||||
|
//NB: order is important, destructor of pending_writes_ will change finished_writes_
|
||||||
|
vector<Promise<Unit>> finished_writes_;
|
||||||
|
vector<Promise<Unit>> pending_writes_; // TODO use Action
|
||||||
|
double wakeup_at_ = 0;
|
||||||
|
|
||||||
|
template <class F>
|
||||||
|
void add_write_query(F &&f) {
|
||||||
|
pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f)));
|
||||||
|
if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
|
||||||
|
do_flush();
|
||||||
|
wakeup_at_ = 0;
|
||||||
|
} else if (wakeup_at_ == 0) {
|
||||||
|
wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
|
||||||
|
}
|
||||||
|
if (wakeup_at_ != 0) {
|
||||||
|
set_timeout_at(wakeup_at_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void add_read_query() {
|
||||||
|
do_flush();
|
||||||
|
}
|
||||||
|
void do_flush() {
|
||||||
|
if (pending_writes_.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sync_db_->begin_write_transaction().ensure();
|
||||||
|
set_promises(pending_writes_);
|
||||||
|
sync_db_->commit_transaction().ensure();
|
||||||
|
set_promises(finished_writes_);
|
||||||
|
cancel_timeout();
|
||||||
|
}
|
||||||
|
void timeout_expired() final {
|
||||||
|
do_flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
void start_up() final {
|
||||||
|
sync_db_ = &sync_db_safe_->get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ActorOwn<Impl> impl_;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<StoryDbAsyncInterface> create_story_db_async(std::shared_ptr<StoryDbSyncSafeInterface> sync_db,
|
||||||
|
int32 scheduler_id) {
|
||||||
|
return std::make_shared<StoryDbAsync>(std::move(sync_db), scheduler_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace td
|
90
td/telegram/StoryDb.h
Normal file
90
td/telegram/StoryDb.h
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
//
|
||||||
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||||
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "td/telegram/DialogId.h"
|
||||||
|
#include "td/telegram/NotificationId.h"
|
||||||
|
#include "td/telegram/StoryFullId.h"
|
||||||
|
|
||||||
|
#include "td/utils/buffer.h"
|
||||||
|
#include "td/utils/common.h"
|
||||||
|
#include "td/utils/Promise.h"
|
||||||
|
#include "td/utils/Status.h"
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
namespace td {
|
||||||
|
|
||||||
|
class SqliteConnectionSafe;
|
||||||
|
class SqliteDb;
|
||||||
|
|
||||||
|
class StoryDbSyncInterface {
|
||||||
|
public:
|
||||||
|
StoryDbSyncInterface() = default;
|
||||||
|
StoryDbSyncInterface(const StoryDbSyncInterface &) = delete;
|
||||||
|
StoryDbSyncInterface &operator=(const StoryDbSyncInterface &) = delete;
|
||||||
|
virtual ~StoryDbSyncInterface() = default;
|
||||||
|
|
||||||
|
virtual void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id,
|
||||||
|
BufferSlice data) = 0;
|
||||||
|
|
||||||
|
virtual void delete_story(StoryFullId story_full_id) = 0;
|
||||||
|
|
||||||
|
virtual Result<BufferSlice> get_story(StoryFullId story_full_id) = 0;
|
||||||
|
|
||||||
|
virtual vector<BufferSlice> get_expiring_stories(int32 expires_till, int32 limit) = 0;
|
||||||
|
|
||||||
|
virtual vector<BufferSlice> get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id,
|
||||||
|
int32 limit) = 0;
|
||||||
|
|
||||||
|
virtual Status begin_write_transaction() = 0;
|
||||||
|
virtual Status commit_transaction() = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class StoryDbSyncSafeInterface {
|
||||||
|
public:
|
||||||
|
StoryDbSyncSafeInterface() = default;
|
||||||
|
StoryDbSyncSafeInterface(const StoryDbSyncSafeInterface &) = delete;
|
||||||
|
StoryDbSyncSafeInterface &operator=(const StoryDbSyncSafeInterface &) = delete;
|
||||||
|
virtual ~StoryDbSyncSafeInterface() = default;
|
||||||
|
|
||||||
|
virtual StoryDbSyncInterface &get() = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class StoryDbAsyncInterface {
|
||||||
|
public:
|
||||||
|
StoryDbAsyncInterface() = default;
|
||||||
|
StoryDbAsyncInterface(const StoryDbAsyncInterface &) = delete;
|
||||||
|
StoryDbAsyncInterface &operator=(const StoryDbAsyncInterface &) = delete;
|
||||||
|
virtual ~StoryDbAsyncInterface() = default;
|
||||||
|
|
||||||
|
virtual void add_story(StoryFullId story_full_id, int32 expires_at, NotificationId notification_id, BufferSlice data,
|
||||||
|
Promise<Unit> promise) = 0;
|
||||||
|
|
||||||
|
virtual void delete_story(StoryFullId story_full_id, Promise<Unit> promise) = 0;
|
||||||
|
|
||||||
|
virtual void get_story(StoryFullId story_full_id, Promise<BufferSlice> promise) = 0;
|
||||||
|
|
||||||
|
virtual void get_expiring_stories(int32 expires_till, int32 limit, Promise<vector<BufferSlice>> promise) = 0;
|
||||||
|
|
||||||
|
virtual void get_stories_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
|
||||||
|
Promise<vector<BufferSlice>> promise) = 0;
|
||||||
|
|
||||||
|
virtual void close(Promise<Unit> promise) = 0;
|
||||||
|
virtual void force_flush() = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
Status init_story_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
|
||||||
|
Status drop_story_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
|
||||||
|
|
||||||
|
std::shared_ptr<StoryDbSyncSafeInterface> create_story_db_sync(std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
|
||||||
|
|
||||||
|
std::shared_ptr<StoryDbAsyncInterface> create_story_db_async(std::shared_ptr<StoryDbSyncSafeInterface> sync_db,
|
||||||
|
int32 scheduler_id = -1);
|
||||||
|
|
||||||
|
} // namespace td
|
@ -13,6 +13,7 @@
|
|||||||
#include "td/telegram/logevent/LogEvent.h"
|
#include "td/telegram/logevent/LogEvent.h"
|
||||||
#include "td/telegram/MessageDb.h"
|
#include "td/telegram/MessageDb.h"
|
||||||
#include "td/telegram/MessageThreadDb.h"
|
#include "td/telegram/MessageThreadDb.h"
|
||||||
|
#include "td/telegram/StoryDb.h"
|
||||||
#include "td/telegram/Td.h"
|
#include "td/telegram/Td.h"
|
||||||
#include "td/telegram/Version.h"
|
#include "td/telegram/Version.h"
|
||||||
|
|
||||||
@ -233,6 +234,14 @@ DialogDbAsyncInterface *TdDb::get_dialog_db_async() {
|
|||||||
return dialog_db_async_.get();
|
return dialog_db_async_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StoryDbSyncInterface *TdDb::get_story_db_sync() {
|
||||||
|
return &story_db_sync_safe_->get();
|
||||||
|
}
|
||||||
|
|
||||||
|
StoryDbAsyncInterface *TdDb::get_story_db_async() {
|
||||||
|
return story_db_async_.get();
|
||||||
|
}
|
||||||
|
|
||||||
void TdDb::flush_all() {
|
void TdDb::flush_all() {
|
||||||
LOG(INFO) << "Flush all databases";
|
LOG(INFO) << "Flush all databases";
|
||||||
if (message_db_async_) {
|
if (message_db_async_) {
|
||||||
@ -244,6 +253,9 @@ void TdDb::flush_all() {
|
|||||||
if (dialog_db_async_) {
|
if (dialog_db_async_) {
|
||||||
dialog_db_async_->force_flush();
|
dialog_db_async_->force_flush();
|
||||||
}
|
}
|
||||||
|
if (story_db_async_) {
|
||||||
|
story_db_async_->force_flush();
|
||||||
|
}
|
||||||
binlog_->force_flush();
|
binlog_->force_flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -299,6 +311,11 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
|
|||||||
dialog_db_async_->close(mpas.get_promise());
|
dialog_db_async_->close(mpas.get_promise());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
story_db_sync_safe_.reset();
|
||||||
|
if (story_db_async_) {
|
||||||
|
story_db_async_->close(mpas.get_promise());
|
||||||
|
}
|
||||||
|
|
||||||
// binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy
|
// binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy
|
||||||
CHECK(binlog_pmc_.unique());
|
CHECK(binlog_pmc_.unique());
|
||||||
binlog_pmc_.reset();
|
binlog_pmc_.reset();
|
||||||
@ -325,10 +342,11 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D
|
|||||||
const string sql_database_path = get_sqlite_path(parameters);
|
const string sql_database_path = get_sqlite_path(parameters);
|
||||||
|
|
||||||
bool use_sqlite = parameters.use_file_database_;
|
bool use_sqlite = parameters.use_file_database_;
|
||||||
bool use_file_database_ = parameters.use_file_database_;
|
bool use_file_database = parameters.use_file_database_;
|
||||||
bool use_dialog_db = parameters.use_message_database_;
|
bool use_dialog_db = parameters.use_message_database_;
|
||||||
bool use_message_thread_db = parameters.use_message_database_ && false;
|
bool use_message_thread_db = parameters.use_message_database_ && false;
|
||||||
bool use_message_database_ = parameters.use_message_database_;
|
bool use_message_database = parameters.use_message_database_;
|
||||||
|
bool use_story_database = parameters.use_message_database_;
|
||||||
|
|
||||||
was_dialog_db_created_ = false;
|
was_dialog_db_created_ = false;
|
||||||
|
|
||||||
@ -370,14 +388,21 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D
|
|||||||
}
|
}
|
||||||
|
|
||||||
// init MessageDb
|
// init MessageDb
|
||||||
if (use_message_database_) {
|
if (use_message_database) {
|
||||||
TRY_STATUS(init_message_db(db, user_version));
|
TRY_STATUS(init_message_db(db, user_version));
|
||||||
} else {
|
} else {
|
||||||
TRY_STATUS(drop_message_db(db, user_version));
|
TRY_STATUS(drop_message_db(db, user_version));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// init StoryDb
|
||||||
|
if (use_story_database) {
|
||||||
|
TRY_STATUS(init_story_db(db, user_version));
|
||||||
|
} else {
|
||||||
|
TRY_STATUS(drop_story_db(db, user_version));
|
||||||
|
}
|
||||||
|
|
||||||
// init FileDb
|
// init FileDb
|
||||||
if (use_file_database_) {
|
if (use_file_database) {
|
||||||
TRY_STATUS(init_file_db(db, user_version));
|
TRY_STATUS(init_file_db(db, user_version));
|
||||||
} else {
|
} else {
|
||||||
TRY_STATUS(drop_file_db(db, user_version));
|
TRY_STATUS(drop_file_db(db, user_version));
|
||||||
@ -432,11 +457,16 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D
|
|||||||
message_thread_db_async_ = create_message_thread_db_async(message_thread_db_sync_safe_);
|
message_thread_db_async_ = create_message_thread_db_async(message_thread_db_sync_safe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (use_message_database_) {
|
if (use_message_database) {
|
||||||
message_db_sync_safe_ = create_message_db_sync(sql_connection_);
|
message_db_sync_safe_ = create_message_db_sync(sql_connection_);
|
||||||
message_db_async_ = create_message_db_async(message_db_sync_safe_);
|
message_db_async_ = create_message_db_async(message_db_sync_safe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (use_story_database) {
|
||||||
|
story_db_sync_safe_ = create_story_db_sync(sql_connection_);
|
||||||
|
story_db_async_ = create_story_db_async(story_db_sync_safe_);
|
||||||
|
}
|
||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -644,6 +674,7 @@ Result<string> TdDb::get_stats() {
|
|||||||
<< mask << "'",
|
<< mask << "'",
|
||||||
PSLICE() << table << ":" << mask);
|
PSLICE() << table << ":" << mask);
|
||||||
};
|
};
|
||||||
|
TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM stories WHERE 1", "stories"));
|
||||||
TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM messages WHERE 1", "messages"));
|
TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM messages WHERE 1", "messages"));
|
||||||
TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM dialogs WHERE 1", "dialogs"));
|
TRY_STATUS(run_query("SELECT 0, SUM(length(data)), COUNT(*) FROM dialogs WHERE 1", "dialogs"));
|
||||||
TRY_STATUS(run_kv_query("%", "common"));
|
TRY_STATUS(run_kv_query("%", "common"));
|
||||||
|
@ -38,6 +38,9 @@ class SqliteConnectionSafe;
|
|||||||
class SqliteKeyValueSafe;
|
class SqliteKeyValueSafe;
|
||||||
class SqliteKeyValueAsyncInterface;
|
class SqliteKeyValueAsyncInterface;
|
||||||
class SqliteKeyValue;
|
class SqliteKeyValue;
|
||||||
|
class StoryDbSyncInterface;
|
||||||
|
class StoryDbSyncSafeInterface;
|
||||||
|
class StoryDbAsyncInterface;
|
||||||
|
|
||||||
class TdDb {
|
class TdDb {
|
||||||
public:
|
public:
|
||||||
@ -141,6 +144,9 @@ class TdDb {
|
|||||||
DialogDbSyncInterface *get_dialog_db_sync();
|
DialogDbSyncInterface *get_dialog_db_sync();
|
||||||
DialogDbAsyncInterface *get_dialog_db_async();
|
DialogDbAsyncInterface *get_dialog_db_async();
|
||||||
|
|
||||||
|
StoryDbSyncInterface *get_story_db_sync();
|
||||||
|
StoryDbAsyncInterface *get_story_db_async();
|
||||||
|
|
||||||
void change_key(DbKey key, Promise<> promise);
|
void change_key(DbKey key, Promise<> promise);
|
||||||
|
|
||||||
void with_db_path(const std::function<void(CSlice)> &callback);
|
void with_db_path(const std::function<void(CSlice)> &callback);
|
||||||
@ -168,6 +174,9 @@ class TdDb {
|
|||||||
std::shared_ptr<DialogDbSyncSafeInterface> dialog_db_sync_safe_;
|
std::shared_ptr<DialogDbSyncSafeInterface> dialog_db_sync_safe_;
|
||||||
std::shared_ptr<DialogDbAsyncInterface> dialog_db_async_;
|
std::shared_ptr<DialogDbAsyncInterface> dialog_db_async_;
|
||||||
|
|
||||||
|
std::shared_ptr<StoryDbSyncSafeInterface> story_db_sync_safe_;
|
||||||
|
std::shared_ptr<StoryDbAsyncInterface> story_db_async_;
|
||||||
|
|
||||||
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> binlog_pmc_;
|
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> binlog_pmc_;
|
||||||
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
|
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
|
||||||
std::shared_ptr<ConcurrentBinlog> binlog_;
|
std::shared_ptr<ConcurrentBinlog> binlog_;
|
||||||
|
Loading…
Reference in New Issue
Block a user