//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/SecretChatActor.h"

#include "td/telegram/net/NetQueryCreator.h"
#include "td/telegram/SecretChatId.h"
#include "td/telegram/UniqueId.h"

#include "td/telegram/secret_api.hpp"
#include "td/telegram/telegram_api.hpp"

#include "td/mtproto/PacketInfo.h"
#include "td/mtproto/PacketStorer.h"
#include "td/mtproto/Transport.h"
#include "td/mtproto/utils.h"

#include "td/db/binlog/BinlogHelper.h"
#include "td/db/binlog/BinlogInterface.h"

#include "td/actor/MultiPromise.h"

#include "td/utils/as.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/overloaded.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/StorerBase.h"
#include "td/utils/Time.h"
#include "td/utils/tl_parsers.h"

#include <array>
#include <tuple>
#include <type_traits>

#define G GLOBAL_SHOULD_NOT_BE_USED_HERE

namespace td {

inline TLObjectStorer<secret_api::Object> create_storer(const secret_api::Object &object) {
  return TLObjectStorer<secret_api::Object>(object);
}

class SecretImpl {
 public:
  explicit SecretImpl(const Storer &data) : data(data) {
  }
  template <class StorerT>
  void do_store(StorerT &storer) const {
    storer.store_binary(static_cast<int32>(data.size()));
    storer.store_storer(data);
  }

 private:
  const Storer &data;
};

SecretChatActor::SecretChatActor(int32 id, unique_ptr<Context> context, bool can_be_empty)
    : context_(std::move(context)), can_be_empty_(can_be_empty) {
  auth_state_.id = id;
}

void SecretChatActor::update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
  if (close_flag_) {
    return;
  }
  check_status(on_update_chat(std::move(chat)));
  loop();
}

void SecretChatActor::create_chat(int32 user_id, int64 user_access_hash, int32 random_id,
                                  Promise<SecretChatId> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Empty) {
    promise.set_error(Status::Error(500, "Bad random_id"));
    check_status(Status::Error("Unexpected request_chat"));
    loop();
    return;
  }

  auto event = make_unique<logevent::CreateSecretChat>();
  event->user_id = user_id;
  event->user_access_hash = user_access_hash;
  event->random_id = random_id;
  event->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event)));
  do_create_chat_impl(std::move(event));
  promise.set_value(SecretChatId(random_id));
  loop();
}

void SecretChatActor::on_result_resendable(NetQueryPtr net_query, Promise<NetQueryPtr> promise) {
  LOG(INFO) << "In on_result_resendable: " << net_query << " " << close_flag_;
  if (context_->close_flag()) {
    return;
  }

  auto key = UniqueId::extract_key(net_query->id());
  if (close_flag_) {
    if (key == static_cast<uint8>(QueryType::DiscardEncryption)) {
      on_discard_encryption_result(std::move(net_query));
    }
    return;
  }
  check_status([&] {
    switch (key) {
      case static_cast<uint8>(QueryType::DhConfig):
        return on_dh_config(std::move(net_query));
      case static_cast<uint8>(QueryType::EncryptedChat):
        return on_update_chat(std::move(net_query));
      case static_cast<uint8>(QueryType::Message):
        on_outbound_send_message_result(std::move(net_query), std::move(promise));
        return Status::OK();
      case static_cast<uint8>(QueryType::ReadHistory):
        return on_read_history(std::move(net_query));
      case static_cast<uint8>(QueryType::Ignore):
        return Status::OK();
    }
    UNREACHABLE();
  }());

  loop();
}

void SecretChatActor::replay_close_chat(unique_ptr<logevent::CloseSecretChat> event) {
  do_close_chat_impl(std::move(event));
}

void SecretChatActor::replay_create_chat(unique_ptr<logevent::CreateSecretChat> event) {
  if (close_flag_) {
    return;
  }
  do_create_chat_impl(std::move(event));
}

void SecretChatActor::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
  SCOPE_EXIT {
    if (message) {
      message->qts_ack.set_value(Unit());
    }
  };
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore unexpected update: " << tag("message", *message);
    return;
  }
  check_status(do_inbound_message_encrypted(std::move(message)));
  loop();
}

void SecretChatActor::replay_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore unexpected replay inbound message: " << tag("message", *message);
    return;
  }

  CHECK(!binlog_replay_finish_flag_);
  CHECK(message->decrypted_message_layer);  // from binlog
  if (message->is_pending) {                // wait for gaps?
    // check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
    do_inbound_message_decrypted_pending(std::move(message));
  } else {  // just replay
    LOG_CHECK(message->message_id > last_binlog_message_id_)
        << tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
    last_binlog_message_id_ = message->message_id;
    check_status(do_inbound_message_decrypted(std::move(message)));
  }
  loop();
}

void SecretChatActor::replay_outbound_message(unique_ptr<logevent::OutboundSecretMessage> message) {
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore unexpected replay outbound message: " << tag("message", *message);
    return;
  }
  CHECK(!binlog_replay_finish_flag_);
  LOG_CHECK(message->message_id > last_binlog_message_id_)
      << tag("last_binlog_message_id", last_binlog_message_id_) << tag("message_id", message->message_id);
  last_binlog_message_id_ = message->message_id;
  do_outbound_message_impl(std::move(message), Promise<>());
  loop();
}

// NB: my_seq_no is just after message is sent, i.e. my_out_seq_no is already incremented
Result<BufferSlice> SecretChatActor::create_encrypted_message(int32 layer, int32 my_in_seq_no, int32 my_out_seq_no,
                                                              tl_object_ptr<secret_api::DecryptedMessage> &message) {
  if (message->get_id() == secret_api::decryptedMessage::ID && layer < MTPROTO_2_LAYER) {
    auto old = secret_api::move_object_as<secret_api::decryptedMessage>(message);
    old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK;
    message = secret_api::make_object<secret_api::decryptedMessage46>(
        old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
        std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_);
  }

  mtproto::AuthKey *auth_key = &pfs_state_.auth_key;
  auto in_seq_no = my_in_seq_no * 2 + auth_state_.x;
  auto out_seq_no = my_out_seq_no * 2 - 1 - auth_state_.x;

  BufferSlice random_bytes(32);
  Random::secure_bytes(random_bytes.as_slice().ubegin(), random_bytes.size());
  auto message_with_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
      std::move(random_bytes), layer, in_seq_no, out_seq_no, std::move(message));
  LOG(INFO) << to_string(message_with_layer);
  auto storer = create_storer(*message_with_layer);
  auto new_storer = mtproto::PacketStorer<SecretImpl>(storer);
  mtproto::PacketInfo info;
  info.type = mtproto::PacketInfo::EndToEnd;
  // Send with mtproto 2.0 if current layer is at least MTPROTO_2_LAYER
  info.version = layer >= MTPROTO_2_LAYER ? 2 : 1;
  info.is_creator = auth_state_.x == 0;
  auto packet_writer = BufferWriter{mtproto::Transport::write(new_storer, *auth_key, &info), 0, 0};
  mtproto::Transport::write(new_storer, *auth_key, &info, packet_writer.as_slice());
  message = std::move(message_with_layer->message_);
  return packet_writer.as_buffer_slice();
}

void SecretChatActor::send_message(tl_object_ptr<secret_api::DecryptedMessage> message,
                                   tl_object_ptr<telegram_api::InputEncryptedFile> file, Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  send_message_impl(std::move(message), std::move(file), SendFlag::External | SendFlag::Push, std::move(promise));
}

static int32 get_min_layer(const secret_api::decryptedMessageActionTyping &message) {
  switch (message.action_->get_id()) {
    case secret_api::sendMessageRecordRoundAction::ID:
    case secret_api::sendMessageUploadRoundAction::ID:
      return SecretChatActor::VIDEO_NOTES_LAYER;
  }
  return 0;
}
static int32 get_min_layer(const secret_api::decryptedMessageService &message) {
  switch (message.action_->get_id()) {
    case secret_api::decryptedMessageActionTyping::ID:
      return get_min_layer(static_cast<const secret_api::decryptedMessageActionTyping &>(*message.action_));
    default:
      return 0;
  }
}
static int32 get_min_layer(const secret_api::DocumentAttribute &attribute) {
  switch (attribute.get_id()) {
    case secret_api::documentAttributeVideo66::ID:
      return SecretChatActor::VIDEO_NOTES_LAYER;
    default:
      return 0;
  }
}
static int32 get_min_layer(const secret_api::decryptedMessageMediaDocument &message) {
  int32 res = 0;
  for (auto &attribute : message.attributes_) {
    auto attrirbute_layer = get_min_layer(*attribute);
    if (attrirbute_layer > res) {
      res = attrirbute_layer;
    }
    return res;
  }
  return res;
}
static int32 get_min_layer(const secret_api::decryptedMessage &message) {
  if (!message.media_) {
    return 0;
  }
  switch (message.media_->get_id()) {
    case secret_api::decryptedMessageMediaDocument::ID:
      return get_min_layer(static_cast<const secret_api::decryptedMessageMediaDocument &>(*message.media_));
    default:
      return 0;
  }
}
static int32 get_min_layer(const secret_api::DecryptedMessage &message) {
  switch (message.get_id()) {
    case secret_api::decryptedMessageService::ID:
      return get_min_layer(static_cast<const secret_api::decryptedMessageService &>(message));
    case secret_api::decryptedMessage::ID:
      return get_min_layer(static_cast<const secret_api::decryptedMessage &>(message));
    default:
      return 0;
  }
}

void SecretChatActor::send_message_impl(tl_object_ptr<secret_api::DecryptedMessage> message,
                                        tl_object_ptr<telegram_api::InputEncryptedFile> file, int32 flags,
                                        Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore send_message: " << tag("message", to_string(message)) << tag("file", to_string(file));
    return promise.set_error(Status::Error(400, "Chat is not accessible"));
  }
  if (get_min_layer(*message) > config_state_.his_layer) {
    return promise.set_error(Status::Error(400, "Message is not supported by the other side"));
  }
  LOG_CHECK(binlog_replay_finish_flag_) << "Trying to send message before binlog replay is finished: "
                                        << to_string(*message) << to_string(file);
  int64 random_id = 0;
  downcast_call(*message, [&](auto &x) { random_id = x.random_id_; });

  LOG(INFO) << "Send message: " << to_string(*message) << to_string(file);

  auto it = random_id_to_outbound_message_state_token_.find(random_id);
  if (it != end(random_id_to_outbound_message_state_token_)) {
    return on_outbound_outer_send_message_promise(it->second, std::move(promise));
  }

  auto binlog_event = make_unique<logevent::OutboundSecretMessage>();
  binlog_event->chat_id = auth_state_.id;
  binlog_event->random_id = random_id;
  binlog_event->file = logevent::EncryptedInputFile::from_input_encrypted_file(file);
  binlog_event->message_id = seq_no_state_.message_id + 1;
  binlog_event->my_in_seq_no = seq_no_state_.my_in_seq_no;
  binlog_event->my_out_seq_no = seq_no_state_.my_out_seq_no + 1;
  binlog_event->his_in_seq_no = seq_no_state_.his_in_seq_no;
  binlog_event->encrypted_message =
      create_encrypted_message(current_layer(), binlog_event->my_in_seq_no, binlog_event->my_out_seq_no, message)
          .move_as_ok();
  binlog_event->is_service = (flags & SendFlag::Push) == 0;
  binlog_event->is_external = (flags & SendFlag::External) != 0;
  if (message->get_id() == secret_api::decryptedMessageService::ID) {
    binlog_event->is_rewritable = false;
    auto service_message = move_tl_object_as<secret_api::decryptedMessageService>(message);
    binlog_event->action = std::move(service_message->action_);
  } else {
    binlog_event->is_rewritable = true;
  }

  do_outbound_message_impl(std::move(binlog_event), std::move(promise));
}

