Add scheduled messages database table.

GitOrigin-RevId: 6007af6d26f1c4577fd3e5f1613e5f89d1610fdc
This commit is contained in:
levlam 2019-12-02 03:33:35 +03:00
parent 7b53d64492
commit 1a0bb5f56f
4 changed files with 85 additions and 29 deletions

View File

@ -94,9 +94,18 @@ Status init_messages_db(SqliteDb &db, int32 version) {
return Status::OK();
};
auto add_notification_id_index = [&db]() {
return db.exec(
"CREATE INDEX IF NOT EXISTS message_by_notification_id ON messages (dialog_id, notification_id) WHERE "
"notification_id IS NOT NULL");
};
auto add_scheduled_messages_table = [&db]() {
TRY_STATUS(
db.exec("CREATE INDEX IF NOT EXISTS message_by_notification_id ON messages (dialog_id, notification_id) "
"WHERE notification_id IS NOT NULL"));
db.exec("CREATE TABLE IF NOT EXISTS scheduled_messages (dialog_id INT8, message_id INT8, "
"server_message_id INT4, data BLOB, PRIMARY KEY (dialog_id, message_id))"));
TRY_STATUS(
db.exec("CREATE INDEX IF NOT EXISTS message_by_server_message_id ON scheduled_messages "
"(dialog_id, server_message_id) WHERE server_message_id IS NOT NULL"));
return Status::OK();
};
@ -128,6 +137,8 @@ Status init_messages_db(SqliteDb &db, int32 version) {
TRY_STATUS(add_notification_id_index());
TRY_STATUS(add_scheduled_messages_table());
version = current_db_version();
}
if (version < static_cast<int32>(DbVersion::MessagesDbMediaIndex)) {
@ -149,6 +160,9 @@ Status init_messages_db(SqliteDb &db, int32 version) {
TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN notification_id INT4"));
TRY_STATUS(add_notification_id_index());
}
if (version < static_cast<int32>(DbVersion::AddScheduledMessages)) {
TRY_STATUS(add_scheduled_messages_table());
}
return Status::OK();
}
@ -229,6 +243,11 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
<< (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2"));
}
TRY_RESULT_ASSIGN(add_scheduled_message_stmt_,
db_.get_statement("INSERT OR REPLACE INTO scheduled_messages VALUES(?1, ?2, ?3, ?4)"));
TRY_RESULT_ASSIGN(delete_scheduled_message_stmt_,
db_.get_statement("DELETE FROM scheduled_messages WHERE dialog_id = ?1 AND message_id = ?2"));
// LOG(ERROR) << get_message_stmt_.explain().ok();
// LOG(ERROR) << get_messages_from_notification_id_stmt.explain().ok();
// LOG(ERROR) << get_message_by_random_id_stmt_.explain().ok();
@ -318,17 +337,43 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
return Status::OK();
}
Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) override {
LOG(INFO) << "Add " << full_message_id << " to database";
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
CHECK(dialog_id.is_valid());
CHECK(message_id.is_valid_scheduled());
SCOPE_EXIT {
add_scheduled_message_stmt_.reset();
};
add_scheduled_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
add_scheduled_message_stmt_.bind_int64(2, message_id.get()).ensure();
if (message_id.is_scheduled_server()) {
add_scheduled_message_stmt_.bind_int32(3, message_id.get_scheduled_server_message_id().get()).ensure();
} else {
add_scheduled_message_stmt_.bind_null(3).ensure();
}
add_scheduled_message_stmt_.bind_blob(4, data.as_slice()).ensure();
add_scheduled_message_stmt_.step().ensure();
return Status::OK();
}
Status delete_message(FullMessageId full_message_id) override {
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
CHECK(dialog_id.is_valid());
CHECK(message_id.is_valid());
CHECK(message_id.is_valid() || message_id.is_valid_scheduled());
auto &stmt = message_id.is_scheduled() ? delete_scheduled_message_stmt_ : delete_message_stmt_;
SCOPE_EXIT {
delete_message_stmt_.reset();
stmt.reset();
};
delete_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
delete_message_stmt_.bind_int64(2, message_id.get()).ensure();
delete_message_stmt_.step().ensure();
stmt.bind_int64(1, dialog_id.get()).ensure();
stmt.bind_int64(2, message_id.get()).ensure();
stmt.step().ensure();
return Status::OK();
}
@ -732,6 +777,9 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
SqliteStatement get_messages_fts_stmt_;
SqliteStatement add_scheduled_message_stmt_;
SqliteStatement delete_scheduled_message_stmt_;
Result<std::vector<BufferSlice>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset, int32 limit) {
LOG_CHECK(dialog_id.is_valid()) << dialog_id;
@ -863,6 +911,9 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
ttl_expires_at, index_mask, search_id, std::move(text), notification_id, std::move(data),
std::move(promise));
}
void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) override {
send_closure_later(impl_, &Impl::add_scheduled_message, full_message_id, std::move(data), std::move(promise));
}
void delete_message(FullMessageId full_message_id, Promise<> promise) override {
send_closure_later(impl_, &Impl::delete_message, full_message_id, std::move(promise));
@ -933,6 +984,11 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
index_mask, search_id, std::move(text), notification_id, std::move(data)));
});
}
void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) {
add_write_query([this, full_message_id, promise = std::move(promise), data = std::move(data)](Unit) mutable {
this->on_write_result(std::move(promise), sync_db_->add_scheduled_message(full_message_id, std::move(data)));
});
}
void delete_message(FullMessageId full_message_id, Promise<> promise) {
add_write_query([=, promise = std::move(promise)](Unit) mutable {

View File

@ -91,6 +91,7 @@ class MessagesDbSyncInterface {
virtual Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, BufferSlice data) = 0;
virtual Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) = 0;
virtual Status delete_message(FullMessageId full_message_id) = 0;
virtual Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
@ -137,6 +138,7 @@ class MessagesDbAsyncInterface {
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, BufferSlice data, Promise<> promise) = 0;
virtual void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) = 0;
virtual void delete_message(FullMessageId full_message_id, Promise<> promise) = 0;
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) = 0;

