Load expired messages from database in small batches.

This commit is contained in:
levlam 2023-05-15 13:28:23 +03:00
parent a14a07c33d
commit 18646d6d49
4 changed files with 37 additions and 60 deletions

View File

@ -204,11 +204,9 @@ class MessageDbImpl final : public MessageDbSyncInterface {
get_message_by_unique_message_id_stmt_,
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1"));
TRY_RESULT_ASSIGN(get_expiring_messages_stmt_,
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE ttl_expires_at <= ?1"));
TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_,
db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM "
"messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T"));
TRY_RESULT_ASSIGN(
get_expiring_messages_stmt_,
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE ttl_expires_at <= ?1 LIMIT ?2"));
TRY_RESULT_ASSIGN(get_messages_stmt_.asc_stmt_,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > "
@ -283,7 +281,6 @@ class MessageDbImpl final : public MessageDbSyncInterface {
// LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok();
// LOG(ERROR) << get_expiring_messages_stmt_.explain().ok();
// LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok();
// LOG(FATAL) << "EXPLAINED";
@ -571,15 +568,14 @@ class MessageDbImpl final : public MessageDbSyncInterface {
return Status::Error("Not found");
}
std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_till, int32 limit) final {
vector<MessageDbMessage> get_expiring_messages(int32 expires_till, int32 limit) final {
SCOPE_EXIT {
get_expiring_messages_stmt_.reset();
get_expiring_messages_helper_stmt_.reset();
};
vector<MessageDbMessage> messages;
// load messages
get_expiring_messages_stmt_.bind_int32(1, expires_till).ensure();
get_expiring_messages_stmt_.bind_int32(2, limit).ensure();
get_expiring_messages_stmt_.step().ensure();
while (get_expiring_messages_stmt_.has_row()) {
@ -590,17 +586,7 @@ class MessageDbImpl final : public MessageDbSyncInterface {
get_expiring_messages_stmt_.step().ensure();
}
// calc next expires_till
get_expiring_messages_helper_stmt_.bind_int32(1, expires_till).ensure();
get_expiring_messages_helper_stmt_.bind_int32(2, limit).ensure();
get_expiring_messages_helper_stmt_.step().ensure();
CHECK(get_expiring_messages_helper_stmt_.has_row());
int32 count = get_expiring_messages_helper_stmt_.view_int32(1);
int32 next_expires_till = -1;
if (count != 0) {
next_expires_till = get_expiring_messages_helper_stmt_.view_int32(0);
}
return std::make_pair(std::move(messages), next_expires_till);
return std::move(messages);
}
MessageDbCalendar get_dialog_message_calendar(MessageDbDialogCalendarQuery query) final {
@ -850,7 +836,6 @@ class MessageDbImpl final : public MessageDbSyncInterface {
SqliteStatement get_message_by_random_id_stmt_;
SqliteStatement get_message_by_unique_message_id_stmt_;
SqliteStatement get_expiring_messages_stmt_;
SqliteStatement get_expiring_messages_helper_stmt_;
struct GetMessagesStmt {
SqliteStatement asc_stmt_;
@ -1066,8 +1051,7 @@ class MessageDbAsync final : public MessageDbAsyncInterface {
void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) final {
send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
}
void get_expiring_messages(int32 expires_till, int32 limit,
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) final {
void get_expiring_messages(int32 expires_till, int32 limit, Promise<vector<MessageDbMessage>> promise) final {
send_closure_later(impl_, &Impl::get_expiring_messages, expires_till, limit, std::move(promise));
}
@ -1178,8 +1162,7 @@ class MessageDbAsync final : public MessageDbAsyncInterface {
add_read_query();
promise.set_value(sync_db_->get_messages_fts(std::move(query)));
}
void get_expiring_messages(int32 expires_till, int32 limit,
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) {
void get_expiring_messages(int32 expires_till, int32 limit, Promise<vector<MessageDbMessage>> promise) {
add_read_query();
promise.set_value(sync_db_->get_expiring_messages(expires_till, limit));
}

View File

@ -130,7 +130,7 @@ class MessageDbSyncInterface {
NotificationId from_notification_id,
int32 limit) = 0;
virtual std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_till, int32 limit) = 0;
virtual vector<MessageDbMessage> get_expiring_messages(int32 expires_till, int32 limit) = 0;
virtual MessageDbCallsResult get_calls(MessageDbCallsQuery query) = 0;
virtual MessageDbFtsResult get_messages_fts(MessageDbFtsQuery query) = 0;
@ -187,8 +187,7 @@ class MessageDbAsyncInterface {
virtual void get_calls(MessageDbCallsQuery, Promise<MessageDbCallsResult> promise) = 0;
virtual void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) = 0;
virtual void get_expiring_messages(int32 expires_till, int32 limit,
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) = 0;
virtual void get_expiring_messages(int32 expires_till, int32 limit, Promise<vector<MessageDbMessage>> promise) = 0;
virtual void close(Promise<> promise) = 0;
virtual void force_flush() = 0;

View File

@ -13797,7 +13797,7 @@ void MessagesManager::init() {
G()->td_db()->get_binlog_pmc()->erase("dialog_pinned_current_order");
if (G()->use_message_database()) {
ttl_db_loop_start();
ttl_db_loop();
}
load_calls_db_state();
@ -13838,26 +13838,11 @@ void MessagesManager::on_authorization_success() {
create_folders();
}
void MessagesManager::ttl_db_loop_start() {
ttl_db_next_request_time_ = 0;
ttl_db_expires_till_ = G()->unix_time() + 15;
ttl_db_has_query_ = false;
ttl_db_loop();
}
void MessagesManager::ttl_db_loop() {
LOG(INFO) << "Begin ttl_db loop: " << tag("expires_till", ttl_db_expires_till_)
<< tag("has_query", ttl_db_has_query_);
if (ttl_db_has_query_) {
return;
}
if (ttl_db_expires_till_ < 0) {
LOG(INFO) << "Finish ttl_db loop";
return;
}
auto now = Time::now();
if (now < ttl_db_next_request_time_) {
ttl_db_slot_.set_event(EventCreator::yield(actor_shared(this, YieldType::TtlDb)));
@ -13868,17 +13853,15 @@ void MessagesManager::ttl_db_loop() {
}
ttl_db_has_query_ = true;
int32 limit = 50;
LOG(INFO) << "Send ttl_db query " << tag("expires_till", ttl_db_expires_till_) << tag("limit", limit);
LOG(INFO) << "Send ttl_db query with limit " << ttl_db_next_limit_;
G()->td_db()->get_message_db_async()->get_expiring_messages(
ttl_db_expires_till_, limit,
PromiseCreator::lambda(
[actor_id = actor_id(this)](Result<std::pair<std::vector<MessageDbMessage>, int32>> result) {
send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false);
}));
G()->unix_time() - 1, ttl_db_next_limit_,
PromiseCreator::lambda([actor_id = actor_id(this)](Result<std::vector<MessageDbMessage>> result) {
send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false);
}));
}
void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessageDbMessage>, int32>> r_result, bool dummy) {
void MessagesManager::ttl_db_on_result(Result<std::vector<MessageDbMessage>> r_result, bool dummy) {
if (G()->close_flag()) {
return;
}
@ -13886,12 +13869,21 @@ void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessageDbMes
CHECK(r_result.is_ok());
auto result = r_result.move_as_ok();
ttl_db_has_query_ = false;
ttl_db_next_request_time_ = Time::now() + Random::fast(3000, 4200);
ttl_db_expires_till_ = result.second;
LOG(INFO) << "Receive " << result.first.size()
<< " expired messages from database with new expires_till = " << ttl_db_expires_till_;
for (auto &dialog_message : result.first) {
int32 next_request_delay;
if (result.size() == static_cast<size_t>(ttl_db_next_limit_)) {
CHECK(ttl_db_next_limit_ < (1 << 30));
ttl_db_next_limit_ *= 2;
next_request_delay = 1;
} else {
ttl_db_next_limit_ = DEFAULT_LOADED_EXPIRED_MESSAGES;
next_request_delay = Random::fast(3000, 4200);
}
ttl_db_next_request_time_ = Time::now() + next_request_delay;
LOG(INFO) << "Receive " << result.size() << " expired messages from ttl_db with next request in "
<< next_request_delay << " seconds";
for (auto &dialog_message : result) {
on_get_message_from_database(dialog_message, false, "ttl_db_on_result");
}
ttl_db_loop();

View File

@ -1695,6 +1695,8 @@ class MessagesManager final : public Actor {
static constexpr int32 MIN_READ_HISTORY_DELAY = 3; // seconds
static constexpr int32 MAX_SAVE_DIALOG_DELAY = 0; // seconds
static constexpr int32 DEFAULT_LOADED_EXPIRED_MESSAGES = 50;
static constexpr int32 LIVE_LOCATION_VIEW_PERIOD = 60; // seconds, server-side limit
static constexpr int32 UPDATE_VIEWED_MESSAGES_PERIOD = 15; // seconds
@ -2832,11 +2834,12 @@ class MessagesManager final : public Actor {
void hangup() final;
void create_folders();
void init();
void ttl_db_loop_start();
void ttl_db_loop();
void ttl_db_on_result(Result<std::pair<std::vector<MessageDbMessage>, int32>> r_result, bool dummy);
void ttl_db_on_result(Result<std::vector<MessageDbMessage>> r_result, bool dummy);
void on_restore_missing_message_after_get_difference(FullMessageId full_message_id, MessageId old_message_id,
Result<Unit> result);
@ -3263,7 +3266,7 @@ class MessagesManager final : public Actor {
enum YieldType : int32 { None, TtlDb }; // None must be first
double ttl_db_next_request_time_ = 0;
int32 ttl_db_expires_till_ = 0;
int32 ttl_db_next_limit_ = DEFAULT_LOADED_EXPIRED_MESSAGES;
bool ttl_db_has_query_ = false;
Slot ttl_db_slot_;