void SecretChatActor::send_message_action(tl_object_ptr<secret_api::SendMessageAction> action) {
  if (close_flag_) {
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore send_message_action: " << tag("message", to_string(action));
    return;
  }
  bool flag = action->get_id() != secret_api::sendMessageCancelAction::ID;

  auto net_query = context_->net_query_creator().create(
      UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Ignore)),
      create_storer(telegram_api::messages_setEncryptedTyping(get_input_chat(), flag)));
  if (!set_typing_query_.empty()) {
    LOG(INFO) << "Cancel previous set typing query";
    cancel_query(set_typing_query_);
  }
  set_typing_query_ = net_query.get_weak();
  context_->send_net_query(std::move(net_query), actor_shared(this), false);
}

void SecretChatActor::send_read_history(int32 date, Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    LOG(ERROR) << "Ignore send_read_history: " << tag("date", date);
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }

  if (date <= last_read_history_date_) {
    return promise.set_value(Unit());
  }

  if (read_history_promise_) {
    LOG(INFO) << "Cancel previous read history request in secret chat " << auth_state_.id;
    read_history_promise_.set_value(Unit());
    cancel_query(read_history_query_);
  }

  auto net_query = context_->net_query_creator().create(
      UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReadHistory)),
      create_storer(telegram_api::messages_readEncryptedHistory(get_input_chat(), date)));
  read_history_query_ = net_query.get_weak();
  last_read_history_date_ = date;
  read_history_promise_ = std::move(promise);
  LOG(INFO) << "Send read history request with date " << date << " in secret chat " << auth_state_.id;
  context_->send_net_query(std::move(net_query), actor_shared(this), false);
}

void SecretChatActor::send_open_message(int64 random_id, Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  std::vector<int64> random_ids{random_id};
  send_action(make_tl_object<secret_api::decryptedMessageActionReadMessages>(std::move(random_ids)), SendFlag::Push,
              std::move(promise));
}

void SecretChatActor::delete_message(int64 random_id, Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    promise.set_value(Unit());
    return;
  }
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  return delete_messages(std::vector<int64>{random_id}, std::move(promise));
}

void SecretChatActor::delete_messages(std::vector<int64> random_ids, Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    promise.set_value(Unit());
    return;
  }
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  send_action(make_tl_object<secret_api::decryptedMessageActionDeleteMessages>(std::move(random_ids)), SendFlag::Push,
              std::move(promise));
}
void SecretChatActor::delete_all_messages(Promise<> promise) {
  if (auth_state_.state == State::Closed) {
    promise.set_value(Unit());
    return;
  }
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  send_action(make_tl_object<secret_api::decryptedMessageActionFlushHistory>(), SendFlag::Push, std::move(promise));
}

void SecretChatActor::notify_screenshot_taken(Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  send_action(make_tl_object<secret_api::decryptedMessageActionScreenshotMessages>(), SendFlag::Push,
              std::move(promise));
}

void SecretChatActor::send_set_ttl_message(int32 ttl, int64 random_id, Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  if (auth_state_.state != State::Ready) {
    promise.set_error(Status::Error(400, "Can't access the chat"));
    return;
  }
  send_message_impl(secret_api::make_object<secret_api::decryptedMessageService>(
                        random_id, make_tl_object<secret_api::decryptedMessageActionSetMessageTTL>(ttl)),
                    nullptr, SendFlag::External | SendFlag::Push, std::move(promise));
}

void SecretChatActor::send_action(tl_object_ptr<secret_api::DecryptedMessageAction> action, int32 flags,
                                  Promise<> promise) {
  send_message_impl(
      secret_api::make_object<secret_api::decryptedMessageService>(Random::secure_int64(), std::move(action)), nullptr,
      flags, std::move(promise));
}

void SecretChatActor::binlog_replay_finish() {
  on_his_in_seq_no_updated();
  LOG(INFO) << "Binlog replay is finished with SeqNoState=" << seq_no_state_;
  LOG(INFO) << "Binlog replay is finished with PfsState=" << pfs_state_;
  binlog_replay_finish_flag_ = true;
  if (auth_state_.state == State::Ready) {
    if (config_state_.my_layer < MY_LAYER) {
      send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
                  Promise<>());
    }
  }
  yield();
}

void SecretChatActor::loop() {
  if (close_flag_) {
    return;
  }
  if (!binlog_replay_finish_flag_) {
    return;
  }

  check_status(do_loop());
}

Status SecretChatActor::do_loop() {
  TRY_STATUS(run_auth());
  run_pfs();
  run_fill_gaps();
  return Status::OK();
}

void SecretChatActor::on_send_message_ack(int64 random_id) {
  context_->on_send_message_ack(random_id);
}

Status SecretChatActor::on_delete_messages(const std::vector<int64> &random_ids) {
  for (auto random_id : random_ids) {
    auto it = random_id_to_outbound_message_state_token_.find(random_id);
    if (it == random_id_to_outbound_message_state_token_.end()) {
      continue;
    }
    auto state_id = it->second;
    TRY_STATUS(outbound_rewrite_with_empty(state_id));
  }
  return Status::OK();
}

Status SecretChatActor::on_flush_history(int32 last_message_id) {
  std::vector<uint64> to_rewrite;
  outbound_message_states_.for_each([&](auto state_id, auto &state) {
    if (state.message->message_id < last_message_id && state.message->is_rewritable) {
      to_rewrite.push_back(state_id);
    }
  });
  for (auto state_id : to_rewrite) {
    TRY_STATUS(outbound_rewrite_with_empty(state_id));
  }
  return Status::OK();
}

Status SecretChatActor::run_auth() {
  switch (auth_state_.state) {
    case State::Empty:
      return Status::OK();
    case State::SendRequest: {
      if (!auth_state_.handshake.has_config()) {
        return Status::OK();
      }
      // messages.requestEncryption#f64daf43 user_id:InputUser random_id:int g_a:bytes = EncryptedChat;
      telegram_api::messages_requestEncryption tl_query;
      tl_query.user_id_ = get_input_user();
      tl_query.random_id_ = auth_state_.random_id;
      tl_query.g_a_ = BufferSlice(auth_state_.handshake.get_g_b());
      auto query = context_->net_query_creator().create(
          UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::EncryptedChat)),
          create_storer(tl_query));
      context_->send_net_query(std::move(query), actor_shared(this), false);
      auth_state_.state = State::WaitRequestResponse;
      return Status::OK();
    }
    case State::SendAccept: {
      if (!auth_state_.handshake.has_config()) {
        return Status::OK();
      }
      TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));
      auto id_and_key = auth_state_.handshake.gen_key();
      pfs_state_.auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
      calc_key_hash();
      // messages.acceptEncryption#3dbc0415 peer:InputEncryptedChat g_b:bytes key_fingerprint:long =
      // EncryptedChat;
      telegram_api::messages_acceptEncryption tl_query;
      tl_query.peer_ = get_input_chat();
      tl_query.g_b_ = BufferSlice(auth_state_.handshake.get_g_b());
      tl_query.key_fingerprint_ = pfs_state_.auth_key.id();
      auto query = context_->net_query_creator().create(
          UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::EncryptedChat)),
          create_storer(tl_query));
      context_->send_net_query(std::move(query), actor_shared(this), false);
      auth_state_.state = State::WaitAcceptResponse;
      return Status::OK();
    }
    default:
      break;
  }
  return Status::OK();
}

void SecretChatActor::run_fill_gaps() {
  // replay messages
  while (true) {
    if (pending_inbound_messages_.empty()) {
      break;
    }
    auto begin = pending_inbound_messages_.begin();
    auto next_seq_no = begin->first;
    if (next_seq_no <= seq_no_state_.my_in_seq_no) {
      LOG(INFO) << "Replay pending event: " << tag("seq_no", next_seq_no);
      auto message = std::move(begin->second);
      pending_inbound_messages_.erase(begin);
      check_status(do_inbound_message_decrypted_unchecked(std::move(message)));
      CHECK(pending_inbound_messages_.find(next_seq_no) == pending_inbound_messages_.end());
    } else {
      break;
    }
  }

  if (pending_inbound_messages_.empty()) {
    return;
  }

  auto start_seq_no = seq_no_state_.my_in_seq_no;
  auto finish_seq_no = pending_inbound_messages_.begin()->first - 1;
  LOG(INFO) << tag("start_seq_no", start_seq_no) << tag("finish_seq_no", finish_seq_no)
            << tag("resend_end_seq_no", seq_no_state_.resend_end_seq_no);
  CHECK(start_seq_no <= finish_seq_no);
  if (seq_no_state_.resend_end_seq_no >= finish_seq_no) {
    return;
  }
  CHECK(seq_no_state_.resend_end_seq_no < start_seq_no);

  start_seq_no = start_seq_no * 2 + auth_state_.x;
  finish_seq_no = finish_seq_no * 2 + auth_state_.x;

  send_action(secret_api::make_object<secret_api::decryptedMessageActionResend>(start_seq_no, finish_seq_no),
              SendFlag::None, Promise<>());
}