View File

@ -590,8 +590,7 @@ class GetCommonDialogsQuery : public Td::ResultHandler {
auto chats_ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for GetCommonDialogsQuery: " << to_string(chats_ptr);
int32 constructor_id = chats_ptr->get_id();
switch (constructor_id) {
switch (chats_ptr->get_id()) {
case telegram_api::messages_chats::ID: {
auto chats = move_tl_object_as<telegram_api::messages_chats>(chats_ptr);
td->messages_manager_->on_get_common_dialogs(user_id_, offset_chat_id_, std::move(chats->chats_),
@ -10646,17 +10645,17 @@ MessagesManager::MessageInfo MessagesManager::parse_telegram_api_message(
tl_object_ptr<telegram_api::Message> message_ptr, bool is_scheduled, const char *source) const {
LOG(DEBUG) << "Receive from " << source << " " << to_string(message_ptr);
LOG_CHECK(message_ptr != nullptr) << source;
int32 constructor_id = message_ptr->get_id();
MessageInfo message_info;
switch (constructor_id) {
message_info.message_id = get_message_id(message_ptr, is_scheduled);
switch (message_ptr->get_id()) {
case telegram_api::messageEmpty::ID:
message_info.message_id = MessageId();
break;
case telegram_api::message::ID: {
auto message = move_tl_object_as<telegram_api::message>(message_ptr);
message_info.dialog_id = DialogId(message->to_id_);
message_info.message_id = get_message_id(message_ptr, is_scheduled);
if (message->flags_ & MESSAGE_FLAG_HAS_FROM_ID) {
message_info.sender_user_id = UserId(message->from_id_);
}
@ -10703,7 +10702,6 @@ MessagesManager::MessageInfo MessagesManager::parse_telegram_api_message(
auto message = move_tl_object_as<telegram_api::messageService>(message_ptr);
message_info.dialog_id = DialogId(message->to_id_);
message_info.message_id = get_message_id(message_ptr, is_scheduled);
if (message->flags_ & MESSAGE_FLAG_HAS_FROM_ID) {
message_info.sender_user_id = UserId(message->from_id_);
}
@ -16284,8 +16282,8 @@ void MessagesManager::on_get_dialog_message_by_date_success(DialogId dialog_id,
continue;
}
if (message_date != 0 && message_date <= date) {
result = on_get_message(std::move(message), false, dialog_id.get_type() == DialogType::Channel, false, false, false,
"on_get_dialog_message_by_date_success");
result = on_get_message(std::move(message), false, dialog_id.get_type() == DialogType::Channel, false, false,
false, "on_get_dialog_message_by_date_success");
if (result != FullMessageId()) {
const Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
@ -24306,7 +24304,8 @@ tl_object_ptr<td_api::ChatEventAction> MessagesManager::get_chat_event_action_ob
case telegram_api::channelAdminLogEventActionUpdatePinned::ID: {
auto action = move_tl_object_as<telegram_api::channelAdminLogEventActionUpdatePinned>(action_ptr);
auto message = create_message(
parse_telegram_api_message(std::move(action->message_), false, "channelAdminLogEventActionUpdatePinned"), true);
parse_telegram_api_message(std::move(action->message_), false, "channelAdminLogEventActionUpdatePinned"),
true);
if (message.second == nullptr) {
return make_tl_object<td_api::chatEventMessageUnpinned>();
}
@ -24315,12 +24314,12 @@ tl_object_ptr<td_api::ChatEventAction> MessagesManager::get_chat_event_action_ob
}
case telegram_api::channelAdminLogEventActionEditMessage::ID: {
auto action = move_tl_object_as<telegram_api::channelAdminLogEventActionEditMessage>(action_ptr);
auto old_message = create_message(
parse_telegram_api_message(std::move(action->prev_message_), false, "prev channelAdminLogEventActionEditMessage"),
true);
auto new_message = create_message(
parse_telegram_api_message(std::move(action->new_message_), false, "new channelAdminLogEventActionEditMessage"),
true);
auto old_message = create_message(parse_telegram_api_message(std::move(action->prev_message_), false,
"prev channelAdminLogEventActionEditMessage"),
true);
auto new_message = create_message(parse_telegram_api_message(std::move(action->new_message_), false,
"new channelAdminLogEventActionEditMessage"),
true);
if (old_message.second == nullptr || new_message.second == nullptr || old_message.first != new_message.first) {
LOG(ERROR) << "Failed to get edited message";
return nullptr;
@ -24347,7 +24346,8 @@ tl_object_ptr<td_api::ChatEventAction> MessagesManager::get_chat_event_action_ob
case telegram_api::channelAdminLogEventActionDeleteMessage::ID: {
auto action = move_tl_object_as<telegram_api::channelAdminLogEventActionDeleteMessage>(action_ptr);
auto message = create_message(
parse_telegram_api_message(std::move(action->message_), false, "channelAdminLogEventActionDeleteMessage"), true);
parse_telegram_api_message(std::move(action->message_), false, "channelAdminLogEventActionDeleteMessage"),
true);
if (message.second == nullptr) {
LOG(ERROR) << "Failed to get deleted message";
return nullptr;
@ -25462,7 +25462,8 @@ void MessagesManager::add_message_to_database(const Dialog *d, const Message *m,
CHECK(m != nullptr);
MessageId message_id = m->message_id;
if (message_id.is_scheduled()) {
// TODO save scheduled message to database
G()->td_db()->get_messages_db_async()->add_scheduled_message({d->dialog_id, message_id}, log_event_store(*m),
Auto()); // TODO Promise
return;
}
LOG_CHECK(message_id.is_server() || message_id.is_local()) << source;
@ -25715,11 +25716,7 @@ void MessagesManager::do_delete_message_logevent(const DeleteMessageLogEvent &lo
// message may not exist in the dialog
LOG(INFO) << "Delete " << logevent.full_message_id_ << " from database";
if (logevent.full_message_id_.get_message_id().is_scheduled()) {
// TODO delete scheduled message from database
} else {
G()->td_db()->get_messages_db_async()->delete_message(logevent.full_message_id_, std::move(db_promise));
}
G()->td_db()->get_messages_db_async()->delete_message(logevent.full_message_id_, std::move(db_promise));
}
void MessagesManager::attach_message_to_previous(Dialog *d, MessageId message_id, const char *source) {

View File

@ -47,6 +47,7 @@ enum class DbVersion : int32 {
FixFileRemoteLocationKeyBug,
AddNotificationsSupport,
AddFolders,
AddScheduledMessages,
Next
};