//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/Global.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/SecretChatActor.h"
#include "td/telegram/SecretChatId.h"

#include "td/telegram/secret_api.h"
#include "td/telegram/telegram_api.h"

#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"

#include "td/db/binlog/BinlogInterface.h"
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
#include "td/db/BinlogKeyValue.h"
#include "td/db/DbKey.h"

#include "td/mtproto/DhHandshake.h"
#include "td/mtproto/utils.h"

#include "td/tl/tl_object_parse.h"
#include "td/tl/tl_object_store.h"

#include "td/utils/base64.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/Gzip.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/tests.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"

#include <cstdio>
#include <ctime>
#include <limits>
#include <map>
#include <memory>

REGISTER_TESTS(secret);

namespace my_api {

using namespace td;

class messages_getDhConfig {
 public:
  int32 version_{};
  int32 random_length_{};

  messages_getDhConfig() = default;

  messages_getDhConfig(int32 version_, int32 random_length_);

  static const int32 ID = 651135312;

  explicit messages_getDhConfig(TlBufferParser &p)
#define FAIL(error) p.set_error(error)
      : version_(TlFetchInt::parse(p))
      , random_length_(TlFetchInt::parse(p))
#undef FAIL
  {
  }
};

class InputUser {
 public:
  static tl_object_ptr<InputUser> fetch(TlBufferParser &p);
};

class inputUser final : public InputUser {
 public:
  int32 user_id_{};
  int64 access_hash_{};

  static const int32 ID = -668391402;
  inputUser() = default;

  explicit inputUser(TlBufferParser &p)
#define FAIL(error) p.set_error(error)
      : user_id_(TlFetchInt::parse(p))
      , access_hash_(TlFetchLong::parse(p))
#undef FAIL
  {
  }
};

tl_object_ptr<InputUser> InputUser::fetch(TlBufferParser &p) {
#define FAIL(error)   \
  p.set_error(error); \
  return nullptr;
  int constructor = p.fetch_int();
  switch (constructor) {
    case inputUser::ID:
      return make_tl_object<inputUser>(p);
    default:
      FAIL(PSTRING() << "Unknown constructor found " << format::as_hex(constructor));
  }
#undef FAIL
}

class messages_requestEncryption final {
 public:
  tl_object_ptr<InputUser> user_id_;
  int32 random_id_{};
  BufferSlice g_a_;

  static const int32 ID = -162681021;
  messages_requestEncryption();

  explicit messages_requestEncryption(TlBufferParser &p)
      : user_id_(TlFetchObject<InputUser>::parse(p))
      , random_id_(TlFetchInt::parse(p))
      , g_a_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};

class inputEncryptedChat final {
 public:
  int32 chat_id_{};
  int64 access_hash_{};

  inputEncryptedChat() = default;

  static const int32 ID = -247351839;
  explicit inputEncryptedChat(TlBufferParser &p) : chat_id_(TlFetchInt::parse(p)), access_hash_(TlFetchLong::parse(p)) {
  }
  static tl_object_ptr<inputEncryptedChat> fetch(TlBufferParser &p) {
    return make_tl_object<inputEncryptedChat>(p);
  }
};

class messages_acceptEncryption final {
 public:
  tl_object_ptr<inputEncryptedChat> peer_;
  BufferSlice g_b_;
  int64 key_fingerprint_{};

  messages_acceptEncryption() = default;

  static const int32 ID = 1035731989;

  explicit messages_acceptEncryption(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, -247351839>::parse(p))
      , g_b_(TlFetchBytes<BufferSlice>::parse(p))
      , key_fingerprint_(TlFetchLong::parse(p)) {
  }
};

class messages_sendEncryptedService final {
 public:
  tl_object_ptr<inputEncryptedChat> peer_;
  int64 random_id_{};
  BufferSlice data_;

  messages_sendEncryptedService() = default;
  static const int32 ID = 852769188;
  explicit messages_sendEncryptedService(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, -247351839>::parse(p))
      , random_id_(TlFetchLong::parse(p))
      , data_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};

class messages_sendEncrypted final {
 public:
  tl_object_ptr<inputEncryptedChat> peer_;
  int64 random_id_{};
  BufferSlice data_;

  messages_sendEncrypted() = default;
  static const int32 ID = -1451792525;

