From a12479819d19997e386597711b316b00c1402582 Mon Sep 17 00:00:00 2001 From: mkosieradzki Date: Wed, 23 Aug 2017 12:32:42 -0700 Subject: [PATCH] Improved transactions support in C API Summary: Solves #2632 Added OptimisticTransactionDB to the C API. Added missing merge operations to Transaction. Added missing get_for_update operation to transaction If required I will create tests for this another day. Closes https://github.com/facebook/rocksdb/pull/2633 Differential Revision: D5600906 Pulled By: yiwu-arbug fbshipit-source-id: da23e4484433d8f59d471f778ff2ae210e3fe4eb --- db/c.cc | 123 ++++++++++++++++++++++++++++++++++++++++++-- include/rocksdb/c.h | 48 +++++++++++++++++ 2 files changed, 166 insertions(+), 5 deletions(-) diff --git a/db/c.cc b/db/c.cc index 788eab68a..cbfb8557d 100644 --- a/db/c.cc +++ b/db/c.cc @@ -36,6 +36,7 @@ #include "utilities/merge_operators.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/checkpoint.h" using rocksdb::BytewiseComparator; @@ -95,6 +96,8 @@ using rocksdb::PinnableSlice; using rocksdb::TransactionDBOptions; using rocksdb::TransactionDB; using rocksdb::TransactionOptions; +using rocksdb::OptimisticTransactionDB; +using rocksdb::OptimisticTransactionOptions; using rocksdb::Transaction; using rocksdb::Checkpoint; @@ -153,6 +156,12 @@ struct rocksdb_transaction_t { struct rocksdb_checkpoint_t { Checkpoint* rep; }; +struct rocksdb_optimistictransactiondb_t { + OptimisticTransactionDB* rep; +}; +struct rocksdb_optimistictransaction_options_t { + OptimisticTransactionOptions rep; +}; struct rocksdb_compactionfiltercontext_t { CompactionFilter::Context rep; @@ -3253,6 +3262,21 @@ void rocksdb_transaction_options_set_max_write_batch_size( opt->rep.max_write_batch_size = size; } +rocksdb_optimistictransaction_options_t* +rocksdb_optimistictransaction_options_create() { + return new rocksdb_optimistictransaction_options_t; +} + +void rocksdb_optimistictransaction_options_destroy( + rocksdb_optimistictransaction_options_t* opt) { + delete opt; +} + +void rocksdb_optimistictransaction_options_set_set_snapshot( + rocksdb_optimistictransaction_options_t* opt, unsigned char v) { + opt->rep.set_snapshot = v; +} + rocksdb_column_family_handle_t* rocksdb_transactiondb_create_column_family( rocksdb_transactiondb_t* txn_db, const rocksdb_options_t* column_family_options, @@ -3320,7 +3344,14 @@ void rocksdb_transaction_destroy(rocksdb_transaction_t* txn) { delete txn; } -//Read a key inside a transaction +const rocksdb_snapshot_t* rocksdb_transaction_get_snapshot( + rocksdb_transaction_t* txn) { + rocksdb_snapshot_t* result = new rocksdb_snapshot_t; + result->rep = txn->rep->GetSnapshot(); + return result; +} + +// Read a key inside a transaction char* rocksdb_transaction_get(rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, const char* key, size_t klen, size_t* vlen, @@ -3361,6 +3392,28 @@ char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn, return result; } +// Read a key inside a transaction +char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn, + const rocksdb_readoptions_t* options, + const char* key, size_t klen, + size_t* vlen, unsigned char exclusive, + char** errptr) { + char* result = nullptr; + std::string tmp; + Status s = + txn->rep->GetForUpdate(options->rep, Slice(key, klen), &tmp, exclusive); + if (s.ok()) { + *vlen = tmp.size(); + result = CopyString(tmp); + } else { + *vlen = 0; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + } + return result; +} + // Read a key outside a transaction char* rocksdb_transactiondb_get( rocksdb_transactiondb_t* txn_db, @@ -3418,13 +3471,13 @@ void rocksdb_transaction_put_cf(rocksdb_transaction_t* txn, Slice(val, vlen))); } -//Put a key outside a transaction +// Put a key outside a transaction void rocksdb_transactiondb_put(rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options, const char* key, size_t klen, const char* val, size_t vlen, char** errptr) { - SaveError(errptr, - txn_db->rep->Put(options->rep, Slice(key, klen), Slice(val, vlen))); + SaveError(errptr, txn_db->rep->Put(options->rep, Slice(key, klen), + Slice(val, vlen))); } void rocksdb_transactiondb_put_cf(rocksdb_transactiondb_t* txn_db, @@ -3437,7 +3490,7 @@ void rocksdb_transactiondb_put_cf(rocksdb_transactiondb_t* txn_db, Slice(key, keylen), Slice(val, vallen))); } -//Write batch into transaction db +// Write batch into transaction db void rocksdb_transactiondb_write( rocksdb_transactiondb_t* db, const rocksdb_writeoptions_t* options, @@ -3446,6 +3499,22 @@ void rocksdb_transactiondb_write( SaveError(errptr, db->rep->Write(options->rep, &batch->rep)); } +// Merge a key inside a transaction +void rocksdb_transaction_merge(rocksdb_transaction_t* txn, const char* key, + size_t klen, const char* val, size_t vlen, + char** errptr) { + SaveError(errptr, txn->rep->Merge(Slice(key, klen), Slice(val, vlen))); +} + +// Merge a key outside a transaction +void rocksdb_transactiondb_merge(rocksdb_transactiondb_t* txn_db, + const rocksdb_writeoptions_t* options, + const char* key, size_t klen, const char* val, + size_t vlen, char** errptr) { + SaveError(errptr, + txn_db->rep->Merge(options->rep, Slice(key, klen), Slice(val, vlen))); +} + // Delete a key inside a transaction void rocksdb_transaction_delete(rocksdb_transaction_t* txn, const char* key, size_t klen, char** errptr) { @@ -3481,6 +3550,14 @@ rocksdb_iterator_t* rocksdb_transaction_create_iterator( return result; } +// Create an iterator outside a transaction +rocksdb_iterator_t* rocksdb_transactiondb_create_iterator( + rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options) { + rocksdb_iterator_t* result = new rocksdb_iterator_t; + result->rep = txn_db->rep->NewIterator(options->rep); + return result; +} + void rocksdb_transactiondb_close(rocksdb_transactiondb_t* txn_db) { delete txn_db->rep; delete txn_db; @@ -3497,6 +3574,42 @@ rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create( return result; } +rocksdb_optimistictransactiondb_t* rocksdb_optimistictransactiondb_open( + const rocksdb_options_t* options, const char* name, + char** errptr) { + OptimisticTransactionDB* otxn_db; + if (SaveError(errptr, OptimisticTransactionDB::Open( + options->rep, std::string(name), &otxn_db))) { + return nullptr; + } + rocksdb_optimistictransactiondb_t* result = + new rocksdb_optimistictransactiondb_t; + result->rep = otxn_db; + return result; +} + +rocksdb_transaction_t* rocksdb_optimistictransaction_begin( + rocksdb_optimistictransactiondb_t* otxn_db, + const rocksdb_writeoptions_t* write_options, + const rocksdb_optimistictransaction_options_t* otxn_options, + rocksdb_transaction_t* old_txn) { + if (old_txn == nullptr) { + rocksdb_transaction_t* result = new rocksdb_transaction_t; + result->rep = otxn_db->rep->BeginTransaction(write_options->rep, + otxn_options->rep, nullptr); + return result; + } + old_txn->rep = otxn_db->rep->BeginTransaction( + write_options->rep, otxn_options->rep, old_txn->rep); + return old_txn; +} + +void rocksdb_optimistictransactiondb_close( + rocksdb_optimistictransactiondb_t* otxn_db) { + delete otxn_db->rep; + delete otxn_db; +} + void rocksdb_free(void* ptr) { free(ptr); } rocksdb_pinnableslice_t* rocksdb_get_pinned( diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 838d7b0c9..2269f7261 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -117,6 +117,8 @@ typedef struct rocksdb_pinnableslice_t rocksdb_pinnableslice_t; typedef struct rocksdb_transactiondb_options_t rocksdb_transactiondb_options_t; typedef struct rocksdb_transactiondb_t rocksdb_transactiondb_t; typedef struct rocksdb_transaction_options_t rocksdb_transaction_options_t; +typedef struct rocksdb_optimistictransactiondb_t rocksdb_optimistictransactiondb_t; +typedef struct rocksdb_optimistictransaction_options_t rocksdb_optimistictransaction_options_t; typedef struct rocksdb_transaction_t rocksdb_transaction_t; typedef struct rocksdb_checkpoint_t rocksdb_checkpoint_t; @@ -1290,6 +1292,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rollback( extern ROCKSDB_LIBRARY_API void rocksdb_transaction_destroy( rocksdb_transaction_t* txn); +// This snapshot should be freed using rocksdb_free +extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t* +rocksdb_transaction_get_snapshot(rocksdb_transaction_t* txn); + extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get( rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, const char* key, size_t klen, size_t* vlen, char** errptr); @@ -1299,6 +1305,11 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_cf( rocksdb_column_family_handle_t* column_family, const char* key, size_t klen, size_t* vlen, char** errptr); +extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + const char* key, size_t klen, size_t* vlen, unsigned char exclusive, + char** errptr); + extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get( rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options, const char* key, size_t klen, size_t* vlen, char** errptr); @@ -1329,6 +1340,14 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_write( rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options, rocksdb_writebatch_t *batch, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transaction_merge( + rocksdb_transaction_t* txn, const char* key, size_t klen, const char* val, + size_t vlen, char** errptr); + +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_merge( + rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options, + const char* key, size_t klen, const char* val, size_t vlen, char** errptr); + extern ROCKSDB_LIBRARY_API void rocksdb_transaction_delete( rocksdb_transaction_t* txn, const char* key, size_t klen, char** errptr); @@ -1349,6 +1368,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t* rocksdb_transaction_create_iterator(rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options); +extern ROCKSDB_LIBRARY_API rocksdb_iterator_t* +rocksdb_transactiondb_create_iterator(rocksdb_transactiondb_t* txn_db, + const rocksdb_readoptions_t* options); + extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_close( rocksdb_transactiondb_t* txn_db); @@ -1356,6 +1379,20 @@ extern ROCKSDB_LIBRARY_API rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create(rocksdb_transactiondb_t* txn_db, char** errptr); +extern ROCKSDB_LIBRARY_API rocksdb_optimistictransactiondb_t* +rocksdb_optimistictransactiondb_open(const rocksdb_options_t* options, + const char* name, char** errptr); + +extern ROCKSDB_LIBRARY_API rocksdb_transaction_t* +rocksdb_optimistictransaction_begin( + rocksdb_optimistictransactiondb_t* otxn_db, + const rocksdb_writeoptions_t* write_options, + const rocksdb_optimistictransaction_options_t* otxn_options, + rocksdb_transaction_t* old_txn); + +extern ROCKSDB_LIBRARY_API void rocksdb_optimistictransactiondb_close( + rocksdb_optimistictransactiondb_t* otxn_db); + /* Transaction Options */ extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_options_t* @@ -1404,6 +1441,17 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_options_set_max_write_batch_size( rocksdb_transaction_options_t* opt, size_t size); + +extern ROCKSDB_LIBRARY_API rocksdb_optimistictransaction_options_t* +rocksdb_optimistictransaction_options_create(); + +extern ROCKSDB_LIBRARY_API void rocksdb_optimistictransaction_options_destroy( + rocksdb_optimistictransaction_options_t* opt); + +extern ROCKSDB_LIBRARY_API void +rocksdb_optimistictransaction_options_set_set_snapshot( + rocksdb_optimistictransaction_options_t* opt, unsigned char v); + // referring to convention (3), this should be used by client // to free memory that was malloc()ed extern ROCKSDB_LIBRARY_API void rocksdb_free(void* ptr);