From 62e536e88f8aa90ad9ea50af182d80424698339c Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 16 Sep 2024 17:27:50 +0200 Subject: [PATCH] Fix memory leak --- .../rockserver/core/server/GrpcServer.java | 84 ++++++++++++------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 4e52676..c64402c 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -20,7 +20,6 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti; import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; -import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.common.api.proto.Changed; import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest; import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest; @@ -62,7 +61,8 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Function; -import org.jetbrains.annotations.NotNull; + +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +87,9 @@ public class GrpcServer extends Server { private static final Function MAP_EMPTY = _ -> Empty.getDefaultInstance(); private final RocksDBConnection client; - private final Arena autoArena; public GrpcServerImpl(RocksDBConnection client) { this.client = client; - this.autoArena = Arena.ofAuto(); } // functions @@ -103,7 +101,7 @@ public class GrpcServer extends Server { .openTransactionAsync(request.getTimeoutMs()) .whenComplete(handleResponseObserver( txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(), - responseObserver)); + responseObserver, null)); } @Override @@ -113,15 +111,14 @@ public class GrpcServer extends Server { .closeTransactionAsync(request.getTransactionId(), request.getCommit()) .whenComplete(handleResponseObserver( committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(), - responseObserver - )); + responseObserver, null)); } @Override public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver responseObserver) { client.getAsyncApi() .closeFailedUpdateAsync(request.getUpdateId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); } @Override @@ -130,14 +127,14 @@ public class GrpcServer extends Server { .createColumnAsync(request.getName(), mapColumnSchema(request.getSchema())) .whenComplete(handleResponseObserver( colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(), - responseObserver)); + responseObserver, null)); } @Override public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) { client.getAsyncApi() .deleteColumnAsync(request.getColumnId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); } @Override @@ -146,11 +143,12 @@ public class GrpcServer extends Server { .getColumnIdAsync(request.getName()) .whenComplete(handleResponseObserver( colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(), - responseObserver)); + responseObserver, null)); } @Override public void put(PutRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .putAsync(autoArena, request.getTransactionOrUpdateId(), @@ -159,11 +157,12 @@ public class GrpcServer extends Server { toMemorySegment(autoArena, request.getData().getValue()), new RequestNothing<>() ) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); } @Override public StreamObserver putMulti(StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); return new StreamObserver<>() { private boolean initialRequestDone = false; private long requestsCount = 0; @@ -196,11 +195,13 @@ public class GrpcServer extends Server { ) .whenComplete((_, error) -> { if (error != null) { + closeArenaSafe(autoArena); responseObserver.onError(error); } else { var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); if (requestsCountFinalized) { if (newProcessedRequestCount == requestsCount) { + closeArenaSafe(autoArena); responseObserver.onCompleted(); } } @@ -215,6 +216,7 @@ public class GrpcServer extends Server { @Override public void onError(Throwable t) { + closeArenaSafe(autoArena); responseObserver.onError(t); } @@ -222,6 +224,7 @@ public class GrpcServer extends Server { public void onCompleted() { requestsCountFinalized = true; if (requestsCount == 0) { + closeArenaSafe(autoArena); responseObserver.onCompleted(); } } @@ -230,6 +233,7 @@ public class GrpcServer extends Server { @Override public void putGetPrevious(PutRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .putAsync(autoArena, request.getTransactionOrUpdateId(), @@ -246,11 +250,12 @@ public class GrpcServer extends Server { } return prevBuilder.build(); }, - responseObserver)); + responseObserver, autoArena)); } @Override public void putGetDelta(PutRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .putAsync(autoArena, request.getTransactionOrUpdateId(), @@ -270,11 +275,12 @@ public class GrpcServer extends Server { } return deltaBuilder.build(); }, - responseObserver)); + responseObserver, autoArena)); } @Override public void putGetChanged(PutRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .putAsync(autoArena, request.getTransactionOrUpdateId(), @@ -285,11 +291,12 @@ public class GrpcServer extends Server { ) .whenComplete(handleResponseObserver( changed -> Changed.newBuilder().setChanged(changed).build(), - responseObserver)); + responseObserver, autoArena)); } @Override public void putGetPreviousPresence(PutRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .putAsync(autoArena, request.getTransactionOrUpdateId(), @@ -300,11 +307,12 @@ public class GrpcServer extends Server { ) .whenComplete(handleResponseObserver( present -> PreviousPresence.newBuilder().setPresent(present).build(), - responseObserver)); + responseObserver, autoArena)); } @Override public void get(GetRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .getAsync(autoArena, request.getTransactionOrUpdateId(), @@ -320,11 +328,12 @@ public class GrpcServer extends Server { } return response.build(); }, - responseObserver)); + responseObserver, autoArena)); } @Override public void getForUpdate(GetRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .getAsync(autoArena, request.getTransactionOrUpdateId(), @@ -341,11 +350,12 @@ public class GrpcServer extends Server { } return response.build(); }, - responseObserver)); + responseObserver, autoArena)); } @Override public void exists(GetRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .getAsync(autoArena, request.getTransactionOrUpdateId(), @@ -355,11 +365,12 @@ public class GrpcServer extends Server { ) .whenComplete(handleResponseObserver( exists -> PreviousPresence.newBuilder().setPresent(exists).build(), - responseObserver)); + responseObserver, autoArena)); } @Override public void openIterator(OpenIteratorRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .openIteratorAsync(autoArena, request.getTransactionId(), @@ -371,25 +382,27 @@ public class GrpcServer extends Server { ) .whenComplete(handleResponseObserver( iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(), - responseObserver)); + responseObserver, autoArena)); } @Override public void closeIterator(CloseIteratorRequest request, StreamObserver responseObserver) { client.getAsyncApi() .closeIteratorAsync(request.getIteratorId()) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); } @Override public void seekTo(SeekToRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .seekToAsync(autoArena, request.getIterationId(), mapKeys(autoArena, request.getKeysCount(), request::getKeys)) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); } @Override public void subsequent(SubsequentRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .subsequentAsync(autoArena, request.getIterationId(), @@ -397,11 +410,12 @@ public class GrpcServer extends Server { request.getTakeCount(), new RequestNothing<>() ) - .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver)); + .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); } @Override public void subsequentExists(SubsequentRequest request, StreamObserver responseObserver) { + var autoArena = Arena.ofShared(); client.getAsyncApi() .subsequentAsync(autoArena, request.getIterationId(), @@ -411,15 +425,16 @@ public class GrpcServer extends Server { ) .whenComplete(handleResponseObserver( exists -> PreviousPresence.newBuilder().setPresent(exists).build(), - responseObserver)); + responseObserver, autoArena)); } @Override public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) { - subsequentMultiPage(request, responseObserver, 0); + var autoArena = Arena.ofShared(); + subsequentMultiPage(request, responseObserver, 0, autoArena); } - public void subsequentMultiPage(SubsequentRequest request, StreamObserver responseObserver, int pageIndex) { + public void subsequentMultiPage(SubsequentRequest request, StreamObserver responseObserver, int pageIndex, Arena autoArena) { final long pageSize = 16L; if (request.getTakeCount() > pageIndex * pageSize) { client.getAsyncApi() @@ -431,6 +446,7 @@ public class GrpcServer extends Server { ) .whenComplete((response, ex) -> { if (ex != null) { + closeArenaSafe(autoArena); responseObserver.onError(ex); } else { for (MemorySegment entry : response) { @@ -441,14 +457,23 @@ public class GrpcServer extends Server { .setValue(ByteString.copyFrom(value.asByteBuffer())) .build()); } - subsequentMultiPage(request, responseObserver, pageIndex + 1); + subsequentMultiPage(request, responseObserver, pageIndex + 1, autoArena); } }); } else { + closeArenaSafe(autoArena); responseObserver.onCompleted(); } } + private static void closeArenaSafe(Arena autoArena) { + try { + autoArena.close(); + } catch (Exception ex2) { + LOG.error("Failed to close arena", ex2); + } + } + // mappers private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) { @@ -504,9 +529,10 @@ public class GrpcServer extends Server { } private static BiConsumer handleResponseObserver(Function resultMapper, - StreamObserver responseObserver) { + StreamObserver responseObserver, @Nullable Arena autoArena) { return (value, ex) -> { if (ex != null) { + closeArenaSafe(autoArena); var cause = ex; if (cause instanceof CompletionException completionException) { cause = completionException; @@ -523,12 +549,14 @@ public class GrpcServer extends Server { try { mapped = resultMapper.apply(value); } catch (Throwable ex2) { + closeArenaSafe(autoArena); responseObserver.onError(ex2); return; } if (mapped != null) { responseObserver.onNext(mapped); } + closeArenaSafe(autoArena); responseObserver.onCompleted(); } };