diff --git a/pom.xml b/pom.xml index 8a0d436..268e45e 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ 0.6.1 3.25.3 1.65.1 + 4.1.100.Final 0.7.0 @@ -114,6 +115,11 @@ grpc-netty ${grpc.version} + + io.netty + netty-transport-native-epoll + ${netty.version} + io.netty netty-tcnative-boringssl-static diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index b2f0d2e..71d4d6d 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -24,6 +24,8 @@ module rockserver.core { requires io.netty.codec.http; requires io.netty.codec; requires io.netty.codec.http2; + requires jdk.unsupported; + requires io.netty.transport.classes.epoll; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java b/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java new file mode 100644 index 0000000..0f10db0 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java @@ -0,0 +1,49 @@ +package it.cavallium.rockserver.core; + +import it.cavallium.rockserver.core.client.ClientBuilder; +import it.cavallium.rockserver.core.client.EmbeddedConnection; +import it.cavallium.rockserver.core.client.RocksDBConnection; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.Keys; +import it.cavallium.rockserver.core.common.RequestType; +import it.cavallium.rockserver.core.common.Utils; +import it.cavallium.rockserver.core.impl.EmbeddedDB; +import it.cavallium.rockserver.core.server.GrpcServer; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.objects.ObjectList; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; + +public class TestGrpcLoop { + public static void main(String[] args) throws IOException, InterruptedException { + var embeddedDB = new EmbeddedConnection(null, "main", null); + var server = new GrpcServer(embeddedDB, "localhost", 12345); + var clientB = new ClientBuilder(); + clientB.setHttpAddress(new Utils.HostAndPort("localhost", 12345)); + clientB.setName("local"); + clientB.setUseThrift(false); + var client = clientB.build(); + var col = client.getSyncApi().createColumn("test", ColumnSchema.of(IntList.of(15), ObjectList.of(), true)); + var parallelism = 4; + for (int i = 0; i < parallelism; i++) { + var t = new Thread(() -> { + while (true) { + try (var arena = Arena.ofConfined()) { + var delta = client.getSyncApi().put(arena, 0, col, + new Keys(new MemorySegment[]{MemorySegment.ofArray(new byte[15])}), + MemorySegment.ofArray(new byte[15]), + RequestType.delta()); + } + } + }); + t.setDaemon(true); + t.setName("test-requests-thread-" + i); + t.start(); + if (i + 1 == parallelism) { + t.join(); + } + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 53d5d33..04fcf73 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -119,7 +119,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { @Override public R requestSync(RocksDBAPICommand req) { - var asyncResponse = requestAsync(req); + var asyncResponse = req.handleAsync(this); return asyncResponse .toCompletableFuture() .join(); diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 6438696..f4677d3 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -4,13 +4,17 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; -import io.grpc.Status; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerDomainSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; import it.cavallium.rockserver.core.client.RocksDBConnection; -import it.cavallium.rockserver.core.common.ColumnHashType; -import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.Keys; +import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestDelta; @@ -57,12 +61,10 @@ import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.InetSocketAddress; -import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.function.Function; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,25 +73,42 @@ public class GrpcServer extends Server { private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class.getName()); private final GrpcServerImpl grpc; + private final EventLoopGroup elg; + private final ExecutorService executor; private final io.grpc.Server server; public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { super(client); this.grpc = new GrpcServerImpl(this.getClient()); + EventLoopGroup elg; + Class channelType; + try { + elg = new EpollEventLoopGroup(1); + channelType = EpollServerDomainSocketChannel.class; + } catch (UnsatisfiedLinkError ex) { + elg = new NioEventLoopGroup(); + channelType = NioServerSocketChannel.class; + } + this.elg = elg; + this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2); this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port)) + .bossEventLoopGroup(elg) + .workerEventLoopGroup(elg) + .directExecutor() + .channelType(channelType) + .withChildOption(ChannelOption.SO_KEEPALIVE, false) .addService(grpc) .build(); server.start(); LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port); } - private static final class GrpcServerImpl extends RocksDBServiceImplBase { + private final class GrpcServerImpl extends RocksDBServiceImplBase { - private static final Function MAP_EMPTY = _ -> Empty.getDefaultInstance(); - private final RocksDBConnection client; + private final RocksDBSyncAPI api; public GrpcServerImpl(RocksDBConnection client) { - this.client = client; + this.api = client.getSyncApi(); } // functions @@ -97,72 +116,98 @@ public class GrpcServer extends Server { @Override public void openTransaction(OpenTransactionRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .openTransactionAsync(request.getTimeoutMs()) - .whenComplete(handleResponseObserver( - txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(), - responseObserver, null)); + executor.execute(() -> { + try { + var txId = api.openTransaction(request.getTimeoutMs()); + responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build()); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void closeTransaction(CloseTransactionRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .closeTransactionAsync(request.getTransactionId(), request.getCommit()) - .whenComplete(handleResponseObserver( - committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(), - responseObserver, null)); + executor.execute(() -> { + try { + var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); + var response = CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .closeFailedUpdateAsync(request.getUpdateId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); + executor.execute(() -> { + try { + api.closeFailedUpdate(request.getUpdateId()); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void createColumn(CreateColumnRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .createColumnAsync(request.getName(), mapColumnSchema(request.getSchema())) - .whenComplete(handleResponseObserver( - colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(), - responseObserver, null)); + executor.execute(() -> { + var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); + var response = CreateColumnResponse.newBuilder().setColumnId(colId).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + }); } @Override public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .deleteColumnAsync(request.getColumnId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); + executor.execute(() -> { + api.deleteColumn(request.getColumnId()); + responseObserver.onCompleted(); + }); } @Override public void getColumnId(GetColumnIdRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .getColumnIdAsync(request.getName()) - .whenComplete(handleResponseObserver( - colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(), - responseObserver, null)); + executor.execute(() -> { + try { + var colId = api.getColumnId(request.getName()); + var response = GetColumnIdResponse.newBuilder().setColumnId(colId).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void put(PutRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestNothing<>() - ) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestNothing<>() + ); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public StreamObserver putMulti(StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); return new StreamObserver<>() { private boolean initialRequestDone = false; private long requestsCount = 0; @@ -185,28 +230,28 @@ public class GrpcServer extends Server { throw new UnsupportedOperationException("Initial request already done!"); } ++requestsCount; - client.getAsyncApi() - .putAsync(autoArena, - initialRequest.getTransactionOrUpdateId(), - initialRequest.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestNothing<>() - ) - .whenComplete((_, error) -> { - if (error != null) { - closeArenaSafe(autoArena); - responseObserver.onError(error); - } else { - var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); - if (requestsCountFinalized) { - if (newProcessedRequestCount == requestsCount) { - closeArenaSafe(autoArena); - responseObserver.onCompleted(); - } - } - } - }); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + api.put(arena, + initialRequest.getTransactionOrUpdateId(), + initialRequest.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestNothing<>()); + } + } catch (RocksDBException ex) { + responseObserver.onError(ex); + return; + } + + var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); + if (requestsCountFinalized) { + if (newProcessedRequestCount == requestsCount) { + responseObserver.onCompleted(); + } + } + }); } case null, default -> throw new UnsupportedOperationException("Unsupported operation: " @@ -216,7 +261,6 @@ public class GrpcServer extends Server { @Override public void onError(Throwable t) { - closeArenaSafe(autoArena); responseObserver.onError(t); } @@ -224,7 +268,6 @@ public class GrpcServer extends Server { public void onCompleted() { requestsCountFinalized = true; if (requestsCount == 0) { - closeArenaSafe(autoArena); responseObserver.onCompleted(); } } @@ -233,237 +276,288 @@ public class GrpcServer extends Server { @Override public void putGetPrevious(PutRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestPrevious<>() - ) - .whenComplete(handleResponseObserver( - prev -> { - var prevBuilder = Previous.newBuilder(); - if (prev != null) { - prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); - } - return prevBuilder.build(); - }, - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var prev = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestPrevious<>() + ); + var prevBuilder = Previous.newBuilder(); + if (prev != null) { + prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); + } + var response = prevBuilder.build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void putGetDelta(PutRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestDelta<>() - ) - .whenComplete(handleResponseObserver( - delta -> { - var deltaBuilder = Delta.newBuilder(); - if (delta.previous() != null) { - deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer())); - } - if (delta.current() != null) { - deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer())); - } - return deltaBuilder.build(); - }, - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var delta = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestDelta<>() + ); + var deltaBuilder = Delta.newBuilder(); + if (delta.previous() != null) { + deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer())); + } + if (delta.current() != null) { + deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer())); + } + var response = deltaBuilder.build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void putGetChanged(PutRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestChanged<>() - ) - .whenComplete(handleResponseObserver( - changed -> Changed.newBuilder().setChanged(changed).build(), - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var changed = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestChanged<>() + ); + var response = Changed.newBuilder().setChanged(changed).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void putGetPreviousPresence(PutRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(autoArena, request.getData().getValue()), - new RequestPreviousPresence<>() - ) - .whenComplete(handleResponseObserver( - present -> PreviousPresence.newBuilder().setPresent(present).build(), - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var present = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestPreviousPresence<>() + ); + var response = PreviousPresence.newBuilder().setPresent(present).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void get(GetRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getKeysCount(), request::getKeys), - new RequestCurrent<>() - ) - .whenComplete(handleResponseObserver( - current -> { - var response = GetResponse.newBuilder(); - if (current != null) { - response.setValue(ByteString.copyFrom(current.asByteBuffer())); - } - return response.build(); - }, - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var current = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestCurrent<>() + ); + var responseBuilder = GetResponse.newBuilder(); + if (current != null) { + responseBuilder.setValue(ByteString.copyFrom(current.asByteBuffer())); + } + var response = responseBuilder.build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void getForUpdate(GetRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getKeysCount(), request::getKeys), - new RequestForUpdate<>() - ) - .whenComplete(handleResponseObserver( - forUpdate -> { - var response = UpdateBegin.newBuilder(); - response.setUpdateId(forUpdate.updateId()); - if (forUpdate.previous() != null) { - response.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer())); - } - return response.build(); - }, - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var forUpdate = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestForUpdate<>() + ); + var responseBuilder = UpdateBegin.newBuilder(); + responseBuilder.setUpdateId(forUpdate.updateId()); + if (forUpdate.previous() != null) { + responseBuilder.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer())); + } + var response = responseBuilder.build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void exists(GetRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(autoArena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(autoArena, request.getKeysCount(), request::getKeys), - new RequestExists<>() - ) - .whenComplete(handleResponseObserver( - exists -> PreviousPresence.newBuilder().setPresent(exists).build(), - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var exists = api.get(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getKeysCount(), request::getKeys), + new RequestExists<>() + ); + responseObserver.onNext(PreviousPresence.newBuilder().setPresent(exists).build()); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void openIterator(OpenIteratorRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .openIteratorAsync(autoArena, - request.getTransactionId(), - request.getColumnId(), - mapKeys(autoArena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), - mapKeys(autoArena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), - request.getReverse(), - request.getTimeoutMs() - ) - .whenComplete(handleResponseObserver( - iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(), - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var iteratorId = api.openIterator(arena, + request.getTransactionId(), + request.getColumnId(), + mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), + mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), + request.getReverse(), + request.getTimeoutMs() + ); + responseObserver.onNext(OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build()); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void closeIterator(CloseIteratorRequest request, StreamObserver responseObserver) { - client.getAsyncApi() - .closeIteratorAsync(request.getIteratorId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); + executor.execute(() -> { + try { + api.closeIterator(request.getIteratorId()); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void seekTo(SeekToRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .seekToAsync(autoArena, request.getIterationId(), mapKeys(autoArena, request.getKeysCount(), request::getKeys)) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void subsequent(SubsequentRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .subsequentAsync(autoArena, - request.getIterationId(), - request.getSkipCount(), - request.getTakeCount(), - new RequestNothing<>() - ) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + api.subsequent(arena, request.getIterationId(), + request.getSkipCount(), + request.getTakeCount(), + new RequestNothing<>()); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void subsequentExists(SubsequentRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - client.getAsyncApi() - .subsequentAsync(autoArena, - request.getIterationId(), - request.getSkipCount(), - request.getTakeCount(), - new RequestExists<>() - ) - .whenComplete(handleResponseObserver( - exists -> PreviousPresence.newBuilder().setPresent(exists).build(), - responseObserver, autoArena)); + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + var exists = api.subsequent(arena, request.getIterationId(), + request.getSkipCount(), + request.getTakeCount(), + new RequestExists<>()); + var response = PreviousPresence.newBuilder().setPresent(exists).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } @Override public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) { - var autoArena = Arena.ofShared(); - subsequentMultiPage(request, responseObserver, 0, autoArena); - } - - public void subsequentMultiPage(SubsequentRequest request, StreamObserver responseObserver, int pageIndex, Arena autoArena) { - final long pageSize = 16L; - if (request.getTakeCount() > pageIndex * pageSize) { - client.getAsyncApi() - .subsequentAsync(autoArena, - request.getIterationId(), - pageIndex == 0 ? request.getSkipCount() : 0, - Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize), - new RequestMulti<>() - ) - .whenComplete((response, ex) -> { - if (ex != null) { - closeArenaSafe(autoArena); - responseObserver.onError(ex); - } else { - for (MemorySegment entry : response) { - Keys keys = null; // todo: implement - MemorySegment value = entry; - responseObserver.onNext(KV.newBuilder() - .addAllKeys(null) // todo: implement - .setValue(ByteString.copyFrom(value.asByteBuffer())) - .build()); - } - subsequentMultiPage(request, responseObserver, pageIndex + 1, autoArena); - } - }); - } else { - closeArenaSafe(autoArena); - responseObserver.onCompleted(); - } + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + int pageIndex = 0; + final long pageSize = 16L; + while (request.getTakeCount() > pageIndex * pageSize) { + var response = api.subsequent(arena, + request.getIterationId(), + pageIndex == 0 ? request.getSkipCount() : 0, + Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize), + new RequestMulti<>() + ); + for (MemorySegment entry : response) { + Keys keys = null; // todo: implement + MemorySegment value = entry; + responseObserver.onNext(KV.newBuilder() + .addAllKeys(null) // todo: implement + .setValue(ByteString.copyFrom(value.asByteBuffer())) + .build()); + } + pageIndex++; + } + } + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); } private static void closeArenaSafe(Arena autoArena) { @@ -514,55 +608,6 @@ public class GrpcServer extends Server { } return new Keys(segments); } - - // utils - - private static BiConsumer handleResponseObserver(StreamObserver responseObserver) { - return (value, ex) -> { - if (ex != null) { - responseObserver.onError(ex); - } else { - if (value != null) { - responseObserver.onNext(value); - } - responseObserver.onCompleted(); - } - }; - } - - private static BiConsumer handleResponseObserver(Function resultMapper, - StreamObserver responseObserver, @Nullable Arena autoArena) { - return (value, ex) -> { - if (ex != null) { - closeArenaSafe(autoArena); - var cause = ex; - if (cause instanceof CompletionException completionException) { - cause = completionException; - } - if (cause instanceof it.cavallium.rockserver.core.common.RocksDBException rocksDBException) { - cause = rocksDBException; - } - var error = Status.INTERNAL.withCause(cause) - .withDescription(cause.toString()) - .asException(); - responseObserver.onError(error); - } else { - T mapped; - try { - mapped = resultMapper.apply(value); - } catch (Throwable ex2) { - closeArenaSafe(autoArena); - responseObserver.onError(ex2); - return; - } - if (mapped != null) { - responseObserver.onNext(mapped); - } - closeArenaSafe(autoArena); - responseObserver.onCompleted(); - } - }; - } } @Override @@ -574,6 +619,8 @@ public class GrpcServer extends Server { } catch (InterruptedException e) { throw new RuntimeException(e); } + elg.close(); + executor.close(); super.close(); } } diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index bb65d89..c2743dc 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -162,11 +162,11 @@ abstract class EmbeddedDBTest { Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, toMemorySegmentSimple(arena, 123), RequestType.delta())); } else { Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, MemorySegment.NULL, RequestType.delta())); - Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, null, RequestType.delta())); + Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, key, null, RequestType.delta())); } - Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, null, value1, RequestType.delta())); - Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, null, null, RequestType.delta())); + Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, null, value1, RequestType.delta())); + Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, null, null, RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, value1, null)); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 1, colId, key, value1, RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, 21203, key, value1, RequestType.delta()));