Support automatic deletion of messages by ttl_period.

This commit is contained in:
levlam 2021-02-08 23:41:17 +03:00
parent bf42d62f1e
commit 876932843a
2 changed files with 133 additions and 35 deletions

View File

@ -4837,6 +4837,7 @@ void MessagesManager::Message::store(StorerT &storer) const {
bool has_interaction_info_update_date = interaction_info_update_date != 0;
bool has_send_emoji = !send_emoji.empty();
bool is_imported = is_forwarded && forward_info->is_imported;
bool has_ttl_period = ttl_period != 0;
BEGIN_STORE_FLAGS();
STORE_FLAG(is_channel_post);
STORE_FLAG(is_outgoing);
@ -4897,6 +4898,7 @@ void MessagesManager::Message::store(StorerT &storer) const {
STORE_FLAG(has_interaction_info_update_date);
STORE_FLAG(has_send_emoji);
STORE_FLAG(is_imported);
STORE_FLAG(has_ttl_period); // 25
END_STORE_FLAGS();
}
@ -5009,6 +5011,9 @@ void MessagesManager::Message::store(StorerT &storer) const {
if (has_reply_markup) {
store(reply_markup, storer);
}
if (has_ttl_period) {
store(ttl_period, storer);
}
}
// do not forget to resolve message dependencies
@ -5049,6 +5054,7 @@ void MessagesManager::Message::parse(ParserT &parser) {
bool has_interaction_info_update_date = false;
bool has_send_emoji = false;
bool is_imported = false;
bool has_ttl_period = false;
BEGIN_PARSE_FLAGS();
PARSE_FLAG(is_channel_post);
PARSE_FLAG(is_outgoing);
@ -5109,6 +5115,7 @@ void MessagesManager::Message::parse(ParserT &parser) {
PARSE_FLAG(has_interaction_info_update_date);
PARSE_FLAG(has_send_emoji);
PARSE_FLAG(is_imported);
PARSE_FLAG(has_ttl_period);
END_PARSE_FLAGS();
}
@ -5228,6 +5235,9 @@ void MessagesManager::Message::parse(ParserT &parser) {
if (has_reply_markup) {
parse(reply_markup, parser);
}
if (has_ttl_period) {
parse(ttl_period, parser);
}
is_content_secret |=
is_secret_message_content(ttl, content->get_type()); // repair is_content_secret for old messages
@ -7423,7 +7433,7 @@ void MessagesManager::process_channel_update(tl_object_ptr<telegram_api::Update>
}
auto dialog_id = DialogId(channel_id);
delete_dialog_messages_from_updates(dialog_id, message_ids, false);
delete_dialog_messages(dialog_id, message_ids, true, false);
break;
}
case telegram_api::updateEditChannelMessage::ID: {
@ -9085,7 +9095,7 @@ void MessagesManager::after_get_difference() {
void MessagesManager::on_get_empty_messages(DialogId dialog_id, vector<MessageId> empty_message_ids) {
if (!empty_message_ids.empty()) {
delete_dialog_messages_from_updates(dialog_id, std::move(empty_message_ids), true);
delete_dialog_messages(dialog_id, std::move(empty_message_ids), true, true);
}
}
@ -9903,12 +9913,12 @@ void MessagesManager::delete_messages_from_updates(const vector<MessageId> &mess
}
}
void MessagesManager::delete_dialog_messages_from_updates(DialogId dialog_id, const vector<MessageId> &message_ids,
bool skip_update_for_not_found_messages) {
CHECK(dialog_id.get_type() == DialogType::Channel || dialog_id.get_type() == DialogType::SecretChat);
void MessagesManager::delete_dialog_messages(DialogId dialog_id, const vector<MessageId> &message_ids,
bool from_updates, bool skip_update_for_not_found_messages) {
Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr) {
LOG(INFO) << "Ignore deleteChannelMessages for unknown " << dialog_id;
CHECK(from_updates);
CHECK(dialog_id.get_type() == DialogType::Channel);
return;
}
@ -9916,12 +9926,16 @@ void MessagesManager::delete_dialog_messages_from_updates(DialogId dialog_id, co
vector<int64> deleted_message_ids;
bool need_update_dialog_pos = false;
for (auto message_id : message_ids) {
if (!message_id.is_valid() || (!message_id.is_server() && dialog_id.get_type() != DialogType::SecretChat)) {
LOG(ERROR) << "Incoming update tries to delete " << message_id;
continue;
if (from_updates) {
if (!message_id.is_valid() || (!message_id.is_server() && dialog_id.get_type() != DialogType::SecretChat)) {
LOG(ERROR) << "Incoming update tries to delete " << message_id;
continue;
}
} else {
CHECK(message_id.is_valid());
}
auto message = delete_message(d, message_id, true, &need_update_dialog_pos, "updates");
auto message = delete_message(d, message_id, true, &need_update_dialog_pos, "delete_dialog_messages");
if (message == nullptr) {
if (!skip_update_for_not_found_messages) {
deleted_message_ids.push_back(message_id.get());
@ -9931,7 +9945,7 @@ void MessagesManager::delete_dialog_messages_from_updates(DialogId dialog_id, co
}
}
if (need_update_dialog_pos) {
send_update_chat_last_message(d, "delete_dialog_messages_from_updates");
send_update_chat_last_message(d, "delete_dialog_messages");
}
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false);
}
@ -11904,12 +11918,11 @@ bool MessagesManager::ttl_on_open(Dialog *d, Message *m, double now, bool is_loc
}
void MessagesManager::ttl_register_message(DialogId dialog_id, const Message *m, double now) {
if (m->ttl_expires_at == 0) {
return;
}
CHECK(m != nullptr);
CHECK(m->ttl_expires_at != 0);
CHECK(!m->message_id.is_scheduled());
auto it_flag = ttl_nodes_.insert(TtlNode(dialog_id, m->message_id));
auto it_flag = ttl_nodes_.emplace(dialog_id, m->message_id, false);
CHECK(it_flag.second);
auto it = it_flag.first;
@ -11917,14 +11930,27 @@ void MessagesManager::ttl_register_message(DialogId dialog_id, const Message *m,
ttl_update_timeout(now);
}
void MessagesManager::ttl_unregister_message(DialogId dialog_id, const Message *m, double now, const char *source) {
void MessagesManager::ttl_period_register_message(DialogId dialog_id, const Message *m, int32 unix_time) {
CHECK(m != nullptr);
CHECK(m->ttl_period != 0);
CHECK(!m->message_id.is_scheduled());
auto it_flag = ttl_nodes_.emplace(dialog_id, m->message_id, true);
CHECK(it_flag.second);
auto it = it_flag.first;
auto now = Time::now();
ttl_heap_.insert(now + (m->date + m->ttl_period - unix_time), it->as_heap_node());
ttl_update_timeout(now);
}
void MessagesManager::ttl_unregister_message(DialogId dialog_id, const Message *m, const char *source) {
if (m->ttl_expires_at == 0) {
return;
}
CHECK(!m->message_id.is_scheduled());
TtlNode ttl_node(dialog_id, m->message_id);
auto it = ttl_nodes_.find(ttl_node);
auto it = ttl_nodes_.find(TtlNode(dialog_id, m->message_id, false));
// expect m->ttl == 0, but m->ttl_expires_at > 0 from binlog
LOG_CHECK(it != ttl_nodes_.end()) << dialog_id << " " << m->message_id << " " << source << " " << G()->close_flag()
@ -11936,15 +11962,34 @@ void MessagesManager::ttl_unregister_message(DialogId dialog_id, const Message *
ttl_heap_.erase(heap_node);
}
ttl_nodes_.erase(it);
ttl_update_timeout(now);
ttl_update_timeout(Time::now());
}
void MessagesManager::ttl_period_unregister_message(DialogId dialog_id, const Message *m) {
if (m->ttl_period == 0) {
return;
}
CHECK(!m->message_id.is_scheduled());
auto it = ttl_nodes_.find(TtlNode(dialog_id, m->message_id, true));
CHECK(it != ttl_nodes_.end());
auto *heap_node = it->as_heap_node();
if (heap_node->in_heap()) {
ttl_heap_.erase(heap_node);
}
ttl_nodes_.erase(it);
ttl_update_timeout(Time::now());
}
void MessagesManager::ttl_loop(double now) {
std::unordered_map<DialogId, std::vector<MessageId>, DialogIdHash> to_delete;
while (!ttl_heap_.empty() && ttl_heap_.top_key() < now) {
auto full_message_id = TtlNode::from_heap_node(ttl_heap_.pop())->full_message_id;
TtlNode *ttl_node = TtlNode::from_heap_node(ttl_heap_.pop());
auto full_message_id = ttl_node->full_message_id_;
auto dialog_id = full_message_id.get_dialog_id();
if (dialog_id.get_type() == DialogType::SecretChat) {
if (dialog_id.get_type() == DialogType::SecretChat || ttl_node->by_ttl_period_) {
to_delete[dialog_id].push_back(full_message_id.get_message_id());
} else {
auto d = get_dialog(dialog_id);
@ -11956,7 +12001,7 @@ void MessagesManager::ttl_loop(double now) {
}
}
for (auto &it : to_delete) {
delete_dialog_messages_from_updates(it.first, it.second, false);
delete_dialog_messages(it.first, it.second, false, true);
}
ttl_update_timeout(now);
}
@ -11977,7 +12022,7 @@ void MessagesManager::on_message_ttl_expired(Dialog *d, Message *m) {
CHECK(m != nullptr);
CHECK(m->ttl > 0);
CHECK(d->dialog_id.get_type() != DialogType::SecretChat);
ttl_unregister_message(d->dialog_id, m, Time::now(), "on_message_ttl_expired");
ttl_unregister_message(d->dialog_id, m, "on_message_ttl_expired");
unregister_message_content(td_, m->content.get(), {d->dialog_id, m->message_id}, "on_message_ttl_expired");
remove_message_file_sources(d->dialog_id, m);
on_message_ttl_expired_impl(d, m);
@ -12660,7 +12705,7 @@ void MessagesManager::finish_delete_secret_messages(DialogId dialog_id, std::vec
LOG(INFO) << "Skip deletion of service " << message_id;
}
}
delete_dialog_messages_from_updates(dialog_id, to_delete_message_ids, false);
delete_dialog_messages(dialog_id, to_delete_message_ids, true, false);
}
void MessagesManager::delete_secret_chat_history(SecretChatId secret_chat_id, bool remove_from_dialog_list,
@ -13033,6 +13078,9 @@ MessagesManager::MessageInfo MessagesManager::parse_telegram_api_message(
if (message->flags_ & MESSAGE_FLAG_HAS_MEDIA_ALBUM_ID) {
message_info.media_album_id = message->grouped_id_;
}
if (message->flags_ & MESSAGE_FLAG_HAS_TTL_PERIOD) {
message_info.ttl_period = message->ttl_period_;
}
message_info.flags = message->flags_;
bool is_content_read = (message->flags_ & MESSAGE_FLAG_HAS_UNREAD_CONTENT) == 0;
if (is_message_auto_read(message_info.dialog_id, (message->flags_ & MESSAGE_FLAG_IS_OUT) != 0)) {
@ -13223,9 +13271,15 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
hide_edit_date = false;
}
int32 ttl_period = message_info.ttl_period;
if (ttl_period < 0 || message_id.is_scheduled()) {
LOG(ERROR) << "Wrong TTL period = " << ttl_period << " received in " << message_id << " in " << dialog_id;
ttl_period = 0;
}
int32 ttl = message_info.ttl;
bool is_content_secret = is_secret_message_content(ttl, content_type); // should be calculated before TTL is adjusted
if (ttl < 0) {
if (ttl < 0 || message_id.is_scheduled()) {
LOG(ERROR) << "Wrong TTL = " << ttl << " received in " << message_id << " in " << dialog_id;
ttl = 0;
} else if (ttl > 0) {
@ -13260,6 +13314,7 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
message->sender_user_id = sender_user_id;
message->sender_dialog_id = sender_dialog_id;
message->date = date;
message->ttl_period = ttl_period;
message->ttl = ttl;
message->edit_date = edit_date;
message->random_id = message_info.random_id;
@ -15001,7 +15056,8 @@ void MessagesManager::on_message_deleted(Dialog *d, Message *m, bool is_permanen
default:
UNREACHABLE();
}
ttl_unregister_message(d->dialog_id, m, Time::now(), source);
ttl_unregister_message(d->dialog_id, m, source);
ttl_period_unregister_message(d->dialog_id, m);
unregister_message_content(td_, m->content.get(), {d->dialog_id, m->message_id}, "on_message_deleted");
if (m->notification_id.is_valid()) {
delete_notification_id_to_message_id_correspondence(d, m->notification_id, m->message_id);
@ -23339,10 +23395,10 @@ Result<InputMessageContent> MessagesManager::process_input_message_content(
TRY_RESULT(content, get_input_message_content(dialog_id, std::move(input_message_content), td_));
if (content.ttl < 0 || content.ttl > MAX_PRIVATE_MESSAGE_TTL) {
return Status::Error(10, "Invalid message TTL specified");
return Status::Error(10, "Invalid message content TTL specified");
}
if (content.ttl > 0 && dialog_id.get_type() != DialogType::User) {
return Status::Error(10, "Message TTL can be specified only in private chats");
return Status::Error(10, "Message content TTL can be specified only in private chats");
}
if (dialog_id != DialogId()) {
@ -28495,6 +28551,8 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI
CHECK(d->last_message_id != old_message_id);
}
sent_message->ttl_period = ttl_period;
// reply_to message may be already deleted
// but can't use get_message_force for check, because the message can be already unloaded from the memory
// if (get_message_force(d, sent_message->reply_to_message_id, "on_send_message_success 2") == nullptr) {
@ -31907,6 +31965,19 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
ttl_register_message(dialog_id, message.get(), now);
}
}
if (message->ttl_period > 0) {
CHECK(dialog_id.get_type() != DialogType::SecretChat);
auto unix_time = G()->unix_time();
if (message->date + message->ttl_period <= unix_time) {
LOG(INFO) << "Can't add " << message_id << " with expired TTL period to " << dialog_id << " from " << source;
delete_message_from_database(d, message_id, message.get(), true);
debug_add_message_to_dialog_fail_reason_ = "delete expired by TTL period message";
d->being_added_message_id = MessageId();
return nullptr;
} else {
ttl_period_register_message(dialog_id, message.get(), unix_time);
}
}
LOG(INFO) << "Adding not found " << message_id << " to " << dialog_id << " from " << source;
if (d->is_empty) {
@ -32297,6 +32368,12 @@ MessagesManager::Message *MessagesManager::add_scheduled_message_to_dialog(Dialo
debug_add_message_to_dialog_fail_reason_ = "skip adding secret scheduled message";
return nullptr;
}
if (message->ttl_period != 0) {
LOG(ERROR) << "Tried to add " << message_id << " with TTL period " << message->ttl_period << " to " << dialog_id
<< " from " << source;
debug_add_message_to_dialog_fail_reason_ = "skip adding auto-deleting scheduled message";
return nullptr;
}
if (td_->auth_manager_->is_bot()) {
LOG(ERROR) << "Bot tried to add " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "skip adding scheduled message by bot";
@ -32468,7 +32545,10 @@ void MessagesManager::add_message_to_database(const Dialog *d, const Message *m,
int32 ttl_expires_at = 0;
if (m->ttl_expires_at != 0) {
ttl_expires_at = static_cast<int32>(m->ttl_expires_at - Time::now() + G()->server_time());
ttl_expires_at = static_cast<int32>(m->ttl_expires_at - Time::now() + G()->server_time()) + 1;
}
if (m->ttl_period != 0 && (ttl_expires_at == 0 || m->date + m->ttl_period < ttl_expires_at)) {
ttl_expires_at = m->date + m->ttl_period;
}
G()->td_db()->get_messages_db_async()->add_message({d->dialog_id, message_id}, unique_message_id, m->sender_user_id,
random_id, ttl_expires_at, get_message_index_mask(d->dialog_id, m),
@ -32921,6 +33001,17 @@ bool MessagesManager::update_message(Dialog *d, Message *old_message, unique_ptr
old_message->is_mention_notification_disabled = true;
}
if (old_message->ttl_period != new_message->ttl_period) {
if (old_message->ttl_period != 0 || !message_id.is_yet_unsent()) {
LOG(ERROR) << message_id << " in " << dialog_id << " has changed TTL period from " << old_message->ttl_period
<< " to " << new_message->ttl_period;
} else {
LOG(DEBUG) << "Change message TTL period";
old_message->ttl_period = new_message->ttl_period;
need_send_update = true;
}
}
if (old_message->reply_to_message_id != new_message->reply_to_message_id) {
// Can't check "&& get_message_force(d, old_message->reply_to_message_id, "update_message") == nullptr", because it
// can change message tree and invalidate reference to old_message

View File

@ -117,6 +117,7 @@ class MessagesManager : public Actor {
static constexpr int32 MESSAGE_FLAG_IS_RESTRICTED = 1 << 22;
static constexpr int32 MESSAGE_FLAG_HAS_REPLY_INFO = 1 << 23;
static constexpr int32 MESSAGE_FLAG_IS_PINNED = 1 << 24;
static constexpr int32 MESSAGE_FLAG_HAS_TTL_PERIOD = 1 << 25;
static constexpr int32 SEND_MESSAGE_FLAG_IS_REPLY = 1 << 0;
static constexpr int32 SEND_MESSAGE_FLAG_DISABLE_WEB_PAGE_PREVIEW = 1 << 1;
@ -924,6 +925,7 @@ class MessagesManager : public Actor {
UserId sender_user_id;
DialogId sender_dialog_id;
int32 date = 0;
int32 ttl_period = 0;
int32 ttl = 0;
int64 random_id = 0;
tl_object_ptr<telegram_api::messageFwdHeader> forward_header;
@ -1068,6 +1070,7 @@ class MessagesManager : public Actor {
string send_error_message;
double try_resend_at = 0;
int32 ttl_period = 0;
int32 ttl = 0;
double ttl_expires_at = 0;
@ -1785,8 +1788,8 @@ class MessagesManager : public Actor {
void delete_messages_from_updates(const vector<MessageId> &message_ids);
void delete_dialog_messages_from_updates(DialogId dialog_id, const vector<MessageId> &message_ids,
bool skip_update_for_not_found_messages);
void delete_dialog_messages(DialogId dialog_id, const vector<MessageId> &message_ids, bool from_updates,
bool skip_update_for_not_found_messages);
void update_dialog_pinned_messages_from_updates(DialogId dialog_id, const vector<MessageId> &message_ids,
bool is_pin);
@ -2589,7 +2592,9 @@ class MessagesManager : public Actor {
void ttl_on_view(const Dialog *d, Message *m, double view_date, double now);
bool ttl_on_open(Dialog *d, Message *m, double now, bool is_local_read);
void ttl_register_message(DialogId dialog_id, const Message *m, double now);
void ttl_unregister_message(DialogId dialog_id, const Message *m, double now, const char *source);
void ttl_unregister_message(DialogId dialog_id, const Message *m, const char *source);
void ttl_period_register_message(DialogId dialog_id, const Message *m, int32 unix_time);
void ttl_period_unregister_message(DialogId dialog_id, const Message *m);
void ttl_loop(double now);
void ttl_update_timeout(double now);
@ -2989,10 +2994,12 @@ class MessagesManager : public Actor {
// TTL
class TtlNode : private HeapNode {
public:
TtlNode(DialogId dialog_id, MessageId message_id) : full_message_id(dialog_id, message_id) {
TtlNode(DialogId dialog_id, MessageId message_id, bool by_ttl_period)
: full_message_id_(dialog_id, message_id), by_ttl_period_(by_ttl_period) {
}
FullMessageId full_message_id;
FullMessageId full_message_id_;
bool by_ttl_period_;
HeapNode *as_heap_node() const {
return const_cast<HeapNode *>(static_cast<const HeapNode *>(this));
@ -3002,12 +3009,12 @@ class MessagesManager : public Actor {
}
bool operator==(const TtlNode &other) const {
return full_message_id == other.full_message_id;
return full_message_id_ == other.full_message_id_;
}
};
struct TtlNodeHash {
std::size_t operator()(const TtlNode &ttl_node) const {
return FullMessageIdHash()(ttl_node.full_message_id);
return FullMessageIdHash()(ttl_node.full_message_id_) * 2 + static_cast<size_t>(ttl_node.by_ttl_period_);
}
};
std::unordered_set<TtlNode, TtlNodeHash> ttl_nodes_;