Improve transaction C-API.
* Support two-phase commit. * Support get_pinned in transaction. * Add rocksdb_transactiondb_flush * Support get writebatch from transaction and rebuild transaction from writebatch.
This commit is contained in:
parent
2e5f764294
commit
d789c6b058
169
db/c.cc
169
db/c.cc
@ -4893,6 +4893,11 @@ void rocksdb_transaction_options_set_max_write_batch_size(
|
||||
opt->rep.max_write_batch_size = size;
|
||||
}
|
||||
|
||||
void rocksdb_transaction_options_set_skip_prepare(
|
||||
rocksdb_transaction_options_t* opt, unsigned char v) {
|
||||
opt->rep.skip_prepare = v;
|
||||
}
|
||||
|
||||
rocksdb_optimistictransaction_options_t*
|
||||
rocksdb_optimistictransaction_options_create() {
|
||||
return new rocksdb_optimistictransaction_options_t;
|
||||
@ -5036,6 +5041,58 @@ rocksdb_transaction_t* rocksdb_transaction_begin(
|
||||
return old_txn;
|
||||
}
|
||||
|
||||
rocksdb_transaction_t** rocksdb_transactiondb_get_prepared_transactions(
|
||||
rocksdb_transactiondb_t* txn_db, size_t* cnt) {
|
||||
std::vector<Transaction*> txns;
|
||||
txn_db->rep->GetAllPreparedTransactions(&txns);
|
||||
*cnt = txns.size();
|
||||
if (txns.empty()) {
|
||||
return nullptr;
|
||||
} else {
|
||||
rocksdb_transaction_t** buf =
|
||||
(rocksdb_transaction_t**)malloc(txns.size() * sizeof(rocksdb_transaction_t*));
|
||||
for (size_t i = 0; i < txns.size(); i++) {
|
||||
buf[i] = new rocksdb_transaction_t;
|
||||
buf[i]->rep = txns[i];
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
||||
void rocksdb_transaction_set_name(rocksdb_transaction_t* txn, const char* name,
|
||||
size_t name_len, char** errptr) {
|
||||
std::string str = std::string(name, name_len);
|
||||
SaveError(errptr, txn->rep->SetName(str));
|
||||
}
|
||||
|
||||
char* rocksdb_transaction_get_name(rocksdb_transaction_t* txn, size_t* name_len) {
|
||||
auto name = txn->rep->GetName();
|
||||
*name_len = name.size();
|
||||
return CopyString(name);
|
||||
}
|
||||
|
||||
void rocksdb_transaction_prepare(rocksdb_transaction_t* txn, char** errptr) {
|
||||
SaveError(errptr, txn->rep->Prepare());
|
||||
}
|
||||
|
||||
rocksdb_writebatch_wi_t* rocksdb_transaction_get_writebatch_wi(
|
||||
rocksdb_transaction_t* txn) {
|
||||
rocksdb_writebatch_wi_t *wi = (rocksdb_writebatch_wi_t*)malloc(sizeof(rocksdb_writebatch_wi_t));
|
||||
wi->rep = txn->rep->GetWriteBatch();
|
||||
|
||||
return wi;
|
||||
}
|
||||
|
||||
void rocksdb_transaction_rebuild_from_writebatch(
|
||||
rocksdb_transaction_t* txn, rocksdb_writebatch_t *writebatch, char** errptr) {
|
||||
SaveError(errptr, txn->rep->RebuildFromWriteBatch(&writebatch->rep));
|
||||
}
|
||||
|
||||
void rocksdb_transaction_rebuild_from_writebatch_wi(
|
||||
rocksdb_transaction_t* txn, rocksdb_writebatch_wi_t *wi, char** errptr) {
|
||||
SaveError(errptr, txn->rep->RebuildFromWriteBatch(wi->rep->GetWriteBatch()));
|
||||
}
|
||||
|
||||
void rocksdb_transaction_commit(rocksdb_transaction_t* txn, char** errptr) {
|
||||
SaveError(errptr, txn->rep->Commit());
|
||||
}
|
||||
@ -5087,6 +5144,22 @@ char* rocksdb_transaction_get(rocksdb_transaction_t* txn,
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn->rep->Get(options->rep, Slice(key, klen), &v->rep);
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
|
||||
char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn,
|
||||
const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family,
|
||||
@ -5108,6 +5181,22 @@ char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn,
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, klen), &v->rep);
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
// Read a key inside a transaction
|
||||
char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn,
|
||||
const rocksdb_readoptions_t* options,
|
||||
@ -5130,6 +5219,22 @@ char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn,
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, unsigned char exclusive, char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn->rep->GetForUpdate(options->rep, Slice(key, klen), v->rep.GetSelf(), exclusive);
|
||||
v->rep.PinSelf();
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
char* rocksdb_transaction_get_for_update_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
@ -5150,6 +5255,22 @@ char* rocksdb_transaction_get_for_update_cf(
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
unsigned char exclusive, char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn->rep->GetForUpdate(options->rep, column_family->rep, Slice(key, klen), &v->rep, exclusive);
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
// Read a key outside a transaction
|
||||
char* rocksdb_transactiondb_get(
|
||||
rocksdb_transactiondb_t* txn_db,
|
||||
@ -5172,6 +5293,21 @@ char* rocksdb_transactiondb_get(
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn_db->rep->Get(options->rep, txn_db->rep->DefaultColumnFamily(), Slice(key, klen), &v->rep);
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
char* rocksdb_transactiondb_get_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key,
|
||||
@ -5192,6 +5328,22 @@ char* rocksdb_transactiondb_get_cf(
|
||||
return result;
|
||||
}
|
||||
|
||||
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key,
|
||||
size_t keylen, char** errptr) {
|
||||
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
|
||||
Status s = txn_db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), &v->rep);
|
||||
if (!s.ok()) {
|
||||
delete (v);
|
||||
if (!s.IsNotFound()) {
|
||||
SaveError(errptr, s);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
// Put a key inside a transaction
|
||||
void rocksdb_transaction_put(rocksdb_transaction_t* txn, const char* key,
|
||||
size_t klen, const char* val, size_t vlen,
|
||||
@ -5332,6 +5484,23 @@ void rocksdb_transactiondb_close(rocksdb_transactiondb_t* txn_db) {
|
||||
delete txn_db;
|
||||
}
|
||||
|
||||
void rocksdb_transactiondb_flush_wal(
|
||||
rocksdb_transactiondb_t* txn_db, unsigned char sync, char** errptr) {
|
||||
SaveError(errptr, txn_db->rep->FlushWAL(sync));
|
||||
}
|
||||
|
||||
void rocksdb_transactiondb_flush(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
|
||||
char** errptr) {
|
||||
SaveError(errptr, txn_db->rep->Flush(options->rep));
|
||||
}
|
||||
|
||||
void rocksdb_transactiondb_flush_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, char** errptr) {
|
||||
SaveError(errptr, txn_db->rep->Flush(options->rep, column_family->rep));
|
||||
}
|
||||
|
||||
rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create(
|
||||
rocksdb_transactiondb_t* txn_db, char** errptr) {
|
||||
Checkpoint* checkpoint;
|
||||
|
187
db/c_test.c
187
db/c_test.c
@ -400,6 +400,35 @@ static void CheckTxnGetCF(rocksdb_transaction_t* txn,
|
||||
Free(&val);
|
||||
}
|
||||
|
||||
static void CheckTxnPinGet(rocksdb_transaction_t* txn,
|
||||
const rocksdb_readoptions_t* options,
|
||||
const char* key, const char* expected) {
|
||||
rocksdb_pinnableslice_t* p = NULL;
|
||||
const char* val = NULL;
|
||||
char* err = NULL;
|
||||
size_t val_len;
|
||||
p = rocksdb_transaction_get_pinned(txn, options, key, strlen(key), &err);
|
||||
CheckNoError(err);
|
||||
val = rocksdb_pinnableslice_value(p, &val_len);
|
||||
CheckEqual(expected, val, val_len);
|
||||
rocksdb_pinnableslice_destroy(p);
|
||||
}
|
||||
|
||||
static void CheckTxnPinGetCF(rocksdb_transaction_t* txn,
|
||||
const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family,
|
||||
const char* key, const char* expected) {
|
||||
rocksdb_pinnableslice_t* p = NULL;
|
||||
const char* val = NULL;
|
||||
char* err = NULL;
|
||||
size_t val_len;
|
||||
p = rocksdb_transaction_get_pinned_cf(txn, options, column_family, key, strlen(key), &err);
|
||||
CheckNoError(err);
|
||||
val = rocksdb_pinnableslice_value(p, &val_len);
|
||||
CheckEqual(expected, val, val_len);
|
||||
rocksdb_pinnableslice_destroy(p);
|
||||
}
|
||||
|
||||
static void CheckTxnDBGet(
|
||||
rocksdb_transactiondb_t* txn_db,
|
||||
const rocksdb_readoptions_t* options,
|
||||
@ -428,6 +457,36 @@ static void CheckTxnDBGetCF(rocksdb_transactiondb_t* txn_db,
|
||||
Free(&val);
|
||||
}
|
||||
|
||||
static void CheckTxnDBPinGet(rocksdb_transactiondb_t* txn_db,
|
||||
const rocksdb_readoptions_t* options,
|
||||
const char* key,
|
||||
const char* expected) {
|
||||
rocksdb_pinnableslice_t* p = NULL;
|
||||
const char* val = NULL;
|
||||
char* err = NULL;
|
||||
size_t val_len;
|
||||
p = rocksdb_transactiondb_get_pinned(txn_db, options, key, strlen(key), &err);
|
||||
CheckNoError(err);
|
||||
val = rocksdb_pinnableslice_value(p, &val_len);
|
||||
CheckEqual(expected, val, val_len);
|
||||
rocksdb_pinnableslice_destroy(p);
|
||||
}
|
||||
|
||||
static void CheckTxnDBPinGetCF(rocksdb_transactiondb_t* txn_db,
|
||||
const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family,
|
||||
const char* key, const char* expected) {
|
||||
rocksdb_pinnableslice_t* p = NULL;
|
||||
const char* val = NULL;
|
||||
char* err = NULL;
|
||||
size_t val_len;
|
||||
p = rocksdb_transactiondb_get_pinned_cf(txn_db, options, column_family, key, strlen(key), &err);
|
||||
CheckNoError(err);
|
||||
val = rocksdb_pinnableslice_value(p, &val_len);
|
||||
CheckEqual(expected, val, val_len);
|
||||
rocksdb_pinnableslice_destroy(p);
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
(void)argc;
|
||||
(void)argv;
|
||||
@ -2655,11 +2714,13 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hello", 5, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
|
||||
|
||||
// delete from outside transaction
|
||||
rocksdb_transactiondb_delete(txn_db, woptions, "foo", 3, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", NULL);
|
||||
|
||||
// write batch into TransactionDB
|
||||
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
|
||||
@ -2671,6 +2732,7 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transactiondb_write(txn_db, woptions, wb, &err);
|
||||
rocksdb_writebatch_destroy(wb);
|
||||
CheckTxnDBGet(txn_db, roptions, "box", "c");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "box", "c");
|
||||
CheckNoError(err);
|
||||
|
||||
// begin a transaction
|
||||
@ -2679,16 +2741,19 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transaction_put(txn, "foo", 3, "hello", 5, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnGet(txn, roptions, "foo", "hello");
|
||||
CheckTxnPinGet(txn, roptions, "foo", "hello");
|
||||
// delete
|
||||
rocksdb_transaction_delete(txn, "foo", 3, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnGet(txn, roptions, "foo", NULL);
|
||||
CheckTxnPinGet(txn, roptions, "foo", NULL);
|
||||
|
||||
rocksdb_transaction_put(txn, "foo", 3, "hello", 5, &err);
|
||||
CheckNoError(err);
|
||||
|
||||
// read from outside transaction, before commit
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", NULL);
|
||||
|
||||
// commit
|
||||
rocksdb_transaction_commit(txn, &err);
|
||||
@ -2696,6 +2761,7 @@ int main(int argc, char** argv) {
|
||||
|
||||
// read from outside transaction, after commit
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
|
||||
|
||||
// reuse old transaction
|
||||
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, txn);
|
||||
@ -2709,9 +2775,11 @@ int main(int argc, char** argv) {
|
||||
CheckNoError(err);
|
||||
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
|
||||
rocksdb_readoptions_set_snapshot(roptions, NULL);
|
||||
rocksdb_transactiondb_release_snapshot(txn_db, snapshot);
|
||||
CheckTxnDBGet(txn_db, roptions, "foo", "hey");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo", "hey");
|
||||
|
||||
// iterate
|
||||
rocksdb_transaction_put(txn, "bar", 3, "hi", 2, &err);
|
||||
@ -2728,25 +2796,34 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transaction_rollback(txn, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGet(txn_db, roptions, "bar", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "bar", NULL);
|
||||
|
||||
// save point
|
||||
rocksdb_transaction_put(txn, "foo1", 4, "hi1", 3, &err);
|
||||
rocksdb_transaction_set_savepoint(txn);
|
||||
CheckTxnGet(txn, roptions, "foo1", "hi1");
|
||||
CheckTxnPinGet(txn, roptions, "foo1", "hi1");
|
||||
rocksdb_transaction_put(txn, "foo2", 4, "hi2", 3, &err);
|
||||
CheckTxnGet(txn, roptions, "foo2", "hi2");
|
||||
CheckTxnPinGet(txn, roptions, "foo2", "hi2");
|
||||
|
||||
// rollback to savepoint
|
||||
rocksdb_transaction_rollback_to_savepoint(txn, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnGet(txn, roptions, "foo2", NULL);
|
||||
CheckTxnGet(txn, roptions, "foo1", "hi1");
|
||||
CheckTxnPinGet(txn, roptions, "foo2", NULL);
|
||||
CheckTxnPinGet(txn, roptions, "foo1", "hi1");
|
||||
CheckTxnDBGet(txn_db, roptions, "foo1", NULL);
|
||||
CheckTxnDBGet(txn_db, roptions, "foo2", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo1", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo2", NULL);
|
||||
rocksdb_transaction_commit(txn, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGet(txn_db, roptions, "foo1", "hi1");
|
||||
CheckTxnDBGet(txn_db, roptions, "foo2", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo1", "hi1");
|
||||
CheckTxnDBPinGet(txn_db, roptions, "foo2", NULL);
|
||||
|
||||
// Column families.
|
||||
rocksdb_column_family_handle_t* cfh;
|
||||
@ -2758,14 +2835,26 @@ int main(int argc, char** argv) {
|
||||
8, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGetCF(txn_db, roptions, cfh, "cf_foo", "cf_hello");
|
||||
CheckTxnDBPinGetCF(txn_db, roptions, cfh, "cf_foo", "cf_hello");
|
||||
|
||||
rocksdb_transactiondb_delete_cf(txn_db, woptions, cfh, "cf_foo", 6, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBGetCF(txn_db, roptions, cfh, "cf_foo", NULL);
|
||||
CheckTxnDBPinGetCF(txn_db, roptions, cfh, "cf_foo", NULL);
|
||||
|
||||
rocksdb_column_family_handle_destroy(cfh);
|
||||
// flush
|
||||
rocksdb_flushoptions_t *flush_options = rocksdb_flushoptions_create();
|
||||
rocksdb_flushoptions_set_wait(flush_options, 1);
|
||||
rocksdb_transactiondb_flush_wal(txn_db, 1, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transactiondb_flush_cf(txn_db, flush_options, cfh, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transactiondb_flush(txn_db, flush_options, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_flushoptions_destroy(flush_options);
|
||||
|
||||
// close and destroy
|
||||
rocksdb_column_family_handle_destroy(cfh);
|
||||
rocksdb_transaction_destroy(txn);
|
||||
rocksdb_transactiondb_close(txn_db);
|
||||
rocksdb_destroy_db(options, dbname, &err);
|
||||
@ -2774,6 +2863,97 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transactiondb_options_destroy(txn_db_options);
|
||||
}
|
||||
|
||||
StartPhase("two-phase commit");
|
||||
{
|
||||
// open a TransactionDB
|
||||
txn_db_options = rocksdb_transactiondb_options_create();
|
||||
txn_options = rocksdb_transaction_options_create();
|
||||
rocksdb_options_set_create_if_missing(options, 1);
|
||||
txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err);
|
||||
CheckNoError(err);
|
||||
|
||||
rocksdb_transaction_options_set_skip_prepare(txn_options, 0);
|
||||
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
|
||||
rocksdb_transaction_commit(txn, &err);
|
||||
CheckCondition(err != NULL);
|
||||
Free(&err);
|
||||
err = NULL;
|
||||
rocksdb_transaction_prepare(txn, &err);
|
||||
CheckCondition(err != NULL);
|
||||
Free(&err);
|
||||
err = NULL;
|
||||
rocksdb_transaction_set_name(txn, "txn1", 4, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_prepare(txn, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_commit(txn, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_destroy(txn);
|
||||
|
||||
// prepare 2 transactions and close db.
|
||||
rocksdb_transaction_t* txn1 = rocksdb_transaction_begin(
|
||||
txn_db, woptions, txn_options, NULL);
|
||||
rocksdb_transaction_put(txn1, "bar1", 4, "1", 1, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_set_name(txn1, "txn1", 4, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_prepare(txn1, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_t* txn2 = rocksdb_transaction_begin(
|
||||
txn_db, woptions, txn_options, NULL);
|
||||
rocksdb_transaction_put(txn2, "bar2", 4, "2", 1, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_set_name(txn2, "txn2", 4, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_prepare(txn2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_destroy(txn1);
|
||||
rocksdb_transaction_destroy(txn2);
|
||||
rocksdb_transactiondb_close(txn_db);
|
||||
rocksdb_transaction_options_destroy(txn_options);
|
||||
rocksdb_transactiondb_options_destroy(txn_db_options);
|
||||
|
||||
// reopen db and get all prepared.
|
||||
txn_db_options = rocksdb_transactiondb_options_create();
|
||||
txn_options = rocksdb_transaction_options_create();
|
||||
rocksdb_options_set_error_if_exists(options, 0);
|
||||
txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "bar1", NULL);
|
||||
CheckTxnDBPinGet(txn_db, roptions, "bar2", NULL);
|
||||
size_t cnt;
|
||||
rocksdb_transaction_t** txns =
|
||||
rocksdb_transactiondb_get_prepared_transactions(txn_db, &cnt);
|
||||
CheckCondition(cnt == 2);
|
||||
size_t i;
|
||||
for (i = 0; i < cnt; i++) {
|
||||
txn = txns[i];
|
||||
size_t name_len = 0;
|
||||
char* name = rocksdb_transaction_get_name(txn, &name_len);
|
||||
CheckCondition(name_len == 4);
|
||||
if (strncmp(name, "txn1", name_len) == 0) {
|
||||
rocksdb_transaction_commit(txn, &err);
|
||||
} else if (strncmp(name, "txn2", name_len) == 0) {
|
||||
rocksdb_transaction_rollback(txn, &err);
|
||||
}
|
||||
rocksdb_free(name);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_destroy(txn);
|
||||
}
|
||||
rocksdb_free(txns);
|
||||
CheckTxnDBGet(txn_db, roptions, "bar1", "1");
|
||||
CheckTxnDBGet(txn_db, roptions, "bar2", NULL);
|
||||
rocksdb_transactiondb_put(txn_db, woptions, "bar2", 4, "2", 1, &err);
|
||||
CheckNoError(err);
|
||||
|
||||
// close and destroy
|
||||
rocksdb_transactiondb_close(txn_db);
|
||||
rocksdb_destroy_db(options, dbname, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_options_destroy(txn_options);
|
||||
rocksdb_transactiondb_options_destroy(txn_db_options);
|
||||
}
|
||||
|
||||
StartPhase("optimistic_transactions");
|
||||
{
|
||||
rocksdb_options_t* db_options = rocksdb_options_create();
|
||||
@ -2790,6 +2970,7 @@ int main(int argc, char** argv) {
|
||||
rocksdb_transaction_put(txn2, "key1", 4, "value1", 6, &err);
|
||||
CheckNoError(err);
|
||||
CheckTxnGet(txn1, roptions, "key", "value");
|
||||
CheckTxnPinGet(txn1, roptions, "key", "value");
|
||||
rocksdb_transaction_commit(txn1, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_transaction_commit(txn2, &err);
|
||||
@ -2816,6 +2997,7 @@ int main(int argc, char** argv) {
|
||||
txn);
|
||||
CheckGetCF(db, roptions, cfh1, "key_cf1", "val_cf1");
|
||||
CheckTxnGetCF(txn, roptions, cfh1, "key_cf1", "val_cf1");
|
||||
CheckTxnPinGetCF(txn, roptions, cfh1, "key_cf1", "val_cf1");
|
||||
|
||||
// Check iterator with column family
|
||||
rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err);
|
||||
@ -2861,6 +3043,9 @@ int main(int argc, char** argv) {
|
||||
CheckTxnGetCF(txn_cf, roptions, cf_handles[0], "key", "value");
|
||||
CheckTxnGetCF(txn_cf, roptions, cf_handles[1], "key_cf1", "val_cf1");
|
||||
CheckTxnGetCF(txn_cf, roptions, cf_handles[2], "key_cf2", "val_cf2");
|
||||
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[0], "key", "value");
|
||||
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[1], "key_cf1", "val_cf1");
|
||||
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[2], "key_cf2", "val_cf2");
|
||||
rocksdb_transaction_destroy(txn_cf);
|
||||
rocksdb_options_destroy(cf_options);
|
||||
rocksdb_column_family_handle_destroy(cf_handles[0]);
|
||||
|
@ -2067,6 +2067,18 @@ extern ROCKSDB_LIBRARY_API rocksdb_transaction_t* rocksdb_transaction_begin(
|
||||
const rocksdb_transaction_options_t* txn_options,
|
||||
rocksdb_transaction_t* old_txn);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_transaction_t** rocksdb_transactiondb_get_prepared_transactions(
|
||||
rocksdb_transactiondb_t* txn_db, size_t* cnt);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_set_name(
|
||||
rocksdb_transaction_t* txn, const char* name, size_t name_len, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_name(
|
||||
rocksdb_transaction_t* txn, size_t* name_len);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_prepare(
|
||||
rocksdb_transaction_t* txn, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_commit(
|
||||
rocksdb_transaction_t* txn, char** errptr);
|
||||
|
||||
@ -2082,6 +2094,16 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rollback_to_savepoint(
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_destroy(
|
||||
rocksdb_transaction_t* txn);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t* rocksdb_transaction_get_writebatch_wi(
|
||||
rocksdb_transaction_t* txn);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rebuild_from_writebatch(
|
||||
rocksdb_transaction_t* txn, rocksdb_writebatch_t *writebatch, char** errptr);
|
||||
|
||||
// This rocksdb_writebatch_wi_t should be freed with rocksdb_free
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rebuild_from_writebatch_wi(
|
||||
rocksdb_transaction_t* txn, rocksdb_writebatch_wi_t *wi, char** errptr);
|
||||
|
||||
// This snapshot should be freed using rocksdb_free
|
||||
extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t*
|
||||
rocksdb_transaction_get_snapshot(rocksdb_transaction_t* txn);
|
||||
@ -2090,30 +2112,57 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, size_t* vlen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
size_t* vlen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, size_t* vlen, unsigned char exclusive,
|
||||
char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, unsigned char exclusive, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
size_t* vlen, unsigned char exclusive, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update_cf(
|
||||
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
|
||||
unsigned char exclusive, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, size_t* vlen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
const char* key, size_t klen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key,
|
||||
size_t keylen, size_t* vallen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, const char* key,
|
||||
size_t keylen, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_put(
|
||||
rocksdb_transaction_t* txn, const char* key, size_t klen, const char* val,
|
||||
size_t vlen, char** errptr);
|
||||
@ -2189,6 +2238,16 @@ rocksdb_transactiondb_create_iterator_cf(
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_close(
|
||||
rocksdb_transactiondb_t* txn_db);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf(
|
||||
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
|
||||
rocksdb_column_family_handle_t* column_family, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_wal(
|
||||
rocksdb_transactiondb_t* txn_db, unsigned char sync, char** errptr);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_checkpoint_t*
|
||||
rocksdb_transactiondb_checkpoint_object_create(rocksdb_transactiondb_t* txn_db,
|
||||
char** errptr);
|
||||
@ -2278,6 +2337,10 @@ extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_transaction_options_set_max_write_batch_size(
|
||||
rocksdb_transaction_options_t* opt, size_t size);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API void
|
||||
rocksdb_transaction_options_set_skip_prepare(
|
||||
rocksdb_transaction_options_t* opt, unsigned char v);
|
||||
|
||||
extern ROCKSDB_LIBRARY_API rocksdb_optimistictransaction_options_t*
|
||||
rocksdb_optimistictransaction_options_create(void);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user