diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index d1b4e10..dd0daaf 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -4,12 +4,13 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyServerBuilder; -import io.grpc.stub.ServerCallStreamObserver; -import io.grpc.stub.StreamObserver; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -53,6 +54,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; +import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -122,7 +124,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var txId = api.openTransaction(request.getTimeoutMs()); return OpenTransactionResponse.newBuilder().setTransactionId(txId).build(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -130,7 +132,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); return CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -138,7 +140,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeFailedUpdate(request.getUpdateId()); return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -146,7 +148,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); return CreateColumnResponse.newBuilder().setColumnId(colId).build(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -154,7 +156,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.deleteColumn(request.getColumnId()); return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -162,7 +164,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.getColumnId(request.getName()); return GetColumnIdResponse.newBuilder().setColumnId(colId).build(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -178,7 +180,7 @@ public class GrpcServer extends Server { ); } return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -205,7 +207,9 @@ public class GrpcServer extends Server { return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries); }); - return Mono.fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)); + return Mono + .fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)) + .transform(this.onErrorMapMonoWithRequestInfo(initialRequest)); } else if (firstSignal.isOnComplete()) { return Mono.just(RocksDBException.of( RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); @@ -228,18 +232,19 @@ public class GrpcServer extends Server { var initialRequest = firstValue.getInitialRequest(); return nextRequests - .publishOn(executor) - .doOnNext(putRequest -> { - var data = putRequest.getData(); - try (var arena = Arena.ofConfined()) { - api.put(arena, - initialRequest.getTransactionOrUpdateId(), - initialRequest.getColumnId(), - mapKeys(arena, data.getKeysCount(), data::getKeys), - toMemorySegment(arena, data.getValue()), - new RequestNothing<>()); - } - }); + .publishOn(executor) + .doOnNext(putRequest -> { + var data = putRequest.getData(); + try (var arena = Arena.ofConfined()) { + api.put(arena, + initialRequest.getTransactionOrUpdateId(), + initialRequest.getColumnId(), + mapKeys(arena, data.getKeysCount(), data::getKeys), + toMemorySegment(arena, data.getValue()), + new RequestNothing<>()); + } + }) + .transform(this.onErrorMapFluxWithRequestInfo(initialRequest)); } else if (firstSignal.isOnComplete()) { return Mono.just(RocksDBException.of( RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); @@ -264,9 +269,9 @@ public class GrpcServer extends Server { if (prev != null) { prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); } - return prevBuilder.build(); + return prevBuilder.build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -289,23 +294,23 @@ public class GrpcServer extends Server { } return deltaBuilder.build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override public Mono putGetChanged(PutRequest request) { return executeSync(() -> { - try (var arena = Arena.ofConfined()) { - var changed = api.put(arena, - request.getTransactionOrUpdateId(), - request.getColumnId(), - mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), - toMemorySegment(arena, request.getData().getValue()), - new RequestChanged<>() - ); - return Changed.newBuilder().setChanged(changed).build(); - } - }); + try (var arena = Arena.ofConfined()) { + var changed = api.put(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys), + toMemorySegment(arena, request.getData().getValue()), + new RequestChanged<>() + ); + return Changed.newBuilder().setChanged(changed).build(); + } + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -321,7 +326,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(present).build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -340,7 +345,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -360,7 +365,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -375,7 +380,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -392,7 +397,7 @@ public class GrpcServer extends Server { ); return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -400,7 +405,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeIterator(request.getIteratorId()); return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -410,7 +415,7 @@ public class GrpcServer extends Server { api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); } return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -423,7 +428,7 @@ public class GrpcServer extends Server { new RequestNothing<>()); } return Empty.getDefaultInstance(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -436,12 +441,12 @@ public class GrpcServer extends Server { new RequestExists<>()); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override public Flux subsequentMultiGet(SubsequentRequest request) { - return Flux.create(emitter -> { + return Flux.create(emitter -> { try (var arena = Arena.ofConfined()) { int pageIndex = 0; final long pageSize = 16L; @@ -464,7 +469,7 @@ public class GrpcServer extends Server { } } emitter.complete(); - }, FluxSink.OverflowStrategy.BUFFER); + }, FluxSink.OverflowStrategy.BUFFER).transform(this.onErrorMapFluxWithRequestInfo(request)); } @Override @@ -485,7 +490,7 @@ public class GrpcServer extends Server { resultBuilder.setLast(unmapKV(firstAndLast.last())); } return resultBuilder.build(); - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -504,7 +509,7 @@ public class GrpcServer extends Server { ); return EntriesCount.newBuilder().setCount(entriesCount).build(); } - }); + }).transform(this.onErrorMapMonoWithRequestInfo(request)); } @Override @@ -521,23 +526,9 @@ public class GrpcServer extends Server { request.getTimeoutMs() )) .map(GrpcServerImpl::unmapKV) - .onErrorResume(ex -> { - if (!(ex instanceof RocksDBException)) { - LOG.error("Unexpected error during request: {}", request, ex); - } - return Mono.error(ex); - }); + .transform(this.onErrorMapFluxWithRequestInfo(request)); } - private static void closeArenaSafe(Arena autoArena) { - if (autoArena != null) { - try { - autoArena.close(); - } catch (Exception ex2) { - LOG.error("Failed to close arena", ex2); - } - } - } // utils @@ -547,6 +538,35 @@ public class GrpcServer extends Server { // mappers + private Function, Flux> onErrorMapFluxWithRequestInfo(Message request) { + return flux -> flux.onErrorResume(throwable -> { + var ex = handleError(throwable).asException(); + if (ex.getStatus().getCode() == Code.INTERNAL) { + LOG.error("Unexpected internal error during request: {}", request, ex); + } + return Mono.error(ex); + }); + } + + private Function, Mono> onErrorMapMonoWithRequestInfo(Message request) { + return flux -> flux.onErrorResume(throwable -> { + var ex = handleError(throwable).asException(); + if (ex.getStatus().getCode() == Code.INTERNAL) { + LOG.error("Unexpected internal error during request: {}", request, ex); + } + return Mono.error(ex); + }); + } + + @Override + protected Throwable onErrorMap(Throwable throwable) { + var ex = handleError(throwable).asException(); + if (ex.getStatus().getCode() == Code.INTERNAL) { + LOG.error("Unexpected internal error during request", ex); + } + return ex; + } + private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) { if (kv == null) return null; return KV.newBuilder() @@ -629,29 +649,25 @@ public class GrpcServer extends Server { ); } - private static void handleError(StreamObserver responseObserver, Throwable ex) { + private static Status handleError(Throwable ex) { if (ex instanceof StatusRuntimeException e && e.getStatus().getCode().equals(Status.CANCELLED.getCode())) { LOG.warn("Connection cancelled: {}", e.getStatus().getDescription()); - return; + return e.getStatus(); } if (ex instanceof CompletionException exx) { - handleError(responseObserver, exx.getCause()); + return handleError(exx.getCause()); } else { - var serverResponseObserver = ((ServerCallStreamObserver) responseObserver); - if (!serverResponseObserver.isCancelled()) { - if (ex instanceof RocksDBException e) { - responseObserver.onError(Status.INTERNAL - .withDescription(e.getLocalizedMessage()) - .withCause(e) - .asException()); - } else { - responseObserver.onError(Status.INTERNAL - .withCause(ex) - .asException()); - } + if (ex instanceof RocksDBException e) { + return Status.INTERNAL + .withDescription(e.getLocalizedMessage()) + .withCause(e); + } else if (ex instanceof StatusException ex2) { + return ex2.getStatus(); + } else if (ex instanceof StatusRuntimeException ex3) { + return ex3.getStatus(); } else { - LOG.error("Unexpected error", ex); + return Status.INTERNAL.withCause(ex); } } }