Rename to enable_reactive_pull_backpressure

This commit is contained in:
Andrea Cavalli 2021-08-23 21:05:40 +02:00
parent 49bca76869
commit b7ef62be23
5 changed files with 47 additions and 47 deletions

View File

@ -31,8 +31,8 @@ We added some options:
* **receive_access_hashes** (true/**false**) Receive chats and users access hash as updates
* **experiment_enable_file_reference_cleanup** (**true**/false) During cleanup, free the memory of the file references
* **experiment_enable_chat_access_hash_cleanup** (**true**/false) During cleanup, clean chats and channels access hash
* **enable_pull_based_backpressure** (true/**false**) Enable manual `get_channel_difference` execution by calling `getChannelDifference(channel_difference_id)`.
Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput, or you want to implement a pull-based async library.
* **enable_reactive_pull_backpressure** (true/**false**) Enable manual `get_channel_difference` execution by calling `getChannelDifference(channel_difference_id)`.
Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput, or you want to implement a backpressure-aware pull-based library.
## Custom API functions
### TdApi.OptimizeMemory

View File

@ -12,11 +12,11 @@
should become
```cpp
auto enable_pull_based_backpressure
auto enable_reactive_pull_backpressure
= G()->shared_config()
.get_option_boolean("enable_pull_based_backpressure", false);
.get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, old_pts, true,
enable_pull_based_backpressure,
enable_reactive_pull_backpressure,
"add_pending_channel_update pts mismatch");
```

View File

@ -6842,16 +6842,16 @@ void MessagesManager::on_update_channel_too_long(tl_object_ptr<telegram_api::upd
if (d != nullptr) {
if (update_pts == 0 || update_pts > d->pts) {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"on_update_channel_too_long 1");
}
} else {
if (force_apply) {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, -1, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, -1, true, enable_reactive_pull_backpressure,
"on_update_channel_too_long 2");
} else {
td_->updates_manager_->schedule_get_difference("on_update_channel_too_long 3");
@ -6927,9 +6927,9 @@ void MessagesManager::update_message_interaction_info(FullMessageId full_message
LOG(INFO) << "Ignore message interaction info about unknown " << full_message_id;
if (!message_id.is_scheduled() && message_id > d->last_new_message_id &&
dialog_id.get_type() == DialogType::Channel) {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"update_message_interaction_info");
}
return;
@ -7505,9 +7505,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
LOG(INFO) << "Found a gap in unknown " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts
<< ", pts_count = " << pts_count << " in update from " << source;
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, pts, true, enable_pull_based_backpressure, "add_pending_channel_update 3");
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, pts, true, enable_reactive_pull_backpressure, "add_pending_channel_update 3");
return;
}
@ -7539,9 +7539,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
<< ", update is from " << source << ": " << oneline(to_string(update));
last_channel_pts_jump_warning_time_ = now;
}
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, old_pts, true, enable_pull_based_backpressure, "add_pending_channel_update old");
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, old_pts, true, enable_reactive_pull_backpressure, "add_pending_channel_update old");
}
if (update->get_id() == telegram_api::updateNewChannelMessage::ID) {
@ -7587,9 +7587,9 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (d->was_opened || td_->contacts_manager_->get_channel_status(channel_id).is_member() ||
is_dialog_sponsored(d)) {
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, old_pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, old_pts, true, enable_reactive_pull_backpressure,
"add_pending_channel_update pts mismatch");
} else {
promise.set_value(Unit());
@ -11407,9 +11407,9 @@ void MessagesManager::read_channel_message_content_from_updates(Dialog *d, Messa
if (m != nullptr) {
read_message_content(d, m, false, "read_channel_message_content_from_updates");
} else if (message_id > d->last_new_message_id) {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(d->dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(d->dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"read_channel_message_content_from_updates");
}
}
@ -14771,9 +14771,9 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vector<tl_object_ptr<te
send_update_chat_last_message(d, "on_get_dialogs");
}
} else {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"on_get_dialogs");
}
}
@ -29166,9 +29166,9 @@ void MessagesManager::check_send_message_result(int64 random_id, DialogId dialog
return;
}
if (dialog_id.get_type() == DialogType::Channel) {
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"check_send_message_result");
} else {
td_->updates_manager_->schedule_get_difference("check_send_message_result");
@ -32944,14 +32944,14 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
auto next_message = *it;
if (next_message != nullptr) {
if (next_message->message_id.is_server()) {
if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) {
if (G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false) == false) {
LOG(ERROR) << "Attach " << message_id << " from " << source << " before " << next_message->message_id
<< " and after " << previous_message_id << " in " << dialog_id;
dump_debug_message_op(d);
}
}
} else {
if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) {
if (G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false) == false) {
LOG(ERROR) << "Have_next is true, but there is no next message after " << previous_message_id << " from "
<< source << " in " << dialog_id;
dump_debug_message_op(d);
@ -36032,9 +36032,9 @@ void MessagesManager::run_after_channel_difference(DialogId dialog_id, Promise<U
run_after_get_channel_difference_[dialog_id].push_back(std::move(promise));
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"run_after_channel_difference");
}
@ -36053,9 +36053,9 @@ void MessagesManager::on_channel_get_difference_timeout(DialogId dialog_id) {
LOG(ERROR) << "Unknown dialog " << dialog_id;
return;
}
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure,
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure,
"on_channel_get_difference_timeout");
}
@ -36085,8 +36085,8 @@ class MessagesManager::GetChannelDifferenceLogEvent {
};
void MessagesManager::get_channel_difference_delayed(DialogId dialog_id, int32 pts,
bool force, bool enable_pull_based_backpressure, const char *source) {
if (!enable_pull_based_backpressure) {
bool force, bool enable_reactive_pull_backpressure, const char *source) {
if (!enable_reactive_pull_backpressure) {
// Execute get_channel_difference immediatly
const char* source_prefix = "on_get_channel_difference, ";
char* joined_source{ new char[strlen(source_prefix) + strlen(source) + 1] };
@ -36678,9 +36678,9 @@ void MessagesManager::on_get_channel_difference(
if (!is_final) {
LOG_IF(ERROR, timeout > 0) << "Have timeout in nonfinal ChannelDifference in " << dialog_id;
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, "on_get_channel_difference");
auto enable_reactive_pull_backpressure
= G()->shared_config().get_option_boolean("enable_reactive_pull_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_reactive_pull_backpressure, "on_get_channel_difference");
return;
}

View File

@ -2814,7 +2814,7 @@ class MessagesManager final : public Actor {
void on_channel_get_difference_timeout(DialogId dialog_id);
void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, bool enable_pull_based_backpressure,
void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, bool enable_reactive_pull_backpressure,
const char *source);
void get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source);

View File

@ -7634,7 +7634,7 @@ void Td::on_request(uint64 id, td_api::setOption &request) {
if (set_boolean_option("experiment_old_postponed_pts_updates_behavior")) {
return;
}
if (set_boolean_option("enable_pull_based_backpressure")) {
if (set_boolean_option("enable_reactive_pull_backpressure")) {
return;
}
break;