void SecretChatActor::run_pfs() {
  while (true) {
    LOG(INFO) << "Run pfs loop: " << pfs_state_;
    if (pfs_state_.state == PfsState::Empty &&
        (pfs_state_.last_message_id + 100 < seq_no_state_.message_id ||
         pfs_state_.last_timestamp + 60 * 60 * 24 * 7 < Time::now()) &&
        pfs_state_.other_auth_key.empty()) {
      LOG(INFO) << "Request new key";
      request_new_key();
    }
    switch (pfs_state_.state) {
      case PfsState::SendRequest: {
        // shouldn't wait, pfs_state is already saved explicitly
        pfs_state_.state = PfsState::WaitSendRequest;  // don't save it!
        send_action(secret_api::make_object<secret_api::decryptedMessageActionRequestKey>(
                        pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b())),
                    SendFlag::None, Promise<>());
        break;
      }
      case PfsState::SendCommit: {
        // must wait till pfs_state is saved to binlog. Otherwise we may save ActionCommit to binlog without pfs_state,
        // which has the new auth_key.
        if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
          return;
        }

        // TODO: wait till gaps are filled???
        pfs_state_.state = PfsState::WaitSendCommit;  // don't save it
        send_action(secret_api::make_object<secret_api::decryptedMessageActionCommitKey>(
                        pfs_state_.exchange_id, static_cast<int64>(pfs_state_.other_auth_key.id())),
                    SendFlag::None, Promise<>());

        break;
      }
      case PfsState::SendAccept: {
        if (saved_pfs_state_message_id_ < pfs_state_.wait_message_id) {
          return;
        }

        pfs_state_.state = PfsState::WaitSendAccept;  // don't save it
        send_action(secret_api::make_object<secret_api::decryptedMessageActionAcceptKey>(
                        pfs_state_.exchange_id, BufferSlice(pfs_state_.handshake.get_g_b()),
                        static_cast<int64>(pfs_state_.other_auth_key.id())),
                    SendFlag::None, Promise<>());

        break;
      }
      default:
        return;
    }
  }
}

void SecretChatActor::check_status(Status status) {
  if (status.is_error()) {
    if (status.code() == 1) {
      LOG(WARNING) << "Non-fatal error: " << status;
    } else {
      on_fatal_error(std::move(status));
    }
  }
}

void SecretChatActor::on_fatal_error(Status status) {
  LOG(ERROR) << "Fatal error: " << status;
  cancel_chat(Promise<>());
}

void SecretChatActor::cancel_chat(Promise<> promise) {
  if (close_flag_) {
    promise.set_value(Unit());
    return;
  }
  close_flag_ = true;

  std::vector<logevent::LogEvent::Id> to_delete;
  outbound_message_states_.for_each(
      [&](auto state_id, auto &state) { to_delete.push_back(state.message->logevent_id()); });
  inbound_message_states_.for_each([&](auto state_id, auto &state) { to_delete.push_back(state.logevent_id); });

  // TODO: It must be a transaction
  for (auto id : to_delete) {
    binlog_erase(context_->binlog(), id);
  }
  if (create_logevent_id_ != 0) {
    binlog_erase(context_->binlog(), create_logevent_id_);
    create_logevent_id_ = 0;
  }

  auto event = make_unique<logevent::CloseSecretChat>();
  event->chat_id = auth_state_.id;
  event->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*event)));

  auto on_sync = PromiseCreator::lambda(
      [actor_id = actor_id(this), event = std::move(event), promise = std::move(promise)](Result<Unit> result) mutable {
        if (result.is_ok()) {
          send_closure(actor_id, &SecretChatActor::do_close_chat_impl, std::move(event));
          promise.set_value(Unit());
        } else {
          promise.set_error(result.error().clone());
          send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "do_close_chat_impl");
        }
      });

  context_->binlog()->force_sync(std::move(on_sync));
  yield();
}

void SecretChatActor::do_close_chat_impl(unique_ptr<logevent::CloseSecretChat> event) {
  close_flag_ = true;
  close_logevent_id_ = event->logevent_id();
  LOG(INFO) << "Send messages.discardEncryption";
  auth_state_.state = State::Closed;
  context_->secret_chat_db()->set_value(auth_state_);
  context_->secret_chat_db()->erase_value(config_state_);
  context_->secret_chat_db()->erase_value(pfs_state_);
  context_->secret_chat_db()->erase_value(seq_no_state_);
  telegram_api::messages_discardEncryption tl_query(auth_state_.id);
  auto query = context_->net_query_creator().create(
      UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::DiscardEncryption)),
      create_storer(tl_query));

  send_update_secret_chat();

  context_->send_net_query(std::move(query), actor_shared(this), true);
}

void SecretChatActor::do_create_chat_impl(unique_ptr<logevent::CreateSecretChat> event) {
  LOG(INFO) << *event;
  CHECK(event->random_id == auth_state_.id);
  create_logevent_id_ = event->logevent_id();

  if (auth_state_.state == State::Empty) {
    auth_state_.user_id = event->user_id;
    auth_state_.user_access_hash = event->user_access_hash;
    auth_state_.random_id = event->random_id;
    auth_state_.state = State::SendRequest;
    auth_state_.x = 0;
    auth_state_.date = context_->unix_time();
    send_update_secret_chat();
  } else if (auth_state_.state == State::SendRequest) {
  } else if (auth_state_.state == State::WaitRequestResponse) {
  } else {
    binlog_erase(context_->binlog(), create_logevent_id_);
    create_logevent_id_ = 0;
  }
}
void SecretChatActor::on_discard_encryption_result(NetQueryPtr result) {
  CHECK(close_flag_);
  CHECK(close_logevent_id_ != 0);
  if (context_->close_flag()) {
    return;
  }
  LOG(INFO) << "Got result for messages.discardEncryption";
  context_->secret_chat_db()->erase_value(auth_state_);
  binlog_erase(context_->binlog(), close_logevent_id_);
  // skip flush
  stop();
}

telegram_api::object_ptr<telegram_api::inputUser> SecretChatActor::get_input_user() {
  return telegram_api::make_object<telegram_api::inputUser>(auth_state_.user_id, auth_state_.user_access_hash);
}
telegram_api::object_ptr<telegram_api::inputEncryptedChat> SecretChatActor::get_input_chat() {
  return telegram_api::make_object<telegram_api::inputEncryptedChat>(auth_state_.id, auth_state_.access_hash);
}
void SecretChatActor::tear_down() {
  LOG(INFO) << "SecretChatActor: tear_down";
  // TODO notify send update that we are dead
}

Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSlice &encrypted_message) {
  MutableSlice data = encrypted_message.as_slice();
  CHECK(is_aligned_pointer<4>(data.data()));
  TRY_RESULT(auth_key_id, mtproto::Transport::read_auth_key_id(data));
  mtproto::AuthKey *auth_key = nullptr;
  if (auth_key_id == pfs_state_.auth_key.id()) {
    auth_key = &pfs_state_.auth_key;
  } else if (auth_key_id == pfs_state_.other_auth_key.id()) {
    auth_key = &pfs_state_.other_auth_key;
  } else {
    return Status::Error(1, PSLICE() << "Unknown " << tag("auth_key_id", format::as_hex(auth_key_id))
                                     << tag("crc", crc64(encrypted_message.as_slice())));
  }

  // expect that message is encrypted with mtproto 2.0 if his layer is at least MTPROTO_2_LAYER
  std::array<int, 2> versions{{1, 2}};
  if (config_state_.his_layer >= MTPROTO_2_LAYER) {
    std::swap(versions[0], versions[1]);
  }

  BufferSlice encrypted_message_copy;
  int32 mtproto_version = -1;
  Result<mtproto::Transport::ReadResult> r_read_result;
  for (size_t i = 0; i < versions.size(); i++) {
    bool is_last = i + 1 == versions.size();
    encrypted_message_copy = encrypted_message.copy();
    data = encrypted_message_copy.as_slice();
    CHECK(is_aligned_pointer<4>(data.data()));

    mtproto::PacketInfo info;
    info.type = mtproto::PacketInfo::EndToEnd;
    mtproto_version = versions[i];
    info.version = mtproto_version;
    info.is_creator = auth_state_.x == 0;
    r_read_result = mtproto::Transport::read(data, *auth_key, &info);
    if (!is_last && r_read_result.is_error()) {
      LOG(WARNING) << tag("mtproto", mtproto_version) << " decryption failed " << r_read_result.error();
      continue;
    }
    break;
  }
  TRY_RESULT(read_result, std::move(r_read_result));
  switch (read_result.type()) {
    case mtproto::Transport::ReadResult::Quickack:
      return Status::Error("Got quickack instead of a message");
    case mtproto::Transport::ReadResult::Error:
      return Status::Error(PSLICE() << "Got mtproto error code instead of a message: " << read_result.error());
    case mtproto::Transport::ReadResult::Nop:
      return Status::Error("Got nop instead of a message");
    case mtproto::Transport::ReadResult::Packet:
      data = read_result.packet();
      break;
    default:
      UNREACHABLE();
  }

  int32 len = as<int32>(data.begin());
  data = data.substr(4, len);
  if (!is_aligned_pointer<4>(data.data())) {
    return std::make_tuple(auth_key_id, BufferSlice(data), mtproto_version);
  } else {
    return std::make_tuple(auth_key_id, encrypted_message_copy.from_slice(data), mtproto_version);
  }
}

Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::InboundSecretMessage> message) {
  SCOPE_EXIT {
    if (message) {
      message->qts_ack.set_value(Unit());
    }
  };
  TRY_RESULT(decrypted, decrypt(message->encrypted_message));
  auto auth_key_id = std::get<0>(decrypted);
  auto data_buffer = std::move(std::get<1>(decrypted));
  auto mtproto_version = std::get<2>(decrypted);
  message->auth_key_id = auth_key_id;

  TlBufferParser parser(&data_buffer);
  auto id = parser.fetch_int();
  Status status;
  if (id == secret_api::decryptedMessageLayer::ID) {
    auto message_with_layer = secret_api::decryptedMessageLayer::fetch(parser);
    if (!parser.get_error()) {
      auto layer = message_with_layer->layer_;
      if (layer < DEFAULT_LAYER && false /*TODO: fix android app bug? */) {
        LOG(ERROR) << "All or nothing, " << tag("layer", layer) << " is not supported, drop message "
                   << to_string(message_with_layer);
        return Status::OK();
      }
      if (config_state_.his_layer < layer) {
        config_state_.his_layer = layer;
        context_->secret_chat_db()->set_value(config_state_);
        send_update_secret_chat();
      }
      if (layer >= MTPROTO_2_LAYER && mtproto_version < 2) {
        return Status::Error(PSLICE() << "Mtproto 1.0 encryption is forbidden for this layer");
      }
      if (message_with_layer->in_seq_no_ < 0) {
        return Status::Error(PSLICE() << "Invalid seq_no: " << to_string(message_with_layer));
      }
      message->decrypted_message_layer = std::move(message_with_layer);
      return do_inbound_message_decrypted_unchecked(std::move(message));
    } else {
      status = Status::Error(PSLICE() << parser.get_error() << format::as_hex_dump<4>(data_buffer.as_slice()));
    }
  } else {
    status = Status::Error(PSLICE() << "Unknown constructor " << tag("ID", format::as_hex(id)));
  }

  // support for older layer
  LOG(WARNING) << "Failed to Fetch update: " << status;
  send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
              Promise<>());

  if (config_state_.his_layer == 8) {
    TlBufferParser new_parser(&data_buffer);
    auto message_without_layer = secret_api::DecryptedMessage::fetch(new_parser);
    if (!new_parser.get_error()) {
      message->decrypted_message_layer = secret_api::make_object<secret_api::decryptedMessageLayer>(
          BufferSlice(), config_state_.his_layer, -1, -1, std::move(message_without_layer));
      return do_inbound_message_decrypted_unchecked(std::move(message));
    }
    LOG(ERROR) << "Failed to fetch update (DecryptedMessage): " << new_parser.get_error()
               << format::as_hex_dump<4>(data_buffer.as_slice());
  }

  return status;
}

Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_layer) {
  if (in_seq_no < 0) {
    return Status::OK();
  }
  if (in_seq_no % 2 != (1 - auth_state_.x) || out_seq_no % 2 != auth_state_.x) {
    return Status::Error("Bad seq_no parity");
  }
  in_seq_no /= 2;
  out_seq_no /= 2;
  if (out_seq_no < seq_no_state_.my_in_seq_no) {
    return Status::Error(1, "Old seq_no");
  }
  if (out_seq_no > seq_no_state_.my_in_seq_no) {
    return Status::Error(2, "Gap found!");
  }
  if (in_seq_no < seq_no_state_.his_in_seq_no) {
    return Status::Error("in_seq_no is not monotonic");
  }
  if (seq_no_state_.my_out_seq_no < in_seq_no) {
    return Status::Error("in_seq_no is bigger than seq_no_state_.my_out_seq_no");
  }
  if (his_layer < seq_no_state_.his_layer) {
    return Status::Error("his_layer is not monotonic");
  }

  return Status::OK();
}

Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logevent::InboundSecretMessage> message) {
  SCOPE_EXIT {
    LOG_IF(FATAL, message && message->qts_ack) << "Lost qts_promise";
  };
  auto in_seq_no = message->decrypted_message_layer->in_seq_no_;
  auto out_seq_no = message->decrypted_message_layer->out_seq_no_;
  auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer());
  if (status.is_error() && status.code() != 2 /* not gap found */) {
    message->qts_ack.set_value(Unit());
    if (message->logevent_id()) {
      LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id());
      binlog_erase(context_->binlog(), message->logevent_id());
    }
    auto warning_message = PSTRING() << status << tag("seq_no_state_.my_in_seq_no", seq_no_state_.my_in_seq_no)
                                     << tag("seq_no_state_.my_out_seq_no", seq_no_state_.my_out_seq_no)
                                     << tag("seq_no_state_.his_in_seq_no", seq_no_state_.his_in_seq_no)
                                     << tag("in_seq_no", in_seq_no) << tag("out_seq_no", out_seq_no)
                                     << to_string(message->decrypted_message_layer);
    if (status.code()) {
      LOG(WARNING) << warning_message;
    } else {
      LOG(ERROR) << warning_message;
    }
    return status;
  }

  if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
    auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
    message->decrypted_message_layer->message_ =
        secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
  }

  // Process ActionResend.
  if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
    auto *decrypted_message_service =
        static_cast<secret_api::decryptedMessageService *>(message->decrypted_message_layer->message_.get());
    if (decrypted_message_service->action_->get_id() == secret_api::decryptedMessageActionResend::ID) {
      auto *action_resend =
          static_cast<secret_api::decryptedMessageActionResend *>(decrypted_message_service->action_.get());

      uint32 start_seq_no = static_cast<uint32>(action_resend->start_seq_no_ / 2);
      uint32 finish_seq_no = static_cast<uint32>(action_resend->end_seq_no_ / 2);
      if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) {
        message->qts_ack.set_value(Unit());
        return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages");
      }
      LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no);
      for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) {
        auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no);
        if (it == out_seq_no_to_outbound_message_state_token_.end()) {
          message->qts_ack.set_value(Unit());
          return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no));
        }
        auto state_id = it->second;
        outbound_resend(state_id);
      }
      // It is ok to replace action with Noop, because it won't be written to binlog before message is marked unsent
      decrypted_message_service->action_ = secret_api::make_object<secret_api::decryptedMessageActionNoop>();
    }
  }

  LOG(INFO) << "GOT MESSAGE " << to_string(message->decrypted_message_layer);

  if (status.is_error()) {
    CHECK(status.code() == 2);  // gap found
    do_inbound_message_decrypted_pending(std::move(message));
    return Status::OK();
  }

  message->message_id = seq_no_state_.message_id + 1;
  if (in_seq_no != -1) {
    message->my_in_seq_no = out_seq_no / 2 + 1;
    message->my_out_seq_no = seq_no_state_.my_out_seq_no;
    message->his_in_seq_no = in_seq_no / 2;
  }

  return do_inbound_message_decrypted(std::move(message));
}

