//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileDb.h"

#include "td/telegram/files/FileLocation.h"
#include "td/telegram/Version.h"

#include "td/actor/actor.h"

#include "td/db/SqliteKeyValueSafe.h"

#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/StackAllocator.h"
#include "td/utils/Status.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"

namespace td {

// Drops the "files" key-value storage from db through SqliteKeyValue::drop.
// version is not used for the drop itself; it is only written to the log together with
// current_db_version().
// Returns the status produced by the drop.
Status drop_file_db(SqliteDb &db, int32 version) {
  LOG(WARNING) << "Drop file_db " << tag("version", version) << tag("current_db_version", current_db_version());
  return SqliteKeyValue::drop(db, "files");
}

// Defined at the bottom of this file; declared here because init_file_db calls it.
Status fix_file_remote_location_key_bug(SqliteDb &db);

// Prepares the "files" key-value storage in db for a database whose stored version is `version`.
//
// - When the table exists and version is at least DbVersion::DialogDbCreated, the table is kept;
//   if version is additionally below DbVersion::FixFileRemoteLocationKeyBug, the key fix-up is run.
// - Otherwise any existing table is dropped and the version is treated as 0.
// - A version of 0 makes the storage be (re)initialized with SqliteKeyValue::init.
//
// Returns the first error reported by any step, Status::OK() otherwise.
Status init_file_db(SqliteDb &db, int32 version) {
  LOG(INFO) << "Init file db " << tag("version", version);

  // Check if database exists
  TRY_RESULT(files_table_exists, db.has_table("files"));

  const auto dialog_db_created_version = static_cast<int32>(DbVersion::DialogDbCreated);
  const auto key_bug_fixed_version = static_cast<int32>(DbVersion::FixFileRemoteLocationKeyBug);

  if (files_table_exists && version >= dialog_db_created_version) {
    // The table is usable; it may only need its keys rewritten.
    if (version < key_bug_fixed_version) {
      TRY_STATUS(fix_file_remote_location_key_bug(db));
    }
  } else {
    // Either there is no table at all, or it is too old to be reused.
    if (files_table_exists) {
      TRY_STATUS(drop_file_db(db, version));
    }
    version = 0;
  }

  if (version == 0) {
    TRY_STATUS(SqliteKeyValue::init(db, "files"));
  }
  return Status::OK();
}

// FileDbInterface implementation that stores file records in a SqliteKeyValueSafe storage.
//
// Records written by this class:
//   "file_id"       -> to_string() of the largest identifier the actor has been asked to store or clear
//   "file" + <id>   -> serialized FileData, or "@@" + <new_id> as a reference to another record
//   <location key>  -> to_string() of the identifier owning that remote/local/generate location
//
// Mutations and asynchronous loads are forwarded to FileDbActor with send_closure;
// get_file_data_sync_impl() and pmc() access the storage directly from the caller.
class FileDb : public FileDbInterface {
 public:
  // Actor that performs the actual storage operations.
  class FileDbActor : public Actor {
   public:
    using Id = FileDbInterface::Id;

    // current_pmc_id is the largest identifier already recorded under "file_id".
    FileDbActor(Id current_pmc_id, std::shared_ptr<SqliteKeyValueSafe> file_kv_safe)
        : current_pmc_id_(current_pmc_id), file_kv_safe_(std::move(file_kv_safe)) {
    }

    // Releases this actor's reference to the storage, fulfils promise and stops the actor.
    void close(Promise<> promise) {
      file_kv_safe_.reset();
      LOG(INFO) << "FileDb is closed";
      promise.set_value(Unit());
      stop();
    }

    // Looks up the record referenced by key and passes the result (data or error) to promise.
    void load_file_data(const string &key, Promise<FileData> promise) {
      promise.set_result(load_file_data_impl(file_pmc(), key));
    }

    // Erases the record "file" + id and every non-empty location key, inside one transaction.
    void clear_file_data(Id id, const string &remote_key, const string &local_key, const string &generate_key) {
      auto &pmc = file_pmc();
      pmc.begin_transaction().ensure();
      // Commit on every exit path of this method.
      SCOPE_EXIT {
        pmc.commit_transaction().ensure();
      };

      // Keep the persisted counter at least as large as any identifier seen here.
      if (id > current_pmc_id_) {
        pmc.set("file_id", to_string(id));
        current_pmc_id_ = id;
      }

      pmc.erase("file" + to_string(id));
      LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(PSLICE() << "file" << to_string(id)));

      if (!remote_key.empty()) {
        pmc.erase(remote_key);
        LOG(DEBUG) << "ERASE remote " << format::as_hex_dump<4>(Slice(remote_key));
      }
      if (!local_key.empty()) {
        pmc.erase(local_key);
        LOG(DEBUG) << "ERASE local " << format::as_hex_dump<4>(Slice(local_key));
      }
      if (!generate_key.empty()) {
        pmc.erase(generate_key);
      }
    }

