Rename MessagesDb to MessageDb.

This commit is contained in:
levlam 2022-11-09 20:35:22 +03:00
parent 71e3011afa
commit 8e32de133a
9 changed files with 314 additions and 316 deletions

View File

@ -384,7 +384,7 @@ set(TDLIB_SOURCE
td/telegram/MessageReaction.cpp
td/telegram/MessageReplyHeader.cpp
td/telegram/MessageReplyInfo.cpp
td/telegram/MessagesDb.cpp
td/telegram/MessageDb.cpp
td/telegram/MessageSearchFilter.cpp
td/telegram/MessageSender.cpp
td/telegram/MessagesManager.cpp
@ -627,7 +627,7 @@ set(TDLIB_SOURCE
td/telegram/MessageReplyHeader.h
td/telegram/MessageReplyInfo.h
td/telegram/MessageThreadInfo.h
td/telegram/MessagesDb.h
td/telegram/MessageDb.h
td/telegram/MessageSearchFilter.h
td/telegram/MessageSender.h
td/telegram/MessagesManager.h

View File

@ -5,8 +5,8 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/DialogId.h"
#include "td/telegram/MessageDb.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/MessagesDb.h"
#include "td/telegram/NotificationId.h"
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/UserId.h"
@ -36,10 +36,10 @@ static td::Status init_db(td::SqliteDb &db) {
return td::Status::OK();
}
class MessagesDbBench final : public td::Benchmark {
class MessageDbBench final : public td::Benchmark {
public:
td::string get_description() const final {
return "MessagesDb";
return "MessageDb";
}
void start_up() final {
LOG(ERROR) << "START UP";
@ -60,9 +60,9 @@ class MessagesDbBench final : public td::Benchmark {
auto data = td::BufferSlice(td::Random::fast(100, 299));
// use async on same thread.
messages_db_async_->add_message({dialog_id, message_id}, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, 0, 0, "", td::NotificationId(), td::MessageId(),
std::move(data), td::Promise<>());
message_db_async_->add_message({dialog_id, message_id}, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, 0, 0, "", td::NotificationId(), td::MessageId(), std::move(data),
td::Promise<>());
}
}
}
@ -71,8 +71,8 @@ class MessagesDbBench final : public td::Benchmark {
{
auto guard = scheduler_->get_main_guard();
sql_connection_.reset();
messages_db_sync_safe_.reset();
messages_db_async_.reset();
message_db_sync_safe_.reset();
message_db_async_.reset();
}
scheduler_->finish();
@ -83,8 +83,8 @@ class MessagesDbBench final : public td::Benchmark {
private:
td::unique_ptr<td::ConcurrentScheduler> scheduler_;
std::shared_ptr<td::SqliteConnectionSafe> sql_connection_;
std::shared_ptr<td::MessagesDbSyncSafeInterface> messages_db_sync_safe_;
std::shared_ptr<td::MessagesDbAsyncInterface> messages_db_async_;
std::shared_ptr<td::MessageDbSyncSafeInterface> message_db_sync_safe_;
std::shared_ptr<td::MessageDbAsyncInterface> message_db_async_;
td::Status do_start_up() {
scheduler_ = td::make_unique<td::ConcurrentScheduler>(1, 0);
@ -98,16 +98,16 @@ class MessagesDbBench final : public td::Benchmark {
db.exec("BEGIN TRANSACTION").ensure();
// version == 0 ==> db will be destroyed
TRY_STATUS(init_messages_db(db, 0));
TRY_STATUS(init_message_db(db, 0));
db.exec("COMMIT TRANSACTION").ensure();
messages_db_sync_safe_ = td::create_messages_db_sync(sql_connection_);
messages_db_async_ = td::create_messages_db_async(messages_db_sync_safe_, 0);
message_db_sync_safe_ = td::create_message_db_sync(sql_connection_);
message_db_async_ = td::create_message_db_async(message_db_sync_safe_, 0);
return td::Status::OK();
}
};
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(WARNING));
td::bench(MessagesDbBench());
td::bench(MessageDbBench());
}

View File

@ -4,7 +4,7 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/MessagesDb.h"
#include "td/telegram/MessageDb.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/UserId.h"
@ -38,11 +38,11 @@
namespace td {
static constexpr int32 MESSAGES_DB_INDEX_COUNT = 30;
static constexpr int32 MESSAGES_DB_INDEX_COUNT_OLD = 9;
static constexpr int32 MESSAGE_DB_INDEX_COUNT = 30;
static constexpr int32 MESSAGE_DB_INDEX_COUNT_OLD = 9;
// NB: must happen inside a transaction
Status init_messages_db(SqliteDb &db, int32 version) {
Status init_message_db(SqliteDb &db, int32 version) {
LOG(INFO) << "Init message database " << tag("version", version);
// Check if database exists
@ -50,7 +50,7 @@ Status init_messages_db(SqliteDb &db, int32 version) {
if (!has_table) {
version = 0;
} else if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
TRY_STATUS(drop_messages_db(db, version));
TRY_STATUS(drop_message_db(db, version));
version = 0;
}
@ -130,7 +130,7 @@ Status init_messages_db(SqliteDb &db, int32 version) {
db.exec("CREATE INDEX IF NOT EXISTS message_by_ttl ON messages "
"(ttl_expires_at) WHERE ttl_expires_at IS NOT NULL"));
TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT));
TRY_STATUS(add_media_indices(0, MESSAGE_DB_INDEX_COUNT));
TRY_STATUS(add_fts());
@ -142,14 +142,14 @@ Status init_messages_db(SqliteDb &db, int32 version) {
version = current_db_version();
}
if (version < static_cast<int32>(DbVersion::MessagesDbMediaIndex)) {
if (version < static_cast<int32>(DbVersion::MessageDbMediaIndex)) {
TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN index_mask INT4"));
TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT_OLD));
TRY_STATUS(add_media_indices(0, MESSAGE_DB_INDEX_COUNT_OLD));
}
if (version < static_cast<int32>(DbVersion::MessagesDb30MediaIndex)) {
TRY_STATUS(add_media_indices(MESSAGES_DB_INDEX_COUNT_OLD, MESSAGES_DB_INDEX_COUNT));
if (version < static_cast<int32>(DbVersion::MessageDb30MediaIndex)) {
TRY_STATUS(add_media_indices(MESSAGE_DB_INDEX_COUNT_OLD, MESSAGE_DB_INDEX_COUNT));
}
if (version < static_cast<int32>(DbVersion::MessagesDbFts)) {
if (version < static_cast<int32>(DbVersion::MessageDbFts)) {
TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN search_id INT8"));
TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN text STRING"));
TRY_STATUS(add_fts());
@ -171,15 +171,15 @@ Status init_messages_db(SqliteDb &db, int32 version) {
}
// NB: must happen inside a transaction
Status drop_messages_db(SqliteDb &db, int32 version) {
Status drop_message_db(SqliteDb &db, int32 version) {
LOG(WARNING) << "Drop message database " << tag("version", version)
<< tag("current_db_version", current_db_version());
return db.exec("DROP TABLE IF EXISTS messages");
}
class MessagesDbImpl final : public MessagesDbSyncInterface {
class MessageDbImpl final : public MessageDbSyncInterface {
public:
explicit MessagesDbImpl(SqliteDb db) : db_(std::move(db)) {
explicit MessageDbImpl(SqliteDb db) : db_(std::move(db)) {
init().ensure();
}
@ -229,7 +229,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
"IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH ?1 AND rowid < ?2 "
"ORDER BY rowid DESC LIMIT ?3) ORDER BY search_id DESC"));
for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
for (int32 i = 0; i < MESSAGE_DB_INDEX_COUNT; i++) {
TRY_RESULT_ASSIGN(
get_message_ids_stmts_[i],
db_.get_statement(
@ -341,7 +341,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
// add dialog_id to text
text += PSTRING() << " \a" << dialog_id.get();
if (index_mask != 0) {
for (int i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
for (int i = 0; i < MESSAGE_DB_INDEX_COUNT; i++) {
if ((index_mask & (1 << i))) {
text += PSTRING() << " \a\a" << i;
}
@ -444,7 +444,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
delete_dialog_messages_by_sender_stmt_.step().ensure();
}
Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) final {
Result<MessageDbDialogMessage> get_message(FullMessageId full_message_id) final {
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
CHECK(dialog_id.is_valid());
@ -477,10 +477,10 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
LOG_CHECK(received_message_id == message_id)
<< received_message_id << ' ' << message_id << ' ' << get_message_info(received_message_id, data, true).first;
}
return MessagesDbDialogMessage{received_message_id, BufferSlice(data)};
return MessageDbDialogMessage{received_message_id, BufferSlice(data)};
}
Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) final {
Result<MessageDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) final {
if (!unique_message_id.is_valid()) {
return Status::Error("Invalid unique_message_id");
}
@ -494,10 +494,10 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
}
DialogId dialog_id(get_message_by_unique_message_id_stmt_.view_int64(0));
MessageId message_id(get_message_by_unique_message_id_stmt_.view_int64(1));
return MessagesDbMessage{dialog_id, message_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(2))};
return MessageDbMessage{dialog_id, message_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(2))};
}
Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) final {
Result<MessageDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) final {
SCOPE_EXIT {
get_message_by_random_id_stmt_.reset();
};
@ -508,11 +508,11 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::Error("Not found");
}
MessageId message_id(get_message_by_random_id_stmt_.view_int64(0));
return MessagesDbDialogMessage{message_id, BufferSlice(get_message_by_random_id_stmt_.view_blob(1))};
return MessageDbDialogMessage{message_id, BufferSlice(get_message_by_random_id_stmt_.view_blob(1))};
}
Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) final {
Result<MessageDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) final {
int64 left_message_id = first_message_id.get();
int64 right_message_id = last_message_id.get();
LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
@ -573,14 +573,14 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::Error("Not found");
}
std::pair<vector<MessagesDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) final {
std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) final {
SCOPE_EXIT {
get_expiring_messages_stmt_.reset();
get_expiring_messages_helper_stmt_.reset();
};
vector<MessagesDbMessage> messages;
vector<MessageDbMessage> messages;
// load messages
if (expires_from <= expires_till) {
get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure();
@ -591,7 +591,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
DialogId dialog_id(get_expiring_messages_stmt_.view_int64(0));
MessageId message_id(get_expiring_messages_stmt_.view_int64(1));
BufferSlice data(get_expiring_messages_stmt_.view_blob(2));
messages.push_back(MessagesDbMessage{dialog_id, message_id, std::move(data)});
messages.push_back(MessageDbMessage{dialog_id, message_id, std::move(data)});
get_expiring_messages_stmt_.step().ensure();
}
}
@ -609,7 +609,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return std::make_pair(std::move(messages), next_expires_till);
}
MessagesDbCalendar get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) final {
MessageDbCalendar get_dialog_message_calendar(MessageDbDialogCalendarQuery query) final {
auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(query.filter)].desc_stmt_;
SCOPE_EXIT {
stmt.reset();
@ -619,7 +619,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.bind_int64(2, query.from_message_id.get()).ensure();
stmt.bind_int32(3, limit).ensure();
vector<MessagesDbDialogMessage> messages;
vector<MessageDbDialogMessage> messages;
vector<int32> total_counts;
stmt.step().ensure();
int32 current_day = std::numeric_limits<int32>::max();
@ -633,16 +633,16 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
total_counts.back()++;
} else {
current_day = day;
messages.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
messages.push_back(MessageDbDialogMessage{message_id, BufferSlice(data_slice)});
total_counts.push_back(1);
}
stmt.step().ensure();
}
return MessagesDbCalendar{std::move(messages), std::move(total_counts)};
return MessageDbCalendar{std::move(messages), std::move(total_counts)};
}
Result<MessagesDbMessagePositions> get_dialog_sparse_message_positions(
MessagesDbGetDialogSparseMessagePositionsQuery query) final {
Result<MessageDbMessagePositions> get_dialog_sparse_message_positions(
MessageDbGetDialogSparseMessagePositionsQuery query) final {
auto &stmt = get_message_ids_stmts_[message_search_filter_index(query.filter)];
SCOPE_EXIT {
stmt.reset();
@ -657,7 +657,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.step().ensure();
}
MessagesDbMessagePositions positions;
MessageDbMessagePositions positions;
int32 limit = min(query.limit, static_cast<int32>(message_ids.size()));
if (limit > 0) {
double delta = static_cast<double>(message_ids.size()) / limit;
@ -668,26 +668,26 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
auto message_id = message_ids[position];
TRY_RESULT(message, get_message({query.dialog_id, message_id}));
auto date = get_message_info(message).second;
positions.positions.push_back(MessagesDbMessagePosition{position, date, message_id});
positions.positions.push_back(MessageDbMessagePosition{position, date, message_id});
}
}
return positions;
}
vector<MessagesDbDialogMessage> get_messages(MessagesDbMessagesQuery query) final {
vector<MessageDbDialogMessage> get_messages(MessageDbMessagesQuery query) final {
if (query.filter != MessageSearchFilter::Empty) {
return get_messages_from_index(query.dialog_id, query.from_message_id, query.filter, query.offset, query.limit);
}
return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
}
vector<MessagesDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
vector<MessageDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit);
}
vector<MessagesDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) final {
vector<MessageDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) final {
auto &stmt = get_messages_from_notification_id_stmt_;
SCOPE_EXIT {
stmt.reset();
@ -696,12 +696,12 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.bind_int32(2, from_notification_id.get()).ensure();
stmt.bind_int32(3, limit).ensure();
vector<MessagesDbDialogMessage> result;
vector<MessageDbDialogMessage> result;
stmt.step().ensure();
while (stmt.has_row()) {
auto data_slice = stmt.view_blob(0);
MessageId message_id(stmt.view_int64(1));
result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
result.push_back(MessageDbDialogMessage{message_id, BufferSlice(data_slice)});
LOG(INFO) << "Load " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure();
}
@ -755,7 +755,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return sb.as_cslice().str();
}
MessagesDbFtsResult get_messages_fts(MessagesDbFtsQuery query) final {
MessageDbFtsResult get_messages_fts(MessageDbFtsQuery query) final {
SCOPE_EXIT {
get_messages_fts_stmt_.reset();
};
@ -782,7 +782,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
}
stmt.bind_int64(2, query.from_search_id).ensure();
stmt.bind_int32(3, query.limit).ensure();
MessagesDbFtsResult result;
MessageDbFtsResult result;
auto status = stmt.step();
if (status.is_error()) {
LOG(ERROR) << status;
@ -794,19 +794,19 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
auto data_slice = stmt.view_blob(2);
auto search_id = stmt.view_int64(3);
result.next_search_id = search_id;
result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
result.messages.push_back(MessageDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure();
}
return result;
}
vector<MessagesDbDialogMessage> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
MessageSearchFilter filter, int32 offset, int32 limit) {
vector<MessageDbDialogMessage> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
MessageSearchFilter filter, int32 offset, int32 limit) {
auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(filter)];
return get_messages_impl(stmt, dialog_id, from_message_id, offset, limit);
}
MessagesDbCallsResult get_calls(MessagesDbCallsQuery query) final {
MessageDbCallsResult get_calls(MessageDbCallsQuery query) final {
int32 pos;
if (query.filter == MessageSearchFilter::Call) {
pos = 0;
@ -824,13 +824,13 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.bind_int32(1, query.from_unique_message_id).ensure();
stmt.bind_int32(2, query.limit).ensure();
MessagesDbCallsResult result;
MessageDbCallsResult result;
stmt.step().ensure();
while (stmt.has_row()) {
DialogId dialog_id(stmt.view_int64(0));
MessageId message_id(stmt.view_int64(1));
auto data_slice = stmt.view_blob(2);
result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
result.messages.push_back(MessageDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure();
}
return result;
@ -866,8 +866,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
SqliteStatement get_scheduled_messages_stmt_;
SqliteStatement get_messages_from_notification_id_stmt_;
std::array<SqliteStatement, MESSAGES_DB_INDEX_COUNT> get_message_ids_stmts_;
std::array<GetMessagesStmt, MESSAGES_DB_INDEX_COUNT> get_messages_from_index_stmts_;
std::array<SqliteStatement, MESSAGE_DB_INDEX_COUNT> get_message_ids_stmts_;
std::array<GetMessagesStmt, MESSAGE_DB_INDEX_COUNT> get_messages_from_index_stmts_;
std::array<SqliteStatement, 2> get_calls_stmts_;
SqliteStatement get_messages_fts_stmt_;
@ -878,8 +878,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
SqliteStatement delete_scheduled_message_stmt_;
SqliteStatement delete_scheduled_server_message_stmt_;
static vector<MessagesDbDialogMessage> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset, int32 limit) {
static vector<MessageDbDialogMessage> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset, int32 limit) {
LOG_CHECK(dialog_id.is_valid()) << dialog_id;
CHECK(from_message_id.is_valid());
@ -898,8 +898,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
auto right_message_id = message_id - 1;
auto right_cnt = -offset;
vector<MessagesDbDialogMessage> left;
vector<MessagesDbDialogMessage> right;
vector<MessageDbDialogMessage> left;
vector<MessageDbDialogMessage> right;
if (left_cnt != 0) {
if (right_cnt == 1 && false) {
@ -930,8 +930,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return right;
}
static vector<MessagesDbDialogMessage> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
int64 from_message_id, int32 limit) {
static vector<MessageDbDialogMessage> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
int64 from_message_id, int32 limit) {
SCOPE_EXIT {
stmt.reset();
};
@ -941,19 +941,19 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id)
<< " from database";
vector<MessagesDbDialogMessage> result;
vector<MessageDbDialogMessage> result;
stmt.step().ensure();
while (stmt.has_row()) {
auto data_slice = stmt.view_blob(0);
MessageId message_id(stmt.view_int64(1));
result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
result.push_back(MessageDbDialogMessage{message_id, BufferSlice(data_slice)});
LOG(INFO) << "Loaded " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure();
}
return result;
}
static std::pair<MessageId, int32> get_message_info(const MessagesDbDialogMessage &message, bool from_data = false) {
static std::pair<MessageId, int32> get_message_info(const MessageDbDialogMessage &message, bool from_data = false) {
return get_message_info(message.message_id, message.data.as_slice(), from_data);
}
@ -984,29 +984,29 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
}
};
std::shared_ptr<MessagesDbSyncSafeInterface> create_messages_db_sync(
std::shared_ptr<MessageDbSyncSafeInterface> create_message_db_sync(
std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
class MessagesDbSyncSafe final : public MessagesDbSyncSafeInterface {
class MessageDbSyncSafe final : public MessageDbSyncSafeInterface {
public:
explicit MessagesDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
explicit MessageDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
: lsls_db_([safe_connection = std::move(sqlite_connection)] {
return make_unique<MessagesDbImpl>(safe_connection->get().clone());
return make_unique<MessageDbImpl>(safe_connection->get().clone());
}) {
}
MessagesDbSyncInterface &get() final {
MessageDbSyncInterface &get() final {
return *lsls_db_.get();
}
private:
LazySchedulerLocalStorage<unique_ptr<MessagesDbSyncInterface>> lsls_db_;
LazySchedulerLocalStorage<unique_ptr<MessageDbSyncInterface>> lsls_db_;
};
return std::make_shared<MessagesDbSyncSafe>(std::move(sqlite_connection));
return std::make_shared<MessageDbSyncSafe>(std::move(sqlite_connection));
}
class MessagesDbAsync final : public MessagesDbAsyncInterface {
class MessageDbAsync final : public MessageDbAsyncInterface {
public:
MessagesDbAsync(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db, int32 scheduler_id) {
impl_ = create_actor_on_scheduler<Impl>("MessagesDbActor", scheduler_id, std::move(sync_db));
MessageDbAsync(std::shared_ptr<MessageDbSyncSafeInterface> sync_db, int32 scheduler_id) {
impl_ = create_actor_on_scheduler<Impl>("MessageDbActor", scheduler_id, std::move(sync_db));
}
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
@ -1031,49 +1031,49 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
send_closure_later(impl_, &Impl::delete_dialog_messages_by_sender, dialog_id, sender_dialog_id, std::move(promise));
}
void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) final {
void get_message(FullMessageId full_message_id, Promise<MessageDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_message, full_message_id, std::move(promise));
}
void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) final {
void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessageDbMessage> promise) final {
send_closure_later(impl_, &Impl::get_message_by_unique_message_id, unique_message_id, std::move(promise));
}
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) final {
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessageDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_message_by_random_id, dialog_id, random_id, std::move(promise));
}
void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date,
Promise<MessagesDbDialogMessage> promise) final {
Promise<MessageDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_dialog_message_by_date, dialog_id, first_message_id, last_message_id, date,
std::move(promise));
}
void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) final {
void get_dialog_message_calendar(MessageDbDialogCalendarQuery query, Promise<MessageDbCalendar> promise) final {
send_closure_later(impl_, &Impl::get_dialog_message_calendar, std::move(query), std::move(promise));
}
void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
Promise<MessagesDbMessagePositions> promise) final {
void get_dialog_sparse_message_positions(MessageDbGetDialogSparseMessagePositionsQuery query,
Promise<MessageDbMessagePositions> promise) final {
send_closure_later(impl_, &Impl::get_dialog_sparse_message_positions, std::move(query), std::move(promise));
}
void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) final {
void get_messages(MessageDbMessagesQuery query, Promise<vector<MessageDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise));
}
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) final {
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessageDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, limit, std::move(promise));
}
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) final {
Promise<vector<MessageDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit,
std::move(promise));
}
void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) final {
void get_calls(MessageDbCallsQuery query, Promise<MessageDbCallsResult> promise) final {
send_closure_later(impl_, &Impl::get_calls, std::move(query), std::move(promise));
}
void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) final {
void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) final {
send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
}
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) final {
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) final {
send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise));
}
@ -1088,7 +1088,7 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
private:
class Impl final : public Actor {
public:
explicit Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
explicit Impl(std::shared_ptr<MessageDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
}
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
@ -1134,58 +1134,58 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
promise.set_value(Unit());
}
void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {
void get_message(FullMessageId full_message_id, Promise<MessageDbDialogMessage> promise) {
add_read_query();
promise.set_result(sync_db_->get_message(full_message_id));
}
void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) {
void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessageDbMessage> promise) {
add_read_query();
promise.set_result(sync_db_->get_message_by_unique_message_id(unique_message_id));
}
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) {
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessageDbDialogMessage> promise) {
add_read_query();
promise.set_result(sync_db_->get_message_by_random_id(dialog_id, random_id));
}
void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
int32 date, Promise<MessagesDbDialogMessage> promise) {
int32 date, Promise<MessageDbDialogMessage> promise) {
add_read_query();
promise.set_result(sync_db_->get_dialog_message_by_date(dialog_id, first_message_id, last_message_id, date));
}
void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) {
void get_dialog_message_calendar(MessageDbDialogCalendarQuery query, Promise<MessageDbCalendar> promise) {
add_read_query();
promise.set_value(sync_db_->get_dialog_message_calendar(std::move(query)));
}
void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
Promise<MessagesDbMessagePositions> promise) {
void get_dialog_sparse_message_positions(MessageDbGetDialogSparseMessagePositionsQuery query,
Promise<MessageDbMessagePositions> promise) {
add_read_query();
promise.set_result(sync_db_->get_dialog_sparse_message_positions(std::move(query)));
}
void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) {
void get_messages(MessageDbMessagesQuery query, Promise<vector<MessageDbDialogMessage>> promise) {
add_read_query();
promise.set_value(sync_db_->get_messages(std::move(query)));
}
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) {
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessageDbDialogMessage>> promise) {
add_read_query();
promise.set_value(sync_db_->get_scheduled_messages(dialog_id, limit));
}
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) {
Promise<vector<MessageDbDialogMessage>> promise) {
add_read_query();
promise.set_value(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
}
void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) {
void get_calls(MessageDbCallsQuery query, Promise<MessageDbCallsResult> promise) {
add_read_query();
promise.set_value(sync_db_->get_calls(std::move(query)));
}
void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) {
void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) {
add_read_query();
promise.set_value(sync_db_->get_messages_fts(std::move(query)));
}
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) {
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) {
add_read_query();
promise.set_value(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
}
@ -1199,13 +1199,13 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
}
void force_flush() {
LOG(INFO) << "MessagesDb flushed";
LOG(INFO) << "MessageDb flushed";
do_flush();
}
private:
std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe_;
MessagesDbSyncInterface *sync_db_ = nullptr;
std::shared_ptr<MessageDbSyncSafeInterface> sync_db_safe_;
MessageDbSyncInterface *sync_db_ = nullptr;
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
@ -1258,9 +1258,9 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
ActorOwn<Impl> impl_;
};
std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
int32 scheduler_id) {
return std::make_shared<MessagesDbAsync>(std::move(sync_db), scheduler_id);
std::shared_ptr<MessageDbAsyncInterface> create_message_db_async(std::shared_ptr<MessageDbSyncSafeInterface> sync_db,
int32 scheduler_id) {
return std::make_shared<MessageDbAsync>(std::move(sync_db), scheduler_id);
}
} // namespace td

