//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/impl/Scheduler.h"

#include "td/actor/impl/Actor.h"
#include "td/actor/impl/ActorId.h"
#include "td/actor/impl/ActorInfo.h"
#include "td/actor/impl/Event.h"
#include "td/actor/impl/EventFull.h"

#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
#include "td/utils/ObjectPool.h"
#include "td/utils/port/thread_local.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Time.h"

#include <functional>
#include <utility>

namespace td {

TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_;   // static zero-initialized
TD_THREAD_LOCAL ActorContext *Scheduler::context_;  // static zero-initialized

Scheduler::~Scheduler() {
  clear();
}

Scheduler *Scheduler::instance() {
  return scheduler_;
}

ActorContext *&Scheduler::context() {
  return context_;
}

void Scheduler::on_context_updated() {
  LOG_TAG = context_->tag_;
}

void Scheduler::set_scheduler(Scheduler *scheduler) {
  scheduler_ = scheduler;
}

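// ServiceActor pumps the scheduler's inbound cross-scheduler queue. On
// start-up it subscribes the queue's reader event fd to this scheduler's
// poll, so the scheduler wakes up whenever a peer scheduler pushes an event.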
void Scheduler::ServiceActor::start_up() {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
  CHECK(!inbound_);
#else
  if (!inbound_) {
    return;
  }
  auto &fd = inbound_->reader_get_event_fd();

  fd.get_fd().set_observer(this);
  ::td::subscribe(fd.get_fd(), Fd::Read);
  yield();
#endif
}

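// Drain the inbound queue without blocking. An event with an empty actor_id
// is the migration handshake: its raw data pointer is the migrated ActorInfo,
// which is registered on this scheduler. Any other event is delivered to its
// destination actor after its payload finishes migrating.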
void Scheduler::ServiceActor::loop() {
  auto &queue = inbound_;
  int ready_n = queue->reader_wait_nonblock();
  if (ready_n == 0) {
    return;
  }
  while (ready_n-- > 0) {
    EventFull event = queue->reader_get_unsafe();
    if (event.actor_id().empty()) {
      Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
    } else {
      VLOG(actor) << "Receive " << event.data();
      finish_migrate(event.data());
      event.try_emit();
    }
  }
  queue->reader_flush();
  yield();
}

/*** SchedulerGuard ***/
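// RAII guard that makes `scheduler` current for this thread: the constructor
// swaps in the thread-local Scheduler pointer, the scheduler's saved
// ActorContext and its LOG_TAG; the destructor restores all three.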
SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) {
  CHECK(!scheduler_->has_guard_);
  scheduler_->has_guard_ = true;
  save_scheduler_ = Scheduler::instance();
  Scheduler::set_scheduler(scheduler_);

  // Scheduler::context() must not be null
  save_context_ = scheduler_->save_context_.get();
  save_tag_ = LOG_TAG;
  LOG_TAG = save_context_->tag_;
  std::swap(save_context_, Scheduler::context());
}

SchedulerGuard::~SchedulerGuard() {
  if (is_valid_.get()) {
    std::swap(save_context_, scheduler_->context());
    Scheduler::set_scheduler(save_scheduler_);
    CHECK(scheduler_->has_guard_);
    scheduler_->has_guard_ = false;
    LOG_TAG = save_tag_;
  }
}

/*** EventGuard ***/
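// RAII guard around the execution of one event by one actor. The constructor
// marks the actor as running and swaps in its context; the destructor
// requeues the actor (pending if its mailbox is empty, ready otherwise),
// swaps the context back, and performs a Stop or Migrate that the event
// requested through the event context flags.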
EventGuard::EventGuard(Scheduler *scheduler, ActorInfo *actor_info) : scheduler_(scheduler) {
  actor_info->start_run();
  event_context_.actor_info = actor_info;
  event_context_ptr_ = &event_context_;

  save_context_ = actor_info->get_context();
#ifdef TD_DEBUG
  save_log_tag2_ = actor_info->get_name().c_str();
#endif
  swap_context(actor_info);
}

EventGuard::~EventGuard() {
  auto info = event_context_.actor_info;
  auto node = info->get_list_node();
  node->remove();
  if (info->mailbox_.empty()) {
    scheduler_->pending_actors_list_.put(node);
  } else {
    scheduler_->ready_actors_list_.put(node);
  }
  info->finish_run();
  swap_context(info);
  CHECK(info->is_lite() || save_context_ == info->get_context());
#ifdef TD_DEBUG
  CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str());
#endif
  if (event_context_.flags & Scheduler::EventContext::Stop) {
    scheduler_->do_stop_actor(info);
    return;
  }
  if (event_context_.flags & Scheduler::EventContext::Migrate) {
    scheduler_->do_migrate_actor(info, event_context_.dest_sched_id);
  }
}

void EventGuard::swap_context(ActorInfo *info) {
  std::swap(scheduler_->event_context_ptr_, event_context_ptr_);

  if (info->is_lite()) {
    return;
  }

#ifdef TD_DEBUG
  std::swap(LOG_TAG2, save_log_tag2_);
#endif

  auto *current_context_ptr = &Scheduler::context();
  if (save_context_ != *current_context_ptr) {
    std::swap(save_context_, *current_context_ptr);
    Scheduler::on_context_updated();
  }
}

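// Wires this scheduler into a group: `outbound` holds one inbound queue per
// scheduler, indexed by scheduler id, and every scheduler receives the whole
// vector so it can write to any peer. A minimal wiring sketch (illustrative
// only; it assumes MpscPollableQueue::init() as declared in
// td/utils/MpscPollableQueue.h, and `callback` must outlive the scheduler):
//
//   std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> queues;
//   for (int32 i = 0; i < 2; i++) {
//     auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
//     queue->init();
//     queues.push_back(std::move(queue));
//   }
//   // on thread i:
//   Scheduler scheduler;
//   scheduler.init(i, queues, callback);  // callback is a Scheduler::Callback *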
void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound,
                     Callback *callback) {
  save_context_ = std::make_shared<ActorContext>();
  save_context_->this_ptr_ = save_context_;
  save_context_->tag_ = LOG_TAG;

  auto guard = get_guard();

  callback_ = callback;
  actor_info_pool_ = make_unique<ObjectPool<ActorInfo>>();

  yield_flag_ = false;
  actor_count_ = 0;
  sched_id_ = 0;

  poll_.init();

#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
  event_fd_.init();
  subscribe(event_fd_.get_fd(), Fd::Read);
#endif

  if (!outbound.empty()) {
    inbound_queue_ = std::move(outbound[id]);
  }
  outbound_queues_ = std::move(outbound);
  sched_id_ = id;
  sched_n_ = static_cast<int32>(outbound_queues_.size());
  service_actor_.set_queue(inbound_queue_);
  register_actor("ServiceActor", &service_actor_).release();
}

void Scheduler::clear() {
  if (service_actor_.empty()) {
    return;
  }
  close_flag_ = true;
  auto guard = get_guard();

  // Stop all actors
  if (!service_actor_.empty()) {
    service_actor_.do_stop();
  }
  while (!pending_actors_list_.empty()) {
    auto actor_info = ActorInfo::from_list_node(pending_actors_list_.get());
    do_stop_actor(actor_info);
  }
  while (!ready_actors_list_.empty()) {
    auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get());
    do_stop_actor(actor_info);
  }
  LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name();
  CHECK(ready_actors_list_.empty());
  poll_.clear();

#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
  if (!event_fd_.empty()) {
    event_fd_.close();
  }
#endif

  if (callback_) {
    // a lambda that captures a unique_ptr can't be stored in std::function,
    // so release the raw pointer and delete it in the callback
    auto ptr = actor_info_pool_.release();
    callback_->register_at_finish([=]() { delete ptr; });
  } else {
    actor_info_pool_.reset();
  }
}

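// Dispatches a single event to the actor's virtual handlers. The event's
// link token is published through the current event context first, so
// get_link_token(actor) works inside the handler.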
void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
  event_context_ptr_->link_token = event.link_token;
  auto actor = actor_info->get_actor_unsafe();
  switch (event.type) {
    case Event::Type::Start: {
      VLOG(actor) << *actor_info << " Event::Start";
      actor->start_up();
      break;
    }
    case Event::Type::Stop: {
      VLOG(actor) << *actor_info << " Event::Stop";
      actor->tear_down();
      break;
    }
    case Event::Type::Yield: {
      VLOG(actor) << *actor_info << " Event::Yield";
      actor->wakeup();
      break;
    }
    case Event::Type::Hangup: {
      auto token = get_link_token(actor);
      VLOG(actor) << *actor_info << " Event::Hangup " << tag("token", format::as_hex(token));
      if (token != 0) {
        actor->hangup_shared();
      } else {
        actor->hangup();
      }
      break;
    }
    case Event::Type::Timeout: {
      VLOG(actor) << *actor_info << " Event::Timeout";
      actor->timeout_expired();
      break;
    }
    case Event::Type::Raw: {
      VLOG(actor) << *actor_info << " Event::Raw";
      actor->raw_event(event.data);
      break;
    }
    case Event::Type::Custom: {
      do_custom_event(actor_info, *event.data.custom_event);
      break;
    }
    case Event::Type::NoType: {
      UNREACHABLE();
      break;
    }
  }
  // can't clear the event here: it may already have been destroyed during destroy_actor
}

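// Second half of actor migration: the source scheduler detaches the actor in
// start_migrate_actor() and ships its ActorInfo pointer here inside a raw
// event with an empty actor_id. The destination scheduler adopts the actor,
// appends any events that arrived for it in the meantime and requeues it.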
void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
  VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
              << tag("actor_count", actor_count_);
  actor_count_++;
  CHECK(actor_info->is_migrating());
  CHECK(sched_id_ == actor_info->migrate_dest());
  // CHECK(!actor_info->is_running());
  actor_info->finish_migrate();
  for (auto &event : actor_info->mailbox_) {
    finish_migrate(event);
  }
  auto it = pending_events_.find(actor_info);
  if (it != pending_events_.end()) {
    actor_info->mailbox_.insert(actor_info->mailbox_.end(), make_move_iterator(begin(it->second)),
                                make_move_iterator(end(it->second)));
    pending_events_.erase(it);
  }
  if (actor_info->mailbox_.empty()) {
    pending_actors_list_.put(actor_info->get_list_node());
  } else {
    ready_actors_list_.put(actor_info->get_list_node());
  }
  actor_info->get_actor_unsafe()->on_finish_migrate();
}

void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) {
  if (sched_id < sched_count()) {
    auto actor_info = actor_id.get_actor_info();
    if (actor_info) {
      VLOG(actor) << "Send to " << *actor_info << " on scheduler " << sched_id << ": " << event;
    } else {
      VLOG(actor) << "Send to scheduler " << sched_id << ": " << event;
    }
    start_migrate(event, sched_id);
    outbound_queues_[sched_id]->writer_put(EventCreator::event_unsafe(actor_id, std::move(event)));
    outbound_queues_[sched_id]->writer_flush();
  }
}

void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
  if (!actor_info->is_running()) {
    auto node = actor_info->get_list_node();
    node->remove();
    ready_actors_list_.put(node);
  }
  VLOG(actor) << "Add to mailbox: " << *actor_info << " " << event;
  actor_info->mailbox_.push_back(std::move(event));
}

void Scheduler::do_stop_actor(Actor *actor) {
  return do_stop_actor(actor->get_info());
}
void Scheduler::do_stop_actor(ActorInfo *actor_info) {
  CHECK(!actor_info->is_migrating());
  CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
  ObjectPool<ActorInfo>::OwnerPtr owner_ptr;
  if (!actor_info->is_lite()) {
    EventGuard guard(this, actor_info);
    do_event(actor_info, Event::stop());
    owner_ptr = actor_info->get_actor_unsafe()->clear();
    // destroy the actor while the EventGuard is still active, so its context is visible in the destructor
    actor_info->destroy_actor();
    event_context_ptr_->flags = 0;
  } else {
    owner_ptr = actor_info->get_actor_unsafe()->clear();
  }
  destroy_actor(actor_info);
}

void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) {
  migrate_actor(actor->get_info(), dest_sched_id);
}
void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
  CHECK(event_context_ptr_->actor_info == actor_info);
  if (sched_id_ == dest_sched_id) {
    return;
  }
  event_context_ptr_->flags |= EventContext::Migrate;
  event_context_ptr_->dest_sched_id = dest_sched_id;
}

void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) {
  do_migrate_actor(actor->get_info(), dest_sched_id);
}
void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
  dest_sched_id = 0;
#endif
  if (sched_id_ == dest_sched_id) {
    return;
  }
  start_migrate_actor(actor_info, dest_sched_id);
  send_to_other_scheduler(dest_sched_id, ActorId<>(), Event::raw(actor_info));
}

void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) {
  start_migrate_actor(actor->get_info(), dest_sched_id);
}
void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
  VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info)
              << tag("actor_count", actor_count_);
  actor_count_--;
  CHECK(actor_count_ >= 0);
  actor_info->get_actor_unsafe()->on_start_migrate(dest_sched_id);
  for (auto &event : actor_info->mailbox_) {
    start_migrate(event, dest_sched_id);
  }
  actor_info->start_migrate(dest_sched_id);
  actor_info->get_list_node()->remove();
  cancel_actor_timeout(actor_info);
}

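// Timeouts are clamped to [0, 1e10] seconds and stored as absolute deadlines
// in a heap keyed by expiration time. A hedged usage sketch from inside an
// actor (MyActor is hypothetical; it assumes the Actor::set_timeout_in
// wrapper declared in Actor.h):
//
//   class MyActor : public Actor {
//     void start_up() override {
//       set_timeout_in(5.0);  // timeout_expired() is called in ~5 seconds
//     }
//   };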
void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) {
  if (timeout > 1e10) {
    timeout = 1e10;
  }
  if (timeout < 0) {
    timeout = 0;
  }
  double expire_at = Time::now() + timeout;
  set_actor_timeout_at(actor_info, expire_at);
}

void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
  HeapNode *heap_node = actor_info->get_heap_node();
  VLOG(actor) << "set actor " << *actor_info << " " << tag("timeout", timeout_at) << timeout_at - Time::now_cached();
  if (heap_node->in_heap()) {
    timeout_queue_.fix(timeout_at, heap_node);
  } else {
    timeout_queue_.insert(timeout_at, heap_node);
  }
}

void Scheduler::run_poll(double timeout) {
  // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
  // poll waits in whole milliseconds, so round up; we can't wait for less than 1 ms
  poll_.run(static_cast<int32>(timeout * 1000 + 1));

#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
  if (can_read(event_fd_.get_fd())) {
    std::atomic_thread_fence(std::memory_order_acquire);
    event_fd_.acquire();
  }
#endif
}

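// Flushes the mailboxes of all currently ready actors. The ready list is
// moved into a local list first, so an actor that becomes ready during the
// flush is queued for the next run instead of extending this one.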
void Scheduler::run_mailbox() {
  VLOG(actor) << "run mailbox : begin";
  ListNode actors_list = std::move(ready_actors_list_);
  while (!actors_list.empty()) {
    ListNode *node = actors_list.get();
    CHECK(node);
    auto actor_info = ActorInfo::from_list_node(node);
    inc_wait_generation();
    flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr));
  }
  VLOG(actor) << "run mailbox : finish " << actor_count_;

  // Useful for debugging, but an O(actor_count_) check:

  // int cnt = 0;
  // for (ListNode *end = &pending_actors_list_, *it = pending_actors_list_.next; it != end; it = it->next) {
  //   cnt++;
  //   auto actor_info = ActorInfo::from_list_node(it);
  //   LOG(ERROR) << *actor_info;
  //   CHECK(actor_info->mailbox_.empty());
  //   CHECK(!actor_info->is_running());
  // }
  // for (ListNode *end = &ready_actors_list_, *it = ready_actors_list_.next; it != end; it = it->next) {
  //   auto actor_info = ActorInfo::from_list_node(it);
  //   LOG(ERROR) << *actor_info;
  //   cnt++;
  // }
  // CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
}

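// Fires all expired timeouts and reports how long the caller may sleep:
// the number of seconds until the next deadline, or 10000 if none is set.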
double Scheduler::run_timeout() {
  double now = Time::now();
  while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
    HeapNode *node = timeout_queue_.pop();
    ActorInfo *actor_info = ActorInfo::from_heap_node(node);
    inc_wait_generation();
    send(actor_info->actor_id(), Event::timeout(), Send::immediate);
  }
  if (timeout_queue_.empty()) {
    return 10000;
  }
  double timeout = timeout_queue_.top_key() - now;
  // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout);
  return timeout;
}

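// One iteration of the scheduler loop; the caller must hold the
// SchedulerGuard. It processes pending events, polls for I/O for at most
// `timeout` seconds (bounded by the next actor timeout), then processes
// events again. A raised yield_flag_ skips the poll so the caller regains
// control immediately.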
void Scheduler::run_no_guard(double timeout) {
  CHECK(has_guard_);
  SCOPE_EXIT {
    yield_flag_ = false;
  };

  double next_timeout = run_events();
  if (next_timeout < timeout) {
    timeout = next_timeout;
  }
  if (yield_flag_) {
    return;
  }
  run_poll(timeout);
  run_events();
}

}  // namespace td