From eed71dfa823958e9b7f29604335bdd0eee2d2a17 Mon Sep 17 00:00:00 2001 From: Alan Paxton Date: Mon, 14 Feb 2022 08:30:00 -0800 Subject: [PATCH] Transaction multiGet convert to list-based (#9522) Summary: Transaction multiGet convert to list-based. RocksDB Java (non-transactional) has multiGetAsList() methods to expose multiGet(). These return a list of results. These methods replaced multiGet() methods returning an array of results, which were deprecated in Rocks 6 and are being removed in Rocks 7. The transactional API still presents multiGet() methods returning arrays, so in Rocks 7 we replace these with multiGetAsList()methods and deprecate the multiGet() methods. This does not require any changes to the supporting JNI/C++ code, only to the wrappers which present the Java API. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9522 Reviewed By: mrambacher Differential Revision: D34114373 Pulled By: jay-zhuang fbshipit-source-id: cb22d6095934d951b6aee4aed3e07923d3c18007 --- .../main/java/org/rocksdb/Transaction.java | 172 +++++++++++++++++- .../org/rocksdb/AbstractTransactionTest.java | 99 ++++++++-- .../rocksdb/OptimisticTransactionTest.java | 130 ++++++++++--- .../java/org/rocksdb/TransactionTest.java | 102 +++++++++-- 4 files changed, 429 insertions(+), 74 deletions(-) diff --git a/java/src/main/java/org/rocksdb/Transaction.java b/java/src/main/java/org/rocksdb/Transaction.java index 0c10f93e5..6280c26bb 100644 --- a/java/src/main/java/org/rocksdb/Transaction.java +++ b/java/src/main/java/org/rocksdb/Transaction.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -310,7 +312,7 @@ public class Transaction extends RocksObject { /** * This function is similar to - * {@link RocksDB#multiGet(ReadOptions, List, List)} except it will + * {@link RocksDB#multiGetAsList} except it will * also read pending changes in this transaction. * Currently, this function will return Status::MergeInProgress if the most * recent write to the queried key in this batch is a Merge. @@ -336,9 +338,10 @@ public class Transaction extends RocksObject { * @throws IllegalArgumentException thrown if the size of passed keys is not * equal to the amount of passed column family handles. */ + @Deprecated public byte[][] multiGet(final ReadOptions readOptions, - final List columnFamilyHandles, - final byte[][] keys) throws RocksDBException { + final List columnFamilyHandles, final byte[][] keys) + throws RocksDBException { assert(isOwningHandle()); // Check if key size equals cfList size. If not a exception must be // thrown. If not a Segmentation fault happens. @@ -360,7 +363,57 @@ public class Transaction extends RocksObject { /** * This function is similar to - * {@link RocksDB#multiGet(ReadOptions, List)} except it will + * {@link RocksDB#multiGetAsList(ReadOptions, List, List)} except it will + * also read pending changes in this transaction. + * Currently, this function will return Status::MergeInProgress if the most + * recent write to the queried key in this batch is a Merge. + * + * If {@link ReadOptions#snapshot()} is not set, the current version of the + * key will be read. Calling {@link #setSnapshot()} does not affect the + * version of the data returned. + * + * Note that setting {@link ReadOptions#setSnapshot(Snapshot)} will affect + * what is read from the DB but will NOT change which keys are read from this + * transaction (the keys in this transaction do not yet belong to any snapshot + * and will be fetched regardless). + * + * @param readOptions Read options. + * @param columnFamilyHandles {@link java.util.List} containing + * {@link org.rocksdb.ColumnFamilyHandle} instances. + * @param keys of keys for which values need to be retrieved. + * + * @return Array of values, one for each key + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + * @throws IllegalArgumentException thrown if the size of passed keys is not + * equal to the amount of passed column family handles. + */ + + public List multiGetAsList(final ReadOptions readOptions, + final List columnFamilyHandles, final List keys) + throws RocksDBException { + assert (isOwningHandle()); + // Check if key size equals cfList size. If not a exception must be + // thrown. If not a Segmentation fault happens. + if (keys.size() != columnFamilyHandles.size()) { + throw new IllegalArgumentException("For each key there must be a ColumnFamilyHandle."); + } + if (keys.size() == 0) { + return new ArrayList<>(0); + } + final byte[][] keysArray = keys.toArray(new byte[keys.size()][]); + final long[] cfHandles = new long[columnFamilyHandles.size()]; + for (int i = 0; i < columnFamilyHandles.size(); i++) { + cfHandles[i] = columnFamilyHandles.get(i).nativeHandle_; + } + + return Arrays.asList(multiGet(nativeHandle_, readOptions.nativeHandle_, keysArray, cfHandles)); + } + + /** + * This function is similar to + * {@link RocksDB#multiGetAsList} except it will * also read pending changes in this transaction. * Currently, this function will return Status::MergeInProgress if the most * recent write to the queried key in this batch is a Merge. @@ -383,8 +436,9 @@ public class Transaction extends RocksObject { * @throws RocksDBException thrown if error happens in underlying * native library. */ - public byte[][] multiGet(final ReadOptions readOptions, - final byte[][] keys) throws RocksDBException { + @Deprecated + public byte[][] multiGet(final ReadOptions readOptions, final byte[][] keys) + throws RocksDBException { assert(isOwningHandle()); if(keys.length == 0) { return new byte[0][0]; @@ -394,6 +448,41 @@ public class Transaction extends RocksObject { keys); } + /** + * This function is similar to + * {@link RocksDB#multiGetAsList} except it will + * also read pending changes in this transaction. + * Currently, this function will return Status::MergeInProgress if the most + * recent write to the queried key in this batch is a Merge. + * + * If {@link ReadOptions#snapshot()} is not set, the current version of the + * key will be read. Calling {@link #setSnapshot()} does not affect the + * version of the data returned. + * + * Note that setting {@link ReadOptions#setSnapshot(Snapshot)} will affect + * what is read from the DB but will NOT change which keys are read from this + * transaction (the keys in this transaction do not yet belong to any snapshot + * and will be fetched regardless). + * + * @param readOptions Read options.= + * {@link org.rocksdb.ColumnFamilyHandle} instances. + * @param keys of keys for which values need to be retrieved. + * + * @return Array of values, one for each key + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public List multiGetAsList(final ReadOptions readOptions, final List keys) + throws RocksDBException { + if (keys.size() == 0) { + return new ArrayList<>(0); + } + final byte[][] keysArray = keys.toArray(new byte[keys.size()][]); + + return Arrays.asList(multiGet(nativeHandle_, readOptions.nativeHandle_, keysArray)); + } + /** * Read this key and ensure that this transaction will only * be able to be committed if this key is not written outside this @@ -541,9 +630,10 @@ public class Transaction extends RocksObject { * @throws RocksDBException thrown if error happens in underlying * native library. */ + @Deprecated public byte[][] multiGetForUpdate(final ReadOptions readOptions, - final List columnFamilyHandles, - final byte[][] keys) throws RocksDBException { + final List columnFamilyHandles, final byte[][] keys) + throws RocksDBException { assert(isOwningHandle()); // Check if key size equals cfList size. If not a exception must be // thrown. If not a Segmentation fault happens. @@ -562,6 +652,43 @@ public class Transaction extends RocksObject { keys, cfHandles); } + /** + * A multi-key version of + * {@link #getForUpdate(ReadOptions, ColumnFamilyHandle, byte[], boolean)}. + * + * + * @param readOptions Read options. + * @param columnFamilyHandles {@link org.rocksdb.ColumnFamilyHandle} + * instances + * @param keys the keys to retrieve the values for. + * + * @return Array of values, one for each key + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public List multiGetForUpdateAsList(final ReadOptions readOptions, + final List columnFamilyHandles, final List keys) + throws RocksDBException { + assert (isOwningHandle()); + // Check if key size equals cfList size. If not a exception must be + // thrown. If not a Segmentation fault happens. + if (keys.size() != columnFamilyHandles.size()) { + throw new IllegalArgumentException("For each key there must be a ColumnFamilyHandle."); + } + if (keys.size() == 0) { + return new ArrayList<>(); + } + final byte[][] keysArray = keys.toArray(new byte[keys.size()][]); + + final long[] cfHandles = new long[columnFamilyHandles.size()]; + for (int i = 0; i < columnFamilyHandles.size(); i++) { + cfHandles[i] = columnFamilyHandles.get(i).nativeHandle_; + } + return Arrays.asList( + multiGetForUpdate(nativeHandle_, readOptions.nativeHandle_, keysArray, cfHandles)); + } + /** * A multi-key version of {@link #getForUpdate(ReadOptions, byte[], boolean)}. * @@ -574,8 +701,9 @@ public class Transaction extends RocksObject { * @throws RocksDBException thrown if error happens in underlying * native library. */ - public byte[][] multiGetForUpdate(final ReadOptions readOptions, - final byte[][] keys) throws RocksDBException { + @Deprecated + public byte[][] multiGetForUpdate(final ReadOptions readOptions, final byte[][] keys) + throws RocksDBException { assert(isOwningHandle()); if(keys.length == 0) { return new byte[0][0]; @@ -585,6 +713,30 @@ public class Transaction extends RocksObject { readOptions.nativeHandle_, keys); } + /** + * A multi-key version of {@link #getForUpdate(ReadOptions, byte[], boolean)}. + * + * + * @param readOptions Read options. + * @param keys the keys to retrieve the values for. + * + * @return List of values, one for each key + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public List multiGetForUpdateAsList( + final ReadOptions readOptions, final List keys) throws RocksDBException { + assert (isOwningHandle()); + if (keys.size() == 0) { + return new ArrayList<>(0); + } + + final byte[][] keysArray = keys.toArray(new byte[keys.size()][]); + + return Arrays.asList(multiGetForUpdate(nativeHandle_, readOptions.nativeHandle_, keysArray)); + } + /** * Returns an iterator that will iterate on all keys in the default * column family including both keys in the DB and uncommitted keys in this diff --git a/java/src/test/java/org/rocksdb/AbstractTransactionTest.java b/java/src/test/java/org/rocksdb/AbstractTransactionTest.java index 7cac3015b..46685f9fd 100644 --- a/java/src/test/java/org/rocksdb/AbstractTransactionTest.java +++ b/java/src/test/java/org/rocksdb/AbstractTransactionTest.java @@ -206,12 +206,8 @@ public abstract class AbstractTransactionTest { @Test public void multiGetPut_cf() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions(); @@ -227,14 +223,31 @@ public abstract class AbstractTransactionTest { } } + @Test + public void multiGetPutAsList_cf() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions(); + final Transaction txn = dbContainer.beginTransaction()) { + final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); + final List cfList = Arrays.asList(testCf, testCf); + + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(null, null); + + txn.put(testCf, keys[0], values[0]); + txn.put(testCf, keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + } + } + @Test public void multiGetPut() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions(); @@ -248,6 +261,22 @@ public abstract class AbstractTransactionTest { } } + @Test + public void multiGetPutAsList() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions(); + final Transaction txn = dbContainer.beginTransaction()) { + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(null, null); + + txn.put(keys[0], values[0]); + txn.put(keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values); + } + } + @Test public void getForUpdate_cf() throws RocksDBException { final byte k1[] = "key1".getBytes(UTF_8); @@ -495,6 +524,7 @@ public abstract class AbstractTransactionTest { } } + @Deprecated @Test public void multiGetPutUntracked_cf() throws RocksDBException { final byte keys[][] = new byte[][] { @@ -518,14 +548,32 @@ public abstract class AbstractTransactionTest { } } + @Test + public void multiGetPutUntrackedAsList_cf() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions(); + final Transaction txn = dbContainer.beginTransaction()) { + final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); + + final List cfList = Arrays.asList(testCf, testCf); + + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(null, null); + txn.putUntracked(testCf, keys[0], values[0]); + txn.putUntracked(testCf, keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + } + } + + @Deprecated @Test public void multiGetPutUntracked() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions(); @@ -538,6 +586,21 @@ public abstract class AbstractTransactionTest { } } + @Test + public void multiGetPutAsListUntracked() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions(); + final Transaction txn = dbContainer.beginTransaction()) { + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(null, null); + txn.putUntracked(keys[0], values[0]); + txn.putUntracked(keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values); + } + } + @Test public void mergeUntracked_cf() throws RocksDBException { final byte[] k1 = "key1".getBytes(UTF_8); diff --git a/java/src/test/java/org/rocksdb/OptimisticTransactionTest.java b/java/src/test/java/org/rocksdb/OptimisticTransactionTest.java index f44816e64..f23080242 100644 --- a/java/src/test/java/org/rocksdb/OptimisticTransactionTest.java +++ b/java/src/test/java/org/rocksdb/OptimisticTransactionTest.java @@ -19,9 +19,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { @Test public void getForUpdate_cf_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); @@ -57,9 +57,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { @Test public void getForUpdate_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { @@ -92,14 +92,11 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { } } + @Deprecated @Test public void multiGetForUpdate_cf_conflict() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; final byte[] otherValue = "otherValue".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); @@ -139,14 +136,54 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { } } + @Test + public void multiGetAsListForUpdate_cf_conflict() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + final byte[] otherValue = "otherValue".getBytes(UTF_8); + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions()) { + final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); + final List cfList = Arrays.asList(testCf, testCf); + + try (final Transaction txn = dbContainer.beginTransaction()) { + txn.put(testCf, keys[0], values[0]); + txn.put(testCf, keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + txn.commit(); + } + + try (final Transaction txn2 = dbContainer.beginTransaction()) { + try (final Transaction txn3 = dbContainer.beginTransaction()) { + assertThat(txn3.multiGetForUpdateAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + + // NOTE: txn2 updates k1, during txn3 + txn2.put(testCf, keys[0], otherValue); + assertThat(txn2.get(testCf, readOptions, keys[0])).isEqualTo(otherValue); + txn2.commit(); + + try { + txn3.commit(); // should cause an exception! + } catch (final RocksDBException e) { + assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); + return; + } + } + } + + fail("Expected an exception for put after getForUpdate from conflicting" + + "transactions"); + } + } + + @Deprecated @Test public void multiGetForUpdate_conflict() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; final byte[] otherValue = "otherValue".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); @@ -183,11 +220,50 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { } } + @Test + public void multiGetasListForUpdate_conflict() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + final byte[] otherValue = "otherValue".getBytes(UTF_8); + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions()) { + try (final Transaction txn = dbContainer.beginTransaction()) { + txn.put(keys[0], values[0]); + txn.put(keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values); + txn.commit(); + } + + try (final Transaction txn2 = dbContainer.beginTransaction()) { + try (final Transaction txn3 = dbContainer.beginTransaction()) { + assertThat(txn3.multiGetForUpdateAsList(readOptions, Arrays.asList(keys))) + .containsExactly(values); + + // NOTE: txn2 updates k1, during txn3 + txn2.put(keys[0], otherValue); + assertThat(txn2.get(readOptions, keys[0])).isEqualTo(otherValue); + txn2.commit(); + + try { + txn3.commit(); // should cause an exception! + } catch (final RocksDBException e) { + assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy); + return; + } + } + } + + fail("Expected an exception for put after getForUpdate from conflicting" + + "transactions"); + } + } + @Test public void undoGetForUpdate_cf_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); @@ -220,9 +296,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { @Test public void undoGetForUpdate_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { @@ -261,12 +337,10 @@ public class OptimisticTransactionTest extends AbstractTransactionTest { try { txn.setName(name); + fail("Optimistic transactions cannot be named."); } catch(final RocksDBException e) { - assertThat(e.getStatus().getCode() == Status.Code.InvalidArgument); - return; + assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.InvalidArgument); } - - fail("Optimistic transactions cannot be named."); } } diff --git a/java/src/test/java/org/rocksdb/TransactionTest.java b/java/src/test/java/org/rocksdb/TransactionTest.java index 725ff2cb5..ebe08ce20 100644 --- a/java/src/test/java/org/rocksdb/TransactionTest.java +++ b/java/src/test/java/org/rocksdb/TransactionTest.java @@ -19,9 +19,9 @@ public class TransactionTest extends AbstractTransactionTest { @Test public void getForUpdate_cf_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); @@ -53,9 +53,9 @@ public class TransactionTest extends AbstractTransactionTest { @Test public void getForUpdate_conflict() throws RocksDBException { - final byte k1[] = "key1".getBytes(UTF_8); - final byte v1[] = "value1".getBytes(UTF_8); - final byte v12[] = "value12".getBytes(UTF_8); + final byte[] k1 = "key1".getBytes(UTF_8); + final byte[] v1 = "value1".getBytes(UTF_8); + final byte[] v12 = "value12".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); final ReadOptions readOptions = new ReadOptions()) { @@ -86,12 +86,8 @@ public class TransactionTest extends AbstractTransactionTest { @Test public void multiGetForUpdate_cf_conflict() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; final byte[] otherValue = "otherValue".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); @@ -126,14 +122,49 @@ public class TransactionTest extends AbstractTransactionTest { } } + @Test + public void multiGetAsListForUpdate_cf_conflict() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + final byte[] otherValue = "otherValue".getBytes(UTF_8); + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions()) { + final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily(); + final List cfList = Arrays.asList(testCf, testCf); + + try (final Transaction txn = dbContainer.beginTransaction()) { + txn.put(testCf, keys[0], values[0]); + txn.put(testCf, keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + txn.commit(); + } + + try (final Transaction txn2 = dbContainer.beginTransaction()) { + try (final Transaction txn3 = dbContainer.beginTransaction()) { + assertThat(txn3.multiGetForUpdateAsList(readOptions, cfList, Arrays.asList(keys))) + .containsExactly(values); + + // NOTE: txn2 updates k1, during txn3 + try { + txn2.put(testCf, keys[0], otherValue); // should cause an exception! + } catch (final RocksDBException e) { + assertThat(e.getStatus().getCode()).isSameAs(Status.Code.TimedOut); + return; + } + } + } + + fail("Expected an exception for put after getForUpdate from conflicting" + + "transactions"); + } + } + @Test public void multiGetForUpdate_conflict() throws RocksDBException { - final byte keys[][] = new byte[][] { - "key1".getBytes(UTF_8), - "key2".getBytes(UTF_8)}; - final byte values[][] = new byte[][] { - "value1".getBytes(UTF_8), - "value2".getBytes(UTF_8)}; + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; final byte[] otherValue = "otherValue".getBytes(UTF_8); try(final DBContainer dbContainer = startDb(); @@ -150,6 +181,41 @@ public class TransactionTest extends AbstractTransactionTest { assertThat(txn3.multiGetForUpdate(readOptions, keys)) .isEqualTo(values); + // NOTE: txn2 updates k1, during txn3 + try { + txn2.put(keys[0], otherValue); // should cause an exception! + } catch (final RocksDBException e) { + assertThat(e.getStatus().getCode()).isSameAs(Status.Code.TimedOut); + return; + } + } + } + + fail("Expected an exception for put after getForUpdate from conflicting" + + "transactions"); + } + } + + @Test + public void multiGetAsListForUpdate_conflict() throws RocksDBException { + final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)}; + final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)}; + final byte[] otherValue = "otherValue".getBytes(UTF_8); + + try (final DBContainer dbContainer = startDb(); + final ReadOptions readOptions = new ReadOptions()) { + try (final Transaction txn = dbContainer.beginTransaction()) { + txn.put(keys[0], values[0]); + txn.put(keys[1], values[1]); + assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values); + txn.commit(); + } + + try (final Transaction txn2 = dbContainer.beginTransaction()) { + try (final Transaction txn3 = dbContainer.beginTransaction()) { + assertThat(txn3.multiGetForUpdateAsList(readOptions, Arrays.asList(keys))) + .containsExactly(values); + // NOTE: txn2 updates k1, during txn3 try { txn2.put(keys[0], otherValue); // should cause an exception!