void SecretChatActor::do_outbound_message_impl(unique_ptr<logevent::OutboundSecretMessage> binlog_event,
                                               Promise<> promise) {
  binlog_event->crc = crc64(binlog_event->encrypted_message.as_slice());
  LOG(INFO) << "Do outbound message: " << *binlog_event << tag("crc", binlog_event->crc);
  auto &state_id_ref = random_id_to_outbound_message_state_token_[binlog_event->random_id];
  LOG_CHECK(state_id_ref == 0) << "Random id collision";
  state_id_ref = outbound_message_states_.create();
  const uint64 state_id = state_id_ref;
  auto *state = outbound_message_states_.get(state_id);
  LOG(INFO) << tag("state_id", state_id);
  CHECK(state);
  state->message = std::move(binlog_event);

  // OutboundSecretMessage
  //
  // 1. [] => Save logevent. [save_logevent]
  // 2. [save_logevent] => Save SeqNoState [save_changes]
  // 3. [save_logevent] => Send NetQuery [send_message]
  //   Note: we have to force binlog to flush
  // 4.0 [send_message]:Fail => rewrite
  // 4. [save_changes; send_message] => Mark logevent as sent [rewrite_logevent]
  // 5. [save_changes; send_message; ack] => [remove_logevent]

  auto message = state->message.get();

  // send_message
  auto send_message_start = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_oubound_send_message_start");
    }
  });

  // update seq_no
  update_seq_no_state(*message);

  // process action
  if (message->action) {
    on_outbound_action(*message->action, message->message_id);
  }

  // save_changes
  auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_outbound_save_changes_finish, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_outbound_save_chages_finish");
    }
  });

  auto save_changes_start = add_changes(std::move(save_changes_finish));

  // wait for ack
  auto out_seq_no = state->message->my_out_seq_no - 1;
  if (out_seq_no < seq_no_state_.his_in_seq_no) {
    state->ack_flag = true;
  } else {
    out_seq_no_to_outbound_message_state_token_[out_seq_no] = state_id;
  }

  // save_logevent => [send_message; save_changes]
  auto save_logevent_finish = PromiseCreator::join(std::move(send_message_start), std::move(save_changes_start));

  auto logevent_id = state->message->logevent_id();
  if (logevent_id == 0) {
    logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*state->message));
    LOG(INFO) << "Outbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
    context_->binlog()->force_sync(std::move(save_logevent_finish));
    state->message->set_logevent_id(logevent_id);
  } else {
    LOG(INFO) << "Outbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
    save_logevent_finish.set_value(Unit());
  }
  promise.set_value(Unit());  // logevent was sent to binlog;
}

void SecretChatActor::on_his_in_seq_no_updated() {
  auto it = begin(out_seq_no_to_outbound_message_state_token_);
  while (it != end(out_seq_no_to_outbound_message_state_token_) && it->first < seq_no_state_.his_in_seq_no) {
    auto token = it->second;
    it = out_seq_no_to_outbound_message_state_token_.erase(it);
    on_outbound_ack(token);
  }
}
void SecretChatActor::on_seq_no_state_changed() {
  seq_no_state_changed_ = true;
}

void SecretChatActor::on_pfs_state_changed() {
  LOG(INFO) << "In on_pfs_state_changed: " << pfs_state_;
  pfs_state_changed_ = true;
}
Promise<> SecretChatActor::add_changes(Promise<> save_changes_finish) {
  StateChange change;
  if (seq_no_state_changed_) {
    change.seq_no_state_change = SeqNoStateChange(seq_no_state_);
    seq_no_state_changed_ = false;
  }
  if (pfs_state_changed_) {
    change.pfs_state_change = PfsStateChange(pfs_state_);
    pfs_state_changed_ = false;
  }

  change.save_changes_finish = std::move(save_changes_finish);
  auto save_changes_start_token = changes_processor_.add(std::move(change));

  return PromiseCreator::lambda([actor_id = actor_id(this), save_changes_start_token](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_save_changes_start, save_changes_start_token);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "on_save_changes_start");
    }
  });
}

template <class T>
void SecretChatActor::update_seq_no_state(const T &new_seq_no_state) {
  // Some old updates may arrive. Just ignore them
  if (seq_no_state_.message_id >= new_seq_no_state.message_id &&
      seq_no_state_.my_in_seq_no >= new_seq_no_state.my_in_seq_no &&
      seq_no_state_.my_out_seq_no >= new_seq_no_state.my_out_seq_no &&
      seq_no_state_.his_in_seq_no >= new_seq_no_state.his_in_seq_no) {
    return;
  }
  seq_no_state_.message_id = new_seq_no_state.message_id;
  if (new_seq_no_state.my_in_seq_no != -1) {
    LOG(INFO) << "Have my_in_seq_no: " << seq_no_state_.my_in_seq_no << "--->" << new_seq_no_state.my_in_seq_no;
    seq_no_state_.my_in_seq_no = new_seq_no_state.my_in_seq_no;
    seq_no_state_.my_out_seq_no = new_seq_no_state.my_out_seq_no;

    auto new_his_layer = new_seq_no_state.his_layer();
    if (new_his_layer != -1) {
      seq_no_state_.his_layer = new_his_layer;
    }

    if (seq_no_state_.his_in_seq_no != new_seq_no_state.his_in_seq_no) {
      seq_no_state_.his_in_seq_no = new_seq_no_state.his_in_seq_no;
      on_his_in_seq_no_updated();
    }
  }

  return on_seq_no_state_changed();
}