  explicit messages_sendEncrypted(TlBufferParser &p)
      : peer_(TlFetchBoxed<TlFetchObject<inputEncryptedChat>, -247351839>::parse(p))
      , random_id_(TlFetchLong::parse(p))
      , data_(TlFetchBytes<BufferSlice>::parse(p)) {
  }
};

template <class F>
static void downcast_call(TlBufferParser &p, F &&f) {
  auto id = p.fetch_int();
  switch (id) {
    case messages_getDhConfig::ID:
      return f(*make_tl_object<messages_getDhConfig>(p));
    case messages_requestEncryption::ID:
      return f(*make_tl_object<messages_requestEncryption>(p));
    case messages_acceptEncryption::ID:
      return f(*make_tl_object<messages_acceptEncryption>(p));
    case messages_sendEncrypted::ID:
      return f(*make_tl_object<messages_sendEncrypted>(p));
    case messages_sendEncryptedService::ID:
      return f(*make_tl_object<messages_sendEncryptedService>(p));
    default:
      LOG(ERROR) << "Unknown constructor " << id;
      UNREACHABLE();
  }
}

class messages_dhConfig final {
 public:
  int32 g_{};
  BufferSlice p_;
  int32 version_{};
  BufferSlice random_;

  messages_dhConfig() = default;

  messages_dhConfig(int32 g_, BufferSlice &&p_, int32 version_, BufferSlice &&random_)
      : g_(g_), p_(std::move(p_)), version_(version_), random_(std::move(random_)) {
  }

  static const int32 ID = 740433629;
  int32 get_id() const {
    return ID;
  }

  void store(TlStorerCalcLength &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(g_, s);
    TlStoreString::store(p_, s);
    TlStoreBinary::store(version_, s);
    TlStoreString::store(random_, s);
  }
  void store(TlStorerUnsafe &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(g_, s);
    TlStoreString::store(p_, s);
    TlStoreBinary::store(version_, s);
    TlStoreString::store(random_, s);
  }
};

class encryptedChat final {
 public:
  int32 id_{};
  int64 access_hash_{};
  int32 date_{};
  int32 admin_id_{};
  int32 participant_id_{};
  BufferSlice g_a_or_b_;
  int64 key_fingerprint_{};

  encryptedChat() = default;

  encryptedChat(int32 id_, int64 access_hash_, int32 date_, int32 admin_id_, int32 participant_id_,
                BufferSlice &&g_a_or_b_, int64 key_fingerprint_)
      : id_(id_)
      , access_hash_(access_hash_)
      , date_(date_)
      , admin_id_(admin_id_)
      , participant_id_(participant_id_)
      , g_a_or_b_(std::move(g_a_or_b_))
      , key_fingerprint_(key_fingerprint_) {
  }

  static const int32 ID = -94974410;
  int32 get_id() const {
    return ID;
  }

  void store(TlStorerCalcLength &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(id_, s);
    TlStoreBinary::store(access_hash_, s);
    TlStoreBinary::store(date_, s);
    TlStoreBinary::store(admin_id_, s);
    TlStoreBinary::store(participant_id_, s);
    TlStoreString::store(g_a_or_b_, s);
    TlStoreBinary::store(key_fingerprint_, s);
  }

  void store(TlStorerUnsafe &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(id_, s);
    TlStoreBinary::store(access_hash_, s);
    TlStoreBinary::store(date_, s);
    TlStoreBinary::store(admin_id_, s);
    TlStoreBinary::store(participant_id_, s);
    TlStoreString::store(g_a_or_b_, s);
    TlStoreBinary::store(key_fingerprint_, s);
  }
};

class messages_sentEncryptedMessage final {
 public:
  int32 date_{};

  messages_sentEncryptedMessage() = default;

  explicit messages_sentEncryptedMessage(int32 date_) : date_(date_) {
  }

  static const int32 ID = 1443858741;
  int32 get_id() const {
    return ID;
  }

  void store(TlStorerCalcLength &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(date_, s);
  }

  void store(TlStorerUnsafe &s) const {
    (void)sizeof(s);
    TlStoreBinary::store(date_, s);
  }
};

}  // namespace my_api

