Print errors better

This commit is contained in:
Andrea Cavalli 2024-10-24 01:09:49 +02:00
parent fbf17c8650
commit 4646f24fb3

View File

@ -4,12 +4,13 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
@ -53,6 +54,7 @@ import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -122,7 +124,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
var txId = api.openTransaction(request.getTimeoutMs()); var txId = api.openTransaction(request.getTimeoutMs());
return OpenTransactionResponse.newBuilder().setTransactionId(txId).build(); return OpenTransactionResponse.newBuilder().setTransactionId(txId).build();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -130,7 +132,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); var committed = api.closeTransaction(request.getTransactionId(), request.getCommit());
return CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); return CloseTransactionResponse.newBuilder().setSuccessful(committed).build();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -138,7 +140,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
api.closeFailedUpdate(request.getUpdateId()); api.closeFailedUpdate(request.getUpdateId());
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -146,7 +148,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema()));
return CreateColumnResponse.newBuilder().setColumnId(colId).build(); return CreateColumnResponse.newBuilder().setColumnId(colId).build();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -154,7 +156,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
api.deleteColumn(request.getColumnId()); api.deleteColumn(request.getColumnId());
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -162,7 +164,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
var colId = api.getColumnId(request.getName()); var colId = api.getColumnId(request.getName());
return GetColumnIdResponse.newBuilder().setColumnId(colId).build(); return GetColumnIdResponse.newBuilder().setColumnId(colId).build();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -178,7 +180,7 @@ public class GrpcServer extends Server {
); );
} }
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -205,7 +207,9 @@ public class GrpcServer extends Server {
return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries); return mapKVBatch(Arena.ofAuto(), batch.getEntriesCount(), batch::getEntries);
}); });
return Mono.fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)); return Mono
.fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode))
.transform(this.onErrorMapMonoWithRequestInfo(initialRequest));
} else if (firstSignal.isOnComplete()) { } else if (firstSignal.isOnComplete()) {
return Mono.just(RocksDBException.of( return Mono.just(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request"));
@ -239,7 +243,8 @@ public class GrpcServer extends Server {
toMemorySegment(arena, data.getValue()), toMemorySegment(arena, data.getValue()),
new RequestNothing<>()); new RequestNothing<>());
} }
}); })
.transform(this.onErrorMapFluxWithRequestInfo(initialRequest));
} else if (firstSignal.isOnComplete()) { } else if (firstSignal.isOnComplete()) {
return Mono.just(RocksDBException.of( return Mono.just(RocksDBException.of(
RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request"));
@ -266,7 +271,7 @@ public class GrpcServer extends Server {
} }
return prevBuilder.build(); return prevBuilder.build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -289,7 +294,7 @@ public class GrpcServer extends Server {
} }
return deltaBuilder.build(); return deltaBuilder.build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -305,7 +310,7 @@ public class GrpcServer extends Server {
); );
return Changed.newBuilder().setChanged(changed).build(); return Changed.newBuilder().setChanged(changed).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -321,7 +326,7 @@ public class GrpcServer extends Server {
); );
return PreviousPresence.newBuilder().setPresent(present).build(); return PreviousPresence.newBuilder().setPresent(present).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -340,7 +345,7 @@ public class GrpcServer extends Server {
} }
return responseBuilder.build(); return responseBuilder.build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -360,7 +365,7 @@ public class GrpcServer extends Server {
} }
return responseBuilder.build(); return responseBuilder.build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -375,7 +380,7 @@ public class GrpcServer extends Server {
); );
return PreviousPresence.newBuilder().setPresent(exists).build(); return PreviousPresence.newBuilder().setPresent(exists).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -392,7 +397,7 @@ public class GrpcServer extends Server {
); );
return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(); return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -400,7 +405,7 @@ public class GrpcServer extends Server {
return executeSync(() -> { return executeSync(() -> {
api.closeIterator(request.getIteratorId()); api.closeIterator(request.getIteratorId());
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -410,7 +415,7 @@ public class GrpcServer extends Server {
api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys));
} }
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -423,7 +428,7 @@ public class GrpcServer extends Server {
new RequestNothing<>()); new RequestNothing<>());
} }
return Empty.getDefaultInstance(); return Empty.getDefaultInstance();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -436,12 +441,12 @@ public class GrpcServer extends Server {
new RequestExists<>()); new RequestExists<>());
return PreviousPresence.newBuilder().setPresent(exists).build(); return PreviousPresence.newBuilder().setPresent(exists).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
public Flux<KV> subsequentMultiGet(SubsequentRequest request) { public Flux<KV> subsequentMultiGet(SubsequentRequest request) {
return Flux.create(emitter -> { return Flux.<KV>create(emitter -> {
try (var arena = Arena.ofConfined()) { try (var arena = Arena.ofConfined()) {
int pageIndex = 0; int pageIndex = 0;
final long pageSize = 16L; final long pageSize = 16L;
@ -464,7 +469,7 @@ public class GrpcServer extends Server {
} }
} }
emitter.complete(); emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER); }, FluxSink.OverflowStrategy.BUFFER).transform(this.onErrorMapFluxWithRequestInfo(request));
} }
@Override @Override
@ -485,7 +490,7 @@ public class GrpcServer extends Server {
resultBuilder.setLast(unmapKV(firstAndLast.last())); resultBuilder.setLast(unmapKV(firstAndLast.last()));
} }
return resultBuilder.build(); return resultBuilder.build();
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -504,7 +509,7 @@ public class GrpcServer extends Server {
); );
return EntriesCount.newBuilder().setCount(entriesCount).build(); return EntriesCount.newBuilder().setCount(entriesCount).build();
} }
}); }).transform(this.onErrorMapMonoWithRequestInfo(request));
} }
@Override @Override
@ -521,23 +526,9 @@ public class GrpcServer extends Server {
request.getTimeoutMs() request.getTimeoutMs()
)) ))
.map(GrpcServerImpl::unmapKV) .map(GrpcServerImpl::unmapKV)
.onErrorResume(ex -> { .transform(this.onErrorMapFluxWithRequestInfo(request));
if (!(ex instanceof RocksDBException)) {
LOG.error("Unexpected error during request: {}", request, ex);
}
return Mono.error(ex);
});
} }
private static void closeArenaSafe(Arena autoArena) {
if (autoArena != null) {
try {
autoArena.close();
} catch (Exception ex2) {
LOG.error("Failed to close arena", ex2);
}
}
}
// utils // utils
@ -547,6 +538,35 @@ public class GrpcServer extends Server {
// mappers // mappers
private <T> Function<Flux<T>, Flux<T>> onErrorMapFluxWithRequestInfo(Message request) {
return flux -> flux.onErrorResume(throwable -> {
var ex = handleError(throwable).asException();
if (ex.getStatus().getCode() == Code.INTERNAL) {
LOG.error("Unexpected internal error during request: {}", request, ex);
}
return Mono.error(ex);
});
}
private <T> Function<Mono<T>, Mono<T>> onErrorMapMonoWithRequestInfo(Message request) {
return flux -> flux.onErrorResume(throwable -> {
var ex = handleError(throwable).asException();
if (ex.getStatus().getCode() == Code.INTERNAL) {
LOG.error("Unexpected internal error during request: {}", request, ex);
}
return Mono.error(ex);
});
}
@Override
protected Throwable onErrorMap(Throwable throwable) {
var ex = handleError(throwable).asException();
if (ex.getStatus().getCode() == Code.INTERNAL) {
LOG.error("Unexpected internal error during request", ex);
}
return ex;
}
private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) { private static KV unmapKV(it.cavallium.rockserver.core.common.KV kv) {
if (kv == null) return null; if (kv == null) return null;
return KV.newBuilder() return KV.newBuilder()
@ -629,29 +649,25 @@ public class GrpcServer extends Server {
); );
} }
private static void handleError(StreamObserver<?> responseObserver, Throwable ex) { private static Status handleError(Throwable ex) {
if (ex instanceof StatusRuntimeException e && e.getStatus().getCode().equals(Status.CANCELLED.getCode())) { if (ex instanceof StatusRuntimeException e && e.getStatus().getCode().equals(Status.CANCELLED.getCode())) {
LOG.warn("Connection cancelled: {}", e.getStatus().getDescription()); LOG.warn("Connection cancelled: {}", e.getStatus().getDescription());
return; return e.getStatus();
} }
if (ex instanceof CompletionException exx) { if (ex instanceof CompletionException exx) {
handleError(responseObserver, exx.getCause()); return handleError(exx.getCause());
} else { } else {
var serverResponseObserver = ((ServerCallStreamObserver<?>) responseObserver);
if (!serverResponseObserver.isCancelled()) {
if (ex instanceof RocksDBException e) { if (ex instanceof RocksDBException e) {
responseObserver.onError(Status.INTERNAL return Status.INTERNAL
.withDescription(e.getLocalizedMessage()) .withDescription(e.getLocalizedMessage())
.withCause(e) .withCause(e);
.asException()); } else if (ex instanceof StatusException ex2) {
return ex2.getStatus();
} else if (ex instanceof StatusRuntimeException ex3) {
return ex3.getStatus();
} else { } else {
responseObserver.onError(Status.INTERNAL return Status.INTERNAL.withCause(ex);
.withCause(ex)
.asException());
}
} else {
LOG.error("Unexpected error", ex);
} }
} }
} }