rocksdb/examples/compact_files_example.cc
Yueh-Hsuan Chiang 28c82ff1b3 CompactFiles, EventListener and GetDatabaseMetaData
Summary:
This diff adds three sets of APIs to RocksDB.

= GetColumnFamilyMetaData =
* This API allows users to obtain the current state of a RocksDB instance on one column family.
* See GetColumnFamilyMetaData in include/rocksdb/db.h

= EventListener =
* A virtual class that allows users to implement a set of
  call-back functions which will be called when specific
  events of a RocksDB instance happen.
* To register EventListener, simply insert an EventListener to ColumnFamilyOptions::listeners

= CompactFiles =
* CompactFiles API inputs a set of file numbers and an output level, and RocksDB
  will try to compact those files into the specified level.

= Example =
* Example code can be found in examples/compact_files_example.cc, which implements
  a simple external compactor using EventListener, GetColumnFamilyMetaData, and
  CompactFiles API.

Test Plan:
listener_test
compactor_test
example/compact_files_example
export ROCKSDB_TESTS=CompactFiles
db_test
export ROCKSDB_TESTS=MetaData
db_test

Reviewers: ljin, igor, rven, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D24705
2014-11-07 14:45:18 -08:00

176 lines
5.8 KiB
C++

// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// An example code demonstrating how to use CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.
#include <cassert>
#include <cstdio>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
using namespace rocksdb;
// Filesystem location of the example database. The path is only ever read
// (it is passed to DestroyDB() and DB::Open() in main()), so it is declared
// const to rule out accidental modification of this global.
const std::string kDBPath = "/tmp/rocksdb_compact_files_example";
class CompactionTask;

// Example interface for an external compaction algorithm: a compaction
// policy implemented outside of the core RocksDB code on top of the
// pluggable compaction APIs that RocksDB provides. The interface extends
// EventListener, so an implementation can also react to listener callbacks.
class Compactor : public EventListener {
 public:
  // Chooses the next compaction to run for column family `cf_name` of `db`.
  // The caller is responsible for destroying the returned CompactionTask.
  // A "nullptr" result means no proper compaction task could be found.
  virtual CompactionTask* PickCompaction(DB* db,
                                         const std::string& cf_name) = 0;

  // Schedules `task` and runs it in the background.
  virtual void ScheduleCompaction(CompactionTask* task) = 0;
};
// Example structure that describes a compaction task.
//
// A task bundles everything needed to issue one DB::CompactFiles() call
// together with the Compactor that created it. Tasks are handed to a
// scheduler and consumed later, so every field except the two non-owning
// pointers is stored by value and stays valid for the task's whole lifetime.
struct CompactionTask {
  // db                 - database the compaction runs against (not owned).
  // compactor          - compactor that picked this task (not owned); used
  //                      to pick and schedule a follow-up task on retry.
  // column_family_name - name of the column family the task was picked for.
  // input_file_names   - names of the files to compact.
  // output_level       - level the compaction output is written to.
  // compact_options    - options forwarded to the compaction call.
  // retry_on_fail      - whether a failed compaction should be retried.
  CompactionTask(
      DB* db, Compactor* compactor,
      const std::string& column_family_name,
      const std::vector<std::string>& input_file_names,
      const int output_level,
      const CompactionOptions& compact_options,
      bool retry_on_fail)
      : db(db),
        compactor(compactor),
        column_family_name(column_family_name),
        input_file_names(input_file_names),
        output_level(output_level),
        compact_options(compact_options),
        // Honor the caller's argument; it used to be hard-coded to false,
        // silently discarding the requested retry policy.
        retry_on_fail(retry_on_fail) {}

  DB* db;
  Compactor* compactor;
  // Stored by value: a reference member would bind to the caller's string,
  // which may already be destroyed when the task is executed later.
  std::string column_family_name;
  std::vector<std::string> input_file_names;
  int output_level;
  CompactionOptions compact_options;
  bool retry_on_fail;
};
// A simple compaction algorithm that always compacts everything
// to the highest level whenever possible.
class FullCompactor : public Compactor {
 public:
  // Keeps a private copy of `options` and derives from it the
  // CompactionOptions (compression type and output file size limit)
  // that are attached to every task this compactor creates.
  explicit FullCompactor(const Options& options) : options_(options) {
    compact_options_.compression = options_.compression;
    compact_options_.output_file_size_limit = options_.target_file_size_base;
  }

  // When flush happens, it determines whether to trigger compaction.
  // If triggered_writes_stop is true, it will also set the retry
  // flag of compaction-task to true.
  void OnFlushCompleted(
      DB* db, const std::string& cf_name,
      const std::string& /*file_path*/,
      bool /*triggered_writes_slowdown*/,
      bool triggered_writes_stop) override {
    CompactionTask* task = PickCompaction(db, cf_name);
    if (task == nullptr) {
      return;
    }
    if (triggered_writes_stop) {
      task->retry_on_fail = true;
    }
    // Schedule compaction in a different thread.
    ScheduleCompaction(task);
  }