Status SecretChatActor::do_inbound_message_decrypted_pending(unique_ptr<logevent::InboundSecretMessage> message) {
  // Just save logevent if necessary
  auto logevent_id = message->logevent_id();

  // qts
  auto qts_promise = std::move(message->qts_ack);

  if (logevent_id == 0) {
    message->is_pending = true;
    message->set_logevent_id(binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message),
                                        std::move(qts_promise)));
    LOG(INFO) << "Inbound PENDING secret message [save_logevent] start (do not expect finish) "
              << tag("logevent_id", message->logevent_id());
  } else {
    LOG(INFO) << "Inbound PENDING secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
    CHECK(!qts_promise);
  }
  LOG(INFO) << "Inbound PENDING secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);

  auto seq_no = message->decrypted_message_layer->out_seq_no_ / 2;
  pending_inbound_messages_[seq_no] = std::move(message);

  return Status::OK();
}

Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<logevent::InboundSecretMessage> message) {
  // InboundSecretMessage
  //
  // 1. [] => Add logevent. [save_logevent]
  // 2. [save_logevent] => Save SeqNoState [save_changes]
  // 3. [save_logevent] => Add message to MessageManager [save_message]
  //    Note: if we are able to add message by random_id, we may not wait for (logevent). Otherwise we should force
  //    binlog flush.
  // 4. [save_logevent] => Update qts [qts]
  // 5. [save_changes; save_message; ?qts) => Remove logevent [remove_logevent]
  //    Note: It is easier not to wait for qts. In the worst case old update will be handled again after restart.

  auto state_id = inbound_message_states_.create();
  InboundMessageState &state = *inbound_message_states_.get(state_id);

  // save logevent
  auto logevent_id = message->logevent_id();
  bool need_sync = false;
  if (logevent_id == 0) {
    logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
    LOG(INFO) << "Inbound secret message [save_logevent] start " << tag("logevent_id", logevent_id);
    need_sync = true;
  } else {
    if (message->is_pending) {
      message->is_pending = false;
      auto old_logevent_id = logevent_id;
      logevent_id = binlog_add(context_->binlog(), LogEvent::HandlerType::SecretChats, create_storer(*message));
      binlog_erase(context_->binlog(), old_logevent_id);
      LOG(INFO) << "Inbound secret message [save_logevent] rewrite (after pending state) "
                << tag("logevent_id", logevent_id) << tag("old_logevent_id", old_logevent_id);
      need_sync = true;
    } else {
      LOG(INFO) << "Inbound secret message [save_logevent] skip " << tag("logevent_id", logevent_id);
    }
  }
  LOG(INFO) << "Inbound secret message start " << tag("logevent_id", logevent_id) << tag("message", *message);
  state.logevent_id = logevent_id;

  // save_message
  auto save_message_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_inbound_save_message_finish, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_inbound_save_message_finish");
    }
  });

  // update seq_no
  update_seq_no_state(*message);

  // drop old key
  if (!pfs_state_.other_auth_key.empty() && message->auth_key_id == pfs_state_.auth_key.id() &&
      pfs_state_.can_forget_other_key) {
    LOG(INFO) << "Drop old auth key " << tag("auth_key_id", format::as_hex(pfs_state_.other_auth_key.id()));
    pfs_state_.other_auth_key = mtproto::AuthKey();
    on_pfs_state_changed();
  }

  // qts
  auto qts_promise = std::move(message->qts_ack);

  // process message
  tl_object_ptr<telegram_api::encryptedFile> file;
  if (message->has_encrypted_file) {
    file = message->file.as_encrypted_file();
  }

  if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage46::ID) {
    auto old = move_tl_object_as<secret_api::decryptedMessage46>(message->decrypted_message_layer->message_);
    old->flags_ &= ~secret_api::decryptedMessage::GROUPED_ID_MASK;  // just in case
    message->decrypted_message_layer->message_ = secret_api::make_object<secret_api::decryptedMessage>(
        old->flags_, old->random_id_, old->ttl_, std::move(old->message_), std::move(old->media_),
        std::move(old->entities_), std::move(old->via_bot_name_), old->reply_to_random_id_, 0);
  }
  if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService8::ID) {
    auto old = move_tl_object_as<secret_api::decryptedMessageService8>(message->decrypted_message_layer->message_);
    message->decrypted_message_layer->message_ =
        secret_api::make_object<secret_api::decryptedMessageService>(old->random_id_, std::move(old->action_));
  }

  // NB: message is invalid after this 'move_as'
  // Send update through context_
  // Note, that update may be sent multiple times and should be somehow protected from replay.
  // Luckily all updates seems to be idempotent.
  // We could use ChangesProcessor to mark logevent as sent to context_, but I don't see any advantages of this
  // approach.
  if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessage::ID) {
    auto decrypted_message =
        move_tl_object_as<secret_api::decryptedMessage>(message->decrypted_message_layer->message_);
    context_->on_inbound_message(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
                                 std::move(file), std::move(decrypted_message), std::move(save_message_finish));
  } else if (message->decrypted_message_layer->message_->get_id() == secret_api::decryptedMessageService::ID) {
    auto decrypted_message_service =
        move_tl_object_as<secret_api::decryptedMessageService>(message->decrypted_message_layer->message_);

    auto action = std::move(decrypted_message_service->action_);
    switch (action->get_id()) {
      case secret_api::decryptedMessageActionDeleteMessages::ID:
        // Corresponding logevent won't be deleted before promise returned by add_changes is set.
        context_->on_delete_messages(
            std::move(static_cast<secret_api::decryptedMessageActionDeleteMessages &>(*action).random_ids_),
            std::move(save_message_finish));
        break;
      case secret_api::decryptedMessageActionFlushHistory::ID:
        context_->on_flush_history(MessageId(ServerMessageId(message->message_id)), std::move(save_message_finish));
        break;
      case secret_api::decryptedMessageActionReadMessages::ID: {
        auto &random_ids = static_cast<secret_api::decryptedMessageActionReadMessages &>(*action).random_ids_;
        if (random_ids.size() == 1) {
          context_->on_read_message(random_ids[0], std::move(save_message_finish));
        } else {  // probably never happens
          MultiPromiseActorSafe mpas{"ReadSecretMessagesMultiPromiseActor"};
          mpas.add_promise(std::move(save_message_finish));
          auto lock = mpas.get_promise();
          for (auto random_id : random_ids) {
            context_->on_read_message(random_id, mpas.get_promise());
          }
          lock.set_value(Unit());
        }
        break;
      }
      case secret_api::decryptedMessageActionScreenshotMessages::ID:
        context_->on_screenshot_taken(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
                                      decrypted_message_service->random_id_, std::move(save_message_finish));
        break;
      case secret_api::decryptedMessageActionSetMessageTTL::ID:
        context_->on_set_ttl(get_user_id(), MessageId(ServerMessageId(message->message_id)), message->date,
                             static_cast<secret_api::decryptedMessageActionSetMessageTTL &>(*action).ttl_seconds_,
                             decrypted_message_service->random_id_, std::move(save_message_finish));
        break;
      default:
        /*
decryptedMessageActionResend#511110b0 start_seq_no:int end_seq_no:int = DecryptedMessageAction;
decryptedMessageActionNotifyLayer#f3048883 layer:int = DecryptedMessageAction;
decryptedMessageActionTyping#ccb27641 action:SendMessageAction = DecryptedMessageAction;
decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
decryptedMessageActionAbortKey#dd05ec6b exchange_id:long = DecryptedMessageAction;
decryptedMessageActionCommitKey#ec2e0b9b exchange_id:long key_fingerprint:long = DecryptedMessageAction;
decryptedMessageActionNoop#a82fdd63 = DecryptedMessageAction;
        */
        save_message_finish.set_value(Unit());
        break;
    }

    state.message_id = message->message_id;
    TRY_STATUS(on_inbound_action(*action, message->message_id));
  } else {
    LOG(ERROR) << "INGORE MESSAGE: " << to_string(message->decrypted_message_layer);
    save_message_finish.set_value(Unit());
  }

  // save_changes
  auto save_changes_finish = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_inbound_save_changes_finish, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_inbound_save_changes_finish");
    }
  });
  auto save_changes_start = add_changes(std::move(save_changes_finish));

  // save_logevent
  auto save_logevent_finish = PromiseCreator::join(std::move(save_changes_start), std::move(qts_promise));
  if (need_sync) {
    // TODO: lazy sync is enough
    context_->binlog()->force_sync(std::move(save_logevent_finish));
  } else {
    save_logevent_finish.set_value(Unit());
  }
  return Status::OK();
}

void SecretChatActor::on_save_changes_start(ChangesProcessor<StateChange>::Id save_changes_token) {
  if (close_flag_) {
    return;
  }
  SeqNoStateChange seq_no_state_change;
  PfsStateChange pfs_state_change;
  std::vector<Promise<Unit>> save_changes_finish_promises;
  changes_processor_.finish(save_changes_token, [&](StateChange &&change) {
    save_changes_finish_promises.emplace_back(std::move(change.save_changes_finish));
    if (change.seq_no_state_change) {
      seq_no_state_change = std::move(change.seq_no_state_change);
    }
    if (change.pfs_state_change) {
      pfs_state_change = std::move(change.pfs_state_change);
    }
  });
  if (seq_no_state_change) {
    LOG(INFO) << "SAVE SeqNoState " << seq_no_state_change;
    context_->secret_chat_db()->set_value(seq_no_state_change);
  }
  if (pfs_state_change) {
    LOG(INFO) << "SAVE PfsState " << pfs_state_change;
    saved_pfs_state_message_id_ = pfs_state_change.message_id;
    context_->secret_chat_db()->set_value(pfs_state_change);
  }
  // NB: we may not wait till database is flushed, because every other change will be in the same binlog
  for (auto &save_changes_finish : save_changes_finish_promises) {
    save_changes_finish.set_value(Unit());
  }
}

void SecretChatActor::on_inbound_save_message_finish(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = inbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Inbound message [save_message] finish " << tag("logevent_id", state->logevent_id);
  state->save_message_finish = true;
  inbound_loop(state, state_id);
}

void SecretChatActor::on_inbound_save_changes_finish(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = inbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Inbound message [save_changes] finish " << tag("logevent_id", state->logevent_id);
  state->save_changes_finish = true;
  inbound_loop(state, state_id);
}

void SecretChatActor::inbound_loop(InboundMessageState *state, uint64 state_id) {
  if (close_flag_) {
    return;
  }
  if (!state->save_changes_finish || !state->save_message_finish) {
    return;
  }
  LOG(INFO) << "Inbound message [remove_logevent] start " << tag("logevent_id", state->logevent_id);
  binlog_erase(context_->binlog(), state->logevent_id);

  inbound_message_states_.erase(state_id);
}

