Add optiona pull-based backpressure system for get_channel_difference

This commit is contained in:
Andrea Cavalli 2021-04-03 12:46:22 +02:00
parent 66a58daf70
commit efb9c6b41d
6 changed files with 66 additions and 21 deletions

View File

@ -30,8 +30,8 @@ We added some options:
* **delete_file_reference_after_seconds** (positive number) During cleanup, free the memory of the files that have not been touched for more than X seconds
* **experiment_enable_file_reference_cleanup** (**true**/false) During cleanup, free the memory of the file references
* **experiment_enable_chat_access_hash_cleanup** (**true**/false) During cleanup, clean chats and channels access hash
* **get_channel_difference_delay_milliseconds** (**0**) Delay get_channel_difference n milliseconds every ~3000pts (~300msg).
Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput.
* **enable_pull_based_backpressure** (true/**false**) Enable manual `get_channel_difference` execution by calling `getChannelDifference(channel_difference_id)`.
Don't modify this option unless you have a very large bot that struggles to keep up with start-up updates throughput, or you want to implement a pull-based async library.
## Custom API functions
### TdApi.OptimizeMemory

View File

@ -3346,6 +3346,9 @@ updateAuthorizationState authorization_state:AuthorizationState = Update;
//@description A new message was received; can also be an outgoing message @message The new message
updateNewMessage message:message = Update;
//@description A new channel difference part was received @channel_difference_id The channel difference id
updateNewChannelDifferencePart channel_difference_id:int64 = Update;
//@description A request to send a message has reached the Telegram server. This doesn't mean that the message will be sent successfully or even that the send message request will be processed. This update will be sent only if the option "use_quick_ack" is set to true. This update may be sent multiple times for the same message
//@chat_id The chat identifier of the sent message @message_id A temporary message identifier
updateMessageSendAcknowledged chat_id:int53 message_id:int53 = Update;
@ -3823,6 +3826,9 @@ getMessageThread chat_id:int53 message_id:int53 = MessageThreadInfo;
//@description Returns information about a file; this is an offline request @file_id Identifier of the file to get
getFile file_id:int32 = File;
//@description Execute a channel difference @channel_difference_id Identifier of the channel difference to execute
getChannelDifference channel_difference_id:int64 = Ok;
//@description Returns information about a file by its remote ID; this is an offline request. Can be used to register a URL as a file for further uploading, or sending as a message. Even the request succeeds, the file can be used only if it is still accessible to the user.
//-For example, if the file is from a message, then the message must be not deleted and accessible to the user. If the file database is disabled, then the corresponding object with the file must be preloaded by the application
//@remote_file_id Remote identifier of the file to get @file_type File type, if known

View File

@ -32718,14 +32718,14 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
auto next_message = *it;
if (next_message != nullptr) {
if (next_message->message_id.is_server()) {
if (G()->shared_config().get_option_integer("get_channel_difference_delay_milliseconds", 0) <= 0) {
if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) {
LOG(ERROR) << "Attach " << message_id << " from " << source << " before " << next_message->message_id
<< " and after " << previous_message_id << " in " << dialog_id;
dump_debug_message_op(d);
}
}
} else {
if (G()->shared_config().get_option_integer("get_channel_difference_delay_milliseconds", 0) <= 0) {
if (G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false) == false) {
LOG(ERROR) << "Have_next is true, but there is no next message after " << previous_message_id << " from "
<< source << " in " << dialog_id;
dump_debug_message_op(d);
@ -35794,22 +35794,32 @@ class MessagesManager::GetChannelDifferenceLogEvent {
};
void MessagesManager::get_channel_difference_delayed(DialogId dialog_id, int32 pts,
bool force, double delay_seconds, const char *source) {
if (delay_seconds <= 0.0) {
bool force, bool enable_pull_based_backpressure, const char *source) {
if (!enable_pull_based_backpressure) {
// Execute get_channel_difference immediatly
get_channel_difference(dialog_id,pts, force, "on_get_channel_difference");
} else {
// Schedule get_channel_difference
create_actor<SleepActor>(
"GetChannelDifferenceDelayedActor", delay_seconds,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, pts, force](Result<Unit> result) {
send_closure(actor_id, &MessagesManager::get_channel_difference, dialog_id, pts, force,
"on_get_channel_difference");
}))
.release();
auto channel_difference_id = ++last_pending_channel_difference_;
pending_channel_difference_
.emplace(channel_difference_id, td::make_unique<PendingChannelDifference>(dialog_id, pts, force, source));
send_closure(G()->td(), &Td::send_update,
make_tl_object<td_api::updateNewChannelDifferencePart>(channel_difference_id));
}
}
bool MessagesManager::run_get_channel_difference_request(long id) {
auto pending_channel_difference_entry = pending_channel_difference_.find(id);
if (pending_channel_difference_entry == pending_channel_difference_.end()) {
return false;
}
pending_channel_difference_.erase(pending_channel_difference_entry->first);
// Run get_channel_difference
get_channel_difference(pending_channel_difference_entry->second->dialog_id,
pending_channel_difference_entry->second->pts, pending_channel_difference_entry->second->force,
"on_get_channel_difference");
return true;
}
void MessagesManager::get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source) {
if (channel_get_difference_retry_timeout_.has_timeout(dialog_id.get())) {
LOG(INFO) << "Skip running channels.getDifference for " << dialog_id << " from " << source
@ -36226,9 +36236,9 @@ void MessagesManager::on_get_channel_difference(
if (!is_final) {
LOG_IF(ERROR, timeout > 0) << "Have timeout in not final ChannelDifference in " << dialog_id;
auto delay_seconds = static_cast<double>(G()->shared_config()
.get_option_integer("get_channel_difference_delay_milliseconds", 0)) / 1000.0;
get_channel_difference_delayed(dialog_id, d->pts, true, delay_seconds, "on_get_channel_difference");
auto enable_pull_based_backpressure
= G()->shared_config().get_option_boolean("enable_pull_based_backpressure", false);
get_channel_difference_delayed(dialog_id, d->pts, true, enable_pull_based_backpressure, "on_get_channel_difference");
return;
}

View File

@ -195,6 +195,8 @@ class MessagesManager : public Actor {
void get_channel_difference_if_needed(DialogId dialog_id, MessagesInfo &&messages_info,
Promise<MessagesInfo> &&promise);
bool run_get_channel_difference_request(long id);
void on_get_messages(vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_channel_message,
bool is_scheduled, const char *source);
@ -2788,7 +2790,8 @@ class MessagesManager : public Actor {
void on_channel_get_difference_timeout(DialogId dialog_id);
void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, double timeout, const char *source);
void get_channel_difference_delayed(DialogId dialog_id, int32 pts, bool force, bool enable_pull_based_backpressure,
const char *source);
void get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source);
@ -3130,6 +3133,20 @@ class MessagesManager : public Actor {
};
std::unordered_map<int64, unique_ptr<PendingMessageImport>> pending_message_imports_;
struct PendingChannelDifference {
DialogId dialog_id;
int32 pts;
bool force;
const char *source;
PendingChannelDifference(DialogId dialog_id, int32 pts, bool force, const char *source)
: dialog_id(dialog_id), pts(pts), force(force), source(source) {
}
};
std::unordered_map<int64, unique_ptr<PendingChannelDifference>> pending_channel_difference_;
int64 last_pending_channel_difference_ = 0;
struct PendingMessageGroupSend {
DialogId dialog_id;
size_t finished_count = 0;

View File

@ -5177,6 +5177,18 @@ void Td::on_request(uint64 id, const td_api::getFile &request) {
send_closure(actor_id(this), &Td::send_result, id, file_manager_->get_file_object(FileId(request.file_id_, 0)));
}
void Td::on_request(uint64 id, const td_api::getChannelDifference &request) {
auto result = messages_manager_->run_get_channel_difference_request(request.channel_difference_id_);
if (result) {
send_closure(actor_id(this), &Td::send_result, id,
td_api::make_object<td_api::ok>());
} else {
send_closure(actor_id(this), &Td::send_result, id,
td_api::make_object<td_api::error>(
400, "Channel diffence identifier already executed or nonexistent"));
}
}
void Td::on_request(uint64 id, td_api::getRemoteFile &request) {
CLEAN_INPUT_STRING(request.remote_file_id_);
auto file_type = request.file_type_ == nullptr ? FileType::Temp : get_file_type(*request.file_type_);
@ -7540,9 +7552,7 @@ void Td::on_request(uint64 id, td_api::setOption &request) {
if (set_boolean_option("experiment_old_postponed_pts_updates_behavior")) {
return;
}
break;
case 'g':
if (set_integer_option("get_channel_difference_delay_milliseconds")) {
if (set_boolean_option("enable_pull_based_backpressure")) {
return;
}
break;

View File

@ -525,6 +525,8 @@ class Td final : public NetQueryCallback {
void on_request(uint64 id, const td_api::getFile &request);
void on_request(uint64 id, const td_api::getChannelDifference &request);
void on_request(uint64 id, td_api::getRemoteFile &request);
void on_request(uint64 id, td_api::getStorageStatistics &request);