    // Writes file_data under "file" + id and points every non-empty location key at id,
    // inside one transaction.
    void store_file_data(Id id, const string &file_data, const string &remote_key, const string &local_key,
                         const string &generate_key) {
      auto &pmc = file_pmc();
      pmc.begin_transaction().ensure();
      // Commit on every exit path of this method.
      SCOPE_EXIT {
        pmc.commit_transaction().ensure();
      };

      // Keep the persisted counter at least as large as any identifier seen here.
      if (id > current_pmc_id_) {
        pmc.set("file_id", to_string(id));
        current_pmc_id_ = id;
      }

      pmc.set("file" + to_string(id), file_data);

      if (!remote_key.empty()) {
        pmc.set(remote_key, to_string(id));
      }
      if (!local_key.empty()) {
        pmc.set(local_key, to_string(id));
      }
      if (!generate_key.empty()) {
        pmc.set(generate_key, to_string(id));
      }
    }

    // Replaces the record "file" + id with a "@@" + new_id reference, inside one transaction.
    // load_file_data_impl() follows such references when reading.
    void store_file_data_ref(Id id, Id new_id) {
      auto &pmc = file_pmc();
      pmc.begin_transaction().ensure();
      // Commit on every exit path of this method.
      SCOPE_EXIT {
        pmc.commit_transaction().ensure();
      };

      // Keep the persisted counter at least as large as any identifier seen here.
      if (id > current_pmc_id_) {
        pmc.set("file_id", to_string(id));
        current_pmc_id_ = id;
      }

      pmc.set("file" + to_string(id), "@@" + to_string(new_id));
    }

   private:
    // Largest identifier known to be recorded under "file_id".
    Id current_pmc_id_;
    std::shared_ptr<SqliteKeyValueSafe> file_kv_safe_;

    SqliteKeyValue &file_pmc() {
      return file_kv_safe_->get();
    }
  };

  // Reads the persisted identifier counter from kv_safe and creates the FileDbActor on the
  // scheduler given by scheduler_id. kv_safe must not be null.
  explicit FileDb(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int scheduler_id = -1) {
    file_kv_safe_ = std::move(kv_safe);
    CHECK(file_kv_safe_);
    // "file_id" is written with to_string(Id), so it is parsed back as Id - the same way
    // get_id() and load_file_data_impl() parse stored identifiers - instead of as int32.
    current_pmc_id_ = to_integer<Id>(file_kv_safe_->get().get("file_id"));
    file_db_actor_ =
        create_actor_on_scheduler<FileDbActor>("FileDbActor", scheduler_id, current_pmc_id_, file_kv_safe_);
  }

  // Returns a fresh identifier by incrementing the in-memory counter.
  // The counter is persisted by the actor once the identifier is stored or cleared.
  Id create_pmc_id() override {
    return ++current_pmc_id_;
  }

  // Hands ownership of the actor to a close request; promise is fulfilled by the actor.
  void close(Promise<> promise) override {
    send_closure(std::move(file_db_actor_), &FileDbActor::close, std::move(promise));
  }

  // Asynchronous lookup: the actor loads the record for key and completes promise.
  void get_file_data_impl(string key, Promise<FileData> promise) override {
    send_closure(file_db_actor_, &FileDbActor::load_file_data, std::move(key), std::move(promise));
  }

  // Synchronous lookup performed directly on the storage by the caller.
  Result<FileData> get_file_data_sync_impl(string key) override {
    return load_file_data_impl(file_kv_safe_->get(), key);
  }

  // Asks the actor to erase the record for id together with the keys of every location of
  // file_data that has type Full.
  void clear_file_data(Id id, const FileData &file_data) override {
    string remote_key;
    if (file_data.remote_.type() == RemoteFileLocation::Type::Full) {
      remote_key = as_key(file_data.remote_.full());
    }
    string local_key;
    if (file_data.local_.type() == LocalFileLocation::Type::Full) {
      local_key = as_key(file_data.local_.full());
    }
    string generate_key;
    if (file_data.generate_.type() == GenerateFileLocation::Type::Full) {
      generate_key = as_key(file_data.generate_.full());
    }
    send_closure(file_db_actor_, &FileDbActor::clear_file_data, id, remote_key, local_key, generate_key);
  }

  // Asks the actor to store the serialized file_data under id.
  // A location key is written only when that location has type Full and the matching
  // new_remote / new_local / new_generate flag is set; otherwise an empty key is passed,
  // which the actor skips.
  void set_file_data(Id id, const FileData &file_data, bool new_remote, bool new_local, bool new_generate) override {
    string remote_key;
    if (file_data.remote_.type() == RemoteFileLocation::Type::Full && new_remote) {
      remote_key = as_key(file_data.remote_.full());
    }
    string local_key;
    if (file_data.local_.type() == LocalFileLocation::Type::Full && new_local) {
      local_key = as_key(file_data.local_.full());
    }
    string generate_key;
    if (file_data.generate_.type() == GenerateFileLocation::Type::Full && new_generate) {
      generate_key = as_key(file_data.generate_.full());
    }
    LOG(DEBUG) << "SAVE " << id << " -> " << file_data << " "
               << tag("remote", format::as_hex_dump<4>(Slice(remote_key)))
               << tag("local", format::as_hex_dump<4>(Slice(local_key)));
    send_closure(file_db_actor_, &FileDbActor::store_file_data, id, serialize(file_data), remote_key, local_key,
                 generate_key);
  }