namespace td {
static int32 g = 3;
static string prime_base64 =
    "xxyuucaxyQSObFIvcPE_c5gNQCOOPiHBSTTQN1Y9kw9IGYoKp8FAWCKUk9IlMPTb-jNvbgrJJROVQ67UTM58NyD9UfaUWHBaxozU_mtrE6vcl0ZRKW"
    "kyhFTxj6-MWV9kJHf-lrsqlB1bzR1KyMxJiAcI-ps3jjxPOpBgvuZ8-aSkppWBEFGQfhYnU7VrD2tBDbp02KhLKhSzFE4O8ShHVP0X7ZUNWWW0ud1G"
    "WC2xF40WnGvEZbDW_5yjko_vW5rk5Bj8Feg-vqD4f6n_Xu1wBQ3tKEn0e_lZ2VaFDOkphR8NgRX2NbEF7i5OFdBLJFS_b0-t8DSxBAMRnNjjuS_MW"
    "w";

class FakeDhCallback : public DhCallback {
 public:
  int is_good_prime(Slice prime_str) const override {
    auto it = cache.find(prime_str.str());
    if (it == cache.end()) {
      return -1;
    }
    return it->second;
  }
  void add_good_prime(Slice prime_str) const override {
    cache[prime_str.str()] = 1;
  }
  void add_bad_prime(Slice prime_str) const override {
    cache[prime_str.str()] = 0;
  }
  mutable std::map<string, int> cache;
};

class FakeBinlog
    : public BinlogInterface
    , public Actor {
 public:
  FakeBinlog() {
    register_actor("FakeBinlog", this).release();
  }
  void force_sync(Promise<> promise) override {
    if (pending_events_.empty()) {
      pending_events_.emplace_back();
    }
    pending_events_.back().promises_.push_back(std::move(promise));
    pending_events_.back().sync_flag = true;
    request_sync();
  }
  void request_sync() {
    if (!has_request_sync) {
      has_request_sync = true;
      if (Random::fast(0, 4) == 0) {
        set_timeout_in(Random::fast(0, 99) / 100.0 * 0.005 + 0.001);
      } else {
        yield();
      }
    }
  }
  void force_flush() override {
  }

  uint64 next_id() override {
    auto res = last_id_;
    last_id_++;
    return res;
  }
  uint64 next_id(int32 shift) override {
    auto res = last_id_;
    last_id_ += shift;
    return res;
  }
  template <class F>
  void for_each(const F &f) {
    events_processor_.for_each([&](auto &x) {
      LOG(INFO) << "REPLAY: " << x.id_;
      f(x);
    });
  }

  void restart() {
    has_request_sync = false;
    cancel_timeout();
    for (auto &pending : pending_events_) {
      auto &event = pending.event;
      if (!event.empty()) {
        // LOG(ERROR) << "FORGET EVENT: " << event.id_ << " " << event;
      }
    }
    pending_events_.clear();
  }

  void change_key(DbKey key, Promise<> promise) override {
  }

 protected:
  void close_impl(Promise<> promise) override {
  }
  void close_and_destroy_impl(Promise<> promise) override {
  }
  void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) override {
    auto event = BinlogEvent(std::move(raw_event), info);
    LOG(INFO) << "ADD EVENT: " << event.id_ << " " << event;
    pending_events_.emplace_back();
    pending_events_.back().event = std::move(event);
    pending_events_.back().promises_.push_back(std::move(promise));
  }
  void do_force_sync() {
    if (pending_events_.empty()) {
      return;
    }
    cancel_timeout();
    has_request_sync = false;
    auto pos = static_cast<size_t>(Random::fast_uint64() % pending_events_.size());
    // pos = pending_events_.size() - 1;
    std::vector<Promise<>> promises;
    for (size_t i = 0; i <= pos; i++) {
      auto &pending = pending_events_[i];
      auto event = std::move(pending.event);
      if (!event.empty()) {
        LOG(INFO) << "SAVE EVENT: " << event.id_ << " " << event;
        events_processor_.add_event(std::move(event)).ensure();
      }
      append(promises, std::move(pending.promises_));
    }
    pending_events_.erase(pending_events_.begin(), pending_events_.begin() + pos + 1);
    for (auto &promise : promises) {
      promise.set_value(Unit());
    }

    for (auto &event : pending_events_) {
      if (event.sync_flag) {
        request_sync();
        break;
      }
    }
  }
  void timeout_expired() override {
    do_force_sync();
  }
  void wakeup() override {
    if (has_request_sync) {
      do_force_sync();
    }
  }
  bool has_request_sync = false;
  uint64 last_id_ = 1;
  detail::BinlogEventsProcessor events_processor_;

  struct PendingEvent {
    BinlogEvent event;
    bool sync_flag = false;
    std::vector<Promise<>> promises_;
  };

  std::vector<PendingEvent> pending_events_;
};

using FakeKeyValue = BinlogKeyValue<BinlogInterface>;

