From 9e06f9b9c2b5b444d496660630f96f925897025b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 18 Oct 2024 18:03:17 +0200 Subject: [PATCH] Partial commit --- pom.xml | 5 + src/fatjar/java/module-info.java | 3 +- .../core/client/EmbeddedConnection.java | 26 +- .../core/client/GrpcConnection.java | 27 +- .../core/common/RocksDBAPICommand.java | 877 ++++++++++-------- .../core/common/RocksDBAsyncAPI.java | 64 +- .../common/RocksDBAsyncAPIRequestHandler.java | 14 +- .../core/common/RocksDBSyncAPI.java | 61 +- .../common/RocksDBSyncAPIRequestHandler.java | 2 +- .../rockserver/core/impl/EmbeddedDB.java | 16 +- .../rockserver/core/server/GrpcServer.java | 2 +- src/main/proto/rocksdb.proto | 1 + .../core/impl/test/EmbeddedDBTest.java | 12 +- 13 files changed, 619 insertions(+), 491 deletions(-) diff --git a/pom.xml b/pom.xml index 1a1e8e8..63fd1a2 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,11 @@ reactive-streams 1.0.4 + + io.projectreactor + reactor-core + 3.6.4 + org.lz4 diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index 0d28f6f..cef924d 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -30,8 +30,9 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; + requires reactor.core; - exports it.cavallium.rockserver.core.client; + exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.config; opens it.cavallium.rockserver.core.resources; diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index cbb74f8..67693c3 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -15,9 +15,14 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Stream; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @@ -83,17 +88,18 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { } @Override - public R requestSync(RocksDBAPICommand req) { + public RS requestSync(RocksDBAPICommand req) { return req.handleSync(this); } - @Override - public CompletableFuture requestAsync(RocksDBAPICommand req) { - if (req instanceof RocksDBAPICommand.PutBatch putBatch) { - //noinspection unchecked - return (CompletableFuture) this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); - } - return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); + @SuppressWarnings("unchecked") + @Override + public RA requestAsync(RocksDBAPICommand req) { + return (RA) switch (req) { + case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); + case RocksDBAPICommand.RocksDBAPICommandSingle _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); + case RocksDBAPICommand.RocksDBAPICommandStream _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName()); + }; } @Override @@ -170,7 +176,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { } @Override - public T getRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws RocksDBException { - return db.getRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); + public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws RocksDBException { + return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index c7a7834..33b2aa8 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -61,6 +61,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; @@ -133,12 +134,19 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { return this; } - @Override - public R requestSync(RocksDBAPICommand req) { - var asyncResponse = req.handleAsync(this); - return asyncResponse - .toCompletableFuture() - .join(); + @SuppressWarnings("unchecked") + @Override + public RS requestSync(RocksDBAPICommand req) { + return (RS) switch (req) { + case RocksDBAPICommand.RocksDBAPICommandSingle _ -> { + var asyncResponse = (CompletableFuture) req.handleAsync(this); + yield asyncResponse.join(); + } + case RocksDBAPICommand.RocksDBAPICommandStream _ -> { + var asyncResponse = (Publisher) req.handleAsync(this); + yield Flux.from(asyncResponse).toStream(); + } + }; } @Override @@ -508,7 +516,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { @SuppressWarnings("unchecked") @Override - public CompletableFuture getRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + public CompletableFuture reduceRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { var request = GetRangeRequest.newBuilder() .setTransactionId(transactionId) .setColumnId(columnId) @@ -526,6 +534,11 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { }; } + @Override + public Publisher getRangeStream(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + // todo: implement + } + private static it.cavallium.rockserver.core.common.Delta mapDelta(Delta x) { return new it.cavallium.rockserver.core.common.Delta<>( x.hasPrevious() ? mapByteString(x.getPrevious()) : null, diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index e798a92..2600f99 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -7,432 +7,491 @@ import java.lang.foreign.MemorySegment; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Stream; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; -public sealed interface RocksDBAPICommand { +public sealed interface RocksDBAPICommand { - R handleSync(RocksDBSyncAPI api); - CompletionStage handleAsync(RocksDBAsyncAPI api); + SYNC_RESULT handleSync(RocksDBSyncAPI api); + ASYNC_RESULT handleAsync(RocksDBAsyncAPI api); - /** - * Open a transaction - *

- * Returns the transaction id - * - * @param timeoutMs timeout in milliseconds - */ - record OpenTransaction(long timeoutMs) implements RocksDBAPICommand { + sealed interface RocksDBAPICommandSingle extends RocksDBAPICommand> { - @Override - public Long handleSync(RocksDBSyncAPI api) { - return api.openTransaction(timeoutMs); - } - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.openTransactionAsync(timeoutMs); - } + /** + * Open a transaction + *

+ * Returns the transaction id + * + * @param timeoutMs timeout in milliseconds + */ + record OpenTransaction(long timeoutMs) implements RocksDBAPICommandSingle { - } - /** - * Close a transaction - *

- * Returns true if committed, if false, you should try again - * - * @param transactionId transaction id to close - * @param commit true to commit the transaction, false to rollback it - */ - record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand { - - @Override - public Boolean handleSync(RocksDBSyncAPI api) { - return api.closeTransaction(transactionId, commit); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.closeTransactionAsync(transactionId, commit); - } - - } - /** - * Close a failed update, discarding all changes - * - * @param updateId update id to close - */ - record CloseFailedUpdate(long updateId) implements RocksDBAPICommand { - - @Override - public Void handleSync(RocksDBSyncAPI api) { - api.closeFailedUpdate(updateId); - return null; - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.closeFailedUpdateAsync(updateId); - } - - } - /** - * Create a column - *

- * Returns the column id - * - * @param name column name - * @param schema column key-value schema - */ - record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand { - - @Override - public Long handleSync(RocksDBSyncAPI api) { - return api.createColumn(name, schema); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.createColumnAsync(name, schema); - } - - } - /** - * Delete a column - * @param columnId column id - */ - record DeleteColumn(long columnId) implements RocksDBAPICommand { - - @Override - public Void handleSync(RocksDBSyncAPI api) { - api.deleteColumn(columnId); - return null; - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.deleteColumnAsync(columnId); - } - - } - /** - * Get column id by name - *

- * Returns the column id - * - * @param name column name - */ - record GetColumnId(@NotNull String name) implements RocksDBAPICommand { - - @Override - public Long handleSync(RocksDBSyncAPI api) { - return api.getColumnId(name); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.getColumnIdAsync(name); - } - - } - /** - * Put an element into the specified position - * @param arena arena - * @param transactionOrUpdateId transaction id, update id, or 0 - * @param columnId column id - * @param keys column keys, or empty array if not needed - * @param value value, or null if not needed - * @param requestType the request type determines which type of data will be returned. - */ - record Put(Arena arena, - long transactionOrUpdateId, - long columnId, - Keys keys, - @NotNull MemorySegment value, - RequestPut requestType) implements RocksDBAPICommand { - - @Override - public T handleSync(RocksDBSyncAPI api) { - return api.put(arena, transactionOrUpdateId, columnId, keys, value, requestType); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType); - } - - @Override - public String toString() { - var sb = new StringBuilder("PUT"); - if (transactionOrUpdateId != 0) { - sb.append(" tx:").append(transactionOrUpdateId); + @Override + public Long handleSync(RocksDBSyncAPI api) { + return api.openTransaction(timeoutMs); } - sb.append(" column:").append(columnId); - sb.append(" keys:").append(keys); - sb.append(" value:").append(Utils.toPrettyString(value)); - sb.append(" expected:").append(requestType.getRequestTypeId()); - return sb.toString(); - } - } - /** - * Put multiple elements into the specified positions - * @param arena arena - * @param transactionOrUpdateId transaction id, update id, or 0 - * @param columnId column id - * @param keys multiple lists of column keys - * @param values multiple values, or null if not needed - * @param requestType the request type determines which type of data will be returned. - */ - record PutMulti(Arena arena, long transactionOrUpdateId, long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, - RequestPut requestType) implements RocksDBAPICommand> { - @Override - public List handleSync(RocksDBSyncAPI api) { - return api.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType); - } - - @Override - public CompletionStage> handleAsync(RocksDBAsyncAPI api) { - return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType); - } - - @Override - public String toString() { - var sb = new StringBuilder("PUT_MULTI"); - if (transactionOrUpdateId != 0) { - sb.append(" tx:").append(transactionOrUpdateId); + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.openTransactionAsync(timeoutMs); } - sb.append(" column:").append(columnId); - sb.append(" expected:").append(requestType.getRequestTypeId()); - sb.append(" multi:["); - for (int i = 0; i < keys.size(); i++) { - if (i > 0) sb.append(","); - sb.append(" keys:").append(keys.get(i)); - sb.append(" value:").append(Utils.toPrettyString(values.get(i))); + + } + /** + * Close a transaction + *

+ * Returns true if committed, if false, you should try again + * + * @param transactionId transaction id to close + * @param commit true to commit the transaction, false to rollback it + */ + record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommandSingle { + + @Override + public Boolean handleSync(RocksDBSyncAPI api) { + return api.closeTransaction(transactionId, commit); } - sb.append("]"); - return sb.toString(); - } - } - /** - * Put multiple elements into the specified positions - * @param columnId column id - * @param batchPublisher publisher of batches of keys and values - * @param mode put batch mode - */ - record PutBatch(long columnId, - @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, - @NotNull PutBatchMode mode) implements RocksDBAPICommand { - @Override - public Void handleSync(RocksDBSyncAPI api) { - api.putBatch(columnId, batchPublisher, mode); - return null; - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.putBatchAsync(columnId, batchPublisher, mode); - } - - @Override - public String toString() { - var sb = new StringBuilder("PUT_BATCH"); - sb.append(" column:").append(columnId); - sb.append(" mode:").append(mode); - sb.append(" batch:[...]"); - return sb.toString(); - } - } - /** - * Get an element from the specified position - * @param arena arena - * @param transactionOrUpdateId transaction id, update id for retry operations, or 0 - * @param columnId column id - * @param keys column keys, or empty array if not needed - * @param requestType the request type determines which type of data will be returned. - */ - record Get(Arena arena, - long transactionOrUpdateId, - long columnId, - Keys keys, - RequestGet requestType) implements RocksDBAPICommand { - - @Override - public T handleSync(RocksDBSyncAPI api) { - return api.get(arena, transactionOrUpdateId, columnId, keys, requestType); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType); - } - - @Override - public String toString() { - var sb = new StringBuilder("GET"); - if (transactionOrUpdateId != 0) { - sb.append(" tx:").append(transactionOrUpdateId); + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.closeTransactionAsync(transactionId, commit); } - sb.append(" column:").append(columnId); - sb.append(" keys:").append(keys); - sb.append(" expected:").append(requestType.getRequestTypeId()); - return sb.toString(); + + } + /** + * Close a failed update, discarding all changes + * + * @param updateId update id to close + */ + record CloseFailedUpdate(long updateId) implements RocksDBAPICommandSingle { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.closeFailedUpdate(updateId); + return null; + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.closeFailedUpdateAsync(updateId); + } + + } + /** + * Create a column + *

+ * Returns the column id + * + * @param name column name + * @param schema column key-value schema + */ + record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommandSingle { + + @Override + public Long handleSync(RocksDBSyncAPI api) { + return api.createColumn(name, schema); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.createColumnAsync(name, schema); + } + + } + /** + * Delete a column + * @param columnId column id + */ + record DeleteColumn(long columnId) implements RocksDBAPICommandSingle { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.deleteColumn(columnId); + return null; + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.deleteColumnAsync(columnId); + } + + } + /** + * Get column id by name + *

+ * Returns the column id + * + * @param name column name + */ + record GetColumnId(@NotNull String name) implements RocksDBAPICommandSingle { + + @Override + public Long handleSync(RocksDBSyncAPI api) { + return api.getColumnId(name); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.getColumnIdAsync(name); + } + + } + /** + * Put an element into the specified position + * @param arena arena + * @param transactionOrUpdateId transaction id, update id, or 0 + * @param columnId column id + * @param keys column keys, or empty array if not needed + * @param value value, or null if not needed + * @param requestType the request type determines which type of data will be returned. + */ + record Put(Arena arena, + long transactionOrUpdateId, + long columnId, + Keys keys, + @NotNull MemorySegment value, + RequestPut requestType) implements RocksDBAPICommandSingle { + + @Override + public T handleSync(RocksDBSyncAPI api) { + return api.put(arena, transactionOrUpdateId, columnId, keys, value, requestType); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType); + } + + @Override + public String toString() { + var sb = new StringBuilder("PUT"); + if (transactionOrUpdateId != 0) { + sb.append(" tx:").append(transactionOrUpdateId); + } + sb.append(" column:").append(columnId); + sb.append(" keys:").append(keys); + sb.append(" value:").append(Utils.toPrettyString(value)); + sb.append(" expected:").append(requestType.getRequestTypeId()); + return sb.toString(); + } + } + /** + * Put multiple elements into the specified positions + * @param arena arena + * @param transactionOrUpdateId transaction id, update id, or 0 + * @param columnId column id + * @param keys multiple lists of column keys + * @param values multiple values, or null if not needed + * @param requestType the request type determines which type of data will be returned. + */ + record PutMulti(Arena arena, long transactionOrUpdateId, long columnId, + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + RequestPut requestType) implements RocksDBAPICommandSingle> { + + @Override + public List handleSync(RocksDBSyncAPI api) { + return api.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType); + } + + @Override + public CompletableFuture> handleAsync(RocksDBAsyncAPI api) { + return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType); + } + + @Override + public String toString() { + var sb = new StringBuilder("PUT_MULTI"); + if (transactionOrUpdateId != 0) { + sb.append(" tx:").append(transactionOrUpdateId); + } + sb.append(" column:").append(columnId); + sb.append(" expected:").append(requestType.getRequestTypeId()); + sb.append(" multi:["); + for (int i = 0; i < keys.size(); i++) { + if (i > 0) sb.append(","); + sb.append(" keys:").append(keys.get(i)); + sb.append(" value:").append(Utils.toPrettyString(values.get(i))); + } + sb.append("]"); + return sb.toString(); + } + } + /** + * Put multiple elements into the specified positions + * @param columnId column id + * @param batchPublisher publisher of batches of keys and values + * @param mode put batch mode + */ + record PutBatch(long columnId, + @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, + @NotNull PutBatchMode mode) implements RocksDBAPICommandSingle { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.putBatch(columnId, batchPublisher, mode); + return null; + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.putBatchAsync(columnId, batchPublisher, mode); + } + + @Override + public String toString() { + var sb = new StringBuilder("PUT_BATCH"); + sb.append(" column:").append(columnId); + sb.append(" mode:").append(mode); + sb.append(" batch:[...]"); + return sb.toString(); + } + } + /** + * Get an element from the specified position + * @param arena arena + * @param transactionOrUpdateId transaction id, update id for retry operations, or 0 + * @param columnId column id + * @param keys column keys, or empty array if not needed + * @param requestType the request type determines which type of data will be returned. + */ + record Get(Arena arena, + long transactionOrUpdateId, + long columnId, + Keys keys, + RequestGet requestType) implements RocksDBAPICommandSingle { + + @Override + public T handleSync(RocksDBSyncAPI api) { + return api.get(arena, transactionOrUpdateId, columnId, keys, requestType); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType); + } + + @Override + public String toString() { + var sb = new StringBuilder("GET"); + if (transactionOrUpdateId != 0) { + sb.append(" tx:").append(transactionOrUpdateId); + } + sb.append(" column:").append(columnId); + sb.append(" keys:").append(keys); + sb.append(" expected:").append(requestType.getRequestTypeId()); + return sb.toString(); + } + } + /** + * Open an iterator + *

+ * Returns the iterator id + * + * @param arena arena + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param startKeysInclusive start keys, inclusive. [] means "the beginning" + * @param endKeysExclusive end keys, exclusive. Null means "the end" + * @param reverse if true, seek in reverse direction + * @param timeoutMs timeout in milliseconds + */ + record OpenIterator(Arena arena, + long transactionId, + long columnId, + Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + long timeoutMs) implements RocksDBAPICommandSingle { + + @Override + public Long handleSync(RocksDBSyncAPI api) { + return api.openIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.openIteratorAsync(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + timeoutMs + ); + } + + } + /** + * Close an iterator + * @param iteratorId iterator id + */ + record CloseIterator(long iteratorId) implements RocksDBAPICommandSingle { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.closeIterator(iteratorId); + return null; + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.closeIteratorAsync(iteratorId); + } + + } + /** + * Seek to the specific element during an iteration, or the subsequent one if not found + * @param arena arena + * @param iterationId iteration id + * @param keys keys, inclusive. [] means "the beginning" + */ + record SeekTo(Arena arena, long iterationId, Keys keys) implements RocksDBAPICommandSingle { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.seekTo(arena, iterationId, keys); + return null; + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.seekToAsync(arena, iterationId, keys); + } + + } + /** + * Get the subsequent element during an iteration + * @param arena arena + * @param iterationId iteration id + * @param skipCount number of elements to skip + * @param takeCount number of elements to take + * @param requestType the request type determines which type of data will be returned. + */ + record Subsequent(Arena arena, + long iterationId, + long skipCount, + long takeCount, + @NotNull RequestType.RequestIterate requestType) + implements RocksDBAPICommandSingle { + + @Override + public T handleSync(RocksDBSyncAPI api) { + return api.subsequent(arena, iterationId, skipCount, takeCount, requestType); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType); + } + + } + /** + * Reduce values in a range + *

+ * Returns the result + * + * @param arena arena + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param startKeysInclusive start keys, inclusive. [] means "the beginning" + * @param endKeysExclusive end keys, exclusive. Null means "the end" + * @param reverse if true, seek in reverse direction + * @param requestType the request type determines which type of data will be returned. + * @param timeoutMs timeout in milliseconds + */ + record ReduceRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) implements RocksDBAPICommandSingle { + + @Override + public T handleSync(RocksDBSyncAPI api) { + return api.reduceRange(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } + + @Override + public CompletableFuture handleAsync(RocksDBAsyncAPI api) { + return api.reduceRangeAsync(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } + } } - /** - * Open an iterator - *

- * Returns the iterator id - * - * @param arena arena - * @param transactionId transaction id, or 0 - * @param columnId column id - * @param startKeysInclusive start keys, inclusive. [] means "the beginning" - * @param endKeysExclusive end keys, exclusive. Null means "the end" - * @param reverse if true, seek in reverse direction - * @param timeoutMs timeout in milliseconds - */ - record OpenIterator(Arena arena, - long transactionId, - long columnId, - Keys startKeysInclusive, - @Nullable Keys endKeysExclusive, - boolean reverse, - long timeoutMs) implements RocksDBAPICommand { + sealed interface RocksDBAPICommandStream extends RocksDBAPICommand, Publisher> { + + /** + * Get some values in a range + *

+ * Returns the result + * + * @param arena arena + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param startKeysInclusive start keys, inclusive. [] means "the beginning" + * @param endKeysExclusive end keys, exclusive. Null means "the end" + * @param reverse if true, seek in reverse direction + * @param requestType the request type determines which type of data will be returned. + * @param timeoutMs timeout in milliseconds + */ + record GetRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) implements RocksDBAPICommandStream { + + @Override + public Stream handleSync(RocksDBSyncAPI api) { + return api.getRange(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } + + @Override + public Publisher handleAsync(RocksDBAsyncAPI api) { + return api.getRangeAsync(arena, + transactionId, + columnId, + startKeysInclusive, + endKeysExclusive, + reverse, + requestType, + timeoutMs + ); + } - @Override - public Long handleSync(RocksDBSyncAPI api) { - return api.openIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs); } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.openIteratorAsync(arena, - transactionId, - columnId, - startKeysInclusive, - endKeysExclusive, - reverse, - timeoutMs - ); - } - - } - /** - * Close an iterator - * @param iteratorId iterator id - */ - record CloseIterator(long iteratorId) implements RocksDBAPICommand { - - @Override - public Void handleSync(RocksDBSyncAPI api) { - api.closeIterator(iteratorId); - return null; - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.closeIteratorAsync(iteratorId); - } - - } - /** - * Seek to the specific element during an iteration, or the subsequent one if not found - * @param arena arena - * @param iterationId iteration id - * @param keys keys, inclusive. [] means "the beginning" - */ - record SeekTo(Arena arena, long iterationId, Keys keys) implements - RocksDBAPICommand { - - @Override - public Void handleSync(RocksDBSyncAPI api) { - api.seekTo(arena, iterationId, keys); - return null; - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.seekToAsync(arena, iterationId, keys); - } - - } - /** - * Get the subsequent element during an iteration - * @param arena arena - * @param iterationId iteration id - * @param skipCount number of elements to skip - * @param takeCount number of elements to take - * @param requestType the request type determines which type of data will be returned. - */ - record Subsequent(Arena arena, - long iterationId, - long skipCount, - long takeCount, - @NotNull RequestType.RequestIterate requestType) - implements RocksDBAPICommand { - - @Override - public T handleSync(RocksDBSyncAPI api) { - return api.subsequent(arena, iterationId, skipCount, takeCount, requestType); - } - - @Override - public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType); - } - - } - /** - * Get some values in a range - *

- * Returns the result - * - * @param arena arena - * @param transactionId transaction id, or 0 - * @param columnId column id - * @param startKeysInclusive start keys, inclusive. [] means "the beginning" - * @param endKeysExclusive end keys, exclusive. Null means "the end" - * @param reverse if true, seek in reverse direction - * @param requestType the request type determines which type of data will be returned. - * @param timeoutMs timeout in milliseconds - */ - record GetRange(Arena arena, - long transactionId, - long columnId, - @Nullable Keys startKeysInclusive, - @Nullable Keys endKeysExclusive, - boolean reverse, - RequestType.RequestGetRange requestType, - long timeoutMs) implements RocksDBAPICommand { - - @Override - public T handleSync(RocksDBSyncAPI api) { - return api.getRange(arena, - transactionId, - columnId, - startKeysInclusive, - endKeysExclusive, - reverse, - requestType, - timeoutMs - ); - } - - @Override - public CompletableFuture handleAsync(RocksDBAsyncAPI api) { - return api.getRangeAsync(arena, - transactionId, - columnId, - startKeysInclusive, - endKeysExclusive, - reverse, - requestType, - timeoutMs - ); - } - } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index 092f440..8e13ecb 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -2,20 +2,21 @@ package it.cavallium.rockserver.core.common; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.util.List; @@ -100,7 +101,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { @Nullable Keys endKeysExclusive, boolean reverse, long timeoutMs) throws RocksDBException { - return requestAsync(new RocksDBAPICommand.OpenIterator(arena, + return requestAsync(new OpenIterator(arena, transactionId, columnId, startKeysInclusive, @@ -129,16 +130,16 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { return requestAsync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); } - /** See: {@link GetRange}. */ - default CompletableFuture getRangeAsync(Arena arena, - long transactionId, - long columnId, - @Nullable Keys startKeysInclusive, - @Nullable Keys endKeysExclusive, - boolean reverse, - RequestType.RequestGetRange requestType, - long timeoutMs) throws RocksDBException { - return requestAsync(new RocksDBAPICommand.GetRange<>(arena, + /** See: {@link ReduceRange}. */ + default CompletableFuture reduceRangeAsync(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { + return requestAsync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, @@ -148,4 +149,17 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { timeoutMs )); } + + /** See: {@link GetRange}. */ + default Publisher getRangeAsync(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, + "GetRangeStream is not implemented"); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java index ac96e61..943de49 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPIRequestHandler.java @@ -1,10 +1,20 @@ package it.cavallium.rockserver.core.common; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + import java.util.concurrent.CompletableFuture; public interface RocksDBAsyncAPIRequestHandler { - default CompletableFuture requestAsync(RocksDBAPICommand req) { - return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req)); + @SuppressWarnings("unchecked") + default RA requestAsync(RocksDBAPICommand req) { + return (RA) switch (req) { + case RocksDBAPICommand.RocksDBAPICommandStream _ -> + (Publisher) subscriber -> + subscriber.onError(new UnsupportedOperationException("Unsupported request type: " + req)); + case RocksDBAPICommand.RocksDBAPICommandSingle _ -> + CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req)); + }; } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java index f21bacb..5427a98 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java @@ -2,24 +2,27 @@ package it.cavallium.rockserver.core.common; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetRange; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; -import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseFailedUpdate; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CloseTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.CreateColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.DeleteColumn; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Get; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.GetColumnId; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.ReduceRange; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenIterator; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.OpenTransaction; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Put; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutMulti; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.util.List; +import java.util.stream.Stream; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -121,15 +124,27 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler { return requestSync(new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType)); } + /** See: {@link ReduceRange}. */ + default T reduceRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + @NotNull RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { + return requestSync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)); + } + /** See: {@link GetRange}. */ - default T getRange(Arena arena, - long transactionId, - long columnId, - @Nullable Keys startKeysInclusive, - @Nullable Keys endKeysExclusive, - boolean reverse, - @NotNull RequestType.RequestGetRange requestType, - long timeoutMs) throws RocksDBException { + default Stream getRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + @NotNull RequestType.RequestGetRange requestType, + long timeoutMs) throws RocksDBException { return requestSync(new GetRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)); } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java index a388ba0..a514aa8 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPIRequestHandler.java @@ -2,7 +2,7 @@ package it.cavallium.rockserver.core.common; public interface RocksDBSyncAPIRequestHandler { - default R requestSync(RocksDBAPICommand req) { + default RS requestSync(RocksDBAPICommand req) { throw new UnsupportedOperationException("Unsupported request type: " + req); } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index bdd2130..06f9cb4 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -949,14 +949,14 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { @SuppressWarnings("unchecked") @Override - public T getRange(Arena arena, - long transactionId, - long columnId, - @Nullable Keys startKeysInclusive, - @Nullable Keys endKeysExclusive, - boolean reverse, - RequestType.@NotNull RequestGetRange requestType, - long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + public T reduceRange(Arena arena, + long transactionId, + long columnId, + @Nullable Keys startKeysInclusive, + @Nullable Keys endKeysExclusive, + boolean reverse, + RequestType.@NotNull RequestGetRange requestType, + long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { ops.beginOp(); try { var col = getColumn(columnId); diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 8d3a550..b5941b3 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -706,7 +706,7 @@ public class GrpcServer extends Server { try { try (var arena = Arena.ofConfined()) { it.cavallium.rockserver.core.common.FirstAndLast firstAndLast - = api.getRange(arena, + = api.reduceRange(arena, request.getTransactionId(), request.getColumnId(), mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 4c59c15..e14855b 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -125,4 +125,5 @@ service RocksDBService { rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); + rpc getRangeStream(GetRangeRequest) returns (stream KV); } diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index 4510f91..1a311a7 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -384,23 +384,27 @@ abstract class EmbeddedDBTest { } @Test - void getRangeFirstAndLast() { + void reduceRangeFirstAndLast() { var firstKey = getKVSequenceFirst().keys(); var lastKey = getKVSequenceLast().keys(); var prevLastKV = getKVSequence().get(getKVSequence().size() - 2); if (getSchemaVarKeys().isEmpty()) { - FirstAndLast firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + FirstAndLast firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); Assertions.assertNull(firstAndLast.first(), "First should be empty because the db is empty"); Assertions.assertNull(firstAndLast.last(), "Last should be empty because the db is empty"); fillSomeKeys(); - firstAndLast = db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + firstAndLast = db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); Assertions.assertEquals(getKVSequenceFirst(), firstAndLast.first(), "First key mismatch"); Assertions.assertEquals(prevLastKV, firstAndLast.last(), "Last key mismatch"); + + firstAndLast = db.reduceRange(arena, 0, colId, firstKey, firstKey, false, RequestType.firstAndLast(), 1000); + Assertions.assertNull(firstAndLast.first(), "First should be empty because the range is empty"); + Assertions.assertNull(firstAndLast.last(), "Last should be empty because the range is empty"); } else { Assertions.assertThrowsExactly(RocksDBException.class, () -> { - db.getRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); + db.reduceRange(arena, 0, colId, firstKey, lastKey, false, RequestType.firstAndLast(), 1000); }); } }