This repository has been archived on 2020-05-25. You can view files and clone it, but cannot push or open issues or pull requests.
tdlib-fork/td/telegram/SecretChatActor.cpp

2293 lines
92 KiB
C++
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/SecretChatActor.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryCreator.h"
#include "td/telegram/SecretChatId.h"
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/UniqueId.h"
#include "td/telegram/secret_api.hpp"
#include "td/telegram/telegram_api.hpp"
#include "td/mtproto/PacketInfo.h"
#include "td/mtproto/PacketStorer.h"
#include "td/mtproto/Transport.h"
#include "td/mtproto/utils.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/as.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/overloaded.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/StorerBase.h"
#include "td/utils/Time.h"
#include "td/utils/tl_parsers.h"
#include <array>
#include <tuple>
#include <type_traits>
//#define G GLOBAL_SHOULD_NOT_BE_USED_HERE
#undef G
namespace td {
// Builds a TL storer that serializes the given secret API object.
inline TLObjectStorer<secret_api::Object> create_storer(const secret_api::Object &object) {
  using SecretObjectStorer = TLObjectStorer<secret_api::Object>;
  return SecretObjectStorer(object);
}
class SecretImpl {
public:
explicit SecretImpl(const Storer &data) : data(data) {
}
template <class StorerT>
void do_store(StorerT &storer) const {
storer.store_binary(static_cast<int32>(data.size()));
storer.store_storer(data);
}
private:
const Storer &data;
};
SecretChatActor::SecretChatActor(int32 id, unique_ptr<Context> context, bool can_be_empty)
: context_(std::move(context)), can_be_empty_(can_be_empty) {
auth_state_.id = id;
}
// Creates a network query for `function`; the query type is encoded as the key
// of the generated unique identifier, so that the result can be dispatched by it.
template <class T>
NetQueryPtr SecretChatActor::create_net_query(QueryType type, const T &function) {
  const auto query_id = UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(type));
  return context_->net_query_creator().create(query_id, function, DcId::main(), NetQuery::Type::Common,
                                              NetQuery::AuthFlag::On);
}
// Applies an EncryptedChat update unless the chat is already being closed.
void SecretChatActor::update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
  if (!close_flag_) {
    auto status = on_update_chat(std::move(chat));
    check_status(std::move(status));
    loop();
  }
}
// Starts creation of a new secret chat with the given user.
// The request is written to the binlog before the state machine is advanced;
// the promise is fulfilled with the chat identifier built from random_id.
void SecretChatActor::create_chat(int32 user_id, int64 user_access_hash, int32 random_id,
                                  Promise<SecretChatId> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }

  const bool is_fresh = auth_state_.state == State::Empty;
  if (!is_fresh) {
    // The actor is already in use: fail the request and report the error to check_status.
    promise.set_error(Status::Error(500, "Bad random_id"));
    check_status(Status::Error("Unexpected request_chat"));
    return loop();
  }

  auto event = make_unique<logevent::CreateSecretChat>();
  event->user_id = user_id;
  event->user_access_hash = user_access_hash;
  event->random_id = random_id;
  const auto logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event));
  event->set_logevent_id(logevent_id);

  do_create_chat_impl(std::move(event));
  promise.set_value(SecretChatId(random_id));
  loop();
}
// Dispatches the result of a network query by the query type stored in the key
// of its identifier. After the chat is closed only the result of
// messages.discardEncryption is processed.
void SecretChatActor::on_result_resendable(NetQueryPtr net_query, Promise<NetQueryPtr> promise) {
  LOG(INFO) << "In on_result_resendable: " << net_query << " " << close_flag_;
  if (context_->close_flag()) {
    return;
  }

  const auto key = UniqueId::extract_key(net_query->id());
  if (close_flag_) {
    const bool is_discard_result = key == static_cast<uint8>(QueryType::DiscardEncryption);
    if (is_discard_result) {
      on_discard_encryption_result(std::move(net_query));
    }
    return;
  }

  auto dispatch_result = [&] {
    switch (key) {
      case static_cast<uint8>(QueryType::DhConfig):
        return on_dh_config(std::move(net_query));
      case static_cast<uint8>(QueryType::EncryptedChat):
        return on_update_chat(std::move(net_query));
      case static_cast<uint8>(QueryType::Message):
        on_outbound_send_message_result(std::move(net_query), std::move(promise));
        return Status::OK();
      case static_cast<uint8>(QueryType::ReadHistory):
        return on_read_history(std::move(net_query));
      case static_cast<uint8>(QueryType::Ignore):
        return Status::OK();
    }
    UNREACHABLE();
  };
  check_status(dispatch_result());
  loop();
}
// Replays a CloseSecretChat event by handing it to do_close_chat_impl.
void SecretChatActor::replay_close_chat(unique_ptr<logevent::CloseSecretChat> event) {
  auto close_event = std::move(event);
  do_close_chat_impl(std::move(close_event));
}
// Replays a CreateSecretChat event; the event is dropped if the chat is already closed.
void SecretChatActor::replay_create_chat(unique_ptr<logevent::CreateSecretChat> event) {
  if (!close_flag_) {
    do_create_chat_impl(std::move(event));
  }
}
// Processes a new encrypted inbound message.
// If the message is still owned by this function when it returns (i.e. it was
// ignored), its qts_ack promise is fulfilled by the scope guard.
void SecretChatActor::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
  SCOPE_EXIT {
    if (message) {
      message->qts_ack.set_value(Unit());
    }
  };

  if (close_flag_) {
    return;
  }
  const bool is_ready = auth_state_.state == State::Ready;
  if (!is_ready) {
    LOG(ERROR) << "Ignore unexpected update: " << tag("message", *message);
    return;
  }

  auto status = do_inbound_message_encrypted(std::move(message));
  check_status(std::move(status));
  loop();
}
// Replays an inbound message stored in the binlog. Must be called before
// binlog replay is finished; the message must be already decrypted.
void SecretChatActor::replay_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore unexpected replay inbound message: " << tag("message", *message);
    return;
  }

  CHECK(!binlog_replay_finish_flag_);
  CHECK(message->decrypted_message_layer);  // from binlog

  if (!message->is_pending) {
    // just replay; binlog message identifiers must be strictly increasing
    LOG_CHECK(message->message_id > last_binlog_message_id_)
        << tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
    last_binlog_message_id_ = message->message_id;
    check_status(do_inbound_message_decrypted(std::move(message)));
  } else {
    // wait for gaps?
    // check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
    do_inbound_message_decrypted_pending(std::move(message));
  }
  loop();
}
// Replays an outbound message stored in the binlog. Must be called before
// binlog replay is finished; binlog message identifiers must be strictly increasing.
void SecretChatActor::replay_outbound_message(unique_ptr<logevent::OutboundSecretMessage> message) {
  if (close_flag_) {
    return;
  }
  const bool is_ready = auth_state_.state == State::Ready;
  if (!is_ready) {
    LOG(ERROR) << "Ignore unexpected replay outbound message: " << tag("message", *message);
    return;
  }

  CHECK(!binlog_replay_finish_flag_);
  LOG_CHECK(message->message_id > last_binlog_message_id_)
      << tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
  last_binlog_message_id_ = message->message_id;

  // There is nobody to notify during replay, so an empty promise is passed.
  do_outbound_message_impl(std::move(message), Promise<>());
  loop();
}
// Serializes and encrypts `message` with the current key (pfs_state_.auth_key).
// The message is wrapped into decryptedMessageLayer together with 32 random
// bytes, the layer and the wire sequence numbers, and written as an end-to-end
// packet. `message` is passed by reference: it is moved into the wrapper for
// serialization and moved back before returning, so the caller keeps ownership
// (possibly of a downgraded decryptedMessage46 object, see below).
// NB: my_seq_no is just after message is sent, i.e. my_out_seq_no is already incremented
Result<BufferSlice> SecretChatActor::create_encrypted_message(int32 layer, int32 my_in_seq_no, int32 my_out_seq_no,
tl_object_ptr<secret_api::DecryptedMessage> &message) {
// For layers below MTPROTO_2_LAYER the message is converted to decryptedMessage46
// and the grouped_id flag is cleared.
if (message->get_id() == secret_api::decryptedMessage::ID && layer < MTPROTO_2_LAYER) {
auto old = secret_api::move_object_as<secret_api::decryptedMessage>(message);
old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK;
message = secret_api::make_object<secret_api::decryptedMessage46>(
old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_);
}
mtproto::AuthKey *auth_key = &pfs_state_.auth_key;
// Wire sequence numbers: the local counters are doubled and offset by auth_state_.x.
// my_out_seq_no is already incremented, hence the "- 1".
auto in_seq_no = my_in_seq_no * 2 + auth_state_.x;
auto out_seq_no = my_out_seq_no * 2 - 1 - auth_state_.x;
BufferSlice random_bytes(32);
Random::secure_bytes(random_bytes.as_slice().ubegin(), random_bytes.size());
auto message_with_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
std::move(random_bytes), layer, in_seq_no, out_seq_no, std::move(message));
LOG(INFO) << to_string(message_with_layer);
auto storer = create_storer(*message_with_layer);
auto new_storer = mtproto::PacketStorer<SecretImpl>(storer);
mtproto::PacketInfo info;
info.type = mtproto::PacketInfo::EndToEnd;
// Send with mtproto 2.0 if current layer is at least MTPROTO_2_LAYER
info.version = layer >= MTPROTO_2_LAYER ? 2 : 1;
info.is_creator = auth_state_.x == 0;
// The first write call only sizes the buffer (presumably it returns the packet size
// when no destination is given - TODO confirm); the second one writes the packet.
auto packet_writer = BufferWriter{mtproto::Transport::write(new_storer, *auth_key, &info), 0, 0};
mtproto::Transport::write(new_storer, *auth_key, &info, packet_writer.as_slice());
// Give the message back to the caller.
message = std::move(message_with_layer->message_);
return packet_writer.as_buffer_slice();
}
// Sends a message on behalf of an external caller; the message is marked with
// the External and Push flags.
void SecretChatActor::send_message(tl_object_ptr<secret_api::DecryptedMessage> message,
                                   tl_object_ptr<telegram_api::InputEncryptedFile> file, Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  const auto send_flags = SendFlag::External | SendFlag::Push;
  send_message_impl(std::move(message), std::move(file), send_flags, std::move(promise));
}
// Minimal layer required by a typing action: round video record/upload actions
// need VIDEO_NOTES_LAYER, everything else has no requirement (0).
static int32 get_min_layer(const secret_api::decryptedMessageActionTyping &message) {
  const auto action_id = message.action_->get_id();
  const bool is_round_video_action = action_id == secret_api::sendMessageRecordRoundAction::ID ||
                                     action_id == secret_api::sendMessageUploadRoundAction::ID;
  if (is_round_video_action) {
    return SecretChatActor::VIDEO_NOTES_LAYER;
  }
  return 0;
}
// Minimal layer required by a service message: only typing actions can impose
// a requirement, all other actions return 0.
static int32 get_min_layer(const secret_api::decryptedMessageService &message) {
  if (message.action_->get_id() == secret_api::decryptedMessageActionTyping::ID) {
    const auto &typing = static_cast<const secret_api::decryptedMessageActionTyping &>(*message.action_);
    return get_min_layer(typing);
  }
  return 0;
}
// Minimal layer required by a document attribute: documentAttributeVideo66
// needs VIDEO_NOTES_LAYER, all other attributes return 0.
static int32 get_min_layer(const secret_api::DocumentAttribute &attribute) {
  const bool is_video66 = attribute.get_id() == secret_api::documentAttributeVideo66::ID;
  return is_video66 ? SecretChatActor::VIDEO_NOTES_LAYER : 0;
}
// Minimal layer required by a document: the maximum of the minimal layers of
// all of its attributes, or 0 if the document has no attributes.
static int32 get_min_layer(const secret_api::decryptedMessageMediaDocument &message) {
  int32 res = 0;
  for (auto &attribute : message.attributes_) {
    // Every attribute must be examined: returning after the first one would miss
    // a layer-restricted attribute that is located later in the list.
    auto attribute_layer = get_min_layer(*attribute);
    if (attribute_layer > res) {
      res = attribute_layer;
    }
  }
  return res;
}
// Minimal layer required by an ordinary message: determined by its document
// media, if any; messages without media or with other media return 0.
static int32 get_min_layer(const secret_api::decryptedMessage &message) {
  const bool has_document =
      message.media_ && message.media_->get_id() == secret_api::decryptedMessageMediaDocument::ID;
  if (!has_document) {
    return 0;
  }
  return get_min_layer(static_cast<const secret_api::decryptedMessageMediaDocument &>(*message.media_));
}
// Minimal layer required by an arbitrary message: dispatches by constructor to
// the service-message or ordinary-message overload; other constructors return 0.
static int32 get_min_layer(const secret_api::DecryptedMessage &message) {
  const auto constructor_id = message.get_id();
  if (constructor_id == secret_api::decryptedMessageService::ID) {
    return get_min_layer(static_cast<const secret_api::decryptedMessageService &>(message));
  }
  if (constructor_id == secret_api::decryptedMessage::ID) {
    return get_min_layer(static_cast<const secret_api::decryptedMessage &>(message));
  }
  return 0;
}
// Common implementation of sending a message or a service action.
// Validates the chat state and the layer supported by the other side, builds an
// OutboundSecretMessage binlog event with the next message_id/out_seq_no and the
// already encrypted payload, and passes it to do_outbound_message_impl.
// `flags` is a combination of SendFlag values; the promise receives an error if
// the message is rejected here.
void SecretChatActor::send_message_impl(tl_object_ptr<secret_api::DecryptedMessage> message,
tl_object_ptr<telegram_api::InputEncryptedFile> file, int32 flags,
Promise<> promise) {
if (close_flag_) {
promise.set_error(Status::Error(400, "Chat is closed"));
return;
}
if (auth_state_.state != State::Ready) {
LOG(ERROR) << "Ignore send_message: " << tag("message", to_string(message)) << tag("file", to_string(file));
return promise.set_error(Status::Error(400, "Chat is not accessible"));
}
// Refuse messages that require a layer higher than the one known for the other side.
if (get_min_layer(*message) > config_state_.his_layer) {
return promise.set_error(Status::Error(400, "Message is not supported by the other side"));
}
LOG_CHECK(binlog_replay_finish_flag_) << "Trying to send message before binlog replay is finished: "
<< to_string(*message) << to_string(file);
int64 random_id = 0;
downcast_call(*message, [&](auto &x) { random_id = x.random_id_; });
LOG(INFO) << "Send message: " << to_string(*message) << to_string(file);
// A message with a known random_id is not sent again; the promise is attached
// to the existing outbound message state instead.
auto it = random_id_to_outbound_message_state_token_.find(random_id);
if (it != end(random_id_to_outbound_message_state_token_)) {
return on_outbound_outer_send_message_promise(it->second, std::move(promise));
}
auto binlog_event = make_unique<logevent::OutboundSecretMessage>();
binlog_event->chat_id = auth_state_.id;
binlog_event->random_id = random_id;
binlog_event->file = logevent::EncryptedInputFile::from_input_encrypted_file(file);
// The event carries the values the counters will have after this message;
// seq_no_state_ itself is not modified in this function.
binlog_event->message_id = seq_no_state_.message_id + 1;
binlog_event->my_in_seq_no = seq_no_state_.my_in_seq_no;
binlog_event->my_out_seq_no = seq_no_state_.my_out_seq_no + 1;
binlog_event->his_in_seq_no = seq_no_state_.his_in_seq_no;
// create_encrypted_message takes `message` by reference and gives it back,
// so it can still be inspected below.
binlog_event->encrypted_message =
create_encrypted_message(current_layer(), binlog_event->my_in_seq_no, binlog_event->my_out_seq_no, message)
.move_as_ok();
binlog_event->is_service = (flags & SendFlag::Push) == 0;
binlog_event->is_external = (flags & SendFlag::External) != 0;
// Service messages keep their action in the event and are never rewritable;
// ordinary messages are marked as rewritable.
if (message->get_id() == secret_api::decryptedMessageService::ID) {
binlog_event->is_rewritable = false;
auto service_message = move_tl_object_as<secret_api::decryptedMessageService>(message);
binlog_event->action = std::move(service_message->action_);
} else {
binlog_event->is_rewritable = true;
}
do_outbound_message_impl(std::move(binlog_event), std::move(promise));
}
// Sends the typing status to the server: any action except the cancel action
// is reported as "typing". A previous unfinished typing query is cancelled.
void SecretChatActor::send_message_action(tl_object_ptr<secret_api::SendMessageAction> action) {
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore send_message_action: " << tag("message", to_string(action));
    return;
  }

  const bool is_typing = action->get_id() != secret_api::sendMessageCancelAction::ID;
  auto typing_query =
      create_net_query(QueryType::Ignore, telegram_api::messages_setEncryptedTyping(get_input_chat(), is_typing));

  const bool has_previous_query = !set_typing_query_.empty();
  if (has_previous_query) {
    LOG(INFO) << "Cancel previous set typing query";
    cancel_query(set_typing_query_);
  }
  set_typing_query_ = typing_query.get_weak();
  context_->send_net_query(std::move(typing_query), actor_shared(this), false);
}
// Sends messages.readEncryptedHistory with the given date. Requests with a date
// that is not newer than the last sent one succeed immediately; a pending
// previous request is completed and its query is cancelled.
void SecretChatActor::send_read_history(int32 date, Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore send_read_history: " << tag("date", date);
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }
  if (date <= last_read_history_date_) {
    promise.set_value(Unit());
    return;
  }

  if (read_history_promise_) {
    LOG(INFO) << "Cancel previous read history request in secret chat " << auth_state_.id;
    read_history_promise_.set_value(Unit());
    cancel_query(read_history_query_);
  }

  auto read_query =
      create_net_query(QueryType::ReadHistory, telegram_api::messages_readEncryptedHistory(get_input_chat(), date));
  read_history_query_ = read_query.get_weak();
  last_read_history_date_ = date;
  read_history_promise_ = std::move(promise);

  LOG(INFO) << "Send read history request with date " << date << " in secret chat " << auth_state_.id;
  context_->send_net_query(std::move(read_query), actor_shared(this), false);
}
// Sends decryptedMessageActionReadMessages for a single message.
void SecretChatActor::send_open_message(int64 random_id, Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  std::vector<int64> random_ids{random_id};
  auto read_action = make_tl_object<secret_api::decryptedMessageActionReadMessages>(std::move(random_ids));
  send_action(std::move(read_action), SendFlag::Push, std::move(promise));
}
// Deletes a single message; a convenience wrapper around delete_messages.
// Succeeds immediately if the chat is already in the Closed state.
void SecretChatActor::delete_message(int64 random_id, Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    return promise.set_value(Unit());
  }
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  std::vector<int64> single_id{random_id};
  delete_messages(std::move(single_id), std::move(promise));
}
// Sends decryptedMessageActionDeleteMessages for the given messages.
// Succeeds immediately if the chat is already in the Closed state.
void SecretChatActor::delete_messages(std::vector<int64> random_ids, Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    return promise.set_value(Unit());
  }
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  auto delete_action = make_tl_object<secret_api::decryptedMessageActionDeleteMessages>(std::move(random_ids));
  send_action(std::move(delete_action), SendFlag::Push, std::move(promise));
}
// Sends decryptedMessageActionFlushHistory.
// Succeeds immediately if the chat is already in the Closed state.
void SecretChatActor::delete_all_messages(Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    return promise.set_value(Unit());
  }
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  auto flush_action = make_tl_object<secret_api::decryptedMessageActionFlushHistory>();
  send_action(std::move(flush_action), SendFlag::Push, std::move(promise));
}
// Sends decryptedMessageActionScreenshotMessages with an empty list of messages.
void SecretChatActor::notify_screenshot_taken(Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  auto screenshot_action = make_tl_object<secret_api::decryptedMessageActionScreenshotMessages>(vector<int64>());
  send_action(std::move(screenshot_action), SendFlag::Push, std::move(promise));
}
// Sends decryptedMessageActionSetMessageTTL as an external service message with
// the random_id chosen by the caller.
void SecretChatActor::send_set_ttl_message(int32 ttl, int64 random_id, Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  if (auth_state_.state != State::Ready) {
    return promise.set_error(Status::Error(400, "Can't access the chat"));
  }

  auto ttl_action = make_tl_object<secret_api::decryptedMessageActionSetMessageTTL>(ttl);
  auto service_message =
      secret_api::make_object<secret_api::decryptedMessageService>(random_id, std::move(ttl_action));
  send_message_impl(std::move(service_message), nullptr, SendFlag::External | SendFlag::Push, std::move(promise));
}
// Wraps the action into a service message with a freshly generated random
// identifier and sends it without a file.
void SecretChatActor::send_action(tl_object_ptr<secret_api::DecryptedMessageAction> action, int32 flags,
                                  Promise<> promise) {
  auto service_message =
      secret_api::make_object<secret_api::decryptedMessageService>(Random::secure_int64(), std::move(action));
  send_message_impl(std::move(service_message), nullptr, flags, std::move(promise));
}
void SecretChatActor::binlog_replay_finish() {
on_his_in_seq_no_updated();
LOG(INFO) << "Binlog replay is finished with SeqNoState " << seq_no_state_;
LOG(INFO) << "Binlog replay is finished with PfsState " << pfs_state_;
binlog_replay_finish_flag_ = true;
if (auth_state_.state == State::Ready) {
if (config_state_.my_layer < MY_LAYER) {
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
Promise<>());
}
}
yield();
}
// Runs one iteration of the state machines; does nothing while the chat is
// closed or the binlog is still being replayed.
void SecretChatActor::loop() {
  const bool can_run = !close_flag_ && binlog_replay_finish_flag_;
  if (can_run) {
    check_status(do_loop());
  }
}
// Runs authorization, key re-exchange and gap filling, in this order.
// Stops and returns the error if the authorization step fails.
Status SecretChatActor::do_loop() {
  TRY_STATUS(run_auth());

  run_pfs();
  run_fill_gaps();

  return Status::OK();
}
// Forwards the acknowledgement of a sent message to the context.
void SecretChatActor::on_send_message_ack(int64 random_id) {
  auto &context = *context_;
  context.on_send_message_ack(random_id);
}
// Rewrites every known outbound message from the list with an empty one.
// Unknown random identifiers are skipped; the first failure is returned.
Status SecretChatActor::on_delete_messages(const std::vector<int64> &random_ids) {
  const auto &tokens = random_id_to_outbound_message_state_token_;
  for (auto random_id : random_ids) {
    auto token_it = tokens.find(random_id);
    if (token_it != tokens.end()) {
      const auto state_id = token_it->second;
      TRY_STATUS(outbound_rewrite_with_empty(state_id));
    }
  }
  return Status::OK();
}
// Rewrites with an empty message every rewritable outbound message whose
// message_id is less than last_message_id. The states are collected first and
// rewritten afterwards, outside of the for_each traversal.
Status SecretChatActor::on_flush_history(int32 last_message_id) {
  std::vector<uint64> rewritable_state_ids;
  outbound_message_states_.for_each([&](auto state_id, auto &state) {
    const bool is_old = state.message->message_id < last_message_id;
    if (is_old && state.message->is_rewritable) {
      rewritable_state_ids.push_back(state_id);
    }
  });

  for (auto state_id : rewritable_state_ids) {
    TRY_STATUS(outbound_rewrite_with_empty(state_id));
  }
  return Status::OK();
}
// Advances the authorization state machine. Only the SendRequest and SendAccept
// states perform work here, and only after the handshake has received its
// config; every other state is left untouched.
Status SecretChatActor::run_auth() {
  const auto current_state = auth_state_.state;

  if (current_state == State::SendRequest) {
    if (!auth_state_.handshake.has_config()) {
      return Status::OK();
    }
    // messages.requestEncryption#f64daf43 user_id:InputUser random_id:int g_a:bytes = EncryptedChat;
    auto query = create_net_query(QueryType::EncryptedChat, telegram_api::messages_requestEncryption(
                                                                get_input_user(), auth_state_.random_id,
                                                                BufferSlice(auth_state_.handshake.get_g_b())));
    context_->send_net_query(std::move(query), actor_shared(this), false);
    auth_state_.state = State::WaitRequestResponse;
    return Status::OK();
  }

  if (current_state == State::SendAccept) {
    if (!auth_state_.handshake.has_config()) {
      return Status::OK();
    }
    TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));

    // Derive the key before the accept query is built: the query carries the key identifier.
    auto id_and_key = auth_state_.handshake.gen_key();
    pfs_state_.auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
    calc_key_hash();

    // messages.acceptEncryption#3dbc0415 peer:InputEncryptedChat g_b:bytes key_fingerprint:long = EncryptedChat;
    auto query = create_net_query(
        QueryType::EncryptedChat,
        telegram_api::messages_acceptEncryption(get_input_chat(), BufferSlice(auth_state_.handshake.get_g_b()),
                                                pfs_state_.auth_key.id()));
    context_->send_net_query(std::move(query), actor_shared(this), false);
    auth_state_.state = State::WaitAcceptResponse;
    return Status::OK();
  }

  return Status::OK();
}
// Processes pending inbound messages that are no longer ahead of my_in_seq_no
// and, if a gap remains before the first pending message, asks the other side
// to resend the missing range (unless that range was already requested).
void SecretChatActor::run_fill_gaps() {
  // replay messages
  while (!pending_inbound_messages_.empty()) {
    auto begin = pending_inbound_messages_.begin();
    const auto next_seq_no = begin->first;
    if (next_seq_no > seq_no_state_.my_in_seq_no) {
      break;
    }

    LOG(INFO) << "Replay pending event: " << tag("seq_no", next_seq_no);
    auto message = std::move(begin->second);
    pending_inbound_messages_.erase(begin);
    check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
    CHECK(pending_inbound_messages_.find(next_seq_no) == pending_inbound_messages_.end());
  }

  if (pending_inbound_messages_.empty()) {
    return;
  }

  const auto start_seq_no = seq_no_state_.my_in_seq_no;
  const auto finish_seq_no = pending_inbound_messages_.begin()->first - 1;
  LOG(INFO) << tag("start_seq_no", start_seq_no) << tag("finish_seq_no", finish_seq_no)
            << tag("resend_end_seq_no", seq_no_state_.resend_end_seq_no);
  CHECK(start_seq_no <= finish_seq_no);
  if (seq_no_state_.resend_end_seq_no >= finish_seq_no) {
    return;
  }
  CHECK(seq_no_state_.resend_end_seq_no < start_seq_no);

  // Convert the local counters to wire sequence numbers.
  const auto wire_start_seq_no = start_seq_no * 2 + auth_state_.x;
  const auto wire_finish_seq_no = finish_seq_no * 2 + auth_state_.x;
  auto resend_action =
      secret_api::make_object<secret_api::decryptedMessageActionResend>(wire_start_seq_no, wire_finish_seq_no);
  send_action(std::move(resend_action), SendFlag::None, Promise<>());
}
// Drives the key re-exchange state machine.
// A new key is requested when no exchange is in progress, there is no other key,
// and either more than 100 messages were sent since pfs_state_.last_message_id
// or more than 7 days (60 * 60 * 24 * 7 seconds) have passed since
// pfs_state_.last_timestamp. Then the Send* states emit the corresponding
// service action and switch to the matching WaitSend* state. The loop repeats
// until a state is reached in which nothing can be done.
void SecretChatActor::run_pfs() {
while (true) {
LOG(INFO) << "Run pfs loop: " << pfs_state_;
if (pfs_state_.state == PfsState::Empty &&
(pfs_state_.last_message_id + 100 < seq_no_state_.message_id ||
pfs_state_.last_timestamp + 60 * 60 * 24 * 7 < Time::now()) &&
pfs_state_.other_auth_key.empty()) {
LOG(INFO) << "Request new key";
request_new_key();
}
switch (pfs_state_.state) {
case PfsState::SendRequest: {
// shouldn't wait, pfs_state is already saved explicitly
pfs_state_.state = PfsState::WaitSendRequest; // don't save it!
send_action(secret_api::make_object<secret_api::decryptedMessageActionRequestKey>(
pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b())),
SendFlag::None, Promise<>());
break;
}
case PfsState::SendCommit: {
// must wait till pfs_state is saved to binlog. Otherwise we may save ActionCommit to binlog without pfs_state,
// which has the new auth_key.
if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
return;
}
// TODO: wait till gaps are filled???
pfs_state_.state = PfsState::WaitSendCommit; // don't save it
send_action(secret_api::make_object<secret_api::decryptedMessageActionCommitKey>(
pfs_state_.exchange_id, static_cast<int64>(pfs_state_.other_auth_key.id())),
SendFlag::None, Promise<>());
break;
}
case PfsState::SendAccept: {
// Same wait condition as in SendCommit: do nothing until the saved state
// has caught up with pfs_state_.wait_message_id.
if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
return;
}
pfs_state_.state = PfsState::WaitSendAccept; // don't save it
send_action(secret_api::make_object<secret_api::decryptedMessageActionAcceptKey>(
pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b()),
static_cast<int64>(pfs_state_.other_auth_key.id())),
SendFlag::None, Promise<>());
break;
}
default:
// Nothing to send in any other state.
return;
}
}
}
// Handles the result of an operation: errors with code 1 are only logged,
// any other error is treated as fatal.
void SecretChatActor::check_status(Status status) {
  if (!status.is_error()) {
    return;
  }

  const bool is_non_fatal = status.code() == 1;
  if (is_non_fatal) {
    LOG(WARNING) << "Non-fatal error: " << status;
    return;
  }
  on_fatal_error(std::move(status));
}
// Logs the error and cancels the chat; nobody waits for the result of the cancellation.
void SecretChatActor::on_fatal_error(Status status) {
  LOG(ERROR) << "Fatal error: " << status;

  Promise<> ignored_result;
  cancel_chat(std::move(ignored_result));
}
// Closes the chat. Succeeds immediately if the chat is already being closed.
// Otherwise all binlog events of stored inbound/outbound messages and the create
// event are erased, a CloseSecretChat event is added, and after the binlog is
// synced do_close_chat_impl is scheduled and the promise is fulfilled.
void SecretChatActor::cancel_chat(Promise<> promise) {
if (close_flag_) {
promise.set_value(Unit());
return;
}
// From this point every other request sees the chat as closed.
close_flag_ = true;
// Collect the binlog events of all known messages.
std::vector<logevent::LogEvent::Id> to_delete;
outbound_message_states_.for_each(
[&](auto state_id, auto &state) { to_delete.push_back(state.message->logevent_id()); });
inbound_message_states_.for_each([&](auto state_id, auto &state) { to_delete.push_back(state.logevent_id); });
// TODO: It must be a transaction
for (auto id : to_delete) {
binlog_erase(context_->binlog(), id);
}
if (create_logevent_id_ != 0) {
binlog_erase(context_->binlog(), create_logevent_id_);
create_logevent_id_ = 0;
}
auto event = make_unique<logevent::CloseSecretChat>();
event->chat_id = auth_state_.id;
event->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event)));
// The close event is passed to the actor only after the sync has succeeded;
// on failure the error is reported both to the caller and to the actor.
auto on_sync = PromiseCreator::lambda(
[actor_id = actor_id(this), event = std::move(event), promise = std::move(promise)](Result<Unit> result) mutable {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::do_close_chat_impl, std::move(event));
promise.set_value(Unit());
} else {
promise.set_error(result.error().clone());
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "do_close_chat_impl");
}
});
context_->binlog()->force_sync(std::move(on_sync));
yield();
}
// Switches the chat to the Closed state: remembers the close event, stores the
// new auth state, erases the other persistent states, notifies about the
// updated chat and sends messages.discardEncryption.
void SecretChatActor::do_close_chat_impl(unique_ptr<logevent::CloseSecretChat> event) {
  close_flag_ = true;
  close_logevent_id_ = event->logevent_id();
  LOG(INFO) << "Send messages.discardEncryption";

  auth_state_.state = State::Closed;
  context_->secret_chat_db()->set_value(auth_state_);
  context_->secret_chat_db()->erase_value(config_state_);
  context_->secret_chat_db()->erase_value(pfs_state_);
  context_->secret_chat_db()->erase_value(seq_no_state_);

  auto discard_query =
      create_net_query(QueryType::DiscardEncryption, telegram_api::messages_discardEncryption(auth_state_.id));
  send_update_secret_chat();
  context_->send_net_query(std::move(discard_query), actor_shared(this), true);
}
// Applies a CreateSecretChat event. In the Empty state the chat is initialized
// and moved to SendRequest; in SendRequest/WaitRequestResponse the event is
// only remembered; in any other state the event is erased from the binlog.
void SecretChatActor::do_create_chat_impl(unique_ptr<logevent::CreateSecretChat> event) {
  LOG(INFO) << *event;
  CHECK(event->random_id == auth_state_.id);
  create_logevent_id_ = event->logevent_id();

  switch (auth_state_.state) {
    case State::Empty:
      auth_state_.user_id = event->user_id;
      auth_state_.user_access_hash = event->user_access_hash;
      auth_state_.random_id = event->random_id;
      auth_state_.state = State::SendRequest;
      auth_state_.x = 0;
      auth_state_.date = context_->unix_time();
      send_update_secret_chat();
      break;
    case State::SendRequest:
    case State::WaitRequestResponse:
      // The event is still needed; keep it in the binlog.
      break;
    default:
      binlog_erase(context_->binlog(), create_logevent_id_);
      create_logevent_id_ = 0;
      break;
  }
}
// Finishes closing of the chat after messages.discardEncryption has returned:
// erases the auth state and the close event, then stops the actor.
// Nothing is erased if the context itself is being closed.
void SecretChatActor::on_discard_encryption_result(NetQueryPtr result) {
  CHECK(close_flag_);
  CHECK(close_logevent_id_ != 0);

  const bool is_context_closing = context_->close_flag();
  if (is_context_closing) {
    return;
  }

  LOG(INFO) << "Got result for messages.discardEncryption";
  context_->secret_chat_db()->erase_value(auth_state_);
  binlog_erase(context_->binlog(), close_logevent_id_);
  // skip flush
  stop();
}
// Builds inputUser from the user identifier and access hash stored in auth_state_.
telegram_api::object_ptr<telegram_api::inputUser> SecretChatActor::get_input_user() {
  const auto &auth = auth_state_;
  return telegram_api::make_object<telegram_api::inputUser>(auth.user_id, auth.user_access_hash);
}
// Builds inputEncryptedChat from the chat identifier and access hash stored in auth_state_.
telegram_api::object_ptr<telegram_api::inputEncryptedChat> SecretChatActor::get_input_chat() {
  const auto &auth = auth_state_;
  return telegram_api::make_object<telegram_api::inputEncryptedChat>(auth.id, auth.access_hash);
}
// Currently only logs that the actor is being torn down.
void SecretChatActor::tear_down() {
  // TODO notify send update that we are dead
  LOG(INFO) << "SecretChatActor: tear_down";
}
// Decrypts an inbound end-to-end packet.
// Returns a tuple of (identifier of the key used, decrypted payload, MTProto
// version that succeeded). A packet encrypted with an unknown key produces an
// error with code 1, which check_status treats as non-fatal.
Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSlice &encrypted_message) {
MutableSlice data = encrypted_message.as_slice();
CHECK(is_aligned_pointer<4>(data.data()));
TRY_RESULT(auth_key_id, mtproto::Transport::read_auth_key_id(data));
// The packet may be encrypted either with the current key or with the other
// key kept in pfs_state_.
mtproto::AuthKey *auth_key = nullptr;
if (auth_key_id == pfs_state_.auth_key.id()) {
auth_key = &pfs_state_.auth_key;
} else if (auth_key_id == pfs_state_.other_auth_key.id()) {
auth_key = &pfs_state_.other_auth_key;
} else {
return Status::Error(1, PSLICE() << "Unknown " << tag("auth_key_id", format::as_hex(auth_key_id))
<< tag("crc", crc64(encrypted_message.as_slice())));
}
// expect that message is encrypted with mtproto 2.0 if their layer is at least MTPROTO_2_LAYER
std::array<int, 2> versions{{1, 2}};
if (config_state_.his_layer >= MTPROTO_2_LAYER) {
std::swap(versions[0], versions[1]);
}
BufferSlice encrypted_message_copy;
int32 mtproto_version = -1;
Result<mtproto::Transport::ReadResult> r_read_result;
// Try the expected version first and fall back to the other one. Every attempt
// works on a fresh copy of the packet (presumably because Transport::read
// modifies the buffer in place - TODO confirm).
for (size_t i = 0; i < versions.size(); i++) {
bool is_last = i + 1 == versions.size();
encrypted_message_copy = encrypted_message.copy();
data = encrypted_message_copy.as_slice();
CHECK(is_aligned_pointer<4>(data.data()));
mtproto::PacketInfo info;
info.type = mtproto::PacketInfo::EndToEnd;
mtproto_version = versions[i];
info.version = mtproto_version;
info.is_creator = auth_state_.x == 0;
r_read_result = mtproto::Transport::read(data, *auth_key, &info);
// Only the failure of the last attempt is reported to the caller.
if (!is_last && r_read_result.is_error()) {
LOG(WARNING) << tag("mtproto", mtproto_version) << " decryption failed " << r_read_result.error();
continue;
}
break;
}
TRY_RESULT(read_result, std::move(r_read_result));
// Anything except an ordinary packet is an error here.
switch (read_result.type()) {
case mtproto::Transport::ReadResult::Quickack:
return Status::Error("Got quickack instead of a message");
case mtproto::Transport::ReadResult::Error:
return Status::Error(PSLICE() << "Got mtproto error code instead of a message: " << read_result.error());
case mtproto::Transport::ReadResult::Nop:
return Status::Error("Got nop instead of a message");
case mtproto::Transport::ReadResult::Packet:
data = read_result.packet();
break;
default:
UNREACHABLE();
}
// The payload is prefixed with its int32 length (see SecretImpl::do_store).
// NOTE(review): len is not validated against data.size() here - confirm that
// substr handles an out-of-range length safely.
int32 len = as<int32>(data.begin());
data = data.substr(4, len);
// An unaligned payload is copied into a new buffer; an aligned one is returned
// as a part of the already decrypted copy.
if (!is_aligned_pointer<4>(data.data())) {
return std::make_tuple(auth_key_id, BufferSlice(data), mtproto_version);
} else {
return std::make_tuple(auth_key_id, encrypted_message_copy.from_slice(data), mtproto_version);
}
}
// Decrypts and parses an inbound message and passes it on to
// do_inbound_message_decrypted_unchecked.
// If the message is still owned by this function when it returns, its qts_ack
// promise is fulfilled by the scope guard.
Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
if (message) {
message->qts_ack.set_value(Unit());
}
};
TRY_RESULT(decrypted, decrypt(message->encrypted_message));
auto auth_key_id = std::get<0>(decrypted);
auto data_buffer = std::move(std::get<1>(decrypted));
auto mtproto_version = std::get<2>(decrypted);
message->auth_key_id = auth_key_id;
TlBufferParser parser(&data_buffer);
auto id = parser.fetch_int();
Status status;
// The expected case: the payload is a decryptedMessageLayer object.
if (id == secret_api::decryptedMessageLayer::ID) {
auto message_with_layer = secret_api::decryptedMessageLayer::fetch(parser);
parser.fetch_end();
if (!parser.get_error()) {
auto layer = message_with_layer->layer_;
// This check is intentionally disabled by "&& false".
if (layer < DEFAULT_LAYER && false /* Android app can send such messages */) {
LOG(ERROR) << "Layer " << layer << " is not supported, drop message " << to_string(message_with_layer);
return Status::OK();
}
// Remember the highest layer seen from the other side.
if (config_state_.his_layer < layer) {
config_state_.his_layer = layer;
context_->secret_chat_db()->set_value(config_state_);
send_update_secret_chat();
}
if (layer >= MTPROTO_2_LAYER && mtproto_version < 2) {
return Status::Error(PSLICE() << "MTProto 1.0 encryption is forbidden for this layer");
}
if (message_with_layer->in_seq_no_ < 0) {
return Status::Error(PSLICE() << "Invalid seq_no: " << to_string(message_with_layer));
}
message->decrypted_message_layer = std::move(message_with_layer);
return do_inbound_message_decrypted_unchecked(std::move(message));
} else {
status = Status::Error(PSLICE() << parser.get_error() << format::as_hex_dump<4>(data_buffer.as_slice()));
}
} else {
status = Status::Error(PSLICE() << "Unknown constructor " << format::as_hex(id));
}
// support for older layer
LOG(WARNING) << "Failed to Fetch update: " << status;
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
Promise<>());
// For layer 8 the payload is parsed as a bare DecryptedMessage and wrapped into
// a decryptedMessageLayer with sequence numbers -1.
if (config_state_.his_layer == 8) {
TlBufferParser new_parser(&data_buffer);
auto message_without_layer = secret_api::DecryptedMessage::fetch(new_parser);
// NOTE(review): fetch_end is called on the first parser, not on new_parser,
// so trailing data after the message is not checked by new_parser - confirm
// whether this is intended.
parser.fetch_end();
if (!new_parser.get_error()) {
message->decrypted_message_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
BufferSlice(), config_state_.his_layer, -1, -1, std::move(message_without_layer));
return do_inbound_message_decrypted_unchecked(std::move(message));
}
LOG(ERROR) << "Failed to fetch update (DecryptedMessage): " << new_parser.get_error()
<< format::as_hex_dump<4>(data_buffer.as_slice());
}
return status;
}
// Validates raw sequence numbers and the layer of an inbound message against the current state.
//
// Messages with a negative in_seq_no are accepted without any checks.
// An error with code 1 is returned for an already processed message, an error with code 2 is returned
// if some preceding messages are missing; all other errors are created without an explicit code.
Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_layer) {
  if (in_seq_no < 0) {
    return Status::OK();
  }

  // The lowest bit of a raw seq_no depends on auth_state_.x
  const bool is_in_parity_ok = in_seq_no % 2 == (1 - auth_state_.x);
  const bool is_out_parity_ok = out_seq_no % 2 == auth_state_.x;
  if (!(is_in_parity_ok && is_out_parity_ok)) {
    return Status::Error("Bad seq_no parity");
  }

  // Drop the parity bit to get plain message counters
  const int his_in_seq_no = in_seq_no / 2;
  const int his_out_seq_no = out_seq_no / 2;

  if (his_out_seq_no < seq_no_state_.my_in_seq_no) {
    return Status::Error(1, "Old seq_no");
  }
  if (his_out_seq_no > seq_no_state_.my_in_seq_no) {
    return Status::Error(2, "Gap found!");
  }
  if (his_in_seq_no < seq_no_state_.his_in_seq_no) {
    return Status::Error("in_seq_no is not monotonic");
  }
  if (his_in_seq_no > seq_no_state_.my_out_seq_no) {
    return Status::Error("in_seq_no is bigger than seq_no_state_.my_out_seq_no");
  }
  if (his_layer < seq_no_state_.his_layer) {
    return Status::Error("his_layer is not monotonic");
  }
  return Status::OK();
}
// Checks sequence numbers of a decrypted inbound message and dispatches it further.
//
// - If check_seq_no fails with any code other than 2, qts_ack is set, the binlog event of the message (if any)
//   is erased and the error is returned.
// - decryptedMessageService8 is converted to decryptedMessageService.
// - decryptedMessageActionResend is served immediately and then replaced with decryptedMessageActionNoop.
// - If check_seq_no failed with code 2 (gap), the message is passed to do_inbound_message_decrypted_pending.
// - Otherwise the message receives the next message_id and is passed to do_inbound_message_decrypted.
Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logevent::InboundSecretMessage> message) {
// On exit the message must be either moved away or have its qts_ack already set
SCOPE_EXIT {
LOG_IF(FATAL, message && message->qts_ack) << "Lost qts_promise";
};
auto in_seq_no = message->decrypted_message_layer->in_seq_no_;
auto out_seq_no = message->decrypted_message_layer->out_seq_no_;
auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer());
if (status.is_error() && status.code() != 2 /* not gap found */) {
message->qts_ack.set_value(Unit());
if (message->logevent_id()) {
LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id());
binlog_erase(context_->binlog(), message->logevent_id());
}
auto warning_message = PSTRING() << status << tag("seq_no_state_.my_in_seq_no", seq_no_state_.my_in_seq_no)
<< tag("seq_no_state_.my_out_seq_no", seq_no_state_.my_out_seq_no)
<< tag("seq_no_state_.his_in_seq_no", seq_no_state_.his_in_seq_no)
<< tag("in_seq_no", in_seq_no) << tag("out_seq_no", out_seq_no)
<< to_string(message->decrypted_message_layer);
// Errors with a non-zero code are logged as warnings, errors without a code are logged as errors
if (status.code()) {
LOG(WARNING) << warning_message;
} else {
LOG(ERROR) << warning_message;
}
return status;
}
// Convert the old service message constructor to the current one
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
message->decrypted_message_layer->message_ =
secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
}
// Process ActionResend.
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
auto *decrypted_message_service =
static_cast<secret_api::decryptedMessageService *>(message->decrypted_message_layer->message_.get());
if (decrypted_message_service->action_->get_id() == secret_api::decryptedMessageActionResend::ID) {
auto *action_resend =
static_cast<secret_api::decryptedMessageActionResend *>(decrypted_message_service->action_.get());
// Raw sequence numbers are halved, the same way as in check_seq_no
uint32 start_seq_no = static_cast<uint32>(action_resend->start_seq_no_ / 2);
uint32 finish_seq_no = static_cast<uint32>(action_resend->end_seq_no_ / 2);
if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) {
message->qts_ack.set_value(Unit());
return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages");
}
LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no);
// Every requested message must still be tracked; the request fails on the first unknown seq_no
for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) {
auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no);
if (it == out_seq_no_to_outbound_message_state_token_.end()) {
message->qts_ack.set_value(Unit());
return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no));
}
auto state_id = it->second;
outbound_resend(state_id);
}
// It is ok to replace action with Noop, because it won't be written to binlog before message is marked unsent
decrypted_message_service->action_ = secret_api::make_object<secret_api::decryptedMessageActionNoop>();
}
}
LOG(INFO) << "GOT MESSAGE " << to_string(message->decrypted_message_layer);
// The only error that can reach this point is a gap; such messages are postponed
if (status.is_error()) {
CHECK(status.code() == 2);  // gap found
do_inbound_message_decrypted_pending(std::move(message));
return Status::OK();
}
message->message_id = seq_no_state_.message_id + 1;
// in_seq_no == -1 is used for messages without sequence numbers; their seq_no fields are left untouched
if (in_seq_no != -1) {
message->my_in_seq_no = out_seq_no / 2 + 1;
message->my_out_seq_no = seq_no_state_.my_out_seq_no;
message->his_in_seq_no = in_seq_no / 2;
}
return do_inbound_message_decrypted(std::move(message));
}
// Registers a new outbound message state for binlog_event and starts the pipeline described below.
//
// The message is keyed by its random_id, which must not be already in use.
// promise is set at the end of the function, after the logevent was passed to binlog or was found to exist already.
void SecretChatActor::do_outbound_message_impl(unique_ptr<logevent::OutboundSecretMessage> binlog_event,
Promise<> promise) {
binlog_event->crc = crc64(binlog_event->encrypted_message.as_slice());
LOG(INFO) << "Do outbound message: " << *binlog_event << tag("crc", binlog_event->crc);
// operator[] creates a zero entry for a new random_id, so a non-zero value means that the random_id is in use
auto &state_id_ref = random_id_to_outbound_message_state_token_[binlog_event->random_id];
LOG_CHECK(state_id_ref == 0) << "Random id collision";
state_id_ref = outbound_message_states_.create();
const uint64 state_id = state_id_ref;
auto *state = outbound_message_states_.get(state_id);
LOG(INFO) << tag("state_id", state_id);
CHECK(state);
state->message = std::move(binlog_event);
// OutboundSecretMessage
//
// 1. [] => Save logevent. [save_logevent]
// 2. [save_logevent] => Save SeqNoState [save_changes]
// 3. [save_logevent] => Send NetQuery [send_message]
//   Note: we have to force binlog to flush
// 4.0 [send_message]:Fail => rewrite
// 4. [save_changes; send_message] => Mark logevent as sent [rewrite_logevent]
// 5. [save_changes; send_message; ack] => [remove_logevent]
auto message = state->message.get();
// send_message
auto send_message_start = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_outbound_send_message_start");
}
});
// update seq_no
update_seq_no_state(*message);
// process action
if (message->action) {
on_outbound_action(*message->action, message->message_id);
}
// save_changes
auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_outbound_save_changes_finish, state_id);
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_outbound_save_chages_finish");
}
});
auto save_changes_start = add_changes(std::move(save_changes_finish));
// wait for ack
// The message is treated as acknowledged at once if its out_seq_no is below his_in_seq_no;
// otherwise it is registered to be acknowledged by on_his_in_seq_no_updated
auto out_seq_no = state->message->my_out_seq_no - 1;
if (out_seq_no < seq_no_state_.his_in_seq_no) {
state->ack_flag = true;
} else {
out_seq_no_to_outbound_message_state_token_[out_seq_no] = state_id;
}
// save_logevent => [send_message; save_changes]
auto save_logevent_finish = PromiseCreator::join(std::move(send_message_start), std::move(save_changes_start));
auto logevent_id = state->message->logevent_id();
if (logevent_id == 0) {
// A new message: add it to binlog and continue only after binlog is synced
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*state->message));
LOG(INFO) << "Outbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
context_->binlog()->force_sync(std::move(save_logevent_finish));
state->message->set_logevent_id(logevent_id);
} else {
// The message already has a logevent, so the next steps are started immediately
LOG(INFO) << "Outbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
save_logevent_finish.set_value(Unit());
}
promise.set_value(Unit());  // logevent was sent to binlog;
}
// Acknowledges all tracked outbound messages with out_seq_no less than seq_no_state_.his_in_seq_no
// and stops tracking them.
void SecretChatActor::on_his_in_seq_no_updated() {
  auto &pending_acks = out_seq_no_to_outbound_message_state_token_;
  for (auto it = pending_acks.begin(); it != pending_acks.end() && it->first < seq_no_state_.his_in_seq_no;) {
    auto state_id = it->second;
    // The entry is removed before the state is notified
    it = pending_acks.erase(it);
    on_outbound_ack(state_id);
  }
}
// Marks seq_no_state_ as changed; the flag is read and reset by add_changes.
void SecretChatActor::on_seq_no_state_changed() {
seq_no_state_changed_ = true;
}
// Marks pfs_state_ as changed and logs its current value; the flag is read and reset by add_changes.
void SecretChatActor::on_pfs_state_changed() {
  pfs_state_changed_ = true;
  LOG(INFO) << "In on_pfs_state_changed: " << pfs_state_;
}
// Queues snapshots of the changed states in changes_processor_ together with save_changes_finish.
//
// Returns a promise, which must be set when the queued change is allowed to be saved: on success it calls
// on_save_changes_start with the token of the change, on failure it reports the error through on_promise_error.
Promise<> SecretChatActor::add_changes(Promise<> save_changes_finish) {
  StateChange change;
  change.save_changes_finish = std::move(save_changes_finish);

  // Snapshot only the states that were changed since the previous call and reset their flags
  if (seq_no_state_changed_) {
    seq_no_state_changed_ = false;
    change.seq_no_state_change = SeqNoStateChange(seq_no_state_);
  }
  if (pfs_state_changed_) {
    pfs_state_changed_ = false;
    change.pfs_state_change = PfsStateChange(pfs_state_);
  }

  auto token = changes_processor_.add(std::move(change));
  return PromiseCreator::lambda([actor_id = actor_id(this), token](Result<> result) {
    if (result.is_error()) {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "on_save_changes_start");
      return;
    }
    send_closure(actor_id, &SecretChatActor::on_save_changes_start, token);
  });
}
// Applies sequence numbers of a processed message to seq_no_state_.
//
// new_seq_no_state must provide message_id, my_in_seq_no, my_out_seq_no, his_in_seq_no and his_layer().
// The update is ignored if none of the four counters is bigger than the stored one.
template <class T>
void SecretChatActor::update_seq_no_state(const T &new_seq_no_state) {
  // Some old updates may arrive. Just ignore them
  const bool is_outdated = seq_no_state_.message_id >= new_seq_no_state.message_id &&
                           seq_no_state_.my_in_seq_no >= new_seq_no_state.my_in_seq_no &&
                           seq_no_state_.my_out_seq_no >= new_seq_no_state.my_out_seq_no &&
                           seq_no_state_.his_in_seq_no >= new_seq_no_state.his_in_seq_no;
  if (is_outdated) {
    return;
  }

  seq_no_state_.message_id = new_seq_no_state.message_id;

  // my_in_seq_no == -1 means that the message has no sequence numbers, so only message_id is updated
  const bool has_seq_no = new_seq_no_state.my_in_seq_no != -1;
  if (has_seq_no) {
    LOG(INFO) << "Have my_in_seq_no: " << seq_no_state_.my_in_seq_no << "--->" << new_seq_no_state.my_in_seq_no;
    seq_no_state_.my_in_seq_no = new_seq_no_state.my_in_seq_no;
    seq_no_state_.my_out_seq_no = new_seq_no_state.my_out_seq_no;

    // A layer of -1 is not stored
    auto layer = new_seq_no_state.his_layer();
    if (layer != -1) {
      seq_no_state_.his_layer = layer;
    }

    if (new_seq_no_state.his_in_seq_no != seq_no_state_.his_in_seq_no) {
      seq_no_state_.his_in_seq_no = new_seq_no_state.his_in_seq_no;
      on_his_in_seq_no_updated();
    }
  }

  on_seq_no_state_changed();
}
// Postpones an inbound message that can't be processed yet.
//
// A message without a logevent is marked as pending and added to binlog; its qts_ack is passed to binlog_add.
// A message with a logevent is expected to have an empty qts_ack.
// The message is then stored in pending_inbound_messages_ by its halved out_seq_no.
void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptr<logevent::InboundSecretMessage> message) {
  // Just save logevent if necessary
  auto logevent_id = message->logevent_id();

  // qts
  auto qts_promise = std::move(message->qts_ack);

  if (logevent_id == 0) {
    message->is_pending = true;
    // Keep the local identifier in sync with the message, so that the final log line doesn't report 0
    logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message),
                             std::move(qts_promise));
    message->set_logevent_id(logevent_id);
    LOG(INFO) << "Inbound PENDING secret message [save_logevent] start (do not expect finish) "
              << tag("logevent_id", logevent_id);
  } else {
    LOG(INFO) << "Inbound PENDING secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
    CHECK(!qts_promise);
  }
  LOG(INFO) << "Inbound PENDING secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);

  auto seq_no = message->decrypted_message_layer->out_seq_no_ / 2;
  pending_inbound_messages_[seq_no] = std::move(message);
}
// Processes a decrypted inbound message with valid sequence numbers: saves it to binlog if necessary, updates
// seq_no_state_, passes the message or the service action to context_ and schedules saving of changed states.
//
// Returns an error only if on_inbound_action fails for a service message.
Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<logevent::InboundSecretMessage> message) {
// InboundSecretMessage
//
// 1. [] => Add logevent. [save_logevent]
// 2. [save_logevent] => Save SeqNoState [save_changes]
// 3. [save_logevent] => Add message to MessageManager [save_message]
//    Note: if we are able to add message by random_id, we may not wait for (logevent). Otherwise we should force
//    binlog flush.
// 4. [save_logevent] => Update qts [qts]
// 5. [save_changes; save_message; ?qts) => Remove logevent [remove_logevent]
//    Note: It is easier not to wait for qts. In the worst case old update will be handled again after restart.
auto state_id = inbound_message_states_.create();
InboundMessageState &state = *inbound_message_states_.get(state_id);
// save logevent
auto logevent_id = message->logevent_id();
bool need_sync = false;
if (logevent_id == 0) {
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
LOG(INFO) << "Inbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
need_sync = true;
} else {
if (message->is_pending) {
// The message was saved as pending: a non-pending copy is added first and then the old event is erased
message->is_pending = false;
auto old_logevent_id = logevent_id;
logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
binlog_erase(context_->binlog(), old_logevent_id);
LOG(INFO) << "Inbound secret message [save_logevent] rewrite (after pending state) "
<< tag("logevent_id", logevent_id) << tag("old_logevent_id", old_logevent_id);
need_sync = true;
} else {
LOG(INFO) << "Inbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
}
}
LOG(INFO) << "Inbound secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);
state.logevent_id = logevent_id;
// save_message
auto save_message_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_inbound_save_message_finish, state_id);
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_inbound_save_message_finish");
}
});
// update seq_no
update_seq_no_state(*message);
// drop old key
// The other key is dropped only when the message was encrypted with the current key and forgetting is allowed
if (!pfs_state_.other_auth_key.empty() && message->auth_key_id == pfs_state_.auth_key.id() &&
pfs_state_.can_forget_other_key) {
LOG(INFO) << "Drop old auth key " << tag("auth_key_id", format::as_hex(pfs_state_.other_auth_key.id()));
pfs_state_.other_auth_key = mtproto::AuthKey();
on_pfs_state_changed();
}
// qts
auto qts_promise = std::move(message->qts_ack);
// process message
tl_object_ptr<telegram_api::encryptedFile> file;
if (message->has_encrypted_file) {
file = message->file.as_encrypted_file();
}
// Convert old message constructors to the current ones
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage46::ID) {
auto old = move_tl_object_as<secret_api::decryptedMessage46>(message->decrypted_message_layer->message_);
old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK;  // just in case
message->decrypted_message_layer->message_ = secret_api::make_object<secret_api::decryptedMessage>(
old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_, 0);
}
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
message->decrypted_message_layer->message_ =
secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
}
// NB: message->decrypted_message_layer->message_ is moved out by 'move_tl_object_as' below
// Send update through context_
// Note, that update may be sent multiple times and should be somehow protected from replay.
// Luckily all updates seems to be idempotent.
// We could use ChangesProcessor to mark logevent as sent to context_, but I don't see any advantages of this
// approach.
if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage::ID) {
auto decrypted_message =
move_tl_object_as<secret_api::decryptedMessage>(message->decrypted_message_layer->message_);
context_->on_inbound_message(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
std::move(file), std::move(decrypted_message), std::move(save_message_finish));
} else if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
auto decrypted_message_service =
move_tl_object_as<secret_api::decryptedMessageService>(message->decrypted_message_layer->message_);
auto action = std::move(decrypted_message_service->action_);
// Actions visible to the user are passed to context_; in every branch save_message_finish is consumed
switch (action->get_id()) {
case secret_api::decryptedMessageActionDeleteMessages::ID:
// Corresponding logevent won't be deleted before promise returned by add_changes is set.
context_->on_delete_messages(
static_cast<const secret_api::decryptedMessageActionDeleteMessages &>(*action).random_ids_,
std::move(save_message_finish));
break;
case secret_api::decryptedMessageActionFlushHistory::ID:
context_->on_flush_history(MessageId(ServerMessageId(message->message_id)), std::move(save_message_finish));
break;
case secret_api::decryptedMessageActionReadMessages::ID: {
const auto &random_ids =
static_cast<const secret_api::decryptedMessageActionReadMessages &>(*action).random_ids_;
if (random_ids.size() == 1) {
context_->on_read_message(random_ids[0], std::move(save_message_finish));
} else {  // probably never happens
// save_message_finish is attached to the multi-promise, which gives out one promise per random_id;
// the 'lock' promise is held until all of them are requested
MultiPromiseActorSafe mpas{"ReadSecretMessagesMultiPromiseActor"};
mpas.add_promise(std::move(save_message_finish));
auto lock = mpas.get_promise();
for (auto random_id : random_ids) {
context_->on_read_message(random_id, mpas.get_promise());
}
lock.set_value(Unit());
}
break;
}
case secret_api::decryptedMessageActionScreenshotMessages::ID:
context_->on_screenshot_taken(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
decrypted_message_service->random_id_, std::move(save_message_finish));
break;
case secret_api::decryptedMessageActionSetMessageTTL::ID:
context_->on_set_ttl(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
static_cast<const secret_api::decryptedMessageActionSetMessageTTL &>(*action).ttl_seconds_,
decrypted_message_service->random_id_, std::move(save_message_finish));
break;
default:
// The following actions aren't passed to context_:
/*
decryptedMessageActionResend#511110b0 start_seq_no:int end_seq_no:int = DecryptedMessageAction;
decryptedMessageActionNotifyLayer#f3048883 layer:int = DecryptedMessageAction;
decryptedMessageActionTyping#ccb27641 action:SendMessageAction = DecryptedMessageAction;
decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
decryptedMessageActionAbortKey#dd05ec6b exchange_id:long = DecryptedMessageAction;
decryptedMessageActionCommitKey#ec2e0b9b exchange_id:long key_fingerprint:long = DecryptedMessageAction;
decryptedMessageActionNoop#a82fdd63 = DecryptedMessageAction;
*/
save_message_finish.set_value(Unit());
break;
}
state.message_id = message->message_id;
// NOTE(review): on failure the function returns before save_changes and qts_promise are handled below
TRY_STATUS(on_inbound_action(*action, message->message_id));
} else {
LOG(ERROR) << "INGORE MESSAGE: " << to_string(message->decrypted_message_layer);
save_message_finish.set_value(Unit());
}
// save_changes
auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_inbound_save_changes_finish, state_id);
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_inbound_save_changes_finish");
}
});
auto save_changes_start = add_changes(std::move(save_changes_finish));
// save_logevent
// Both saving of changes and qts_promise are triggered by save_logevent_finish
auto save_logevent_finish = PromiseCreator::join(std::move(save_changes_start), std::move(qts_promise));
if (need_sync) {
// TODO: lazy sync is enough
context_->binlog()->force_sync(std::move(save_logevent_finish));
} else {
save_logevent_finish.set_value(Unit());
}
return Status::OK();
}
// Finishes the change identified by save_changes_token in changes_processor_ and saves every change released by it.
//
// If several changes are released at once, only the last SeqNoState and the last PfsState among them are written
// to the database, after which all released save_changes_finish promises are set.
void SecretChatActor::on_save_changes_start(ChangesProcessor<StateChange>::Id save_changes_token) {
  if (close_flag_) {
    return;
  }

  SeqNoStateChange last_seq_no_state;
  PfsStateChange last_pfs_state;
  std::vector<Promise<Unit>> finish_promises;
  changes_processor_.finish(save_changes_token, [&](StateChange &&released) {
    // Later changes override earlier ones
    if (released.seq_no_state_change) {
      last_seq_no_state = std::move(released.seq_no_state_change);
    }
    if (released.pfs_state_change) {
      last_pfs_state = std::move(released.pfs_state_change);
    }
    finish_promises.emplace_back(std::move(released.save_changes_finish));
  });

  if (last_seq_no_state) {
    LOG(INFO) << "SAVE SeqNoState " << last_seq_no_state;
    context_->secret_chat_db()->set_value(last_seq_no_state);
  }
  if (last_pfs_state) {
    LOG(INFO) << "SAVE PfsState " << last_pfs_state;
    saved_pfs_state_message_id_ = last_pfs_state.message_id;
    context_->secret_chat_db()->set_value(last_pfs_state);
  }

  // NB: we may not wait till database is flushed, because every other change will be in the same binlog
  for (auto &finish_promise : finish_promises) {
    finish_promise.set_value(Unit());
  }
}
// Marks the [save_message] step of the inbound message state as finished and re-evaluates the state.
// Does nothing if the chat is being closed; the state must exist otherwise.
void SecretChatActor::on_inbound_save_message_finish(uint64 state_id) {
  if (!close_flag_) {
    auto *state = inbound_message_states_.get(state_id);
    CHECK(state);
    state->save_message_finish = true;
    LOG(INFO) << "Inbound message [save_message] finish " << tag("logevent_id", state->logevent_id);
    inbound_loop(state, state_id);
  }
}
// Marks the [save_changes] step of the inbound message state as finished and re-evaluates the state.
// Does nothing if the chat is being closed; the state must exist otherwise.
void SecretChatActor::on_inbound_save_changes_finish(uint64 state_id) {
  if (!close_flag_) {
    auto *state = inbound_message_states_.get(state_id);
    CHECK(state);
    state->save_changes_finish = true;
    LOG(INFO) << "Inbound message [save_changes] finish " << tag("logevent_id", state->logevent_id);
    inbound_loop(state, state_id);
  }
}
// Erases the logevent and the state of an inbound message as soon as both [save_changes] and [save_message]
// steps are finished. Does nothing if the chat is being closed.
void SecretChatActor::inbound_loop(InboundMessageState *state, uint64 state_id) {
  if (close_flag_) {
    return;
  }

  const bool is_finished = state->save_changes_finish && state->save_message_finish;
  if (!is_finished) {
    return;
  }

  LOG(INFO) << "Inbound message [remove_logevent] start " << tag("logevent_id", state->logevent_id);
  binlog_erase(context_->binlog(), state->logevent_id);
  inbound_message_states_.erase(state_id);
}
// Creates a network query for sending the outbound message.
//
// Service messages use messages_sendEncryptedService, messages with a file use messages_sendEncryptedFile and
// all other messages use messages_sendEncrypted. A quick ack promise is attached to external messages if the
// option "use_quick_ack" is enabled.
NetQueryPtr SecretChatActor::create_net_query(const logevent::OutboundSecretMessage &message) {
  NetQueryPtr query;
  if (!message.is_service) {
    if (message.file.empty()) {
      query = create_net_query(QueryType::Message,
                               telegram_api::messages_sendEncrypted(get_input_chat(), message.random_id,
                                                                    message.encrypted_message.clone()));
    } else {
      query = create_net_query(
          QueryType::Message,
          telegram_api::messages_sendEncryptedFile(get_input_chat(), message.random_id,
                                                   message.encrypted_message.clone(),
                                                   message.file.as_input_encrypted_file()));
    }
  } else {
    CHECK(message.file.empty());
    query = create_net_query(QueryType::Message,
                             telegram_api::messages_sendEncryptedService(get_input_chat(), message.random_id,
                                                                         message.encrypted_message.clone()));
    // Effectively infinite timeout: the query would be resent immediately anyway
    query->total_timeout_limit = 1000000000;
  }

  if (message.is_external) {
    if (context_->get_config_option_boolean("use_quick_ack")) {
      auto on_quick_ack = [actor_id = actor_id(this), random_id = message.random_id](Unit) {
        send_closure(actor_id, &SecretChatActor::on_send_message_ack, random_id);
      };
      query->quick_ack_promise_ = PromiseCreator::lambda(std::move(on_quick_ack), PromiseCreator::Ignore());
    }
  }
  return query;
}
// Starts the [send_message] step of an outbound message.
//
// Unknown states are ignored. A message already marked as sent skips the network query and goes straight to
// on_outbound_send_message_finish; otherwise a new query is created, remembered in the state and sent.
void SecretChatActor::on_outbound_send_message_start(uint64 state_id) {
  auto *outbound_state = outbound_message_states_.get(state_id);
  if (outbound_state == nullptr) {
    LOG(INFO) << "Outbound message [send_message] start ignored (unknown state_id) " << tag("state_id", state_id);
    return;
  }

  auto *message = outbound_state->message.get();
  if (message->is_sent) {
    LOG(INFO) << "Outbound message [send_message] start dummy " << tag("logevent_id", message->logevent_id());
    on_outbound_send_message_finish(state_id);
    return;
  }

  LOG(INFO) << "Outbound message [send_message] start " << tag("logevent_id", message->logevent_id());
  auto query = create_net_query(*message);
  outbound_state->net_query_id = query->id();
  outbound_state->net_query_ref = query.get_weak();
  // Only a failed query for a rewritable message may be handled by deleting the message
  outbound_state->net_query_may_fail = message->is_rewritable;
  context_->send_net_query(std::move(query), actor_shared(this, state_id), true);
}
// Marks an outbound message as unsent, rewrites its logevent and restarts the [send_message] step
// after binlog is synced. Does nothing if the chat is being closed; the state must exist otherwise.
void SecretChatActor::outbound_resend(uint64 state_id) {
  if (close_flag_) {
    return;
  }

  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);

  // Forget the previous network query, if any
  state->net_query_ref = NetQueryRef();
  state->net_query_id = 0;
  state->message->is_sent = false;

  LOG(INFO) << "Outbound message [resend] " << tag("logevent_id", state->message->logevent_id())
            << tag("state_id", state_id);

  binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
                 create_storer(*state->message));

  auto on_synced = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_error()) {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_outbound_send_message_start");
      return;
    }
    send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
  });
  context_->binlog()->force_sync(std::move(on_synced));
}
// Replaces a rewritable outbound message with a service message, which deletes the message itself.
//
// The pending network query is cancelled, the new content is encrypted with the original sequence numbers and
// the logevent is rewritten. Nothing is done and OK is returned if the chat is being closed, the state is
// unknown or the message isn't rewritable. Returns an error only if encryption of the new content fails.
Status SecretChatActor::outbound_rewrite_with_empty(uint64 state_id) {
  if (close_flag_) {
    return Status::OK();
  }
  auto *outbound_state = outbound_message_states_.get(state_id);
  if (outbound_state == nullptr || !outbound_state->message->is_rewritable) {
    return Status::OK();
  }
  cancel_query(outbound_state->net_query_ref);

  auto &outbound_message = *outbound_state->message;
  MutableSlice data = outbound_message.encrypted_message.as_slice();
  CHECK(is_aligned_pointer<4>(data.data()));

  // Rewrite with delete itself.
  auto delete_action = secret_api::make_object<secret_api::decryptedMessageActionDeleteMessages>(
      std::vector<int64>{static_cast<int64>(outbound_message.random_id)});
  tl_object_ptr<secret_api::DecryptedMessage> message =
      secret_api::make_object<secret_api::decryptedMessageService>(outbound_message.random_id,
                                                                   std::move(delete_action));

  TRY_RESULT(encrypted_message, create_encrypted_message(current_layer(), outbound_message.my_in_seq_no,
                                                         outbound_message.my_out_seq_no, message));
  outbound_message.encrypted_message = std::move(encrypted_message);
  LOG(INFO) << tag("crc", crc64(outbound_message.encrypted_message.as_slice()));

  // From now on this is an internal service message without a file, which can't be rewritten again
  outbound_message.is_rewritable = false;
  outbound_message.is_external = false;
  outbound_message.is_service = true;
  outbound_message.file = logevent::EncryptedInputFile::from_input_encrypted_file(nullptr);

  binlog_rewrite(context_->binlog(), outbound_message.logevent_id(), LogEvent::HandlerType::SecretChats,
                 create_storer(outbound_message));
  return Status::OK();
}
// Handles the result of a network query sent for the outbound message identified by the link token.
//
// Results for unknown states and results of queries other than the one remembered in the state are dropped.
// On error: for external messages the error is first reported through context_->on_send_message_error;
// in all cases on_outbound_send_message_error is then called with resend_promise.
// On success: for external messages the result is first reported through context_->on_send_message_ok;
// in all cases on_outbound_send_message_finish is then called.
void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise<NetQueryPtr> resend_promise) {
if (close_flag_) {
return;
}
auto state_id = get_link_token();
auto *state = outbound_message_states_.get(state_id);
if (!state) {
LOG(INFO) << "Ignore old net query result " << tag("state_id", state_id);
query->clear();
return;
}
CHECK(state);
if (state->net_query_id != query->id()) {
LOG(INFO) << "Ignore old net query result " << tag("logevent_id", state->message->logevent_id())
<< tag("query_id", query->id()) << tag("state_query_id", state->net_query_id) << query;
query->clear();
return;
}
// The query is finished, so it is no longer tracked by the state
state->net_query_id = 0;
state->net_query_ref = NetQueryRef();
auto r_result = fetch_result<telegram_api::messages_sendEncrypted>(std::move(query));
if (r_result.is_error()) {
auto error = r_result.move_as_error();
// The error is cloned into the promise, because the original is still used for logging and send_result_ below
auto send_message_error_promise =
PromiseCreator::lambda([actor_id = actor_id(this), state_id, error = error.clone(),
resend_promise = std::move(resend_promise)](Result<> result) mutable {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_error, state_id, std::move(error),
std::move(resend_promise));
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_outbound_send_message_error");
}
});
if (state->message->is_external) {
LOG(INFO) << "Outbound secret message [send_message] failed, rewrite it with dummy "
<< tag("logevent_id", state->message->logevent_id()) << tag("error", error);
// send_result_ is kept in the state, so that the same result can be reported again
// by on_outbound_outer_send_message_promise
state->send_result_ = [this, random_id = state->message->random_id, error_code = error.code(),
error_message = error.message()](Promise<> promise) {
this->context_->on_send_message_error(random_id, Status::Error(error_code, error_message), std::move(promise));
};
state->send_result_(std::move(send_message_error_promise));
} else {
// Just resend.
LOG(INFO) << "Outbound secret message [send_message] failed, resend it "
<< tag("logevent_id", state->message->logevent_id()) << tag("error", error);
send_message_error_promise.set_value(Unit());
}
return;
}
auto result = r_result.move_as_ok();
LOG(INFO) << "Got messages_sendEncrypted result: " << tag("message_id", state->message->message_id)
<< tag("random_id", state->message->random_id) << to_string(*result);
auto send_message_finish_promise = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
if (result.is_ok()) {
send_closure(actor_id, &SecretChatActor::on_outbound_send_message_finish, state_id);
} else {
send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
"on_outbound_send_message_finish");
}
});
if (state->message->is_external) {
switch (result->get_id()) {
case telegram_api::messages_sentEncryptedMessage::ID: {
auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedMessage>(result);
state->send_result_ = [this, random_id = state->message->random_id,
message_id = MessageId(ServerMessageId(state->message->message_id)),
date = sent->date_](Promise<> promise) {
this->context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise));
};
state->send_result_(std::move(send_message_finish_promise));
return;
}
case telegram_api::messages_sentEncryptedFile::ID: {
auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedFile>(result);
// get_file creates a new copy of the received file on each call, because send_result_ may be called repeatedly
std::function<telegram_api::object_ptr<telegram_api::EncryptedFile>()> get_file;
telegram_api::downcast_call(
*sent->file_, overloaded(
[&](telegram_api::encryptedFileEmpty &) {
state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
telegram_api::inputEncryptedFileEmpty());
get_file = [] {
return telegram_api::make_object<telegram_api::encryptedFileEmpty>();
};
},
[&](telegram_api::encryptedFile &file) {
state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
telegram_api::inputEncryptedFile(file.id_, file.access_hash_));
get_file = [id = file.id_, access_hash = file.access_hash_, size = file.size_,
dc_id = file.dc_id_, key_fingerprint = file.key_fingerprint_] {
return telegram_api::make_object<telegram_api::encryptedFile>(id, access_hash, size,
dc_id, key_fingerprint);
};
}));
state->send_result_ = [this, random_id = state->message->random_id,
message_id = MessageId(ServerMessageId(state->message->message_id)), date = sent->date_,
get_file = std::move(get_file)](Promise<> promise) {
this->context_->on_send_message_ok(random_id, message_id, date, get_file(), std::move(promise));
};
state->send_result_(std::move(send_message_finish_promise));
return;
}
}
}
// Non-external messages and results with an unexpected constructor finish the step immediately
send_message_finish_promise.set_value(Unit());
}
// Handles a failed network query of an outbound message and resends the message through resend_promise.
//
// If the query was allowed to fail and the message is still rewritable, the message is deleted first and the
// new query is passed to resend_promise only after binlog is synced. If the query wasn't allowed to fail,
// only the error with code 429 leads to a resend; any other error is fatal for the chat.
void SecretChatActor::on_outbound_send_message_error(uint64 state_id, Status error,
                                                     Promise<NetQueryPtr> resend_promise) {
  if (close_flag_ || context_->close_flag()) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  if (state == nullptr) {
    return;
  }

  bool need_sync = false;
  if (!state->net_query_may_fail) {
    if (error.code() != 429) {
      const bool is_declined = error.code() == 400 && error.message() == "ENCRYPTION_DECLINED";
      if (!is_declined) {
        LOG(ERROR) << "Got unknown error for encrypted service message: " << error;
      }
      return on_fatal_error(std::move(error));
    }
  } else if (state->message->is_rewritable) {
    // message could be already non-rewritable, if it was deleted during NetQuery execution.
    delete_message(state->message->random_id, Promise<>());
    // state pointer may be invalidated
    state = outbound_message_states_.get(state_id);
    need_sync = true;
  }

  auto query = create_net_query(*state->message);
  state->net_query_id = query->id();

  CHECK(resend_promise);
  auto resend_query =
      PromiseCreator::lambda([actor_id = actor_id(this), resend_promise = std::move(resend_promise),
                              query = std::move(query)](Result<> result) mutable {
        if (result.is_error()) {
          send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "resend_query");
          return;
        }
        resend_promise.set_value(std::move(query));
      });
  if (!need_sync) {
    resend_query.set_value(Unit());
    return;
  }
  context_->binlog()->force_sync(std::move(resend_query));
}
// Marks the send_message stage of an outbound message as finished, fulfils the outer
// send-message promise and re-evaluates the message through outbound_loop.
// Does nothing if the chat is closed or the state is no longer registered.
void SecretChatActor::on_outbound_send_message_finish(uint64 state_id) {
  auto *state = close_flag_ ? nullptr : outbound_message_states_.get(state_id);
  if (state == nullptr) {
    return;
  }
  LOG(INFO) << "Outbound secret message [send_message] finish " << tag("logevent_id", state->message->logevent_id());
  state->send_message_finish_flag = true;
  state->outer_send_message_finish.set_value(Unit());
  outbound_loop(state, state_id);
}
// Marks the save_changes stage of the outbound message as finished and re-runs outbound_loop.
// Ignored once the chat is closed; otherwise the state for state_id must still exist (CHECK).
void SecretChatActor::on_outbound_save_changes_finish(uint64 state_id) {
if (close_flag_) {
return;
}
auto *state = outbound_message_states_.get(state_id);
CHECK(state);
LOG(INFO) << "Outbound secret message [save_changes] finish " << tag("logevent_id", state->message->logevent_id());
state->save_changes_finish_flag = true;
// outbound_loop may erase the state, so it must not be used after this call.
outbound_loop(state, state_id);
}
// Records that the outbound message has been acknowledged and re-runs outbound_loop.
// Ignored once the chat is closed; otherwise the state for state_id must still exist (CHECK).
void SecretChatActor::on_outbound_ack(uint64 state_id) {
if (close_flag_) {
return;
}
auto *state = outbound_message_states_.get(state_id);
CHECK(state);
LOG(INFO) << "Outbound secret message [ack] finish " << tag("logevent_id", state->message->logevent_id());
state->ack_flag = true;
// outbound_loop may erase the state, so it must not be used after this call.
outbound_loop(state, state_id);
}
// Completes the caller-facing promise of an outbound message.
// Fails the promise with a 400 error if the chat is closed. Otherwise the promise is fulfilled
// and either the stored send_result_ callback is invoked or, when there is none, the context is
// told that the message has already been sent.
void SecretChatActor::on_outbound_outer_send_message_promise(uint64 state_id, Promise<> promise) {
  if (close_flag_) {
    return promise.set_error(Status::Error(400, "Chat is closed"));
  }
  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Outbound secret message [TODO] " << tag("logevent_id", state->message->logevent_id());
  promise.set_value(Unit());  // Seems like this message is at least stored to binlog already
  if (!state->send_result_) {
    context_->on_send_message_error(state->message->random_id, Status::Error(400, "Message has already been sent"),
                                    Auto());
    return;
  }
  state->send_result_({});
}
// Advances the lifecycle of one outbound message according to its progress flags:
//  - changes saved and message acknowledged: the logevent is erased and the state is dropped;
//  - changes saved and message sent, but not yet marked as sent: the logevent is rewritten
//    with is_sent set.
// The state pointer is invalid after the first case, because the state is erased.
void SecretChatActor::outbound_loop(OutboundMessageState *state, uint64 state_id) {
  if (close_flag_) {
    return;
  }
  if (!state->save_changes_finish_flag) {
    // Both transitions below require the changes to be saved first.
    return;
  }
  if (state->ack_flag) {
    // NB: send_message_finish_flag is not required here.
    LOG(INFO) << "Outbound message [remove_logevent] start " << tag("logevent_id", state->message->logevent_id());
    binlog_erase(context_->binlog(), state->message->logevent_id());
    random_id_to_outbound_message_state_token_.erase(state->message->random_id);
    LOG(INFO) << "Outbound message finish (lazy) " << tag("logevent_id", state->message->logevent_id());
    outbound_message_states_.erase(state_id);
    return;
  }
  if (state->send_message_finish_flag && !state->message->is_sent) {  // [rewrite_logevent]
    LOG(INFO) << "Outbound message [rewrite_logevent] start " << tag("logevent_id", state->message->logevent_id());
    state->message->is_sent = true;
    binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
                   create_storer(*state->message));
  }
}
// Copies the fields shared by the encrypted chat updates (id_ and access_hash_) into auth_state_.
// Returns an error if the update refers to a different chat identifier than the stored one.
template <class T>
Status SecretChatActor::save_common_info(T &update) {
  const bool is_same_chat = auth_state_.id == update.id_;
  if (!is_same_chat) {
    return Status::Error(PSLICE() << "chat_id mismatch: " << tag("mine", auth_state_.id) << tag("outer", update.id_));
  }
  auth_state_.id = update.id_;
  auth_state_.access_hash = update.access_hash_;
  return Status::OK();
}
// Handles encryptedChatRequested. Only processed while the chat is in the Empty state;
// otherwise the update is logged and ignored. On success the chat switches to SendAccept,
// remembers the requesting user, the current unix time and the received g_a, and an update
// about the secret chat is sent.
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatRequested &update) {
  if (auth_state_.state != State::Empty) {
    LOG(WARNING) << "Unexpected ChatRequested ignored: " << to_string(update);
    return Status::OK();
  }

  auth_state_.date = context_->unix_time();
  auth_state_.user_id = update.admin_id_;
  auth_state_.x = 1;
  auth_state_.state = State::SendAccept;

  TRY_STATUS(save_common_info(update));
  auth_state_.handshake.set_g_a(update.g_a_.as_slice());
  send_update_secret_chat();
  return Status::OK();
}
// Handles encryptedChatEmpty: the update carries nothing to apply, so it is accepted as is.
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatEmpty &update) {
return Status::OK();
}
// Handles encryptedChatWaiting. Accepted only while waiting for a request or accept response;
// in any other state the update is logged and ignored. Stores the common chat info and
// sends an update about the secret chat.
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatWaiting &update) {
  const bool is_waiting_response =
      auth_state_.state == State::WaitRequestResponse || auth_state_.state == State::WaitAcceptResponse;
  if (!is_waiting_response) {
    LOG(WARNING) << "Unexpected ChatWaiting ignored";
    return Status::OK();
  }
  TRY_STATUS(save_common_info(update));
  send_update_secret_chat();
  return Status::OK();
}
// Handles encryptedChat, the update that completes the handshake.
// Accepted only while waiting for a request or accept response. When waiting for the request
// response, the received g_a_or_b is verified and the auth key is generated here. In both
// cases the key fingerprint must match the one in the update. On success the chat becomes
// Ready, the creation logevent (if any) is erased, both states are persisted and a
// NotifyLayer action with MY_LAYER is sent.
Status SecretChatActor::on_update_chat(telegram_api::encryptedChat &update) {
  const bool waits_request_response = auth_state_.state == State::WaitRequestResponse;
  if (!waits_request_response && auth_state_.state != State::WaitAcceptResponse) {
    LOG(WARNING) << "Unexpected Chat ignored";
    return Status::OK();
  }
  TRY_STATUS(save_common_info(update));

  if (waits_request_response) {
    auth_state_.handshake.set_g_a(update.g_a_or_b_.as_slice());
    TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));
    auto generated_key = auth_state_.handshake.gen_key();
    pfs_state_.auth_key = mtproto::AuthKey(generated_key.first, std::move(generated_key.second));
    calc_key_hash();
  }

  const auto my_fingerprint = static_cast<int64>(pfs_state_.auth_key.id());
  if (my_fingerprint != update.key_fingerprint_) {
    return Status::Error("Key fingerprint mismatch");
  }

  auth_state_.state = State::Ready;
  if (create_logevent_id_ != 0) {
    binlog_erase(context_->binlog(), create_logevent_id_);
    create_logevent_id_ = 0;
  }

  // NB: order is important
  context_->secret_chat_db()->set_value(pfs_state_);
  context_->secret_chat_db()->set_value(auth_state_);
  LOG(INFO) << "OK! Ready!";
  send_update_secret_chat();
  send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
              Promise<>());
  return Status::OK();
}
// Handles encryptedChatDiscarded: always reported to the caller as an error.
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatDiscarded &update) {
return Status::Error("Chat discarded");
}
// Processes the answer to messages.requestEncryption / messages.acceptEncryption (both have
// the same return type, which the static_assert verifies) and applies the resulting chat.
// If the chat is left in WaitRequestResponse, auth_state_ is persisted and the binlog is synced.
Status SecretChatActor::on_update_chat(NetQueryPtr query) {
  static_assert(std::is_same<telegram_api::messages_requestEncryption::ReturnType,
                             telegram_api::messages_acceptEncryption::ReturnType>::value,
                "");
  TRY_RESULT(chat, fetch_result<telegram_api::messages_requestEncryption>(std::move(query)));
  TRY_STATUS(on_update_chat(std::move(chat)));
  if (auth_state_.state != State::WaitRequestResponse) {
    return Status::OK();
  }
  context_->secret_chat_db()->set_value(auth_state_);
  context_->binlog()->force_sync(Promise<>());
  return Status::OK();
}
// Dispatches a generic EncryptedChat object to the overload matching its concrete type
// and returns that overload's status.
Status SecretChatActor::on_update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
  Status status;
  downcast_call(*chat, [this, &status](auto &concrete_chat) { status = this->on_update_chat(concrete_chat); });
  return status;
}
// Called when a read-history query finishes. Only the query whose generation matches the
// currently stored one clears the stored reference and fulfils read_history_promise_.
Status SecretChatActor::on_read_history(NetQueryPtr query) {
  if (query.generation() != read_history_query_.generation()) {
    return Status::OK();
  }
  read_history_query_ = NetQueryRef();
  read_history_promise_.set_value(Unit());
  return Status::OK();
}
void SecretChatActor::start_up() {
LOG(INFO) << "SecretChatActor: start_up";
// auto start = Time::now();
auto r_auth_state = context_->secret_chat_db()->get_value<AuthState>();
if (r_auth_state.is_ok()) {
auth_state_ = r_auth_state.move_as_ok();
}
if (!can_be_empty_ && auth_state_.state == State::Empty) {
LOG(WARNING) << "Close Secret chat because it is empty";
return stop();
}
if (auth_state_.state == State::Closed) {
close_flag_ = true;
}
auto r_seq_no_state = context_->secret_chat_db()->get_value<SeqNoState>();
if (r_seq_no_state.is_ok()) {
seq_no_state_ = r_seq_no_state.move_as_ok();
}
auto r_config_state = context_->secret_chat_db()->get_value<ConfigState>();
if (r_config_state.is_ok()) {
config_state_ = r_config_state.move_as_ok();
}
auto r_pfs_state = context_->secret_chat_db()->get_value<PfsState>();
if (r_pfs_state.is_ok()) {
pfs_state_ = r_pfs_state.move_as_ok();
}
saved_pfs_state_message_id_ = pfs_state_.message_id;
pfs_state_.last_timestamp = Time::now();
send_update_secret_chat();
get_dh_config();
// auto end = Time::now();
// CHECK(end - start < 0.2);
LOG(INFO) << "In start_up with SeqNoState " << seq_no_state_;
LOG(INFO) << "In start_up with PfsState " << pfs_state_;
}
void SecretChatActor::get_dh_config() {
if (auth_state_.state != State::Empty) {
return;
}
auto dh_config = context_->dh_config();
if (dh_config) {
auth_state_.dh_config = *dh_config;
}
auto version = auth_state_.dh_config.version;
int32 random_length = 0;
auto query = create_net_query(QueryType::DhConfig, telegram_api::messages_getDhConfig(version, random_length));
context_->send_net_query(std::move(query), actor_shared(this), false);
}
// Processes the answer to messages.getDhConfig: applies the concrete result, validates the
// resulting g and prime, and configures the handshake with them.
// Returns an error if the query failed or the config does not pass the check.
Status SecretChatActor::on_dh_config(NetQueryPtr query) {
  LOG(INFO) << "Got dh config";
  TRY_RESULT(dh_config_result, fetch_result<telegram_api::messages_getDhConfig>(std::move(query)));
  downcast_call(*dh_config_result, [this](auto &concrete_config) { this->on_dh_config(concrete_config); });

  const auto &dh_config = auth_state_.dh_config;
  TRY_STATUS(DhHandshake::check_config(dh_config.g, dh_config.prime, context_->dh_callback()));
  auth_state_.handshake.set_config(dh_config.g, dh_config.prime);
  return Status::OK();
}
// The stored DH config is kept as is; only the random bytes received from the server are
// passed to Random::add_seed.
void SecretChatActor::on_dh_config(telegram_api::messages_dhConfigNotModified &dh_not_modified) {
Random::add_seed(dh_not_modified.random_.as_slice());
}
// Applies a new DH config received from the server: passes the received random bytes to
// Random::add_seed, stores the config in auth_state_ and shares it through the context.
void SecretChatActor::on_dh_config(telegram_api::messages_dhConfig &dh) {
  Random::add_seed(dh.random_.as_slice());

  auto new_config = std::make_shared<DhConfig>();
  new_config->version = dh.version_;
  new_config->g = dh.g_;
  new_config->prime = dh.p_.as_slice().str();

  auth_state_.dh_config = *new_config;
  context_->set_dh_config(new_config);
}
// Computes auth_state_.key_hash from the current auth key:
// the first 16 bytes of its SHA-1 followed by the first 20 bytes of its SHA-256.
void SecretChatActor::calc_key_hash() {
  unsigned char sha1_buf[20];
  sha1(pfs_state_.auth_key.key(), sha1_buf);

  unsigned char sha256_buf[32];
  auto sha256_output = MutableSlice(sha256_buf, 32);
  sha256(pfs_state_.auth_key.key(), sha256_output);

  auth_state_.key_hash = Slice(sha1_buf, 16).str() + Slice(sha256_buf, 20).str();
}
void SecretChatActor::send_update_secret_chat() {
if (auth_state_.state == State::Empty) {
return;
}
SecretChatState state;
if (auth_state_.state == State::Ready) {
state = SecretChatState::Active;
} else if (auth_state_.state == State::Closed) {
state = SecretChatState::Closed;
} else {
state = SecretChatState::Waiting;
}
context_->on_update_secret_chat(auth_state_.access_hash, get_user_id(), state, auth_state_.x == 0, config_state_.ttl,
auth_state_.date, auth_state_.key_hash, current_layer());
}
// Outbound SetMessageTTL: stores the new TTL in config_state_, persists it and sends an update
// about the secret chat.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
config_state_.ttl = set_ttl.ttl_seconds_;
context_->secret_chat_db()->set_value(config_state_);
send_update_secret_chat();
}
// Outbound ReadMessages: currently not handled.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
// TODO
}
// Outbound DeleteMessages: deletes the listed messages locally; a failure is treated as fatal
// (ensure()).
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
// Corresponding logevent won't be deleted before promise returned by add_changes is set.
on_delete_messages(delete_messages.random_ids_).ensure();
}
// Outbound ScreenshotMessages: no local state changes are required.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
// nothing to do
}
// Outbound FlushHistory: flushes the history using the current pfs_state_.message_id;
// a failure is treated as fatal (ensure()).
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionFlushHistory &flush_history) {
on_flush_history(pfs_state_.message_id).ensure();
}
// Outbound Resend: raises seq_no_state_.resend_end_seq_no to half of the requested
// end_seq_no_ and reports the change. The stored value is never lowered.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionResend &resend) {
  const auto requested_end_seq_no = resend.end_seq_no_ / 2;
  if (seq_no_state_.resend_end_seq_no >= requested_end_seq_no) {  // replay protection
    return;
  }
  seq_no_state_.resend_end_seq_no = requested_end_seq_no;
  on_seq_no_state_changed();
}
// Outbound NotifyLayer: remembers the layer announced to the peer and persists config_state_.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNotifyLayer &notify_layer) {
config_state_.my_layer = notify_layer.layer_;
context_->secret_chat_db()->set_value(config_state_);
}
// Outbound Typing: requires no local state changes.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionTyping &typing) {
// noop
}
// Inbound SetMessageTTL: stores the TTL chosen by the peer, persists config_state_ and sends
// an update about the secret chat.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
config_state_.ttl = set_ttl.ttl_seconds_;
context_->secret_chat_db()->set_value(config_state_);
send_update_secret_chat();
return Status::OK();
}
// Inbound ReadMessages: currently accepted without any handling.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
// TODO
return Status::OK();
}
// Inbound DeleteMessages: deletes the listed messages and returns the resulting status.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
return on_delete_messages(delete_messages.random_ids_);
}
// Inbound ScreenshotMessages: currently accepted without any handling.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
// TODO
return Status::OK();
}
// Inbound FlushHistory: flushes the history using the current pfs_state_.message_id and
// returns the resulting status. The action object itself carries nothing that is used here.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionFlushHistory &flush_history) {
  return on_flush_history(pfs_state_.message_id);
}
// Inbound Resend: accepted here without any state change.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionResend &resend) {
return Status::OK();
}
// Inbound NotifyLayer: records the peer's layer only when it is higher than the known one;
// in that case config_state_ is persisted and an update about the secret chat is sent.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNotifyLayer &notify_layer) {
  if (notify_layer.layer_ <= config_state_.his_layer) {
    return Status::OK();
  }
  config_state_.his_layer = notify_layer.layer_;
  context_->secret_chat_db()->set_value(config_state_);
  send_update_secret_chat();
  return Status::OK();
}
// Inbound Typing: requires no local state changes.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionTyping &typing) {
// noop
return Status::OK();
}
// Perfect Forward Secrecy
// Outbound RequestKey: must be in WaitSendRequest or SendRequest (otherwise the process is
// aborted by LOG_CHECK); moves the key exchange to WaitRequestResponse.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
LOG_CHECK(pfs_state_.state == PfsState::WaitSendRequest || pfs_state_.state == PfsState::SendRequest) << pfs_state_;
pfs_state_.state = PfsState::WaitRequestResponse;
on_pfs_state_changed();
}
// Outbound AcceptKey: must be in WaitSendAccept or SendAccept (CHECK). Resets the handshake
// object and moves the key exchange to WaitAcceptResponse.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
  CHECK(pfs_state_.state == PfsState::WaitSendAccept || pfs_state_.state == PfsState::SendAccept);
  pfs_state_.handshake = DhHandshake();
  pfs_state_.state = PfsState::WaitAcceptResponse;
  on_pfs_state_changed();
}
// Outbound AbortKey: not implemented; reaching this handler logs at FATAL level.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
// TODO
LOG(FATAL) << "TODO";
}
// Outbound CommitKey: must be in WaitSendCommit or SendCommit and the pending key must have
// the committed fingerprint (both enforced by CHECK). Swaps the pending key with the active
// one, allows the old key to be forgotten and resets the key exchange to Empty, remembering
// the message id, time and outgoing seq_no at which the switch happened.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
  CHECK(pfs_state_.state == PfsState::WaitSendCommit || pfs_state_.state == PfsState::SendCommit);
  CHECK(static_cast<int64>(pfs_state_.other_auth_key.id()) == commit_key.key_fingerprint_);

  std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
  pfs_state_.state = PfsState::Empty;
  pfs_state_.can_forget_other_key = true;

  pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;
  pfs_state_.last_message_id = pfs_state_.message_id;
  pfs_state_.last_timestamp = Time::now();

  on_pfs_state_changed();
}
// Outbound Noop: requires no local state changes.
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNoop &noop) {
// noop
}
// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
// Inbound RequestKey: the peer asks to start a key exchange.
// If our own request is pending, the exchange_ids are compared: with a greater own id the
// peer's request is silently dropped; with equal ids our request is dropped as well and the
// state is persisted; with a smaller own id our request is abandoned and the peer's request
// is processed. Any other non-Empty state is an error. Processing verifies the received g_a,
// generates the new key into other_auth_key and moves the exchange to SendAccept.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
  const bool has_own_request =
      pfs_state_.state == PfsState::WaitRequestResponse || pfs_state_.state == PfsState::SendRequest;
  if (has_own_request) {
    if (pfs_state_.exchange_id > request_key.exchange_id_) {
      LOG(INFO) << "RequestKey: silently abort their request";
      return Status::OK();
    }
    pfs_state_.state = PfsState::Empty;
    if (pfs_state_.exchange_id == request_key.exchange_id_) {
      context_->secret_chat_db()->set_value(pfs_state_);
      LOG(WARNING) << "RequestKey: silently abort both requests (almost impossible)";
      return Status::OK();
    }
  }

  if (pfs_state_.state != PfsState::Empty) {
    return Status::Error("Unexpected RequestKey");
  }
  LOG_CHECK(pfs_state_.other_auth_key.empty()) << "TODO: got requestKey, before old key is dropped";

  pfs_state_.exchange_id = request_key.exchange_id_;
  pfs_state_.state = PfsState::SendAccept;
  pfs_state_.handshake = DhHandshake();
  pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
  pfs_state_.handshake.set_g_a(request_key.g_a_.as_slice());
  TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));

  auto generated_key = pfs_state_.handshake.gen_key();
  pfs_state_.other_auth_key = mtproto::AuthKey(generated_key.first, std::move(generated_key.second));
  pfs_state_.can_forget_other_key = false;
  pfs_state_.wait_message_id = pfs_state_.message_id;

  on_pfs_state_changed();
  return Status::OK();
}
// decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
// Inbound AcceptKey: the peer answers our key request.
// Valid only in WaitRequestResponse and only for the current exchange_id. The received g_b is
// verified, the new key is generated and its fingerprint must equal the one sent by the peer.
// On success the key is stored as other_auth_key and the exchange moves to SendCommit.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
if (pfs_state_.state != PfsState::WaitRequestResponse) {
return Status::Error("AcceptKey: unexpected");
}
if (pfs_state_.exchange_id != accept_key.exchange_id_) {
return Status::Error("AcceptKey: exchange_id mismatch");
}
// The peer's g_b is passed through the same set_g_a entry point of the handshake.
pfs_state_.handshake.set_g_a(accept_key.g_b_.as_slice());
TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));
auto id_and_key = pfs_state_.handshake.gen_key();
if (static_cast<int64>(id_and_key.first) != accept_key.key_fingerprint_) {
return Status::Error("AcceptKey: key_fingerprint mismatch");
}
pfs_state_.state = PfsState::SendCommit;
pfs_state_.handshake = DhHandshake();
// The previous other_auth_key may be overwritten only if it can be forgotten or is the same key.
CHECK(pfs_state_.can_forget_other_key || static_cast<int64>(pfs_state_.other_auth_key.id()) == id_and_key.first);
pfs_state_.other_auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
pfs_state_.can_forget_other_key = false;
pfs_state_.wait_message_id = pfs_state_.message_id;
on_pfs_state_changed();
return Status::OK();
}
// Inbound AbortKey: the peer aborts a key exchange.
// An abort for a different exchange_id is logged and ignored. An abort for the current
// exchange is valid only in WaitRequestResponse; it resets the exchange to Empty and drops
// the handshake.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
  const bool is_current_exchange = pfs_state_.exchange_id == abort_key.exchange_id_;
  if (!is_current_exchange) {
    LOG(INFO) << "AbortKey: exchange_id mismatch: " << tag("my exchange_id", pfs_state_.exchange_id)
              << to_string(abort_key);
    return Status::OK();
  }
  if (pfs_state_.state != PfsState::WaitRequestResponse) {
    return Status::Error("AbortKey: unexpected");
  }

  pfs_state_.handshake = DhHandshake();
  pfs_state_.state = PfsState::Empty;
  on_pfs_state_changed();
  return Status::OK();
}
// Inbound CommitKey: the peer confirms the new key.
// Valid only in WaitAcceptResponse, for the current exchange_id and for the fingerprint of
// the pending key. On success the pending key becomes the active one, the old key may be
// forgotten and the exchange returns to Empty, remembering the message id, time and outgoing
// seq_no at which the switch happened.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
  if (pfs_state_.state != PfsState::WaitAcceptResponse) {
    return Status::Error("CommitKey: unexpected");
  }
  if (pfs_state_.exchange_id != commit_key.exchange_id_) {
    return Status::Error("CommitKey: exchange_id mismatch ");
  }
  CHECK(!pfs_state_.can_forget_other_key);
  const auto pending_fingerprint = static_cast<int64>(pfs_state_.other_auth_key.id());
  if (pending_fingerprint != commit_key.key_fingerprint_) {
    return Status::Error("CommitKey: fingerprint mismatch");
  }

  std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
  pfs_state_.state = PfsState::Empty;
  pfs_state_.can_forget_other_key = true;

  pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;
  pfs_state_.last_message_id = pfs_state_.message_id;
  pfs_state_.last_timestamp = Time::now();

  on_pfs_state_changed();
  return Status::OK();
}
// Inbound Noop: requires no local state changes.
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNoop &noop) {
// noop
return Status::OK();
}
// Entry point for inbound service actions: drops actions that were already processed
// (replay protection by message_id), records message_id as processed and dispatches the
// action to the overload matching its concrete type.
Status SecretChatActor::on_inbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
  // Action may be not about PFS, but we still can use pfs_state_.message_id
  const bool is_already_processed = message_id <= pfs_state_.message_id;
  if (is_already_processed) {  // replay protection
    LOG(INFO) << "Drop old inbound DecryptedMessageAction: " << to_string(action) << tag("message_id", message_id)
              << tag("known_message_id", pfs_state_.message_id);
    return Status::OK();
  }

  // if message_id < seq_no_state_.message_id, then SeqNoState with message_id bigger than current message_id is
  // already saved. And event corresponding to message_id is saved too.
  //
  // Also, if SeqNoState with message_id greater than current message_id is not saved, then corresponding action
  // will be replayed.
  //
  // This works only for ttl, not for pfs. Same ttl action may be processed twice.
  if (message_id < seq_no_state_.message_id) {
    LOG(INFO) << "Drop old inbound DecryptedMessageAction (non-pfs action): " << to_string(action);
    return Status::OK();
  }

  pfs_state_.message_id = message_id;  // replay protection
  LOG(INFO) << "In on_inbound_action: " << to_string(action);

  Status status;
  downcast_call(action,
                [this, &status](auto &concrete_action) { status = this->on_inbound_action(concrete_action); });
  return status;
}
// Entry point for outbound service actions: drops actions that were already processed
// (replay protection by message_id), records message_id as processed and dispatches the
// action to the overload matching its concrete type.
void SecretChatActor::on_outbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
  // Action may be not about PFS, but we still can use pfs_state_.message_id
  const bool is_already_processed = message_id <= pfs_state_.message_id;
  if (is_already_processed) {  // replay protection
    LOG(INFO) << "Drop old outbound DecryptedMessageAction: " << to_string(action);
    return;
  }

  // see comment in on_inbound_action
  if (message_id < seq_no_state_.message_id) {
    LOG(INFO) << "Drop old outbound DecryptedMessageAction (non-pfs action): " << to_string(action);
    return;
  }

  pfs_state_.message_id = message_id;  // replay protection
  LOG(INFO) << "In on_outbound_action: " << to_string(action);
  downcast_call(action, [this](auto &concrete_action) { this->on_outbound_action(concrete_action); });
}
// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
// Starts a new key exchange: requires a non-empty DH config (CHECK), picks a random
// exchange_id, prepares a fresh handshake with the stored DH parameters, switches to
// SendRequest and persists the PFS state explicitly.
void SecretChatActor::request_new_key() {
  CHECK(!auth_state_.dh_config.empty());

  pfs_state_.state = PfsState::SendRequest;
  pfs_state_.exchange_id = Random::secure_int64();
  pfs_state_.handshake = DhHandshake();
  pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);

  // NB: must save explicitly
  LOG(INFO) << "SAVE PfsState " << pfs_state_;
  context_->secret_chat_db()->set_value(pfs_state_);
}
// Reports a failed internal promise. The error is only logged at DEBUG level while the
// context is closing; otherwise it is logged at FATAL level.
void SecretChatActor::on_promise_error(Status error, string desc) {
  if (!context_->close_flag()) {
    LOG(FATAL) << "Failed: " << tag("promise", desc) << error;
    return;
  }
  LOG(DEBUG) << "Ignore " << tag("promise", desc) << error;
}
// Namespace-scope definition of the static constexpr data member; before C++17 such a
// definition is required if the constant is odr-used.
constexpr int32 SecretChatActor::MAX_RESEND_COUNT;
} // namespace td