//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "monitoring/statistics.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}

std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy it for not alternative WAL dir is used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + ToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + ToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }

  Status Size(uint64_t* size, const Slice& start, const Slice& limit,
              int cf = 0) {
    Range r(start, limit);
    if (cf == 0) {
      return db_->GetApproximateSizes(&r, 1, size);
    } else {
      return db_->GetApproximateSizes(handles_[1], &r, 1, size);
    }
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats *stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

  void MakeTableWithKeyValues(
    Random* rnd, uint64_t smallest, uint64_t largest,
    int key_size, int value_size, uint64_t interval,
    double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                        Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function behaves with the implicit understanding that two
  // rounds of keys are inserted into the database, as per the behavior
  // of the DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
    uint64_t interval, int deletion_interval, int key_size,
    uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {

    // interval needs to be >= 2 so that deletion entries can be inserted
    // that are intended to not result in an actual key deletion by using
    // an offset of 1 from another existing key
    ASSERT_GE(interval, 2);

    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist that
    // are both in and out of the key range
    ASSERT_OK(Delete(cf, Key(smallest+1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest-1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest-9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
      num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completed, this function will verify the returned
  // CompactionJobInfo with the oldest CompactionJobInfo added earlier
  // in "expected_stats_" which has not yet being used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match.  The verification of all compaction stats are done by
  // ASSERT_EQ except for the total input / output bytes, which we
  // use ASSERT_GE and ASSERT_LE with a reasonable bias ---
  // 10% in uncompressed case and 20% when compression is used.
  virtual void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
        stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
        stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
        stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
        stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
        stats.num_output_files);

    ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
    ASSERT_EQ(current_stats.is_manual_compaction,
        stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }

  // Add an expected compaction stats, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(
      current_stats.num_input_deletion_records,
      stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);
  }
};

namespace {

uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
    static_cast<uint64_t>(
      num_records * (key_size + value_size * compression_ratio +
                     kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}

namespace {

void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace

CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_full = false, bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_full_compaction = is_full;
  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}

CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }

  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.  The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // just enough setting to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // compact two files into one in the last L0 -> L1 compaction
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
    // When subcompactions are enabled, the number of output files increases
    // by 1 because multiple threads are consuming the input and generating
    // output files without coordinating to see if the output could fit into
    // a smaller number of files like it does when it runs sequentially
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves in two sub-compactions.
    // Here we expect to have 1 L0 files and 4 L1 files
    // In the first sub-compaction, we expect L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale+1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale / 2;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so now there is a L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(key_base-6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions so now
  // there are files with the same key range in L0, L1, and L2
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
      key_interval, deletion_interval, kKeySize, cutoff_key_num,
      &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be multiple of num_keys_per_L0_file
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as an unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // A full compaction only happens when the number of flushes equals to
    // the number of compaction input runs.
    bool is_full = num_flushes == num_input_units;
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.
    uint64_t smallest_key = is_full ? key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        Key(smallest_key, 10),
        Key(smallest_key + key_base * num_input_units - key_interval, 10),
        num_input_units, num_input_units > 2 ? num_input_units / 2 : 0,
        num_keys_per_table * num_input_units, kKeySize, kValueSize,
        num_input_units, num_keys_per_table * num_input_units, 1.0, 0, is_full,
        false));
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)