//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"

#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/tests.h"

#include <limits>
#include <map>
#include <memory>
#include <utility>

// Returns a reference to a randomly picked element of cont.
// The index is drawn with td::Random::fast from the range [0, size - 1]; the container
// must be non-empty and its size must be representable as int (both are CHECKed).
template <class ContainerT>
static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
  CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
  const int last_index = static_cast<int>(cont.size()) - 1;
  const int picked_index = td::Random::fast(0, last_index);
  return cont[picked_index];
}

// Computes x raised to the power p by binary exponentiation (square-and-multiply).
// All arithmetic is performed in td::uint32, so overflow wraps around
// (modulo 2^32, assuming td::uint32 is a 32-bit unsigned integer).
static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) {
  td::uint32 result = 1;
  for (; p != 0; p >>= 1) {
    // Multiply in the current square whenever the lowest remaining exponent bit is set.
    if ((p & 1) != 0) {
      result *= x;
    }
    x *= x;
  }
  return result;
}

// Computes x raised to the power p with p successive multiplications.
// Arithmetic is done in td::uint32, so the product wraps around on overflow.
static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) {
  td::uint32 product = 1;
  while (p > 0) {
    product *= x;
    p--;
  }
  return product;
}

// A unit of work passed between actors: `result` holds the current value and `todo`
// lists the powers that still have to be applied to it, consumed from the back.
// The type is move-only, and destroying a query that still has pending powers
// fails a check ("ActorQuery lost").
struct ActorQuery {
  td::uint32 query_id{};
  td::uint32 result{};
  td::vector<int> todo;

  ActorQuery() = default;
  ActorQuery(ActorQuery &&) = default;
  ActorQuery &operator=(ActorQuery &&) = default;
  ActorQuery(const ActorQuery &) = delete;
  ActorQuery &operator=(const ActorQuery &) = delete;

  // A query may only be destroyed once all of its powers have been consumed.
  ~ActorQuery() {
    LOG_CHECK(todo.empty()) << "ActorQuery lost";
  }

  // Removes and returns the last pending power; must not be called on a ready() query.
  int next_pow() {
    CHECK(!todo.empty());
    const int power = todo.back();
    todo.pop_back();
    return power;
  }

  // Returns true when no pending powers are left.
  bool ready() {
    return todo.size() == 0;
  }
};

// Computes the final value of the query without modifying it: starting from q.result,
// raises the value to every power stored in q.todo, in front-to-back order, using
// fast_pow_mod_uint32.
static td::uint32 fast_calc(ActorQuery &q) {
  td::uint32 value = q.result;
  for (auto it = q.todo.begin(); it != q.todo.end(); ++it) {
    value = fast_pow_mod_uint32(value, static_cast<td::uint32>(*it));
  }
  return value;
}

// Actor that computes a power with the slow algorithm and delivers it through a promise.
class Worker final : public td::Actor {
 public:
  explicit Worker(int threads_n) : threads_n_(threads_n) {
  }

  // Computes x^p with slow_pow_mod_uint32 and fulfils the promise with the value.
  void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) {
    promise.set_value(slow_pow_mod_uint32(x, p));

    // Worker migration is disabled (see the commented-out code below); threads_n_ is only
    // referenced here, presumably to keep the otherwise unused member from triggering a warning.
    static_cast<void>(threads_n_);
    // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
    //   migrate(td::Random::fast(2, threads_n_));
    // }
  }

 private:
  int threads_n_;
};

// Actor that performs one exponentiation step of an ActorQuery, either locally or by
// delegating it to a randomly chosen Worker, and reports the outcome through its Callback.
// After handling a query it may migrate to another scheduler.
class QueryActor final : public td::Actor {
 public:
  // Receiver of finished steps and of the close notification; neither copyable nor movable.
  class Callback {
   public:
    Callback() = default;
    Callback(const Callback &) = delete;
    Callback &operator=(const Callback &) = delete;
    Callback(Callback &&) = delete;
    Callback &operator=(Callback &&) = delete;
    virtual ~Callback() = default;
    // Called with the query after one more power has been applied to its result.
    virtual void on_result(ActorQuery &&query) = 0;
    // Called from close() right before the actor stops.
    virtual void on_closed() = 0;
  };

