Apply typings and other short updates immediately.

This commit is contained in:
levlam 2021-08-20 14:18:11 +03:00
parent 7261c9579f
commit 7129a6d090
1 changed files with 40 additions and 23 deletions

View File

@ -70,19 +70,19 @@ namespace td {
int VERBOSITY_NAME(get_difference) = VERBOSITY_NAME(INFO);
class OnUpdate {
UpdatesManager *manager_;
UpdatesManager *updates_manager_;
tl_object_ptr<telegram_api::Update> &update_;
Promise<Unit> &promise_;
mutable Promise<Unit> promise_;
public:
OnUpdate(UpdatesManager *manager, tl_object_ptr<telegram_api::Update> &update, Promise<Unit> &promise)
: manager_(manager), update_(update), promise_(promise) {
OnUpdate(UpdatesManager *updates_manager, tl_object_ptr<telegram_api::Update> &update, Promise<Unit> &&promise)
: updates_manager_(updates_manager), update_(update), promise_(std::move(promise)) {
}
template <class T>
void operator()(T &obj) const {
CHECK(&*update_ == &obj);
manager_->on_update(move_tl_object_as<T>(update_), std::move(promise_));
updates_manager_->on_update(move_tl_object_as<T>(update_), std::move(promise_));
}
};
@ -820,7 +820,7 @@ void UpdatesManager::on_get_updates(tl_object_ptr<telegram_api::Updates> &&updat
case telegram_api::updateLangPackTooLong::ID:
case telegram_api::updateLangPack::ID:
LOG(INFO) << "Apply without authorization " << to_string(updates_ptr);
downcast_call(*update, OnUpdate(this, update, promise));
downcast_call(*update, OnUpdate(this, update, std::move(promise)));
return;
default:
break;
@ -895,9 +895,7 @@ void UpdatesManager::on_get_updates(tl_object_ptr<telegram_api::Updates> &&updat
return get_difference("unacceptable short update");
}
short_update_date_ = update->date_;
if (!downcast_call(*update->update_, OnUpdate(this, update->update_, promise))) {
LOG(FATAL) << "Can't call on some update";
}
downcast_call(*update->update_, OnUpdate(this, update->update_, std::move(promise)));
short_update_date_ = 0;
break;
}
@ -1539,7 +1537,34 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
return promise.set_value(Unit());
}
if (running_get_difference_ /*|| string(source) != string("postponed updates")*/) {
for (auto &update : updates) {
if (update != nullptr) {
switch (update->get_id()) {
case telegram_api::updateUserTyping::ID:
case telegram_api::updateChatUserTyping::ID:
case telegram_api::updateChannelUserTyping::ID:
case telegram_api::updateEncryptedChatTyping::ID:
case telegram_api::updateLoginToken::ID:
case telegram_api::updateDcOptions::ID:
case telegram_api::updateConfig::ID:
case telegram_api::updateServiceNotification::ID:
case telegram_api::updateLangPackTooLong::ID:
case telegram_api::updateLangPack::ID:
short_update_date_ = date;
LOG(INFO) << "Process short " << oneline(to_string(update));
// don't need promise for short update
downcast_call(*update, OnUpdate(this, update, Promise<Unit>()));
short_update_date_ = 0;
update = nullptr;
break;
default:
break;
}
}
}
bool need_postpone = running_get_difference_ /*|| string(source) != string("postponed updates")*/;
if (need_postpone) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
for (auto &update : updates) {
@ -1557,8 +1582,6 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
return;
}
// TODO typings must be processed before NewMessage
for (auto &update : updates) {
if (!is_acceptable_update(update.get())) {
CHECK(update != nullptr);
@ -1685,10 +1708,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
for (auto &update : updates) {
if (update != nullptr) {
if (is_pts_update(update.get()) || is_qts_update(update.get())) {
auto update_promise = mpas.get_promise();
if (!downcast_call(*update, OnUpdate(this, update, update_promise))) {
LOG(FATAL) << "Can't call on some update received from " << source;
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
processed_updates++;
update = nullptr;
}
@ -1869,10 +1889,7 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
for (auto &update : updates) {
if (update != nullptr) {
LOG(INFO) << "Process update " << to_string(update);
auto update_promise = mpas.get_promise();
if (!downcast_call(*update, OnUpdate(this, update, update_promise))) {
LOG(FATAL) << "Can't call on some update";
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
CHECK(!running_get_difference_);
}
}
@ -2161,9 +2178,9 @@ void UpdatesManager::process_postponed_pts_updates() {
if (old_pts > new_pts - pts_count || last_update_it == postponed_pts_updates_.end() ||
i == GAP_TIMEOUT_UPDATE_COUNT) {
// the updates can't be applied
VLOG(get_difference) << "Can't apply " << i << " next postponed updates with pts " << update_it->second.pts << '-'
<< new_pts << ", because their pts_count is " << pts_count << " instead of expected "
<< new_pts - old_pts;
VLOG(get_difference) << "Can't apply " << i << " next postponed updates with pts " << update_it->second.pts
<< '-' << new_pts << ", because their pts_count is " << pts_count
<< " instead of expected " << new_pts - old_pts;
last_update_it = update_it;
break;
}