From 18646d6d496ac706a9a4d9fee953b6852becfbc2 Mon Sep 17 00:00:00 2001 From: levlam Date: Mon, 15 May 2023 13:28:23 +0300 Subject: [PATCH] Load expired messages from database in small batches. --- td/telegram/MessageDb.cpp | 33 ++++++---------------- td/telegram/MessageDb.h | 5 ++-- td/telegram/MessagesManager.cpp | 50 ++++++++++++++------------------- td/telegram/MessagesManager.h | 9 ++++-- 4 files changed, 37 insertions(+), 60 deletions(-) diff --git a/td/telegram/MessageDb.cpp b/td/telegram/MessageDb.cpp index d7f2b410e..5baaf42c8 100644 --- a/td/telegram/MessageDb.cpp +++ b/td/telegram/MessageDb.cpp @@ -204,11 +204,9 @@ class MessageDbImpl final : public MessageDbSyncInterface { get_message_by_unique_message_id_stmt_, db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1")); - TRY_RESULT_ASSIGN(get_expiring_messages_stmt_, - db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE ttl_expires_at <= ?1")); - TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_, - db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM " - "messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T")); + TRY_RESULT_ASSIGN( + get_expiring_messages_stmt_, + db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE ttl_expires_at <= ?1 LIMIT ?2")); TRY_RESULT_ASSIGN(get_messages_stmt_.asc_stmt_, db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > " @@ -283,7 +281,6 @@ class MessageDbImpl final : public MessageDbSyncInterface { // LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok(); // LOG(ERROR) << get_expiring_messages_stmt_.explain().ok(); - // LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok(); // LOG(FATAL) << "EXPLAINED"; @@ -571,15 +568,14 @@ class MessageDbImpl final : public MessageDbSyncInterface { return Status::Error("Not found"); } - std::pair, int32> get_expiring_messages(int32 expires_till, int32 limit) final { + vector get_expiring_messages(int32 expires_till, int32 limit) final { SCOPE_EXIT { get_expiring_messages_stmt_.reset(); - get_expiring_messages_helper_stmt_.reset(); }; vector messages; - // load messages get_expiring_messages_stmt_.bind_int32(1, expires_till).ensure(); + get_expiring_messages_stmt_.bind_int32(2, limit).ensure(); get_expiring_messages_stmt_.step().ensure(); while (get_expiring_messages_stmt_.has_row()) { @@ -590,17 +586,7 @@ class MessageDbImpl final : public MessageDbSyncInterface { get_expiring_messages_stmt_.step().ensure(); } - // calc next expires_till - get_expiring_messages_helper_stmt_.bind_int32(1, expires_till).ensure(); - get_expiring_messages_helper_stmt_.bind_int32(2, limit).ensure(); - get_expiring_messages_helper_stmt_.step().ensure(); - CHECK(get_expiring_messages_helper_stmt_.has_row()); - int32 count = get_expiring_messages_helper_stmt_.view_int32(1); - int32 next_expires_till = -1; - if (count != 0) { - next_expires_till = get_expiring_messages_helper_stmt_.view_int32(0); - } - return std::make_pair(std::move(messages), next_expires_till); + return std::move(messages); } MessageDbCalendar get_dialog_message_calendar(MessageDbDialogCalendarQuery query) final { @@ -850,7 +836,6 @@ class MessageDbImpl final : public MessageDbSyncInterface { SqliteStatement get_message_by_random_id_stmt_; SqliteStatement get_message_by_unique_message_id_stmt_; SqliteStatement get_expiring_messages_stmt_; - SqliteStatement get_expiring_messages_helper_stmt_; struct GetMessagesStmt { SqliteStatement asc_stmt_; @@ -1066,8 +1051,7 @@ class MessageDbAsync final : public MessageDbAsyncInterface { void get_messages_fts(MessageDbFtsQuery query, Promise promise) final { send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise)); } - void get_expiring_messages(int32 expires_till, int32 limit, - Promise, int32>> promise) final { + void get_expiring_messages(int32 expires_till, int32 limit, Promise> promise) final { send_closure_later(impl_, &Impl::get_expiring_messages, expires_till, limit, std::move(promise)); } @@ -1178,8 +1162,7 @@ class MessageDbAsync final : public MessageDbAsyncInterface { add_read_query(); promise.set_value(sync_db_->get_messages_fts(std::move(query))); } - void get_expiring_messages(int32 expires_till, int32 limit, - Promise, int32>> promise) { + void get_expiring_messages(int32 expires_till, int32 limit, Promise> promise) { add_read_query(); promise.set_value(sync_db_->get_expiring_messages(expires_till, limit)); } diff --git a/td/telegram/MessageDb.h b/td/telegram/MessageDb.h index 5a71ec58c..8360f39d6 100644 --- a/td/telegram/MessageDb.h +++ b/td/telegram/MessageDb.h @@ -130,7 +130,7 @@ class MessageDbSyncInterface { NotificationId from_notification_id, int32 limit) = 0; - virtual std::pair, int32> get_expiring_messages(int32 expires_till, int32 limit) = 0; + virtual vector get_expiring_messages(int32 expires_till, int32 limit) = 0; virtual MessageDbCallsResult get_calls(MessageDbCallsQuery query) = 0; virtual MessageDbFtsResult get_messages_fts(MessageDbFtsQuery query) = 0; @@ -187,8 +187,7 @@ class MessageDbAsyncInterface { virtual void get_calls(MessageDbCallsQuery, Promise promise) = 0; virtual void get_messages_fts(MessageDbFtsQuery query, Promise promise) = 0; - virtual void get_expiring_messages(int32 expires_till, int32 limit, - Promise, int32>> promise) = 0; + virtual void get_expiring_messages(int32 expires_till, int32 limit, Promise> promise) = 0; virtual void close(Promise<> promise) = 0; virtual void force_flush() = 0; diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index c281d4937..c41a44b91 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -13797,7 +13797,7 @@ void MessagesManager::init() { G()->td_db()->get_binlog_pmc()->erase("dialog_pinned_current_order"); if (G()->use_message_database()) { - ttl_db_loop_start(); + ttl_db_loop(); } load_calls_db_state(); @@ -13838,26 +13838,11 @@ void MessagesManager::on_authorization_success() { create_folders(); } -void MessagesManager::ttl_db_loop_start() { - ttl_db_next_request_time_ = 0; - ttl_db_expires_till_ = G()->unix_time() + 15; - ttl_db_has_query_ = false; - - ttl_db_loop(); -} - void MessagesManager::ttl_db_loop() { - LOG(INFO) << "Begin ttl_db loop: " << tag("expires_till", ttl_db_expires_till_) - << tag("has_query", ttl_db_has_query_); if (ttl_db_has_query_) { return; } - if (ttl_db_expires_till_ < 0) { - LOG(INFO) << "Finish ttl_db loop"; - return; - } - auto now = Time::now(); if (now < ttl_db_next_request_time_) { ttl_db_slot_.set_event(EventCreator::yield(actor_shared(this, YieldType::TtlDb))); @@ -13868,17 +13853,15 @@ void MessagesManager::ttl_db_loop() { } ttl_db_has_query_ = true; - int32 limit = 50; - LOG(INFO) << "Send ttl_db query " << tag("expires_till", ttl_db_expires_till_) << tag("limit", limit); + LOG(INFO) << "Send ttl_db query with limit " << ttl_db_next_limit_; G()->td_db()->get_message_db_async()->get_expiring_messages( - ttl_db_expires_till_, limit, - PromiseCreator::lambda( - [actor_id = actor_id(this)](Result, int32>> result) { - send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false); - })); + G()->unix_time() - 1, ttl_db_next_limit_, + PromiseCreator::lambda([actor_id = actor_id(this)](Result> result) { + send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false); + })); } -void MessagesManager::ttl_db_on_result(Result, int32>> r_result, bool dummy) { +void MessagesManager::ttl_db_on_result(Result> r_result, bool dummy) { if (G()->close_flag()) { return; } @@ -13886,12 +13869,21 @@ void MessagesManager::ttl_db_on_result(Result, int32>> r_result, bool dummy); + + void ttl_db_on_result(Result> r_result, bool dummy); void on_restore_missing_message_after_get_difference(FullMessageId full_message_id, MessageId old_message_id, Result result); @@ -3263,7 +3266,7 @@ class MessagesManager final : public Actor { enum YieldType : int32 { None, TtlDb }; // None must be first double ttl_db_next_request_time_ = 0; - int32 ttl_db_expires_till_ = 0; + int32 ttl_db_next_limit_ = DEFAULT_LOADED_EXPIRED_MESSAGES; bool ttl_db_has_query_ = false; Slot ttl_db_slot_;