View File

@ -26,7 +26,7 @@ namespace td {
class SqliteConnectionSafe;
class SqliteDb;
struct MessagesDbMessagesQuery {
struct MessageDbMessagesQuery {
DialogId dialog_id;
MessageSearchFilter filter{MessageSearchFilter::Empty};
MessageId from_message_id;
@ -34,75 +34,75 @@ struct MessagesDbMessagesQuery {
int32 limit{100};
};
struct MessagesDbDialogMessage {
struct MessageDbDialogMessage {
MessageId message_id;
BufferSlice data;
};
struct MessagesDbMessage {
struct MessageDbMessage {
DialogId dialog_id;
MessageId message_id;
BufferSlice data;
};
struct MessagesDbDialogCalendarQuery {
struct MessageDbDialogCalendarQuery {
DialogId dialog_id;
MessageSearchFilter filter{MessageSearchFilter::Empty};
MessageId from_message_id;
int32 tz_offset{0};
};
struct MessagesDbCalendar {
vector<MessagesDbDialogMessage> messages;
struct MessageDbCalendar {
vector<MessageDbDialogMessage> messages;
vector<int32> total_counts;
};
struct MessagesDbGetDialogSparseMessagePositionsQuery {
struct MessageDbGetDialogSparseMessagePositionsQuery {
DialogId dialog_id;
MessageSearchFilter filter{MessageSearchFilter::Empty};
MessageId from_message_id;
int32 limit{0};
};
struct MessagesDbMessagePosition {
struct MessageDbMessagePosition {
int32 position;
int32 date;
MessageId message_id;
};
struct MessagesDbMessagePositions {
struct MessageDbMessagePositions {
int32 total_count{0};
vector<MessagesDbMessagePosition> positions;
vector<MessageDbMessagePosition> positions;
};
struct MessagesDbFtsQuery {
struct MessageDbFtsQuery {
string query;
DialogId dialog_id;
MessageSearchFilter filter{MessageSearchFilter::Empty};
int64 from_search_id{0};
int32 limit{100};
};
struct MessagesDbFtsResult {
vector<MessagesDbMessage> messages;
struct MessageDbFtsResult {
vector<MessageDbMessage> messages;
int64 next_search_id{1};
};
struct MessagesDbCallsQuery {
struct MessageDbCallsQuery {
MessageSearchFilter filter{MessageSearchFilter::Empty};
int32 from_unique_message_id{0};
int32 limit{100};
};
struct MessagesDbCallsResult {
vector<MessagesDbMessage> messages;
struct MessageDbCallsResult {
vector<MessageDbMessage> messages;
};
class MessagesDbSyncInterface {
class MessageDbSyncInterface {
public:
MessagesDbSyncInterface() = default;
MessagesDbSyncInterface(const MessagesDbSyncInterface &) = delete;
MessagesDbSyncInterface &operator=(const MessagesDbSyncInterface &) = delete;
virtual ~MessagesDbSyncInterface() = default;
MessageDbSyncInterface() = default;
MessageDbSyncInterface(const MessageDbSyncInterface &) = delete;
MessageDbSyncInterface &operator=(const MessageDbSyncInterface &) = delete;
virtual ~MessageDbSyncInterface() = default;
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
@ -113,48 +113,48 @@ class MessagesDbSyncInterface {
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
virtual void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) = 0;
virtual Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) = 0;
virtual Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) = 0;
virtual Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) = 0;
virtual Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) = 0;
virtual Result<MessageDbDialogMessage> get_message(FullMessageId full_message_id) = 0;
virtual Result<MessageDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) = 0;
virtual Result<MessageDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) = 0;
virtual Result<MessageDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) = 0;
virtual MessagesDbCalendar get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) = 0;
virtual MessageDbCalendar get_dialog_message_calendar(MessageDbDialogCalendarQuery query) = 0;
virtual Result<MessagesDbMessagePositions> get_dialog_sparse_message_positions(
MessagesDbGetDialogSparseMessagePositionsQuery query) = 0;
virtual Result<MessageDbMessagePositions> get_dialog_sparse_message_positions(
MessageDbGetDialogSparseMessagePositionsQuery query) = 0;
virtual vector<MessagesDbDialogMessage> get_messages(MessagesDbMessagesQuery query) = 0;
virtual vector<MessagesDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0;
virtual vector<MessagesDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) = 0;
virtual vector<MessageDbDialogMessage> get_messages(MessageDbMessagesQuery query) = 0;
virtual vector<MessageDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0;
virtual vector<MessageDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) = 0;
virtual std::pair<vector<MessagesDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) = 0;
virtual MessagesDbCallsResult get_calls(MessagesDbCallsQuery query) = 0;
virtual MessagesDbFtsResult get_messages_fts(MessagesDbFtsQuery query) = 0;
virtual std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) = 0;
virtual MessageDbCallsResult get_calls(MessageDbCallsQuery query) = 0;
virtual MessageDbFtsResult get_messages_fts(MessageDbFtsQuery query) = 0;
virtual Status begin_write_transaction() = 0;
virtual Status commit_transaction() = 0;
};
class MessagesDbSyncSafeInterface {
class MessageDbSyncSafeInterface {
public:
MessagesDbSyncSafeInterface() = default;
MessagesDbSyncSafeInterface(const MessagesDbSyncSafeInterface &) = delete;
MessagesDbSyncSafeInterface &operator=(const MessagesDbSyncSafeInterface &) = delete;
virtual ~MessagesDbSyncSafeInterface() = default;
MessageDbSyncSafeInterface() = default;
MessageDbSyncSafeInterface(const MessageDbSyncSafeInterface &) = delete;
MessageDbSyncSafeInterface &operator=(const MessageDbSyncSafeInterface &) = delete;
virtual ~MessageDbSyncSafeInterface() = default;
virtual MessagesDbSyncInterface &get() = 0;
virtual MessageDbSyncInterface &get() = 0;
};
class MessagesDbAsyncInterface {
class MessageDbAsyncInterface {
public:
MessagesDbAsyncInterface() = default;
MessagesDbAsyncInterface(const MessagesDbAsyncInterface &) = delete;
MessagesDbAsyncInterface &operator=(const MessagesDbAsyncInterface &) = delete;
virtual ~MessagesDbAsyncInterface() = default;
MessageDbAsyncInterface() = default;
MessageDbAsyncInterface(const MessageDbAsyncInterface &) = delete;
MessageDbAsyncInterface &operator=(const MessageDbAsyncInterface &) = delete;
virtual ~MessageDbAsyncInterface() = default;
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
@ -166,43 +166,42 @@ class MessagesDbAsyncInterface {
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) = 0;
virtual void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) = 0;
virtual void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) = 0;
virtual void get_message(FullMessageId full_message_id, Promise<MessageDbDialogMessage> promise) = 0;
virtual void get_message_by_unique_message_id(ServerMessageId unique_message_id,
Promise<MessagesDbMessage> promise) = 0;
Promise<MessageDbMessage> promise) = 0;
virtual void get_message_by_random_id(DialogId dialog_id, int64 random_id,
Promise<MessagesDbDialogMessage> promise) = 0;
Promise<MessageDbDialogMessage> promise) = 0;
virtual void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
int32 date, Promise<MessagesDbDialogMessage> promise) = 0;
int32 date, Promise<MessageDbDialogMessage> promise) = 0;
virtual void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query,
Promise<MessagesDbCalendar> promise) = 0;
virtual void get_dialog_message_calendar(MessageDbDialogCalendarQuery query, Promise<MessageDbCalendar> promise) = 0;
virtual void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
Promise<MessagesDbMessagePositions> promise) = 0;
virtual void get_dialog_sparse_message_positions(MessageDbGetDialogSparseMessagePositionsQuery query,
Promise<MessageDbMessagePositions> promise) = 0;
virtual void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) = 0;
virtual void get_messages(MessageDbMessagesQuery query, Promise<vector<MessageDbDialogMessage>> promise) = 0;
virtual void get_scheduled_messages(DialogId dialog_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) = 0;
Promise<vector<MessageDbDialogMessage>> promise) = 0;
virtual void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) = 0;
Promise<vector<MessageDbDialogMessage>> promise) = 0;
virtual void get_calls(MessagesDbCallsQuery, Promise<MessagesDbCallsResult> promise) = 0;
virtual void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) = 0;
virtual void get_calls(MessageDbCallsQuery, Promise<MessageDbCallsResult> promise) = 0;
virtual void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) = 0;
virtual void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) = 0;
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) = 0;
virtual void close(Promise<> promise) = 0;
virtual void force_flush() = 0;
};
Status init_messages_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
Status drop_messages_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
Status init_message_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
Status drop_message_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
std::shared_ptr<MessagesDbSyncSafeInterface> create_messages_db_sync(
std::shared_ptr<MessageDbSyncSafeInterface> create_message_db_sync(
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
int32 scheduler_id = -1);
std::shared_ptr<MessageDbAsyncInterface> create_message_db_async(std::shared_ptr<MessageDbSyncSafeInterface> sync_db,
int32 scheduler_id = -1);
} // namespace td