  explicit QueryActor(int threads_n) : threads_n_(threads_n) {
  }

  // Installs the callback that receives results; must be set before the first query.
  void set_callback(td::unique_ptr<Callback> callback) {
    callback_ = std::move(callback);
  }
  // Sets the workers to which expensive steps can be delegated.
  void set_workers(td::vector<td::ActorId<Worker>> workers) {
    workers_ = std::move(workers);
  }

  // Applies the next pending power of the query to its result.
  // The step is computed locally or sent to a random worker; if the worker's answer is not
  // available immediately, the query is parked in pending_ until raw_event() is received.
  void query(ActorQuery &&query) {
    td::uint32 x = query.result;
    auto p = static_cast<td::uint32>(query.next_pow());
    // The random number is drawn unconditionally, so the sequence of td::Random calls
    // does not depend on whether any workers are available.
    bool prefer_local = td::Random::fast(0, 3) != 0;
    // Without workers the step can only be computed locally (rand_elem would fail its CHECK
    // on an empty vector); otherwise only small powers are computed locally, and only when
    // the random draw is non-zero.
    if (workers_.empty() || (prefer_local && p <= 1000)) {
      query.result = slow_pow_mod_uint32(x, p);
      callback_->on_result(std::move(query));
    } else {
      auto future = td::Random::fast(0, 3) == 0
                        ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
                        : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
      if (future.is_ready()) {
        query.result = future.move_as_ok();
        callback_->on_result(std::move(query));
      } else {
        // The completion event carries the query_id, which is the key used in pending_.
        future.set_event(td::EventCreator::raw(actor_id(), query.query_id));
        auto query_id = query.query_id;
        bool is_inserted = pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query))).second;
        // A duplicate identifier would silently drop the query and its future.
        CHECK(is_inserted);
      }
    }
    if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
      migrate(td::Random::fast(2, threads_n_));
    }
  }

  // Handles the completion event of a delegated step: event.u32 identifies the parked
  // query, whose future must be ready by now.
  void raw_event(const td::Event::Raw &event) final {
    td::uint32 id = event.u32;
    auto it = pending_.find(id);
    // An event for an unknown query must not dereference the end iterator.
    CHECK(it != pending_.end());
    auto future = std::move(it->second.first);
    auto query = std::move(it->second.second);
    pending_.erase(it);
    CHECK(future.is_ready());
    query.result = future.move_as_ok();
    callback_->on_result(std::move(query));
  }

  // Notifies the callback that the actor is closing and stops the actor.
  void close() {
    callback_->on_closed();
    stop();
  }

  // Forwards the start of a migration to the futures of all parked queries.
  void on_start_migrate(td::int32 sched_id) final {
    for (auto &it : pending_) {
      start_migrate(it.second.first, sched_id);
    }
  }
  // Forwards the end of a migration to the futures of all parked queries.
  void on_finish_migrate() final {
    for (auto &it : pending_) {
      finish_migrate(it.second.first);
    }
  }

 private:
  td::unique_ptr<Callback> callback_;
  // Queries waiting for a worker's answer, keyed by query_id.
  std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_;
  td::vector<td::ActorId<Worker>> workers_;
  int threads_n_;
};

// Test driver: creates a ring of QueryActors and a pool of Workers, feeds queries into the
// ring, verifies every finished query against a precomputed value and finishes the scheduler
// once all QueryActors have reported that they are closed.
class MainQueryActor final : public td::Actor {
  // Callback installed into each QueryActor: finished queries are sent to the parent,
  // unfinished ones are forwarded to the next QueryActor of the ring.
  class QueryActorCallback final : public QueryActor::Callback {
   public:
    void on_result(ActorQuery &&query) final {
      if (query.ready()) {
        send_closure(parent_id_, &MainQueryActor::on_result, std::move(query));
      } else {
        send_closure(next_solver_, &QueryActor::query, std::move(query));
      }
    }
    void on_closed() final {
      send_closure(parent_id_, &MainQueryActor::on_closed);
    }
    QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver)
        : parent_id_(parent_id), next_solver_(next_solver) {
    }

