e7aac1c9b3
GitOrigin-RevId: d0af7ebd5e03471dd33b1baf3506b6c72d2518fc
2302 lines
92 KiB
C++
2302 lines
92 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/telegram/SecretChatActor.h"
|
|
|
|
#include "td/telegram/net/NetQueryCreator.h"
|
|
#include "td/telegram/SecretChatId.h"
|
|
#include "td/telegram/UniqueId.h"
|
|
|
|
#include "td/telegram/secret_api.hpp"
|
|
#include "td/telegram/telegram_api.hpp"
|
|
|
|
#include "td/mtproto/PacketInfo.h"
|
|
#include "td/mtproto/PacketStorer.h"
|
|
#include "td/mtproto/Transport.h"
|
|
#include "td/mtproto/utils.h"
|
|
|
|
#include "td/db/binlog/BinlogHelper.h"
|
|
#include "td/db/binlog/BinlogInterface.h"
|
|
|
|
#include "td/actor/MultiPromise.h"
|
|
|
|
#include "td/utils/as.h"
|
|
#include "td/utils/crypto.h"
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/overloaded.h"
|
|
#include "td/utils/Random.h"
|
|
#include "td/utils/ScopeGuard.h"
|
|
#include "td/utils/StorerBase.h"
|
|
#include "td/utils/Time.h"
|
|
#include "td/utils/tl_parsers.h"
|
|
|
|
#include <array>
|
|
#include <tuple>
|
|
#include <type_traits>
|
|
|
|
#define G GLOBAL_SHOULD_NOT_BE_USED_HERE
|
|
|
|
namespace td {
|
|
|
|
inline TLObjectStorer<secret_api::Object> create_storer(const secret_api::Object &object) {
|
|
return TLObjectStorer<secret_api::Object>(object);
|
|
}
|
|
|
|
class SecretImpl {
|
|
public:
|
|
explicit SecretImpl(const Storer &data) : data(data) {
|
|
}
|
|
template <class StorerT>
|
|
void do_store(StorerT &storer) const {
|
|
storer.store_binary(static_cast<int32>(data.size()));
|
|
storer.store_storer(data);
|
|
}
|
|
|
|
private:
|
|
const Storer &data;
|
|
};
|
|
|
|
SecretChatActor::SecretChatActor(int32 id, unique_ptr<Context> context, bool can_be_empty)
|
|
: context_(std::move(context)), can_be_empty_(can_be_empty) {
|
|
auth_state_.id = id;
|
|
}
|
|
|
|
void SecretChatActor::update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
check_status(on_update_chat(std::move(chat)));
|
|
loop();
|
|
}
|
|
|
|
void SecretChatActor::create_chat(int32 user_id, int64 user_access_hash, int32 random_id,
|
|
Promise<SecretChatId> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Empty) {
|
|
promise.set_error(Status::Error(500, "Bad random_id"));
|
|
check_status(Status::Error("Unexpected request_chat"));
|
|
loop();
|
|
return;
|
|
}
|
|
|
|
auto event = make_unique<logevent::CreateSecretChat>();
|
|
event->user_id = user_id;
|
|
event->user_access_hash = user_access_hash;
|
|
event->random_id = random_id;
|
|
event->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event)));
|
|
do_create_chat_impl(std::move(event));
|
|
promise.set_value(SecretChatId(random_id));
|
|
loop();
|
|
}
|
|
|
|
void SecretChatActor::on_result_resendable(NetQueryPtr net_query, Promise<NetQueryPtr> promise) {
|
|
LOG(INFO) << "In on_result_resendable: " << net_query << " " << close_flag_;
|
|
if (context_->close_flag()) {
|
|
return;
|
|
}
|
|
|
|
auto key = UniqueId::extract_key(net_query->id());
|
|
if (close_flag_) {
|
|
if (key == static_cast<uint8>(QueryType::DiscardEncryption)) {
|
|
on_discard_encryption_result(std::move(net_query));
|
|
}
|
|
return;
|
|
}
|
|
check_status([&] {
|
|
switch (key) {
|
|
case static_cast<uint8>(QueryType::DhConfig):
|
|
return on_dh_config(std::move(net_query));
|
|
case static_cast<uint8>(QueryType::EncryptedChat):
|
|
return on_update_chat(std::move(net_query));
|
|
case static_cast<uint8>(QueryType::Message):
|
|
on_outbound_send_message_result(std::move(net_query), std::move(promise));
|
|
return Status::OK();
|
|
case static_cast<uint8>(QueryType::ReadHistory):
|
|
return on_read_history(std::move(net_query));
|
|
case static_cast<uint8>(QueryType::Ignore):
|
|
return Status::OK();
|
|
}
|
|
UNREACHABLE();
|
|
}());
|
|
|
|
loop();
|
|
}
|
|
|
|
void SecretChatActor::replay_close_chat(unique_ptr<logevent::CloseSecretChat> event) {
|
|
do_close_chat_impl(std::move(event));
|
|
}
|
|
|
|
void SecretChatActor::replay_create_chat(unique_ptr<logevent::CreateSecretChat> event) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
do_create_chat_impl(std::move(event));
|
|
}
|
|
|
|
void SecretChatActor::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
SCOPE_EXIT {
|
|
if (message) {
|
|
message->qts_ack.set_value(Unit());
|
|
}
|
|
};
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore unexpected update: " << tag("message", *message);
|
|
return;
|
|
}
|
|
check_status(do_inbound_message_encrypted(std::move(message)));
|
|
loop();
|
|
}
|
|
|
|
void SecretChatActor::replay_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore unexpected replay inbound message: " << tag("message", *message);
|
|
return;
|
|
}
|
|
|
|
CHECK(!binlog_replay_finish_flag_);
|
|
CHECK(message->decrypted_message_layer); // from binlog
|
|
if (message->is_pending) { // wait for gaps?
|
|
// check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
|
|
do_inbound_message_decrypted_pending(std::move(message));
|
|
} else { // just replay
|
|
LOG_CHECK(message->message_id > last_binlog_message_id_)
|
|
<< tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
|
|
last_binlog_message_id_ = message->message_id;
|
|
check_status(do_inbound_message_decrypted(std::move(message)));
|
|
}
|
|
loop();
|
|
}
|
|
|
|
void SecretChatActor::replay_outbound_message(unique_ptr<logevent::OutboundSecretMessage> message) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore unexpected replay outbound message: " << tag("message", *message);
|
|
return;
|
|
}
|
|
CHECK(!binlog_replay_finish_flag_);
|
|
LOG_CHECK(message->message_id > last_binlog_message_id_)
|
|
<< tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
|
|
last_binlog_message_id_ = message->message_id;
|
|
do_outbound_message_impl(std::move(message), Promise<>());
|
|
loop();
|
|
}
|
|
|
|
// NB: my_seq_no is just after message is sent, i.e. my_out_seq_no is already incremented
|
|
Result<BufferSlice> SecretChatActor::create_encrypted_message(int32 layer, int32 my_in_seq_no, int32 my_out_seq_no,
|
|
tl_object_ptr<secret_api::DecryptedMessage> &message) {
|
|
if (message->get_id() == secret_api::decryptedMessage::ID && layer < MTPROTO_2_LAYER) {
|
|
auto old = secret_api::move_object_as<secret_api::decryptedMessage>(message);
|
|
old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK;
|
|
message = secret_api::make_object<secret_api::decryptedMessage46>(
|
|
old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
|
|
std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_);
|
|
}
|
|
|
|
mtproto::AuthKey *auth_key = &pfs_state_.auth_key;
|
|
auto in_seq_no = my_in_seq_no * 2 + auth_state_.x;
|
|
auto out_seq_no = my_out_seq_no * 2 - 1 - auth_state_.x;
|
|
|
|
BufferSlice random_bytes(32);
|
|
Random::secure_bytes(random_bytes.as_slice().ubegin(), random_bytes.size());
|
|
auto message_with_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
|
|
std::move(random_bytes), layer, in_seq_no, out_seq_no, std::move(message));
|
|
LOG(INFO) << to_string(message_with_layer);
|
|
auto storer = create_storer(*message_with_layer);
|
|
auto new_storer = mtproto::PacketStorer<SecretImpl>(storer);
|
|
mtproto::PacketInfo info;
|
|
info.type = mtproto::PacketInfo::EndToEnd;
|
|
// Send with mtproto 2.0 if current layer is at least MTPROTO_2_LAYER
|
|
info.version = layer >= MTPROTO_2_LAYER ? 2 : 1;
|
|
info.is_creator = auth_state_.x == 0;
|
|
auto packet_writer = BufferWriter{mtproto::Transport::write(new_storer, *auth_key, &info), 0, 0};
|
|
mtproto::Transport::write(new_storer, *auth_key, &info, packet_writer.as_slice());
|
|
message = std::move(message_with_layer->message_);
|
|
return packet_writer.as_buffer_slice();
|
|
}
|
|
|
|
void SecretChatActor::send_message(tl_object_ptr<secret_api::DecryptedMessage> message,
|
|
tl_object_ptr<telegram_api::InputEncryptedFile> file, Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
send_message_impl(std::move(message), std::move(file), SendFlag::External | SendFlag::Push, std::move(promise));
|
|
}
|
|
|
|
static int32 get_min_layer(const secret_api::decryptedMessageActionTyping &message) {
|
|
switch (message.action_->get_id()) {
|
|
case secret_api::sendMessageRecordRoundAction::ID:
|
|
case secret_api::sendMessageUploadRoundAction::ID:
|
|
return SecretChatActor::VIDEO_NOTES_LAYER;
|
|
}
|
|
return 0;
|
|
}
|
|
static int32 get_min_layer(const secret_api::decryptedMessageService &message) {
|
|
switch (message.action_->get_id()) {
|
|
case secret_api::decryptedMessageActionTyping::ID:
|
|
return get_min_layer(static_cast<const secret_api::decryptedMessageActionTyping &>(*message.action_));
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
static int32 get_min_layer(const secret_api::DocumentAttribute &attribute) {
|
|
switch (attribute.get_id()) {
|
|
case secret_api::documentAttributeVideo66::ID:
|
|
return SecretChatActor::VIDEO_NOTES_LAYER;
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
static int32 get_min_layer(const secret_api::decryptedMessageMediaDocument &message) {
|
|
int32 res = 0;
|
|
for (auto &attribute : message.attributes_) {
|
|
auto attrirbute_layer = get_min_layer(*attribute);
|
|
if (attrirbute_layer > res) {
|
|
res = attrirbute_layer;
|
|
}
|
|
return res;
|
|
}
|
|
return res;
|
|
}
|
|
static int32 get_min_layer(const secret_api::decryptedMessage &message) {
|
|
if (!message.media_) {
|
|
return 0;
|
|
}
|
|
switch (message.media_->get_id()) {
|
|
case secret_api::decryptedMessageMediaDocument::ID:
|
|
return get_min_layer(static_cast<const secret_api::decryptedMessageMediaDocument &>(*message.media_));
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
static int32 get_min_layer(const secret_api::DecryptedMessage &message) {
|
|
switch (message.get_id()) {
|
|
case secret_api::decryptedMessageService::ID:
|
|
return get_min_layer(static_cast<const secret_api::decryptedMessageService &>(message));
|
|
case secret_api::decryptedMessage::ID:
|
|
return get_min_layer(static_cast<const secret_api::decryptedMessage &>(message));
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::send_message_impl(tl_object_ptr<secret_api::DecryptedMessage> message,
|
|
tl_object_ptr<telegram_api::InputEncryptedFile> file, int32 flags,
|
|
Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore send_message: " << tag("message", to_string(message)) << tag("file", to_string(file));
|
|
return promise.set_error(Status::Error(400, "Chat is not accessible"));
|
|
}
|
|
if (get_min_layer(*message) > config_state_.his_layer) {
|
|
return promise.set_error(Status::Error(400, "Message is not supported by the other side"));
|
|
}
|
|
LOG_CHECK(binlog_replay_finish_flag_) << "Trying to send message before binlog replay is finished: "
|
|
<< to_string(*message) << to_string(file);
|
|
int64 random_id = 0;
|
|
downcast_call(*message, [&](auto &x) { random_id = x.random_id_; });
|
|
|
|
LOG(INFO) << "Send message: " << to_string(*message) << to_string(file);
|
|
|
|
auto it = random_id_to_outbound_message_state_token_.find(random_id);
|
|
if (it != end(random_id_to_outbound_message_state_token_)) {
|
|
return on_outbound_outer_send_message_promise(it->second, std::move(promise));
|
|
}
|
|
|
|
auto binlog_event = make_unique<logevent::OutboundSecretMessage>();
|
|
binlog_event->chat_id = auth_state_.id;
|
|
binlog_event->random_id = random_id;
|
|
binlog_event->file = logevent::EncryptedInputFile::from_input_encrypted_file(file);
|
|
binlog_event->message_id = seq_no_state_.message_id + 1;
|
|
binlog_event->my_in_seq_no = seq_no_state_.my_in_seq_no;
|
|
binlog_event->my_out_seq_no = seq_no_state_.my_out_seq_no + 1;
|
|
binlog_event->his_in_seq_no = seq_no_state_.his_in_seq_no;
|
|
binlog_event->encrypted_message =
|
|
create_encrypted_message(current_layer(), binlog_event->my_in_seq_no, binlog_event->my_out_seq_no, message)
|
|
.move_as_ok();
|
|
binlog_event->is_service = (flags & SendFlag::Push) == 0;
|
|
binlog_event->is_external = (flags & SendFlag::External) != 0;
|
|
if (message->get_id() == secret_api::decryptedMessageService::ID) {
|
|
binlog_event->is_rewritable = false;
|
|
auto service_message = move_tl_object_as<secret_api::decryptedMessageService>(message);
|
|
binlog_event->action = std::move(service_message->action_);
|
|
} else {
|
|
binlog_event->is_rewritable = true;
|
|
}
|
|
|
|
do_outbound_message_impl(std::move(binlog_event), std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::send_message_action(tl_object_ptr<secret_api::SendMessageAction> action) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore send_message_action: " << tag("message", to_string(action));
|
|
return;
|
|
}
|
|
bool flag = action->get_id() != secret_api::sendMessageCancelAction::ID;
|
|
|
|
auto net_query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Ignore)),
|
|
create_storer(telegram_api::messages_setEncryptedTyping(get_input_chat(), flag)));
|
|
if (!set_typing_query_.empty()) {
|
|
LOG(INFO) << "Cancel previous set typing query";
|
|
cancel_query(set_typing_query_);
|
|
}
|
|
set_typing_query_ = net_query.get_weak();
|
|
context_->send_net_query(std::move(net_query), actor_shared(this), false);
|
|
}
|
|
|
|
void SecretChatActor::send_read_history(int32 date, Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
LOG(ERROR) << "Ignore send_read_history: " << tag("date", date);
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
|
|
if (date <= last_read_history_date_) {
|
|
return promise.set_value(Unit());
|
|
}
|
|
|
|
if (read_history_promise_) {
|
|
LOG(INFO) << "Cancel previous read history request in secret chat " << auth_state_.id;
|
|
read_history_promise_.set_value(Unit());
|
|
cancel_query(read_history_query_);
|
|
}
|
|
|
|
auto net_query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReadHistory)),
|
|
create_storer(telegram_api::messages_readEncryptedHistory(get_input_chat(), date)));
|
|
read_history_query_ = net_query.get_weak();
|
|
last_read_history_date_ = date;
|
|
read_history_promise_ = std::move(promise);
|
|
LOG(INFO) << "Send read history request with date " << date << " in secret chat " << auth_state_.id;
|
|
context_->send_net_query(std::move(net_query), actor_shared(this), false);
|
|
}
|
|
|
|
void SecretChatActor::send_open_message(int64 random_id, Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
std::vector<int64> random_ids{random_id};
|
|
send_action(make_tl_object<secret_api::decryptedMessageActionReadMessages>(std::move(random_ids)), SendFlag::Push,
|
|
std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::delete_message(int64 random_id, Promise<> promise) {
|
|
if (auth_state_.state == State::Closed) {
|
|
promise.set_value(Unit());
|
|
return;
|
|
}
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
return delete_messages(std::vector<int64>{random_id}, std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::delete_messages(std::vector<int64> random_ids, Promise<> promise) {
|
|
if (auth_state_.state == State::Closed) {
|
|
promise.set_value(Unit());
|
|
return;
|
|
}
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
send_action(make_tl_object<secret_api::decryptedMessageActionDeleteMessages>(std::move(random_ids)), SendFlag::Push,
|
|
std::move(promise));
|
|
}
|
|
void SecretChatActor::delete_all_messages(Promise<> promise) {
|
|
if (auth_state_.state == State::Closed) {
|
|
promise.set_value(Unit());
|
|
return;
|
|
}
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
send_action(make_tl_object<secret_api::decryptedMessageActionFlushHistory>(), SendFlag::Push, std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::notify_screenshot_taken(Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
send_action(make_tl_object<secret_api::decryptedMessageActionScreenshotMessages>(), SendFlag::Push,
|
|
std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::send_set_ttl_message(int32 ttl, int64 random_id, Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
if (auth_state_.state != State::Ready) {
|
|
promise.set_error(Status::Error(400, "Can't access the chat"));
|
|
return;
|
|
}
|
|
send_message_impl(secret_api::make_object<secret_api::decryptedMessageService>(
|
|
random_id, make_tl_object<secret_api::decryptedMessageActionSetMessageTTL>(ttl)),
|
|
nullptr, SendFlag::External | SendFlag::Push, std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::send_action(tl_object_ptr<secret_api::DecryptedMessageAction> action, int32 flags,
|
|
Promise<> promise) {
|
|
send_message_impl(
|
|
secret_api::make_object<secret_api::decryptedMessageService>(Random::secure_int64(), std::move(action)), nullptr,
|
|
flags, std::move(promise));
|
|
}
|
|
|
|
void SecretChatActor::binlog_replay_finish() {
|
|
on_his_in_seq_no_updated();
|
|
LOG(INFO) << "Binlog replay is finished with SeqNoState " << seq_no_state_;
|
|
LOG(INFO) << "Binlog replay is finished with PfsState " << pfs_state_;
|
|
binlog_replay_finish_flag_ = true;
|
|
if (auth_state_.state == State::Ready) {
|
|
if (config_state_.my_layer < MY_LAYER) {
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
|
|
Promise<>());
|
|
}
|
|
}
|
|
yield();
|
|
}
|
|
|
|
void SecretChatActor::loop() {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (!binlog_replay_finish_flag_) {
|
|
return;
|
|
}
|
|
|
|
check_status(do_loop());
|
|
}
|
|
|
|
Status SecretChatActor::do_loop() {
|
|
TRY_STATUS(run_auth());
|
|
run_pfs();
|
|
run_fill_gaps();
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::on_send_message_ack(int64 random_id) {
|
|
context_->on_send_message_ack(random_id);
|
|
}
|
|
|
|
Status SecretChatActor::on_delete_messages(const std::vector<int64> &random_ids) {
|
|
for (auto random_id : random_ids) {
|
|
auto it = random_id_to_outbound_message_state_token_.find(random_id);
|
|
if (it == random_id_to_outbound_message_state_token_.end()) {
|
|
continue;
|
|
}
|
|
auto state_id = it->second;
|
|
TRY_STATUS(outbound_rewrite_with_empty(state_id));
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::on_flush_history(int32 last_message_id) {
|
|
std::vector<uint64> to_rewrite;
|
|
outbound_message_states_.for_each([&](auto state_id, auto &state) {
|
|
if (state.message->message_id < last_message_id && state.message->is_rewritable) {
|
|
to_rewrite.push_back(state_id);
|
|
}
|
|
});
|
|
for (auto state_id : to_rewrite) {
|
|
TRY_STATUS(outbound_rewrite_with_empty(state_id));
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::run_auth() {
|
|
switch (auth_state_.state) {
|
|
case State::Empty:
|
|
return Status::OK();
|
|
case State::SendRequest: {
|
|
if (!auth_state_.handshake.has_config()) {
|
|
return Status::OK();
|
|
}
|
|
// messages.requestEncryption#f64daf43 user_id:InputUser random_id:int g_a:bytes = EncryptedChat;
|
|
telegram_api::messages_requestEncryption tl_query;
|
|
tl_query.user_id_ = get_input_user();
|
|
tl_query.random_id_ = auth_state_.random_id;
|
|
tl_query.g_a_ = BufferSlice(auth_state_.handshake.get_g_b());
|
|
auto query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::EncryptedChat)),
|
|
create_storer(tl_query));
|
|
context_->send_net_query(std::move(query), actor_shared(this), false);
|
|
auth_state_.state = State::WaitRequestResponse;
|
|
return Status::OK();
|
|
}
|
|
case State::SendAccept: {
|
|
if (!auth_state_.handshake.has_config()) {
|
|
return Status::OK();
|
|
}
|
|
TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));
|
|
auto id_and_key = auth_state_.handshake.gen_key();
|
|
pfs_state_.auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
|
|
calc_key_hash();
|
|
// messages.acceptEncryption#3dbc0415 peer:InputEncryptedChat g_b:bytes key_fingerprint:long =
|
|
// EncryptedChat;
|
|
telegram_api::messages_acceptEncryption tl_query;
|
|
tl_query.peer_ = get_input_chat();
|
|
tl_query.g_b_ = BufferSlice(auth_state_.handshake.get_g_b());
|
|
tl_query.key_fingerprint_ = pfs_state_.auth_key.id();
|
|
auto query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::EncryptedChat)),
|
|
create_storer(tl_query));
|
|
context_->send_net_query(std::move(query), actor_shared(this), false);
|
|
auth_state_.state = State::WaitAcceptResponse;
|
|
return Status::OK();
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::run_fill_gaps() {
|
|
// replay messages
|
|
while (true) {
|
|
if (pending_inbound_messages_.empty()) {
|
|
break;
|
|
}
|
|
auto begin = pending_inbound_messages_.begin();
|
|
auto next_seq_no = begin->first;
|
|
if (next_seq_no <= seq_no_state_.my_in_seq_no) {
|
|
LOG(INFO) << "Replay pending event: " << tag("seq_no", next_seq_no);
|
|
auto message = std::move(begin->second);
|
|
pending_inbound_messages_.erase(begin);
|
|
check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
|
|
CHECK(pending_inbound_messages_.find(next_seq_no) == pending_inbound_messages_.end());
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (pending_inbound_messages_.empty()) {
|
|
return;
|
|
}
|
|
|
|
auto start_seq_no = seq_no_state_.my_in_seq_no;
|
|
auto finish_seq_no = pending_inbound_messages_.begin()->first - 1;
|
|
LOG(INFO) << tag("start_seq_no", start_seq_no) << tag("finish_seq_no", finish_seq_no)
|
|
<< tag("resend_end_seq_no", seq_no_state_.resend_end_seq_no);
|
|
CHECK(start_seq_no <= finish_seq_no);
|
|
if (seq_no_state_.resend_end_seq_no >= finish_seq_no) {
|
|
return;
|
|
}
|
|
CHECK(seq_no_state_.resend_end_seq_no < start_seq_no);
|
|
|
|
start_seq_no = start_seq_no * 2 + auth_state_.x;
|
|
finish_seq_no = finish_seq_no * 2 + auth_state_.x;
|
|
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionResend>(start_seq_no, finish_seq_no),
|
|
SendFlag::None, Promise<>());
|
|
}
|
|
|
|
void SecretChatActor::run_pfs() {
|
|
while (true) {
|
|
LOG(INFO) << "Run pfs loop: " << pfs_state_;
|
|
if (pfs_state_.state == PfsState::Empty &&
|
|
(pfs_state_.last_message_id + 100 < seq_no_state_.message_id ||
|
|
pfs_state_.last_timestamp + 60 * 60 * 24 * 7 < Time::now()) &&
|
|
pfs_state_.other_auth_key.empty()) {
|
|
LOG(INFO) << "Request new key";
|
|
request_new_key();
|
|
}
|
|
switch (pfs_state_.state) {
|
|
case PfsState::SendRequest: {
|
|
// shouldn't wait, pfs_state is already saved explicitly
|
|
pfs_state_.state = PfsState::WaitSendRequest; // don't save it!
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionRequestKey>(
|
|
pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b())),
|
|
SendFlag::None, Promise<>());
|
|
break;
|
|
}
|
|
case PfsState::SendCommit: {
|
|
// must wait till pfs_state is saved to binlog. Otherwise we may save ActionCommit to binlog without pfs_state,
|
|
// which has the new auth_key.
|
|
if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
|
|
return;
|
|
}
|
|
|
|
// TODO: wait till gaps are filled???
|
|
pfs_state_.state = PfsState::WaitSendCommit; // don't save it
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionCommitKey>(
|
|
pfs_state_.exchange_id, static_cast<int64>(pfs_state_.other_auth_key.id())),
|
|
SendFlag::None, Promise<>());
|
|
|
|
break;
|
|
}
|
|
case PfsState::SendAccept: {
|
|
if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
|
|
return;
|
|
}
|
|
|
|
pfs_state_.state = PfsState::WaitSendAccept; // don't save it
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionAcceptKey>(
|
|
pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b()),
|
|
static_cast<int64>(pfs_state_.other_auth_key.id())),
|
|
SendFlag::None, Promise<>());
|
|
|
|
break;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::check_status(Status status) {
|
|
if (status.is_error()) {
|
|
if (status.code() == 1) {
|
|
LOG(WARNING) << "Non-fatal error: " << status;
|
|
} else {
|
|
on_fatal_error(std::move(status));
|
|
}
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::on_fatal_error(Status status) {
|
|
LOG(ERROR) << "Fatal error: " << status;
|
|
cancel_chat(Promise<>());
|
|
}
|
|
|
|
void SecretChatActor::cancel_chat(Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_value(Unit());
|
|
return;
|
|
}
|
|
close_flag_ = true;
|
|
|
|
std::vector<logevent::LogEvent::Id> to_delete;
|
|
outbound_message_states_.for_each(
|
|
[&](auto state_id, auto &state) { to_delete.push_back(state.message->logevent_id()); });
|
|
inbound_message_states_.for_each([&](auto state_id, auto &state) { to_delete.push_back(state.logevent_id); });
|
|
|
|
// TODO: It must be a transaction
|
|
for (auto id : to_delete) {
|
|
binlog_erase(context_->binlog(), id);
|
|
}
|
|
if (create_logevent_id_ != 0) {
|
|
binlog_erase(context_->binlog(), create_logevent_id_);
|
|
create_logevent_id_ = 0;
|
|
}
|
|
|
|
auto event = make_unique<logevent::CloseSecretChat>();
|
|
event->chat_id = auth_state_.id;
|
|
event->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event)));
|
|
|
|
auto on_sync = PromiseCreator::lambda(
|
|
[actor_id = actor_id(this), event = std::move(event), promise = std::move(promise)](Result<Unit> result) mutable {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::do_close_chat_impl, std::move(event));
|
|
promise.set_value(Unit());
|
|
} else {
|
|
promise.set_error(result.error().clone());
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "do_close_chat_impl");
|
|
}
|
|
});
|
|
|
|
context_->binlog()->force_sync(std::move(on_sync));
|
|
yield();
|
|
}
|
|
|
|
void SecretChatActor::do_close_chat_impl(unique_ptr<logevent::CloseSecretChat> event) {
|
|
close_flag_ = true;
|
|
close_logevent_id_ = event->logevent_id();
|
|
LOG(INFO) << "Send messages.discardEncryption";
|
|
auth_state_.state = State::Closed;
|
|
context_->secret_chat_db()->set_value(auth_state_);
|
|
context_->secret_chat_db()->erase_value(config_state_);
|
|
context_->secret_chat_db()->erase_value(pfs_state_);
|
|
context_->secret_chat_db()->erase_value(seq_no_state_);
|
|
telegram_api::messages_discardEncryption tl_query(auth_state_.id);
|
|
auto query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::DiscardEncryption)),
|
|
create_storer(tl_query));
|
|
|
|
send_update_secret_chat();
|
|
|
|
context_->send_net_query(std::move(query), actor_shared(this), true);
|
|
}
|
|
|
|
void SecretChatActor::do_create_chat_impl(unique_ptr<logevent::CreateSecretChat> event) {
|
|
LOG(INFO) << *event;
|
|
CHECK(event->random_id == auth_state_.id);
|
|
create_logevent_id_ = event->logevent_id();
|
|
|
|
if (auth_state_.state == State::Empty) {
|
|
auth_state_.user_id = event->user_id;
|
|
auth_state_.user_access_hash = event->user_access_hash;
|
|
auth_state_.random_id = event->random_id;
|
|
auth_state_.state = State::SendRequest;
|
|
auth_state_.x = 0;
|
|
auth_state_.date = context_->unix_time();
|
|
send_update_secret_chat();
|
|
} else if (auth_state_.state == State::SendRequest) {
|
|
} else if (auth_state_.state == State::WaitRequestResponse) {
|
|
} else {
|
|
binlog_erase(context_->binlog(), create_logevent_id_);
|
|
create_logevent_id_ = 0;
|
|
}
|
|
}
|
|
void SecretChatActor::on_discard_encryption_result(NetQueryPtr result) {
|
|
CHECK(close_flag_);
|
|
CHECK(close_logevent_id_ != 0);
|
|
if (context_->close_flag()) {
|
|
return;
|
|
}
|
|
LOG(INFO) << "Got result for messages.discardEncryption";
|
|
context_->secret_chat_db()->erase_value(auth_state_);
|
|
binlog_erase(context_->binlog(), close_logevent_id_);
|
|
// skip flush
|
|
stop();
|
|
}
|
|
|
|
telegram_api::object_ptr<telegram_api::inputUser> SecretChatActor::get_input_user() {
|
|
return telegram_api::make_object<telegram_api::inputUser>(auth_state_.user_id, auth_state_.user_access_hash);
|
|
}
|
|
telegram_api::object_ptr<telegram_api::inputEncryptedChat> SecretChatActor::get_input_chat() {
|
|
return telegram_api::make_object<telegram_api::inputEncryptedChat>(auth_state_.id, auth_state_.access_hash);
|
|
}
|
|
void SecretChatActor::tear_down() {
|
|
LOG(INFO) << "SecretChatActor: tear_down";
|
|
// TODO notify send update that we are dead
|
|
}
|
|
|
|
Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSlice &encrypted_message) {
|
|
MutableSlice data = encrypted_message.as_slice();
|
|
CHECK(is_aligned_pointer<4>(data.data()));
|
|
TRY_RESULT(auth_key_id, mtproto::Transport::read_auth_key_id(data));
|
|
mtproto::AuthKey *auth_key = nullptr;
|
|
if (auth_key_id == pfs_state_.auth_key.id()) {
|
|
auth_key = &pfs_state_.auth_key;
|
|
} else if (auth_key_id == pfs_state_.other_auth_key.id()) {
|
|
auth_key = &pfs_state_.other_auth_key;
|
|
} else {
|
|
return Status::Error(1, PSLICE() << "Unknown " << tag("auth_key_id", format::as_hex(auth_key_id))
|
|
<< tag("crc", crc64(encrypted_message.as_slice())));
|
|
}
|
|
|
|
// expect that message is encrypted with mtproto 2.0 if their layer is at least MTPROTO_2_LAYER
|
|
std::array<int, 2> versions{{1, 2}};
|
|
if (config_state_.his_layer >= MTPROTO_2_LAYER) {
|
|
std::swap(versions[0], versions[1]);
|
|
}
|
|
|
|
BufferSlice encrypted_message_copy;
|
|
int32 mtproto_version = -1;
|
|
Result<mtproto::Transport::ReadResult> r_read_result;
|
|
for (size_t i = 0; i < versions.size(); i++) {
|
|
bool is_last = i + 1 == versions.size();
|
|
encrypted_message_copy = encrypted_message.copy();
|
|
data = encrypted_message_copy.as_slice();
|
|
CHECK(is_aligned_pointer<4>(data.data()));
|
|
|
|
mtproto::PacketInfo info;
|
|
info.type = mtproto::PacketInfo::EndToEnd;
|
|
mtproto_version = versions[i];
|
|
info.version = mtproto_version;
|
|
info.is_creator = auth_state_.x == 0;
|
|
r_read_result = mtproto::Transport::read(data, *auth_key, &info);
|
|
if (!is_last && r_read_result.is_error()) {
|
|
LOG(WARNING) << tag("mtproto", mtproto_version) << " decryption failed " << r_read_result.error();
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
TRY_RESULT(read_result, std::move(r_read_result));
|
|
switch (read_result.type()) {
|
|
case mtproto::Transport::ReadResult::Quickack:
|
|
return Status::Error("Got quickack instead of a message");
|
|
case mtproto::Transport::ReadResult::Error:
|
|
return Status::Error(PSLICE() << "Got mtproto error code instead of a message: " << read_result.error());
|
|
case mtproto::Transport::ReadResult::Nop:
|
|
return Status::Error("Got nop instead of a message");
|
|
case mtproto::Transport::ReadResult::Packet:
|
|
data = read_result.packet();
|
|
break;
|
|
default:
|
|
UNREACHABLE();
|
|
}
|
|
|
|
int32 len = as<int32>(data.begin());
|
|
data = data.substr(4, len);
|
|
if (!is_aligned_pointer<4>(data.data())) {
|
|
return std::make_tuple(auth_key_id, BufferSlice(data), mtproto_version);
|
|
} else {
|
|
return std::make_tuple(auth_key_id, encrypted_message_copy.from_slice(data), mtproto_version);
|
|
}
|
|
}
|
|
|
|
Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
SCOPE_EXIT {
|
|
if (message) {
|
|
message->qts_ack.set_value(Unit());
|
|
}
|
|
};
|
|
TRY_RESULT(decrypted, decrypt(message->encrypted_message));
|
|
auto auth_key_id = std::get<0>(decrypted);
|
|
auto data_buffer = std::move(std::get<1>(decrypted));
|
|
auto mtproto_version = std::get<2>(decrypted);
|
|
message->auth_key_id = auth_key_id;
|
|
|
|
TlBufferParser parser(&data_buffer);
|
|
auto id = parser.fetch_int();
|
|
Status status;
|
|
if (id == secret_api::decryptedMessageLayer::ID) {
|
|
auto message_with_layer = secret_api::decryptedMessageLayer::fetch(parser);
|
|
parser.fetch_end();
|
|
if (!parser.get_error()) {
|
|
auto layer = message_with_layer->layer_;
|
|
if (layer < DEFAULT_LAYER && false /*TODO: fix android app bug? */) {
|
|
LOG(ERROR) << "All or nothing, " << tag("layer", layer) << " is not supported, drop message "
|
|
<< to_string(message_with_layer);
|
|
return Status::OK();
|
|
}
|
|
if (config_state_.his_layer < layer) {
|
|
config_state_.his_layer = layer;
|
|
context_->secret_chat_db()->set_value(config_state_);
|
|
send_update_secret_chat();
|
|
}
|
|
if (layer >= MTPROTO_2_LAYER && mtproto_version < 2) {
|
|
return Status::Error(PSLICE() << "Mtproto 1.0 encryption is forbidden for this layer");
|
|
}
|
|
if (message_with_layer->in_seq_no_ < 0) {
|
|
return Status::Error(PSLICE() << "Invalid seq_no: " << to_string(message_with_layer));
|
|
}
|
|
message->decrypted_message_layer = std::move(message_with_layer);
|
|
return do_inbound_message_decrypted_unchecked(std::move(message));
|
|
} else {
|
|
status = Status::Error(PSLICE() << parser.get_error() << format::as_hex_dump<4>(data_buffer.as_slice()));
|
|
}
|
|
} else {
|
|
status = Status::Error(PSLICE() << "Unknown constructor " << tag("ID", format::as_hex(id)));
|
|
}
|
|
|
|
// support for older layer
|
|
LOG(WARNING) << "Failed to Fetch update: " << status;
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
|
|
Promise<>());
|
|
|
|
if (config_state_.his_layer == 8) {
|
|
TlBufferParser new_parser(&data_buffer);
|
|
auto message_without_layer = secret_api::DecryptedMessage::fetch(new_parser);
|
|
parser.fetch_end();
|
|
if (!new_parser.get_error()) {
|
|
message->decrypted_message_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
|
|
BufferSlice(), config_state_.his_layer, -1, -1, std::move(message_without_layer));
|
|
return do_inbound_message_decrypted_unchecked(std::move(message));
|
|
}
|
|
LOG(ERROR) << "Failed to fetch update (DecryptedMessage): " << new_parser.get_error()
|
|
<< format::as_hex_dump<4>(data_buffer.as_slice());
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_layer) {
|
|
if (in_seq_no < 0) {
|
|
return Status::OK();
|
|
}
|
|
if (in_seq_no % 2 != (1 - auth_state_.x) || out_seq_no % 2 != auth_state_.x) {
|
|
return Status::Error("Bad seq_no parity");
|
|
}
|
|
in_seq_no /= 2;
|
|
out_seq_no /= 2;
|
|
if (out_seq_no < seq_no_state_.my_in_seq_no) {
|
|
return Status::Error(1, "Old seq_no");
|
|
}
|
|
if (out_seq_no > seq_no_state_.my_in_seq_no) {
|
|
return Status::Error(2, "Gap found!");
|
|
}
|
|
if (in_seq_no < seq_no_state_.his_in_seq_no) {
|
|
return Status::Error("in_seq_no is not monotonic");
|
|
}
|
|
if (seq_no_state_.my_out_seq_no < in_seq_no) {
|
|
return Status::Error("in_seq_no is bigger than seq_no_state_.my_out_seq_no");
|
|
}
|
|
if (his_layer < seq_no_state_.his_layer) {
|
|
return Status::Error("his_layer is not monotonic");
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
SCOPE_EXIT {
|
|
LOG_IF(FATAL, message && message->qts_ack) << "Lost qts_promise";
|
|
};
|
|
auto in_seq_no = message->decrypted_message_layer->in_seq_no_;
|
|
auto out_seq_no = message->decrypted_message_layer->out_seq_no_;
|
|
auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer());
|
|
if (status.is_error() && status.code() != 2 /* not gap found */) {
|
|
message->qts_ack.set_value(Unit());
|
|
if (message->logevent_id()) {
|
|
LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id());
|
|
binlog_erase(context_->binlog(), message->logevent_id());
|
|
}
|
|
auto warning_message = PSTRING() << status << tag("seq_no_state_.my_in_seq_no", seq_no_state_.my_in_seq_no)
|
|
<< tag("seq_no_state_.my_out_seq_no", seq_no_state_.my_out_seq_no)
|
|
<< tag("seq_no_state_.his_in_seq_no", seq_no_state_.his_in_seq_no)
|
|
<< tag("in_seq_no", in_seq_no) << tag("out_seq_no", out_seq_no)
|
|
<< to_string(message->decrypted_message_layer);
|
|
if (status.code()) {
|
|
LOG(WARNING) << warning_message;
|
|
} else {
|
|
LOG(ERROR) << warning_message;
|
|
}
|
|
return status;
|
|
}
|
|
|
|
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
|
|
auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
|
|
message->decrypted_message_layer->message_ =
|
|
secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
|
|
}
|
|
|
|
// Process ActionResend.
|
|
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
|
|
auto *decrypted_message_service =
|
|
static_cast<secret_api::decryptedMessageService *>(message->decrypted_message_layer->message_.get());
|
|
if (decrypted_message_service->action_->get_id() == secret_api::decryptedMessageActionResend::ID) {
|
|
auto *action_resend =
|
|
static_cast<secret_api::decryptedMessageActionResend *>(decrypted_message_service->action_.get());
|
|
|
|
uint32 start_seq_no = static_cast<uint32>(action_resend->start_seq_no_ / 2);
|
|
uint32 finish_seq_no = static_cast<uint32>(action_resend->end_seq_no_ / 2);
|
|
if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) {
|
|
message->qts_ack.set_value(Unit());
|
|
return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages");
|
|
}
|
|
LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no);
|
|
for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) {
|
|
auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no);
|
|
if (it == out_seq_no_to_outbound_message_state_token_.end()) {
|
|
message->qts_ack.set_value(Unit());
|
|
return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no));
|
|
}
|
|
auto state_id = it->second;
|
|
outbound_resend(state_id);
|
|
}
|
|
// It is ok to replace action with Noop, because it won't be written to binlog before message is marked unsent
|
|
decrypted_message_service->action_ = secret_api::make_object<secret_api::decryptedMessageActionNoop>();
|
|
}
|
|
}
|
|
|
|
LOG(INFO) << "GOT MESSAGE " << to_string(message->decrypted_message_layer);
|
|
|
|
if (status.is_error()) {
|
|
CHECK(status.code() == 2); // gap found
|
|
do_inbound_message_decrypted_pending(std::move(message));
|
|
return Status::OK();
|
|
}
|
|
|
|
message->message_id = seq_no_state_.message_id + 1;
|
|
if (in_seq_no != -1) {
|
|
message->my_in_seq_no = out_seq_no / 2 + 1;
|
|
message->my_out_seq_no = seq_no_state_.my_out_seq_no;
|
|
message->his_in_seq_no = in_seq_no / 2;
|
|
}
|
|
|
|
return do_inbound_message_decrypted(std::move(message));
|
|
}
|
|
|
|
void SecretChatActor::do_outbound_message_impl(unique_ptr<logevent::OutboundSecretMessage> binlog_event,
|
|
Promise<> promise) {
|
|
binlog_event->crc = crc64(binlog_event->encrypted_message.as_slice());
|
|
LOG(INFO) << "Do outbound message: " << *binlog_event << tag("crc", binlog_event->crc);
|
|
auto &state_id_ref = random_id_to_outbound_message_state_token_[binlog_event->random_id];
|
|
LOG_CHECK(state_id_ref == 0) << "Random id collision";
|
|
state_id_ref = outbound_message_states_.create();
|
|
const uint64 state_id = state_id_ref;
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
LOG(INFO) << tag("state_id", state_id);
|
|
CHECK(state);
|
|
state->message = std::move(binlog_event);
|
|
|
|
// OutboundSecretMessage
|
|
//
|
|
// 1. [] => Save logevent. [save_logevent]
|
|
// 2. [save_logevent] => Save SeqNoState [save_changes]
|
|
// 3. [save_logevent] => Send NetQuery [send_message]
|
|
// Note: we have to force binlog to flush
|
|
// 4.0 [send_message]:Fail => rewrite
|
|
// 4. [save_changes; send_message] => Mark logevent as sent [rewrite_logevent]
|
|
// 5. [save_changes; send_message; ack] => [remove_logevent]
|
|
|
|
auto message = state->message.get();
|
|
|
|
// send_message
|
|
auto send_message_start = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_outbound_send_message_start");
|
|
}
|
|
});
|
|
|
|
// update seq_no
|
|
update_seq_no_state(*message);
|
|
|
|
// process action
|
|
if (message->action) {
|
|
on_outbound_action(*message->action, message->message_id);
|
|
}
|
|
|
|
// save_changes
|
|
auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_outbound_save_changes_finish, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_outbound_save_chages_finish");
|
|
}
|
|
});
|
|
|
|
auto save_changes_start = add_changes(std::move(save_changes_finish));
|
|
|
|
// wait for ack
|
|
auto out_seq_no = state->message->my_out_seq_no - 1;
|
|
if (out_seq_no < seq_no_state_.his_in_seq_no) {
|
|
state->ack_flag = true;
|
|
} else {
|
|
out_seq_no_to_outbound_message_state_token_[out_seq_no] = state_id;
|
|
}
|
|
|
|
// save_logevent => [send_message; save_changes]
|
|
auto save_logevent_finish = PromiseCreator::join(std::move(send_message_start), std::move(save_changes_start));
|
|
|
|
auto logevent_id = state->message->logevent_id();
|
|
if (logevent_id == 0) {
|
|
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*state->message));
|
|
LOG(INFO) << "Outbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
|
|
context_->binlog()->force_sync(std::move(save_logevent_finish));
|
|
state->message->set_logevent_id(logevent_id);
|
|
} else {
|
|
LOG(INFO) << "Outbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
|
|
save_logevent_finish.set_value(Unit());
|
|
}
|
|
promise.set_value(Unit()); // logevent was sent to binlog;
|
|
}
|
|
|
|
void SecretChatActor::on_his_in_seq_no_updated() {
|
|
auto it = begin(out_seq_no_to_outbound_message_state_token_);
|
|
while (it != end(out_seq_no_to_outbound_message_state_token_) && it->first < seq_no_state_.his_in_seq_no) {
|
|
auto token = it->second;
|
|
it = out_seq_no_to_outbound_message_state_token_.erase(it);
|
|
on_outbound_ack(token);
|
|
}
|
|
}
|
|
void SecretChatActor::on_seq_no_state_changed() {
|
|
seq_no_state_changed_ = true;
|
|
}
|
|
|
|
void SecretChatActor::on_pfs_state_changed() {
|
|
LOG(INFO) << "In on_pfs_state_changed: " << pfs_state_;
|
|
pfs_state_changed_ = true;
|
|
}
|
|
Promise<> SecretChatActor::add_changes(Promise<> save_changes_finish) {
|
|
StateChange change;
|
|
if (seq_no_state_changed_) {
|
|
change.seq_no_state_change = SeqNoStateChange(seq_no_state_);
|
|
seq_no_state_changed_ = false;
|
|
}
|
|
if (pfs_state_changed_) {
|
|
change.pfs_state_change = PfsStateChange(pfs_state_);
|
|
pfs_state_changed_ = false;
|
|
}
|
|
|
|
change.save_changes_finish = std::move(save_changes_finish);
|
|
auto save_changes_start_token = changes_processor_.add(std::move(change));
|
|
|
|
return PromiseCreator::lambda([actor_id = actor_id(this), save_changes_start_token](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_save_changes_start, save_changes_start_token);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "on_save_changes_start");
|
|
}
|
|
});
|
|
}
|
|
|
|
template <class T>
|
|
void SecretChatActor::update_seq_no_state(const T &new_seq_no_state) {
|
|
// Some old updates may arrive. Just ignore them
|
|
if (seq_no_state_.message_id >= new_seq_no_state.message_id &&
|
|
seq_no_state_.my_in_seq_no >= new_seq_no_state.my_in_seq_no &&
|
|
seq_no_state_.my_out_seq_no >= new_seq_no_state.my_out_seq_no &&
|
|
seq_no_state_.his_in_seq_no >= new_seq_no_state.his_in_seq_no) {
|
|
return;
|
|
}
|
|
seq_no_state_.message_id = new_seq_no_state.message_id;
|
|
if (new_seq_no_state.my_in_seq_no != -1) {
|
|
LOG(INFO) << "Have my_in_seq_no: " << seq_no_state_.my_in_seq_no << "--->" << new_seq_no_state.my_in_seq_no;
|
|
seq_no_state_.my_in_seq_no = new_seq_no_state.my_in_seq_no;
|
|
seq_no_state_.my_out_seq_no = new_seq_no_state.my_out_seq_no;
|
|
|
|
auto new_his_layer = new_seq_no_state.his_layer();
|
|
if (new_his_layer != -1) {
|
|
seq_no_state_.his_layer = new_his_layer;
|
|
}
|
|
|
|
if (seq_no_state_.his_in_seq_no != new_seq_no_state.his_in_seq_no) {
|
|
seq_no_state_.his_in_seq_no = new_seq_no_state.his_in_seq_no;
|
|
on_his_in_seq_no_updated();
|
|
}
|
|
}
|
|
|
|
return on_seq_no_state_changed();
|
|
}
|
|
|
|
void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
// Just save logevent if necessary
|
|
auto logevent_id = message->logevent_id();
|
|
|
|
// qts
|
|
auto qts_promise = std::move(message->qts_ack);
|
|
|
|
if (logevent_id == 0) {
|
|
message->is_pending = true;
|
|
message->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message),
|
|
std::move(qts_promise)));
|
|
LOG(INFO) << "Inbound PENDING secret message [save_logevent] start (do not expect finish) "
|
|
<< tag("logevent_id", message->logevent_id());
|
|
} else {
|
|
LOG(INFO) << "Inbound PENDING secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
|
|
CHECK(!qts_promise);
|
|
}
|
|
LOG(INFO) << "Inbound PENDING secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);
|
|
|
|
auto seq_no = message->decrypted_message_layer->out_seq_no_ / 2;
|
|
pending_inbound_messages_[seq_no] = std::move(message);
|
|
}
|
|
|
|
Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<logevent::InboundSecretMessage> message) {
|
|
// InboundSecretMessage
|
|
//
|
|
// 1. [] => Add logevent. [save_logevent]
|
|
// 2. [save_logevent] => Save SeqNoState [save_changes]
|
|
// 3. [save_logevent] => Add message to MessageManager [save_message]
|
|
// Note: if we are able to add message by random_id, we may not wait for (logevent). Otherwise we should force
|
|
// binlog flush.
|
|
// 4. [save_logevent] => Update qts [qts]
|
|
// 5. [save_changes; save_message; ?qts) => Remove logevent [remove_logevent]
|
|
// Note: It is easier not to wait for qts. In the worst case old update will be handled again after restart.
|
|
|
|
auto state_id = inbound_message_states_.create();
|
|
InboundMessageState &state = *inbound_message_states_.get(state_id);
|
|
|
|
// save logevent
|
|
auto logevent_id = message->logevent_id();
|
|
bool need_sync = false;
|
|
if (logevent_id == 0) {
|
|
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
|
|
LOG(INFO) << "Inbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
|
|
need_sync = true;
|
|
} else {
|
|
if (message->is_pending) {
|
|
message->is_pending = false;
|
|
auto old_logevent_id = logevent_id;
|
|
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
|
|
binlog_erase(context_->binlog(), old_logevent_id);
|
|
LOG(INFO) << "Inbound secret message [save_logevent] rewrite (after pending state) "
|
|
<< tag("logevent_id", logevent_id) << tag("old_logevent_id", old_logevent_id);
|
|
need_sync = true;
|
|
} else {
|
|
LOG(INFO) << "Inbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
|
|
}
|
|
}
|
|
LOG(INFO) << "Inbound secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);
|
|
state.logevent_id = logevent_id;
|
|
|
|
// save_message
|
|
auto save_message_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_inbound_save_message_finish, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_inbound_save_message_finish");
|
|
}
|
|
});
|
|
|
|
// update seq_no
|
|
update_seq_no_state(*message);
|
|
|
|
// drop old key
|
|
if (!pfs_state_.other_auth_key.empty() && message->auth_key_id == pfs_state_.auth_key.id() &&
|
|
pfs_state_.can_forget_other_key) {
|
|
LOG(INFO) << "Drop old auth key " << tag("auth_key_id", format::as_hex(pfs_state_.other_auth_key.id()));
|
|
pfs_state_.other_auth_key = mtproto::AuthKey();
|
|
on_pfs_state_changed();
|
|
}
|
|
|
|
// qts
|
|
auto qts_promise = std::move(message->qts_ack);
|
|
|
|
// process message
|
|
tl_object_ptr<telegram_api::encryptedFile> file;
|
|
if (message->has_encrypted_file) {
|
|
file = message->file.as_encrypted_file();
|
|
}
|
|
|
|
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage46::ID) {
|
|
auto old = move_tl_object_as<secret_api::decryptedMessage46>(message->decrypted_message_layer->message_);
|
|
old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK; // just in case
|
|
message->decrypted_message_layer->message_ = secret_api::make_object<secret_api::decryptedMessage>(
|
|
old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
|
|
std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_, 0);
|
|
}
|
|
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
|
|
auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
|
|
message->decrypted_message_layer->message_ =
|
|
secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
|
|
}
|
|
|
|
// NB: message is invalid after this 'move_as'
|
|
// Send update through context_
|
|
// Note, that update may be sent multiple times and should be somehow protected from replay.
|
|
// Luckily all updates seems to be idempotent.
|
|
// We could use ChangesProcessor to mark logevent as sent to context_, but I don't see any advantages of this
|
|
// approach.
|
|
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage::ID) {
|
|
auto decrypted_message =
|
|
move_tl_object_as<secret_api::decryptedMessage>(message->decrypted_message_layer->message_);
|
|
context_->on_inbound_message(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
|
|
std::move(file), std::move(decrypted_message), std::move(save_message_finish));
|
|
} else if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
|
|
auto decrypted_message_service =
|
|
move_tl_object_as<secret_api::decryptedMessageService>(message->decrypted_message_layer->message_);
|
|
|
|
auto action = std::move(decrypted_message_service->action_);
|
|
switch (action->get_id()) {
|
|
case secret_api::decryptedMessageActionDeleteMessages::ID:
|
|
// Corresponding logevent won't be deleted before promise returned by add_changes is set.
|
|
context_->on_delete_messages(
|
|
static_cast<const secret_api::decryptedMessageActionDeleteMessages &>(*action).random_ids_,
|
|
std::move(save_message_finish));
|
|
break;
|
|
case secret_api::decryptedMessageActionFlushHistory::ID:
|
|
context_->on_flush_history(MessageId(ServerMessageId(message->message_id)), std::move(save_message_finish));
|
|
break;
|
|
case secret_api::decryptedMessageActionReadMessages::ID: {
|
|
const auto &random_ids =
|
|
static_cast<const secret_api::decryptedMessageActionReadMessages &>(*action).random_ids_;
|
|
if (random_ids.size() == 1) {
|
|
context_->on_read_message(random_ids[0], std::move(save_message_finish));
|
|
} else { // probably never happens
|
|
MultiPromiseActorSafe mpas{"ReadSecretMessagesMultiPromiseActor"};
|
|
mpas.add_promise(std::move(save_message_finish));
|
|
auto lock = mpas.get_promise();
|
|
for (auto random_id : random_ids) {
|
|
context_->on_read_message(random_id, mpas.get_promise());
|
|
}
|
|
lock.set_value(Unit());
|
|
}
|
|
break;
|
|
}
|
|
case secret_api::decryptedMessageActionScreenshotMessages::ID:
|
|
context_->on_screenshot_taken(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
|
|
decrypted_message_service->random_id_, std::move(save_message_finish));
|
|
break;
|
|
case secret_api::decryptedMessageActionSetMessageTTL::ID:
|
|
context_->on_set_ttl(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
|
|
static_cast<const secret_api::decryptedMessageActionSetMessageTTL &>(*action).ttl_seconds_,
|
|
decrypted_message_service->random_id_, std::move(save_message_finish));
|
|
break;
|
|
default:
|
|
/*
|
|
decryptedMessageActionResend#511110b0 start_seq_no:int end_seq_no:int = DecryptedMessageAction;
|
|
decryptedMessageActionNotifyLayer#f3048883 layer:int = DecryptedMessageAction;
|
|
decryptedMessageActionTyping#ccb27641 action:SendMessageAction = DecryptedMessageAction;
|
|
decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
|
|
decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
|
|
decryptedMessageActionAbortKey#dd05ec6b exchange_id:long = DecryptedMessageAction;
|
|
decryptedMessageActionCommitKey#ec2e0b9b exchange_id:long key_fingerprint:long = DecryptedMessageAction;
|
|
decryptedMessageActionNoop#a82fdd63 = DecryptedMessageAction;
|
|
*/
|
|
save_message_finish.set_value(Unit());
|
|
break;
|
|
}
|
|
|
|
state.message_id = message->message_id;
|
|
TRY_STATUS(on_inbound_action(*action, message->message_id));
|
|
} else {
|
|
LOG(ERROR) << "INGORE MESSAGE: " << to_string(message->decrypted_message_layer);
|
|
save_message_finish.set_value(Unit());
|
|
}
|
|
|
|
// save_changes
|
|
auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_inbound_save_changes_finish, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_inbound_save_changes_finish");
|
|
}
|
|
});
|
|
auto save_changes_start = add_changes(std::move(save_changes_finish));
|
|
|
|
// save_logevent
|
|
auto save_logevent_finish = PromiseCreator::join(std::move(save_changes_start), std::move(qts_promise));
|
|
if (need_sync) {
|
|
// TODO: lazy sync is enough
|
|
context_->binlog()->force_sync(std::move(save_logevent_finish));
|
|
} else {
|
|
save_logevent_finish.set_value(Unit());
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::on_save_changes_start(ChangesProcessor<StateChange>::Id save_changes_token) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
SeqNoStateChange seq_no_state_change;
|
|
PfsStateChange pfs_state_change;
|
|
std::vector<Promise<Unit>> save_changes_finish_promises;
|
|
changes_processor_.finish(save_changes_token, [&](StateChange &&change) {
|
|
save_changes_finish_promises.emplace_back(std::move(change.save_changes_finish));
|
|
if (change.seq_no_state_change) {
|
|
seq_no_state_change = std::move(change.seq_no_state_change);
|
|
}
|
|
if (change.pfs_state_change) {
|
|
pfs_state_change = std::move(change.pfs_state_change);
|
|
}
|
|
});
|
|
if (seq_no_state_change) {
|
|
LOG(INFO) << "SAVE SeqNoState " << seq_no_state_change;
|
|
context_->secret_chat_db()->set_value(seq_no_state_change);
|
|
}
|
|
if (pfs_state_change) {
|
|
LOG(INFO) << "SAVE PfsState " << pfs_state_change;
|
|
saved_pfs_state_message_id_ = pfs_state_change.message_id;
|
|
context_->secret_chat_db()->set_value(pfs_state_change);
|
|
}
|
|
// NB: we may not wait till database is flushed, because every other change will be in the same binlog
|
|
for (auto &save_changes_finish : save_changes_finish_promises) {
|
|
save_changes_finish.set_value(Unit());
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::on_inbound_save_message_finish(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = inbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
LOG(INFO) << "Inbound message [save_message] finish " << tag("logevent_id", state->logevent_id);
|
|
state->save_message_finish = true;
|
|
inbound_loop(state, state_id);
|
|
}
|
|
|
|
void SecretChatActor::on_inbound_save_changes_finish(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = inbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
LOG(INFO) << "Inbound message [save_changes] finish " << tag("logevent_id", state->logevent_id);
|
|
state->save_changes_finish = true;
|
|
inbound_loop(state, state_id);
|
|
}
|
|
|
|
void SecretChatActor::inbound_loop(InboundMessageState *state, uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (!state->save_changes_finish || !state->save_message_finish) {
|
|
return;
|
|
}
|
|
LOG(INFO) << "Inbound message [remove_logevent] start " << tag("logevent_id", state->logevent_id);
|
|
binlog_erase(context_->binlog(), state->logevent_id);
|
|
|
|
inbound_message_states_.erase(state_id);
|
|
}
|
|
|
|
NetQueryPtr SecretChatActor::create_net_query(const logevent::OutboundSecretMessage &message) {
|
|
NetQueryPtr query;
|
|
if (message.is_service) {
|
|
CHECK(message.file.empty());
|
|
query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
|
|
create_storer(telegram_api::messages_sendEncryptedService(get_input_chat(), message.random_id,
|
|
message.encrypted_message.clone())));
|
|
query->total_timeout_limit = 1000000000; // inf. We will re-sent it immediately anyway.
|
|
} else if (message.file.empty()) {
|
|
query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
|
|
create_storer(telegram_api::messages_sendEncrypted(get_input_chat(), message.random_id,
|
|
message.encrypted_message.clone())));
|
|
} else {
|
|
query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
|
|
create_storer(telegram_api::messages_sendEncryptedFile(get_input_chat(), message.random_id,
|
|
message.encrypted_message.clone(),
|
|
message.file.as_input_encrypted_file())));
|
|
}
|
|
if (message.is_external && context_->get_config_option_boolean("use_quick_ack")) {
|
|
query->quick_ack_promise_ =
|
|
PromiseCreator::lambda([actor_id = actor_id(this), random_id = message.random_id](
|
|
Unit) { send_closure(actor_id, &SecretChatActor::on_send_message_ack, random_id); },
|
|
PromiseCreator::Ignore());
|
|
}
|
|
|
|
return query;
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_send_message_start(uint64 state_id) {
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
if (state == nullptr) {
|
|
LOG(INFO) << "Outbound message [send_message] start ignored (unknown state_id) " << tag("state_id", state_id);
|
|
return;
|
|
}
|
|
|
|
auto *message = state->message.get();
|
|
|
|
if (!message->is_sent) {
|
|
LOG(INFO) << "Outbound message [send_message] start " << tag("logevent_id", state->message->logevent_id());
|
|
auto query = create_net_query(*message);
|
|
state->net_query_id = query->id();
|
|
state->net_query_ref = query.get_weak();
|
|
state->net_query_may_fail = state->message->is_rewritable;
|
|
context_->send_net_query(std::move(query), actor_shared(this, state_id), true);
|
|
} else {
|
|
LOG(INFO) << "Outbound message [send_message] start dummy " << tag("logevent_id", state->message->logevent_id());
|
|
on_outbound_send_message_finish(state_id);
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::outbound_resend(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
|
|
state->message->is_sent = false;
|
|
state->net_query_id = 0;
|
|
state->net_query_ref = NetQueryRef();
|
|
LOG(INFO) << "Outbound message [resend] " << tag("logevent_id", state->message->logevent_id())
|
|
<< tag("state_id", state_id);
|
|
|
|
binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
|
|
create_storer(*state->message));
|
|
auto send_message_start = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_outbound_send_message_start");
|
|
}
|
|
});
|
|
context_->binlog()->force_sync(std::move(send_message_start));
|
|
}
|
|
|
|
Status SecretChatActor::outbound_rewrite_with_empty(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return Status::OK();
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
if (state == nullptr || !state->message->is_rewritable) {
|
|
return Status::OK();
|
|
}
|
|
cancel_query(state->net_query_ref);
|
|
|
|
MutableSlice data = state->message->encrypted_message.as_slice();
|
|
CHECK(is_aligned_pointer<4>(data.data()));
|
|
|
|
// Rewrite with delete itself.
|
|
tl_object_ptr<secret_api::DecryptedMessage> message = secret_api::make_object<secret_api::decryptedMessageService>(
|
|
state->message->random_id, secret_api::make_object<secret_api::decryptedMessageActionDeleteMessages>(
|
|
std::vector<int64>{static_cast<int64>(state->message->random_id)}));
|
|
|
|
TRY_RESULT(encrypted_message, create_encrypted_message(current_layer(), state->message->my_in_seq_no,
|
|
state->message->my_out_seq_no, message));
|
|
state->message->encrypted_message = std::move(encrypted_message);
|
|
LOG(INFO) << tag("crc", crc64(state->message->encrypted_message.as_slice()));
|
|
state->message->is_rewritable = false;
|
|
state->message->is_external = false;
|
|
state->message->is_service = true;
|
|
state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(nullptr);
|
|
binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
|
|
create_storer(*state->message));
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise<NetQueryPtr> resend_promise) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto state_id = get_link_token();
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
if (!state) {
|
|
LOG(INFO) << "Ignore old net query result " << tag("state_id", state_id);
|
|
query->clear();
|
|
return;
|
|
}
|
|
CHECK(state);
|
|
if (state->net_query_id != query->id()) {
|
|
LOG(INFO) << "Ignore old net query result " << tag("logevent_id", state->message->logevent_id())
|
|
<< tag("query_id", query->id()) << tag("state_query_id", state->net_query_id) << query;
|
|
query->clear();
|
|
return;
|
|
}
|
|
|
|
state->net_query_id = 0;
|
|
state->net_query_ref = NetQueryRef();
|
|
|
|
auto r_result = fetch_result<telegram_api::messages_sendEncrypted>(std::move(query));
|
|
if (r_result.is_error()) {
|
|
auto error = r_result.move_as_error();
|
|
|
|
auto send_message_error_promise =
|
|
PromiseCreator::lambda([actor_id = actor_id(this), state_id, error = error.clone(),
|
|
resend_promise = std::move(resend_promise)](Result<> result) mutable {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_error, state_id, std::move(error),
|
|
std::move(resend_promise));
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_outbound_send_message_error");
|
|
}
|
|
});
|
|
|
|
if (state->message->is_external) {
|
|
LOG(INFO) << "Outbound secret message [send_message] failed, rewrite it with dummy "
|
|
<< tag("logevent_id", state->message->logevent_id()) << tag("error", error);
|
|
state->send_result_ = [this, random_id = state->message->random_id, error_code = error.code(),
|
|
error_message = error.message()](Promise<> promise) {
|
|
this->context_->on_send_message_error(random_id, Status::Error(error_code, error_message), std::move(promise));
|
|
};
|
|
state->send_result_(std::move(send_message_error_promise));
|
|
} else {
|
|
// Just resend.
|
|
LOG(INFO) << "Outbound secret message [send_message] failed, resend it "
|
|
<< tag("logevent_id", state->message->logevent_id()) << tag("error", error);
|
|
send_message_error_promise.set_value(Unit());
|
|
}
|
|
return;
|
|
}
|
|
|
|
auto result = r_result.move_as_ok();
|
|
LOG(INFO) << "Got messages_sendEncrypted result: " << tag("message_id", state->message->message_id)
|
|
<< tag("random_id", state->message->random_id) << to_string(*result);
|
|
|
|
auto send_message_finish_promise = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
|
|
if (result.is_ok()) {
|
|
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_finish, state_id);
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
|
|
"on_outbound_send_message_finish");
|
|
}
|
|
});
|
|
|
|
if (state->message->is_external) {
|
|
switch (result->get_id()) {
|
|
case telegram_api::messages_sentEncryptedMessage::ID: {
|
|
auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedMessage>(result);
|
|
state->send_result_ = [this, random_id = state->message->random_id,
|
|
message_id = MessageId(ServerMessageId(state->message->message_id)),
|
|
date = sent->date_](Promise<> promise) {
|
|
this->context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise));
|
|
};
|
|
state->send_result_(std::move(send_message_finish_promise));
|
|
return;
|
|
}
|
|
case telegram_api::messages_sentEncryptedFile::ID: {
|
|
auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedFile>(result);
|
|
std::function<telegram_api::object_ptr<telegram_api::EncryptedFile>()> get_file;
|
|
telegram_api::downcast_call(
|
|
*sent->file_, overloaded(
|
|
[&](telegram_api::encryptedFileEmpty &) {
|
|
state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
|
|
telegram_api::inputEncryptedFileEmpty());
|
|
get_file = [] { return telegram_api::make_object<telegram_api::encryptedFileEmpty>(); };
|
|
},
|
|
[&](telegram_api::encryptedFile &file) {
|
|
state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
|
|
telegram_api::inputEncryptedFile(file.id_, file.access_hash_));
|
|
get_file = [id = file.id_, access_hash = file.access_hash_, size = file.size_,
|
|
dc_id = file.dc_id_, key_fingerprint = file.key_fingerprint_] {
|
|
return telegram_api::make_object<telegram_api::encryptedFile>(id, access_hash, size,
|
|
dc_id, key_fingerprint);
|
|
};
|
|
}));
|
|
|
|
state->send_result_ = [this, random_id = state->message->random_id,
|
|
message_id = MessageId(ServerMessageId(state->message->message_id)), date = sent->date_,
|
|
get_file = std::move(get_file)](Promise<> promise) {
|
|
this->context_->on_send_message_ok(random_id, message_id, date, get_file(), std::move(promise));
|
|
};
|
|
state->send_result_(std::move(send_message_finish_promise));
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
send_message_finish_promise.set_value(Unit());
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_send_message_error(uint64 state_id, Status error,
|
|
Promise<NetQueryPtr> resend_promise) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (context_->close_flag()) {
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
if (!state) {
|
|
return;
|
|
}
|
|
bool need_sync = false;
|
|
if (state->net_query_may_fail) {
|
|
// message could be already non-rewritable, if it was deleted during NetQuery execution.
|
|
if (state->message->is_rewritable) {
|
|
delete_message(state->message->random_id, Promise<>());
|
|
// state pointer may be invalidated
|
|
state = outbound_message_states_.get(state_id);
|
|
need_sync = true;
|
|
}
|
|
} else {
|
|
bool should_fail = false;
|
|
if (error.code() == 429) {
|
|
should_fail = false;
|
|
} else if (error.code() == 400 && error.message() == "ENCRYPTION_DECLINED") {
|
|
should_fail = true;
|
|
} else {
|
|
LOG(ERROR) << "Got unknown error for encrypted service message: " << error;
|
|
should_fail = true;
|
|
}
|
|
if (should_fail) {
|
|
return on_fatal_error(std::move(error));
|
|
}
|
|
}
|
|
auto query = create_net_query(*state->message);
|
|
state->net_query_id = query->id();
|
|
|
|
CHECK(resend_promise);
|
|
auto send_message_start =
|
|
PromiseCreator::lambda([actor_id = actor_id(this), resend_promise = std::move(resend_promise),
|
|
query = std::move(query)](Result<> result) mutable {
|
|
if (result.is_ok()) {
|
|
resend_promise.set_value(std::move(query));
|
|
} else {
|
|
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "resend_query");
|
|
}
|
|
});
|
|
if (need_sync) {
|
|
context_->binlog()->force_sync(std::move(send_message_start));
|
|
} else {
|
|
send_message_start.set_value(Unit());
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_send_message_finish(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
if (!state) {
|
|
return;
|
|
}
|
|
LOG(INFO) << "Outbound secret message [send_message] finish " << tag("logevent_id", state->message->logevent_id());
|
|
state->send_message_finish_flag = true;
|
|
state->outer_send_message_finish.set_value(Unit());
|
|
|
|
outbound_loop(state, state_id);
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_save_changes_finish(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
LOG(INFO) << "Outbound secret message [save_changes] finish " << tag("logevent_id", state->message->logevent_id());
|
|
state->save_changes_finish_flag = true;
|
|
outbound_loop(state, state_id);
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_ack(uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
LOG(INFO) << "Outbound secret message [ack] finish " << tag("logevent_id", state->message->logevent_id());
|
|
state->ack_flag = true;
|
|
outbound_loop(state, state_id);
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_outer_send_message_promise(uint64 state_id, Promise<> promise) {
|
|
if (close_flag_) {
|
|
promise.set_error(Status::Error(400, "Chat is closed"));
|
|
return;
|
|
}
|
|
auto *state = outbound_message_states_.get(state_id);
|
|
CHECK(state);
|
|
LOG(INFO) << "Outbound secret message [TODO] " << tag("logevent_id", state->message->logevent_id());
|
|
promise.set_value(Unit()); // Seems like this message is at least stored to binlog already
|
|
if (state->send_result_) {
|
|
state->send_result_({});
|
|
} else {
|
|
context_->on_send_message_error(state->message->random_id, Status::Error(400, "Message has already been sent"),
|
|
Auto());
|
|
}
|
|
}
|
|
|
|
void SecretChatActor::outbound_loop(OutboundMessageState *state, uint64 state_id) {
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
if (state->save_changes_finish_flag /*&& state->send_message_finish_flag*/ && state->ack_flag) {
|
|
LOG(INFO) << "Outbound message [remove_logevent] start " << tag("logevent_id", state->message->logevent_id());
|
|
binlog_erase(context_->binlog(), state->message->logevent_id());
|
|
|
|
random_id_to_outbound_message_state_token_.erase(state->message->random_id);
|
|
LOG(INFO) << "Outbound message finish (lazy) " << tag("logevent_id", state->message->logevent_id());
|
|
outbound_message_states_.erase(state_id);
|
|
return;
|
|
}
|
|
|
|
if (state->save_changes_finish_flag && state->send_message_finish_flag &&
|
|
!state->message->is_sent) { // [rewrite_logevent]
|
|
LOG(INFO) << "Outbound message [rewrite_logevent] start " << tag("logevent_id", state->message->logevent_id());
|
|
state->message->is_sent = true;
|
|
binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
|
|
create_storer(*state->message));
|
|
}
|
|
}
|
|
|
|
template <class T>
|
|
Status SecretChatActor::save_common_info(T &update) {
|
|
if (auth_state_.id != update.id_) {
|
|
return Status::Error(PSLICE() << "chat_id mismatch: " << tag("mine", auth_state_.id) << tag("outer", update.id_));
|
|
}
|
|
auth_state_.id = update.id_;
|
|
auth_state_.access_hash = update.access_hash_;
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatRequested &update) {
|
|
if (auth_state_.state != State::Empty) {
|
|
LOG(WARNING) << "Unexpected ChatRequested ignored: " << to_string(update);
|
|
return Status::OK();
|
|
}
|
|
auth_state_.state = State::SendAccept;
|
|
auth_state_.x = 1;
|
|
auth_state_.user_id = update.admin_id_;
|
|
auth_state_.date = context_->unix_time();
|
|
TRY_STATUS(save_common_info(update));
|
|
auth_state_.handshake.set_g_a(update.g_a_.as_slice());
|
|
send_update_secret_chat();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatEmpty &update) {
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatWaiting &update) {
|
|
if (auth_state_.state != State::WaitRequestResponse && auth_state_.state != State::WaitAcceptResponse) {
|
|
LOG(WARNING) << "Unexpected ChatWaiting ignored";
|
|
return Status::OK();
|
|
}
|
|
TRY_STATUS(save_common_info(update));
|
|
send_update_secret_chat();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_update_chat(telegram_api::encryptedChat &update) {
|
|
if (auth_state_.state != State::WaitRequestResponse && auth_state_.state != State::WaitAcceptResponse) {
|
|
LOG(WARNING) << "Unexpected Chat ignored";
|
|
return Status::OK();
|
|
}
|
|
TRY_STATUS(save_common_info(update));
|
|
if (auth_state_.state == State::WaitRequestResponse) {
|
|
auth_state_.handshake.set_g_a(update.g_a_or_b_.as_slice());
|
|
TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));
|
|
auto id_and_key = auth_state_.handshake.gen_key();
|
|
pfs_state_.auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
|
|
calc_key_hash();
|
|
}
|
|
if (static_cast<int64>(pfs_state_.auth_key.id()) != update.key_fingerprint_) {
|
|
return Status::Error("Key fingerprint mismatch");
|
|
}
|
|
auth_state_.state = State::Ready;
|
|
if (create_logevent_id_ != 0) {
|
|
binlog_erase(context_->binlog(), create_logevent_id_);
|
|
create_logevent_id_ = 0;
|
|
}
|
|
|
|
// NB: order is important
|
|
context_->secret_chat_db()->set_value(pfs_state_);
|
|
context_->secret_chat_db()->set_value(auth_state_);
|
|
LOG(INFO) << "OK! Ready!";
|
|
send_update_secret_chat();
|
|
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
|
|
Promise<>());
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatDiscarded &update) {
|
|
return Status::Error("Chat discarded");
|
|
}
|
|
|
|
Status SecretChatActor::on_update_chat(NetQueryPtr query) {
|
|
static_assert(std::is_same<telegram_api::messages_requestEncryption::ReturnType,
|
|
telegram_api::messages_acceptEncryption::ReturnType>::value,
|
|
"");
|
|
TRY_RESULT(config, fetch_result<telegram_api::messages_requestEncryption>(std::move(query)));
|
|
TRY_STATUS(on_update_chat(std::move(config)));
|
|
if (auth_state_.state == State::WaitRequestResponse) {
|
|
context_->secret_chat_db()->set_value(auth_state_);
|
|
context_->binlog()->force_sync(Promise<>());
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::on_update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
|
|
Status res;
|
|
downcast_call(*chat, [&](auto &obj) { res = this->on_update_chat(obj); });
|
|
return res;
|
|
}
|
|
|
|
Status SecretChatActor::on_read_history(NetQueryPtr query) {
|
|
if (query.generation() == read_history_query_.generation()) {
|
|
read_history_query_ = NetQueryRef();
|
|
read_history_promise_.set_value(Unit());
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::start_up() {
|
|
LOG(INFO) << "SecretChatActor: start_up";
|
|
// auto start = Time::now();
|
|
auto r_auth_state = context_->secret_chat_db()->get_value<AuthState>();
|
|
if (r_auth_state.is_ok()) {
|
|
auth_state_ = r_auth_state.move_as_ok();
|
|
}
|
|
if (!can_be_empty_ && auth_state_.state == State::Empty) {
|
|
LOG(WARNING) << "Close Secret chat because it is empty";
|
|
return stop();
|
|
}
|
|
if (auth_state_.state == State::Closed) {
|
|
close_flag_ = true;
|
|
}
|
|
auto r_seq_no_state = context_->secret_chat_db()->get_value<SeqNoState>();
|
|
if (r_seq_no_state.is_ok()) {
|
|
seq_no_state_ = r_seq_no_state.move_as_ok();
|
|
}
|
|
auto r_config_state = context_->secret_chat_db()->get_value<ConfigState>();
|
|
if (r_config_state.is_ok()) {
|
|
config_state_ = r_config_state.move_as_ok();
|
|
}
|
|
auto r_pfs_state = context_->secret_chat_db()->get_value<PfsState>();
|
|
if (r_pfs_state.is_ok()) {
|
|
pfs_state_ = r_pfs_state.move_as_ok();
|
|
}
|
|
saved_pfs_state_message_id_ = pfs_state_.message_id;
|
|
pfs_state_.last_timestamp = Time::now();
|
|
|
|
send_update_secret_chat();
|
|
get_dh_config();
|
|
|
|
// auto end = Time::now();
|
|
// CHECK(end - start < 0.2);
|
|
LOG(INFO) << "In start_up with SeqNoState " << seq_no_state_;
|
|
LOG(INFO) << "In start_up with PfsState " << pfs_state_;
|
|
}
|
|
|
|
void SecretChatActor::get_dh_config() {
|
|
if (auth_state_.state != State::Empty) {
|
|
return;
|
|
}
|
|
|
|
auto dh_config = context_->dh_config();
|
|
if (dh_config) {
|
|
auth_state_.dh_config = *dh_config;
|
|
}
|
|
|
|
auto version = auth_state_.dh_config.version;
|
|
int random_length = 0;
|
|
telegram_api::messages_getDhConfig tl_query(version, random_length);
|
|
|
|
auto query = context_->net_query_creator().create(
|
|
UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::DhConfig)), create_storer(tl_query));
|
|
context_->send_net_query(std::move(query), actor_shared(this), false);
|
|
}
|
|
|
|
Status SecretChatActor::on_dh_config(NetQueryPtr query) {
|
|
LOG(INFO) << "Got dh config";
|
|
TRY_RESULT(config, fetch_result<telegram_api::messages_getDhConfig>(std::move(query)));
|
|
downcast_call(*config, [&](auto &obj) { this->on_dh_config(obj); });
|
|
TRY_STATUS(DhHandshake::check_config(auth_state_.dh_config.g, auth_state_.dh_config.prime, context_->dh_callback()));
|
|
auth_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
|
|
return Status::OK();
|
|
}
|
|
|
|
void SecretChatActor::on_dh_config(telegram_api::messages_dhConfigNotModified &dh_not_modified) {
|
|
Random::add_seed(dh_not_modified.random_.as_slice());
|
|
}
|
|
|
|
void SecretChatActor::on_dh_config(telegram_api::messages_dhConfig &dh) {
|
|
auto dh_config = std::make_shared<DhConfig>();
|
|
dh_config->version = dh.version_;
|
|
dh_config->prime = dh.p_.as_slice().str();
|
|
dh_config->g = dh.g_;
|
|
Random::add_seed(dh.random_.as_slice());
|
|
auth_state_.dh_config = *dh_config;
|
|
context_->set_dh_config(dh_config);
|
|
}
|
|
|
|
void SecretChatActor::calc_key_hash() {
|
|
unsigned char sha1_buf[20];
|
|
auto sha1_slice = Slice(sha1_buf, 20);
|
|
sha1(pfs_state_.auth_key.key(), sha1_buf);
|
|
|
|
unsigned char sha256_buf[32];
|
|
auto sha256_slice = MutableSlice(sha256_buf, 32);
|
|
sha256(pfs_state_.auth_key.key(), sha256_slice);
|
|
|
|
auth_state_.key_hash = sha1_slice.truncate(16).str() + sha256_slice.truncate(20).str();
|
|
}
|
|
|
|
void SecretChatActor::send_update_secret_chat() {
|
|
if (auth_state_.state == State::Empty) {
|
|
return;
|
|
}
|
|
SecretChatState state;
|
|
if (auth_state_.state == State::Ready) {
|
|
state = SecretChatState::Active;
|
|
} else if (auth_state_.state == State::Closed) {
|
|
state = SecretChatState::Closed;
|
|
} else {
|
|
state = SecretChatState::Waiting;
|
|
}
|
|
context_->on_update_secret_chat(auth_state_.access_hash, get_user_id(), state, auth_state_.x == 0, config_state_.ttl,
|
|
auth_state_.date, auth_state_.key_hash, current_layer());
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
|
|
config_state_.ttl = set_ttl.ttl_seconds_;
|
|
context_->secret_chat_db()->set_value(config_state_);
|
|
send_update_secret_chat();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
|
|
// TODO
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
|
|
// Corresponding logevent won't be deleted before promise returned by add_changes is set.
|
|
on_delete_messages(delete_messages.random_ids_).ensure();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
|
|
// noting to do
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionFlushHistory &flush_history) {
|
|
on_flush_history(pfs_state_.message_id).ensure();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionResend &resend) {
|
|
if (seq_no_state_.resend_end_seq_no < resend.end_seq_no_ / 2) { // replay protection
|
|
seq_no_state_.resend_end_seq_no = resend.end_seq_no_ / 2;
|
|
on_seq_no_state_changed();
|
|
}
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNotifyLayer ¬ify_layer) {
|
|
config_state_.my_layer = notify_layer.layer_;
|
|
context_->secret_chat_db()->set_value(config_state_);
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionTyping &typing) {
|
|
// noop
|
|
}
|
|
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
|
|
config_state_.ttl = set_ttl.ttl_seconds_;
|
|
context_->secret_chat_db()->set_value(config_state_);
|
|
send_update_secret_chat();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
|
|
// TODO
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
|
|
return on_delete_messages(delete_messages.random_ids_);
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
|
|
// TODO
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionFlushHistory &screenshot) {
|
|
return on_flush_history(pfs_state_.message_id);
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionResend &resend) {
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNotifyLayer ¬ify_layer) {
|
|
if (notify_layer.layer_ > config_state_.his_layer) {
|
|
config_state_.his_layer = notify_layer.layer_;
|
|
context_->secret_chat_db()->set_value(config_state_);
|
|
send_update_secret_chat();
|
|
}
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionTyping &typing) {
|
|
// noop
|
|
return Status::OK();
|
|
}
|
|
|
|
// Perfect Forward Secrecy
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
|
|
LOG_CHECK(pfs_state_.state == PfsState::WaitSendRequest || pfs_state_.state == PfsState::SendRequest) << pfs_state_;
|
|
pfs_state_.state = PfsState::WaitRequestResponse;
|
|
on_pfs_state_changed();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
|
|
CHECK(pfs_state_.state == PfsState::WaitSendAccept || pfs_state_.state == PfsState::SendAccept);
|
|
pfs_state_.state = PfsState::WaitAcceptResponse;
|
|
pfs_state_.handshake = DhHandshake();
|
|
on_pfs_state_changed();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
|
|
// TODO
|
|
LOG(FATAL) << "TODO";
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
|
|
CHECK(pfs_state_.state == PfsState::WaitSendCommit || pfs_state_.state == PfsState::SendCommit);
|
|
|
|
CHECK(static_cast<int64>(pfs_state_.other_auth_key.id()) == commit_key.key_fingerprint_);
|
|
std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
|
|
pfs_state_.can_forget_other_key = true;
|
|
|
|
pfs_state_.state = PfsState::Empty;
|
|
pfs_state_.last_message_id = pfs_state_.message_id;
|
|
pfs_state_.last_timestamp = Time::now();
|
|
pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;
|
|
|
|
on_pfs_state_changed();
|
|
}
|
|
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNoop &noop) {
|
|
// noop
|
|
}
|
|
|
|
// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
|
|
if (pfs_state_.state == PfsState::WaitRequestResponse || pfs_state_.state == PfsState::SendRequest) {
|
|
if (pfs_state_.exchange_id > request_key.exchange_id_) {
|
|
LOG(INFO) << "RequestKey: silently abort their request";
|
|
return Status::OK();
|
|
} else {
|
|
pfs_state_.state = PfsState::Empty;
|
|
if (pfs_state_.exchange_id == request_key.exchange_id_) {
|
|
context_->secret_chat_db()->set_value(pfs_state_);
|
|
LOG(WARNING) << "RequestKey: silently abort both requests (almost impossible)";
|
|
return Status::OK();
|
|
}
|
|
}
|
|
}
|
|
|
|
if (pfs_state_.state != PfsState::Empty) {
|
|
return Status::Error("Unexpected RequestKey");
|
|
}
|
|
LOG_CHECK(pfs_state_.other_auth_key.empty()) << "TODO: got requestKey, before old key is dropped";
|
|
pfs_state_.state = PfsState::SendAccept;
|
|
pfs_state_.handshake = DhHandshake();
|
|
pfs_state_.exchange_id = request_key.exchange_id_;
|
|
pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
|
|
pfs_state_.handshake.set_g_a(request_key.g_a_.as_slice());
|
|
TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));
|
|
auto id_and_key = pfs_state_.handshake.gen_key();
|
|
|
|
pfs_state_.other_auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
|
|
pfs_state_.can_forget_other_key = false;
|
|
pfs_state_.wait_message_id = pfs_state_.message_id;
|
|
|
|
on_pfs_state_changed();
|
|
return Status::OK();
|
|
}
|
|
|
|
// decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
|
|
if (pfs_state_.state != PfsState::WaitRequestResponse) {
|
|
return Status::Error("AcceptKey: unexpected");
|
|
}
|
|
if (pfs_state_.exchange_id != accept_key.exchange_id_) {
|
|
return Status::Error("AcceptKey: exchange_id mismatch");
|
|
}
|
|
pfs_state_.handshake.set_g_a(accept_key.g_b_.as_slice());
|
|
TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));
|
|
auto id_and_key = pfs_state_.handshake.gen_key();
|
|
if (static_cast<int64>(id_and_key.first) != accept_key.key_fingerprint_) {
|
|
return Status::Error("AcceptKey: key_fingerprint mismatch");
|
|
}
|
|
pfs_state_.state = PfsState::SendCommit;
|
|
pfs_state_.handshake = DhHandshake();
|
|
CHECK(pfs_state_.can_forget_other_key || static_cast<int64>(pfs_state_.other_auth_key.id()) == id_and_key.first);
|
|
pfs_state_.other_auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
|
|
pfs_state_.can_forget_other_key = false;
|
|
pfs_state_.wait_message_id = pfs_state_.message_id;
|
|
|
|
on_pfs_state_changed();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
|
|
if (pfs_state_.exchange_id != abort_key.exchange_id_) {
|
|
LOG(INFO) << "AbortKey: exchange_id mismatch: " << tag("my exchange_id", pfs_state_.exchange_id)
|
|
<< to_string(abort_key);
|
|
return Status::OK();
|
|
}
|
|
if (pfs_state_.state != PfsState::WaitRequestResponse) {
|
|
return Status::Error("AbortKey: unexpected");
|
|
}
|
|
pfs_state_.state = PfsState::Empty;
|
|
pfs_state_.handshake = DhHandshake();
|
|
|
|
on_pfs_state_changed();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
|
|
if (pfs_state_.state != PfsState::WaitAcceptResponse) {
|
|
return Status::Error("CommitKey: unexpected");
|
|
}
|
|
if (pfs_state_.exchange_id != commit_key.exchange_id_) {
|
|
return Status::Error("CommitKey: exchange_id mismatch ");
|
|
}
|
|
|
|
CHECK(!pfs_state_.can_forget_other_key);
|
|
if (static_cast<int64>(pfs_state_.other_auth_key.id()) != commit_key.key_fingerprint_) {
|
|
return Status::Error("CommitKey: fingerprint mismatch");
|
|
}
|
|
std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
|
|
pfs_state_.can_forget_other_key = true;
|
|
|
|
pfs_state_.state = PfsState::Empty;
|
|
pfs_state_.last_message_id = pfs_state_.message_id;
|
|
pfs_state_.last_timestamp = Time::now();
|
|
pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;
|
|
|
|
on_pfs_state_changed();
|
|
return Status::OK();
|
|
}
|
|
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNoop &noop) {
|
|
// noop
|
|
return Status::OK();
|
|
}
|
|
|
|
Status SecretChatActor::on_inbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
|
|
// Action may be not about PFS, but we still can use pfs_state_.message_id
|
|
if (message_id <= pfs_state_.message_id) { // replay protection
|
|
LOG(INFO) << "Drop old inbound DecryptedMessageAction: " << to_string(action) << tag("message_id", message_id)
|
|
<< tag("known_message_id", pfs_state_.message_id);
|
|
return Status::OK();
|
|
}
|
|
|
|
// if message_id < seq_no_state_.message_id, then SeqNoState with message_id bigger than current message_id is already saved.
|
|
// And event corresponding to message_id is saved too.
|
|
//
|
|
// Also, if SeqNoState with message_id greater than current message_id is not saved, then corresponding action will be
|
|
// replayed.
|
|
//
|
|
// This works only for ttl, not for pfs. Same ttl action may be processed twice.
|
|
if (message_id < seq_no_state_.message_id) {
|
|
LOG(INFO) << "Drop old inbound DecryptedMessageAction (non-pfs action): " << to_string(action);
|
|
return Status::OK();
|
|
}
|
|
pfs_state_.message_id = message_id; // replay protection
|
|
|
|
LOG(INFO) << "In on_inbound_action: " << to_string(action);
|
|
Status res;
|
|
downcast_call(action, [&](auto &obj) { res = this->on_inbound_action(obj); });
|
|
return res;
|
|
}
|
|
|
|
void SecretChatActor::on_outbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
|
|
// Action may be not about PFS, but we still can use pfs_state_.message_id
|
|
if (message_id <= pfs_state_.message_id) { // replay protection
|
|
LOG(INFO) << "Drop old outbound DecryptedMessageAction: " << to_string(action);
|
|
return;
|
|
}
|
|
|
|
// see comment in on_inbound_action
|
|
if (message_id < seq_no_state_.message_id) {
|
|
LOG(INFO) << "Drop old outbound DecryptedMessageAction (non-pfs action): " << to_string(action);
|
|
return;
|
|
}
|
|
pfs_state_.message_id = message_id; // replay protection
|
|
|
|
LOG(INFO) << "In on_outbound_action: " << to_string(action);
|
|
downcast_call(action, [&](auto &obj) { this->on_outbound_action(obj); });
|
|
}
|
|
|
|
// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
|
|
void SecretChatActor::request_new_key() {
|
|
CHECK(!auth_state_.dh_config.empty());
|
|
|
|
pfs_state_.state = PfsState::SendRequest;
|
|
pfs_state_.handshake = DhHandshake();
|
|
pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
|
|
pfs_state_.exchange_id = Random::secure_int64();
|
|
|
|
// NB: must save explicitly
|
|
LOG(INFO) << "SAVE PfsState " << pfs_state_;
|
|
context_->secret_chat_db()->set_value(pfs_state_);
|
|
}
|
|
|
|
void SecretChatActor::on_promise_error(Status error, string desc) {
|
|
if (context_->close_flag()) {
|
|
LOG(DEBUG) << "Ignore " << tag("promise", desc) << error;
|
|
return;
|
|
}
|
|
LOG(FATAL) << "Failed: " << tag("promise", desc) << error;
|
|
}
|
|
|
|
constexpr int32 SecretChatActor::MAX_RESEND_COUNT;
|
|
|
|
} // namespace td
|