View File

@ -34,12 +34,12 @@
#include "td/telegram/Location.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/MessageContent.h"
#include "td/telegram/MessageDb.h"
#include "td/telegram/MessageEntity.h"
#include "td/telegram/MessageEntity.hpp"
#include "td/telegram/MessageReaction.h"
#include "td/telegram/MessageReaction.hpp"
#include "td/telegram/MessageReplyInfo.hpp"
#include "td/telegram/MessagesDb.h"
#include "td/telegram/MessageSender.h"
#include "td/telegram/misc.h"
#include "td/telegram/net/DcId.h"
@ -11735,8 +11735,8 @@ void MessagesManager::delete_dialog_messages_by_sender(DialogId dialog_id, Dialo
if (G()->parameters().use_message_db) {
LOG(INFO) << "Delete all messages from " << sender_dialog_id << " in " << dialog_id << " from database";
G()->td_db()->get_messages_db_async()->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id,
Auto()); // TODO Promise
G()->td_db()->get_message_db_async()->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id,
Auto()); // TODO Promise
}
vector<MessageId> message_ids;
@ -14068,15 +14068,15 @@ void MessagesManager::ttl_db_loop(double server_now) {
int32 limit = 50;
LOG(INFO) << "Send ttl_db query " << tag("expires_from", ttl_db_expires_from_)
<< tag("expires_till", ttl_db_expires_till_) << tag("limit", limit);
G()->td_db()->get_messages_db_async()->get_expiring_messages(
G()->td_db()->get_message_db_async()->get_expiring_messages(
ttl_db_expires_from_, ttl_db_expires_till_, limit,
PromiseCreator::lambda(
[actor_id = actor_id(this)](Result<std::pair<std::vector<MessagesDbMessage>, int32>> result) {
[actor_id = actor_id(this)](Result<std::pair<std::vector<MessageDbMessage>, int32>> result) {
send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false);
}));
}
void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessagesDbMessage>, int32>> r_result, bool dummy) {
void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessageDbMessage>, int32>> r_result, bool dummy) {
if (G()->close_flag()) {
return;
}
@ -21707,9 +21707,9 @@ void MessagesManager::open_dialog(Dialog *d) {
LOG(INFO) << "Send check has_scheduled_database_messages request";
d->is_has_scheduled_database_messages_checked = true;
G()->td_db()->get_messages_db_async()->get_scheduled_messages(
G()->td_db()->get_message_db_async()->get_scheduled_messages(
dialog_id, 1,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id](vector<MessagesDbDialogMessage> messages) {
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id](vector<MessageDbDialogMessage> messages) {
if (messages.empty()) {
send_closure(actor_id, &MessagesManager::set_dialog_has_scheduled_database_messages, dialog_id, false);
}
@ -22810,17 +22810,17 @@ td_api::object_ptr<td_api::messageCalendar> MessagesManager::get_dialog_message_
LOG(INFO) << "Get message calendar from database in " << dialog_id << " from " << fixed_from_message_id;
auto new_promise =
PromiseCreator::lambda([random_id, dialog_id, fixed_from_message_id, first_db_message_id, filter,
promise = std::move(promise)](Result<MessagesDbCalendar> r_calendar) mutable {
promise = std::move(promise)](Result<MessageDbCalendar> r_calendar) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_get_message_calendar_from_database, random_id,
dialog_id, fixed_from_message_id, first_db_message_id, filter, std::move(r_calendar),
std::move(promise));
});
MessagesDbDialogCalendarQuery db_query;
MessageDbDialogCalendarQuery db_query;
db_query.dialog_id = dialog_id;
db_query.filter = filter;
db_query.from_message_id = fixed_from_message_id;
db_query.tz_offset = static_cast<int32>(td_->option_manager_->get_option_integer("utc_time_offset"));
G()->td_db()->get_messages_db_async()->get_dialog_message_calendar(db_query, std::move(new_promise));
G()->td_db()->get_message_db_async()->get_dialog_message_calendar(db_query, std::move(new_promise));
return {};
}
}
@ -22852,7 +22852,7 @@ td_api::object_ptr<td_api::messageCalendar> MessagesManager::get_dialog_message_
void MessagesManager::on_get_message_calendar_from_database(int64 random_id, DialogId dialog_id,
MessageId from_message_id, MessageId first_db_message_id,
MessageSearchFilter filter,
Result<MessagesDbCalendar> r_calendar,
Result<MessageDbCalendar> r_calendar,
Promise<Unit> promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
@ -23031,18 +23031,18 @@ std::pair<int32, vector<MessageId>> MessagesManager::search_dialog_messages(
<< " and with limit " << limit;
auto new_promise = PromiseCreator::lambda(
[random_id, dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit,
promise = std::move(promise)](Result<vector<MessagesDbDialogMessage>> r_messages) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_search_dialog_messages_db_result, random_id,
promise = std::move(promise)](Result<vector<MessageDbDialogMessage>> r_messages) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_search_dialog_message_db_result, random_id,
dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit,
std::move(r_messages), std::move(promise));
});
MessagesDbMessagesQuery db_query;
MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id;
db_query.filter = filter;
db_query.from_message_id = fixed_from_message_id;
db_query.offset = offset;
db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_messages(db_query, std::move(new_promise));
G()->td_db()->get_message_db_async()->get_messages(db_query, std::move(new_promise));
return result;
}
}
@ -23136,15 +23136,15 @@ std::pair<int32, vector<FullMessageId>> MessagesManager::search_call_messages(Me
if (first_db_message_id < fixed_from_message_id && message_count != -1) {
LOG(INFO) << "Search messages in database from " << fixed_from_message_id << " and with limit " << limit;
MessagesDbCallsQuery db_query;
MessageDbCallsQuery db_query;
db_query.filter = filter;
db_query.from_unique_message_id = fixed_from_message_id.get_server_message_id().get();
db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_calls(
G()->td_db()->get_message_db_async()->get_calls(
db_query, PromiseCreator::lambda([random_id, first_db_message_id, filter, promise = std::move(promise)](
Result<MessagesDbCallsResult> calls_result) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_messages_db_calls_result,
std::move(calls_result), random_id, first_db_message_id, filter, std::move(promise));
Result<MessageDbCallsResult> calls_result) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_message_db_calls_result, std::move(calls_result),
random_id, first_db_message_id, filter, std::move(promise));
}));
return result;
}
@ -23569,11 +23569,11 @@ MessageId MessagesManager::get_first_database_message_id_by_index(const Dialog *
return message_id;
}
void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id,
MessageId from_message_id, MessageId first_db_message_id,
MessageSearchFilter filter, int32 offset, int32 limit,
Result<vector<MessagesDbDialogMessage>> r_messages,
Promise<Unit> promise) {
void MessagesManager::on_search_dialog_message_db_result(int64 random_id, DialogId dialog_id, MessageId from_message_id,
MessageId first_db_message_id, MessageSearchFilter filter,
int32 offset, int32 limit,
Result<vector<MessageDbDialogMessage>> r_messages,
Promise<Unit> promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
if (r_messages.is_error()) {
@ -23598,7 +23598,7 @@ void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, Dialo
res.reserve(messages.size());
for (auto &message : messages) {
auto m = on_get_message_from_database(d, message, false, "on_search_dialog_messages_db_result");
auto m = on_get_message_from_database(d, message, false, "on_search_dialog_message_db_result");
if (m != nullptr && first_db_message_id <= m->message_id) {
if (filter == MessageSearchFilter::UnreadMention && !m->contains_unread_mention) {
// skip already read by d->last_read_all_mentions_message_id mentions
@ -23626,9 +23626,9 @@ void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, Dialo
if (filter == MessageSearchFilter::UnreadReaction) {
d->unread_reaction_count = message_count;
// update_dialog_mention_notification_count(d);
send_update_chat_unread_reaction_count(d, "on_search_dialog_messages_db_result");
send_update_chat_unread_reaction_count(d, "on_search_dialog_message_db_result");
}
on_dialog_updated(dialog_id, "on_search_dialog_messages_db_result");
on_dialog_updated(dialog_id, "on_search_dialog_message_db_result");
}
it->second.first = message_count;
if (res.empty() && first_db_message_id != MessageId::min() && dialog_id.get_type() != DialogType::SecretChat) {
@ -23693,7 +23693,7 @@ MessagesManager::FoundMessages MessagesManager::offline_search_messages(DialogId
limit = MAX_SEARCH_MESSAGES;
}
MessagesDbFtsQuery fts_query;
MessageDbFtsQuery fts_query;
fts_query.query = query;
fts_query.dialog_id = dialog_id;
fts_query.filter = filter;
@ -23712,19 +23712,19 @@ MessagesManager::FoundMessages MessagesManager::offline_search_messages(DialogId
} while (random_id == 0 || found_fts_messages_.count(random_id) > 0);
found_fts_messages_[random_id]; // reserve place for result
G()->td_db()->get_messages_db_async()->get_messages_fts(
G()->td_db()->get_message_db_async()->get_messages_fts(
std::move(fts_query),
PromiseCreator::lambda([random_id, offset = std::move(offset), limit,
promise = std::move(promise)](Result<MessagesDbFtsResult> fts_result) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_messages_db_fts_result, std::move(fts_result),
promise = std::move(promise)](Result<MessageDbFtsResult> fts_result) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_message_db_fts_result, std::move(fts_result),
std::move(offset), limit, random_id, std::move(promise));
}));
return {};
}
void MessagesManager::on_messages_db_fts_result(Result<MessagesDbFtsResult> result, string offset, int32 limit,
int64 random_id, Promise<Unit> &&promise) {
void MessagesManager::on_message_db_fts_result(Result<MessageDbFtsResult> result, string offset, int32 limit,
int64 random_id, Promise<Unit> &&promise) {
if (G()->close_flag()) {
result = Global::request_aborted_error();
}
@ -23740,7 +23740,7 @@ void MessagesManager::on_messages_db_fts_result(Result<MessagesDbFtsResult> resu
res.reserve(fts_result.messages.size());
for (auto &message : fts_result.messages) {
auto m = on_get_message_from_database(message, false, "on_messages_db_fts_result");
auto m = on_get_message_from_database(message, false, "on_message_db_fts_result");
if (m != nullptr) {
res.emplace_back(message.dialog_id, m->message_id);
}
@ -23754,9 +23754,9 @@ void MessagesManager::on_messages_db_fts_result(Result<MessagesDbFtsResult> resu
promise.set_value(Unit());
}
void MessagesManager::on_messages_db_calls_result(Result<MessagesDbCallsResult> result, int64 random_id,
MessageId first_db_message_id, MessageSearchFilter filter,
Promise<Unit> &&promise) {
void MessagesManager::on_message_db_calls_result(Result<MessageDbCallsResult> result, int64 random_id,
MessageId first_db_message_id, MessageSearchFilter filter,
Promise<Unit> &&promise) {
if (G()->close_flag()) {
result = Global::request_aborted_error();
}
@ -23773,7 +23773,7 @@ void MessagesManager::on_messages_db_calls_result(Result<MessagesDbCallsResult>
CHECK(!first_db_message_id.is_scheduled());
res.reserve(calls_result.messages.size());
for (auto &message : calls_result.messages) {
auto m = on_get_message_from_database(message, false, "on_messages_db_calls_result");
auto m = on_get_message_from_database(message, false, "on_message_db_calls_result");
if (m != nullptr && first_db_message_id <= m->message_id) {
res.emplace_back(message.dialog_id, m->message_id);
@ -23885,10 +23885,10 @@ int64 MessagesManager::get_dialog_message_by_date(DialogId dialog_id, int32 date
if (G()->parameters().use_message_db && d->last_database_message_id != MessageId()) {
CHECK(d->first_database_message_id != MessageId());
G()->td_db()->get_messages_db_async()->get_dialog_message_by_date(
G()->td_db()->get_message_db_async()->get_dialog_message_by_date(
dialog_id, d->first_database_message_id, d->last_database_message_id, date,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, date, random_id,
promise = std::move(promise)](Result<MessagesDbDialogMessage> result) mutable {
promise = std::move(promise)](Result<MessageDbDialogMessage> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_dialog_message_by_date_from_database, dialog_id, date,
random_id, std::move(result), std::move(promise));
}));
@ -23978,7 +23978,7 @@ void MessagesManager::find_messages_by_date(const Message *m, int32 min_date, in
}
void MessagesManager::on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id,
Result<MessagesDbDialogMessage> result,
Result<MessageDbDialogMessage> result,
Promise<Unit> promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
@ -24115,7 +24115,7 @@ void MessagesManager::get_dialog_sparse_message_positions(
LOG(INFO) << "Get sparse message positions from database";
auto new_promise =
PromiseCreator::lambda([promise = std::move(promise)](Result<MessagesDbMessagePositions> result) mutable {
PromiseCreator::lambda([promise = std::move(promise)](Result<MessageDbMessagePositions> result) mutable {
TRY_STATUS_PROMISE(promise, G()->close_status());
if (result.is_error()) {
return promise.set_error(result.move_as_error());
@ -24123,17 +24123,17 @@ void MessagesManager::get_dialog_sparse_message_positions(
auto positions = result.move_as_ok();
promise.set_value(td_api::make_object<td_api::messagePositions>(
positions.total_count, transform(positions.positions, [](const MessagesDbMessagePosition &position) {
positions.total_count, transform(positions.positions, [](const MessageDbMessagePosition &position) {
return td_api::make_object<td_api::messagePosition>(position.position, position.message_id.get(),
position.date);
})));
});
MessagesDbGetDialogSparseMessagePositionsQuery db_query;
MessageDbGetDialogSparseMessagePositionsQuery db_query;
db_query.dialog_id = dialog_id;
db_query.filter = filter;
db_query.from_message_id = from_message_id;
db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_dialog_sparse_message_positions(db_query, std::move(new_promise));
G()->td_db()->get_message_db_async()->get_dialog_sparse_message_positions(db_query, std::move(new_promise));
return;
}
@ -24377,8 +24377,7 @@ unique_ptr<MessagesManager::Message> MessagesManager::parse_message(Dialog *d, M
void MessagesManager::on_get_history_from_database(DialogId dialog_id, MessageId from_message_id,
MessageId old_last_database_message_id, int32 offset, int32 limit,
bool from_the_end, bool only_local,
vector<MessagesDbDialogMessage> &&messages,
Promise<Unit> &&promise) {
vector<MessageDbDialogMessage> &&messages, Promise<Unit> &&promise) {
TRY_STATUS_PROMISE(promise, G()->close_status());
CHECK(-limit < offset && offset <= 0);
CHECK(offset < 0 || from_the_end);
@ -24658,14 +24657,14 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
limit = 10;
}
LOG(INFO) << "Get history from the end of " << dialog_id << " from database from " << source;
MessagesDbMessagesQuery db_query;
MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id;
db_query.from_message_id = MessageId::max();
db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_messages(
G()->td_db()->get_message_db_async()->get_messages(
db_query, PromiseCreator::lambda([dialog_id, old_last_database_message_id = d->last_database_message_id,
only_local, limit, actor_id = actor_id(this), promise = std::move(promise)](
vector<MessagesDbDialogMessage> messages) mutable {
vector<MessageDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, MessageId::max(),
old_last_database_message_id, 0, limit, true, only_local, std::move(messages),
std::move(promise));
@ -24711,16 +24710,16 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
if (from_database && G()->parameters().use_message_db) {
LOG(INFO) << "Get history in " << dialog_id << " from " << from_message_id << " with offset " << offset
<< " and limit " << limit << " from database";
MessagesDbMessagesQuery db_query;
MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id;
db_query.from_message_id = from_message_id;
db_query.offset = offset;
db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_messages(
G()->td_db()->get_message_db_async()->get_messages(
db_query,
PromiseCreator::lambda([dialog_id, from_message_id, old_last_database_message_id = d->last_database_message_id,
offset, limit, only_local, actor_id = actor_id(this),
promise = std::move(promise)](vector<MessagesDbDialogMessage> messages) mutable {
promise = std::move(promise)](vector<MessageDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, from_message_id,
old_last_database_message_id, offset, limit, false, only_local, std::move(messages),
std::move(promise));
@ -24863,9 +24862,9 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr
auto &queries = load_scheduled_messages_from_database_queries_[dialog_id];
queries.push_back(std::move(promise));
if (queries.size() == 1) {
G()->td_db()->get_messages_db_async()->get_scheduled_messages(
G()->td_db()->get_message_db_async()->get_scheduled_messages(
dialog_id, 1000,
PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](vector<MessagesDbDialogMessage> messages) {
PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](vector<MessageDbDialogMessage> messages) {
send_closure(actor_id, &MessagesManager::on_get_scheduled_messages_from_database, dialog_id,
std::move(messages));
}));
@ -24877,7 +24876,7 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr
}
void MessagesManager::on_get_scheduled_messages_from_database(DialogId dialog_id,
vector<MessagesDbDialogMessage> &&messages) {
vector<MessageDbDialogMessage> &&messages) {
if (G()->close_flag()) {
auto it = load_scheduled_messages_from_database_queries_.find(dialog_id);
CHECK(it != load_scheduled_messages_from_database_queries_.end());
@ -30461,12 +30460,12 @@ vector<Notification> MessagesManager::get_message_notifications_from_database_fo
return res;
}
vector<MessagesDbDialogMessage> MessagesManager::do_get_message_notifications_from_database_force(
vector<MessageDbDialogMessage> MessagesManager::do_get_message_notifications_from_database_force(
Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit) {
CHECK(G()->parameters().use_message_db);
CHECK(!from_message_id.is_scheduled());
auto *db = G()->td_db()->get_messages_db_sync();
auto *db = G()->td_db()->get_message_db_sync();
if (!from_mentions) {
CHECK(from_message_id > d->last_read_inbox_message_id);
VLOG(notifications) << "Trying to load " << limit << " messages with notifications in "
@ -30478,7 +30477,7 @@ vector<MessagesDbDialogMessage> MessagesManager::do_get_message_notifications_fr
<< d->mention_notification_group.group_id << '/' << d->dialog_id << " from " << from_message_id;
// ignore first_db_message_id, notifications can be nonconsecutive
MessagesDbMessagesQuery db_query;
MessageDbMessagesQuery db_query;
db_query.dialog_id = d->dialog_id;
db_query.filter = MessageSearchFilter::UnreadMention;
db_query.from_message_id = from_message_id;
@ -30582,12 +30581,12 @@ void MessagesManager::do_get_message_notifications_from_database(Dialog *d, bool
auto dialog_id = d->dialog_id;
auto new_promise =
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, from_mentions, initial_from_notification_id, limit,
promise = std::move(promise)](Result<vector<MessagesDbDialogMessage>> result) mutable {
promise = std::move(promise)](Result<vector<MessageDbDialogMessage>> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, from_mentions,
initial_from_notification_id, limit, std::move(result), std::move(promise));
});
auto *db = G()->td_db()->get_messages_db_async();
auto *db = G()->td_db()->get_message_db_async();
if (!from_mentions) {
VLOG(notifications) << "Trying to load " << limit << " messages with notifications in " << group_info.group_id
<< '/' << dialog_id << " from " << from_notification_id;
@ -30597,7 +30596,7 @@ void MessagesManager::do_get_message_notifications_from_database(Dialog *d, bool
<< '/' << dialog_id << " from " << from_message_id;
// ignore first_db_message_id, notifications can be nonconsecutive
MessagesDbMessagesQuery db_query;
MessageDbMessagesQuery db_query;
db_query.dialog_id = dialog_id;
db_query.filter = MessageSearchFilter::UnreadMention;
db_query.from_message_id = from_message_id;
@ -30610,7 +30609,7 @@ void MessagesManager::do_get_message_notifications_from_database(Dialog *d, bool
void MessagesManager::on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions,
NotificationId initial_from_notification_id,
int32 limit,
Result<vector<MessagesDbDialogMessage>> result,
Result<vector<MessageDbDialogMessage>> result,
Promise<vector<Notification>> promise) {
if (G()->close_flag()) {
result = Global::request_aborted_error();
@ -30756,10 +30755,10 @@ void MessagesManager::remove_message_notification(DialogId dialog_id, Notificati
}
if (G()->parameters().use_message_db) {
G()->td_db()->get_messages_db_async()->get_messages_from_notification_id(
G()->td_db()->get_message_db_async()->get_messages_from_notification_id(
dialog_id, NotificationId(notification_id.get() + 1), 1,
PromiseCreator::lambda([dialog_id, from_mentions, notification_id,
actor_id = actor_id(this)](vector<MessagesDbDialogMessage> result) {
actor_id = actor_id(this)](vector<MessageDbDialogMessage> result) {
send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, from_mentions,
notification_id, std::move(result));
}));
@ -30802,7 +30801,7 @@ void MessagesManager::remove_message_notifications_by_message_ids(DialogId dialo
void MessagesManager::do_remove_message_notification(DialogId dialog_id, bool from_mentions,
NotificationId notification_id,
vector<MessagesDbDialogMessage> result) {
vector<MessageDbDialogMessage> result) {
if (result.empty() || G()->close_flag()) {
return;
}
@ -34893,14 +34892,14 @@ MessagesManager::Message *MessagesManager::get_message_force(Dialog *d, MessageI
LOG(INFO) << "Trying to load " << FullMessageId{d->dialog_id, message_id} << " from database from " << source;
auto r_value = G()->td_db()->get_messages_db_sync()->get_message({d->dialog_id, message_id});
auto r_value = G()->td_db()->get_message_db_sync()->get_message({d->dialog_id, message_id});
if (r_value.is_error()) {
return nullptr;
}
return on_get_message_from_database(d, r_value.ok(), message_id.is_scheduled(), source);
}
MessagesManager::Message *MessagesManager::on_get_message_from_database(const MessagesDbMessage &message,
MessagesManager::Message *MessagesManager::on_get_message_from_database(const MessageDbMessage &message,
bool is_scheduled, const char *source) {
if (message.data.empty()) {
return nullptr;
@ -34932,7 +34931,7 @@ MessagesManager::Message *MessagesManager::on_get_message_from_database(const Me
}
MessagesManager::Message *MessagesManager::on_get_message_from_database(Dialog *d,
const MessagesDbDialogMessage &message,
const MessageDbDialogMessage &message,
bool is_scheduled, const char *source) {
return on_get_message_from_database(d, message.message_id, message.data, is_scheduled, source);
}
@ -36065,8 +36064,8 @@ void MessagesManager::add_message_to_database(const Dialog *d, const Message *m,
if (message_id.is_scheduled()) {
set_dialog_has_scheduled_database_messages(d->dialog_id, true);
G()->td_db()->get_messages_db_async()->add_scheduled_message({d->dialog_id, message_id}, log_event_store(*m),
Auto()); // TODO Promise
G()->td_db()->get_message_db_async()->add_scheduled_message({d->dialog_id, message_id}, log_event_store(*m),
Auto()); // TODO Promise
return;
}
LOG_CHECK(message_id.is_server() || message_id.is_local()) << source;
@ -36108,11 +36107,11 @@ void MessagesManager::add_message_to_database(const Dialog *d, const Message *m,
if (m->ttl_period != 0 && (ttl_expires_at == 0 || m->date + m->ttl_period < ttl_expires_at)) {
ttl_expires_at = m->date + m->ttl_period;
}
G()->td_db()->get_messages_db_async()->add_message({d->dialog_id, message_id}, unique_message_id,
get_message_sender(m), random_id, ttl_expires_at,
get_message_index_mask(d->dialog_id, m), search_id, text,
m->notification_id, m->top_thread_message_id, log_event_store(*m),
Auto()); // TODO Promise
G()->td_db()->get_message_db_async()->add_message({d->dialog_id, message_id}, unique_message_id,
get_message_sender(m), random_id, ttl_expires_at,
get_message_index_mask(d->dialog_id, m), search_id, text,
m->notification_id, m->top_thread_message_id, log_event_store(*m),
Auto()); // TODO Promise
}
void MessagesManager::delete_all_dialog_messages_from_database(Dialog *d, MessageId max_message_id,
@ -36153,8 +36152,8 @@ void MessagesManager::delete_all_dialog_messages_from_database(Dialog *d, Messag
}
}
*/
G()->td_db()->get_messages_db_async()->delete_all_dialog_messages(dialog_id, max_message_id,
Auto()); // TODO Promise
G()->td_db()->get_message_db_async()->delete_all_dialog_messages(dialog_id, max_message_id,
Auto()); // TODO Promise
}
class MessagesManager::DeleteMessageLogEvent {
@ -36375,7 +36374,7 @@ void MessagesManager::do_delete_message_log_event(const DeleteMessageLogEvent &l
// message may not exist in the dialog
LOG(INFO) << "Delete " << log_event.full_message_id_ << " from database";
G()->td_db()->get_messages_db_async()->delete_message(log_event.full_message_id_, std::move(db_promise));
G()->td_db()->get_message_db_async()->delete_message(log_event.full_message_id_, std::move(db_promise));
}
void MessagesManager::attach_message_to_previous(Dialog *d, MessageId message_id, const char *source) {
@ -37010,7 +37009,7 @@ MessagesManager::Dialog *MessagesManager::get_dialog_by_message_id(MessageId mes
if (dialog_id == DialogId()) {
if (G()->parameters().use_message_db) {
auto r_value =
G()->td_db()->get_messages_db_sync()->get_message_by_unique_message_id(message_id.get_server_message_id());
G()->td_db()->get_message_db_sync()->get_message_by_unique_message_id(message_id.get_server_message_id());
if (r_value.is_ok()) {
Message *m = on_get_message_from_database(r_value.ok(), false, "get_dialog_by_message_id");
if (m != nullptr) {
@ -37041,7 +37040,7 @@ MessageId MessagesManager::get_message_id_by_random_id(Dialog *d, int64 random_i
auto it = d->random_id_to_message_id.find(random_id);
if (it == d->random_id_to_message_id.end()) {
if (G()->parameters().use_message_db && d->dialog_id.get_type() == DialogType::SecretChat) {
auto r_value = G()->td_db()->get_messages_db_sync()->get_message_by_random_id(d->dialog_id, random_id);
auto r_value = G()->td_db()->get_message_db_sync()->get_message_by_random_id(d->dialog_id, random_id);
if (r_value.is_ok()) {
debug_add_message_to_dialog_fail_reason_ = "not called";
Message *m = on_get_message_from_database(d, r_value.ok(), false, "get_message_id_by_random_id");

View File

@ -31,11 +31,11 @@
#include "td/telegram/logevent/LogEventHelper.h"
#include "td/telegram/MessageContentType.h"
#include "td/telegram/MessageCopyOptions.h"
#include "td/telegram/MessageDb.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/MessageLinkInfo.h"
#include "td/telegram/MessageReplyHeader.h"
#include "td/telegram/MessageReplyInfo.h"
#include "td/telegram/MessagesDb.h"
#include "td/telegram/MessageSearchFilter.h"
#include "td/telegram/MessageThreadInfo.h"
#include "td/telegram/MessageTtl.h"
@ -2302,7 +2302,7 @@ class MessagesManager final : public Actor {
void on_get_history_from_database(DialogId dialog_id, MessageId from_message_id,
MessageId old_last_database_message_id, int32 offset, int32 limit,
bool from_the_end, bool only_local, vector<MessagesDbDialogMessage> &&messages,
bool from_the_end, bool only_local, vector<MessageDbDialogMessage> &&messages,
Promise<Unit> &&promise);
void get_history_from_the_end(DialogId dialog_id, bool from_database, bool only_local, Promise<Unit> &&promise);
@ -2324,7 +2324,7 @@ class MessagesManager final : public Actor {
void load_dialog_scheduled_messages(DialogId dialog_id, bool from_database, int64 hash, Promise<Unit> &&promise);
void on_get_scheduled_messages_from_database(DialogId dialog_id, vector<MessagesDbDialogMessage> &&messages);
void on_get_scheduled_messages_from_database(DialogId dialog_id, vector<MessageDbDialogMessage> &&messages);
static int32 get_random_y(MessageId message_id);
@ -2475,7 +2475,7 @@ class MessagesManager final : public Actor {
vector<Notification> get_message_notifications_from_database_force(Dialog *d, bool from_mentions, int32 limit);
static vector<MessagesDbDialogMessage> do_get_message_notifications_from_database_force(
static vector<MessageDbDialogMessage> do_get_message_notifications_from_database_force(
Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit);
void do_get_message_notifications_from_database(Dialog *d, bool from_mentions,
@ -2485,11 +2485,11 @@ class MessagesManager final : public Actor {
void on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions,
NotificationId initial_from_notification_id, int32 limit,
Result<vector<MessagesDbDialogMessage>> result,
Result<vector<MessageDbDialogMessage>> result,
Promise<vector<Notification>> promise);
void do_remove_message_notification(DialogId dialog_id, bool from_mentions, NotificationId notification_id,
vector<MessagesDbDialogMessage> result);
vector<MessageDbDialogMessage> result);
int32 get_dialog_pending_notification_count(const Dialog *d, bool from_mentions) const;
@ -2940,9 +2940,9 @@ class MessagesManager final : public Actor {
void get_message_force_from_server(Dialog *d, MessageId message_id, Promise<Unit> &&promise,
tl_object_ptr<telegram_api::InputMessage> input_message = nullptr);
Message *on_get_message_from_database(const MessagesDbMessage &message, bool is_scheduled, const char *source);
Message *on_get_message_from_database(const MessageDbMessage &message, bool is_scheduled, const char *source);
Message *on_get_message_from_database(Dialog *d, const MessagesDbDialogMessage &message, bool is_scheduled,
Message *on_get_message_from_database(Dialog *d, const MessageDbDialogMessage &message, bool is_scheduled,
const char *source);
Message *on_get_message_from_database(Dialog *d, MessageId message_id, const BufferSlice &value, bool is_scheduled,
@ -2952,7 +2952,7 @@ class MessagesManager final : public Actor {
Promise<Unit> &&promise);
void on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id,
Result<MessagesDbDialogMessage> result, Promise<Unit> promise);
Result<MessageDbDialogMessage> result, Promise<Unit> promise);
std::pair<bool, int32> get_dialog_mute_until(DialogId dialog_id, const Dialog *d) const;
@ -3023,7 +3023,7 @@ class MessagesManager final : public Actor {
void ttl_db_loop_start(double server_now);
void ttl_db_loop(double server_now);
void ttl_db_on_result(Result<std::pair<std::vector<MessagesDbMessage>, int32>> r_result, bool dummy);
void ttl_db_on_result(Result<std::pair<std::vector<MessageDbMessage>, int32>> r_result, bool dummy);
void on_restore_missing_message_after_get_difference(FullMessageId full_message_id, MessageId old_message_id,
Result<Unit> result);
@ -3049,18 +3049,18 @@ class MessagesManager final : public Actor {
void on_get_message_calendar_from_database(int64 random_id, DialogId dialog_id, MessageId from_message_id,
MessageId first_db_message_id, MessageSearchFilter filter,
Result<MessagesDbCalendar> r_calendar, Promise<Unit> promise);
Result<MessageDbCalendar> r_calendar, Promise<Unit> promise);
void on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id, MessageId from_message_id,
MessageId first_db_message_id, MessageSearchFilter filter, int32 offset,
int32 limit, Result<vector<MessagesDbDialogMessage>> r_messages,
Promise<Unit> promise);
void on_search_dialog_message_db_result(int64 random_id, DialogId dialog_id, MessageId from_message_id,
MessageId first_db_message_id, MessageSearchFilter filter, int32 offset,
int32 limit, Result<vector<MessageDbDialogMessage>> r_messages,
Promise<Unit> promise);
void on_messages_db_fts_result(Result<MessagesDbFtsResult> result, string offset, int32 limit, int64 random_id,
Promise<Unit> &&promise);
void on_message_db_fts_result(Result<MessageDbFtsResult> result, string offset, int32 limit, int64 random_id,
Promise<Unit> &&promise);
void on_messages_db_calls_result(Result<MessagesDbCallsResult> result, int64 random_id, MessageId first_db_message_id,
MessageSearchFilter filter, Promise<Unit> &&promise);
void on_message_db_calls_result(Result<MessageDbCallsResult> result, int64 random_id, MessageId first_db_message_id,
MessageSearchFilter filter, Promise<Unit> &&promise);
void on_load_active_live_location_full_message_ids_from_database(string value);

View File

@ -10,7 +10,7 @@
#include "td/telegram/files/FileDb.h"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/MessagesDb.h"
#include "td/telegram/MessageDb.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdParameters.h"
#include "td/telegram/Version.h"
@ -184,11 +184,11 @@ SqliteKeyValueAsyncInterface *TdDb::get_sqlite_pmc() {
return common_kv_async_.get();
}
MessagesDbSyncInterface *TdDb::get_messages_db_sync() {
return &messages_db_sync_safe_->get();
MessageDbSyncInterface *TdDb::get_message_db_sync() {
return &message_db_sync_safe_->get();
}
MessagesDbAsyncInterface *TdDb::get_messages_db_async() {
return messages_db_async_.get();
MessageDbAsyncInterface *TdDb::get_message_db_async() {
return message_db_async_.get();
}
DialogDbSyncInterface *TdDb::get_dialog_db_sync() {
return &dialog_db_sync_safe_->get();
@ -206,8 +206,8 @@ CSlice TdDb::sqlite_path() const {
void TdDb::flush_all() {
LOG(INFO) << "Flush all databases";
if (messages_db_async_) {
messages_db_async_->force_flush();
if (message_db_async_) {
message_db_async_->force_flush();
}
binlog_->force_flush();
}
@ -249,9 +249,9 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
common_kv_async_->close(mpas.get_promise());
}
messages_db_sync_safe_.reset();
if (messages_db_async_) {
messages_db_async_->close(mpas.get_promise());
message_db_sync_safe_.reset();
if (message_db_async_) {
message_db_async_->close(mpas.get_promise());
}
dialog_db_sync_safe_.reset();
@ -320,11 +320,11 @@ Status TdDb::init_sqlite(const TdParameters &parameters, const DbKey &key, const
TRY_STATUS(drop_dialog_db(db, user_version));
}
// init MessagesDb
// init MessageDb
if (use_message_db) {
TRY_STATUS(init_messages_db(db, user_version));
TRY_STATUS(init_message_db(db, user_version));
} else {
TRY_STATUS(drop_messages_db(db, user_version));
TRY_STATUS(drop_message_db(db, user_version));
}
// init filesDb
@ -372,8 +372,8 @@ Status TdDb::init_sqlite(const TdParameters &parameters, const DbKey &key, const
}
if (use_message_db) {
messages_db_sync_safe_ = create_messages_db_sync(sql_connection_);
messages_db_async_ = create_messages_db_async(messages_db_sync_safe_);
message_db_sync_safe_ = create_message_db_sync(sql_connection_);
message_db_async_ = create_message_db_async(message_db_sync_safe_);
}
return Status::OK();

View File

@ -30,9 +30,9 @@ class DialogDbSyncInterface;
class DialogDbSyncSafeInterface;
class DialogDbAsyncInterface;
class FileDbInterface;
class MessagesDbSyncInterface;
class MessagesDbSyncSafeInterface;
class MessagesDbAsyncInterface;
class MessageDbSyncInterface;
class MessageDbSyncSafeInterface;
class MessageDbAsyncInterface;
class SqliteConnectionSafe;
class SqliteKeyValueSafe;
class SqliteKeyValueAsyncInterface;
@ -92,8 +92,8 @@ class TdDb {
void close_all(Promise<> on_finished);
void close_and_destroy_all(Promise<> on_finished);
MessagesDbSyncInterface *get_messages_db_sync();
MessagesDbAsyncInterface *get_messages_db_async();
MessageDbSyncInterface *get_message_db_sync();
MessageDbAsyncInterface *get_message_db_async();
DialogDbSyncInterface *get_dialog_db_sync();
DialogDbAsyncInterface *get_dialog_db_async();
@ -113,8 +113,8 @@ class TdDb {
std::shared_ptr<SqliteKeyValueSafe> common_kv_safe_;
unique_ptr<SqliteKeyValueAsyncInterface> common_kv_async_;
std::shared_ptr<MessagesDbSyncSafeInterface> messages_db_sync_safe_;
std::shared_ptr<MessagesDbAsyncInterface> messages_db_async_;
std::shared_ptr<MessageDbSyncSafeInterface> message_db_sync_safe_;
std::shared_ptr<MessageDbAsyncInterface> message_db_async_;
std::shared_ptr<DialogDbSyncSafeInterface> dialog_db_sync_safe_;
std::shared_ptr<DialogDbAsyncInterface> dialog_db_async_;

View File

@ -62,9 +62,9 @@ enum class Version : int32 {
enum class DbVersion : int32 {
DialogDbCreated = 3,
MessagesDbMediaIndex,
MessagesDb30MediaIndex,
MessagesDbFts,
MessageDbMediaIndex,
MessageDb30MediaIndex,
MessageDbFts,
MessagesCallIndex,
FixFileRemoteLocationKeyBug,
AddNotificationsSupport,