NetQueryPtr SecretChatActor::create_net_query(const logevent::OutboundSecretMessage &message) {
  NetQueryPtr query;
  if (message.is_service) {
    CHECK(message.file.empty());
    query = context_->net_query_creator().create(
        UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
        create_storer(telegram_api::messages_sendEncryptedService(get_input_chat(), message.random_id,
                                                                  message.encrypted_message.clone())));
    query->total_timeout_limit = 1000000000;  // inf. We will re-sent it immediately anyway.
  } else if (message.file.empty()) {
    query = context_->net_query_creator().create(
        UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
        create_storer(telegram_api::messages_sendEncrypted(get_input_chat(), message.random_id,
                                                           message.encrypted_message.clone())));
  } else {
    query = context_->net_query_creator().create(
        UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Message)),
        create_storer(telegram_api::messages_sendEncryptedFile(get_input_chat(), message.random_id,
                                                               message.encrypted_message.clone(),
                                                               message.file.as_input_encrypted_file())));
  }
  if (message.is_external && context_->get_config_option_boolean("use_quick_ack")) {
    query->quick_ack_promise_ =
        PromiseCreator::lambda([actor_id = actor_id(this), random_id = message.random_id](
                                   Unit) { send_closure(actor_id, &SecretChatActor::on_send_message_ack, random_id); },
                               PromiseCreator::Ignore());
  }

  return query;
}

void SecretChatActor::on_outbound_send_message_start(uint64 state_id) {
  auto *state = outbound_message_states_.get(state_id);
  if (state == nullptr) {
    LOG(INFO) << "Outbound message [send_message] start ignored (unknown state_id) " << tag("state_id", state_id);
    return;
  }

  auto *message = state->message.get();

  if (!message->is_sent) {
    LOG(INFO) << "Outbound message [send_message] start " << tag("logevent_id", state->message->logevent_id());
    auto query = create_net_query(*message);
    state->net_query_id = query->id();
    state->net_query_ref = query.get_weak();
    state->net_query_may_fail = state->message->is_rewritable;
    context_->send_net_query(std::move(query), actor_shared(this, state_id), true);
  } else {
    LOG(INFO) << "Outbound message [send_message] start dummy " << tag("logevent_id", state->message->logevent_id());
    on_outbound_send_message_finish(state_id);
  }
}

void SecretChatActor::outbound_resend(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);

  state->message->is_sent = false;
  state->net_query_id = 0;
  state->net_query_ref = NetQueryRef();
  LOG(INFO) << "Oubound message [resend] " << tag("logevent_id", state->message->logevent_id())
            << tag("state_id", state_id);

  binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
                 create_storer(*state->message));
  auto send_message_start = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_outbound_send_message_start, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_outbound_send_message_start");
    }
  });
  context_->binlog()->force_sync(std::move(send_message_start));
}

Status SecretChatActor::outbound_rewrite_with_empty(uint64 state_id) {
  if (close_flag_) {
    return Status::OK();
  }
  auto *state = outbound_message_states_.get(state_id);
  if (state == nullptr || !state->message->is_rewritable) {
    return Status::OK();
  }
  cancel_query(state->net_query_ref);

  MutableSlice data = state->message->encrypted_message.as_slice();
  CHECK(is_aligned_pointer<4>(data.data()));

  // Rewrite with delete itself.
  tl_object_ptr<secret_api::DecryptedMessage> message = secret_api::make_object<secret_api::decryptedMessageService>(
      state->message->random_id, secret_api::make_object<secret_api::decryptedMessageActionDeleteMessages>(
                                     std::vector<int64>{static_cast<int64>(state->message->random_id)}));

  TRY_RESULT(encrypted_message, create_encrypted_message(current_layer(), state->message->my_in_seq_no,
                                                         state->message->my_out_seq_no, message));
  state->message->encrypted_message = std::move(encrypted_message);
  LOG(INFO) << tag("crc", crc64(state->message->encrypted_message.as_slice()));
  state->message->is_rewritable = false;
  state->message->is_external = false;
  state->message->is_service = true;
  state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(nullptr);
  binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
                 create_storer(*state->message));
  return Status::OK();
}

void SecretChatActor::on_outbound_send_message_result(NetQueryPtr query, Promise<NetQueryPtr> resend_promise) {
  if (close_flag_) {
    return;
  }
  auto state_id = get_link_token();
  auto *state = outbound_message_states_.get(state_id);
  if (!state) {
    LOG(INFO) << "Ignore old net query result " << tag("state_id", state_id);
    query->clear();
    return;
  }
  CHECK(state);
  if (state->net_query_id != query->id()) {
    LOG(INFO) << "Ignore old net query result " << tag("logevent_id", state->message->logevent_id())
              << tag("query_id", query->id()) << tag("state_query_id", state->net_query_id) << query;
    query->clear();
    return;
  }

  state->net_query_id = 0;
  state->net_query_ref = NetQueryRef();

  auto r_result = fetch_result<telegram_api::messages_sendEncrypted>(std::move(query));
  if (r_result.is_error()) {
    auto error = r_result.move_as_error();

    auto send_message_error_promise =
        PromiseCreator::lambda([actor_id = actor_id(this), state_id, error = error.clone(),
                                resend_promise = std::move(resend_promise)](Result<> result) mutable {
          if (result.is_ok()) {
            send_closure(actor_id, &SecretChatActor::on_outbound_send_message_error, state_id, std::move(error),
                         std::move(resend_promise));
          } else {
            send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                         "on_outbound_send_message_error");
          }
        });

    if (state->message->is_external) {
      LOG(INFO) << "Outbound secret message [send_message] failed, rewrite it with dummy "
                << tag("logevent_id", state->message->logevent_id()) << tag("error", error);
      state->send_result_ = [this, random_id = state->message->random_id, error_code = error.code(),
                             error_message = error.message()](Promise<> promise) {
        this->context_->on_send_message_error(random_id, Status::Error(error_code, error_message), std::move(promise));
      };
      state->send_result_(std::move(send_message_error_promise));
    } else {
      // Just resend.
      LOG(INFO) << "Outbound secret message [send_message] failed, resend it "
                << tag("logevent_id", state->message->logevent_id()) << tag("error", error);
      send_message_error_promise.set_value(Unit());
    }
    return;
  }

  auto result = r_result.move_as_ok();
  LOG(INFO) << "Got messages_sendEncrypted result: " << tag("message_id", state->message->message_id)
            << tag("random_id", state->message->random_id) << to_string(*result);

  auto send_message_finish_promise = PromiseCreator::lambda([actor_id = actor_id(this), state_id](Result<> result) {
    if (result.is_ok()) {
      send_closure(actor_id, &SecretChatActor::on_outbound_send_message_finish, state_id);
    } else {
      send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(),
                   "on_outbound_send_message_finish");
    }
  });

  if (state->message->is_external) {
    switch (result->get_id()) {
      case telegram_api::messages_sentEncryptedMessage::ID: {
        auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedMessage>(result);
        state->send_result_ = [this, random_id = state->message->random_id,
                               message_id = MessageId(ServerMessageId(state->message->message_id)),
                               date = sent->date_](Promise<> promise) {
          this->context_->on_send_message_ok(random_id, message_id, date, nullptr, std::move(promise));
        };
        state->send_result_(std::move(send_message_finish_promise));
        return;
      }
      case telegram_api::messages_sentEncryptedFile::ID: {
        auto sent = move_tl_object_as<telegram_api::messages_sentEncryptedFile>(result);
        std::function<telegram_api::object_ptr<telegram_api::EncryptedFile>()> get_file;
        telegram_api::downcast_call(
            *sent->file_, overloaded(
                              [&](telegram_api::encryptedFileEmpty &) {
                                state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
                                    telegram_api::inputEncryptedFileEmpty());
                                get_file = [] { return telegram_api::make_object<telegram_api::encryptedFileEmpty>(); };
                              },
                              [&](telegram_api::encryptedFile &file) {
                                state->message->file = logevent::EncryptedInputFile::from_input_encrypted_file(
                                    telegram_api::inputEncryptedFile(file.id_, file.access_hash_));
                                get_file = [id = file.id_, access_hash = file.access_hash_, size = file.size_,
                                            dc_id = file.dc_id_, key_fingerprint = file.key_fingerprint_] {
                                  return telegram_api::make_object<telegram_api::encryptedFile>(id, access_hash, size,
                                                                                                dc_id, key_fingerprint);
                                };
                              }));

        state->send_result_ = [this, random_id = state->message->random_id,
                               message_id = MessageId(ServerMessageId(state->message->message_id)), date = sent->date_,
                               get_file = std::move(get_file)](Promise<> promise) {
          this->context_->on_send_message_ok(random_id, message_id, date, get_file(), std::move(promise));
        };
        state->send_result_(std::move(send_message_finish_promise));
        return;
      }
    }
  }
  send_message_finish_promise.set_value(Unit());
}

void SecretChatActor::on_outbound_send_message_error(uint64 state_id, Status error,
                                                     Promise<NetQueryPtr> resend_promise) {
  if (close_flag_) {
    return;
  }
  if (context_->close_flag()) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  if (!state) {
    return;
  }
  bool need_sync = false;
  if (state->net_query_may_fail) {
    // message could be already non-rewritable, if it was deleted during NetQuery execution.
    if (state->message->is_rewritable) {
      delete_message(state->message->random_id, Promise<>());
      // state pointer may be invalidated
      state = outbound_message_states_.get(state_id);
      need_sync = true;
    }
  } else {
    bool should_fail = false;
    if (error.code() == 429) {
      should_fail = false;
    } else if (error.code() == 400 && error.message() == "ENCRYPTION_DECLINED") {
      should_fail = true;
    } else {
      LOG(ERROR) << "Got unknown error for encrypted service message: " << error;
      should_fail = true;
    }
    if (should_fail) {
      return on_fatal_error(std::move(error));
    }
  }
  auto query = create_net_query(*state->message);
  state->net_query_id = query->id();

  CHECK(resend_promise);
  auto send_message_start =
      PromiseCreator::lambda([actor_id = actor_id(this), resend_promise = std::move(resend_promise),
                              query = std::move(query)](Result<> result) mutable {
        if (result.is_ok()) {
          resend_promise.set_value(std::move(query));
        } else {
          send_closure(actor_id, &SecretChatActor::on_promise_error, result.move_as_error(), "resend_query");
        }
      });
  if (need_sync) {
    context_->binlog()->force_sync(std::move(send_message_start));
  } else {
    send_message_start.set_value(Unit());
  }
}

