Store all promises for pending qts updates.

This commit is contained in:
levlam 2020-12-23 01:58:56 +03:00
parent 6d6d1b20b6
commit b9d1530c78
2 changed files with 22 additions and 13 deletions

View File

@ -1119,17 +1119,24 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
seq_ = difference->seq_; seq_ = difference->seq_;
if (!pending_seq_updates_.empty()) { if (!pending_seq_updates_.empty()) {
LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference"; LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference";
for (auto &pending_update : pending_seq_updates_) { auto pending_seq_updates = std::move(pending_seq_updates_);
pending_seq_updates_.clear();
for (auto &pending_update : pending_seq_updates) {
pending_update.second.promise.set_value(Unit()); pending_update.second.promise.set_value(Unit());
} }
pending_seq_updates_.clear();
} }
if (!pending_qts_updates_.empty()) { if (!pending_qts_updates_.empty()) {
LOG(WARNING) << "Drop " << pending_qts_updates_.size() << " pending qts updates after receive empty difference"; LOG(WARNING) << "Drop " << pending_qts_updates_.size() << " pending qts updates after receive empty difference";
for (auto &pending_update : pending_qts_updates_) { auto pending_qts_updates = std::move(pending_qts_updates_);
pending_update.second.promise.set_value(Unit());
}
pending_qts_updates_.clear(); pending_qts_updates_.clear();
for (auto &pending_update : pending_qts_updates) {
auto promises = std::move(pending_update.second.promises);
for (auto &promise : promises) {
promise.set_value(Unit());
}
}
} }
break; break;
} }
@ -1446,10 +1453,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG(INFO) << "Gap in seq has found. Receive " << updates.size() << " updates [" << seq_begin << ", " << seq_end LOG(INFO) << "Gap in seq has found. Receive " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] from " << source << ", but seq = " << seq_; << "] from " << source << ", but seq = " << seq_;
if (pending_seq_updates_.find(seq_begin) != pending_seq_updates_.end()) { LOG_IF(WARNING, pending_seq_updates_.find(seq_begin) != pending_seq_updates_.end())
LOG(WARNING) << "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source; << "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
pending_seq_updates_.find(seq_begin)->second.promise.set_value(Unit()); // TODO
}
pending_seq_updates_.emplace(seq_begin, pending_seq_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise())); PendingUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
@ -1492,10 +1497,9 @@ void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update>
auto &pending_update = pending_qts_updates_[qts]; auto &pending_update = pending_qts_updates_[qts];
if (pending_update.update != nullptr) { if (pending_update.update != nullptr) {
LOG(WARNING) << "Receive duplicate update with qts = " << qts; LOG(WARNING) << "Receive duplicate update with qts = " << qts;
pending_update.promise.set_value(Unit()); // TODO
} }
pending_update.update = std::move(update); pending_update.update = std::move(update);
pending_update.promise = std::move(promise); pending_update.promises.push_back(std::move(promise));
return; return;
} }
@ -1644,7 +1648,12 @@ void UpdatesManager::process_pending_qts_updates() {
break; break;
} }
if (qts == get_qts() + 1) { if (qts == get_qts() + 1) {
process_qts_update(std::move(update_it->second.update), qts, std::move(update_it->second.promise)); auto promise = PromiseCreator::lambda([promises = std::move(update_it->second.promises)](Unit) mutable {
for (auto &promise : promises) {
promise.set_value(Unit());
}
});
process_qts_update(std::move(update_it->second.update), qts, std::move(promise));
} }
pending_qts_updates_.erase(update_it); pending_qts_updates_.erase(update_it);
} }

View File

@ -103,7 +103,7 @@ class UpdatesManager : public Actor {
class PendingQtsUpdate { class PendingQtsUpdate {
public: public:
tl_object_ptr<telegram_api::Update> update; tl_object_ptr<telegram_api::Update> update;
Promise<Unit> promise; vector<Promise<Unit>> promises;
}; };
Td *td_; Td *td_;