   private:
    td::ActorId<MainQueryActor> parent_id_;
    td::ActorId<QueryActor> next_solver_;
  };

  static constexpr int ACTORS_CNT = 10;
  static constexpr int WORKERS_CNT = 4;

 public:
  explicit MainQueryActor(int threads_n) : threads_n_(threads_n) {
  }

  // Registers the QueryActors and Workers, wires every QueryActor to its successor in the
  // ring and to the worker pool, then yields so that wakeup() starts issuing queries.
  void start_up() final {
    actors_.resize(ACTORS_CNT);
    for (auto &actor : actors_) {
      auto actor_ptr = td::make_unique<QueryActor>(threads_n_);
      actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
                  .release();
    }

    workers_.resize(WORKERS_CNT);
    for (auto &worker : workers_) {
      auto actor_ptr = td::make_unique<Worker>(threads_n_);
      worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
                   .release();
    }

    for (int i = 0; i < ACTORS_CNT; i++) {
      // One reference per QueryActor; it is dropped in on_closed().
      ref_cnt_++;
      send_closure(actors_[i], &QueryActor::set_callback,
                   td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
      send_closure(actors_[i], &QueryActor::set_workers, workers_);
    }
    yield();
  }

  // Verifies a finished query against the value precomputed in create_query().
  void on_result(ActorQuery &&query) {
    CHECK(query.ready());
    // Use find() instead of operator[] so that an unknown query_id fails the check
    // instead of silently inserting a zero to compare against.
    auto it = expected_.find(query.query_id);
    CHECK(it != expected_.end());
    CHECK(query.result == it->second);
    // Every query is answered once, so the expectation is no longer needed.
    expected_.erase(it);
    in_cnt_++;
    wakeup();
  }

  // Creates a new query with a fresh identifier and records its expected final result.
  ActorQuery create_query() {
    ActorQuery q;
    q.query_id = (query_id_ += 2);
    q.result = q.query_id;
    q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000};
    expected_[q.query_id] = fast_calc(q);
    return q;
  }

  // Called once per closed QueryActor; finishes the scheduler when the last reference is gone.
  void on_closed() {
    ref_cnt_--;
    if (ref_cnt_ == 0) {
      td::Scheduler::instance()->finish();
    }
  }

  // Issues new queries while fewer than 100 are in flight and fewer than cnt have been
  // issued in total; once all cnt results have arrived, closes every QueryActor exactly once.
  void wakeup() final {
    int cnt = 10000;
    while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
      if (td::Random::fast_bool()) {
        send_closure(rand_elem(actors_), &QueryActor::query, create_query());
      } else {
        send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
      }
      out_cnt_++;
    }
    if (in_cnt_ == cnt && !close_sent_) {
      close_sent_ = true;
      // Drop the reference the actor holds on itself (the initial value of ref_cnt_).
      ref_cnt_--;
      for (auto &actor : actors_) {
        send_closure(actor, &QueryActor::close);
      }
    }
  }

 private:
  // Expected final result of every query that is still in flight, keyed by query_id.
  std::map<td::uint32, td::uint32> expected_;
  td::vector<td::ActorId<QueryActor>> actors_;
  td::vector<td::ActorId<Worker>> workers_;
  int out_cnt_ = 0;  // number of queries issued
  int in_cnt_ = 0;   // number of results received
  td::uint32 query_id_ = 1;
  int ref_cnt_ = 1;
  bool close_sent_ = false;  // whether the QueryActors have already been asked to close
  int threads_n_;
};

