Avoid MultiPromise usage if there is only one update to process.

This commit is contained in:
levlam 2022-06-23 22:00:48 +03:00
parent 638b4346ca
commit d29d508b84

View File

@ -63,6 +63,7 @@
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
@ -1826,11 +1827,29 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"};
being_processed_updates_++;
mpas.add_promise([actor_id = create_reference(), promise = std::move(promise)](Result<Unit> &&result) mutable {
send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise));
});
auto lock = mpas.get_promise();
Promise<Unit> lock;
auto use_mpas = need_postpone || update_count != 1;
auto get_promise = [&] {
if (use_mpas) {
return mpas.get_promise();
} else {
CHECK(update_count != 0);
update_count--;
return std::move(promise);
}
};
if (use_mpas) {
being_processed_updates_++;
mpas.add_promise([actor_id = create_reference(), promise = std::move(promise)](Result<Unit> &&result) mutable {
send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise));
});
lock = get_promise();
}
SCOPE_EXIT {
if (!use_mpas && update_count == 1) {
promise.set_value(Unit());
}
};
for (auto &update : updates) {
if (update != nullptr) {
@ -1853,11 +1872,11 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
update = nullptr;
}
if (id == telegram_api::updateFolderPeers::ID) {
on_update(move_tl_object_as<telegram_api::updateFolderPeers>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateFolderPeers>(update), get_promise());
update = nullptr;
}
if (id == telegram_api::updateEncryption::ID) {
on_update(move_tl_object_as<telegram_api::updateEncryption>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateEncryption>(update), get_promise());
update = nullptr;
}
CHECK(need_postpone || !running_get_difference_);
@ -1873,7 +1892,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
min_postponed_update_pts_ = pts;
}
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
downcast_call(*update, OnUpdate(this, update, get_promise()));
update = nullptr;
} else if (is_qts_update(update.get())) {
if (running_get_difference_) {
@ -1882,7 +1901,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
min_postponed_update_qts_ = qts;
}
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
downcast_call(*update, OnUpdate(this, update, get_promise()));
update = nullptr;
}
}
@ -1909,14 +1928,14 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG(ERROR) << "Run get difference while applying updates from " << source;
}
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), get_promise()));
return lock.set_value(Unit());
}
if (seq_begin == 0 || seq_begin == seq_ + 1) {
LOG(INFO) << "Process " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
process_seq_updates(seq_end, date, std::move(updates), mpas.get_promise());
process_seq_updates(seq_end, date, std::move(updates), get_promise());
process_pending_seq_updates();
return lock.set_value(Unit());
}
@ -1941,7 +1960,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
<< "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
pending_seq_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), get_promise()));
set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
lock.set_value(Unit());
}
@ -2002,9 +2021,34 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
Promise<Unit> &&promise) {
tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
int32 update_count = 0;
for (auto &update : updates) {
if (update != nullptr) {
update_count++;
}
}
MultiPromiseActorSafe mpas{"OnProcessUpdatesMultiPromiseActor"};
mpas.add_promise(std::move(promise));
auto lock = mpas.get_promise();
Promise<Unit> lock;
auto use_mpas = update_count != 1;
auto get_promise = [&] {
if (use_mpas) {
return mpas.get_promise();
} else {
CHECK(update_count != 0);
update_count--;
return std::move(promise);
}
};
if (use_mpas) {
mpas.add_promise(std::move(promise));
lock = get_promise();
}
SCOPE_EXIT {
if (!use_mpas && update_count == 1) {
promise.set_value(Unit());
}
};
/*
for (auto &update : updates) {
@ -2013,7 +2057,7 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
// process updateReadChannelInbox before updateNewChannelMessage
auto constructor_id = update->get_id();
if (constructor_id == telegram_api::updateReadChannelInbox::ID) {
on_update(move_tl_object_as<telegram_api::updateReadChannelInbox>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateReadChannelInbox>(update), get_promise());
}
}
}
@ -2023,19 +2067,19 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
// process updateNewChannelMessage first
auto constructor_id = update->get_id();
if (constructor_id == telegram_api::updateNewChannelMessage::ID) {
on_update(move_tl_object_as<telegram_api::updateNewChannelMessage>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateNewChannelMessage>(update), get_promise());
continue;
}
// process updateNewScheduledMessage first
if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
on_update(move_tl_object_as<telegram_api::updateNewScheduledMessage>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateNewScheduledMessage>(update), get_promise());
continue;
}
// updateGroupCallConnection must be processed before updateGroupCall
if (constructor_id == telegram_api::updateGroupCallConnection::ID) {
on_update(move_tl_object_as<telegram_api::updateGroupCallConnection>(update), mpas.get_promise());
on_update(move_tl_object_as<telegram_api::updateGroupCallConnection>(update), get_promise());
continue;
}
@ -2065,7 +2109,7 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
process_pts_update(std::move(update));
} else if (is_qts_update(update.get())) {
process_qts_update(std::move(update), 0, mpas.get_promise());
process_qts_update(std::move(update), 0, get_promise());
} else if (update->get_id() == telegram_api::updateChannelTooLong::ID) {
td_->messages_manager_->on_update_channel_too_long(
move_tl_object_as<telegram_api::updateChannelTooLong>(update), true);
@ -2076,12 +2120,12 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
for (auto &update : updates) {
if (update != nullptr) {
LOG(INFO) << "Process update " << to_string(update);
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
downcast_call(*update, OnUpdate(this, update, get_promise()));
CHECK(!running_get_difference_);
}
}
if (update_pts_changed != nullptr) {
on_update(std::move(update_pts_changed), mpas.get_promise());
on_update(std::move(update_pts_changed), get_promise());
}
lock.set_value(Unit());
}