// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
//  Copyright (c) 2016, Red Hat, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "rocksdb/utilities/env_librados.h"
#include <rados/librados.hpp>
#include "env/mock_env.h"
#include "test_util/testharness.h"

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "util/random.h"
#include <chrono>
#include <ostream>
#include "rocksdb/utilities/transaction_db.h"

class Timer {
  typedef std::chrono::high_resolution_clock high_resolution_clock;
  typedef std::chrono::milliseconds milliseconds;
public:
  explicit Timer(bool run = false)
  {
    if (run)
      Reset();
  }
  void Reset()
  {
    _start = high_resolution_clock::now();
  }
  milliseconds Elapsed() const
  {
    return std::chrono::duration_cast<milliseconds>(high_resolution_clock::now() - _start);
  }
  template <typename T, typename Traits>
  friend std::basic_ostream<T, Traits>& operator<<(std::basic_ostream<T, Traits>& out, const Timer& timer)
  {
    return out << timer.Elapsed().count();
  }
private:
  high_resolution_clock::time_point _start;
};

namespace ROCKSDB_NAMESPACE {

class EnvLibradosTest : public testing::Test {
public:
  // we will use all of these below
  const std::string db_name = "env_librados_test_db";
  const std::string db_pool = db_name + "_pool";
  const char *keyring = "admin";
  const char *config = "../ceph/src/ceph.conf";

  EnvLibrados* env_;
  const EnvOptions soptions_;

  EnvLibradosTest()
    : env_(new EnvLibrados(db_name, config, db_pool)) {
  }
  ~EnvLibradosTest() {
    delete env_;
    librados::Rados rados;
    int ret = 0;
    do {
      ret = rados.init("admin"); // just use the client.admin keyring
      if (ret < 0) { // let's handle any error that might have come back
        std::cerr << "couldn't initialize rados! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      ret = rados.conf_read_file(config);
      if (ret < 0) {
        // This could fail if the config file is malformed, but it'd be hard.
        std::cerr << "failed to parse config file " << config
                  << "! error" << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * next, we actually connect to the cluster
       */

      ret = rados.connect();
      if (ret < 0) {
        std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * And now we're done, so let's remove our pool and then
       * shut down the connection gracefully.
       */
      int delete_ret = rados.pool_delete(db_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
    } while (0);
  }
};

TEST_F(EnvLibradosTest, Basics) {
  uint64_t file_size;
  std::unique_ptr<WritableFile> writable_file;
  std::vector<std::string> children;

  ASSERT_OK(env_->CreateDir("/dir"));
  // Check that the directory is empty.
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/non_existent"));
  ASSERT_TRUE(!env_->GetFileSize("/dir/non_existent", &file_size).ok());
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(0U, children.size());

  // Create a file.
  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  writable_file.reset();

  // Check that the file exists.
  ASSERT_OK(env_->FileExists("/dir/f"));
  ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
  ASSERT_EQ(0U, file_size);
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(1U, children.size());
  ASSERT_EQ("f", children[0]);

  // Write to the file.
  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("abc"));
  writable_file.reset();


  // Check for expected size.
  ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
  ASSERT_EQ(3U, file_size);


  // Check that renaming works.
  ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
  ASSERT_OK(env_->RenameFile("/dir/f", "/dir/g"));
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/f"));
  ASSERT_OK(env_->FileExists("/dir/g"));
  ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
  ASSERT_EQ(3U, file_size);

  // Check that opening non-existent file fails.
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  ASSERT_TRUE(
    !env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok());
  ASSERT_TRUE(!seq_file);
  ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
                                         soptions_).ok());
  ASSERT_TRUE(!rand_file);

  // Check that deleting works.
  ASSERT_TRUE(!env_->DeleteFile("/dir/non_existent").ok());
  ASSERT_OK(env_->DeleteFile("/dir/g"));
  ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/g"));
  ASSERT_OK(env_->GetChildren("/dir", &children));
  ASSERT_EQ(0U, children.size());
  ASSERT_OK(env_->DeleteDir("/dir"));
}

TEST_F(EnvLibradosTest, ReadWrite) {
  std::unique_ptr<WritableFile> writable_file;
  std::unique_ptr<SequentialFile> seq_file;
  std::unique_ptr<RandomAccessFile> rand_file;
  Slice result;
  char scratch[100];

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("hello "));
  ASSERT_OK(writable_file->Append("world"));
  writable_file.reset();

