diff --git a/README.md b/README.md index 9eaa67489..936be1089 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,8 @@ We added some options: * **receive_access_hashes** (true/**false**) Receive chats and users access hash as updates * **experiment_enable_file_reference_cleanup** (**true**/false) During cleanup, free the memory of the file references * **experiment_enable_chat_access_hash_cleanup** (**true**/false) During cleanup, clean chats and channels access hash -* **enable_pull_based_backpressure** (true/**false**) Enable manual `get_channel_difference` execution by calling `getChannelDifference(channel_difference_id)`. - Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput, or you want to implement a pull-based async library. +* **enable_reactive_pull_backpressure** (true/**false**) Enable manual `get_channel_difference` execution by calling `getChannelDifference(channel_difference_id)`. + Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput, or you want to implement a backpressure-aware pull-based library. ## Custom API functions ### TdApi.OptimizeMemory diff --git a/manual_checks.md b/manual_checks.md index 453540199..a144f9866 100644 --- a/manual_checks.md +++ b/manual_checks.md @@ -12,11 +12,11 @@ should become ```cpp - auto enable_pull_based_backpressure + auto enable_reactive_pull_backpressure = G()->shared_config() - .get_option_boolean("enable_pull_based_backpressure", false); + .get_option_boolean("enable_reactive_pull_backpressure", false); get_channel_difference_delayed(dialog_id, old_pts, true, - enable_pull_based_backpressure, + enable_reactive_pull_backpressure, "add_pending_channel_update pts mismatch"); ``` diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 70a44348e..ff8743012 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -6842,16 +6842,16 @@ void MessagesManager::on_update_channel_too_long(tl_object_ptr d->pts) { - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "on_update_channel_too_long 1"); } } else { if (force_apply) { - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, -1, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, -1, true, enable_reactive_pull_backpressure, "on_update_channel_too_long 2"); } else { td_->updates_manager_->schedule_get_difference("on_update_channel_too_long 3"); @@ -6927,9 +6927,9 @@ void MessagesManager::update_message_interaction_info(FullMessageId full_message LOG(INFO) << "Ignore message interaction info about unknown " << full_message_id; if (!message_id.is_scheduled() && message_id > d->last_new_message_id && dialog_id.get_type() == DialogType::Channel) { - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "update_message_interaction_info"); } return; @@ -7505,9 +7505,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p LOG(INFO) << "Found a gap in unknown " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts << ", pts_count = " << pts_count << " in update from " << source; add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise)); - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, pts, true, enable_pull_based_backpressure, "add_pending_channel_update 3"); + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, pts, true, enable_reactive_pull_backpressure, "add_pending_channel_update 3"); return; } @@ -7539,9 +7539,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p << ", update is from " << source << ": " << oneline(to_string(update)); last_channel_pts_jump_warning_time_ = now; } - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, old_pts, true, enable_pull_based_backpressure, "add_pending_channel_update old"); + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, old_pts, true, enable_reactive_pull_backpressure, "add_pending_channel_update old"); } if (update->get_id() == telegram_api::updateNewChannelMessage::ID) { @@ -7587,9 +7587,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p if (d->was_opened || td_->contacts_manager_->get_channel_status(channel_id).is_member() || is_dialog_sponsored(d)) { add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise)); - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, old_pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, old_pts, true, enable_reactive_pull_backpressure, "add_pending_channel_update pts mismatch"); } else { promise.set_value(Unit()); @@ -11407,9 +11407,9 @@ void MessagesManager::read_channel_message_content_from_updates(Dialog *d, Messa if (m != nullptr) { read_message_content(d, m, false, "read_channel_message_content_from_updates"); } else if (message_id > d->last_new_message_id) { - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(d->dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(d->dialog_id, d->pts, true, enable_reactive_pull_backpressure, "read_channel_message_content_from_updates"); } } @@ -14771,9 +14771,9 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vectorshared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "on_get_dialogs"); } } @@ -29166,9 +29166,9 @@ void MessagesManager::check_send_message_result(int64 random_id, DialogId dialog return; } if (dialog_id.get_type() == DialogType::Channel) { - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "check_send_message_result"); } else { td_->updates_manager_->schedule_get_difference("check_send_message_result"); @@ -32944,14 +32944,14 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq auto next_message = *it; if (next_message != nullptr) { if (next_message->message_id.is_server()) { - if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) { + if (G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false) == false) { LOG(ERROR) << "Attach " << message_id << " from " << source << " before " << next_message->message_id << " and after " << previous_message_id << " in " << dialog_id; dump_debug_message_op(d); } } } else { - if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) { + if (G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false) == false) { LOG(ERROR) << "Have_next is true, but there is no next message after " << previous_message_id << " from " << source << " in " << dialog_id; dump_debug_message_op(d); @@ -36032,9 +36032,9 @@ void MessagesManager::run_after_channel_difference(DialogId dialog_id, Promiseshared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "run_after_channel_difference"); } @@ -36053,9 +36053,9 @@ void MessagesManager::on_channel_get_difference_timeout(DialogId dialog_id) { LOG(ERROR) << "Unknown dialog " << dialog_id; return; } - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "on_channel_get_difference_timeout"); } @@ -36085,8 +36085,8 @@ class MessagesManager::GetChannelDifferenceLogEvent { }; void MessagesManager::get_channel_difference_delayed(DialogId dialog_id, int32 pts, - bool force, bool enable_pull_based_backpressure, const char *source) { - if (!enable_pull_based_backpressure) { + bool force, bool enable_reactive_pull_backpressure, const char *source) { + if (!enable_reactive_pull_backpressure) { // Execute get_channel_difference immediatly const char* source_prefix = "on_get_channel_difference, "; char* joined_source{ new char[strlen(source_prefix) + strlen(source) + 1] }; @@ -36678,9 +36678,9 @@ void MessagesManager::on_get_channel_difference( if (!is_final) { LOG_IF(ERROR, timeout > 0) << "Have timeout in nonfinal ChannelDifference in " << dialog_id; - auto enable_pull_based_backpressure - = G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false); - get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, "on_get_channel_difference"); + auto enable_reactive_pull_backpressure + = G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false); + get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "on_get_channel_difference"); return; } diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index ff9116f2d..d86a0d753 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -2814,7 +2814,7 @@ class MessagesManager final : public Actor { void on_channel_get_difference_timeout(DialogId dialog_id); - void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, bool enable_pull_based_backpressure, + void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, bool enable_reactive_pull_backpressure, const char *source); void get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source); diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 5c2682b0a..4e838fe28 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -7634,7 +7634,7 @@ void Td::on_request(uint64 id, td_api::setOption &request) { if (set_boolean_option("experiment_old_postponed_pts_updates_behavior")) { return; } - if (set_boolean_option("enable_pull_based_backpressure")) { + if (set_boolean_option("enable_reactive_pull_backpressure")) { return; } break;