  // Asks the actor to turn the record for id into a reference to new_id.
  void set_file_data_ref(Id id, Id new_id) override {
    send_closure(file_db_actor_, &FileDbActor::store_file_data_ref, id, new_id);
  }

  // Direct access to the underlying key-value storage.
  SqliteKeyValue &pmc() override {
    return file_kv_safe_->get();
  }

 private:
  ActorOwn<FileDbActor> file_db_actor_;
  // Last identifier handed out by create_pmc_id(), initialized from the stored "file_id".
  Id current_pmc_id_;
  std::shared_ptr<SqliteKeyValueSafe> file_kv_safe_;

  // Resolves key to an identifier, reads "file" + id and follows "@@" references until a
  // non-reference record is found, then unserializes it.
  // Returns an error when key is absent or the record cannot be unserialized.
  static Result<FileData> load_file_data_impl(SqliteKeyValue &pmc, const string &key) {
    //LOG(DEBUG) << "Load by key " << format::as_hex_dump<4>(Slice(key));
    TRY_RESULT(id, get_id(pmc, key));

    string data_str;
    int attempts_count = 0;
    while (true) {
      // At most six records are read; a longer reference chain is reported as a cycle.
      if (attempts_count > 5) {
        LOG(FATAL) << "cycle in files db?";
      }
      attempts_count++;

      data_str = pmc.get(PSTRING() << "file" << id);
      auto data_slice = Slice(data_str);

      if (data_slice.substr(0, 2) == "@@") {
        // Reference record: continue with the identifier it points to.
        id = to_integer<Id>(data_slice.substr(2));
      } else {
        break;
      }
    }
    //LOG(DEBUG) << "By id " << id << " found data " << format::as_hex_dump<4>(Slice(data_str));

    FileData data;
    auto status = unserialize(data, data_str);
    if (status.is_error()) {
      return std::move(status);
    }
    return std::move(data);
  }

  // Returns the identifier stored under key, or an error when the stored value is empty.
  static Result<Id> get_id(SqliteKeyValue &pmc, const string &key) TD_WARN_UNUSED_RESULT {
    auto id_str = pmc.get(key);
    //LOG(DEBUG) << "Found id " << id_str << " by key " << format::as_hex_dump<4>(Slice(key));
    if (id_str.empty()) {
      return Status::Error("There is no such a key in db");
    }
    return to_integer<Id>(id_str);
  }
};

// Creates a FileDb on top of the "files" key-value storage of connection.
// scheduler_id is forwarded to the FileDb constructor, which passes it to
// create_actor_on_scheduler for its actor.
std::shared_ptr<FileDbInterface> create_file_db(std::shared_ptr<SqliteConnectionSafe> connection, int scheduler_id) {
  return std::make_shared<FileDb>(std::make_shared<SqliteKeyValueSafe>("files", std::move(connection)),
                                  scheduler_id);
}

// Rewrites every entry of the "files" storage whose key starts with the old magic value.
//
// For each such entry a remote location is rebuilt from the key (bytes 4..7, four zero
// bytes, then the rest of the key from offset 8). If that unserializes successfully, the
// value is stored again under as_key() of the parsed location. The old entry is erased
// in either case.
// Always returns Status::OK(); a failure to open the storage is fatal through ensure().
Status fix_file_remote_location_key_bug(SqliteDb &db) {
  static const int32 OLD_KEY_MAGIC = 0x64378433;

  SqliteKeyValue files_kv;
  files_kv.init_with_connection(db.clone(), "files").ensure();

  // Build the 4-byte search prefix holding the old magic; the allocation must stay alive
  // for as long as the slice is used.
  auto prefix_storage = StackAllocator::alloc(4);
  MutableSlice old_prefix = prefix_storage.as_slice();
  TlStorerUnsafe(old_prefix.begin()).store_int(OLD_KEY_MAGIC);

  auto migrate_entry = [&](Slice key, Slice value) {
    CHECK(TlParser(key).fetch_int() == OLD_KEY_MAGIC);

    auto fixed_remote_str = PSTRING() << key.substr(4, 4) << Slice("\0\0\0\0") << key.substr(8);
    FullRemoteFileLocation location;
    const bool is_parsed = unserialize(location, fixed_remote_str).is_ok();
    if (is_parsed) {
      files_kv.set(as_key(location), value);
    }

    LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(key));
    files_kv.erase(key);
  };
  files_kv.get_by_prefix(old_prefix, migrate_entry);

  return Status::OK();
}

}  // namespace td