class Master;
class FakeSecretChatContext : public SecretChatActor::Context {
 public:
  FakeSecretChatContext(std::shared_ptr<BinlogInterface> binlog, std::shared_ptr<KeyValueSyncInterface> key_value,
                        std::shared_ptr<bool> close_flag, ActorShared<Master> master)
      : binlog_(std::move(binlog))
      , key_value_(std::move(key_value))
      , close_flag_(std::move(close_flag))
      , master_(std::move(master)) {
    secret_chat_db_ = std::make_shared<SecretChatDb>(key_value_, 1);
    net_query_creator_.stop_check();  // :(
  }
  DhCallback *dh_callback() override {
    return &fake_dh_callback_;
  }
  NetQueryCreator &net_query_creator() override {
    return net_query_creator_;
  }
  int32 unix_time() override {
    return static_cast<int32>(std::time(nullptr));
  }
  bool close_flag() override {
    return *close_flag_;
  }
  BinlogInterface *binlog() override {
    return binlog_.get();
  }
  SecretChatDb *secret_chat_db() override {
    return secret_chat_db_.get();
  }
  std::shared_ptr<DhConfig> dh_config() override {
    static auto config = [] {
      DhConfig dh_config;
      dh_config.version = 12;
      dh_config.g = g;
      dh_config.prime = base64url_decode(prime_base64).move_as_ok();
      return std::make_shared<DhConfig>(dh_config);
    }();

    return config;
  }
  void set_dh_config(std::shared_ptr<DhConfig> dh_config) override {
    // empty
  }

  bool get_config_option_boolean(const string &name) const override {
    return false;
  }

  // We don't want to expose the whole NetQueryDispatcher, MessagesManager and ContactsManager.
  // So it is more clear which parts of MessagesManager is really used. And it is much easier to create tests.
  void send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) override;

  void on_update_secret_chat(int64 access_hash, UserId user_id, SecretChatState state, bool is_outbound, int32 ttl,
                             int32 date, string key_hash, int32 layer) override {
  }

  void on_inbound_message(UserId user_id, MessageId message_id, int32 date,
                          tl_object_ptr<telegram_api::encryptedFile> file,
                          tl_object_ptr<secret_api::decryptedMessage> message, Promise<>) override;

  void on_send_message_error(int64 random_id, Status error, Promise<>) override;
  void on_send_message_ack(int64 random_id) override;
  void on_send_message_ok(int64 random_id, MessageId message_id, int32 date,
                          tl_object_ptr<telegram_api::EncryptedFile> file, Promise<>) override;
  void on_delete_messages(std::vector<int64> random_id, Promise<>) override;
  void on_flush_history(MessageId, Promise<>) override;
  void on_read_message(int64, Promise<>) override;

  void on_screenshot_taken(UserId user_id, MessageId message_id, int32 date, int64 random_id,
                           Promise<> promise) override {
  }
  void on_set_ttl(UserId user_id, MessageId message_id, int32 date, int32 ttl, int64 random_id,
                  Promise<> promise) override {
  }

 private:
  FakeDhCallback fake_dh_callback_;
  static NetQueryCreator net_query_creator_;
  std::shared_ptr<BinlogInterface> binlog_;
  std::shared_ptr<KeyValueSyncInterface> key_value_;
  std::shared_ptr<bool> close_flag_;
  ActorShared<Master> master_;

  std::shared_ptr<SecretChatDb> secret_chat_db_;
};
NetQueryCreator FakeSecretChatContext::net_query_creator_;

class Master : public Actor {
 public:
  explicit Master(Status *status) : status_(status) {
  }
  class SecretChatProxy : public Actor {
   public:
    SecretChatProxy(string name, ActorShared<Master> parent) : name_(std::move(name)) {
      binlog_ = std::make_shared<FakeBinlog>();
      key_value_ = std::make_shared<FakeKeyValue>();
      key_value_->external_init_begin(LogEvent::HandlerType::BinlogPmcMagic);
      key_value_->external_init_finish(binlog_);
      close_flag_ = std::make_shared<bool>(false);
      parent_ = parent.get();
      parent_token_ = parent.token();
      actor_ = create_actor<SecretChatActor>(
          PSLICE() << "SecretChat " << name_, 123,
          td::make_unique<FakeSecretChatContext>(binlog_, key_value_, close_flag_, std::move(parent)), true);
      on_binlog_replay_finish();
    }

    ActorOwn<SecretChatActor> actor_;

