Fix memory leak

This commit is contained in:
Andrea Cavalli 2024-09-16 17:27:50 +02:00
parent ed9e8de721
commit 62e536e88f

View File

@ -20,7 +20,6 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.common.api.proto.Changed;
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
@ -62,7 +61,8 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,11 +87,9 @@ public class GrpcServer extends Server {
private static final Function<? super Void, Empty> MAP_EMPTY = _ -> Empty.getDefaultInstance();
private final RocksDBConnection client;
private final Arena autoArena;
public GrpcServerImpl(RocksDBConnection client) {
this.client = client;
this.autoArena = Arena.ofAuto();
}
// functions
@ -103,7 +101,7 @@ public class GrpcServer extends Server {
.openTransactionAsync(request.getTimeoutMs())
.whenComplete(handleResponseObserver(
txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(),
responseObserver));
responseObserver, null));
}
@Override
@ -113,15 +111,14 @@ public class GrpcServer extends Server {
.closeTransactionAsync(request.getTransactionId(), request.getCommit())
.whenComplete(handleResponseObserver(
committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(),
responseObserver
));
responseObserver, null));
}
@Override
public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.closeFailedUpdateAsync(request.getUpdateId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null));
}
@Override
@ -130,14 +127,14 @@ public class GrpcServer extends Server {
.createColumnAsync(request.getName(), mapColumnSchema(request.getSchema()))
.whenComplete(handleResponseObserver(
colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(),
responseObserver));
responseObserver, null));
}
@Override
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.deleteColumnAsync(request.getColumnId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null));
}
@Override
@ -146,11 +143,12 @@ public class GrpcServer extends Server {
.getColumnIdAsync(request.getName())
.whenComplete(handleResponseObserver(
colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(),
responseObserver));
responseObserver, null));
}
@Override
public void put(PutRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -159,11 +157,12 @@ public class GrpcServer extends Server {
toMemorySegment(autoArena, request.getData().getValue()),
new RequestNothing<>()
)
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena));
}
@Override
public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared();
return new StreamObserver<>() {
private boolean initialRequestDone = false;
private long requestsCount = 0;
@ -196,11 +195,13 @@ public class GrpcServer extends Server {
)
.whenComplete((_, error) -> {
if (error != null) {
closeArenaSafe(autoArena);
responseObserver.onError(error);
} else {
var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
if (requestsCountFinalized) {
if (newProcessedRequestCount == requestsCount) {
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
}
@ -215,6 +216,7 @@ public class GrpcServer extends Server {
@Override
public void onError(Throwable t) {
closeArenaSafe(autoArena);
responseObserver.onError(t);
}
@ -222,6 +224,7 @@ public class GrpcServer extends Server {
public void onCompleted() {
requestsCountFinalized = true;
if (requestsCount == 0) {
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
}
@ -230,6 +233,7 @@ public class GrpcServer extends Server {
@Override
public void putGetPrevious(PutRequest request, StreamObserver<Previous> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -246,11 +250,12 @@ public class GrpcServer extends Server {
}
return prevBuilder.build();
},
responseObserver));
responseObserver, autoArena));
}
@Override
public void putGetDelta(PutRequest request, StreamObserver<Delta> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -270,11 +275,12 @@ public class GrpcServer extends Server {
}
return deltaBuilder.build();
},
responseObserver));
responseObserver, autoArena));
}
@Override
public void putGetChanged(PutRequest request, StreamObserver<Changed> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -285,11 +291,12 @@ public class GrpcServer extends Server {
)
.whenComplete(handleResponseObserver(
changed -> Changed.newBuilder().setChanged(changed).build(),
responseObserver));
responseObserver, autoArena));
}
@Override
public void putGetPreviousPresence(PutRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -300,11 +307,12 @@ public class GrpcServer extends Server {
)
.whenComplete(handleResponseObserver(
present -> PreviousPresence.newBuilder().setPresent(present).build(),
responseObserver));
responseObserver, autoArena));
}
@Override
public void get(GetRequest request, StreamObserver<GetResponse> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -320,11 +328,12 @@ public class GrpcServer extends Server {
}
return response.build();
},
responseObserver));
responseObserver, autoArena));
}
@Override
public void getForUpdate(GetRequest request, StreamObserver<UpdateBegin> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -341,11 +350,12 @@ public class GrpcServer extends Server {
}
return response.build();
},
responseObserver));
responseObserver, autoArena));
}
@Override
public void exists(GetRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
@ -355,11 +365,12 @@ public class GrpcServer extends Server {
)
.whenComplete(handleResponseObserver(
exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
responseObserver));
responseObserver, autoArena));
}
@Override
public void openIterator(OpenIteratorRequest request, StreamObserver<OpenIteratorResponse> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.openIteratorAsync(autoArena,
request.getTransactionId(),
@ -371,25 +382,27 @@ public class GrpcServer extends Server {
)
.whenComplete(handleResponseObserver(
iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(),
responseObserver));
responseObserver, autoArena));
}
@Override
public void closeIterator(CloseIteratorRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.closeIteratorAsync(request.getIteratorId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null));
}
@Override
public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.seekToAsync(autoArena, request.getIterationId(), mapKeys(autoArena, request.getKeysCount(), request::getKeys))
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena));
}
@Override
public void subsequent(SubsequentRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.subsequentAsync(autoArena,
request.getIterationId(),
@ -397,11 +410,12 @@ public class GrpcServer extends Server {
request.getTakeCount(),
new RequestNothing<>()
)
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena));
}
@Override
public void subsequentExists(SubsequentRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared();
client.getAsyncApi()
.subsequentAsync(autoArena,
request.getIterationId(),
@ -411,15 +425,16 @@ public class GrpcServer extends Server {
)
.whenComplete(handleResponseObserver(
exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
responseObserver));
responseObserver, autoArena));
}
@Override
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
subsequentMultiPage(request, responseObserver, 0);
var autoArena = Arena.ofShared();
subsequentMultiPage(request, responseObserver, 0, autoArena);
}
public void subsequentMultiPage(SubsequentRequest request, StreamObserver<KV> responseObserver, int pageIndex) {
public void subsequentMultiPage(SubsequentRequest request, StreamObserver<KV> responseObserver, int pageIndex, Arena autoArena) {
final long pageSize = 16L;
if (request.getTakeCount() > pageIndex * pageSize) {
client.getAsyncApi()
@ -431,6 +446,7 @@ public class GrpcServer extends Server {
)
.whenComplete((response, ex) -> {
if (ex != null) {
closeArenaSafe(autoArena);
responseObserver.onError(ex);
} else {
for (MemorySegment entry : response) {
@ -441,14 +457,23 @@ public class GrpcServer extends Server {
.setValue(ByteString.copyFrom(value.asByteBuffer()))
.build());
}
subsequentMultiPage(request, responseObserver, pageIndex + 1);
subsequentMultiPage(request, responseObserver, pageIndex + 1, autoArena);
}
});
} else {
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
}
private static void closeArenaSafe(Arena autoArena) {
try {
autoArena.close();
} catch (Exception ex2) {
LOG.error("Failed to close arena", ex2);
}
}
// mappers
private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) {
@ -504,9 +529,10 @@ public class GrpcServer extends Server {
}
private static <PREV, T> BiConsumer<? super PREV, Throwable> handleResponseObserver(Function<PREV, T> resultMapper,
StreamObserver<T> responseObserver) {
StreamObserver<T> responseObserver, @Nullable Arena autoArena) {
return (value, ex) -> {
if (ex != null) {
closeArenaSafe(autoArena);
var cause = ex;
if (cause instanceof CompletionException completionException) {
cause = completionException;
@ -523,12 +549,14 @@ public class GrpcServer extends Server {
try {
mapped = resultMapper.apply(value);
} catch (Throwable ex2) {
closeArenaSafe(autoArena);
responseObserver.onError(ex2);
return;
}
if (mapped != null) {
responseObserver.onNext(mapped);
}
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
};