// Actor that repeatedly asks a single Worker to compute q_^p_ and checks every answer
// against fast_pow_mod_uint32; it finishes the scheduler once q_ reaches 10000.
class SimpleActor final : public td::Actor {
 public:
  explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) {
  }

  // Registers the worker and schedules the first wakeup().
  void start_up() final {
    const auto worker_sched_id = threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0;
    worker_ = register_actor("Worker", td::make_unique<Worker>(threads_n_), worker_sched_id).release();
    yield();
  }

  // Sends the next request to the worker. An answer that is available immediately is checked
  // right away; otherwise the future is stored and checked later in raw_event().
  void wakeup() final {
    if (q_ == 10000) {
      td::Scheduler::instance()->finish();
      stop();
      return;
    }

    q_++;
    p_ = td::Random::fast_bool() ? 1 : 10000;
    auto future = td::Random::fast(0, 3) == 0
                      ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
                      : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
    // Migration of this actor is disabled:
    // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) {
    //   migrate(td::Random::fast(1, threads_n_));
    // }
    if (!future.is_ready()) {
      future.set_event(td::EventCreator::raw(actor_id(), nullptr));
      future_ = std::move(future);
      return;
    }

    auto result = future.move_as_ok();
    CHECK(result == fast_pow_mod_uint32(q_, p_));
    yield();
  }

  // Completion event of the stored future: checks the answer and schedules the next request.
  void raw_event(const td::Event::Raw &event) final {
    auto result = future_.move_as_ok();
    CHECK(result == fast_pow_mod_uint32(q_, p_));
    yield();
  }

  // The stored future takes part in the migration of the actor.
  void on_start_migrate(td::int32 sched_id) final {
    start_migrate(future_, sched_id);
  }
  void on_finish_migrate() final {
    finish_migrate(future_);
  }

 private:
  td::int32 threads_n_;
  td::ActorId<Worker> worker_;
  td::FutureActor<td::uint32> future_;
  td::uint32 q_ = 1;  // base of the current request
  td::uint32 p_ = 0;  // exponent of the current request
};

// Stress actor: in every round it creates 2000 chains of short-lived Parent actors on random
// schedulers and waits until all shared references to itself have been released; after 50
// rounds it finishes the scheduler.
class SendToDead final : public td::Actor {
 public:
  // Actor that holds a shared reference to its creator, optionally spawns a child holding a
  // shared reference to itself, and stops after a short random timeout.
  class Parent final : public td::Actor {
   public:
    explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
    }

    // Arms a timeout of 0, 1 or 2 milliseconds and, while ttl_ is non-zero, creates a child
    // with a decremented ttl on a random scheduler.
    void start_up() final {
      const auto timeout_ms = td::Random::fast_uint32() % 3;
      set_timeout_in(timeout_ms * 0.001);
      if (ttl_ == 0) {
        return;
      }
      const auto child_sched_id = td::Random::fast_uint32() % td::Scheduler::instance()->sched_count();
      child_ = td::create_actor_on_scheduler<Parent>("Child", child_sched_id, actor_shared(this), ttl_ - 1);
    }

    void timeout_expired() final {
      stop();
    }

   private:
    td::ActorOwn<Parent> child_;
    td::ActorShared<> parent_;
    int ttl_;
  };

  // Starts one round: creates 2000 detached Parent actors, each holding a counted reference.
  void start_up() final {
    for (int i = 0; i < 2000; i++) {
      const auto parent_sched_id = td::Random::fast_uint32() % td::Scheduler::instance()->sched_count();
      td::create_actor_on_scheduler<Parent>("Parent", parent_sched_id, create_reference(), 4).release();
    }
  }

  // Creates a shared reference to this actor and counts it in ref_cnt_.
  td::ActorShared<> create_reference() {
    ref_cnt_++;
    return actor_shared(this);
  }

  // Called when a shared reference is released; after the last one of a round either the
  // next round is started or, when no rounds are left, the scheduler is finished.
  void hangup_shared() final {
    ref_cnt_--;
    if (ref_cnt_ != 0) {
      return;
    }

    ttl_--;
    // ttl_ is unsigned, so "no rounds left" means exactly zero.
    if (ttl_ == 0) {
      td::Scheduler::instance()->finish();
      stop();
    } else {
      start_up();
    }
  }

  td::uint32 ttl_{50};     // remaining rounds
  td::uint32 ref_cnt_{0};  // outstanding shared references of the current round
};

TEST(Actors, send_to_dead) {
  // TODO: fix CHECK(storage_count_.load() == 0)
  // The test is switched off by the early return; everything below is currently unreachable.
  return;
  int threads_n = 5;
  td::ConcurrentScheduler sched(threads_n, 0);

  sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release();
  sched.start();
  // Keep running the main scheduler until run_main reports false.
  bool is_running = true;
  while (is_running) {
    is_running = sched.run_main(10);
  }
  sched.finish();
}