    void add_inbound_message(int32 chat_id, BufferSlice data, uint64 crc) {
      CHECK(crc64(data.as_slice()) == crc);
      auto event = make_unique<logevent::InboundSecretMessage>();
      event->qts = 0;
      event->chat_id = chat_id;
      event->date = 0;
      event->encrypted_message = std::move(data);
      event->qts_ack = PromiseCreator::lambda(
          [actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable {
            if (result.is_ok()) {
              LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc);
              return;
            }
            LOG(INFO) << "RESEND add_inbound_message " << tag("crc", crc) << result.error();
            send_closure(actor_id, &SecretChatProxy::add_inbound_message, chat_id, std::move(data), crc);
          });

      add_event(Event::delayed_closure(&SecretChatActor::add_inbound_message, std::move(event)));
    }

    void send_message(tl_object_ptr<secret_api::DecryptedMessage> message) {
      BufferSlice serialized_message(serialize(*message));
      auto resend_promise = PromiseCreator::lambda(
          [actor_id = actor_id(this), serialized_message = std::move(serialized_message)](Result<> result) mutable {
            TlBufferParser parser(&serialized_message);
            auto message = secret_api::decryptedMessage::fetch(parser);
            if (result.is_ok()) {
              LOG(INFO) << "FINISH send_message " << tag("message", to_string(message));
              return;
            }
            LOG(INFO) << "RESEND send_message " << tag("message", to_string(message)) << result.error();
            CHECK(serialize(*message) == serialized_message.as_slice());
            send_closure(actor_id, &SecretChatProxy::send_message, std::move(message));
          });
      auto sync_promise = PromiseCreator::lambda([actor_id = actor_id(this), generation = this->binlog_generation_,
                                                  resend_promise = std::move(resend_promise)](Result<> result) mutable {
        if (result.is_error()) {
          resend_promise.set_error(result.move_as_error());
          return;
        }
        send_closure(actor_id, &SecretChatProxy::sync_binlog, generation, std::move(resend_promise));
      });

      add_event(
          Event::delayed_closure(&SecretChatActor::send_message, std::move(message), nullptr, std::move(sync_promise)));
    }
    int32 binlog_generation_ = 0;
    void sync_binlog(int32 binlog_generation, Promise<> promise) {
      if (binlog_generation != binlog_generation_) {
        return promise.set_error(Status::Error("Binlog generation mismatch"));
      }
      binlog_->force_sync(std::move(promise));
    }
    void on_closed() {
      LOG(INFO) << "CLOSED";
      ready_ = false;
      *close_flag_ = false;

      key_value_ = std::make_shared<FakeKeyValue>();
      key_value_->external_init_begin(LogEvent::HandlerType::BinlogPmcMagic);

      std::vector<BinlogEvent> events;
      binlog_generation_++;
      binlog_->restart();
      binlog_->for_each([&](const BinlogEvent &event) {
        if (event.type_ == LogEvent::HandlerType::BinlogPmcMagic) {
          key_value_->external_init_handle(event);
        } else {
          events.push_back(event.clone());
        }
      });

      key_value_->external_init_finish(binlog_);

      actor_ = create_actor<SecretChatActor>(
          PSLICE() << "SecretChat " << name_, 123,
          td::make_unique<FakeSecretChatContext>(binlog_, key_value_, close_flag_,
                                                 ActorShared<Master>(parent_, parent_token_)),
          true);

      for (auto &event : events) {
        CHECK(event.type_ == LogEvent::HandlerType::SecretChats);
        auto r_message = logevent::SecretChatEvent::from_buffer_slice(event.data_as_buffer_slice());
        LOG_IF(FATAL, r_message.is_error()) << "Failed to deserialize event: " << r_message.error();
        auto message = r_message.move_as_ok();
        message->set_logevent_id(event.id_);
        LOG(INFO) << "Process binlog event " << *message;
        switch (message->get_type()) {
          case logevent::SecretChatEvent::Type::InboundSecretMessage:
            send_closure_later(actor_, &SecretChatActor::replay_inbound_message,
                               unique_ptr<logevent::InboundSecretMessage>(
                                   static_cast<logevent::InboundSecretMessage *>(message.release())));
            break;
          case logevent::SecretChatEvent::Type::OutboundSecretMessage:
            send_closure_later(actor_, &SecretChatActor::replay_outbound_message,
                               unique_ptr<logevent::OutboundSecretMessage>(
                                   static_cast<logevent::OutboundSecretMessage *>(message.release())));
            break;
          default:
            UNREACHABLE();
        }
      };
      start_test();
      on_binlog_replay_finish();
    }
    void on_binlog_replay_finish() {
      ready_ = true;
      LOG(INFO) << "Finish replay binlog";
      send_closure(actor_, &SecretChatActor::binlog_replay_finish);
      for (auto &event : pending_events_) {
        send_event(actor_, std::move(event));
      }
      pending_events_.clear();
    }
    void start_test() {
      set_timeout_in(Random::fast(50, 99) * 0.3 / 50);
      events_cnt_ = 0;
    }