  // Read sequentially.
  ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(seq_file->Skip(1));
  ASSERT_OK(seq_file->Read(1000, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(seq_file->Read(1000, &result, scratch));  // Try reading past EOF.
  ASSERT_EQ(0U, result.size());
  ASSERT_OK(seq_file->Skip(100));  // Try to skip past end of file.
  ASSERT_OK(seq_file->Read(1000, &result, scratch));
  ASSERT_EQ(0U, result.size());

  // Random reads.
  ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
  ASSERT_OK(rand_file->Read(6, 5, &result, scratch));  // Read "world".
  ASSERT_EQ(0, result.compare("world"));
  ASSERT_OK(rand_file->Read(0, 5, &result, scratch));  // Read "hello".
  ASSERT_EQ(0, result.compare("hello"));
  ASSERT_OK(rand_file->Read(10, 100, &result, scratch));  // Read "d".
  ASSERT_EQ(0, result.compare("d"));

  // Too high offset.
  ASSERT_OK(rand_file->Read(1000, 5, &result, scratch));
}

TEST_F(EnvLibradosTest, Locks) {
  FileLock* lock = nullptr;
  std::unique_ptr<WritableFile> writable_file;

  ASSERT_OK(env_->CreateDir("/dir"));

  ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));

  // These are no-ops, but we test they return success.
  ASSERT_OK(env_->LockFile("some file", &lock));
  ASSERT_OK(env_->UnlockFile(lock));

  ASSERT_OK(env_->LockFile("/dir/f", &lock));
  ASSERT_OK(env_->UnlockFile(lock));
}

TEST_F(EnvLibradosTest, Misc) {
  std::string test_dir;
  ASSERT_OK(env_->GetTestDirectory(&test_dir));
  ASSERT_TRUE(!test_dir.empty());

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_TRUE(!env_->NewWritableFile("/a/b", &writable_file, soptions_).ok());

  ASSERT_OK(env_->NewWritableFile("/a", &writable_file, soptions_));
  // These are no-ops, but we test they return success.
  ASSERT_OK(writable_file->Sync());
  ASSERT_OK(writable_file->Flush());
  ASSERT_OK(writable_file->Close());
  writable_file.reset();
}

TEST_F(EnvLibradosTest, LargeWrite) {
  const size_t kWriteSize = 300 * 1024;
  char* scratch = new char[kWriteSize * 2];

  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));
  ASSERT_OK(writable_file->Append(write_data));
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
  delete[] scratch;
}

TEST_F(EnvLibradosTest, FrequentlySmallWrite) {
  const size_t kWriteSize = 1 << 10;
  char* scratch = new char[kWriteSize * 2];

  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append("foo"));

  for (size_t i = 0; i < kWriteSize; ++i) {
    ASSERT_OK(writable_file->Append("h"));
  }
  writable_file.reset();

  std::unique_ptr<SequentialFile> seq_file;
  Slice result;
  ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
  ASSERT_OK(seq_file->Read(3, &result, scratch));  // Read "foo".
  ASSERT_EQ(0, result.compare("foo"));

  size_t read = 0;
  std::string read_data;
  while (read < kWriteSize) {
    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
    read_data.append(result.data(), result.size());
    read += result.size();
  }
  ASSERT_TRUE(write_data == read_data);
  delete[] scratch;
}

TEST_F(EnvLibradosTest, Truncate) {
  const size_t kWriteSize = 300 * 1024;
  const size_t truncSize = 1024;
  std::string write_data;
  for (size_t i = 0; i < kWriteSize; ++i) {
    write_data.append(1, 'h');
  }

  std::unique_ptr<WritableFile> writable_file;
  ASSERT_OK(env_->CreateDir("/dir"));
  ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
  ASSERT_OK(writable_file->Append(write_data));
  ASSERT_EQ(writable_file->GetFileSize(), kWriteSize);
  ASSERT_OK(writable_file->Truncate(truncSize));
  ASSERT_EQ(writable_file->GetFileSize(), truncSize);
  writable_file.reset();
}

