From b8b552cb1853e996086b87aef6da0e60bd053080 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 21 Oct 2024 13:22:31 +0200 Subject: [PATCH] Add GetAllInRange, RequestReduceRange, use reactive grpc server --- pom.xml | 39 + src/fatjar/java/module-info.java | 3 +- src/library/java/module-info.java | 3 +- .../core/client/EmbeddedConnection.java | 4 +- .../core/client/GrpcConnection.java | 26 +- .../rockserver/core/client/LoggingClient.java | 52 +- .../rockserver/core/common/RequestType.java | 20 +- .../core/common/RocksDBAPICommand.java | 2 +- .../core/common/RocksDBAsyncAPI.java | 3 +- .../core/common/RocksDBSyncAPI.java | 2 +- .../rockserver/core/impl/EmbeddedDB.java | 2 +- .../rockserver/core/server/GrpcServer.java | 863 +++++++----------- src/main/proto/rocksdb.proto | 4 +- src/native/java/module-info.java | 3 +- 14 files changed, 450 insertions(+), 576 deletions(-) diff --git a/pom.xml b/pom.xml index 63fd1a2..3181318 100644 --- a/pom.xml +++ b/pom.xml @@ -119,6 +119,12 @@ io.grpc grpc-netty ${grpc.version} + + + com.google.code.findbugs + jsr305 + + io.netty @@ -140,11 +146,23 @@ io.grpc grpc-protobuf ${grpc.version} + + + com.google.code.findbugs + jsr305 + + io.grpc grpc-stub ${grpc.version} + + + com.google.code.findbugs + jsr305 + + org.reactivestreams @@ -156,6 +174,18 @@ reactor-core 3.6.4 + + com.salesforce.servicelibs + reactor-grpc-stub + 1.2.4 + provided + + + com.google.code.findbugs + jsr305 + + + org.lz4 @@ -244,6 +274,15 @@ com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} grpc-java io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + reactor-grpc + com.salesforce.servicelibs + reactor-grpc + 1.2.4 + com.salesforce.reactorgrpc.ReactorGrpcGenerator + + diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index cef924d..8827a36 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -13,7 +13,6 @@ module rockserver.core { requires io.grpc.protobuf; requires io.grpc.stub; requires io.grpc; - requires jsr305; requires com.google.common; requires io.grpc.netty; requires io.jstach.rainbowgum; @@ -31,6 +30,8 @@ module rockserver.core { requires org.reactivestreams; requires io.netty.transport.unix.common; requires reactor.core; + requires reactor.grpc.stub; + requires java.annotation; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index bb8c172..626ad40 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -13,7 +13,6 @@ module rockserver.core { requires io.grpc.protobuf; requires io.grpc.stub; requires io.grpc; - requires jsr305; requires com.google.common; requires io.grpc.netty; requires io.netty.common; @@ -28,6 +27,8 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; + requires reactor.grpc.stub; + requires java.annotation; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 67693c3..e289b60 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -88,7 +88,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { } @Override - public RS requestSync(RocksDBAPICommand req) { + public RS requestSync(RocksDBAPICommand req) { return req.handleSync(this); } @@ -176,7 +176,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { } @Override - public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestGetRange requestType, long timeoutMs) throws RocksDBException { + public T reduceRange(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.@NotNull RequestReduceRange requestType, long timeoutMs) throws RocksDBException { return db.reduceRange(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 33b2aa8..45461ea 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -73,6 +73,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { private final RocksDBServiceBlockingStub blockingStub; private final RocksDBServiceStub asyncStub; private final RocksDBServiceFutureStub futureStub; + private final ReactorRocksDBServiceGrpc.ReactorRocksDBServiceStub reactiveStub; private final URI address; private GrpcConnection(String name, SocketAddress socketAddress, URI address) { @@ -102,6 +103,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel); this.asyncStub = RocksDBServiceGrpc.newStub(channel); this.futureStub = RocksDBServiceGrpc.newFutureStub(channel); + this.reactiveStub = ReactorRocksDBServiceGrpc.newReactorStub(channel); this.address = address; } @@ -515,8 +517,8 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { } @SuppressWarnings("unchecked") - @Override - public CompletableFuture reduceRangeAsync(Arena arena, long transactionId, long columnId, @NotNull Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + @Override + public CompletableFuture reduceRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestReduceRange requestType, long timeoutMs) throws RocksDBException { var request = GetRangeRequest.newBuilder() .setTransactionId(transactionId) .setColumnId(columnId) @@ -527,16 +529,28 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .build(); return (CompletableFuture) switch (requestType) { case RequestType.RequestGetFirstAndLast _ -> - toResponse(this.futureStub.getRangeFirstAndLast(request), result -> new FirstAndLast<>( + toResponse(this.futureStub.reduceRangeFirstAndLast(request), result -> new FirstAndLast<>( result.hasFirst() ? mapKV(arena, result.getFirst()) : null, result.hasLast() ? mapKV(arena, result.getLast()) : null )); }; } - @Override - public Publisher getRangeStream(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { - // todo: implement + @SuppressWarnings("unchecked") + @Override + public Publisher getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { + var request = GetRangeRequest.newBuilder() + .setTransactionId(transactionId) + .setColumnId(columnId) + .addAllStartKeysInclusive(mapKeys(startKeysInclusive)) + .addAllEndKeysExclusive(mapKeys(endKeysExclusive)) + .setReverse(reverse) + .setTimeoutMs(timeoutMs) + .build(); + return (Publisher) switch (requestType) { + case RequestType.RequestGetAllInRange _ -> reactiveStub.getAllInRange(request) + .map(kv -> mapKV(arena, kv)); + }; } private static it.cavallium.rockserver.core.common.Delta mapDelta(Delta x) { diff --git a/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java b/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java index 8966efc..c6c190c 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java +++ b/src/main/java/it/cavallium/rockserver/core/client/LoggingClient.java @@ -6,11 +6,12 @@ import it.cavallium.rockserver.core.common.RocksDBSyncAPI; import java.io.IOException; import java.net.URI; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Supplier; -import java.util.logging.Level; + +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; +import reactor.core.publisher.Flux; public class LoggingClient implements RocksDBConnection { @@ -55,9 +56,9 @@ public class LoggingClient implements RocksDBConnection { } @Override - public R requestSync(RocksDBAPICommand req) { + public SYNC_RESULT requestSync(RocksDBAPICommand req) { logger.trace("Request input (sync): {}", req); - R result; + SYNC_RESULT result; try { result = syncApi.requestSync(req); } catch (Throwable e) { @@ -77,16 +78,37 @@ public class LoggingClient implements RocksDBConnection { this.asyncApi = asyncApi; } - @Override - public CompletableFuture requestAsync(RocksDBAPICommand req) { - logger.trace("Request input (async): {}", req); - return asyncApi.requestAsync(req).whenComplete((result, e) -> { - if (e != null) { - logger.trace("Request failed: {} Error: {}", req, e.getMessage()); - } else { - logger.trace("Request executed: {} Result: {}", req, result); - } - }); + @SuppressWarnings("unchecked") + @Override + public ASYNC_RESULT requestAsync(RocksDBAPICommand req) { + if (!logger.isEnabledForLevel(Level.TRACE)) { + return asyncApi.requestAsync(req); + } else { + logger.trace("Request input (async): {}", req); + var r = asyncApi.requestAsync(req); + return switch (req) { + case RocksDBAPICommand.RocksDBAPICommandSingle _ -> + (ASYNC_RESULT) ((CompletableFuture) r).whenComplete((result, e) -> { + if (e != null) { + logger.trace("Request failed: {} Error: {}", req, e.getMessage()); + } else { + logger.trace("Request executed: {} Result: {}", req, result); + } + }); + case RocksDBAPICommand.RocksDBAPICommandStream _ -> + (ASYNC_RESULT) Flux.from((Publisher) r).doOnEach(signal -> { + if (signal.isOnNext()) { + logger.trace("Request: {} Partial result: {}", req, signal); + } else if (signal.isOnError()) { + var e = signal.getThrowable(); + assert e != null; + logger.trace("Request failed: {} Error: {}", req, e.getMessage()); + } else if (signal.isOnComplete()) { + logger.trace("Request executed: {} Result: terminated successfully", req); + } + }); + }; + } } } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java index e5a21f9..73b9a57 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RequestType.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RequestType.java @@ -1,7 +1,6 @@ package it.cavallium.rockserver.core.common; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -99,6 +98,11 @@ public sealed interface RequestType { return (RequestGetFirstAndLast) RequestGetFirstAndLast.INSTANCE; } + @SuppressWarnings("unchecked") + static RequestGetAllInRange allInRange() { + return (RequestGetAllInRange) RequestGetAllInRange.INSTANCE; + } + @SuppressWarnings("unchecked") static RequestNothing none() { return (RequestNothing) RequestNothing.INSTANCE; @@ -110,6 +114,8 @@ public sealed interface RequestType { sealed interface RequestGet extends RequestType {} + sealed interface RequestReduceRange extends RequestType {} + sealed interface RequestGetRange extends RequestType {} sealed interface RequestIterate extends RequestType {} @@ -205,7 +211,7 @@ public sealed interface RequestType { } } - record RequestGetFirstAndLast() implements RequestGetRange> { + record RequestGetFirstAndLast() implements RequestReduceRange> { private static final RequestGetFirstAndLast INSTANCE = new RequestGetFirstAndLast<>(); @@ -214,4 +220,14 @@ public sealed interface RequestType { return RequestTypeId.FIRST_AND_LAST; } } + + record RequestGetAllInRange() implements RequestGetRange> { + + private static final RequestGetAllInRange INSTANCE = new RequestGetAllInRange<>(); + + @Override + public RequestTypeId getRequestTypeId() { + return RequestTypeId.FIRST_AND_LAST; + } + } } diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index 2600f99..9479b1f 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -410,7 +410,7 @@ public sealed interface RocksDBAPICommand requestType, + RequestType.RequestReduceRange requestType, long timeoutMs) implements RocksDBAPICommandSingle { @Override diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index 8e13ecb..fabeae3 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -17,6 +17,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSi import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandSingle.Subsequent; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.RocksDBAPICommandStream.GetRange; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.util.List; @@ -137,7 +138,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, - RequestType.RequestGetRange requestType, + RequestType.RequestReduceRange requestType, long timeoutMs) throws RocksDBException { return requestAsync(new ReduceRange<>(arena, transactionId, diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java index 5427a98..0c5478b 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java @@ -131,7 +131,7 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler { @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, - @NotNull RequestType.RequestGetRange requestType, + @NotNull RequestType.RequestReduceRange requestType, long timeoutMs) throws RocksDBException { return requestSync(new ReduceRange<>(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs)); } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 06f9cb4..e808e22 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -955,7 +955,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, - RequestType.@NotNull RequestGetRange requestType, + RequestType.@NotNull RequestReduceRange requestType, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { ops.beginOp(); try { diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index b5941b3..a44d4cc 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -48,8 +48,10 @@ import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.SocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,6 +64,11 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class GrpcServer extends Server { @@ -69,7 +76,7 @@ public class GrpcServer extends Server { private final GrpcServerImpl grpc; private final EventLoopGroup elg; - private final ExecutorService executor; + private final Scheduler executor; private final io.grpc.Server server; public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException { @@ -85,7 +92,7 @@ public class GrpcServer extends Server { channelType = NioServerSocketChannel.class; } this.elg = elg; - this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2); + this.executor = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 2, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "server-db-executor"); this.server = NettyServerBuilder .forAddress(socketAddress) .bossEventLoopGroup(elg) @@ -104,7 +111,7 @@ public class GrpcServer extends Server { server.start(); } - private final class GrpcServerImpl extends RocksDBServiceImplBase { + private final class GrpcServerImpl extends ReactorRocksDBServiceGrpc.RocksDBServiceImplBase { private final RocksDBAsyncAPI asyncApi; private final RocksDBSyncAPI api; @@ -116,360 +123,185 @@ public class GrpcServer extends Server { // functions + @Override - public void openTransaction(OpenTransactionRequest request, - StreamObserver responseObserver) { - executor.execute(() -> { - try { - var txId = api.openTransaction(request.getTimeoutMs()); - responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono openTransaction(OpenTransactionRequest request) { + return executeSync(() -> { + var txId = api.openTransaction(request.getTimeoutMs()); + return OpenTransactionResponse.newBuilder().setTransactionId(txId).build(); }); } @Override - public void closeTransaction(CloseTransactionRequest request, - StreamObserver responseObserver) { - executor.execute(() -> { - try { - var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); - var response = CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono closeTransaction(CloseTransactionRequest request) { + return executeSync(() -> { + var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); + return CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); }); } @Override - public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - api.closeFailedUpdate(request.getUpdateId()); - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono closeFailedUpdate(CloseFailedUpdateRequest request) { + return executeSync(() -> { + api.closeFailedUpdate(request.getUpdateId()); + return Empty.getDefaultInstance(); }); } @Override - public void createColumn(CreateColumnRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); - var response = CreateColumnResponse.newBuilder().setColumnId(colId).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono createColumn(CreateColumnRequest request) { + return executeSync(() -> { + var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); + return CreateColumnResponse.newBuilder().setColumnId(colId).build(); }); } @Override - public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - api.deleteColumn(request.getColumnId()); - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono deleteColumn(DeleteColumnRequest request) { + return executeSync(() -> { + api.deleteColumn(request.getColumnId()); + return Empty.getDefaultInstance(); }); } @Override - public void getColumnId(GetColumnIdRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - var colId = api.getColumnId(request.getName()); - var response = GetColumnIdResponse.newBuilder().setColumnId(colId).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } + public Mono getColumnId(GetColumnIdRequest request) { + return executeSync(() -> { + var colId = api.getColumnId(request.getName()); + return GetColumnIdResponse.newBuilder().setColumnId(colId).build(); }); } @Override - public void put(PutRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - api.put(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestNothing<>() - ); - } - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); + public Mono put(PutRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestNothing<>() + ); } + return Empty.getDefaultInstance(); }); } @Override - public StreamObserver putBatch(StreamObserver responseObserver) { - final ServerCallStreamObserver serverCallStreamObserver = - (ServerCallStreamObserver) responseObserver; - serverCallStreamObserver.disableAutoRequest(); - serverCallStreamObserver.request(1); - var requestObserver = new StreamObserver() { - enum State { - BEFORE_INITIAL_REQUEST, - RECEIVING_DATA, - RECEIVED_ALL - } - private final ExecutorService sstExecutor = Executors.newSingleThreadExecutor(); - final AtomicInteger pendingRequests = new AtomicInteger(); - State state = State.BEFORE_INITIAL_REQUEST; - private PutBatchInitialRequest initialRequest; - private Subscriber putBatchInputsSubscriber; - @Override - public void onNext(PutBatchRequest putBatchRequest) { - if (state == State.BEFORE_INITIAL_REQUEST) { - if (!putBatchRequest.hasInitialRequest()) { - serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); - } - - initialRequest = putBatchRequest.getInitialRequest(); - - try { - asyncApi.putBatchAsync(initialRequest.getColumnId(), - subscriber2 -> { - putBatchInputsSubscriber = subscriber2; - subscriber2.onSubscribe(new Subscription() { - @Override - public void request(long l) { - serverCallStreamObserver.request(Math.toIntExact(l)); - } - - @Override - public void cancel() { - serverCallStreamObserver.onError(new IOException("Cancelled")); - - } - }); - }, - switch (initialRequest.getMode()) { - case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; - case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL; - case SST_INGESTION -> PutBatchMode.SST_INGESTION; - case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND; - case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\""); - } - ).whenComplete((_, ex) -> { - if (ex != null) { - handleError(serverCallStreamObserver, ex); - } else { - serverCallStreamObserver.onNext(Empty.getDefaultInstance()); - serverCallStreamObserver.onCompleted(); - } - }); - } catch (Throwable ex) { - handleError(serverCallStreamObserver, ex); - } - state = State.RECEIVING_DATA; - } else if (state == State.RECEIVING_DATA) { - pendingRequests.incrementAndGet(); - var kvBatch = putBatchRequest.getData(); - sstExecutor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - putBatchInputsSubscriber.onNext(mapKVBatch(arena, kvBatch.getEntriesCount(), kvBatch::getEntries)); - } - checkCompleted(true); - } catch (Throwable ex) { - putBatchInputsSubscriber.onError(ex); - } - }); - } else { - serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Invalid request")); + public Mono putBatch(Flux request) { + return request.switchOnFirst((firstSignal, nextRequests) -> { + if (firstSignal.isOnNext()) { + var firstValue = firstSignal.get(); + assert firstValue != null; + if (!firstValue.hasInitialRequest()) { + return Mono.error(RocksDBException.of( + RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); } - } + var initialRequest = firstValue.getInitialRequest(); + var mode = switch (initialRequest.getMode()) { + case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; + case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL; + case SST_INGESTION -> PutBatchMode.SST_INGESTION; + case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND; + case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\""); + }; - @Override - public void onError(Throwable throwable) { - sstExecutor.execute(() -> { - state = State.RECEIVED_ALL; - doFinally(); - if (putBatchInputsSubscriber != null) { - putBatchInputsSubscriber.onError(throwable); - } else { - serverCallStreamObserver.onError(throwable); - } + var batches = nextRequests.map(putBatchRequest -> { + var batch = putBatchRequest.getData(); + return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries); }); - } - @Override - public void onCompleted() { - sstExecutor.execute(() -> { - if (state == State.BEFORE_INITIAL_REQUEST) { - serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); - } else if (state == State.RECEIVING_DATA) { - state = State.RECEIVED_ALL; - checkCompleted(false); - } else { - putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state)); - } - }); + return Mono.fromFuture(asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)); + } else if (firstSignal.isOnComplete()) { + return Mono.just(RocksDBException.of( + RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); + } else { + return nextRequests; } - - private void checkCompleted(boolean requestDone) { - if ((requestDone ? pendingRequests.decrementAndGet() : pendingRequests.get()) == 0 - && state == State.RECEIVED_ALL) { - doFinally(); - putBatchInputsSubscriber.onComplete(); - } - } - - private void doFinally() { - sstExecutor.shutdown(); - } - }; - return requestObserver; + }).then(Mono.just(Empty.getDefaultInstance())); } @Override - public StreamObserver putMulti(StreamObserver responseObserver) { - return new StreamObserver<>() { - private boolean initialRequestDone = false; - private long requestsCount = 0; - private boolean requestsCountFinalized; - private final AtomicLong processedRequestsCount = new AtomicLong(); - private PutMultiInitialRequest initialRequest; - - @Override - public void onNext(PutMultiRequest request) { - switch (request.getPutMultiRequestTypeCase()) { - case INITIALREQUEST -> { - if (initialRequestDone) { - throw new UnsupportedOperationException("Initial request already done!"); - } - this.initialRequest = request.getInitialRequest(); - this.initialRequestDone = true; - } - case DATA -> { - if (!initialRequestDone) { - throw new UnsupportedOperationException("Initial request already done!"); - } - ++requestsCount; - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - api.put(arena, - initialRequest.getTransactionOrUpdateId(), - initialRequest.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestNothing<>()); - } - } catch (RocksDBException ex) { - handleError(responseObserver, ex); - return; - } - - var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); - if (requestsCountFinalized) { - if (newProcessedRequestCount == requestsCount) { - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } - } - }); - } - case null, default -> - throw new UnsupportedOperationException("Unsupported operation: " - + request.getPutMultiRequestTypeCase()); + public Mono putMulti(Flux request) { + return request.switchOnFirst((firstSignal, nextRequests) -> { + if (firstSignal.isOnNext()) { + var firstValue = firstSignal.get(); + assert firstValue != null; + if (!firstValue.hasInitialRequest()) { + return Mono.error(RocksDBException.of( + RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); } - } + var initialRequest = firstValue.getInitialRequest(); - @Override - public void onError(Throwable t) { - responseObserver.onError(t); + return nextRequests + .publishOn(executor) + .doOnNext(putRequest -> { + var data = putRequest.getData(); + try (var arena = Arena.ofConfined()) { + api.put(arena, + initialRequest.getTransactionOrUpdateId(), + initialRequest.getColumnId(), + mapKeys(arena, data.getKeysCount(), data::getKeys), + toMemorySegment(arena, data.getValue()), + new RequestNothing<>()); + } + }); + } else if (firstSignal.isOnComplete()) { + return Mono.just(RocksDBException.of( + RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); + } else { + return nextRequests; } - - @Override - public void onCompleted() { - requestsCountFinalized = true; - if (requestsCount == 0) { - responseObserver.onCompleted(); - } - } - }; + }).then(Mono.just(Empty.getDefaultInstance())); } @Override - public void putGetPrevious(PutRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var prev = api.put(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestPrevious<>() - ); - var prevBuilder = Previous.newBuilder(); - if (prev != null) { - prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); - } - var response = prevBuilder.build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); + public Mono putGetPrevious(PutRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var prev = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestPrevious<>() + ); + var prevBuilder = Previous.newBuilder(); + if (prev != null) { + prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); + } + return prevBuilder.build(); } }); } @Override - public void putGetDelta(PutRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var delta = api.put(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestDelta<>() - ); - var deltaBuilder = Delta.newBuilder(); - if (delta.previous() != null) { - deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer())); - } - if (delta.current() != null) { - deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer())); - } - var response = deltaBuilder.build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); + public Mono putGetDelta(PutRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var delta = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestDelta<>() + ); + var deltaBuilder = Delta.newBuilder(); + if (delta.previous() != null) { + deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer())); + } + if (delta.current() != null) { + deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer())); + } + return deltaBuilder.build(); } }); } @Override - public void putGetChanged(PutRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { + public Mono putGetChanged(PutRequest request) { + return executeSync(() -> { try (var arena = Arena.ofConfined()) { var changed = api.put(arena, request.getTransactionOrUpdateId(), @@ -478,251 +310,188 @@ public class GrpcServer extends Server { toMemorySegment(arena, request.getData().getValue()), new RequestChanged<>() ); - var response = Changed.newBuilder().setChanged(changed).build(); - responseObserver.onNext(response); + return Changed.newBuilder().setChanged(changed).build(); } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); + }); + } + + @Override + public Mono putGetPreviousPresence(PutRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var present = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestPreviousPresence<>() + ); + return PreviousPresence.newBuilder().setPresent(present).build(); } }); } @Override - public void putGetPreviousPresence(PutRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var present = api.put(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestPreviousPresence<>() - ); - var response = PreviousPresence.newBuilder().setPresent(present).build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void get(GetRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var current = api.get(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getKeysCount(), request::getKeys), - new RequestCurrent<>() - ); - var responseBuilder = GetResponse.newBuilder(); - if (current != null) { - responseBuilder.setValue(ByteString.copyFrom(current.asByteBuffer())); - } - var response = responseBuilder.build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void getForUpdate(GetRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var forUpdate = api.get(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getKeysCount(), request::getKeys), - new RequestForUpdate<>() - ); - var responseBuilder = UpdateBegin.newBuilder(); - responseBuilder.setUpdateId(forUpdate.updateId()); - if (forUpdate.previous() != null) { - responseBuilder.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer())); - } - var response = responseBuilder.build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void exists(GetRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var exists = api.get(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getKeysCount(), request::getKeys), - new RequestExists<>() - ); - responseObserver.onNext(PreviousPresence.newBuilder().setPresent(exists).build()); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void openIterator(OpenIteratorRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var iteratorId = api.openIterator(arena, - request.getTransactionId(), - request.getColumnId(), - mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), - mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), - request.getReverse(), - request.getTimeoutMs() - ); - responseObserver.onNext(OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build()); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void closeIterator(CloseIteratorRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - api.closeIterator(request.getIteratorId()); - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void seekTo(SeekToRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); - } - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void subsequent(SubsequentRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - api.subsequent(arena, request.getIterationId(), - request.getSkipCount(), - request.getTakeCount(), - new RequestNothing<>()); - } - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void subsequentExists(SubsequentRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - var exists = api.subsequent(arena, request.getIterationId(), - request.getSkipCount(), - request.getTakeCount(), - new RequestExists<>()); - var response = PreviousPresence.newBuilder().setPresent(exists).build(); - responseObserver.onNext(response); - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - int pageIndex = 0; - final long pageSize = 16L; - while (request.getTakeCount() > pageIndex * pageSize) { - var response = api.subsequent(arena, - request.getIterationId(), - pageIndex == 0 ? request.getSkipCount() : 0, - Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize), - new RequestMulti<>() - ); - for (MemorySegment entry : response) { - Keys keys = null; // todo: implement - MemorySegment value = entry; - responseObserver.onNext(KV.newBuilder() - .addAllKeys(null) // todo: implement - .setValue(ByteString.copyFrom(value.asByteBuffer())) - .build()); - } - pageIndex++; - } - } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); - } - }); - } - - @Override - public void getRangeFirstAndLast(GetRangeRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - it.cavallium.rockserver.core.common.FirstAndLast firstAndLast - = api.reduceRange(arena, - request.getTransactionId(), - request.getColumnId(), - mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), - mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), - request.getReverse(), - RequestType.firstAndLast(), - request.getTimeoutMs() - ); - responseObserver.onNext(FirstAndLast.newBuilder() - .setFirst(unmapKV(firstAndLast.first())) - .setLast(unmapKV(firstAndLast.last())) - .build()); + public Mono get(GetRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var current = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestCurrent<>() + ); + var responseBuilder = GetResponse.newBuilder(); + if (current != null) { + responseBuilder.setValue(ByteString.copyFrom(current.asByteBuffer())); } - responseObserver.onCompleted(); - } catch (Throwable ex) { - handleError(responseObserver, ex); + return responseBuilder.build(); + } + }); + } + + @Override + public Mono getForUpdate(GetRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var forUpdate = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestForUpdate<>() + ); + var responseBuilder = UpdateBegin.newBuilder(); + responseBuilder.setUpdateId(forUpdate.updateId()); + if (forUpdate.previous() != null) { + responseBuilder.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer())); + } + return responseBuilder.build(); + } + }); + } + + @Override + public Mono exists(GetRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var exists = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestExists<>() + ); + return PreviousPresence.newBuilder().setPresent(exists).build(); + } + }); + } + + @Override + public Mono openIterator(OpenIteratorRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var iteratorId = api.openIterator(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + request.getTimeoutMs() + ); + return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(); + } + }); + } + + @Override + public Mono closeIterator(CloseIteratorRequest request) { + return executeSync(() -> { + api.closeIterator(request.getIteratorId()); + return Empty.getDefaultInstance(); + }); + } + + @Override + public Mono seekTo(SeekToRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); + } + return Empty.getDefaultInstance(); + }); + } + + @Override + public Mono subsequent(SubsequentRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + api.subsequent(arena, request.getIterationId(), + request.getSkipCount(), + request.getTakeCount(), + new RequestNothing<>()); + } + return Empty.getDefaultInstance(); + }); + } + + @Override + public Mono subsequentExists(SubsequentRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + var exists = api.subsequent(arena, request.getIterationId(), + request.getSkipCount(), + request.getTakeCount(), + new RequestExists<>()); + return PreviousPresence.newBuilder().setPresent(exists).build(); + } + }); + } + + @Override + public Flux subsequentMultiGet(SubsequentRequest request) { + return Flux.create(emitter -> { + try (var arena = Arena.ofConfined()) { + int pageIndex = 0; + final long pageSize = 16L; + while (request.getTakeCount() > pageIndex * pageSize) { + var response = api.subsequent(arena, + request.getIterationId(), + pageIndex == 0 ? request.getSkipCount() : 0, + Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize), + new RequestMulti<>() + ); + for (MemorySegment entry : response) { + Keys keys = null; // todo: implement + MemorySegment value = entry; + emitter.next(KV.newBuilder() + .addAllKeys(null) // todo: implement + .setValue(ByteString.copyFrom(value.asByteBuffer())) + .build()); + } + pageIndex++; + } + } + emitter.complete(); + }, FluxSink.OverflowStrategy.BUFFER); + } + + @Override + public Mono reduceRangeFirstAndLast(GetRangeRequest request) { + return executeSync(() -> { + try (var arena = Arena.ofConfined()) { + it.cavallium.rockserver.core.common.FirstAndLast firstAndLast + = api.reduceRange(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + RequestType.firstAndLast(), + request.getTimeoutMs() + ); + return FirstAndLast.newBuilder() + .setFirst(unmapKV(firstAndLast.first())) + .setLast(unmapKV(firstAndLast.last())) + .build(); } }); } @@ -737,6 +506,12 @@ public class GrpcServer extends Server { } } + // utils + + private Mono executeSync(Callable callable) { + return Mono.fromCallable(callable).subscribeOn(executor); + } + // mappers private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) { @@ -859,7 +634,11 @@ public class GrpcServer extends Server { throw new RuntimeException(e); } elg.close(); - executor.close(); + executor.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> { + LOG.error("Grpc server executor shutdown timed out, terminating...", ex); + executor.dispose(); + return Mono.empty(); + }).block(); super.close(); } } diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index e14855b..920eb6a 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -124,6 +124,6 @@ service RocksDBService { rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty); rpc subsequentExists(SubsequentRequest) returns (PreviousPresence); rpc subsequentMultiGet(SubsequentRequest) returns (stream KV); - rpc getRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); - rpc getRangeStream(GetRangeRequest) returns (stream KV); + rpc reduceRangeFirstAndLast(GetRangeRequest) returns (FirstAndLast); + rpc getAllInRange(GetRangeRequest) returns (stream KV); } diff --git a/src/native/java/module-info.java b/src/native/java/module-info.java index 695d8c1..aae87d4 100644 --- a/src/native/java/module-info.java +++ b/src/native/java/module-info.java @@ -13,7 +13,6 @@ module rockserver.core { requires io.grpc.protobuf; requires io.grpc.stub; requires io.grpc; - requires jsr305; requires com.google.common; requires io.grpc.netty; requires io.jstach.rainbowgum; @@ -30,6 +29,8 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; + requires reactor.grpc.stub; + requires java.annotation; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common;