Move QtsManager to UpdatesManager.

GitOrigin-RevId: 792faddd71cb3f9c07a4fd915ca782bfe2606ac3
This commit is contained in:
levlam 2020-08-02 22:07:22 +03:00
parent f7011a3853
commit c44cd3415c
9 changed files with 81 additions and 160 deletions

View File

@ -153,7 +153,7 @@ void SecretChatActor::replay_create_chat(unique_ptr<logevent::CreateSecretChat>
void SecretChatActor::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
if (message) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
}
};
if (close_flag_) {
@ -877,7 +877,7 @@ Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSl
Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
if (message) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
}
};
TRY_RESULT(decrypted, decrypt(message->encrypted_message));
@ -969,13 +969,13 @@ Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_la
Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
CHECK(message == nullptr || !message->qts_ack);
CHECK(message == nullptr || !message->promise);
};
auto in_seq_no = message->decrypted_message_layer->in_seq_no_;
auto out_seq_no = message->decrypted_message_layer->out_seq_no_;
auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer());
if (status.is_error() && status.code() != 2 /* not gap found */) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
if (message->logevent_id()) {
LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id());
binlog_erase(context_->binlog(), message->logevent_id());
@ -1010,14 +1010,14 @@ Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logeve
uint32 start_seq_no = static_cast<uint32>(action_resend->start_seq_no_ / 2);
uint32 finish_seq_no = static_cast<uint32>(action_resend->end_seq_no_ / 2);
if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages");
}
LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no);
for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) {
auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no);
if (it == out_seq_no_to_outbound_message_state_token_.end()) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no));
}
auto state_id = it->second;
@ -1198,7 +1198,7 @@ void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptr<logevent::
auto logevent_id = message->logevent_id();
// qts
auto qts_promise = std::move(message->qts_ack);
auto qts_promise = std::move(message->promise);
if (logevent_id == 0) {
message->is_pending = true;
@ -1276,7 +1276,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<logevent::Inboun
}
// qts
auto qts_promise = std::move(message->qts_ack);
auto qts_promise = std::move(message->promise);
// process message
tl_object_ptr<telegram_api::encryptedFile> file;

View File

