Delay before sending read history request to server.

GitOrigin-RevId: 5452973469e6142cefa00d063ceef36add728f14
This commit is contained in:
levlam 2018-05-03 15:36:05 +03:00
parent a82dfe4735
commit a37dfc406b
3 changed files with 97 additions and 15 deletions

View File

@ -701,7 +701,7 @@ void ConfigManager::process_config(tl_object_ptr<telegram_api::config> config) {
shared_config.set_option_integer("supergroup_size_max", config->megagroup_size_max_); shared_config.set_option_integer("supergroup_size_max", config->megagroup_size_max_);
shared_config.set_option_integer("pinned_chat_count_max", config->pinned_dialogs_count_max_); shared_config.set_option_integer("pinned_chat_count_max", config->pinned_dialogs_count_max_);
if (is_from_main_dc || !shared_config.have_option("expect_blocking")) { if (is_from_main_dc || !shared_config.have_option("expect_blocking")) {
shared_config.set_option_integer("expect_blocking", shared_config.set_option_boolean("expect_blocking",
(config->flags_ & telegram_api::config::BLOCKED_MODE_MASK) != 0); (config->flags_ & telegram_api::config::BLOCKED_MODE_MASK) != 0);
} }
if (is_from_main_dc || !shared_config.have_option("t_me_url")) { if (is_from_main_dc || !shared_config.have_option("t_me_url")) {

View File

@ -4686,6 +4686,9 @@ MessagesManager::MessagesManager(Td *td, ActorShared<> parent) : td_(td), parent
pending_draft_message_timeout_.set_callback(on_pending_draft_message_timeout_callback); pending_draft_message_timeout_.set_callback(on_pending_draft_message_timeout_callback);
pending_draft_message_timeout_.set_callback_data(static_cast<void *>(this)); pending_draft_message_timeout_.set_callback_data(static_cast<void *>(this));
pending_read_history_timeout_.set_callback(on_pending_read_history_timeout_callback);
pending_read_history_timeout_.set_callback_data(static_cast<void *>(this));
pending_updated_dialog_timeout_.set_callback(on_pending_updated_dialog_timeout_callback); pending_updated_dialog_timeout_.set_callback(on_pending_updated_dialog_timeout_callback);
pending_updated_dialog_timeout_.set_callback_data(static_cast<void *>(this)); pending_updated_dialog_timeout_.set_callback_data(static_cast<void *>(this));
@ -4799,6 +4802,16 @@ void MessagesManager::on_pending_draft_message_timeout_callback(void *messages_m
messages_manager->save_dialog_draft_message_on_server(dialog_id); messages_manager->save_dialog_draft_message_on_server(dialog_id);
} }
void MessagesManager::on_pending_read_history_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int) {
if (G()->close_flag()) {
return;
}
auto messages_manager = static_cast<MessagesManager *>(messages_manager_ptr);
DialogId dialog_id(dialog_id_int);
messages_manager->read_history_on_server_impl(dialog_id, MessageId());
}
void MessagesManager::on_pending_updated_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int) { void MessagesManager::on_pending_updated_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int) {
auto messages_manager = static_cast<MessagesManager *>(messages_manager_ptr); auto messages_manager = static_cast<MessagesManager *>(messages_manager_ptr);
// TODO it is unsafe to save dialog to database before binlog is flushed // TODO it is unsafe to save dialog to database before binlog is flushed
@ -12871,7 +12884,6 @@ Status MessagesManager::view_messages(DialogId dialog_id, const vector<MessageId
} }
bool need_read = force_read || d->is_opened; bool need_read = force_read || d->is_opened;
bool is_secret = dialog_id.get_type() == DialogType::SecretChat;
MessageId max_message_id; // max server or local viewed message_id MessageId max_message_id; // max server or local viewed message_id
vector<MessageId> read_content_message_ids; vector<MessageId> read_content_message_ids;
for (auto message_id : message_ids) { for (auto message_id : message_ids) {
@ -12906,19 +12918,19 @@ Status MessagesManager::view_messages(DialogId dialog_id, const vector<MessageId
if (need_read && max_message_id.get() > d->last_read_inbox_message_id.get()) { if (need_read && max_message_id.get() > d->last_read_inbox_message_id.get()) {
MessageId last_read_message_id = max_message_id; MessageId last_read_message_id = max_message_id;
MessageId prev_last_read_inbox_message_id = d->last_read_inbox_message_id;
read_history_inbox(d->dialog_id, last_read_message_id, -1, "view_messages");
if (dialog_id.get_type() != DialogType::SecretChat) { if (dialog_id.get_type() != DialogType::SecretChat) {
if (last_read_message_id.get_prev_server_message_id().get() > if (last_read_message_id.get_prev_server_message_id().get() >
d->last_read_inbox_message_id.get_prev_server_message_id().get()) { prev_last_read_inbox_message_id.get_prev_server_message_id().get()) {
read_history_on_server(d->dialog_id, last_read_message_id.get_prev_server_message_id(), 0); read_history_on_server(d, last_read_message_id.get_prev_server_message_id());
} }
} else { } else {
if (last_read_message_id.get() > d->last_read_inbox_message_id.get()) { if (last_read_message_id.get() > prev_last_read_inbox_message_id.get()) {
read_history_on_server(d->dialog_id, last_read_message_id, 0); read_history_on_server(d, last_read_message_id);
} }
} }
read_history_inbox(d->dialog_id, last_read_message_id, -1, "view_messages");
} }
return Status::OK(); return Status::OK();
@ -13067,10 +13079,15 @@ void MessagesManager::close_dialog(Dialog *d) {
if (pending_message_views_timeout_.has_timeout(d->dialog_id.get())) { if (pending_message_views_timeout_.has_timeout(d->dialog_id.get())) {
pending_message_views_timeout_.set_timeout_in(d->dialog_id.get(), 0.0); pending_message_views_timeout_.set_timeout_in(d->dialog_id.get(), 0.0);
} }
if (pending_read_history_timeout_.has_timeout(d->dialog_id.get())) {
pending_read_history_timeout_.set_timeout_in(d->dialog_id.get(), 0.0);
}
} else { } else {
pending_message_views_timeout_.cancel_timeout(d->dialog_id.get()); pending_message_views_timeout_.cancel_timeout(d->dialog_id.get());
d->pending_viewed_message_ids.clear(); d->pending_viewed_message_ids.clear();
d->increment_view_counter = false; d->increment_view_counter = false;
pending_read_history_timeout_.cancel_timeout(d->dialog_id.get());
} }
if (is_message_unload_enabled()) { if (is_message_unload_enabled()) {
@ -13640,22 +13657,59 @@ class MessagesManager::ReadHistoryOnServerLogEvent {
} }
}; };
void MessagesManager::read_history_on_server(DialogId dialog_id, MessageId max_message_id, uint64 logevent_id) { void MessagesManager::read_history_on_server(Dialog *d, MessageId max_message_id) {
if (td_->auth_manager_->is_bot()) { if (td_->auth_manager_->is_bot()) {
return; return;
} }
auto dialog_id = d->dialog_id;
LOG(INFO) << "Read history in " << dialog_id << " on server up to " << max_message_id; LOG(INFO) << "Read history in " << dialog_id << " on server up to " << max_message_id;
if (logevent_id == 0 && G()->parameters().use_message_db) { if (G()->parameters().use_message_db) {
ReadHistoryOnServerLogEvent logevent; ReadHistoryOnServerLogEvent logevent;
logevent.dialog_id_ = dialog_id; logevent.dialog_id_ = dialog_id;
logevent.max_message_id_ = max_message_id; logevent.max_message_id_ = max_message_id;
auto storer = LogEventStorerImpl<ReadHistoryOnServerLogEvent>(logevent); auto storer = LogEventStorerImpl<ReadHistoryOnServerLogEvent>(logevent);
logevent_id = BinlogHelper::add(G()->td_db()->get_binlog(), LogEvent::HandlerType::ReadHistoryOnServer, storer); if (d->read_history_logevent_id == 0) {
d->read_history_logevent_id = BinlogHelper::add(
G()->td_db()->get_binlog(), LogEvent::HandlerType::ReadHistoryOnServer, storer);
LOG(INFO) << "Add read history logevent " << d->read_history_logevent_id;
} else {
auto new_logevent_id = BinlogHelper::rewrite(G()->td_db()->get_binlog(), d->read_history_logevent_id,
LogEvent::HandlerType::ReadHistoryOnServer, storer);
LOG(INFO) << "Rewrite read history logevent " << d->read_history_logevent_id << " with " << new_logevent_id;
}
d->read_history_logevent_id_generation++;
} }
auto promise = get_erase_logevent_promise(logevent_id); pending_read_history_timeout_.set_timeout_in(dialog_id.get(), d->is_opened && dialog_id.get_type() != DialogType::SecretChat ? MIN_READ_HISTORY_DELAY : 0);
}
void MessagesManager::read_history_on_server_impl(DialogId dialog_id, MessageId max_message_id) {
Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
auto message_id = d->last_read_inbox_message_id;
if (dialog_id.get_type() != DialogType::SecretChat) {
message_id = message_id.get_prev_server_message_id();
}
if (message_id.get() > max_message_id.get()) {
max_message_id = message_id;
}
Promise<> promise;
if (d->read_history_logevent_id != 0) {
promise = PromiseCreator::lambda(
[actor_id = actor_id(this), dialog_id,
generation = d->read_history_logevent_id_generation](Result<Unit> result) mutable {
if (!G()->close_flag()) {
send_closure(actor_id, &MessagesManager::on_read_history_finished, dialog_id, generation);
}
});
}
LOG(INFO) << "Send read history request in " << dialog_id << " up to " << max_message_id;
switch (dialog_id.get_type()) { switch (dialog_id.get_type()) {
case DialogType::User: case DialogType::User:
case DialogType::Chat: case DialogType::Chat:
@ -13668,7 +13722,7 @@ void MessagesManager::read_history_on_server(DialogId dialog_id, MessageId max_m
} }
case DialogType::SecretChat: { case DialogType::SecretChat: {
auto secret_chat_id = dialog_id.get_secret_chat_id(); auto secret_chat_id = dialog_id.get_secret_chat_id();
auto *message = get_message_force(FullMessageId(dialog_id, max_message_id)); auto *message = get_message_force(d, max_message_id);
if (message != nullptr) { if (message != nullptr) {
send_closure(G()->secret_chats_manager(), &SecretChatsManager::send_read_history, secret_chat_id, message->date, send_closure(G()->secret_chats_manager(), &SecretChatsManager::send_read_history, secret_chat_id, message->date,
std::move(promise)); std::move(promise));
@ -13681,6 +13735,18 @@ void MessagesManager::read_history_on_server(DialogId dialog_id, MessageId max_m
} }
} }
void MessagesManager::on_read_history_finished(DialogId dialog_id, uint64 generation) {
auto d = get_dialog(dialog_id);
CHECK(d != nullptr);
LOG(INFO) << "Finished reading history in " << dialog_id << " with logevent " << d->read_history_logevent_id;
if (d->read_history_logevent_id_generation == generation) {
CHECK(d->read_history_logevent_id != 0);
LOG(INFO) << "Delete read history logevent " << d->read_history_logevent_id;
BinlogHelper::erase(G()->td_db()->get_binlog(), d->read_history_logevent_id);
d->read_history_logevent_id = 0;
}
}
std::pair<int32, vector<MessageId>> MessagesManager::search_dialog_messages( std::pair<int32, vector<MessageId>> MessagesManager::search_dialog_messages(
DialogId dialog_id, const string &query, UserId sender_user_id, MessageId from_message_id, int32 offset, DialogId dialog_id, const string &query, UserId sender_user_id, MessageId from_message_id, int32 offset,
int32 limit, const tl_object_ptr<td_api::SearchMessagesFilter> &filter, int64 &random_id, bool use_db, int32 limit, const tl_object_ptr<td_api::SearchMessagesFilter> &filter, int64 &random_id, bool use_db,
@ -25205,8 +25271,14 @@ void MessagesManager::on_binlog_events(vector<BinlogEvent> &&events) {
BinlogHelper::erase(G()->td_db()->get_binlog(), event.id_); BinlogHelper::erase(G()->td_db()->get_binlog(), event.id_);
break; break;
} }
if (d->read_history_logevent_id != 0) {
// we need only latest read history event
BinlogHelper::erase(G()->td_db()->get_binlog(), d->read_history_logevent_id);
}
d->read_history_logevent_id = event.id_;
d->read_history_logevent_id_generation++;
read_history_on_server(dialog_id, log_event.max_message_id_, event.id_); read_history_on_server_impl(dialog_id, log_event.max_message_id_);
break; break;
} }
case LogEvent::HandlerType::ReadMessageContentsOnServer: { case LogEvent::HandlerType::ReadMessageContentsOnServer: {

View File

@ -1581,6 +1581,8 @@ class MessagesManager : public Actor {
uint64 save_draft_message_logevent_id_generation = 0; uint64 save_draft_message_logevent_id_generation = 0;
uint64 save_notification_settings_logevent_id = 0; uint64 save_notification_settings_logevent_id = 0;
uint64 save_notification_settings_logevent_id_generation = 0; uint64 save_notification_settings_logevent_id_generation = 0;
uint64 read_history_logevent_id = 0;
uint64 read_history_logevent_id_generation = 0;
MessageId MessageId
last_read_all_mentions_message_id; // all mentions with a message id not greater than it are implicitly read last_read_all_mentions_message_id; // all mentions with a message id not greater than it are implicitly read
@ -1875,6 +1877,7 @@ class MessagesManager : public Actor {
static constexpr int32 MAX_MESSAGE_VIEW_DELAY = 1; // seconds static constexpr int32 MAX_MESSAGE_VIEW_DELAY = 1; // seconds
static constexpr int32 MIN_SAVE_DRAFT_DELAY = 1; // seconds static constexpr int32 MIN_SAVE_DRAFT_DELAY = 1; // seconds
static constexpr int32 MIN_READ_HISTORY_DELAY = 3; // seconds
static constexpr int32 MAX_SAVE_DIALOG_DELAY = 0; // seconds static constexpr int32 MAX_SAVE_DIALOG_DELAY = 0; // seconds
static constexpr int32 DIALOG_UNLOAD_DELAY = 60; // seconds static constexpr int32 DIALOG_UNLOAD_DELAY = 60; // seconds
@ -2094,7 +2097,11 @@ class MessagesManager : public Actor {
void read_history_outbox(DialogId dialog_id, MessageId max_message_id, int32 read_date = -1); void read_history_outbox(DialogId dialog_id, MessageId max_message_id, int32 read_date = -1);
void read_history_on_server(DialogId dialog_id, MessageId max_message_id, uint64 logevent_id); void read_history_on_server(Dialog *d, MessageId max_message_id);
void read_history_on_server_impl(DialogId dialog_id, MessageId max_message_id);
void on_read_history_finished(DialogId dialog_id, uint64 generation);
void read_secret_chat_outbox_inner(DialogId dialog_id, int32 up_to_date, int32 read_date); void read_secret_chat_outbox_inner(DialogId dialog_id, int32 up_to_date, int32 read_date);
@ -2556,6 +2563,8 @@ class MessagesManager : public Actor {
static void on_pending_draft_message_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int); static void on_pending_draft_message_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int);
static void on_pending_read_history_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int);
static void on_pending_updated_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int); static void on_pending_updated_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int);
static void on_pending_unload_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int); static void on_pending_unload_dialog_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int);
@ -2825,6 +2834,7 @@ class MessagesManager : public Actor {
MultiTimeout channel_get_difference_retry_timeout_; MultiTimeout channel_get_difference_retry_timeout_;
MultiTimeout pending_message_views_timeout_; MultiTimeout pending_message_views_timeout_;
MultiTimeout pending_draft_message_timeout_; MultiTimeout pending_draft_message_timeout_;
MultiTimeout pending_read_history_timeout_;
MultiTimeout pending_updated_dialog_timeout_; MultiTimeout pending_updated_dialog_timeout_;
MultiTimeout pending_unload_dialog_timeout_; MultiTimeout pending_unload_dialog_timeout_;
MultiTimeout dialog_unmute_timeout_; MultiTimeout dialog_unmute_timeout_;