   private:
    string name_;

    ActorId<Master> parent_;
    uint64 parent_token_;
    std::shared_ptr<FakeBinlog> binlog_;
    std::shared_ptr<FakeKeyValue> key_value_;
    std::shared_ptr<bool> close_flag_;
    int events_cnt_ = 0;

    std::vector<Event> pending_events_;
    bool ready_ = false;

    bool is_active() {
      return !actor_.empty() && ready_;
    }
    void add_event(Event event) {
      events_cnt_++;
      if (is_active()) {
        LOG(INFO) << "EMIT";
        send_event(actor_, std::move(event));
      } else {
        LOG(INFO) << "DELAY";
        pending_events_.push_back(std::move(event));
      }
    }

    int32 bad_cnt_ = 0;
    void timeout_expired() override {
      LOG(INFO) << "TIMEOUT EXPIRED";
      if (events_cnt_ < 4) {
        bad_cnt_++;
        CHECK(bad_cnt_ < 10);
      } else {
        bad_cnt_ = 0;
      }
      *close_flag_ = true;
      actor_.reset();
    }
  };

  auto &get_by_id(uint64 id) {
    if (id == 1) {
      return alice_;
    } else {
      return bob_;
    }
  }
  auto &from() {
    return get_by_id(get_link_token());
  }
  auto &to() {
    return get_by_id(3 - get_link_token());
  }
  void start_up() override {
    auto old_context = set_context(std::make_shared<Global>());
    alice_ = create_actor<SecretChatProxy>("SecretChatProxy alice", "alice", actor_shared(this, 1));
    bob_ = create_actor<SecretChatProxy>("SecretChatProxy bob", "bob", actor_shared(this, 2));
    send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::create_chat, 2, 0, 123,
                 PromiseCreator::lambda([actor_id = actor_id(this)](Result<SecretChatId> res) {
                   send_closure(actor_id, &Master::got_secret_chat_id, std::move(res), 0);
                 }));
  }
  void got_secret_chat_id(Result<SecretChatId> res, int) {  // second parameter is needed to workaround clang bug
    CHECK(res.is_ok());
    auto id = res.move_as_ok();
    LOG(INFO) << "SecretChatId = " << id;
  }
  bool can_fail(NetQueryPtr &query) {
    static int cnt = 20;
    if (cnt > 0) {
      cnt--;
      return false;
    }
    if (query->tl_constructor() == telegram_api::messages_sendEncrypted::ID ||
        query->tl_constructor() == telegram_api::messages_sendEncryptedFile::ID) {
      return true;
    }
    return false;
  }
  void send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) {
    if (can_fail(query) && Random::fast(0, 1) == 0) {
      LOG(INFO) << "Fail query " << query;
      auto resend_promise =
          PromiseCreator::lambda([id = actor_shared(this, get_link_token()), callback_actor = callback.get(),
                                  callback_token = callback.token()](Result<NetQueryPtr> r_net_query) mutable {
            if (r_net_query.is_error()) {
              id.release();
              return;
            }
            send_closure(std::move(id), &Master::send_net_query, r_net_query.move_as_ok(),
                         ActorShared<NetQueryCallback>(callback_actor, callback_token), true);
          });
      query->set_error(Status::Error(429, "Test error"));
      send_closure(std::move(callback), &NetQueryCallback::on_result_resendable, std::move(query),
                   std::move(resend_promise));
      return;
    } else {
      LOG(INFO) << "Do not fail " << query;
    }
    auto query_slice = query->query().clone();
    if (query->gzip_flag() == NetQuery::GzipFlag::On) {
      query_slice = gzdecode(query_slice.as_slice());
    }
    TlBufferParser parser(&query_slice);
    //auto object = telegram_api::Function::fetch(parser);
    //LOG(INFO) << query_slice.size();
    //parser.get_status().ensure();
    my_api::downcast_call(parser, [&](auto &object) {
      this->process_net_query(std::move(object), std::move(query), std::move(callback));
    });
  }
  template <class T>
  void process_net_query(T &&object, NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
    LOG(FATAL) << "Unsupported query: " << to_string(object);
  }
  void process_net_query(my_api::messages_getDhConfig &&get_dh_config, NetQueryPtr net_query,
                         ActorShared<NetQueryCallback> callback) {
    //LOG(INFO) << "Got query " << to_string(get_dh_config);
    my_api::messages_dhConfig config;
    config.p_ = BufferSlice(base64url_decode(prime_base64).move_as_ok());
    config.g_ = g;
    config.version_ = 12;
    auto storer = TLObjectStorer<my_api::messages_dhConfig>(config);
    BufferSlice answer(storer.size());
    auto real_size = storer.store(answer.as_slice().ubegin());
    CHECK(real_size == answer.size());
    net_query->set_ok(std::move(answer));
    send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));
  }
  void process_net_query(my_api::messages_requestEncryption &&request_encryption, NetQueryPtr net_query,
                         ActorShared<NetQueryCallback> callback) {
    CHECK(get_link_token() == 1);
    send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
                 make_tl_object<telegram_api::encryptedChatWaiting>(123, 321, 0, 1, 2));
    send_closure(
        bob_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
        make_tl_object<telegram_api::encryptedChatRequested>(123, 321, 0, 1, 2, request_encryption.g_a_.clone()));
    net_query->clear();
  }
  void process_net_query(my_api::messages_acceptEncryption &&request_encryption, NetQueryPtr net_query,
                         ActorShared<NetQueryCallback> callback) {
    CHECK(get_link_token() == 2);
    send_closure(alice_->get_actor_unsafe()->actor_, &SecretChatActor::update_chat,
                 make_tl_object<telegram_api::encryptedChat>(123, 321, 0, 1, 2, request_encryption.g_b_.clone(),
                                                             request_encryption.key_fingerprint_));

    my_api::encryptedChat encrypted_chat(123, 321, 0, 1, 2, BufferSlice(), request_encryption.key_fingerprint_);
    auto storer = TLObjectStorer<my_api::encryptedChat>(encrypted_chat);
    BufferSlice answer(storer.size());
    auto real_size = storer.store(answer.as_slice().ubegin());
    CHECK(real_size == answer.size());
    net_query->set_ok(std::move(answer));
    send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));
    send_closure(alice_, &SecretChatProxy::start_test);
    send_closure(bob_, &SecretChatProxy::start_test);
    send_ping(1, 5000);
    set_timeout_in(1);
  }
  void timeout_expired() override {
    send_message(1, "oppa");
    send_message(2, "appo");
    set_timeout_in(1);
  }
  void send_ping(int id, int cnt) {
    if (cnt % 200 == 0) {
      LOG(ERROR) << "Send ping " << tag("id", id) << tag("cnt", cnt);
    } else {
      LOG(INFO) << "Send ping " << tag("id", id) << tag("cnt", cnt);
    }
    string text = PSTRING() << "PING: " << cnt;
    send_message(id, std::move(text));
  }
  void send_message(int id, string text) {
    auto random_id = Random::secure_int64();
    LOG(INFO) << "Send message: " << tag("id", id) << tag("text", text) << tag("random_id", random_id);
    sent_messages_[random_id] = Message{id, text};
    send_closure(get_by_id(id), &SecretChatProxy::send_message,
                 secret_api::make_object<secret_api::decryptedMessage>(0, random_id, 0, text, Auto(), Auto(), Auto(),
                                                                       Auto(), 0));
  }
  void process_net_query(my_api::messages_sendEncryptedService &&message, NetQueryPtr net_query,
                         ActorShared<NetQueryCallback> callback) {
    process_net_query_send_enrypted(std::move(message.data_), std::move(net_query), std::move(callback));
  }
  void process_net_query(my_api::messages_sendEncrypted &&message, NetQueryPtr net_query,
                         ActorShared<NetQueryCallback> callback) {
    process_net_query_send_enrypted(std::move(message.data_), std::move(net_query), std::move(callback));
  }
  void process_net_query_send_enrypted(BufferSlice data, NetQueryPtr net_query,
                                       ActorShared<NetQueryCallback> callback) {
    my_api::messages_sentEncryptedMessage sent_message;
    sent_message.date_ = 0;
    auto storer = TLObjectStorer<my_api::messages_sentEncryptedMessage>(sent_message);
    BufferSlice answer(storer.size());
    auto real_size = storer.store(answer.as_slice().ubegin());
    CHECK(real_size == answer.size());
    net_query->set_ok(std::move(answer));
    send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));

    // We can't loose updates yet :(
    auto crc = crc64(data.as_slice());
    LOG(INFO) << "Send SecretChatProxy::add_inbound_message" << tag("crc", crc);
    send_closure(to(), &SecretChatProxy::add_inbound_message, narrow_cast<int32>(3 - get_link_token()), std::move(data),
                 crc);
  }

  int32 last_ping_ = std::numeric_limits<int32>::max();
  void on_inbound_message(string message, Promise<> promise) {
    promise.set_value(Unit());
    LOG(INFO) << "GOT INBOUND MESSAGE: " << message << " " << get_link_token();
    int32 cnt;
    int x = std::sscanf(message.c_str(), "PING: %d", &cnt);
    if (x != 1) {
      return;
    }
    if (cnt == 0) {
      Scheduler::instance()->finish();
      *status_ = Status::OK();
      return;
    }
    if (cnt >= last_ping_) {
      return;
    }
    last_ping_ = cnt;
    send_ping(narrow_cast<int32>(get_link_token()), cnt - 1);
  }
  void on_send_message_error(int64 random_id, Status error, Promise<> promise) {
    promise.set_value(Unit());
    LOG(INFO) << "Receive send message error: " << tag("random_id", random_id) << error;
    auto it = sent_messages_.find(random_id);
    if (it == sent_messages_.end()) {
      LOG(INFO) << "TODO: try to fix errors about message after it is sent";
      return;
    }
    CHECK(it != sent_messages_.end());
    auto message = it->second;
    // sent_messages_.erase(it);
    send_message(message.id, message.text);
  }
  void on_send_message_ok(int64 random_id, Promise<> promise) {
    promise.set_value(Unit());
    LOG(INFO) << "Receive send message ok: " << tag("random_id", random_id);
    auto it = sent_messages_.find(random_id);
    if (it == sent_messages_.end()) {
      LOG(INFO) << "TODO: try to fix errors about message after it is sent";
      return;
    }
    CHECK(it != sent_messages_.end());
    // sent_messages_.erase(it);
  }

 private:
  Status *status_;
  ActorOwn<SecretChatProxy> alice_;
  ActorOwn<SecretChatProxy> bob_;
  struct Message {
    int32 id;
    string text;
  };
  std::map<int64, Message> sent_messages_;

  void hangup_shared() override {
    LOG(INFO) << "GOT HANGUP: " << get_link_token();
    send_closure(from(), &SecretChatProxy::on_closed);
  }
};

