C API: column family support

This commit is contained in:
Reed Allman 2014-07-07 01:18:52 -07:00
parent 7b85c1e900
commit 1a34aaaef0
3 changed files with 443 additions and 0 deletions

242
db/c.cc
View File

@ -31,10 +31,14 @@
#include "rocksdb/table.h"
using rocksdb::Cache;
using rocksdb::ColumnFamilyDescriptor;
using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter;
using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::DB;
using rocksdb::DBOptions;
using rocksdb::Env;
using rocksdb::InfoLogLevel;
using rocksdb::FileLock;
@ -78,6 +82,7 @@ struct rocksdb_filelock_t { FileLock* rep; };
struct rocksdb_logger_t { shared_ptr<Logger> rep; };
struct rocksdb_cache_t { shared_ptr<Cache> rep; };
struct rocksdb_livefiles_t { std::vector<LiveFileMetaData> rep; };
struct rocksdb_column_family_handle_t { ColumnFamilyHandle* rep; };
struct rocksdb_compactionfilter_t : public CompactionFilter {
void* state_;
@ -394,6 +399,113 @@ void rocksdb_close(rocksdb_t* db) {
delete db;
}
rocksdb_t* rocksdb_open_column_families(
const rocksdb_options_t* db_options,
const char* name,
int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** column_family_handles,
char** errptr) {
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i < num_column_families; i++) {
column_families.push_back(ColumnFamilyDescriptor(
std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep)));
}
DB* db;
std::vector<ColumnFamilyHandle*> handles;
if (SaveError(errptr, DB::Open(DBOptions(db_options->rep),
std::string(name), column_families, &handles, &db))) {
return nullptr;
}
for (size_t i = 0; i < handles.size(); i++) {
rocksdb_column_family_handle_t* c_handle = new rocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
rocksdb_t* result = new rocksdb_t;
result->rep = db;
return result;
}
rocksdb_t* rocksdb_open_for_read_only_column_families(
const rocksdb_options_t* db_options,
const char* name,
int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** column_family_handles,
unsigned char error_if_log_file_exist,
char** errptr) {
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i < num_column_families; i++) {
column_families.push_back(ColumnFamilyDescriptor(
std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep)));
}
DB* db;
std::vector<ColumnFamilyHandle*> handles;
if (SaveError(errptr, DB::OpenForReadOnly(DBOptions(db_options->rep),
std::string(name), column_families, &handles, &db, error_if_log_file_exist))) {
return nullptr;
}
for (size_t i = 0; i < handles.size(); i++) {
rocksdb_column_family_handle_t* c_handle = new rocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
rocksdb_t* result = new rocksdb_t;
result->rep = db;
return result;
}
char** rocksdb_list_column_families(
const rocksdb_options_t* options,
const char* name,
size_t* lencfs,
char** errptr) {
std::vector<std::string> fams;
SaveError(errptr,
DB::ListColumnFamilies(DBOptions(options->rep),
std::string(name), &fams));
*lencfs = fams.size();
char** column_families = static_cast<char**>(malloc(sizeof(char*) * fams.size()));
for (size_t i = 0; i < fams.size(); i++) {
column_families[i] = strdup(fams[i].c_str());
}
return column_families;
}
rocksdb_column_family_handle_t* rocksdb_create_column_family(
rocksdb_t* db,
const rocksdb_options_t* column_family_options,
const char* column_family_name,
char** errptr) {
rocksdb_column_family_handle_t* handle = new rocksdb_column_family_handle_t;
SaveError(errptr,
db->rep->CreateColumnFamily(ColumnFamilyOptions(column_family_options->rep),
std::string(column_family_name), &(handle->rep)));
return handle;
}
void rocksdb_drop_column_family(
rocksdb_t* db,
rocksdb_column_family_handle_t* handle,
char** errptr) {
SaveError(errptr, db->rep->DropColumnFamily(handle->rep));
}
void rocksdb_column_family_handle_destroy(rocksdb_column_family_handle_t* handle) {
delete handle->rep;
delete handle;
}
void rocksdb_put(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -404,6 +516,18 @@ void rocksdb_put(
db->rep->Put(options->rep, Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_put_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
const char* val, size_t vallen,
char** errptr) {
SaveError(errptr,
db->rep->Put(options->rep, column_family->rep,
Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_delete(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -412,6 +536,16 @@ void rocksdb_delete(
SaveError(errptr, db->rep->Delete(options->rep, Slice(key, keylen)));
}
void rocksdb_delete_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
char** errptr) {
SaveError(errptr, db->rep->Delete(options->rep, column_family->rep,
Slice(key, keylen)));
}
void rocksdb_merge(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -422,6 +556,18 @@ void rocksdb_merge(
db->rep->Merge(options->rep, Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_merge_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
const char* val, size_t vallen,
char** errptr) {
SaveError(errptr,
db->rep->Merge(options->rep, column_family->rep,
Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_write(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -451,6 +597,29 @@ char* rocksdb_get(
return result;
}
char* rocksdb_get_cf(
rocksdb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr) {
char* result = nullptr;
std::string tmp;
Status s = db->rep->Get(options->rep, column_family->rep,
Slice(key, keylen), &tmp);
if (s.ok()) {
*vallen = tmp.size();
result = CopyString(tmp);
} else {
*vallen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}
rocksdb_iterator_t* rocksdb_create_iterator(
rocksdb_t* db,
const rocksdb_readoptions_t* options) {
@ -459,6 +628,15 @@ rocksdb_iterator_t* rocksdb_create_iterator(
return result;
}
rocksdb_iterator_t* rocksdb_create_iterator_cf(
rocksdb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = db->rep->NewIterator(options->rep, column_family->rep);
return result;
}
const rocksdb_snapshot_t* rocksdb_create_snapshot(
rocksdb_t* db) {
rocksdb_snapshot_t* result = new rocksdb_snapshot_t;
@ -485,6 +663,19 @@ char* rocksdb_property_value(
}
}
char* rocksdb_property_value_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
const char* propname) {
std::string tmp;
if (db->rep->GetProperty(column_family->rep, Slice(propname), &tmp)) {
// We use strdup() since we expect human readable output.
return strdup(tmp.c_str());
} else {
return nullptr;
}
}
void rocksdb_approximate_sizes(
rocksdb_t* db,
int num_ranges,
@ -500,6 +691,22 @@ void rocksdb_approximate_sizes(
delete[] ranges;
}
void rocksdb_approximate_sizes_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
int num_ranges,
const char* const* range_start_key, const size_t* range_start_key_len,
const char* const* range_limit_key, const size_t* range_limit_key_len,
uint64_t* sizes) {
Range* ranges = new Range[num_ranges];
for (int i = 0; i < num_ranges; i++) {
ranges[i].start = Slice(range_start_key[i], range_start_key_len[i]);
ranges[i].limit = Slice(range_limit_key[i], range_limit_key_len[i]);
}
db->rep->GetApproximateSizes(column_family->rep, ranges, num_ranges, sizes);
delete[] ranges;
}
void rocksdb_delete_file(
rocksdb_t* db,
const char* name) {
@ -524,6 +731,18 @@ void rocksdb_compact_range(
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
}
void rocksdb_compact_range_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len) {
Slice a, b;
db->rep->CompactRange(
// Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
}
void rocksdb_flush(
rocksdb_t* db,
const rocksdb_flushoptions_t* options,
@ -626,6 +845,14 @@ void rocksdb_writebatch_put(
b->rep.Put(Slice(key, klen), Slice(val, vlen));
}
void rocksdb_writebatch_put_cf(
rocksdb_writebatch_t* b,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen,
const char* val, size_t vlen) {
b->rep.Put(column_family->rep, Slice(key, klen), Slice(val, vlen));
}
void rocksdb_writebatch_merge(
rocksdb_writebatch_t* b,
const char* key, size_t klen,
@ -633,12 +860,27 @@ void rocksdb_writebatch_merge(
b->rep.Merge(Slice(key, klen), Slice(val, vlen));
}
void rocksdb_writebatch_merge_cf(
rocksdb_writebatch_t* b,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen,
const char* val, size_t vlen) {
b->rep.Merge(column_family->rep, Slice(key, klen), Slice(val, vlen));
}
void rocksdb_writebatch_delete(
rocksdb_writebatch_t* b,
const char* key, size_t klen) {
b->rep.Delete(Slice(key, klen));
}
void rocksdb_writebatch_delete_cf(
rocksdb_writebatch_t* b,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen) {
b->rep.Delete(column_family->rep, Slice(key, klen));
}
void rocksdb_writebatch_iterate(
rocksdb_writebatch_t* b,
void* state,

View File

@ -75,6 +75,22 @@ static void CheckGet(
Free(&val);
}
static void CheckGetCF(
rocksdb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* handle,
const char* key,
const char* expected) {
char* err = NULL;
size_t val_len;
char* val;
val = rocksdb_get_cf(db, options, handle, key, strlen(key), &val_len, &err);
CheckNoError(err);
CheckEqual(expected, val, val_len);
Free(&val);
}
static void CheckIter(rocksdb_iterator_t* iter,
const char* key, const char* val) {
size_t len;
@ -486,6 +502,82 @@ int main(int argc, char** argv) {
}
StartPhase("columnfamilies");
{
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
CheckNoError(err)
rocksdb_options_t* db_options = rocksdb_options_create();
rocksdb_options_set_create_if_missing(db_options, 1);
db = rocksdb_open(db_options, dbname, &err);
CheckNoError(err)
rocksdb_column_family_handle_t* cfh;
cfh = rocksdb_create_column_family(db, db_options, "cf1", &err);
rocksdb_column_family_handle_destroy(cfh);
CheckNoError(err);
rocksdb_close(db);
size_t cflen;
char** column_fams = rocksdb_list_column_families(db_options, dbname, &cflen, &err);
CheckNoError(err);
// TODO column_families vals seg fault
CheckEqual("default", column_fams[0], 7);
CheckEqual("cf1", column_fams[1], 3);
CheckCondition(cflen == 2);
rocksdb_options_t* cf_options = rocksdb_options_create();
const char* cf_names[2] = {"default", "cf1"};
const rocksdb_options_t* cf_opts[2] = {cf_options, cf_options};
rocksdb_column_family_handle_t* handles[2];
db = rocksdb_open_column_families(db_options, dbname, 2, cf_names, cf_opts, handles, &err);
CheckNoError(err);
rocksdb_put_cf(db, woptions, handles[1], "foo", 3, "hello", 5, &err);
CheckNoError(err);
CheckGetCF(db, roptions, handles[1], "foo", "hello");
rocksdb_delete_cf(db, woptions, handles[1], "foo", 3, &err);
CheckNoError(err);
CheckGetCF(db, roptions, handles[1], "foo", NULL);
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
rocksdb_writebatch_put_cf(wb, handles[1], "baz", 3, "a", 1);
rocksdb_writebatch_clear(wb);
rocksdb_writebatch_put_cf(wb, handles[1], "bar", 3, "b", 1);
rocksdb_writebatch_put_cf(wb, handles[1], "box", 3, "c", 1);
rocksdb_writebatch_delete_cf(wb, handles[1], "bar", 3);
rocksdb_write(db, woptions, wb, &err);
CheckNoError(err);
CheckGetCF(db, roptions, handles[1], "baz", NULL);
CheckGetCF(db, roptions, handles[1], "bar", NULL);
CheckGetCF(db, roptions, handles[1], "box", "c");
rocksdb_writebatch_destroy(wb);
rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db, roptions, handles[1]);
CheckCondition(!rocksdb_iter_valid(iter));
rocksdb_iter_seek_to_first(iter);
CheckCondition(rocksdb_iter_valid(iter));
int i;
for (i = 0; rocksdb_iter_valid(iter) != 0; rocksdb_iter_next(iter)) {
i++;
}
CheckCondition(i == 1);
rocksdb_iter_get_error(iter, &err);
CheckNoError(err);
rocksdb_iter_destroy(iter);
rocksdb_drop_column_family(db, handles[1], &err);
CheckNoError(err);
for (i = 0; i < 2; i++) {
rocksdb_column_family_handle_destroy(handles[i]);
}
}
StartPhase("prefix");
{
// Create new database
@ -533,6 +625,7 @@ int main(int argc, char** argv) {
rocksdb_filterpolicy_destroy(policy);
}
StartPhase("cleanup");
rocksdb_close(db);
rocksdb_options_destroy(options);

View File

@ -76,6 +76,7 @@ typedef struct rocksdb_writebatch_t rocksdb_writebatch_t;
typedef struct rocksdb_writeoptions_t rocksdb_writeoptions_t;
typedef struct rocksdb_universal_compaction_options_t rocksdb_universal_compaction_options_t;
typedef struct rocksdb_livefiles_t rocksdb_livefiles_t;
typedef struct rocksdb_column_family_handle_t rocksdb_column_family_handle_t;
/* DB operations */
@ -90,6 +91,44 @@ extern rocksdb_t* rocksdb_open_for_read_only(
unsigned char error_if_log_file_exist,
char** errptr);
extern rocksdb_t* rocksdb_open_column_families(
const rocksdb_options_t* options,
const char* name,
int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** column_family_handles,
char** errptr);
extern rocksdb_t* rocksdb_open_for_read_only_column_families(
const rocksdb_options_t* options,
const char* name,
int num_column_families,
const char** column_family_names,
const rocksdb_options_t** column_family_options,
rocksdb_column_family_handle_t** column_family_handles,
unsigned char error_if_log_file_exist,
char** errptr);
char** rocksdb_list_column_families(
const rocksdb_options_t* options,
const char* name,
size_t* lencf,
char** errptr);
extern rocksdb_column_family_handle_t* rocksdb_create_column_family(
rocksdb_t* db,
const rocksdb_options_t* column_family_options,
const char* column_family_name,
char** errptr);
extern void rocksdb_drop_column_family(
rocksdb_t* db,
rocksdb_column_family_handle_t* handle,
char** errptr);
extern void rocksdb_column_family_handle_destroy(rocksdb_column_family_handle_t* handle);
extern void rocksdb_close(rocksdb_t* db);
extern void rocksdb_put(
@ -99,12 +138,27 @@ extern void rocksdb_put(
const char* val, size_t vallen,
char** errptr);
extern void rocksdb_put_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
const char* val, size_t vallen,
char** errptr);
extern void rocksdb_delete(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
const char* key, size_t keylen,
char** errptr);
void rocksdb_delete_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
char** errptr);
extern void rocksdb_merge(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -112,6 +166,14 @@ extern void rocksdb_merge(
const char* val, size_t vallen,
char** errptr);
extern void rocksdb_merge_cf(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
const char* val, size_t vallen,
char** errptr);
extern void rocksdb_write(
rocksdb_t* db,
const rocksdb_writeoptions_t* options,
@ -127,10 +189,23 @@ extern char* rocksdb_get(
size_t* vallen,
char** errptr);
extern char* rocksdb_get_cf(
rocksdb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr);
extern rocksdb_iterator_t* rocksdb_create_iterator(
rocksdb_t* db,
const rocksdb_readoptions_t* options);
extern rocksdb_iterator_t* rocksdb_create_iterator_cf(
rocksdb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family);
extern const rocksdb_snapshot_t* rocksdb_create_snapshot(
rocksdb_t* db);
@ -144,6 +219,11 @@ extern char* rocksdb_property_value(
rocksdb_t* db,
const char* propname);
extern char* rocksdb_property_value_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
const char* propname);
extern void rocksdb_approximate_sizes(
rocksdb_t* db,
int num_ranges,
@ -151,11 +231,25 @@ extern void rocksdb_approximate_sizes(
const char* const* range_limit_key, const size_t* range_limit_key_len,
uint64_t* sizes);
extern void rocksdb_approximate_sizes_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
int num_ranges,
const char* const* range_start_key, const size_t* range_start_key_len,
const char* const* range_limit_key, const size_t* range_limit_key_len,
uint64_t* sizes);
extern void rocksdb_compact_range(
rocksdb_t* db,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len);
extern void rocksdb_compact_range_cf(
rocksdb_t* db,
rocksdb_column_family_handle_t* column_family,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len);
extern void rocksdb_delete_file(
rocksdb_t* db,
const char* name);
@ -212,13 +306,27 @@ extern void rocksdb_writebatch_put(
rocksdb_writebatch_t*,
const char* key, size_t klen,
const char* val, size_t vlen);
extern void rocksdb_writebatch_put_cf(
rocksdb_writebatch_t*,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen,
const char* val, size_t vlen);
extern void rocksdb_writebatch_merge(
rocksdb_writebatch_t*,
const char* key, size_t klen,
const char* val, size_t vlen);
extern void rocksdb_writebatch_merge_cf(
rocksdb_writebatch_t*,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen,
const char* val, size_t vlen);
extern void rocksdb_writebatch_delete(
rocksdb_writebatch_t*,
const char* key, size_t klen);
extern void rocksdb_writebatch_delete_cf(
rocksdb_writebatch_t*,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen);
extern void rocksdb_writebatch_iterate(
rocksdb_writebatch_t*,
void* state,