void SecretChatActor::on_outbound_send_message_finish(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  if (!state) {
    return;
  }
  LOG(INFO) << "Outbound secret message [send_message] finish " << tag("logevent_id", state->message->logevent_id());
  state->send_message_finish_flag = true;
  state->outer_send_message_finish.set_value(Unit());

  outbound_loop(state, state_id);
}

void SecretChatActor::on_outbound_save_changes_finish(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Outbound secret message [save_changes] finish " << tag("logevent_id", state->message->logevent_id());
  state->save_changes_finish_flag = true;
  outbound_loop(state, state_id);
}

void SecretChatActor::on_outbound_ack(uint64 state_id) {
  if (close_flag_) {
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Outbound secret message [ack] finish " << tag("logevent_id", state->message->logevent_id());
  state->ack_flag = true;
  outbound_loop(state, state_id);
}

void SecretChatActor::on_outbound_outer_send_message_promise(uint64 state_id, Promise<> promise) {
  if (close_flag_) {
    promise.set_error(Status::Error(400, "Chat is closed"));
    return;
  }
  auto *state = outbound_message_states_.get(state_id);
  CHECK(state);
  LOG(INFO) << "Outbound secret message [TODO] " << tag("logevent_id", state->message->logevent_id());
  promise.set_value(Unit());  // Seems like this message is at least stored to binlog already
  if (state->send_result_) {
    state->send_result_({});
  } else {
    context_->on_send_message_error(state->message->random_id, Status::Error(400, "Message has already been sent"),
                                    Auto());
  }
}

void SecretChatActor::outbound_loop(OutboundMessageState *state, uint64 state_id) {
  if (close_flag_) {
    return;
  }
  if (state->save_changes_finish_flag /*&& state->send_message_finish_flag*/ && state->ack_flag) {
    LOG(INFO) << "Outbound message [remove_logevent] start " << tag("logevent_id", state->message->logevent_id());
    binlog_erase(context_->binlog(), state->message->logevent_id());

    random_id_to_outbound_message_state_token_.erase(state->message->random_id);
    LOG(INFO) << "Outbound message finish (lazy) " << tag("logevent_id", state->message->logevent_id());
    outbound_message_states_.erase(state_id);
    return;
  }

  if (state->save_changes_finish_flag && state->send_message_finish_flag &&
      !state->message->is_sent) {  // [rewrite_logevent]
    LOG(INFO) << "Outbound message [rewrite_logevent] start " << tag("logevent_id", state->message->logevent_id());
    state->message->is_sent = true;
    binlog_rewrite(context_->binlog(), state->message->logevent_id(), LogEvent::HandlerType::SecretChats,
                   create_storer(*state->message));
  }
}

template <class T>
Status SecretChatActor::save_common_info(T &update) {
  if (auth_state_.id != update.id_) {
    return Status::Error(PSLICE() << "chat_id mismatch: " << tag("mine", auth_state_.id) << tag("outer", update.id_));
  }
  auth_state_.id = update.id_;
  auth_state_.access_hash = update.access_hash_;
  return Status::OK();
}

Status SecretChatActor::on_update_chat(telegram_api::encryptedChatRequested &update) {
  if (auth_state_.state != State::Empty) {
    LOG(WARNING) << "Unexpected ChatRequested ignored: " << to_string(update);
    return Status::OK();
  }
  auth_state_.state = State::SendAccept;
  auth_state_.x = 1;
  auth_state_.user_id = update.admin_id_;
  auth_state_.date = context_->unix_time();
  TRY_STATUS(save_common_info(update));
  auth_state_.handshake.set_g_a(update.g_a_.as_slice());
  send_update_secret_chat();
  return Status::OK();
}
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatEmpty &update) {
  return Status::OK();
}
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatWaiting &update) {
  if (auth_state_.state != State::WaitRequestResponse && auth_state_.state != State::WaitAcceptResponse) {
    LOG(WARNING) << "Unexpected ChatWaiting ignored";
    return Status::OK();
  }
  TRY_STATUS(save_common_info(update));
  send_update_secret_chat();
  return Status::OK();
}
Status SecretChatActor::on_update_chat(telegram_api::encryptedChat &update) {
  if (auth_state_.state != State::WaitRequestResponse && auth_state_.state != State::WaitAcceptResponse) {
    LOG(WARNING) << "Unexpected Chat ignored";
    return Status::OK();
  }
  TRY_STATUS(save_common_info(update));
  if (auth_state_.state == State::WaitRequestResponse) {
    auth_state_.handshake.set_g_a(update.g_a_or_b_.as_slice());
    TRY_STATUS(auth_state_.handshake.run_checks(true, context_->dh_callback()));
    auto id_and_key = auth_state_.handshake.gen_key();
    pfs_state_.auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
    calc_key_hash();
  }
  if (static_cast<int64>(pfs_state_.auth_key.id()) != update.key_fingerprint_) {
    return Status::Error("Key fingerprint mismatch");
  }
  auth_state_.state = State::Ready;
  if (create_logevent_id_ != 0) {
    binlog_erase(context_->binlog(), create_logevent_id_);
    create_logevent_id_ = 0;
  }

  // NB: order is important
  context_->secret_chat_db()->set_value(pfs_state_);
  context_->secret_chat_db()->set_value(auth_state_);
  LOG(INFO) << "OK! Ready!";
  send_update_secret_chat();
  send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
              Promise<>());
  return Status::OK();
}
Status SecretChatActor::on_update_chat(telegram_api::encryptedChatDiscarded &update) {
  return Status::Error("Chat discarded");
}

Status SecretChatActor::on_update_chat(NetQueryPtr query) {
  static_assert(std::is_same<telegram_api::messages_requestEncryption::ReturnType,
                             telegram_api::messages_acceptEncryption::ReturnType>::value,
                "");
  TRY_RESULT(config, fetch_result<telegram_api::messages_requestEncryption>(std::move(query)));
  TRY_STATUS(on_update_chat(std::move(config)));
  if (auth_state_.state == State::WaitRequestResponse) {
    context_->secret_chat_db()->set_value(auth_state_);
    context_->binlog()->force_sync(Promise<>());
  }
  return Status::OK();
}

Status SecretChatActor::on_update_chat(telegram_api::object_ptr<telegram_api::EncryptedChat> chat) {
  Status res;
  downcast_call(*chat, [&](auto &obj) { res = this->on_update_chat(obj); });
  return res;
}

Status SecretChatActor::on_read_history(NetQueryPtr query) {
  if (query.generation() == read_history_query_.generation()) {
    read_history_query_ = NetQueryRef();
    read_history_promise_.set_value(Unit());
  }
  return Status::OK();
}

void SecretChatActor::start_up() {
  LOG(INFO) << "SecretChatActor: start_up";
  // auto start = Time::now();
  auto r_auth_state = context_->secret_chat_db()->get_value<AuthState>();
  if (r_auth_state.is_ok()) {
    auth_state_ = r_auth_state.move_as_ok();
  }
  if (!can_be_empty_ && auth_state_.state == State::Empty) {
    LOG(WARNING) << "Close Secret chat because it is empty";
    return stop();
  }
  if (auth_state_.state == State::Closed) {
    close_flag_ = true;
  }
  auto r_seq_no_state = context_->secret_chat_db()->get_value<SeqNoState>();
  if (r_seq_no_state.is_ok()) {
    seq_no_state_ = r_seq_no_state.move_as_ok();
  }
  auto r_config_state = context_->secret_chat_db()->get_value<ConfigState>();
  if (r_config_state.is_ok()) {
    config_state_ = r_config_state.move_as_ok();
  }
  auto r_pfs_state = context_->secret_chat_db()->get_value<PfsState>();
  if (r_pfs_state.is_ok()) {
    pfs_state_ = r_pfs_state.move_as_ok();
  }
  saved_pfs_state_message_id_ = pfs_state_.message_id;
  pfs_state_.last_timestamp = Time::now();

  send_update_secret_chat();
  get_dh_config();

  // auto end = Time::now();
  // CHECK(end - start < 0.2);
  LOG(INFO) << "In start_up with SeqNoState=" << seq_no_state_;
  LOG(INFO) << "In start_up with PfsState=" << pfs_state_;
}

void SecretChatActor::get_dh_config() {
  if (auth_state_.state != State::Empty) {
    return;
  }

  auto dh_config = context_->dh_config();
  if (dh_config) {
    auth_state_.dh_config = *dh_config;
  }

  auto version = auth_state_.dh_config.version;
  int random_length = 0;
  telegram_api::messages_getDhConfig tl_query(version, random_length);

  auto query = context_->net_query_creator().create(
      UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::DhConfig)), create_storer(tl_query));
  context_->send_net_query(std::move(query), actor_shared(this), false);
}

Status SecretChatActor::on_dh_config(NetQueryPtr query) {
  LOG(INFO) << "Got dh config";
  TRY_RESULT(config, fetch_result<telegram_api::messages_getDhConfig>(std::move(query)));
  downcast_call(*config, [&](auto &obj) { this->on_dh_config(obj); });
  TRY_STATUS(DhHandshake::check_config(auth_state_.dh_config.g, auth_state_.dh_config.prime, context_->dh_callback()));
  auth_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
  return Status::OK();
}

void SecretChatActor::on_dh_config(telegram_api::messages_dhConfigNotModified &dh_not_modified) {
  Random::add_seed(dh_not_modified.random_.as_slice());
}

void SecretChatActor::on_dh_config(telegram_api::messages_dhConfig &dh) {
  auto dh_config = std::make_shared<DhConfig>();
  dh_config->version = dh.version_;
  dh_config->prime = dh.p_.as_slice().str();
  dh_config->g = dh.g_;
  Random::add_seed(dh.random_.as_slice());
  auth_state_.dh_config = *dh_config;
  context_->set_dh_config(dh_config);
}

void SecretChatActor::calc_key_hash() {
  unsigned char sha1_buf[20];
  auto sha1_slice = Slice(sha1_buf, 20);
  sha1(pfs_state_.auth_key.key(), sha1_buf);

  unsigned char sha256_buf[32];
  auto sha256_slice = MutableSlice(sha256_buf, 32);
  sha256(pfs_state_.auth_key.key(), sha256_slice);

  auth_state_.key_hash = sha1_slice.truncate(16).str() + sha256_slice.truncate(20).str();
}

void SecretChatActor::send_update_secret_chat() {
  if (auth_state_.state == State::Empty) {
    return;
  }
  SecretChatState state;
  if (auth_state_.state == State::Ready) {
    state = SecretChatState::Active;
  } else if (auth_state_.state == State::Closed) {
    state = SecretChatState::Closed;
  } else {
    state = SecretChatState::Waiting;
  }
  context_->on_update_secret_chat(auth_state_.access_hash, get_user_id(), state, auth_state_.x == 0, config_state_.ttl,
                                  auth_state_.date, auth_state_.key_hash, current_layer());
}

