Improved transactions support in C API

Summary:
Solves #2632

Added OptimisticTransactionDB to the C API.
Added missing merge operations to Transaction.
Added missing get_for_update operation to transaction

If required I will create tests for this another day.
Closes https://github.com/facebook/rocksdb/pull/2633

Differential Revision: D5600906

Pulled By: yiwu-arbug

fbshipit-source-id: da23e4484433d8f59d471f778ff2ae210e3fe4eb
This commit is contained in:
mkosieradzki 2017-08-23 12:32:42 -07:00 committed by Facebook Github Bot
parent c10b391314
commit a12479819d
2 changed files with 166 additions and 5 deletions

123
db/c.cc
View File

@ -36,6 +36,7 @@
#include "utilities/merge_operators.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/checkpoint.h"
using rocksdb::BytewiseComparator;
@ -95,6 +96,8 @@ using rocksdb::PinnableSlice;
using rocksdb::TransactionDBOptions;
using rocksdb::TransactionDB;
using rocksdb::TransactionOptions;
using rocksdb::OptimisticTransactionDB;
using rocksdb::OptimisticTransactionOptions;
using rocksdb::Transaction;
using rocksdb::Checkpoint;
@ -153,6 +156,12 @@ struct rocksdb_transaction_t {
struct rocksdb_checkpoint_t {
Checkpoint* rep;
};
struct rocksdb_optimistictransactiondb_t {
OptimisticTransactionDB* rep;
};
struct rocksdb_optimistictransaction_options_t {
OptimisticTransactionOptions rep;
};
struct rocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep;
@ -3253,6 +3262,21 @@ void rocksdb_transaction_options_set_max_write_batch_size(
opt->rep.max_write_batch_size = size;
}
rocksdb_optimistictransaction_options_t*
rocksdb_optimistictransaction_options_create() {
return new rocksdb_optimistictransaction_options_t;
}
void rocksdb_optimistictransaction_options_destroy(
rocksdb_optimistictransaction_options_t* opt) {
delete opt;
}
void rocksdb_optimistictransaction_options_set_set_snapshot(
rocksdb_optimistictransaction_options_t* opt, unsigned char v) {
opt->rep.set_snapshot = v;
}
rocksdb_column_family_handle_t* rocksdb_transactiondb_create_column_family(
rocksdb_transactiondb_t* txn_db,
const rocksdb_options_t* column_family_options,
@ -3320,7 +3344,14 @@ void rocksdb_transaction_destroy(rocksdb_transaction_t* txn) {
delete txn;
}
//Read a key inside a transaction
const rocksdb_snapshot_t* rocksdb_transaction_get_snapshot(
rocksdb_transaction_t* txn) {
rocksdb_snapshot_t* result = new rocksdb_snapshot_t;
result->rep = txn->rep->GetSnapshot();
return result;
}
// Read a key inside a transaction
char* rocksdb_transaction_get(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen,
@ -3361,6 +3392,28 @@ char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn,
return result;
}
// Read a key inside a transaction
char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, size_t klen,
size_t* vlen, unsigned char exclusive,
char** errptr) {
char* result = nullptr;
std::string tmp;
Status s =
txn->rep->GetForUpdate(options->rep, Slice(key, klen), &tmp, exclusive);
if (s.ok()) {
*vlen = tmp.size();
result = CopyString(tmp);
} else {
*vlen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}
// Read a key outside a transaction
char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db,
@ -3418,13 +3471,13 @@ void rocksdb_transaction_put_cf(rocksdb_transaction_t* txn,
Slice(val, vlen)));
}
//Put a key outside a transaction
// Put a key outside a transaction
void rocksdb_transactiondb_put(rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* options,
const char* key, size_t klen, const char* val,
size_t vlen, char** errptr) {
SaveError(errptr,
txn_db->rep->Put(options->rep, Slice(key, klen), Slice(val, vlen)));
SaveError(errptr, txn_db->rep->Put(options->rep, Slice(key, klen),
Slice(val, vlen)));
}
void rocksdb_transactiondb_put_cf(rocksdb_transactiondb_t* txn_db,
@ -3437,7 +3490,7 @@ void rocksdb_transactiondb_put_cf(rocksdb_transactiondb_t* txn_db,
Slice(key, keylen), Slice(val, vallen)));
}
//Write batch into transaction db
// Write batch into transaction db
void rocksdb_transactiondb_write(
rocksdb_transactiondb_t* db,
const rocksdb_writeoptions_t* options,
@ -3446,6 +3499,22 @@ void rocksdb_transactiondb_write(
SaveError(errptr, db->rep->Write(options->rep, &batch->rep));
}
// Merge a key inside a transaction
void rocksdb_transaction_merge(rocksdb_transaction_t* txn, const char* key,
size_t klen, const char* val, size_t vlen,
char** errptr) {
SaveError(errptr, txn->rep->Merge(Slice(key, klen), Slice(val, vlen)));
}
// Merge a key outside a transaction
void rocksdb_transactiondb_merge(rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* options,
const char* key, size_t klen, const char* val,
size_t vlen, char** errptr) {
SaveError(errptr,
txn_db->rep->Merge(options->rep, Slice(key, klen), Slice(val, vlen)));
}
// Delete a key inside a transaction
void rocksdb_transaction_delete(rocksdb_transaction_t* txn, const char* key,
size_t klen, char** errptr) {
@ -3481,6 +3550,14 @@ rocksdb_iterator_t* rocksdb_transaction_create_iterator(
return result;
}
// Create an iterator outside a transaction
rocksdb_iterator_t* rocksdb_transactiondb_create_iterator(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = txn_db->rep->NewIterator(options->rep);
return result;
}
void rocksdb_transactiondb_close(rocksdb_transactiondb_t* txn_db) {
delete txn_db->rep;
delete txn_db;
@ -3497,6 +3574,42 @@ rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create(
return result;
}
rocksdb_optimistictransactiondb_t* rocksdb_optimistictransactiondb_open(
const rocksdb_options_t* options, const char* name,
char** errptr) {
OptimisticTransactionDB* otxn_db;
if (SaveError(errptr, OptimisticTransactionDB::Open(
options->rep, std::string(name), &otxn_db))) {
return nullptr;
}
rocksdb_optimistictransactiondb_t* result =
new rocksdb_optimistictransactiondb_t;
result->rep = otxn_db;
return result;
}
rocksdb_transaction_t* rocksdb_optimistictransaction_begin(
rocksdb_optimistictransactiondb_t* otxn_db,
const rocksdb_writeoptions_t* write_options,
const rocksdb_optimistictransaction_options_t* otxn_options,
rocksdb_transaction_t* old_txn) {
if (old_txn == nullptr) {
rocksdb_transaction_t* result = new rocksdb_transaction_t;
result->rep = otxn_db->rep->BeginTransaction(write_options->rep,
otxn_options->rep, nullptr);
return result;
}
old_txn->rep = otxn_db->rep->BeginTransaction(
write_options->rep, otxn_options->rep, old_txn->rep);
return old_txn;
}
void rocksdb_optimistictransactiondb_close(
rocksdb_optimistictransactiondb_t* otxn_db) {
delete otxn_db->rep;
delete otxn_db;
}
void rocksdb_free(void* ptr) { free(ptr); }
rocksdb_pinnableslice_t* rocksdb_get_pinned(

View File

@ -117,6 +117,8 @@ typedef struct rocksdb_pinnableslice_t rocksdb_pinnableslice_t;
typedef struct rocksdb_transactiondb_options_t rocksdb_transactiondb_options_t;
typedef struct rocksdb_transactiondb_t rocksdb_transactiondb_t;
typedef struct rocksdb_transaction_options_t rocksdb_transaction_options_t;
typedef struct rocksdb_optimistictransactiondb_t rocksdb_optimistictransactiondb_t;
typedef struct rocksdb_optimistictransaction_options_t rocksdb_optimistictransaction_options_t;
typedef struct rocksdb_transaction_t rocksdb_transaction_t;
typedef struct rocksdb_checkpoint_t rocksdb_checkpoint_t;
@ -1290,6 +1292,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rollback(
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_destroy(
rocksdb_transaction_t* txn);
// This snapshot should be freed using rocksdb_free
extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t*
rocksdb_transaction_get_snapshot(rocksdb_transaction_t* txn);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, char** errptr);
@ -1299,6 +1305,11 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_cf(
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
size_t* vlen, char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, unsigned char exclusive,
char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, char** errptr);
@ -1329,6 +1340,14 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_write(
rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options,
rocksdb_writebatch_t *batch, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_merge(
rocksdb_transaction_t* txn, const char* key, size_t klen, const char* val,
size_t vlen, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_merge(
rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options,
const char* key, size_t klen, const char* val, size_t vlen, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_delete(
rocksdb_transaction_t* txn, const char* key, size_t klen, char** errptr);
@ -1349,6 +1368,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_transaction_create_iterator(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_transactiondb_create_iterator(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_close(
rocksdb_transactiondb_t* txn_db);
@ -1356,6 +1379,20 @@ extern ROCKSDB_LIBRARY_API rocksdb_checkpoint_t*
rocksdb_transactiondb_checkpoint_object_create(rocksdb_transactiondb_t* txn_db,
char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_optimistictransactiondb_t*
rocksdb_optimistictransactiondb_open(const rocksdb_options_t* options,
const char* name, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_transaction_t*
rocksdb_optimistictransaction_begin(
rocksdb_optimistictransactiondb_t* otxn_db,
const rocksdb_writeoptions_t* write_options,
const rocksdb_optimistictransaction_options_t* otxn_options,
rocksdb_transaction_t* old_txn);
extern ROCKSDB_LIBRARY_API void rocksdb_optimistictransactiondb_close(
rocksdb_optimistictransactiondb_t* otxn_db);
/* Transaction Options */
extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_options_t*
@ -1404,6 +1441,17 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_transaction_options_set_max_write_batch_size(
rocksdb_transaction_options_t* opt, size_t size);
extern ROCKSDB_LIBRARY_API rocksdb_optimistictransaction_options_t*
rocksdb_optimistictransaction_options_create();
extern ROCKSDB_LIBRARY_API void rocksdb_optimistictransaction_options_destroy(
rocksdb_optimistictransaction_options_t* opt);
extern ROCKSDB_LIBRARY_API void
rocksdb_optimistictransaction_options_set_set_snapshot(
rocksdb_optimistictransaction_options_t* opt, unsigned char v);
// referring to convention (3), this should be used by client
// to free memory that was malloc()ed
extern ROCKSDB_LIBRARY_API void rocksdb_free(void* ptr);