From 302ae92fc1824437d12f7ed576992ce76d940a8c Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 29 Oct 2024 18:04:07 +0100 Subject: [PATCH] Use read-write thread pools --- .../core/client/EmbeddedConnection.java | 15 +++- .../core/client/GrpcConnection.java | 1 - .../core/common/RocksDBAPICommand.java | 83 +++++++++++++++++++ .../rockserver/core/config/ConfigPrinter.java | 14 +++- .../core/config/DatabaseConfig.java | 2 + .../core/config/ParallelismConfig.java | 16 ++++ .../rockserver/core/impl/EmbeddedDB.java | 41 ++++++--- .../core/impl/InternalConnection.java | 6 ++ .../rockserver/core/impl/RWScheduler.java | 32 +++++++ .../rockserver/core/server/GrpcServer.java | 82 +++++++++--------- 10 files changed, 236 insertions(+), 56 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/config/ParallelismConfig.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/RWScheduler.java diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 9f7b1d5..1f1c697 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -4,6 +4,8 @@ import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.impl.EmbeddedDB; +import it.cavallium.rockserver.core.impl.InternalConnection; +import it.cavallium.rockserver.core.impl.RWScheduler; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; @@ -13,6 +15,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Stream; @@ -21,16 +24,14 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; -public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { +public class EmbeddedConnection extends BaseConnection implements RocksDBAPI, InternalConnection { private final EmbeddedDB db; public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private"); - private final ExecutorService exeuctor; public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException { super(name); this.db = new EmbeddedDB(path, name, embeddedConfig); - this.exeuctor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } @Override @@ -95,7 +96,8 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { return (RA) switch (req) { case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); case RocksDBAPICommand.RocksDBAPICommandStream.GetRange getRange -> this.getRangeAsync(getRange.arena(), getRange.transactionId(), getRange.columnId(), getRange.startKeysInclusive(), getRange.endKeysExclusive(), getRange.reverse(), getRange.requestType(), getRange.timeoutMs()); - case RocksDBAPICommand.RocksDBAPICommandSingle _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); + case RocksDBAPICommand.RocksDBAPICommandSingle _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), + (req.isReadOnly() ? db.getScheduler().readExecutor() : db.getScheduler().writeExecutor())); case RocksDBAPICommand.RocksDBAPICommandStream _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName()); }; } @@ -187,4 +189,9 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public Publisher getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange requestType, long timeoutMs) throws RocksDBException { return db.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs); } + + @Override + public RWScheduler getScheduler() { + return db.getScheduler(); + } } diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index dd897be..3aa1781 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -8,7 +8,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; -import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyChannelBuilder; diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index 9479b1f..1a3816e 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -2,6 +2,7 @@ package it.cavallium.rockserver.core.common; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; +import it.cavallium.rockserver.core.common.RequestType.RequestTypeId; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.util.List; @@ -18,6 +19,8 @@ public sealed interface RocksDBAPICommand extends RocksDBAPICommand> { @@ -40,6 +43,11 @@ public sealed interface RocksDBAPICommand extends RocksDBAPICommand, Publisher> { @@ -492,6 +570,11 @@ public sealed interface RocksDBAPICommand columns; private final Map columnsConifg; @@ -89,6 +92,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { this.dbOptions = loadedDb.dbOptions(); this.refs = loadedDb.refs(); this.cache = loadedDb.cache(); + try { + int readCap = Objects.requireNonNullElse(config.parallelism().read(), Runtime.getRuntime().availableProcessors()); + int writeCap = Objects.requireNonNullElse(config.parallelism().write(), Runtime.getRuntime().availableProcessors()); + this.scheduler = new RWScheduler(readCap, writeCap, "db"); + } catch (GestaltException e) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Can't get the scheduler parallelism"); + } this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap(); try { this.tempSSTsPath = config.global().tempSstPath(); @@ -257,6 +267,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } + private ReadOptions newReadOptions() { + var ro = new ReadOptions(); + ro.setAsyncIo(true); + return ro; + } + @Override public long openTransaction(long timeoutMs) { return allocateTransactionInternal(openTransactionInternal(timeoutMs, false)); @@ -716,7 +732,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (col.hasBuckets()) { assert dbWriter instanceof Tx; var bucketElementKeys = col.getBucketElementKeys(keys.keys()); - try (var readOptions = new ReadOptions()) { + try (var readOptions = newReadOptions()) { var previousRawBucketByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray); var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col); @@ -730,7 +746,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } else { if (RequestType.requiresGettingPreviousValue(callback)) { assert dbWriter instanceof Tx; - try (var readOptions = new ReadOptions()) { + try (var readOptions = newReadOptions()) { byte[] previousValueByteArray; previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); @@ -740,7 +756,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } else if (RequestType.requiresGettingPreviousPresence(callback)) { // todo: in the future this should be replaced with just keyExists assert dbWriter instanceof Tx; - try (var readOptions = new ReadOptions()) { + try (var readOptions = newReadOptions()) { byte[] previousValueByteArray; previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; @@ -850,7 +866,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { MemorySegment calculatedKey = col.calculateKey(arena, keys.keys()); if (col.hasBuckets()) { var bucketElementKeys = col.getBucketElementKeys(keys.keys()); - try (var readOptions = new ReadOptions()) { + try (var readOptions = newReadOptions()) { MemorySegment previousRawBucket = dbGet(tx, col, arena, readOptions, calculatedKey); if (previousRawBucket != null) { var bucket = new Bucket(col, previousRawBucket); @@ -866,7 +882,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { boolean shouldGetCurrent = RequestType.requiresGettingCurrentValue(callback) || (tx != null && callback instanceof RequestType.RequestExists); if (shouldGetCurrent) { - try (var readOptions = new ReadOptions()) { + try (var readOptions = newReadOptions()) { foundValue = dbGet(tx, col, arena, readOptions, calculatedKey); existsValue = foundValue != null; } catch (org.rocksdb.RocksDBException e) { @@ -914,7 +930,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try { var col = getColumn(columnId); RocksIterator it; - var ro = new ReadOptions(); + var ro = newReadOptions(); if (transactionId > 0L) { //noinspection resource it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh()); @@ -987,7 +1003,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } - try (var ro = new ReadOptions()) { + try (var ro = newReadOptions()) { MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null; MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null; try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null; @@ -1099,7 +1115,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } - var ro = new ReadOptions(); + var ro = newReadOptions(); try { MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null; MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null; @@ -1157,7 +1173,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } return it; }), Resources::close) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(scheduler.read()) .doFirst(ops::beginOp) .doFinally(_ -> ops.endOp()); } @@ -1270,4 +1286,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED, "Bucket column type not implemented, implement them"); } + + @Override + public RWScheduler getScheduler() { + return scheduler; + } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java b/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java new file mode 100644 index 0000000..8b21ccc --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/InternalConnection.java @@ -0,0 +1,6 @@ +package it.cavallium.rockserver.core.impl; + +public interface InternalConnection { + + RWScheduler getScheduler(); +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/RWScheduler.java b/src/main/java/it/cavallium/rockserver/core/impl/RWScheduler.java new file mode 100644 index 0000000..3561dde --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/RWScheduler.java @@ -0,0 +1,32 @@ +package it.cavallium.rockserver.core.impl; + +import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE; + +import java.util.concurrent.Executor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public record RWScheduler(Scheduler read, Scheduler write, Executor readExecutor, Executor writeExecutor) { + + public RWScheduler(Scheduler read, Scheduler write) { + this(read, write, read::schedule, write::schedule); + } + + public RWScheduler(int readCap, int writeCap, String name) { + this( + Schedulers.newBoundedElastic(readCap, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name + "-read"), + Schedulers.newBoundedElastic(writeCap, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name + "-write") + ); + } + + public Mono disposeGracefully() { + return Mono.whenDelayError(read.disposeGracefully(), write.disposeGracefully()); + } + + public void dispose() { + read.dispose(); + write.dispose(); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 3ad2324..0121774 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -39,6 +39,8 @@ import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.FirstAndLast; import it.cavallium.rockserver.core.common.api.proto.KV; +import it.cavallium.rockserver.core.impl.InternalConnection; +import it.cavallium.rockserver.core.impl.RWScheduler; import it.unimi.dsi.fastutil.ints.Int2IntFunction; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -63,8 +65,6 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class GrpcServer extends Server { @@ -72,11 +72,19 @@ public class GrpcServer extends Server { private final GrpcServerImpl grpc; private final EventLoopGroup elg; - private final Scheduler executor; private final io.grpc.Server server; + private final RWScheduler scheduler; public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException { super(client); + if (client instanceof InternalConnection internalConnection) { + this.scheduler = internalConnection.getScheduler(); + } else { + this.scheduler = new RWScheduler(Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + "grpc-db" + ); + } this.grpc = new GrpcServerImpl(this.getClient()); EventLoopGroup elg; Class channelType; @@ -88,7 +96,6 @@ public class GrpcServer extends Server { channelType = NioServerSocketChannel.class; } this.elg = elg; - this.executor = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 2, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "server-db-executor"); this.server = NettyServerBuilder .forAddress(socketAddress) .bossEventLoopGroup(elg) @@ -125,7 +132,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var txId = api.openTransaction(request.getTimeoutMs()); return OpenTransactionResponse.newBuilder().setTransactionId(txId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo("openTransaction", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("openTransaction", request)); } @Override @@ -133,7 +140,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var committed = api.closeTransaction(request.getTransactionId(), request.getCommit()); return CloseTransactionResponse.newBuilder().setSuccessful(committed).build(); - }).transform(this.onErrorMapMonoWithRequestInfo("closeTransaction", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("closeTransaction", request)); } @Override @@ -141,7 +148,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeFailedUpdate(request.getUpdateId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("closeFailedUpdate", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("closeFailedUpdate", request)); } @Override @@ -149,7 +156,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); return CreateColumnResponse.newBuilder().setColumnId(colId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo("createColumn", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("createColumn", request)); } @Override @@ -157,7 +164,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.deleteColumn(request.getColumnId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("deleteColumn", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("deleteColumn", request)); } @Override @@ -165,7 +172,7 @@ public class GrpcServer extends Server { return executeSync(() -> { var colId = api.getColumnId(request.getName()); return GetColumnIdResponse.newBuilder().setColumnId(colId).build(); - }).transform(this.onErrorMapMonoWithRequestInfo("getColumnId", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("getColumnId", request)); } @Override @@ -181,7 +188,7 @@ public class GrpcServer extends Server { ); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("put", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("put", request)); } @Override @@ -233,7 +240,7 @@ public class GrpcServer extends Server { var initialRequest = firstValue.getInitialRequest(); return nextRequests - .publishOn(executor) + .publishOn(scheduler.write()) .doOnNext(putRequest -> { var data = putRequest.getData(); try (var arena = Arena.ofConfined()) { @@ -272,7 +279,7 @@ public class GrpcServer extends Server { } return prevBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("putGetPrevious", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("putGetPrevious", request)); } @Override @@ -295,7 +302,7 @@ public class GrpcServer extends Server { } return deltaBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("putGetDelta", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("putGetDelta", request)); } @Override @@ -311,7 +318,7 @@ public class GrpcServer extends Server { ); return Changed.newBuilder().setChanged(changed).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("putGetChanged", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("putGetChanged", request)); } @Override @@ -327,7 +334,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(present).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("putGetPreviousPresence", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("putGetPreviousPresence", request)); } @Override @@ -346,7 +353,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("get", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("get", request)); } @Override @@ -366,7 +373,7 @@ public class GrpcServer extends Server { } return responseBuilder.build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("getForUpdate", request)); + }, false).transform(this.onErrorMapMonoWithRequestInfo("getForUpdate", request)); } @Override @@ -381,7 +388,7 @@ public class GrpcServer extends Server { ); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("exists", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("exists", request)); } @Override @@ -398,7 +405,7 @@ public class GrpcServer extends Server { ); return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("openIterator", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("openIterator", request)); } @Override @@ -406,7 +413,7 @@ public class GrpcServer extends Server { return executeSync(() -> { api.closeIterator(request.getIteratorId()); return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("closeIterator", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("closeIterator", request)); } @Override @@ -416,7 +423,7 @@ public class GrpcServer extends Server { api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("seekTo", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("seekTo", request)); } @Override @@ -429,7 +436,7 @@ public class GrpcServer extends Server { new RequestNothing<>()); } return Empty.getDefaultInstance(); - }).transform(this.onErrorMapMonoWithRequestInfo("subsequent", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("subsequent", request)); } @Override @@ -442,7 +449,7 @@ public class GrpcServer extends Server { new RequestExists<>()); return PreviousPresence.newBuilder().setPresent(exists).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("subsequentExists", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("subsequentExists", request)); } @Override @@ -510,7 +517,7 @@ public class GrpcServer extends Server { ); return EntriesCount.newBuilder().setCount(entriesCount).build(); } - }).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeEntriesCount", request)); + }, true).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeEntriesCount", request)); } @Override @@ -533,8 +540,8 @@ public class GrpcServer extends Server { // utils - private Mono executeSync(Callable callable) { - return Mono.fromCallable(callable).subscribeOn(executor); + private Mono executeSync(Callable callable, boolean isReadOnly) { + return Mono.fromCallable(callable).subscribeOn(isReadOnly ? scheduler.read() : scheduler.write()); } // mappers @@ -661,17 +668,12 @@ public class GrpcServer extends Server { if (ex instanceof CompletionException exx) { return handleError(exx.getCause()); } else { - if (ex instanceof RocksDBException e) { - return Status.INTERNAL - .withDescription(e.getLocalizedMessage()) - .withCause(e); - } else if (ex instanceof StatusException ex2) { - return ex2.getStatus(); - } else if (ex instanceof StatusRuntimeException ex3) { - return ex3.getStatus(); - } else { - return Status.INTERNAL.withCause(ex); - } + return switch (ex) { + case RocksDBException e -> Status.INTERNAL.withDescription(e.getLocalizedMessage()).withCause(e); + case StatusException ex2 -> ex2.getStatus(); + case StatusRuntimeException ex3 -> ex3.getStatus(); + case null, default -> Status.INTERNAL.withCause(ex); + }; } } } @@ -686,9 +688,9 @@ public class GrpcServer extends Server { throw new RuntimeException(e); } elg.close(); - executor.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> { + scheduler.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> { LOG.error("Grpc server executor shutdown timed out, terminating...", ex); - executor.dispose(); + scheduler.dispose(); return Mono.empty(); }).block(); super.close();