@ -43,18 +43,6 @@
namespace td {
// qts and seq_no
// Each EncryptedMessage (update_message) has qts.
// Such updates must be handled in order of qts
//
// Qts should be handled on level of SecretChatsManager
// 1. Each update can be received by SecretChatsManager multiple times.
// 2. Each update should be sent to SecretChatActor only once. (Though SecretChatActor mustn't rely it)
// 3. Updates must be send in order of qts, without gaps.
// 4. SecretChatActor must notify SecretChatManager when update is processed (saved in database)
// 5. Only after all updates <= qts are processed by SecretChatActor, UpdatesManager should be
// notified about new qts.
//
// seq_no
// 1.
// x_in = 0 if we initiated secret chat.
@ -94,12 +82,6 @@ void SecretChatsManager::start_up() {
dummy_mode_ = true;
return;
}
// TODO: use database wrapper
auto pmc = G()->td_db()->get_binlog_pmc();
auto qts_str = pmc->get("updates.qts");
if (!qts_str.empty()) {
init_qts(to_integer<int32>(qts_str));
}
class StateCallback : public StateManager::Callback {
public:
@ -116,25 +98,6 @@ void SecretChatsManager::start_up() {
send_closure(G()->state_manager(), &StateManager::add_callback, make_unique<StateCallback>(actor_id(this)));
}
void SecretChatsManager::init_qts(int qts) {
if (dummy_mode_ || close_flag_) {
return;
}
has_qts_ = true;
qts_manager_.init(qts);
LOG(INFO) << "Init secret chats qts " << tag("qts", qts);
}
void SecretChatsManager::update_qts(int qts) {
if (dummy_mode_ || close_flag_ || qts < 0) {
return;
}
LOG(INFO) << "Update qts to " << qts;
add_qts(qts).set_value(Unit());
has_qts_ = true;
LOG(INFO) << "Update secret chats qts " << tag("qts", qts);
}
void SecretChatsManager::create_chat(int32 user_id, int64 user_access_hash, Promise<SecretChatId> promise) {
int32 random_id;
ActorId<SecretChatActor> actor;
@ -202,14 +165,6 @@ void SecretChatsManager::send_set_ttl_message(SecretChatId secret_chat_id, int32
send_closure(actor, &SecretChatActor::send_set_ttl_message, ttl, random_id, std::move(safe_promise));
}
void SecretChatsManager::before_get_difference(int32 qts) {
if (dummy_mode_ || close_flag_) {
return;
}
last_get_difference_qts_ = qts;
// We will receive all updates later than qts anyway.
}
void SecretChatsManager::on_update_chat(tl_object_ptr<telegram_api::updateEncryption> update) {
if (dummy_mode_ || close_flag_) {
return;
@ -228,47 +183,22 @@ void SecretChatsManager::do_update_chat(tl_object_ptr<telegram_api::updateEncryp
&SecretChatActor::update_chat, std::move(update->chat_));
}
void SecretChatsManager::on_update_message(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update,
bool force_apply) {
void SecretChatsManager::on_new_message(tl_object_ptr<telegram_api::EncryptedMessage> &&message_ptr,
Promise<Unit> &&promise) {
if (dummy_mode_ || close_flag_) {
return;
}
// UpdatesManager MUST postpone updates during GetDifference
auto qts = update->qts_;
if (!force_apply) {
if (!has_qts_) {
LOG(INFO) << "Got update, don't know current qts. Force get_difference";
force_get_difference();
return;
}
if (qts <= last_get_difference_qts_) {
LOG(WARNING) << "Got updates with " << tag("qts", qts) << " lower or equal than "
<< tag("last get difference qts", last_get_difference_qts_);
force_get_difference();
return;
}
auto mem_qts = qts_manager_.mem_pts();
if (qts <= mem_qts) {
LOG(WARNING) << "Duplicated update " << tag("qts", qts) << tag("mem_qts", mem_qts);
return;
}
if (qts != mem_qts + 1) {
LOG(WARNING) << "Got gap in qts " << mem_qts << " ... " << qts;
force_get_difference();
// TODO wait 1 second?
return;
}
}
CHECK(message_ptr != nullptr);
auto event = make_unique<logevent::InboundSecretMessage>();
event->qts_ack = add_qts(qts);
downcast_call(*update->message_, [&](auto &x) {
event->promise = std::move(promise);
downcast_call(*message_ptr, [&](auto &x) {
event->chat_id = x.chat_id_;
event->date = x.date_;
event->encrypted_message = std::move(x.bytes_);
});
if (update->message_->get_id() == telegram_api::encryptedMessage::ID) {
auto message = move_tl_object_as<telegram_api::encryptedMessage>(update->message_);
if (message_ptr->get_id() == telegram_api::encryptedMessage::ID) {
auto message = move_tl_object_as<telegram_api::encryptedMessage>(message_ptr);
if (message->file_->get_id() == telegram_api::encryptedFile::ID) {
auto file = move_tl_object_as<telegram_api::encryptedFile>(message->file_);
@ -284,11 +214,6 @@ void SecretChatsManager::on_update_message(tl_object_ptr<telegram_api::updateNew
add_inbound_message(std::move(event));
}
Promise<> SecretChatsManager::add_qts(int32 qts) {
auto id = qts_manager_.add_pts(qts);
return PromiseCreator::event(self_closure(this, &SecretChatsManager::on_qts_ack, id));
}
void SecretChatsManager::replay_binlog_event(BinlogEvent &&binlog_event) {
if (dummy_mode_) {
binlog_erase(G()->td_db()->get_binlog(), binlog_event.id_);
@ -357,11 +282,6 @@ void SecretChatsManager::replay_outbound_message(unique_ptr<logevent::OutboundSe
send_closure_later(actor, &SecretChatActor::replay_outbound_message, std::move(message));
}
void SecretChatsManager::force_get_difference() {
LOG(INFO) << "Force get difference";
send_closure(G()->td(), &Td::force_get_difference);
}
ActorId<SecretChatActor> SecretChatsManager::get_chat_actor(int32 id) {
return create_chat_actor_impl(id, false);
}
@ -501,19 +421,6 @@ ActorId<SecretChatActor> SecretChatsManager::create_chat_actor_impl(int32 id, bo
}
}
void SecretChatsManager::on_qts_ack(PtsManager::PtsId qts_ack_token) {
auto old_qts = qts_manager_.db_pts();
auto new_qts = qts_manager_.finish(qts_ack_token);
if (old_qts != new_qts) {
save_qts();
}
}
void SecretChatsManager::save_qts() {
LOG(INFO) << "Save " << tag("qts", qts_manager_.db_pts());
send_closure(G()->td(), &Td::update_qts, qts_manager_.db_pts());
}
void SecretChatsManager::hangup() {
close_flag_ = true;
if (dummy_mode_) {

View File

@ -29,16 +29,11 @@ struct BinlogEvent;
class SecretChatsManager : public Actor {
public:
explicit SecretChatsManager(ActorShared<> parent);
void init_qts(int32 qts);
void update_qts(int32 qts);
// we can forget all pending_updates after start_get_difference they will be received after this point anyway
// It is not necessary, but it will help.
void before_get_difference(int32 qts);
// Proxy query to corrensponding SecretChatActor.
// Look for more info in SecretChatActor.h
void on_update_chat(tl_object_ptr<telegram_api::updateEncryption> update);
void on_update_message(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply);
void on_new_message(tl_object_ptr<telegram_api::EncryptedMessage> &&message_ptr, Promise<Unit> &&promise);
void create_chat(int32 user_id, int64 user_access_hash, Promise<SecretChatId> promise);
void cancel_chat(SecretChatId, Promise<> promise);
@ -60,13 +55,9 @@ class SecretChatsManager : public Actor {
bool binlog_replay_finish_flag_ = false;
bool dummy_mode_ = false;
bool close_flag_ = false;
bool has_qts_ = false;
ActorShared<> parent_;
std::map<int32, ActorOwn<SecretChatActor>> id_to_actor_;
PtsManager qts_manager_;
int32 last_get_difference_qts_ = -1;
bool is_online_{false};
std::vector<std::pair<Timestamp, telegram_api::object_ptr<telegram_api::updateEncryption>>> pending_chat_updates_;
@ -83,10 +74,6 @@ class SecretChatsManager : public Actor {
ActorId<SecretChatActor> get_chat_actor(int32 id);
ActorId<SecretChatActor> create_chat_actor(int32 id);
ActorId<SecretChatActor> create_chat_actor_impl(int32 id, bool can_be_empty);
Promise<> add_qts(int32 qts);
void on_qts_ack(PtsManager::PtsId qts_ack_token);
void save_qts();
void force_get_difference();
void start_up() override;
void hangup() override;

View File

@ -3520,14 +3520,6 @@ void Td::send(NetQueryPtr &&query) {
G()->net_query_dispatcher().dispatch(std::move(query));
}
void Td::update_qts(int32 qts) {
if (close_flag_ > 1) {
return;
}
updates_manager_->set_qts(qts);
}
void Td::force_get_difference() {
if (close_flag_) {
return;

View File

@ -105,8 +105,6 @@ class Td final : public NetQueryCallback {
void destroy();
void close();
void update_qts(int32 qts);
void force_get_difference();
void schedule_get_terms_of_service(int32 expires_in);

View File

@ -227,9 +227,7 @@ void UpdatesManager::before_get_difference(bool is_initial) {
send_closure(G()->state_manager(), &StateManager::on_synchronized, false);
td_->messages_manager_->before_get_difference();
if (!is_initial) {
send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts());
}
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
}
@ -238,6 +236,11 @@ Promise<> UpdatesManager::add_pts(int32 pts) {
return PromiseCreator::event(self_closure(this, &UpdatesManager::on_pts_ack, id));
}
Promise<> UpdatesManager::add_qts(int32 qts) {
auto id = qts_manager_.add_pts(qts);
return PromiseCreator::event(self_closure(this, &UpdatesManager::on_qts_ack, id));
}
void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) {
auto old_pts = pts_manager_.db_pts();
auto new_pts = pts_manager_.finish(ack_token);
@ -246,6 +249,14 @@ void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) {
}
}
void UpdatesManager::on_qts_ack(PtsManager::PtsId ack_token) {
auto old_qts = qts_manager_.db_pts();
auto new_qts = qts_manager_.finish(ack_token);
if (old_qts != new_qts) {
save_qts(new_qts);
}
}
void UpdatesManager::save_pts(int32 pts) {
if (pts == std::numeric_limits<int32>::max()) {
G()->td_db()->get_binlog_pmc()->erase("updates.pts");
@ -254,6 +265,12 @@ void UpdatesManager::save_pts(int32 pts) {
}
}
void UpdatesManager::save_qts(int32 qts) {
if (!G()->ignore_backgrond_updates()) {
G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts));
}
}
Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
if (pts == std::numeric_limits<int32>::max()) {
LOG(WARNING) << "Update pts from " << get_pts() << " to -1 from " << source;
@ -281,17 +298,20 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
return result;
}
void UpdatesManager::set_qts(int32 qts) {
if (qts > qts_) {
LOG(INFO) << "Update qts to " << qts;
qts_ = qts;
if (!G()->ignore_backgrond_updates()) {
G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts));
Promise<> UpdatesManager::set_qts(int32 qts) {
Promise<> result;
if (qts > get_qts() || (0 < qts && qts < get_qts() - 399999)) { // qts can only go up or drop cardinally
if (qts < get_qts() - 399999) {
LOG(WARNING) << "Qts decreases from " << get_qts() << " to " << qts;
} else {
LOG(INFO) << "Update qts from " << get_qts() << " to " << qts;
}
} else if (qts < qts_) {
LOG(ERROR) << "Receive wrong qts = " << qts << ". Current qts = " << qts_;
result = add_qts(qts);
} else if (qts < get_qts()) {
LOG(ERROR) << "Receive wrong qts = " << qts << " less than current qts = " << get_qts();
}
return result;
}
void UpdatesManager::set_date(int32 date, bool from_update, string date_source) {
@ -791,7 +811,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptr<telegram_api::updates_st
string full_source = "on_get_updates_state " + oneline(to_string(state)) + " from " + source;
set_pts(state->pts_, full_source.c_str()).set_value(Unit());
set_date(state->date_, false, std::move(full_source));
// set_qts(state->qts_);
// set_qts(state->qts_).set_value(Unit());
seq_ = state->seq_;
}
@ -952,11 +972,10 @@ void UpdatesManager::init_state() {
}
pts_manager_.init(to_integer<int32>(pts_str));
last_get_difference_pts_ = get_pts();
qts_ = to_integer<int32>(pmc->get("updates.qts"));
qts_manager_.init(to_integer<int32>(pmc->get("updates.qts")));
date_ = to_integer<int32>(pmc->get("updates.date"));
date_source_ = "database";
LOG(DEBUG) << "Init: " << get_pts() << " " << qts_ << " " << date_;
send_closure(td_->secret_chats_manager_, &SecretChatsManager::init_qts, qts_);
LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_;
get_difference("init_state");
}
@ -1014,11 +1033,13 @@ void UpdatesManager::process_get_difference_updates(
}
for (auto &encrypted_message : new_encrypted_messages) {
on_update(make_tl_object<telegram_api::updateNewEncryptedMessage>(std::move(encrypted_message), 0), true);
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(encrypted_message),
Promise<Unit>());
}
send_closure(td_->secret_chats_manager_, &SecretChatsManager::update_qts, qts);
process_updates(std::move(other_updates), true);
set_qts(qts).set_value(Unit());
}
void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Difference> &&difference_ptr) {
@ -1060,7 +1081,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
case telegram_api::updates_differenceSlice::ID: {
auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits<int32>::max() &&
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == qts_) {
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) {
// TODO send new getDifference request and apply difference slice only after that
}
@ -1915,7 +1936,19 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryption> upd
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply) {
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_update_message, std::move(update), force_apply);
if (!force_apply) {
if (update->qts_ <= get_qts()) {
LOG(INFO) << "Ignore already processed update with qts " << update->qts_;
return;
}
if (update->qts_ != get_qts() + 1) {
// TODO fill gap
return;
}
}
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_),
add_qts(update->qts_));
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/) {

View File

@ -63,7 +63,7 @@ class UpdatesManager : public Actor {
return pts_manager_.mem_pts();
}
int32 get_qts() const {
return qts_;
return qts_manager_.mem_pts();
}
int32 get_date() const {
return date_;
@ -71,7 +71,7 @@ class UpdatesManager : public Actor {
Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT;
void set_qts(int32 qts);
Promise<> set_qts(int32 qts) TD_WARN_UNUSED_RESULT;
static const double MAX_UNFILLED_GAP_TIME;
@ -102,7 +102,7 @@ class UpdatesManager : public Actor {
ActorShared<> parent_;
PtsManager pts_manager_;
int32 qts_ = 0;
PtsManager qts_manager_;
int32 date_ = 0;
int32 seq_ = 0;
string date_source_ = "nowhere";
@ -126,6 +126,10 @@ class UpdatesManager : public Actor {
void on_pts_ack(PtsManager::PtsId ack_token);
void save_pts(int32 pts);
Promise<> add_qts(int32 qts);
void on_qts_ack(PtsManager::PtsId ack_token);
void save_qts(int32 qts);
void set_date(int32 date, bool from_update, string date_source);
int32 get_short_update_date() const;
@ -260,7 +264,7 @@ class UpdatesManager : public Actor {
void on_update(tl_object_ptr<telegram_api::updatePrivacy> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateEncryption> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply);
void on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewStickerSet> update, bool /*force_apply*/);

View File

@ -211,7 +211,7 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
int32 date = 0;
BufferSlice encrypted_message; // empty when we store event to binlog
Promise<Unit> qts_ack;
Promise<Unit> promise;
bool is_checked = false;
// after decrypted and checked
@ -245,7 +245,7 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
store(chat_id, storer);
store(date, storer);
// skip encrypted_message
// skip qts_ack
// skip promise
// TODO
decrypted_message_layer->store(storer);
@ -278,7 +278,7 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
parse(chat_id, parser);
parse(date, parser);
// skip encrypted_message
// skip qts_ack
// skip promise
// TODO
decrypted_message_layer = secret_api::decryptedMessageLayer::fetch(parser);

View File

@ -591,7 +591,7 @@ class Master : public Actor {
event->chat_id = chat_id;
event->date = 0;
event->encrypted_message = std::move(data);
event->qts_ack = PromiseCreator::lambda(
event->promise = PromiseCreator::lambda(
[actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable {
if (result.is_ok()) {
LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc);