Support deletion from database messages by sender_dialog_id.

This commit is contained in:
levlam 2021-11-19 16:00:21 +03:00
parent 5232774d89
commit 6941a4500a
7 changed files with 43 additions and 41 deletions

View File

@ -54,13 +54,13 @@ class MessagesDbBench final : public td::Benchmark {
for (int j = 0; j < 20; j++) {
auto message_id = td::MessageId{td::ServerMessageId{message_id_raw + j}};
auto unique_message_id = td::ServerMessageId{i + 1};
auto sender_user_id = td::UserId(static_cast<td::int64>(td::Random::fast(1, 1000)));
auto sender_dialog_id = td::DialogId(td::UserId(static_cast<td::int64>(td::Random::fast(1, 1000))));
auto random_id = i + 1;
auto ttl_expires_at = 0;
auto data = td::BufferSlice(td::Random::fast(100, 299));
// use async on same thread.
messages_db_async_->add_message({dialog_id, message_id}, unique_message_id, sender_user_id, random_id,
messages_db_async_->add_message({dialog_id, message_id}, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, 0, 0, "", td::NotificationId(), td::MessageId(),
std::move(data), td::Promise<>());
}

View File

@ -191,7 +191,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
TRY_RESULT_ASSIGN(delete_all_dialog_messages_stmt_,
db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id <= ?2"));
TRY_RESULT_ASSIGN(delete_dialog_messages_from_user_stmt_,
TRY_RESULT_ASSIGN(delete_dialog_messages_by_sender_stmt_,
db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND sender_user_id == ?2"));
TRY_RESULT_ASSIGN(
@ -292,7 +292,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::OK();
}
Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) final {
LOG(INFO) << "Add " << full_message_id << " to database";
@ -312,8 +312,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
add_message_stmt_.bind_null(3).ensure();
}
if (sender_user_id.is_valid()) {
add_message_stmt_.bind_int64(4, sender_user_id.get()).ensure();
if (sender_dialog_id.is_valid()) {
add_message_stmt_.bind_int64(4, sender_dialog_id.get()).ensure();
} else {
add_message_stmt_.bind_null(4).ensure();
}
@ -438,16 +438,16 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return status;
}
Status delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id) final {
LOG(INFO) << "Delete all messages in " << dialog_id << " sent by " << sender_user_id << " from database";
Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) final {
LOG(INFO) << "Delete all messages in " << dialog_id << " sent by " << sender_dialog_id << " from database";
CHECK(dialog_id.is_valid());
CHECK(sender_user_id.is_valid());
CHECK(sender_dialog_id.is_valid());
SCOPE_EXIT {
delete_dialog_messages_from_user_stmt_.reset();
delete_dialog_messages_by_sender_stmt_.reset();
};
delete_dialog_messages_from_user_stmt_.bind_int64(1, dialog_id.get()).ensure();
delete_dialog_messages_from_user_stmt_.bind_int64(2, sender_user_id.get()).ensure();
delete_dialog_messages_from_user_stmt_.step().ensure();
delete_dialog_messages_by_sender_stmt_.bind_int64(1, dialog_id.get()).ensure();
delete_dialog_messages_by_sender_stmt_.bind_int64(2, sender_dialog_id.get()).ensure();
delete_dialog_messages_by_sender_stmt_.step().ensure();
return Status::OK();
}
@ -857,7 +857,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
SqliteStatement delete_message_stmt_;
SqliteStatement delete_all_dialog_messages_stmt_;
SqliteStatement delete_dialog_messages_from_user_stmt_;
SqliteStatement delete_dialog_messages_by_sender_stmt_;
SqliteStatement get_message_stmt_;
SqliteStatement get_message_by_random_id_stmt_;
@ -1017,11 +1017,11 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
impl_ = create_actor_on_scheduler<Impl>("MessagesDbActor", scheduler_id, std::move(sync_db));
}
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
Promise<> promise) final {
send_closure_later(impl_, &Impl::add_message, full_message_id, unique_message_id, sender_user_id, random_id,
send_closure_later(impl_, &Impl::add_message, full_message_id, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, index_mask, search_id, std::move(text), notification_id, top_thread_message_id,
std::move(data), std::move(promise));
}
@ -1035,8 +1035,8 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) final {
send_closure_later(impl_, &Impl::delete_all_dialog_messages, dialog_id, from_message_id, std::move(promise));
}
void delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id, Promise<> promise) final {
send_closure_later(impl_, &Impl::delete_dialog_messages_from_user, dialog_id, sender_user_id, std::move(promise));
void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) final {
send_closure_later(impl_, &Impl::delete_dialog_messages_by_sender, dialog_id, sender_dialog_id, std::move(promise));
}
void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) final {
@ -1098,15 +1098,15 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
public:
explicit Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
}
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
Promise<> promise) {
add_write_query([this, full_message_id, unique_message_id, sender_user_id, random_id, ttl_expires_at, index_mask,
search_id, text = std::move(text), notification_id, top_thread_message_id,
add_write_query([this, full_message_id, unique_message_id, sender_dialog_id, random_id, ttl_expires_at,
index_mask, search_id, text = std::move(text), notification_id, top_thread_message_id,
data = std::move(data), promise = std::move(promise)](Unit) mutable {
on_write_result(std::move(promise),
sync_db_->add_message(full_message_id, unique_message_id, sender_user_id, random_id,
sync_db_->add_message(full_message_id, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, index_mask, search_id, std::move(text), notification_id,
top_thread_message_id, std::move(data)));
});
@ -1131,9 +1131,9 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
add_read_query();
promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id));
}
void delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id, Promise<> promise) {
void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) {
add_read_query();
promise.set_result(sync_db_->delete_dialog_messages_from_user(dialog_id, sender_user_id));
promise.set_result(sync_db_->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id));
}
void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {

View File

@ -105,14 +105,15 @@ class MessagesDbSyncInterface {
MessagesDbSyncInterface &operator=(const MessagesDbSyncInterface &) = delete;
virtual ~MessagesDbSyncInterface() = default;
virtual Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) = 0;
virtual Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id,
DialogId sender_dialog_id, int64 random_id, int32 ttl_expires_at, int32 index_mask,
int64 search_id, string text, NotificationId notification_id,
MessageId top_thread_message_id, BufferSlice data) = 0;
virtual Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) = 0;
virtual Status delete_message(FullMessageId full_message_id) = 0;
virtual Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
virtual Status delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id) = 0;
virtual Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) = 0;
virtual Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) = 0;
virtual Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) = 0;
@ -158,7 +159,7 @@ class MessagesDbAsyncInterface {
MessagesDbAsyncInterface &operator=(const MessagesDbAsyncInterface &) = delete;
virtual ~MessagesDbAsyncInterface() = default;
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, UserId sender_user_id,
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
Promise<> promise) = 0;
@ -166,7 +167,7 @@ class MessagesDbAsyncInterface {
virtual void delete_message(FullMessageId full_message_id, Promise<> promise) = 0;
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) = 0;
virtual void delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id, Promise<> promise) = 0;
virtual void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) = 0;
virtual void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) = 0;
virtual void get_message_by_unique_message_id(ServerMessageId unique_message_id,

View File

@ -10810,9 +10810,9 @@ void MessagesManager::delete_dialog_messages_by_sender(DialogId dialog_id, Dialo
return promise.set_value(Unit());
}
if (G()->parameters().use_message_db && sender_dialog_id.get_type() == DialogType::User) {
if (G()->parameters().use_message_db) {
LOG(INFO) << "Delete all messages from " << sender_dialog_id << " in " << dialog_id << " from database";
G()->td_db()->get_messages_db_async()->delete_dialog_messages_from_user(dialog_id, sender_dialog_id.get_user_id(),
G()->td_db()->get_messages_db_async()->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id,
Auto()); // TODO Promise
}
@ -10843,7 +10843,7 @@ void MessagesManager::delete_dialog_messages_by_sender(DialogId dialog_id, Dialo
delete_all_channel_messages_by_sender_on_server(channel_id, sender_dialog_id, 0, std::move(promise));
}
class MessagesManager::DeleteAllChannelMessagesFromUserOnServerLogEvent {
class MessagesManager::DeleteAllChannelMessagesFromSenderOnServerLogEvent {
public:
ChannelId channel_id_;
DialogId sender_dialog_id_;
@ -10869,8 +10869,8 @@ class MessagesManager::DeleteAllChannelMessagesFromUserOnServerLogEvent {
uint64 MessagesManager::save_delete_all_channel_messages_by_sender_on_server_log_event(ChannelId channel_id,
DialogId sender_dialog_id) {
DeleteAllChannelMessagesFromUserOnServerLogEvent log_event{channel_id, sender_dialog_id};
return binlog_add(G()->td_db()->get_binlog(), LogEvent::HandlerType::DeleteAllChannelMessagesFromUserOnServer,
DeleteAllChannelMessagesFromSenderOnServerLogEvent log_event{channel_id, sender_dialog_id};
return binlog_add(G()->td_db()->get_binlog(), LogEvent::HandlerType::DeleteAllChannelMessagesFromSenderOnServer,
get_log_event_storer(log_event));
}
@ -33447,7 +33447,8 @@ void MessagesManager::add_message_to_database(const Dialog *d, const Message *m,
if (m->ttl_period != 0 && (ttl_expires_at == 0 || m->date + m->ttl_period < ttl_expires_at)) {
ttl_expires_at = m->date + m->ttl_period;
}
G()->td_db()->get_messages_db_async()->add_message({d->dialog_id, message_id}, unique_message_id, m->sender_user_id,
auto sender_dialog_id = m->sender_dialog_id.is_valid() ? m->sender_dialog_id : DialogId(m->sender_user_id);
G()->td_db()->get_messages_db_async()->add_message({d->dialog_id, message_id}, unique_message_id, sender_dialog_id,
random_id, ttl_expires_at, get_message_index_mask(d->dialog_id, m),
search_id, text, m->notification_id, m->top_thread_message_id,
log_event_store(*m),
@ -37389,13 +37390,13 @@ void MessagesManager::on_binlog_events(vector<BinlogEvent> &&events) {
Auto());
break;
}
case LogEvent::HandlerType::DeleteAllChannelMessagesFromUserOnServer: {
case LogEvent::HandlerType::DeleteAllChannelMessagesFromSenderOnServer: {
if (!G()->parameters().use_chat_info_db) {
binlog_erase(G()->td_db()->get_binlog(), event.id_);
break;
}
DeleteAllChannelMessagesFromUserOnServerLogEvent log_event;
DeleteAllChannelMessagesFromSenderOnServerLogEvent log_event;
log_event_parse(log_event, event.data_).ensure();
auto channel_id = log_event.channel_id_;

View File

@ -1646,7 +1646,7 @@ class MessagesManager final : public Actor {
class BlockMessageSenderFromRepliesOnServerLogEvent;
class DeleteAllCallMessagesOnServerLogEvent;
class DeleteAllChannelMessagesFromUserOnServerLogEvent;
class DeleteAllChannelMessagesFromSenderOnServerLogEvent;
class DeleteDialogHistoryOnServerLogEvent;
class DeleteDialogMessagesByDateOnServerLogEvent;
class DeleteMessageLogEvent;

View File

@ -100,7 +100,7 @@ Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_p
case LogEvent::HandlerType::SendInlineQueryResultMessage:
case LogEvent::HandlerType::DeleteDialogHistoryOnServer:
case LogEvent::HandlerType::ReadAllDialogMentionsOnServer:
case LogEvent::HandlerType::DeleteAllChannelMessagesFromUserOnServer:
case LogEvent::HandlerType::DeleteAllChannelMessagesFromSenderOnServer:
case LogEvent::HandlerType::ToggleDialogIsPinnedOnServer:
case LogEvent::HandlerType::ReorderPinnedDialogsOnServer:
case LogEvent::HandlerType::SaveDialogDraftMessageOnServer:

View File

@ -81,7 +81,7 @@ class LogEvent {
SendInlineQueryResultMessage = 0x108,
DeleteDialogHistoryOnServer = 0x109,
ReadAllDialogMentionsOnServer = 0x10a,
DeleteAllChannelMessagesFromUserOnServer = 0x10b,
DeleteAllChannelMessagesFromSenderOnServer = 0x10b,
ToggleDialogIsPinnedOnServer = 0x10c,
ReorderPinnedDialogsOnServer = 0x10d,
SaveDialogDraftMessageOnServer = 0x10e,