diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index 84fbdd0..c8ee4f3 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -29,9 +29,9 @@ module rockserver.core { requires io.netty.transport.classes.epoll; requires org.reactivestreams; requires io.netty.transport.unix.common; - requires reactor.core; - requires reactor.grpc.stub; - requires java.annotation; + requires reactor.core; + requires reactor.grpc.stub; + requires java.annotation; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; @@ -40,4 +40,5 @@ module rockserver.core { opens it.cavallium.rockserver.core.config to org.github.gestalt.core, org.github.gestalt.hocon; exports it.cavallium.rockserver.core.impl.rocksdb; exports it.cavallium.rockserver.core.impl; + exports it.cavallium.rockserver.core.common.api.proto to protobuf.java; } \ No newline at end of file diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index 9e0a315..6e34027 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -38,4 +38,5 @@ module rockserver.core { opens it.cavallium.rockserver.core.config to org.github.gestalt.core, org.github.gestalt.hocon; exports it.cavallium.rockserver.core.impl.rocksdb; exports it.cavallium.rockserver.core.impl; + exports it.cavallium.rockserver.core.common.api.proto to protobuf.java; } \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index 9baf225..52ab96c 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -34,7 +34,8 @@ public class RocksDBException extends RuntimeException { SST_GET_SIZE_FAILED, UNSUPPORTED_COLUMN_TYPE, NOT_IMPLEMENTED, - GET_PROPERTY_ERROR + GET_PROPERTY_ERROR, + INTERNAL_ERROR } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index dd0daaf..eaf7513 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -34,6 +34,7 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti; import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; +import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.FirstAndLast; @@ -124,7 +125,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var txId = api.openTransaction(request.getTimeoutMs()); return OpenTransactionResponse.newBuilder().setTransactionId(txId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("openTransaction", request)); } @Override @@ -132,7 +133,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); return CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("closeTransaction", request)); } @Override @@ -140,7 +141,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeFailedUpdate(request.getUpdateId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("closeFailedUpdate", request)); } @Override @@ -148,7 +149,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); return CreateColumnResponse.newBuilder().setColumnId(colId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("createColumn", request)); } @Override @@ -156,7 +157,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.deleteColumn(request.getColumnId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("deleteColumn", request)); } @Override @@ -164,7 +165,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.getColumnId(request.getName()); return GetColumnIdResponse.newBuilder().setColumnId(colId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("getColumnId", request)); } @Override @@ -180,7 +181,7 @@ public class GrpcServer extends Server { ); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("put", request)); } @Override @@ -209,7 +210,7 @@ public class GrpcServer extends Server { return Mono .fromFuture(() -> asyncApi.putBatchAsync(initialRequest.getColumnId(), batches, mode)) - .transform(this.onErrorMapMonoWithRequestInfo(initialRequest)); + .transform(this.onErrorMapMonoWithRequestInfo("putBatch", initialRequest)); } else if (firstSignal.isOnComplete()) { return Mono.just(RocksDBException.of( RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); @@ -244,7 +245,7 @@ public class GrpcServer extends Server { new RequestNothing<>()); } }) - .transform(this.onErrorMapFluxWithRequestInfo(initialRequest)); + .transform(this.onErrorMapFluxWithRequestInfo("putMulti", initialRequest)); } else if (firstSignal.isOnComplete()) { return Mono.just(RocksDBException.of( RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "No initial request")); @@ -271,7 +272,7 @@ public class GrpcServer extends Server { } return prevBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("putGetPrevious", request)); } @Override @@ -294,7 +295,7 @@ public class GrpcServer extends Server { } return deltaBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("putGetDelta", request)); } @Override @@ -310,7 +311,7 @@ public class GrpcServer extends Server { ); return Changed.newBuilder().setChanged(changed).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("putGetChanged", request)); } @Override @@ -326,7 +327,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(present).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("putGetPreviousPresence", request)); } @Override @@ -345,7 +346,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("get", request)); } @Override @@ -365,7 +366,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("getForUpdate", request)); } @Override @@ -380,7 +381,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("exists", request)); } @Override @@ -397,7 +398,7 @@ public class GrpcServer extends Server { ); return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("openIterator", request)); } @Override @@ -405,7 +406,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeIterator(request.getIteratorId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("closeIterator", request)); } @Override @@ -415,7 +416,7 @@ public class GrpcServer extends Server { api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("seekTo", request)); } @Override @@ -428,7 +429,7 @@ public class GrpcServer extends Server { new RequestNothing<>()); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("subsequent", request)); } @Override @@ -441,7 +442,7 @@ public class GrpcServer extends Server { new RequestExists<>()); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("subsequentExists", request)); } @Override @@ -469,7 +470,7 @@ public class GrpcServer extends Server { } } emitter.complete(); - }, FluxSink.OverflowStrategy.BUFFER).transform(this.onErrorMapFluxWithRequestInfo(request)); + }, FluxSink.OverflowStrategy.BUFFER).transform(this.onErrorMapFluxWithRequestInfo("subsequentMultiGet", request)); } @Override @@ -490,7 +491,7 @@ public class GrpcServer extends Server { resultBuilder.setLast(unmapKV(firstAndLast.last())); } return resultBuilder.build(); - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeFirstAndLast", request)); } @Override @@ -509,7 +510,7 @@ public class GrpcServer extends Server { ); return EntriesCount.newBuilder().setCount(entriesCount).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo(request)); + }).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeEntriesCount", request)); } @Override @@ -526,7 +527,7 @@ public class GrpcServer extends Server { request.getTimeoutMs() )) .map(GrpcServerImpl::unmapKV) - .transform(this.onErrorMapFluxWithRequestInfo(request)); + .transform(this.onErrorMapFluxWithRequestInfo("getAllInRange", request)); } @@ -538,7 +539,7 @@ public class GrpcServer extends Server { // mappers - private Function, Flux> onErrorMapFluxWithRequestInfo(Message request) { + private Function, Flux> onErrorMapFluxWithRequestInfo(String requestName, Message request) { return flux -> flux.onErrorResume(throwable -> { var ex = handleError(throwable).asException(); if (ex.getStatus().getCode() == Code.INTERNAL) { @@ -548,11 +549,12 @@ public class GrpcServer extends Server { }); } - private Function, Mono> onErrorMapMonoWithRequestInfo(Message request) { + private Function, Mono> onErrorMapMonoWithRequestInfo(String requestName, Message request) { return flux -> flux.onErrorResume(throwable -> { var ex = handleError(throwable).asException(); if (ex.getStatus().getCode() == Code.INTERNAL) { - LOG.error("Unexpected internal error during request: {}", request, ex); + LOG.error("Unexpected internal error during request \"{}\": {}", requestName, request.toString(), ex); + return Mono.error(RocksDBException.of(RocksDBErrorType.INTERNAL_ERROR, ex.getCause())); } return Mono.error(ex); }); @@ -561,7 +563,7 @@ public class GrpcServer extends Server { @Override protected Throwable onErrorMap(Throwable throwable) { var ex = handleError(throwable).asException(); - if (ex.getStatus().getCode() == Code.INTERNAL) { + if (ex.getStatus().getCode() == Code.INTERNAL && !(throwable instanceof RocksDBException)) { LOG.error("Unexpected internal error during request", ex); } return ex; diff --git a/src/native/java/module-info.java b/src/native/java/module-info.java index d548034..6aba5a5 100644 --- a/src/native/java/module-info.java +++ b/src/native/java/module-info.java @@ -40,4 +40,5 @@ module rockserver.core { opens it.cavallium.rockserver.core.config to org.github.gestalt.core, org.github.gestalt.hocon; exports it.cavallium.rockserver.core.impl.rocksdb; exports it.cavallium.rockserver.core.impl; + exports it.cavallium.rockserver.core.common.api.proto to protobuf.java; } \ No newline at end of file