// Runs SimpleActor on a 3-thread scheduler until the actor finishes the scheduler.
TEST(Actors, main_simple) {
  int threads_n = 3;
  td::ConcurrentScheduler sched(threads_n, 0);

  // The actor is placed on scheduler 1 when there is more than one thread, otherwise on 0.
  int actor_sched_id = threads_n > 1 ? 1 : 0;
  sched.create_actor_unsafe<SimpleActor>(actor_sched_id, "simple", threads_n).release();
  sched.start();
  // Keep running the main scheduler until run_main reports false.
  bool is_running = true;
  while (is_running) {
    is_running = sched.run_main(10);
  }
  sched.finish();
}

// Runs MainQueryActor on a 9-thread scheduler until the actor finishes the scheduler.
TEST(Actors, main) {
  int threads_n = 9;
  td::ConcurrentScheduler sched(threads_n, 0);

  // The actor is placed on scheduler 1 when there is more than one thread, otherwise on 0.
  int actor_sched_id = threads_n > 1 ? 1 : 0;
  sched.create_actor_unsafe<MainQueryActor>(actor_sched_id, "MainQuery", threads_n).release();
  sched.start();
  // Keep running the main scheduler until run_main reports false.
  bool is_running = true;
  while (is_running) {
    is_running = sched.run_main(10);
  }
  sched.finish();
}

// Actor used to check that its members are still accessible inside the running handler
// after stop() has been called.
class DoAfterStop final : public td::Actor {
 public:
  // The statement order is the point of the test: the member is assigned, stop() is
  // called, and only then is the member read back.
  void loop() final {
    ptr = td::make_unique<int>(10);
    stop();
    // The value stored before stop() must still be readable here.
    CHECK(*ptr == 10);
    td::Scheduler::instance()->finish();
  }

 private:
  td::unique_ptr<int> ptr;
};

// Runs DoAfterStop on a scheduler created with zero threads, using scheduler 0.
TEST(Actors, do_after_stop) {
  int threads_n = 0;
  td::ConcurrentScheduler sched(threads_n, 0);

  sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release();
  sched.start();
  // Keep running the main scheduler until run_main reports false.
  bool is_running = true;
  while (is_running) {
    is_running = sched.run_main(10);
  }
  sched.finish();
}

class XContext final : public td::ActorContext {
 public:
  td::int32 get_id() const final {
    return 123456789;
  }

  void validate() {
    CHECK(x == 1234);
  }
  ~XContext() final {
    x = 0;
  }
  int x = 1234;
};

// Actor that replaces its context with a fresh XContext when it starts.
class WithXContext final : public td::Actor {
 public:
  void start_up() final {
    auto new_context = std::make_shared<XContext>();
    // The previous context is kept alive until the end of this scope.
    auto old_context = set_context(std::move(new_context));
  }

  // Does nothing with the guard; it is destroyed when the call returns.
  void f(td::unique_ptr<td::Guard> guard) {
  }

  void close() {
    stop();
  }
};

static void check_context() {
  auto ptr = static_cast<XContext *>(td::Scheduler::context());
  CHECK(ptr != nullptr);
  ptr->validate();
}

// Checks that the XContext installed by WithXContext is still the current, valid context
// whenever a guard passed to the actor runs check_context().
TEST(Actors, context_during_destruction) {
  int threads_n = 0;
  td::ConcurrentScheduler sched(threads_n, 0);

  {
    // All actor operations below are performed while the main guard is held.
    auto guard = sched.get_main_guard();
    auto with_context = td::create_actor<WithXContext>("WithXContext").release();
    // Each guard's lambda presumably runs when the guard is destroyed - TODO confirm against td::Guard.
    send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
    // close() is sent with send_closure_later, while the calls to f() around it use send_closure.
    send_closure_later(with_context, &WithXContext::close);
    send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
    // The last guard finishes the scheduler, which ends the run_main loop below.
    send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); }));
  }
  sched.start();
  while (sched.run_main(10)) {
    // empty
  }
  sched.finish();
}