  // Always pick a compaction which includes all files whenever possible.
  //
  // Returns nullptr when any file is already being compacted or when there
  // is no file to compact; otherwise returns a heap-allocated task that the
  // caller (ultimately CompactFiles() below) must delete.
  //
  // NOTE(review): the metadata is fetched through the single-argument
  // GetColumnFamilyMetaData() overload, which does not take `cf_name`;
  // presumably it reports the default column family -- confirm before using
  // this with multiple column families.
  CompactionTask* PickCompaction(
      DB* db, const std::string& cf_name) override {
    ColumnFamilyMetaData cf_meta;
    db->GetColumnFamilyMetaData(&cf_meta);

    std::vector<std::string> input_file_names;
    // Iterate by const reference: the metadata objects hold strings and
    // vectors, so copying each level and file would be pure overhead.
    for (const auto& level : cf_meta.levels) {
      for (const auto& file : level.files) {
        if (file.being_compacted) {
          return nullptr;
        }
        input_file_names.push_back(file.name);
      }
    }
    if (input_file_names.empty()) {
      // Nothing to compact, so there is no proper task to return.
      return nullptr;
    }
    return new CompactionTask(
        db, this, cf_name, input_file_names,
        options_.num_levels - 1, compact_options_, false);
  }

  // Schedule the specified compaction task in background.
  // Ownership of `task` passes to CompactFiles(), which deletes it.
  void ScheduleCompaction(CompactionTask* task) override {
    options_.env->Schedule(&FullCompactor::CompactFiles, task);
  }

  // Background entry point handed to Env::Schedule(). `arg` must be the
  // CompactionTask* given to ScheduleCompaction(); the task is deleted
  // before this function returns.
  static void CompactFiles(void* arg) {
    // `arg` started life as a CompactionTask*, so static_cast is the
    // appropriate cast to recover it from void*.
    CompactionTask* task = static_cast<CompactionTask*>(arg);
    assert(task);
    assert(task->db);
    Status s = task->db->CompactFiles(
        task->compact_options,
        task->input_file_names,
        task->output_level);
    printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
    if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
      // If a compaction task with its retry_on_fail=true failed,
      // try to schedule another compaction in case the reason
      // is not an IO error.
      CompactionTask* new_task = task->compactor->PickCompaction(
          task->db, task->column_family_name);
      // PickCompaction() may return nullptr (e.g. a file is currently being
      // compacted). Scheduling a null task would make the background job
      // dereference a null pointer, so only schedule real tasks.
      if (new_task != nullptr) {
        task->compactor->ScheduleCompaction(new_task);
      }
    }
    // release the task
    delete task;
  }

 private:
  Options options_;
  CompactionOptions compact_options_;
};
// Runs the example: opens a fresh database with RocksDB's own background
// compaction disabled and a FullCompactor registered as listener, writes a
// range of keys, then reads every key back and checks its value.
//
// Returns 0 on success and 1 if opening, writing or verification fails.
// Failures are reported explicitly rather than through assert() so that
// they are still detected when the example is built with NDEBUG.
int main() {
  Options options;
  options.create_if_missing = true;
  // Disable RocksDB background compaction.
  options.compaction_style = kCompactionStyleNone;
  // Small slowdown and stop trigger for experimental purpose.
  options.level0_slowdown_writes_trigger = 3;
  options.level0_stop_writes_trigger = 5;
  options.IncreaseParallelism(5);
  options.listeners.emplace_back(new FullCompactor(options));

  DB* db = nullptr;
  // Best-effort removal of any database left at kDBPath by an earlier run.
  DestroyDB(kDBPath, options);
  Status s = DB::Open(options, kDBPath, &db);
  if (!s.ok() || db == nullptr) {
    fprintf(stderr, "DB::Open() failed: %s\n", s.ToString().c_str());
    delete db;
    return 1;
  }

  const int kFirstKey = 1000;
  const int kEndKey = 99999;
  // Value stored under key `i`: 500 copies of a letter derived from `i`.
  const auto value_for = [](int i) {
    return std::string(500, static_cast<char>('a' + (i % 26)));
  };

  // if background compaction is not working, write will stall
  // because of options.level0_stop_writes_trigger
  for (int i = kFirstKey; i < kEndKey; ++i) {
    s = db->Put(WriteOptions(), std::to_string(i), value_for(i));
    if (!s.ok()) {
      fprintf(stderr, "Put(%d) failed: %s\n", i, s.ToString().c_str());
      delete db;
      return 1;
    }
  }

  // verify the values are still there
  std::string value;
  for (int i = kFirstKey; i < kEndKey; ++i) {
    s = db->Get(ReadOptions(), std::to_string(i), &value);
    if (!s.ok() || value != value_for(i)) {
      fprintf(stderr, "Get(%d) verification failed: %s\n", i,
              s.ToString().c_str());
      delete db;
      return 1;
    }
  }

  // close the db.
  // NOTE(review): compaction tasks queued through Env::Schedule() hold a raw
  // DB pointer; confirm none can still be pending when the DB is deleted.
  delete db;
  return 0;
}