void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
  config_state_.ttl = set_ttl.ttl_seconds_;
  context_->secret_chat_db()->set_value(config_state_);
  send_update_secret_chat();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
  // TODO
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
  // Corresponding logevent won't be deleted before promise returned by add_changes is set.
  on_delete_messages(delete_messages.random_ids_).ensure();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
  // noting to do
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionFlushHistory &flush_history) {
  on_flush_history(pfs_state_.message_id).ensure();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionResend &resend) {
  if (seq_no_state_.resend_end_seq_no < resend.end_seq_no_ / 2) {  // replay protection
    seq_no_state_.resend_end_seq_no = resend.end_seq_no_ / 2;
    on_seq_no_state_changed();
  }
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNotifyLayer &notify_layer) {
  config_state_.my_layer = notify_layer.layer_;
  context_->secret_chat_db()->set_value(config_state_);
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionTyping &typing) {
  // noop
}

Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionSetMessageTTL &set_ttl) {
  config_state_.ttl = set_ttl.ttl_seconds_;
  context_->secret_chat_db()->set_value(config_state_);
  send_update_secret_chat();
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionReadMessages &read_messages) {
  // TODO
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionDeleteMessages &delete_messages) {
  return on_delete_messages(delete_messages.random_ids_);
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionScreenshotMessages &screenshot) {
  // TODO
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionFlushHistory &screenshot) {
  return on_flush_history(pfs_state_.message_id);
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionResend &resend) {
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNotifyLayer &notify_layer) {
  if (notify_layer.layer_ > config_state_.his_layer) {
    config_state_.his_layer = notify_layer.layer_;
    context_->secret_chat_db()->set_value(config_state_);
    send_update_secret_chat();
  }
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionTyping &typing) {
  // noop
  return Status::OK();
}

// Perfect Forward Secrecy
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
  LOG_CHECK(pfs_state_.state == PfsState::WaitSendRequest || pfs_state_.state == PfsState::SendRequest) << pfs_state_;
  pfs_state_.state = PfsState::WaitRequestResponse;
  on_pfs_state_changed();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
  CHECK(pfs_state_.state == PfsState::WaitSendAccept || pfs_state_.state == PfsState::SendAccept);
  pfs_state_.state = PfsState::WaitAcceptResponse;
  pfs_state_.handshake = DhHandshake();
  on_pfs_state_changed();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
  // TODO
  LOG(FATAL) << "TODO";
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
  CHECK(pfs_state_.state == PfsState::WaitSendCommit || pfs_state_.state == PfsState::SendCommit);

  CHECK(static_cast<int64>(pfs_state_.other_auth_key.id()) == commit_key.key_fingerprint_);
  std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
  pfs_state_.can_forget_other_key = true;

  pfs_state_.state = PfsState::Empty;
  pfs_state_.last_message_id = pfs_state_.message_id;
  pfs_state_.last_timestamp = Time::now();
  pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;

  on_pfs_state_changed();
}
void SecretChatActor::on_outbound_action(secret_api::decryptedMessageActionNoop &noop) {
  // noop
}

// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionRequestKey &request_key) {
  if (pfs_state_.state == PfsState::WaitRequestResponse || pfs_state_.state == PfsState::SendRequest) {
    if (pfs_state_.exchange_id > request_key.exchange_id_) {
      LOG(INFO) << "RequestKey: silently abort his request";
      return Status::OK();
    } else {
      pfs_state_.state = PfsState::Empty;
      if (pfs_state_.exchange_id == request_key.exchange_id_) {
        context_->secret_chat_db()->set_value(pfs_state_);
        LOG(WARNING) << "RequestKey: silently abort both requests (almost impossible)";
        return Status::OK();
      }
    }
  }

  if (pfs_state_.state != PfsState::Empty) {
    return Status::Error("Unexpected RequestKey");
  }
  LOG_CHECK(pfs_state_.other_auth_key.empty()) << "TODO: got requestKey, before old key is dropped";
  pfs_state_.state = PfsState::SendAccept;
  pfs_state_.handshake = DhHandshake();
  pfs_state_.exchange_id = request_key.exchange_id_;
  pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
  pfs_state_.handshake.set_g_a(request_key.g_a_.as_slice());
  TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));
  auto id_and_key = pfs_state_.handshake.gen_key();

  pfs_state_.other_auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
  pfs_state_.can_forget_other_key = false;
  pfs_state_.wait_message_id = pfs_state_.message_id;

  on_pfs_state_changed();
  return Status::OK();
}

// decryptedMessageActionAcceptKey#6fe1735b exchange_id:long g_b:bytes key_fingerprint:long = DecryptedMessageAction;
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAcceptKey &accept_key) {
  if (pfs_state_.state != PfsState::WaitRequestResponse) {
    return Status::Error("AcceptKey: unexpected");
  }
  if (pfs_state_.exchange_id != accept_key.exchange_id_) {
    return Status::Error("AcceptKey: exchange_id mismatch");
  }
  pfs_state_.handshake.set_g_a(accept_key.g_b_.as_slice());
  TRY_STATUS(pfs_state_.handshake.run_checks(true, context_->dh_callback()));
  auto id_and_key = pfs_state_.handshake.gen_key();
  if (static_cast<int64>(id_and_key.first) != accept_key.key_fingerprint_) {
    return Status::Error("AcceptKey: key_fingerprint mismatch");
  }
  pfs_state_.state = PfsState::SendCommit;
  pfs_state_.handshake = DhHandshake();
  CHECK(pfs_state_.can_forget_other_key || static_cast<int64>(pfs_state_.other_auth_key.id()) == id_and_key.first);
  pfs_state_.other_auth_key = mtproto::AuthKey(id_and_key.first, std::move(id_and_key.second));
  pfs_state_.can_forget_other_key = false;
  pfs_state_.wait_message_id = pfs_state_.message_id;

  on_pfs_state_changed();
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionAbortKey &abort_key) {
  if (pfs_state_.exchange_id != abort_key.exchange_id_) {
    LOG(INFO) << "AbortKey: exchange_id mismatch: " << tag("my exchange_id", pfs_state_.exchange_id)
              << to_string(abort_key);
    return Status::OK();
  }
  if (pfs_state_.state != PfsState::WaitRequestResponse) {
    return Status::Error("AbortKey: unexpected");
  }
  pfs_state_.state = PfsState::Empty;
  pfs_state_.handshake = DhHandshake();

  on_pfs_state_changed();
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionCommitKey &commit_key) {
  if (pfs_state_.state != PfsState::WaitAcceptResponse) {
    return Status::Error("CommitKey: unexpected");
  }
  if (pfs_state_.exchange_id != commit_key.exchange_id_) {
    return Status::Error("CommitKey: exchange_id mismatch ");
  }

  CHECK(!pfs_state_.can_forget_other_key);
  if (static_cast<int64>(pfs_state_.other_auth_key.id()) != commit_key.key_fingerprint_) {
    return Status::Error("CommitKey: fingerprint mismatch");
  }
  std::swap(pfs_state_.auth_key, pfs_state_.other_auth_key);
  pfs_state_.can_forget_other_key = true;

  pfs_state_.state = PfsState::Empty;
  pfs_state_.last_message_id = pfs_state_.message_id;
  pfs_state_.last_timestamp = Time::now();
  pfs_state_.last_out_seq_no = seq_no_state_.my_out_seq_no;

  on_pfs_state_changed();
  return Status::OK();
}
Status SecretChatActor::on_inbound_action(secret_api::decryptedMessageActionNoop &noop) {
  // noop
  return Status::OK();
}

Status SecretChatActor::on_inbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
  // Action may be not about PFS, but we still can use pfs_state_.message_id
  if (message_id <= pfs_state_.message_id) {  // replay protection
    LOG(INFO) << "Drop old inbound DecryptedMessageAction: " << to_string(action) << tag("message_id", message_id)
              << tag("known_message_id", pfs_state_.message_id);
    return Status::OK();
  }

  // if message_id < seq_no_state_.message_id, then SeqNoState with message_id bigger than current message_id is already saved.
  // And event corresponding to message_id is saved too.
  //
  // Also, if SeqNoState with message_id greater than current message_id is not saved, then corresponding action will be
  // replayed.
  //
  // This works only for ttl, not for pfs. Same ttl action may be processed twice.
  if (message_id < seq_no_state_.message_id) {
    LOG(INFO) << "Drop old inbound DecryptedMessageAction (non-pfs action): " << to_string(action);
    return Status::OK();
  }
  pfs_state_.message_id = message_id;  // replay protection

  LOG(INFO) << "In on_inbound_action: " << to_string(action);
  Status res;
  downcast_call(action, [&](auto &obj) { res = this->on_inbound_action(obj); });
  return res;
}

void SecretChatActor::on_outbound_action(secret_api::DecryptedMessageAction &action, int32 message_id) {
  // Action may be not about PFS, but we still can use pfs_state_.message_id
  if (message_id <= pfs_state_.message_id) {  // replay protection
    LOG(INFO) << "Drop old outbound DecryptedMessageAction: " << to_string(action);
    return;
  }

  // see comment in on_inbound_action
  if (message_id < seq_no_state_.message_id) {
    LOG(INFO) << "Drop old outbound DecryptedMessageAction (non-pfs action): " << to_string(action);
    return;
  }
  pfs_state_.message_id = message_id;  // replay protection

  LOG(INFO) << "In on_outbound_action: " << to_string(action);
  downcast_call(action, [&](auto &obj) { this->on_outbound_action(obj); });
}

// decryptedMessageActionRequestKey#f3c9611b exchange_id:long g_a:bytes = DecryptedMessageAction;
void SecretChatActor::request_new_key() {
  CHECK(!auth_state_.dh_config.empty());

  pfs_state_.state = PfsState::SendRequest;
  pfs_state_.handshake = DhHandshake();
  pfs_state_.handshake.set_config(auth_state_.dh_config.g, auth_state_.dh_config.prime);
  pfs_state_.exchange_id = Random::secure_int64();

  // NB: must save explicitly
  LOG(INFO) << "SAVE PfsState " << pfs_state_;
  context_->secret_chat_db()->set_value(pfs_state_);
}

void SecretChatActor::on_promise_error(Status error, string desc) {
  if (context_->close_flag()) {
    LOG(DEBUG) << "Ignore " << tag("promise", desc) << error;
    return;
  }
  LOG(FATAL) << "Failed: " << tag("promise", desc) << error;
}

constexpr int32 SecretChatActor::MAX_RESEND_COUNT;

}  // namespace td