Remove lower bound check for ttl_expires_at.
This commit is contained in:
parent
7d3fea5a70
commit
a14a07c33d
@ -204,10 +204,8 @@ class MessageDbImpl final : public MessageDbSyncInterface {
|
||||
get_message_by_unique_message_id_stmt_,
|
||||
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1"));
|
||||
|
||||
TRY_RESULT_ASSIGN(
|
||||
get_expiring_messages_stmt_,
|
||||
db_.get_statement(
|
||||
"SELECT dialog_id, message_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= ?2"));
|
||||
TRY_RESULT_ASSIGN(get_expiring_messages_stmt_,
|
||||
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE ttl_expires_at <= ?1"));
|
||||
TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_,
|
||||
db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM "
|
||||
"messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T"));
|
||||
@ -573,8 +571,7 @@ class MessageDbImpl final : public MessageDbSyncInterface {
|
||||
return Status::Error("Not found");
|
||||
}
|
||||
|
||||
std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
|
||||
int32 limit) final {
|
||||
std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_till, int32 limit) final {
|
||||
SCOPE_EXIT {
|
||||
get_expiring_messages_stmt_.reset();
|
||||
get_expiring_messages_helper_stmt_.reset();
|
||||
@ -582,9 +579,7 @@ class MessageDbImpl final : public MessageDbSyncInterface {
|
||||
|
||||
vector<MessageDbMessage> messages;
|
||||
// load messages
|
||||
if (expires_from <= expires_till) {
|
||||
get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure();
|
||||
get_expiring_messages_stmt_.bind_int32(2, expires_till).ensure();
|
||||
get_expiring_messages_stmt_.bind_int32(1, expires_till).ensure();
|
||||
get_expiring_messages_stmt_.step().ensure();
|
||||
|
||||
while (get_expiring_messages_stmt_.has_row()) {
|
||||
@ -594,7 +589,6 @@ class MessageDbImpl final : public MessageDbSyncInterface {
|
||||
messages.push_back(MessageDbMessage{dialog_id, message_id, std::move(data)});
|
||||
get_expiring_messages_stmt_.step().ensure();
|
||||
}
|
||||
}
|
||||
|
||||
// calc next expires_till
|
||||
get_expiring_messages_helper_stmt_.bind_int32(1, expires_till).ensure();
|
||||
@ -1072,9 +1066,9 @@ class MessageDbAsync final : public MessageDbAsyncInterface {
|
||||
void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) final {
|
||||
send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
|
||||
}
|
||||
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
|
||||
void get_expiring_messages(int32 expires_till, int32 limit,
|
||||
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) final {
|
||||
send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise));
|
||||
send_closure_later(impl_, &Impl::get_expiring_messages, expires_till, limit, std::move(promise));
|
||||
}
|
||||
|
||||
void close(Promise<> promise) final {
|
||||
@ -1184,10 +1178,10 @@ class MessageDbAsync final : public MessageDbAsyncInterface {
|
||||
add_read_query();
|
||||
promise.set_value(sync_db_->get_messages_fts(std::move(query)));
|
||||
}
|
||||
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
|
||||
void get_expiring_messages(int32 expires_till, int32 limit,
|
||||
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) {
|
||||
add_read_query();
|
||||
promise.set_value(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
|
||||
promise.set_value(sync_db_->get_expiring_messages(expires_till, limit));
|
||||
}
|
||||
|
||||
void close(Promise<> promise) {
|
||||
|
@ -130,8 +130,7 @@ class MessageDbSyncInterface {
|
||||
NotificationId from_notification_id,
|
||||
int32 limit) = 0;
|
||||
|
||||
virtual std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
|
||||
int32 limit) = 0;
|
||||
virtual std::pair<vector<MessageDbMessage>, int32> get_expiring_messages(int32 expires_till, int32 limit) = 0;
|
||||
virtual MessageDbCallsResult get_calls(MessageDbCallsQuery query) = 0;
|
||||
virtual MessageDbFtsResult get_messages_fts(MessageDbFtsQuery query) = 0;
|
||||
|
||||
@ -188,7 +187,7 @@ class MessageDbAsyncInterface {
|
||||
virtual void get_calls(MessageDbCallsQuery, Promise<MessageDbCallsResult> promise) = 0;
|
||||
virtual void get_messages_fts(MessageDbFtsQuery query, Promise<MessageDbFtsResult> promise) = 0;
|
||||
|
||||
virtual void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
|
||||
virtual void get_expiring_messages(int32 expires_till, int32 limit,
|
||||
Promise<std::pair<vector<MessageDbMessage>, int32>> promise) = 0;
|
||||
|
||||
virtual void close(Promise<> promise) = 0;
|
||||
|
@ -13506,7 +13506,7 @@ void MessagesManager::on_message_ttl_expired_impl(Dialog *d, Message *m) {
|
||||
void MessagesManager::loop() {
|
||||
auto token = get_link_token();
|
||||
if (token == YieldType::TtlDb) {
|
||||
ttl_db_loop(G()->server_time());
|
||||
ttl_db_loop();
|
||||
} else {
|
||||
ttl_loop(Time::now());
|
||||
}
|
||||
@ -13797,7 +13797,7 @@ void MessagesManager::init() {
|
||||
G()->td_db()->get_binlog_pmc()->erase("dialog_pinned_current_order");
|
||||
|
||||
if (G()->use_message_database()) {
|
||||
ttl_db_loop_start(G()->server_time());
|
||||
ttl_db_loop_start();
|
||||
}
|
||||
|
||||
load_calls_db_state();
|
||||
@ -13838,31 +13838,30 @@ void MessagesManager::on_authorization_success() {
|
||||
create_folders();
|
||||
}
|
||||
|
||||
void MessagesManager::ttl_db_loop_start(double server_now) {
|
||||
ttl_db_expires_from_ = 0;
|
||||
ttl_db_expires_till_ = static_cast<int32>(server_now) + 15 /* 15 seconds */;
|
||||
void MessagesManager::ttl_db_loop_start() {
|
||||
ttl_db_next_request_time_ = 0;
|
||||
ttl_db_expires_till_ = G()->unix_time() + 15;
|
||||
ttl_db_has_query_ = false;
|
||||
|
||||
ttl_db_loop(server_now);
|
||||
ttl_db_loop();
|
||||
}
|
||||
|
||||
void MessagesManager::ttl_db_loop(double server_now) {
|
||||
LOG(INFO) << "Begin ttl_db loop: " << tag("expires_from", ttl_db_expires_from_)
|
||||
<< tag("expires_till", ttl_db_expires_till_) << tag("has_query", ttl_db_has_query_);
|
||||
void MessagesManager::ttl_db_loop() {
|
||||
LOG(INFO) << "Begin ttl_db loop: " << tag("expires_till", ttl_db_expires_till_)
|
||||
<< tag("has_query", ttl_db_has_query_);
|
||||
if (ttl_db_has_query_) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = static_cast<int32>(server_now);
|
||||
|
||||
if (ttl_db_expires_till_ < 0) {
|
||||
LOG(INFO) << "Finish ttl_db loop";
|
||||
return;
|
||||
}
|
||||
|
||||
if (now < ttl_db_expires_from_) {
|
||||
auto now = Time::now();
|
||||
if (now < ttl_db_next_request_time_) {
|
||||
ttl_db_slot_.set_event(EventCreator::yield(actor_shared(this, YieldType::TtlDb)));
|
||||
auto wakeup_in = ttl_db_expires_from_ - server_now;
|
||||
auto wakeup_in = ttl_db_next_request_time_ - now;
|
||||
ttl_db_slot_.set_timeout_in(wakeup_in);
|
||||
LOG(INFO) << "Set ttl_db timeout in " << wakeup_in;
|
||||
return;
|
||||
@ -13870,10 +13869,9 @@ void MessagesManager::ttl_db_loop(double server_now) {
|
||||
|
||||
ttl_db_has_query_ = true;
|
||||
int32 limit = 50;
|
||||
LOG(INFO) << "Send ttl_db query " << tag("expires_from", ttl_db_expires_from_)
|
||||
<< tag("expires_till", ttl_db_expires_till_) << tag("limit", limit);
|
||||
LOG(INFO) << "Send ttl_db query " << tag("expires_till", ttl_db_expires_till_) << tag("limit", limit);
|
||||
G()->td_db()->get_message_db_async()->get_expiring_messages(
|
||||
ttl_db_expires_from_, ttl_db_expires_till_, limit,
|
||||
ttl_db_expires_till_, limit,
|
||||
PromiseCreator::lambda(
|
||||
[actor_id = actor_id(this)](Result<std::pair<std::vector<MessageDbMessage>, int32>> result) {
|
||||
send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false);
|
||||
@ -13888,7 +13886,7 @@ void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessageDbMes
|
||||
CHECK(r_result.is_ok());
|
||||
auto result = r_result.move_as_ok();
|
||||
ttl_db_has_query_ = false;
|
||||
ttl_db_expires_from_ = ttl_db_expires_till_;
|
||||
ttl_db_next_request_time_ = Time::now() + Random::fast(3000, 4200);
|
||||
ttl_db_expires_till_ = result.second;
|
||||
|
||||
LOG(INFO) << "Receive " << result.first.size()
|
||||
@ -13896,7 +13894,7 @@ void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessageDbMes
|
||||
for (auto &dialog_message : result.first) {
|
||||
on_get_message_from_database(dialog_message, false, "ttl_db_on_result");
|
||||
}
|
||||
ttl_db_loop(G()->server_time());
|
||||
ttl_db_loop();
|
||||
}
|
||||
|
||||
void MessagesManager::on_send_secret_message_error(int64 random_id, Status error, Promise<Unit> promise) {
|
||||
|
@ -2834,8 +2834,8 @@ class MessagesManager final : public Actor {
|
||||
void create_folders();
|
||||
void init();
|
||||
|
||||
void ttl_db_loop_start(double server_now);
|
||||
void ttl_db_loop(double server_now);
|
||||
void ttl_db_loop_start();
|
||||
void ttl_db_loop();
|
||||
void ttl_db_on_result(Result<std::pair<std::vector<MessageDbMessage>, int32>> r_result, bool dummy);
|
||||
|
||||
void on_restore_missing_message_after_get_difference(FullMessageId full_message_id, MessageId old_message_id,
|
||||
@ -3262,9 +3262,9 @@ class MessagesManager final : public Actor {
|
||||
Slot ttl_slot_;
|
||||
|
||||
enum YieldType : int32 { None, TtlDb }; // None must be first
|
||||
int32 ttl_db_expires_from_;
|
||||
int32 ttl_db_expires_till_;
|
||||
bool ttl_db_has_query_;
|
||||
double ttl_db_next_request_time_ = 0;
|
||||
int32 ttl_db_expires_till_ = 0;
|
||||
bool ttl_db_has_query_ = false;
|
||||
Slot ttl_db_slot_;
|
||||
|
||||
FlatHashMap<int64, FullMessageId> being_sent_messages_; // message_random_id -> message
|
||||
|
Loading…
Reference in New Issue
Block a user