Merge pull request #179 from edsrzf/c-api-compaction-filter

Support for compaction filters in the C API
This commit is contained in:
Igor Canadi 2014-06-19 21:22:46 +02:00
commit bae495740d
3 changed files with 147 additions and 1 deletions

75
db/c.cc
View File

@ -14,6 +14,7 @@
#include <stdlib.h>
#include <unistd.h>
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -30,6 +31,7 @@
#include "rocksdb/table.h"
using rocksdb::Cache;
using rocksdb::CompactionFilter;
using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::DB;
@ -77,6 +79,49 @@ struct rocksdb_logger_t { shared_ptr<Logger> rep; };
struct rocksdb_cache_t { shared_ptr<Cache> rep; };
struct rocksdb_livefiles_t { std::vector<LiveFileMetaData> rep; };
struct rocksdb_compactionfilter_t : public CompactionFilter {
void* state_;
void (*destructor_)(void*);
unsigned char (*filter_)(
void*,
int level,
const char* key, size_t key_length,
const char* existing_value, size_t value_length,
char** new_value, size_t *new_value_length,
unsigned char* value_changed);
const char* (*name_)(void*);
virtual ~rocksdb_compactionfilter_t() {
(*destructor_)(state_);
}
virtual bool Filter(
int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const {
char* c_new_value = NULL;
size_t new_value_length = 0;
unsigned char c_value_changed = 0;
unsigned char result = (*filter_)(
state_,
level,
key.data(), key.size(),
existing_value.data(), existing_value.size(),
&c_new_value, &new_value_length, &c_value_changed);
if (c_value_changed) {
new_value->assign(c_new_value, new_value_length);
*value_changed = true;
}
return result;
}
virtual const char* Name() const {
return (*name_)(state_);
}
};
struct rocksdb_comparator_t : public Comparator {
void* state_;
void (*destructor_)(void*);
@ -631,6 +676,12 @@ void rocksdb_options_destroy(rocksdb_options_t* options) {
delete options;
}
void rocksdb_options_set_compaction_filter(
rocksdb_options_t* opt,
rocksdb_compactionfilter_t* filter) {
opt->rep.compaction_filter = filter;
}
void rocksdb_options_set_comparator(
rocksdb_options_t* opt,
rocksdb_comparator_t* cmp) {
@ -1119,10 +1170,32 @@ DB::GetUpdatesSince
DB::GetDbIdentity
DB::RunManualCompaction
custom cache
compaction_filter
table_properties_collectors
*/
rocksdb_compactionfilter_t* rocksdb_compactionfilter_create(
void* state,
void (*destructor)(void*),
unsigned char (*filter)(
void*,
int level,
const char* key, size_t key_length,
const char* existing_value, size_t value_length,
char** new_value, size_t *new_value_length,
unsigned char* value_changed),
const char* (*name)(void*)) {
rocksdb_compactionfilter_t* result = new rocksdb_compactionfilter_t;
result->state_ = state;
result->destructor_ = destructor;
result->filter_ = filter;
result->name_ = name;
return result;
}
void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t* filter) {
delete filter;
}
rocksdb_comparator_t* rocksdb_comparator_create(
void* state,
void (*destructor)(void*),

View File

@ -154,6 +154,28 @@ static unsigned char FilterKeyMatch(
return fake_filter_result;
}
// Custom compaction filter
static void CFilterDestroy(void* arg) {}
static const char* CFilterName(void* arg) { return "foo"; }
static unsigned char CFilterFilter(void* arg, int level, const char* key,
size_t key_length,
const char* existing_value,
size_t value_length, char** new_value,
size_t* new_value_length,
unsigned char* value_changed) {
if (key_length == 3) {
if (memcmp(key, "bar", key_length) == 0) {
return 1;
} else if (memcmp(key, "baz", key_length) == 0) {
*value_changed = 1;
*new_value = "newbazvalue";
*new_value_length = 11;
return 0;
}
}
return 0;
}
// Custom merge operator
static void MergeOperatorDestroy(void* arg) { }
static const char* MergeOperatorName(void* arg) {
@ -407,6 +429,37 @@ int main(int argc, char** argv) {
rocksdb_filterpolicy_destroy(policy);
}
StartPhase("compaction_filter");
{
rocksdb_compactionfilter_t* cfilter;
cfilter = rocksdb_compactionfilter_create(NULL, CFilterDestroy,
CFilterFilter, CFilterName);
// Create new database
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_set_compaction_filter(options, cfilter);
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "foo", "foovalue");
rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "bar", "barvalue");
rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "baz", "bazvalue");
// Force compaction
rocksdb_compact_range(db, NULL, 0, NULL, 0);
// should have filtered bar, but not foo
CheckGet(db, roptions, "foo", "foovalue");
CheckGet(db, roptions, "bar", NULL);
CheckGet(db, roptions, "baz", "newbazvalue");
rocksdb_compactionfilter_destroy(cfilter);
}
StartPhase("merge_operator");
{
rocksdb_mergeoperator_t* merge_operator;

View File

@ -56,6 +56,7 @@ extern "C" {
typedef struct rocksdb_t rocksdb_t;
typedef struct rocksdb_cache_t rocksdb_cache_t;
typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t;
typedef struct rocksdb_comparator_t rocksdb_comparator_t;
typedef struct rocksdb_env_t rocksdb_env_t;
typedef struct rocksdb_filelock_t rocksdb_filelock_t;
@ -229,6 +230,9 @@ extern const char* rocksdb_writebatch_data(rocksdb_writebatch_t*, size_t *size);
extern rocksdb_options_t* rocksdb_options_create();
extern void rocksdb_options_destroy(rocksdb_options_t*);
extern void rocksdb_options_set_compaction_filter(
rocksdb_options_t*,
rocksdb_compactionfilter_t*);
extern void rocksdb_options_set_comparator(
rocksdb_options_t*,
rocksdb_comparator_t*);
@ -401,6 +405,22 @@ enum {
};
extern void rocksdb_options_set_compaction_style(rocksdb_options_t*, int);
extern void rocksdb_options_set_universal_compaction_options(rocksdb_options_t*, rocksdb_universal_compaction_options_t*);
/* Compaction Filter */
extern rocksdb_compactionfilter_t* rocksdb_compactionfilter_create(
void* state,
void (*destructor)(void*),
unsigned char (*filter)(
void*,
int level,
const char* key, size_t key_length,
const char* existing_value, size_t value_length,
char** new_value, size_t *new_value_length,
unsigned char* value_changed),
const char* (*name)(void*));
extern void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t*);
/* Comparator */
extern rocksdb_comparator_t* rocksdb_comparator_create(