TEST_F(EnvLibradosTest, DBBasics) {
  std::string kDBPath = "/tmp/DBBasics";
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;

  // open DB
  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());

  // Put key-value
  s = db->Put(WriteOptions(), "key1", "value");
  assert(s.ok());
  std::string value;
  // get value
  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.ok());
  assert(value == "value");

  // atomically apply a set of updates
  {
    WriteBatch batch;
    batch.Delete("key1");
    batch.Put("key2", value);
    s = db->Write(WriteOptions(), &batch);
  }

  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.IsNotFound());

  db->Get(ReadOptions(), "key2", &value);
  assert(value == "value");

  delete db;
}

TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 10;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    snprintf(key,
             20,
             "%16lx",
             (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
    snprintf(value,
             20,
             "%16lx",
             (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
    // Put key-value
    s1 = db1->Put(WriteOptions(), key, value);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    snprintf(key,
             20,
             "%16lx",
             (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
    snprintf(value,
             20,
             "%16lx",
             (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
    // Put key-value
    s2 = db2->Put(WriteOptions(), key, value);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosTest, DBRandomRead) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 10;
  int read_loop = 1 << 20;
  Timer timer(false);
  std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size << "); read_loop(" << read_loop << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBRandomRead1";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  timer.Reset();
  int base1 = 0, offset1 = 0;
  for (int i = 0; i < read_loop; ++i) {
    base1 = r1.Uniform(max_loop);
    offset1 = r1.Uniform(bulk_size);
    std::string value1;
    snprintf(key,
             20,
             "%019lld",
             (long long)(base1 * bulk_size + offset1));
    s1 = db1->Get(ReadOptions(), key, &value1);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBRandomRead2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  assert(s2.ok());

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%019lld",
               (long long)(i * bulk_size + j));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }

  timer.Reset();
  int base2 = 0, offset2 = 0;
  for (int i = 0; i < read_loop; ++i) {
    base2 = r2.Uniform(max_loop);
    offset2 = r2.Uniform(bulk_size);
    std::string value2;
    snprintf(key,
             20,
             "%019lld",
             (long long)(base2 * bulk_size + offset2));
    s2 = db2->Get(ReadOptions(), key, &value2);
    if (!s2.ok()) {
      std::cout << s2.ToString() << std::endl;
    }
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

class EnvLibradosMutipoolTest : public testing::Test {
public:
  // we will use all of these below
  const std::string client_name = "client.admin";
  const std::string cluster_name = "ceph";
  const uint64_t flags = 0;
  const std::string db_name = "env_librados_test_db";
  const std::string db_pool = db_name + "_pool";
  const std::string wal_dir = "/wal";
  const std::string wal_pool = db_name + "_wal_pool";
  const size_t write_buffer_size = 1 << 20;
  const char *keyring = "admin";
  const char *config = "../ceph/src/ceph.conf";

  EnvLibrados* env_;
  const EnvOptions soptions_;

  EnvLibradosMutipoolTest() {
    env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size);
  }
  ~EnvLibradosMutipoolTest() {
    delete env_;
    librados::Rados rados;
    int ret = 0;
    do {
      ret = rados.init("admin"); // just use the client.admin keyring
      if (ret < 0) { // let's handle any error that might have come back
        std::cerr << "couldn't initialize rados! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      ret = rados.conf_read_file(config);
      if (ret < 0) {
        // This could fail if the config file is malformed, but it'd be hard.
        std::cerr << "failed to parse config file " << config
                  << "! error" << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * next, we actually connect to the cluster
       */

      ret = rados.connect();
      if (ret < 0) {
        std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
        ret = EXIT_FAILURE;
        break;
      }

      /*
       * And now we're done, so let's remove our pool and then
       * shut down the connection gracefully.
       */
      int delete_ret = rados.pool_delete(db_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
      delete_ret = rados.pool_delete(wal_pool.c_str());
      if (delete_ret < 0) {
        // be careful not to
        std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl;
        ret = EXIT_FAILURE;
      }
    } while (0);
  }
};

TEST_F(EnvLibradosMutipoolTest, Basics) {
  uint64_t file_size;
  std::unique_ptr<WritableFile> writable_file;
  std::vector<std::string> children;
  std::vector<std::string> v = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3", "/tmp/dir4", "dir"};

  for (size_t i = 0; i < v.size(); ++i) {
    std::string dir = v[i];
    std::string dir_non_existent = dir + "/non_existent";
    std::string dir_f = dir + "/f";
    std::string dir_g = dir + "/g";

    ASSERT_OK(env_->CreateDir(dir.c_str()));
    // Check that the directory is empty.
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_non_existent.c_str()));
    ASSERT_TRUE(!env_->GetFileSize(dir_non_existent.c_str(), &file_size).ok());
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(0U, children.size());

    // Create a file.
    ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
    writable_file.reset();

    // Check that the file exists.
    ASSERT_OK(env_->FileExists(dir_f.c_str()));
    ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
    ASSERT_EQ(0U, file_size);
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(1U, children.size());
    ASSERT_EQ("f", children[0]);

    // Write to the file.
    ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
    ASSERT_OK(writable_file->Append("abc"));
    writable_file.reset();


    // Check for expected size.
    ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
    ASSERT_EQ(3U, file_size);


    // Check that renaming works.
    ASSERT_TRUE(!env_->RenameFile(dir_non_existent.c_str(), dir_g.c_str()).ok());
    ASSERT_OK(env_->RenameFile(dir_f.c_str(), dir_g.c_str()));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_f.c_str()));
    ASSERT_OK(env_->FileExists(dir_g.c_str()));
    ASSERT_OK(env_->GetFileSize(dir_g.c_str(), &file_size));
    ASSERT_EQ(3U, file_size);

    // Check that opening non-existent file fails.
    std::unique_ptr<SequentialFile> seq_file;
    std::unique_ptr<RandomAccessFile> rand_file;
    ASSERT_TRUE(
      !env_->NewSequentialFile(dir_non_existent.c_str(), &seq_file, soptions_).ok());
    ASSERT_TRUE(!seq_file);
    ASSERT_TRUE(!env_->NewRandomAccessFile(dir_non_existent.c_str(), &rand_file,
                                           soptions_).ok());
    ASSERT_TRUE(!rand_file);

    // Check that deleting works.
    ASSERT_TRUE(!env_->DeleteFile(dir_non_existent.c_str()).ok());
    ASSERT_OK(env_->DeleteFile(dir_g.c_str()));
    ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_g.c_str()));
    ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
    ASSERT_EQ(0U, children.size());
    ASSERT_OK(env_->DeleteDir(dir.c_str()));
  }
}

TEST_F(EnvLibradosMutipoolTest, DBBasics) {
  std::string kDBPath = "/tmp/DBBasics";
  std::string walPath = "/tmp/wal";
  DB* db;
  Options options;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options.create_if_missing = true;
  options.env = env_;
  options.wal_dir = walPath;

  // open DB
  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());

  // Put key-value
  s = db->Put(WriteOptions(), "key1", "value");
  assert(s.ok());
  std::string value;
  // get value
  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.ok());
  assert(value == "value");

  // atomically apply a set of updates
  {
    WriteBatch batch;
    batch.Delete("key1");
    batch.Put("key2", value);
    s = db->Write(WriteOptions(), &batch);
  }

  s = db->Get(ReadOptions(), "key1", &value);
  assert(s.IsNotFound());

  db->Get(ReadOptions(), "key2", &value);
  assert(value == "value");

  delete db;
}

TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) {
  char key[20] = {0}, value[20] = {0};
  int max_loop = 1 << 6;
  int bulk_size = 1 << 15;
  Timer timer(false);
  std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
  /**********************************
            use default env
  ***********************************/
  std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
  std::string walPath = "/tmp/wal";
  DB* db1;
  Options options1;
  // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
  options1.IncreaseParallelism();
  options1.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options1.create_if_missing = true;

  // open DB
  Status s1 = DB::Open(options1, kDBPath1, &db1);
  assert(s1.ok());

  ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s1 = db1->Write(WriteOptions(), &batch);
    assert(s1.ok());
  }
  std::cout << "Time by default : " << timer << "ms" << std::endl;
  delete db1;

  /**********************************
            use librados env
  ***********************************/
  std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
  DB* db2;
  Options options2;
  // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
  options2.IncreaseParallelism();
  options2.OptimizeLevelStyleCompaction();
  // create the DB if it's not already present
  options2.create_if_missing = true;
  options2.env = env_;
  options2.wal_dir = walPath;

  // open DB
  Status s2 = DB::Open(options2, kDBPath2, &db2);
  if (!s2.ok()) {
    std::cerr << s2.ToString() << std::endl;
  }
  assert(s2.ok());

  ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));

  timer.Reset();
  for (int i = 0; i < max_loop; ++i) {
    WriteBatch batch;
    for (int j = 0; j < bulk_size; ++j) {
      snprintf(key,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      snprintf(value,
               20,
               "%16lx",
               (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
      batch.Put(key, value);
    }
    s2 = db2->Write(WriteOptions(), &batch);
    assert(s2.ok());
  }
  std::cout << "Time by librados : " << timer << "ms" << std::endl;
  delete db2;
}

TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
  std::string kDBPath = "/tmp/DBTransactionDB";
  // open DB
  Options options;
  TransactionDBOptions txn_db_options;
  options.create_if_missing = true;
  options.env = env_;
  TransactionDB* txn_db;

  Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
  assert(s.ok());

  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  std::string value;

  ////////////////////////////////////////////////////////
  //
  // Simple OptimisticTransaction Example ("Read Committed")
  //
  ////////////////////////////////////////////////////////

  // Start a transaction
  Transaction* txn = txn_db->BeginTransaction(write_options);
  assert(txn);

  // Read a key in this transaction
  s = txn->Get(read_options, "abc", &value);
  assert(s.IsNotFound());

  // Write a key in this transaction
  s = txn->Put("abc", "def");
  assert(s.ok());

  // Read a key OUTSIDE this transaction. Does not affect txn.
  s = txn_db->Get(read_options, "abc", &value);

  // Write a key OUTSIDE of this transaction.
  // Does not affect txn since this is an unrelated key.  If we wrote key 'abc'
  // here, the transaction would fail to commit.
  s = txn_db->Put(write_options, "xyz", "zzz");

  // Commit transaction
  s = txn->Commit();
  assert(s.ok());
  delete txn;

  ////////////////////////////////////////////////////////
  //
  // "Repeatable Read" (Snapshot Isolation) Example
  //   -- Using a single Snapshot
  //
  ////////////////////////////////////////////////////////

  // Set a snapshot at start of transaction by setting set_snapshot=true
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);

  const Snapshot* snapshot = txn->GetSnapshot();

  // Write a key OUTSIDE of transaction
  s = txn_db->Put(write_options, "abc", "xyz");
  assert(s.ok());

  // Attempt to read a key using the snapshot.  This will fail since
  // the previous write outside this txn conflicts with this read.
  read_options.snapshot = snapshot;
  s = txn->GetForUpdate(read_options, "abc", &value);
  assert(s.IsBusy());

  txn->Rollback();

  delete txn;
  // Clear snapshot from read options since it is no longer valid
  read_options.snapshot = nullptr;
  snapshot = nullptr;

  ////////////////////////////////////////////////////////
  //
  // "Read Committed" (Monotonic Atomic Views) Example
  //   --Using multiple Snapshots
  //
  ////////////////////////////////////////////////////////

  // In this example, we set the snapshot multiple times.  This is probably
  // only necessary if you have very strict isolation requirements to
  // implement.

  // Set a snapshot at start of transaction
  txn_options.set_snapshot = true;
  txn = txn_db->BeginTransaction(write_options, txn_options);

  // Do some reads and writes to key "x"
  read_options.snapshot = txn_db->GetSnapshot();
  s = txn->Get(read_options, "x", &value);
  txn->Put("x", "x");

  // Do a write outside of the transaction to key "y"
  s = txn_db->Put(write_options, "y", "y");

  // Set a new snapshot in the transaction
  txn->SetSnapshot();
  txn->SetSavePoint();
  read_options.snapshot = txn_db->GetSnapshot();

  // Do some reads and writes to key "y"
  // Since the snapshot was advanced, the write done outside of the
  // transaction does not conflict.
  s = txn->GetForUpdate(read_options, "y", &value);
  txn->Put("y", "y");

  // Decide we want to revert the last write from this transaction.
  txn->RollbackToSavePoint();

  // Commit.
  s = txn->Commit();
  assert(s.ok());
  delete txn;
  // Clear snapshot from read options since it is no longer valid
  read_options.snapshot = nullptr;

  // Cleanup
  delete txn_db;
  DestroyDB(kDBPath, options);
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int argc, char** argv) {
  fprintf(stderr, "SKIPPED as EnvMirror is not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE