handleAsync(RocksDBAsyncAPI api);
+ SYNC_RESULT handleSync(RocksDBSyncAPI api);
+ ASYNC_RESULT handleAsync(RocksDBAsyncAPI api);
- /**
- * Open a transaction
- *
- * Returns the transaction id
- *
- * @param timeoutMs timeout in milliseconds
- */
- record OpenTransaction(long timeoutMs) implements RocksDBAPICommand {
+ sealed interface RocksDBAPICommandSingle extends RocksDBAPICommand> {
- @Override
- public Long handleSync(RocksDBSyncAPI api) {
- return api.openTransaction(timeoutMs);
- }
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.openTransactionAsync(timeoutMs);
- }
+ /**
+ * Open a transaction
+ *
+ * Returns the transaction id
+ *
+ * @param timeoutMs timeout in milliseconds
+ */
+ record OpenTransaction(long timeoutMs) implements RocksDBAPICommandSingle {
- }
- /**
- * Close a transaction
- *
- * Returns true if committed, if false, you should try again
- *
- * @param transactionId transaction id to close
- * @param commit true to commit the transaction, false to rollback it
- */
- record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand {
-
- @Override
- public Boolean handleSync(RocksDBSyncAPI api) {
- return api.closeTransaction(transactionId, commit);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.closeTransactionAsync(transactionId, commit);
- }
-
- }
- /**
- * Close a failed update, discarding all changes
- *
- * @param updateId update id to close
- */
- record CloseFailedUpdate(long updateId) implements RocksDBAPICommand {
-
- @Override
- public Void handleSync(RocksDBSyncAPI api) {
- api.closeFailedUpdate(updateId);
- return null;
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.closeFailedUpdateAsync(updateId);
- }
-
- }
- /**
- * Create a column
- *
- * Returns the column id
- *
- * @param name column name
- * @param schema column key-value schema
- */
- record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand {
-
- @Override
- public Long handleSync(RocksDBSyncAPI api) {
- return api.createColumn(name, schema);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.createColumnAsync(name, schema);
- }
-
- }
- /**
- * Delete a column
- * @param columnId column id
- */
- record DeleteColumn(long columnId) implements RocksDBAPICommand {
-
- @Override
- public Void handleSync(RocksDBSyncAPI api) {
- api.deleteColumn(columnId);
- return null;
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.deleteColumnAsync(columnId);
- }
-
- }
- /**
- * Get column id by name
- *
- * Returns the column id
- *
- * @param name column name
- */
- record GetColumnId(@NotNull String name) implements RocksDBAPICommand {
-
- @Override
- public Long handleSync(RocksDBSyncAPI api) {
- return api.getColumnId(name);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.getColumnIdAsync(name);
- }
-
- }
- /**
- * Put an element into the specified position
- * @param arena arena
- * @param transactionOrUpdateId transaction id, update id, or 0
- * @param columnId column id
- * @param keys column keys, or empty array if not needed
- * @param value value, or null if not needed
- * @param requestType the request type determines which type of data will be returned.
- */
- record Put(Arena arena,
- long transactionOrUpdateId,
- long columnId,
- Keys keys,
- @NotNull MemorySegment value,
- RequestPut super MemorySegment, T> requestType) implements RocksDBAPICommand {
-
- @Override
- public T handleSync(RocksDBSyncAPI api) {
- return api.put(arena, transactionOrUpdateId, columnId, keys, value, requestType);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
- }
-
- @Override
- public String toString() {
- var sb = new StringBuilder("PUT");
- if (transactionOrUpdateId != 0) {
- sb.append(" tx:").append(transactionOrUpdateId);
+ @Override
+ public Long handleSync(RocksDBSyncAPI api) {
+ return api.openTransaction(timeoutMs);
}
- sb.append(" column:").append(columnId);
- sb.append(" keys:").append(keys);
- sb.append(" value:").append(Utils.toPrettyString(value));
- sb.append(" expected:").append(requestType.getRequestTypeId());
- return sb.toString();
- }
- }
- /**
- * Put multiple elements into the specified positions
- * @param arena arena
- * @param transactionOrUpdateId transaction id, update id, or 0
- * @param columnId column id
- * @param keys multiple lists of column keys
- * @param values multiple values, or null if not needed
- * @param requestType the request type determines which type of data will be returned.
- */
- record PutMulti(Arena arena, long transactionOrUpdateId, long columnId,
- @NotNull List keys,
- @NotNull List<@NotNull MemorySegment> values,
- RequestPut super MemorySegment, T> requestType) implements RocksDBAPICommand> {
- @Override
- public List handleSync(RocksDBSyncAPI api) {
- return api.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType);
- }
-
- @Override
- public CompletionStage> handleAsync(RocksDBAsyncAPI api) {
- return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType);
- }
-
- @Override
- public String toString() {
- var sb = new StringBuilder("PUT_MULTI");
- if (transactionOrUpdateId != 0) {
- sb.append(" tx:").append(transactionOrUpdateId);
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.openTransactionAsync(timeoutMs);
}
- sb.append(" column:").append(columnId);
- sb.append(" expected:").append(requestType.getRequestTypeId());
- sb.append(" multi:[");
- for (int i = 0; i < keys.size(); i++) {
- if (i > 0) sb.append(",");
- sb.append(" keys:").append(keys.get(i));
- sb.append(" value:").append(Utils.toPrettyString(values.get(i)));
+
+ }
+ /**
+ * Close a transaction
+ *
+ * Returns true if committed, if false, you should try again
+ *
+ * @param transactionId transaction id to close
+ * @param commit true to commit the transaction, false to rollback it
+ */
+ record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Boolean handleSync(RocksDBSyncAPI api) {
+ return api.closeTransaction(transactionId, commit);
}
- sb.append("]");
- return sb.toString();
- }
- }
- /**
- * Put multiple elements into the specified positions
- * @param columnId column id
- * @param batchPublisher publisher of batches of keys and values
- * @param mode put batch mode
- */
- record PutBatch(long columnId,
- @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
- @NotNull PutBatchMode mode) implements RocksDBAPICommand {
- @Override
- public Void handleSync(RocksDBSyncAPI api) {
- api.putBatch(columnId, batchPublisher, mode);
- return null;
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.putBatchAsync(columnId, batchPublisher, mode);
- }
-
- @Override
- public String toString() {
- var sb = new StringBuilder("PUT_BATCH");
- sb.append(" column:").append(columnId);
- sb.append(" mode:").append(mode);
- sb.append(" batch:[...]");
- return sb.toString();
- }
- }
- /**
- * Get an element from the specified position
- * @param arena arena
- * @param transactionOrUpdateId transaction id, update id for retry operations, or 0
- * @param columnId column id
- * @param keys column keys, or empty array if not needed
- * @param requestType the request type determines which type of data will be returned.
- */
- record Get(Arena arena,
- long transactionOrUpdateId,
- long columnId,
- Keys keys,
- RequestGet super MemorySegment, T> requestType) implements RocksDBAPICommand {
-
- @Override
- public T handleSync(RocksDBSyncAPI api) {
- return api.get(arena, transactionOrUpdateId, columnId, keys, requestType);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType);
- }
-
- @Override
- public String toString() {
- var sb = new StringBuilder("GET");
- if (transactionOrUpdateId != 0) {
- sb.append(" tx:").append(transactionOrUpdateId);
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.closeTransactionAsync(transactionId, commit);
}
- sb.append(" column:").append(columnId);
- sb.append(" keys:").append(keys);
- sb.append(" expected:").append(requestType.getRequestTypeId());
- return sb.toString();
+
+ }
+ /**
+ * Close a failed update, discarding all changes
+ *
+ * @param updateId update id to close
+ */
+ record CloseFailedUpdate(long updateId) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Void handleSync(RocksDBSyncAPI api) {
+ api.closeFailedUpdate(updateId);
+ return null;
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.closeFailedUpdateAsync(updateId);
+ }
+
+ }
+ /**
+ * Create a column
+ *
+ * Returns the column id
+ *
+ * @param name column name
+ * @param schema column key-value schema
+ */
+ record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Long handleSync(RocksDBSyncAPI api) {
+ return api.createColumn(name, schema);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.createColumnAsync(name, schema);
+ }
+
+ }
+ /**
+ * Delete a column
+ * @param columnId column id
+ */
+ record DeleteColumn(long columnId) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Void handleSync(RocksDBSyncAPI api) {
+ api.deleteColumn(columnId);
+ return null;
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.deleteColumnAsync(columnId);
+ }
+
+ }
+ /**
+ * Get column id by name
+ *
+ * Returns the column id
+ *
+ * @param name column name
+ */
+ record GetColumnId(@NotNull String name) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Long handleSync(RocksDBSyncAPI api) {
+ return api.getColumnId(name);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.getColumnIdAsync(name);
+ }
+
+ }
+ /**
+ * Put an element into the specified position
+ * @param arena arena
+ * @param transactionOrUpdateId transaction id, update id, or 0
+ * @param columnId column id
+ * @param keys column keys, or empty array if not needed
+ * @param value value, or null if not needed
+ * @param requestType the request type determines which type of data will be returned.
+ */
+ record Put(Arena arena,
+ long transactionOrUpdateId,
+ long columnId,
+ Keys keys,
+ @NotNull MemorySegment value,
+ RequestPut super MemorySegment, T> requestType) implements RocksDBAPICommandSingle {
+
+ @Override
+ public T handleSync(RocksDBSyncAPI api) {
+ return api.put(arena, transactionOrUpdateId, columnId, keys, value, requestType);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
+ }
+
+ @Override
+ public String toString() {
+ var sb = new StringBuilder("PUT");
+ if (transactionOrUpdateId != 0) {
+ sb.append(" tx:").append(transactionOrUpdateId);
+ }
+ sb.append(" column:").append(columnId);
+ sb.append(" keys:").append(keys);
+ sb.append(" value:").append(Utils.toPrettyString(value));
+ sb.append(" expected:").append(requestType.getRequestTypeId());
+ return sb.toString();
+ }
+ }
+ /**
+ * Put multiple elements into the specified positions
+ * @param arena arena
+ * @param transactionOrUpdateId transaction id, update id, or 0
+ * @param columnId column id
+ * @param keys multiple lists of column keys
+ * @param values multiple values, or null if not needed
+ * @param requestType the request type determines which type of data will be returned.
+ */
+ record PutMulti(Arena arena, long transactionOrUpdateId, long columnId,
+ @NotNull List keys,
+ @NotNull List<@NotNull MemorySegment> values,
+ RequestPut super MemorySegment, T> requestType) implements RocksDBAPICommandSingle> {
+
+ @Override
+ public List handleSync(RocksDBSyncAPI api) {
+ return api.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType);
+ }
+
+ @Override
+ public CompletableFuture> handleAsync(RocksDBAsyncAPI api) {
+ return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType);
+ }
+
+ @Override
+ public String toString() {
+ var sb = new StringBuilder("PUT_MULTI");
+ if (transactionOrUpdateId != 0) {
+ sb.append(" tx:").append(transactionOrUpdateId);
+ }
+ sb.append(" column:").append(columnId);
+ sb.append(" expected:").append(requestType.getRequestTypeId());
+ sb.append(" multi:[");
+ for (int i = 0; i < keys.size(); i++) {
+ if (i > 0) sb.append(",");
+ sb.append(" keys:").append(keys.get(i));
+ sb.append(" value:").append(Utils.toPrettyString(values.get(i)));
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+ }
+ /**
+ * Put multiple elements into the specified positions
+ * @param columnId column id
+ * @param batchPublisher publisher of batches of keys and values
+ * @param mode put batch mode
+ */
+ record PutBatch(long columnId,
+ @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
+ @NotNull PutBatchMode mode) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Void handleSync(RocksDBSyncAPI api) {
+ api.putBatch(columnId, batchPublisher, mode);
+ return null;
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.putBatchAsync(columnId, batchPublisher, mode);
+ }
+
+ @Override
+ public String toString() {
+ var sb = new StringBuilder("PUT_BATCH");
+ sb.append(" column:").append(columnId);
+ sb.append(" mode:").append(mode);
+ sb.append(" batch:[...]");
+ return sb.toString();
+ }
+ }
+ /**
+ * Get an element from the specified position
+ * @param arena arena
+ * @param transactionOrUpdateId transaction id, update id for retry operations, or 0
+ * @param columnId column id
+ * @param keys column keys, or empty array if not needed
+ * @param requestType the request type determines which type of data will be returned.
+ */
+ record Get(Arena arena,
+ long transactionOrUpdateId,
+ long columnId,
+ Keys keys,
+ RequestGet super MemorySegment, T> requestType) implements RocksDBAPICommandSingle {
+
+ @Override
+ public T handleSync(RocksDBSyncAPI api) {
+ return api.get(arena, transactionOrUpdateId, columnId, keys, requestType);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType);
+ }
+
+ @Override
+ public String toString() {
+ var sb = new StringBuilder("GET");
+ if (transactionOrUpdateId != 0) {
+ sb.append(" tx:").append(transactionOrUpdateId);
+ }
+ sb.append(" column:").append(columnId);
+ sb.append(" keys:").append(keys);
+ sb.append(" expected:").append(requestType.getRequestTypeId());
+ return sb.toString();
+ }
+ }
+ /**
+ * Open an iterator
+ *
+ * Returns the iterator id
+ *
+ * @param arena arena
+ * @param transactionId transaction id, or 0
+ * @param columnId column id
+ * @param startKeysInclusive start keys, inclusive. [] means "the beginning"
+ * @param endKeysExclusive end keys, exclusive. Null means "the end"
+ * @param reverse if true, seek in reverse direction
+ * @param timeoutMs timeout in milliseconds
+ */
+ record OpenIterator(Arena arena,
+ long transactionId,
+ long columnId,
+ Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ long timeoutMs) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Long handleSync(RocksDBSyncAPI api) {
+ return api.openIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.openIteratorAsync(arena,
+ transactionId,
+ columnId,
+ startKeysInclusive,
+ endKeysExclusive,
+ reverse,
+ timeoutMs
+ );
+ }
+
+ }
+ /**
+ * Close an iterator
+ * @param iteratorId iterator id
+ */
+ record CloseIterator(long iteratorId) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Void handleSync(RocksDBSyncAPI api) {
+ api.closeIterator(iteratorId);
+ return null;
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.closeIteratorAsync(iteratorId);
+ }
+
+ }
+ /**
+ * Seek to the specific element during an iteration, or the subsequent one if not found
+ * @param arena arena
+ * @param iterationId iteration id
+ * @param keys keys, inclusive. [] means "the beginning"
+ */
+ record SeekTo(Arena arena, long iterationId, Keys keys) implements RocksDBAPICommandSingle {
+
+ @Override
+ public Void handleSync(RocksDBSyncAPI api) {
+ api.seekTo(arena, iterationId, keys);
+ return null;
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.seekToAsync(arena, iterationId, keys);
+ }
+
+ }
+ /**
+ * Get the subsequent element during an iteration
+ * @param arena arena
+ * @param iterationId iteration id
+ * @param skipCount number of elements to skip
+ * @param takeCount number of elements to take
+ * @param requestType the request type determines which type of data will be returned.
+ */
+ record Subsequent(Arena arena,
+ long iterationId,
+ long skipCount,
+ long takeCount,
+ @NotNull RequestType.RequestIterate super MemorySegment, T> requestType)
+ implements RocksDBAPICommandSingle {
+
+ @Override
+ public T handleSync(RocksDBSyncAPI api) {
+ return api.subsequent(arena, iterationId, skipCount, takeCount, requestType);
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType);
+ }
+
+ }
+ /**
+ * Reduce values in a range
+ *
+ * Returns the result
+ *
+ * @param arena arena
+ * @param transactionId transaction id, or 0
+ * @param columnId column id
+ * @param startKeysInclusive start keys, inclusive. [] means "the beginning"
+ * @param endKeysExclusive end keys, exclusive. Null means "the end"
+ * @param reverse if true, seek in reverse direction
+ * @param requestType the request type determines which type of data will be returned.
+ * @param timeoutMs timeout in milliseconds
+ */
+ record ReduceRange(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) implements RocksDBAPICommandSingle {
+
+ @Override
+ public T handleSync(RocksDBSyncAPI api) {
+ return api.reduceRange(arena,
+ transactionId,
+ columnId,
+ startKeysInclusive,
+ endKeysExclusive,
+ reverse,
+ requestType,
+ timeoutMs
+ );
+ }
+
+ @Override
+ public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
+ return api.reduceRangeAsync(arena,
+ transactionId,
+ columnId,
+ startKeysInclusive,
+ endKeysExclusive,
+ reverse,
+ requestType,
+ timeoutMs
+ );
+ }
+
}
}
- /**
- * Open an iterator
- *
- * Returns the iterator id
- *
- * @param arena arena
- * @param transactionId transaction id, or 0
- * @param columnId column id
- * @param startKeysInclusive start keys, inclusive. [] means "the beginning"
- * @param endKeysExclusive end keys, exclusive. Null means "the end"
- * @param reverse if true, seek in reverse direction
- * @param timeoutMs timeout in milliseconds
- */
- record OpenIterator(Arena arena,
- long transactionId,
- long columnId,
- Keys startKeysInclusive,
- @Nullable Keys endKeysExclusive,
- boolean reverse,
- long timeoutMs) implements RocksDBAPICommand {
+ sealed interface RocksDBAPICommandStream extends RocksDBAPICommand, Publisher> {
+
+ /**
+ * Get some values in a range
+ *
+ * Returns the result
+ *
+ * @param arena arena
+ * @param transactionId transaction id, or 0
+ * @param columnId column id
+ * @param startKeysInclusive start keys, inclusive. [] means "the beginning"
+ * @param endKeysExclusive end keys, exclusive. Null means "the end"
+ * @param reverse if true, seek in reverse direction
+ * @param requestType the request type determines which type of data will be returned.
+ * @param timeoutMs timeout in milliseconds
+ */
+ record GetRange(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) implements RocksDBAPICommandStream {
+
+ @Override
+ public Stream handleSync(RocksDBSyncAPI api) {
+ return api.getRange(arena,
+ transactionId,
+ columnId,
+ startKeysInclusive,
+ endKeysExclusive,
+ reverse,
+ requestType,
+ timeoutMs
+ );
+ }
+
+ @Override
+ public Publisher handleAsync(RocksDBAsyncAPI api) {
+ return api.getRangeAsync(arena,
+ transactionId,
+ columnId,
+ startKeysInclusive,
+ endKeysExclusive,
+ reverse,
+ requestType,
+ timeoutMs
+ );
+ }
- @Override
- public Long handleSync(RocksDBSyncAPI api) {
- return api.openIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
}
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.openIteratorAsync(arena,
- transactionId,
- columnId,
- startKeysInclusive,
- endKeysExclusive,
- reverse,
- timeoutMs
- );
- }
-
- }
- /**
- * Close an iterator
- * @param iteratorId iterator id
- */
- record CloseIterator(long iteratorId) implements RocksDBAPICommand {
-
- @Override
- public Void handleSync(RocksDBSyncAPI api) {
- api.closeIterator(iteratorId);
- return null;
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.closeIteratorAsync(iteratorId);
- }
-
- }
- /**
- * Seek to the specific element during an iteration, or the subsequent one if not found
- * @param arena arena
- * @param iterationId iteration id
- * @param keys keys, inclusive. [] means "the beginning"
- */
- record SeekTo(Arena arena, long iterationId, Keys keys) implements
- RocksDBAPICommand {
-
- @Override
- public Void handleSync(RocksDBSyncAPI api) {
- api.seekTo(arena, iterationId, keys);
- return null;
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.seekToAsync(arena, iterationId, keys);
- }
-
- }
- /**
- * Get the subsequent element during an iteration
- * @param arena arena
- * @param iterationId iteration id
- * @param skipCount number of elements to skip
- * @param takeCount number of elements to take
- * @param requestType the request type determines which type of data will be returned.
- */
- record Subsequent(Arena arena,
- long iterationId,
- long skipCount,
- long takeCount,
- @NotNull RequestType.RequestIterate super MemorySegment, T> requestType)
- implements RocksDBAPICommand {
-
- @Override
- public T handleSync(RocksDBSyncAPI api) {
- return api.subsequent(arena, iterationId, skipCount, takeCount, requestType);
- }
-
- @Override
- public CompletionStage handleAsync(RocksDBAsyncAPI api) {
- return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType);
- }
-
- }
- /**
- * Get some values in a range
- *
- * Returns the result
- *
- * @param arena arena
- * @param transactionId transaction id, or 0
- * @param columnId column id
- * @param startKeysInclusive start keys, inclusive. [] means "the beginning"
- * @param endKeysExclusive end keys, exclusive. Null means "the end"
- * @param reverse if true, seek in reverse direction
- * @param requestType the request type determines which type of data will be returned.
- * @param timeoutMs timeout in milliseconds
- */
- record GetRange(Arena arena,
- long transactionId,
- long columnId,
- @Nullable Keys startKeysInclusive,
- @Nullable Keys endKeysExclusive,
- boolean reverse,
- RequestType.RequestGetRange super KV, T> requestType,
- long timeoutMs) implements RocksDBAPICommand {
-
- @Override
- public T handleSync(RocksDBSyncAPI api) {
- return api.getRange(arena,
- transactionId,
- columnId,
- startKeysInclusive,
- endKeysExclusive,
- reverse,
- requestType,
- timeoutMs
- );
- }
-
- @Override
- public CompletableFuture handleAsync(RocksDBAsyncAPI api) {
- return api.getRangeAsync(arena,
- transactionId,
- columnId,
- startKeysInclusive,
- endKeysExclusive,
- reverse,
- requestType,
- timeoutMs
- );
- }
-
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java
index 092f440..8e13ecb 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java
@@ -2,20 +2,21 @@ package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
@@ -100,7 +101,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
@Nullable Keys endKeysExclusive,
boolean reverse,
long timeoutMs) throws RocksDBException {
- return requestAsync(new RocksDBAPICommand.OpenIterator(arena,
+ return requestAsync(new OpenIterator(arena,
transactionId,
columnId,
startKeysInclusive,
@@ -129,16 +130,16 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
}
- /** See: {@link GetRange}. */
- default CompletableFuture getRangeAsync(Arena arena,
- long transactionId,
- long columnId,
- @Nullable Keys startKeysInclusive,
- @Nullable Keys endKeysExclusive,
- boolean reverse,
- RequestType.RequestGetRange super KV, T> requestType,
- long timeoutMs) throws RocksDBException {
- return requestAsync(new RocksDBAPICommand.GetRange<>(arena,
+ /** See: {@link ReduceRange}. */
+ default CompletableFuture reduceRangeAsync(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) throws RocksDBException {
+ return requestAsync(new ReduceRange<>(arena,
transactionId,
columnId,
startKeysInclusive,
@@ -148,4 +149,17 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
timeoutMs
));
}
+
+ /** See: {@link GetRange}. */
+ default Publisher getRangeAsync(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) throws RocksDBException {
+ throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED,
+ "GetRangeStream is not implemented");
+ }
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java
index ac96e61..943de49 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java
@@ -1,10 +1,20 @@
package it.cavallium.rockserver.core.common;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
import java.util.concurrent.CompletableFuture;
public interface RocksDBAsyncAPIRequestHandler {
- default CompletableFuture requestAsync(RocksDBAPICommand req) {
- return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req));
+ @SuppressWarnings("unchecked")
+ default RA requestAsync(RocksDBAPICommand req) {
+ return (RA) switch (req) {
+ case RocksDBAPICommand.RocksDBAPICommandStream> _ ->
+ (Publisher) subscriber ->
+ subscriber.onError(new UnsupportedOperationException("Unsupported request type: " + req));
+ case RocksDBAPICommand.RocksDBAPICommandSingle> _ ->
+ CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req));
+ };
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
index f21bacb..5427a98 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
@@ -2,24 +2,27 @@ package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
-import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
+import java.util.stream.Stream;
+
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -121,15 +124,27 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType));
}
+ /** See: {@link ReduceRange}. */
+ default T reduceRange(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ @NotNull RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) throws RocksDBException {
+ return requestSync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs));
+ }
+
/** See: {@link GetRange}. */
- default T getRange(Arena arena,
- long transactionId,
- long columnId,
- @Nullable Keys startKeysInclusive,
- @Nullable Keys endKeysExclusive,
- boolean reverse,
- @NotNull RequestType.RequestGetRange super KV, T> requestType,
- long timeoutMs) throws RocksDBException {
+ default Stream getRange(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ @NotNull RequestType.RequestGetRange super KV, T> requestType,
+ long timeoutMs) throws RocksDBException {
return requestSync(new GetRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs));
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java
index a388ba0..a514aa8 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java
@@ -2,7 +2,7 @@ package it.cavallium.rockserver.core.common;
public interface RocksDBSyncAPIRequestHandler {
- default R requestSync(RocksDBAPICommand req) {
+ default RS requestSync(RocksDBAPICommand req) {
throw new UnsupportedOperationException("Unsupported request type: " + req);
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java
index bdd2130..06f9cb4 100644
--- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java
+++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java
@@ -949,14 +949,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@SuppressWarnings("unchecked")
@Override
- public T getRange(Arena arena,
- long transactionId,
- long columnId,
- @Nullable Keys startKeysInclusive,
- @Nullable Keys endKeysExclusive,
- boolean reverse,
- RequestType.@NotNull RequestGetRange super KV, T> requestType,
- long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
+ public T reduceRange(Arena arena,
+ long transactionId,
+ long columnId,
+ @Nullable Keys startKeysInclusive,
+ @Nullable Keys endKeysExclusive,
+ boolean reverse,
+ RequestType.@NotNull RequestGetRange super KV, T> requestType,
+ long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
var col = getColumn(columnId);
diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java
index 8d3a550..b5941b3 100644
--- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java
+++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java
@@ -706,7 +706,7 @@ public class GrpcServer extends Server {
try {
try (var arena = Arena.ofConfined()) {
it.cavallium.rockserver.core.common.FirstAndLast firstAndLast
- = api.getRange(arena,
+ = api.reduceRange(arena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto
index 4c59c15..e14855b 100644
--- a/src/main/proto/rocksdb.proto
+++ b/src/main/proto/rocksdb.proto
@@ -125,4 +125,5 @@ service RocksDBService {
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast);
+ rpc getRangeStream(GetRangeRequest) returns (stream KV);
}
diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java
index 4510f91..1a311a7 100644
--- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java
+++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java
@@ -384,23 +384,27 @@ abstract class EmbeddedDBTest {
}
@Test
- void getRangeFirstAndLast() {
+ void reduceRangeFirstAndLast() {
var firstKey = getKVSequenceFirst().keys();
var lastKey = getKVSequenceLast().keys();
var prevLastKV = getKVSequence().get(getKVSequence().size() - 2);
if (getSchemaVarKeys().isEmpty()) {
- FirstAndLast firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
+ FirstAndLast firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty");
Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty");
fillSomeKeys();
- firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
+ firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch");
Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch");
+
+ firstAndLast = db.reduceRange(arena, 0, colId, firstKey, firstKey, false, RequestType.firstAndLast(), 1000);
+ Assertions.assertNull(firstAndLast.first(), "First should be empty because the range is empty");
+ Assertions.assertNull(firstAndLast.last(), "Last should be empty because the range is empty");
} else {
Assertions.assertThrowsExactly(RocksDBException.class, () -> {
- db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
+ db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000);
});
}
}