void FakeSecretChatContext::send_net_query(NetQueryPtr query, ActorShared<NetQueryCallback> callback, bool ordered) {
  send_closure(master_, &Master::send_net_query, std::move(query), std::move(callback), ordered);
}
void FakeSecretChatContext::on_inbound_message(UserId user_id, MessageId message_id, int32 date,
                                               tl_object_ptr<telegram_api::encryptedFile> file,
                                               tl_object_ptr<secret_api::decryptedMessage> message, Promise<> promise) {
  send_closure(master_, &Master::on_inbound_message, message->message_, std::move(promise));
}
void FakeSecretChatContext::on_send_message_error(int64 random_id, Status error, Promise<> promise) {
  send_closure(master_, &Master::on_send_message_error, random_id, std::move(error), std::move(promise));
}
void FakeSecretChatContext::on_send_message_ack(int64 random_id) {
}
void FakeSecretChatContext::on_send_message_ok(int64 random_id, MessageId message_id, int32 date,
                                               tl_object_ptr<telegram_api::EncryptedFile> file, Promise<> promise) {
  send_closure(master_, &Master::on_send_message_ok, random_id, std::move(promise));
}
void FakeSecretChatContext::on_delete_messages(std::vector<int64> random_id, Promise<> promise) {
  promise.set_value(Unit());
}
void FakeSecretChatContext::on_flush_history(MessageId, Promise<> promise) {
  promise.set_error(Status::Error("Unsupported"));
}
void FakeSecretChatContext::on_read_message(int64, Promise<> promise) {
  promise.set_error(Status::Error("Unsupported"));
}

TEST(Secret, go) {
  SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
  ConcurrentScheduler sched;
  int threads_n = 0;
  sched.init(threads_n);

  Status result;
  sched.create_actor_unsafe<Master>(0, "HandshakeTestActor", &result).release();
  sched.start();
  while (sched.run_main(10)) {
    // empty;
  }
  sched.finish();

  if (result.is_error()) {
    LOG(ERROR) << result;
  }
  ASSERT_TRUE(result.is_ok());
}

}  // namespace td