Use read-write thread pools

This commit is contained in:
Andrea Cavalli 2024-10-29 18:04:07 +01:00
parent 4f4533d434
commit 302ae92fc1
10 changed files with 236 additions and 56 deletions

View File

@ -4,6 +4,8 @@ import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.impl.EmbeddedDB;
import it.cavallium.rockserver.core.impl.InternalConnection;
import it.cavallium.rockserver.core.impl.RWScheduler;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
@ -13,6 +15,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
@ -21,16 +24,14 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public class EmbeddedConnection extends BaseConnection implements RocksDBAPI, InternalConnection {
private final EmbeddedDB db;
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
private final ExecutorService exeuctor;
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException {
super(name);
this.db = new EmbeddedDB(path, name, embeddedConfig);
this.exeuctor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
@Override
@ -95,7 +96,8 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
return (RA) switch (req) {
case RocksDBAPICommand.RocksDBAPICommandSingle.PutBatch putBatch -> this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode());
case RocksDBAPICommand.RocksDBAPICommandStream.GetRange<?> getRange -> this.getRangeAsync(getRange.arena(), getRange.transactionId(), getRange.columnId(), getRange.startKeysInclusive(), getRange.endKeysExclusive(), getRange.reverse(), getRange.requestType(), getRange.timeoutMs());
case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor);
case RocksDBAPICommand.RocksDBAPICommandSingle<?> _ -> CompletableFuture.supplyAsync(() -> req.handleSync(this),
(req.isReadOnly() ? db.getScheduler().readExecutor() : db.getScheduler().writeExecutor()));
case RocksDBAPICommand.RocksDBAPICommandStream<?> _ -> throw RocksDBException.of(RocksDBException.RocksDBErrorType.NOT_IMPLEMENTED, "The request of type " + req.getClass().getName() + " is not implemented in class " + this.getClass().getName());
};
}
@ -187,4 +189,9 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public <T> Publisher<T> getRangeAsync(Arena arena, long transactionId, long columnId, @Nullable Keys startKeysInclusive, @Nullable Keys endKeysExclusive, boolean reverse, RequestType.RequestGetRange<? super KV, T> requestType, long timeoutMs) throws RocksDBException {
return db.getRangeAsyncInternal(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, requestType, timeoutMs);
}
@Override
public RWScheduler getScheduler() {
return db.getScheduler();
}
}

View File

@ -8,7 +8,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;

View File

@ -2,6 +2,7 @@ package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RequestType.RequestTypeId;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
@ -18,6 +19,8 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
SYNC_RESULT handleSync(RocksDBSyncAPI api);
ASYNC_RESULT handleAsync(RocksDBAsyncAPI api);
boolean isReadOnly();
sealed interface RocksDBAPICommandSingle<R> extends RocksDBAPICommand<R, R, CompletableFuture<R>> {
@ -40,6 +43,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.openTransactionAsync(timeoutMs);
}
@Override
public boolean isReadOnly() {
return false;
}
}
/**
* Close a transaction
@ -61,6 +69,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.closeTransactionAsync(transactionId, commit);
}
@Override
public boolean isReadOnly() {
return false;
}
}
/**
* Close a failed update, discarding all changes
@ -80,6 +93,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.closeFailedUpdateAsync(updateId);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Create a column
@ -101,6 +119,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.createColumnAsync(name, schema);
}
@Override
public boolean isReadOnly() {
return false;
}
}
/**
* Delete a column
@ -119,6 +142,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.deleteColumnAsync(columnId);
}
@Override
public boolean isReadOnly() {
return false;
}
}
/**
* Get column id by name
@ -139,6 +167,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.getColumnIdAsync(name);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Put an element into the specified position
@ -166,6 +199,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public String toString() {
var sb = new StringBuilder("PUT");
@ -203,6 +241,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.putMultiAsync(arena, transactionOrUpdateId, columnId, keys, values, requestType);
}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public String toString() {
var sb = new StringBuilder("PUT_MULTI");
@ -242,6 +285,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.putBatchAsync(columnId, batchPublisher, mode);
}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public String toString() {
var sb = new StringBuilder("PUT_BATCH");
@ -275,6 +323,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType);
}
@Override
public boolean isReadOnly() {
return requestType.getRequestTypeId() != RequestTypeId.FOR_UPDATE;
}
@Override
public String toString() {
var sb = new StringBuilder("GET");
@ -325,6 +378,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Close an iterator
@ -343,6 +401,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.closeIteratorAsync(iteratorId);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Seek to the specific element during an iteration, or the subsequent one if not found
@ -363,6 +426,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.seekToAsync(arena, iterationId, keys);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Get the subsequent element during an iteration
@ -389,6 +457,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType);
}
@Override
public boolean isReadOnly() {
return true;
}
}
/**
* Reduce values in a range
@ -439,6 +512,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
);
}
@Override
public boolean isReadOnly() {
return true;
}
}
}
sealed interface RocksDBAPICommandStream<R> extends RocksDBAPICommand<R, Stream<R>, Publisher<R>> {
@ -492,6 +570,11 @@ public sealed interface RocksDBAPICommand<RESULT_ITEM_TYPE, SYNC_RESULT, ASYNC_R
);
}
@Override
public boolean isReadOnly() {
return true;
}
}
}
}

View File

@ -28,8 +28,9 @@ public class ConfigPrinter {
public static String stringifyDatabase(DatabaseConfig o) throws GestaltException {
return """
{
"parallelism": %s,
"global": %s
}""".formatted(stringifyGlobalDatabase(o.global()));
}""".formatted(stringifyParallelism(o.parallelism()), stringifyGlobalDatabase(o.global()));
}
public static String stringifyLevel(ColumnLevelConfig o) throws GestaltException {
@ -92,6 +93,17 @@ public class ConfigPrinter {
);
}
public static String stringifyParallelism(ParallelismConfig o) throws GestaltException {
return """
{
"read": %d,
"write": %d
}\
""".formatted(o.read(),
o.write()
);
}
private static String stringifyVolume(VolumeConfig o) throws GestaltException {
return """
{

View File

@ -5,4 +5,6 @@ import org.github.gestalt.config.exceptions.GestaltException;
public interface DatabaseConfig {
GlobalDatabaseConfig global() throws GestaltException;
ParallelismConfig parallelism() throws GestaltException;
}

View File

@ -0,0 +1,16 @@
package it.cavallium.rockserver.core.config;
import java.time.Duration;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import java.nio.file.Path;
public interface ParallelismConfig {
@Nullable
Integer read() throws GestaltException;
@Nullable
Integer write() throws GestaltException;
}

View File

@ -32,6 +32,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
@ -48,9 +49,10 @@ import org.rocksdb.Status.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
@ -61,6 +63,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private final @Nullable Path path;
private final TransactionalDB db;
private final DBOptions dbOptions;
private final RWScheduler scheduler;
private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle;
private final NonBlockingHashMapLong<ColumnInstance> columns;
private final Map<String, ColumnFamilyOptions> columnsConifg;
@ -89,6 +92,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.dbOptions = loadedDb.dbOptions();
this.refs = loadedDb.refs();
this.cache = loadedDb.cache();
try {
int readCap = Objects.requireNonNullElse(config.parallelism().read(), Runtime.getRuntime().availableProcessors());
int writeCap = Objects.requireNonNullElse(config.parallelism().write(), Runtime.getRuntime().availableProcessors());
this.scheduler = new RWScheduler(readCap, writeCap, "db");
} catch (GestaltException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Can't get the scheduler parallelism");
}
this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap();
try {
this.tempSSTsPath = config.global().tempSstPath();
@ -257,6 +267,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
private ReadOptions newReadOptions() {
var ro = new ReadOptions();
ro.setAsyncIo(true);
return ro;
}
@Override
public long openTransaction(long timeoutMs) {
return allocateTransactionInternal(openTransactionInternal(timeoutMs, false));
@ -716,7 +732,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (col.hasBuckets()) {
assert dbWriter instanceof Tx;
var bucketElementKeys = col.getBucketElementKeys(keys.keys());
try (var readOptions = new ReadOptions()) {
try (var readOptions = newReadOptions()) {
var previousRawBucketByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray);
var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col);
@ -730,7 +746,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} else {
if (RequestType.requiresGettingPreviousValue(callback)) {
assert dbWriter instanceof Tx;
try (var readOptions = new ReadOptions()) {
try (var readOptions = newReadOptions()) {
byte[] previousValueByteArray;
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
@ -740,7 +756,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} else if (RequestType.requiresGettingPreviousPresence(callback)) {
// todo: in the future this should be replaced with just keyExists
assert dbWriter instanceof Tx;
try (var readOptions = new ReadOptions()) {
try (var readOptions = newReadOptions()) {
byte[] previousValueByteArray;
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
@ -850,7 +866,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
MemorySegment calculatedKey = col.calculateKey(arena, keys.keys());
if (col.hasBuckets()) {
var bucketElementKeys = col.getBucketElementKeys(keys.keys());
try (var readOptions = new ReadOptions()) {
try (var readOptions = newReadOptions()) {
MemorySegment previousRawBucket = dbGet(tx, col, arena, readOptions, calculatedKey);
if (previousRawBucket != null) {
var bucket = new Bucket(col, previousRawBucket);
@ -866,7 +882,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
boolean shouldGetCurrent = RequestType.requiresGettingCurrentValue(callback)
|| (tx != null && callback instanceof RequestType.RequestExists<?>);
if (shouldGetCurrent) {
try (var readOptions = new ReadOptions()) {
try (var readOptions = newReadOptions()) {
foundValue = dbGet(tx, col, arena, readOptions, calculatedKey);
existsValue = foundValue != null;
} catch (org.rocksdb.RocksDBException e) {
@ -914,7 +930,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try {
var col = getColumn(columnId);
RocksIterator it;
var ro = new ReadOptions();
var ro = newReadOptions();
if (transactionId > 0L) {
//noinspection resource
it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh());
@ -987,7 +1003,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
try (var ro = new ReadOptions()) {
try (var ro = newReadOptions()) {
MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
try (var startKeySlice = calculatedStartKey != null ? toDirectSlice(calculatedStartKey) : null;
@ -1099,7 +1115,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
}
var ro = new ReadOptions();
var ro = newReadOptions();
try {
MemorySegment calculatedStartKey = startKeysInclusive != null && startKeysInclusive.keys().length > 0 ? col.calculateKey(arena, startKeysInclusive.keys()) : null;
MemorySegment calculatedEndKey = endKeysExclusive != null && endKeysExclusive.keys().length > 0 ? col.calculateKey(arena, endKeysExclusive.keys()) : null;
@ -1157,7 +1173,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
return it;
}), Resources::close)
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(scheduler.read())
.doFirst(ops::beginOp)
.doFinally(_ -> ops.endOp());
}
@ -1270,4 +1286,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.NOT_IMPLEMENTED,
"Bucket column type not implemented, implement them");
}
@Override
public RWScheduler getScheduler() {
return scheduler;
}
}

View File

@ -0,0 +1,6 @@
package it.cavallium.rockserver.core.impl;
public interface InternalConnection {
RWScheduler getScheduler();
}

View File

@ -0,0 +1,32 @@
package it.cavallium.rockserver.core.impl;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import java.util.concurrent.Executor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public record RWScheduler(Scheduler read, Scheduler write, Executor readExecutor, Executor writeExecutor) {
public RWScheduler(Scheduler read, Scheduler write) {
this(read, write, read::schedule, write::schedule);
}
public RWScheduler(int readCap, int writeCap, String name) {
this(
Schedulers.newBoundedElastic(readCap, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name + "-read"),
Schedulers.newBoundedElastic(writeCap, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name + "-write")
);
}
public Mono<Void> disposeGracefully() {
return Mono.whenDelayError(read.disposeGracefully(), write.disposeGracefully());
}
public void dispose() {
read.dispose();
write.dispose();
}
}

View File

@ -39,6 +39,8 @@ import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.FirstAndLast;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.impl.InternalConnection;
import it.cavallium.rockserver.core.impl.RWScheduler;
import it.unimi.dsi.fastutil.ints.Int2IntFunction;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList;
@ -63,8 +65,6 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class GrpcServer extends Server {
@ -72,11 +72,19 @@ public class GrpcServer extends Server {
private final GrpcServerImpl grpc;
private final EventLoopGroup elg;
private final Scheduler executor;
private final io.grpc.Server server;
private final RWScheduler scheduler;
public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException {
super(client);
if (client instanceof InternalConnection internalConnection) {
this.scheduler = internalConnection.getScheduler();
} else {
this.scheduler = new RWScheduler(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
"grpc-db"
);
}
this.grpc = new GrpcServerImpl(this.getClient());
EventLoopGroup elg;
Class<? extends ServerChannel> channelType;
@ -88,7 +96,6 @@ public class GrpcServer extends Server {
channelType = NioServerSocketChannel.class;
}
this.elg = elg;
this.executor = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 2, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "server-db-executor");
this.server = NettyServerBuilder
.forAddress(socketAddress)
.bossEventLoopGroup(elg)
@ -125,7 +132,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
var txId = api.openTransaction(request.getTimeoutMs());
return OpenTransactionResponse.newBuilder().setTransactionId(txId).build();
}).transform(this.onErrorMapMonoWithRequestInfo("openTransaction", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("openTransaction", request));
}
@Override
@ -133,7 +140,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
var committed = api.closeTransaction(request.getTransactionId(), request.getCommit());
return CloseTransactionResponse.newBuilder().setSuccessful(committed).build();
}).transform(this.onErrorMapMonoWithRequestInfo("closeTransaction", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("closeTransaction", request));
}
@Override
@ -141,7 +148,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
api.closeFailedUpdate(request.getUpdateId());
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("closeFailedUpdate", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("closeFailedUpdate", request));
}
@Override
@ -149,7 +156,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema()));
return CreateColumnResponse.newBuilder().setColumnId(colId).build();
}).transform(this.onErrorMapMonoWithRequestInfo("createColumn", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("createColumn", request));
}
@Override
@ -157,7 +164,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
api.deleteColumn(request.getColumnId());
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("deleteColumn", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("deleteColumn", request));
}
@Override
@ -165,7 +172,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
var colId = api.getColumnId(request.getName());
return GetColumnIdResponse.newBuilder().setColumnId(colId).build();
}).transform(this.onErrorMapMonoWithRequestInfo("getColumnId", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("getColumnId", request));
}
@Override
@ -181,7 +188,7 @@ public class GrpcServer extends Server {
);
}
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("put", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("put", request));
}
@Override
@ -233,7 +240,7 @@ public class GrpcServer extends Server {
var initialRequest = firstValue.getInitialRequest();
return nextRequests
.publishOn(executor)
.publishOn(scheduler.write())
.doOnNext(putRequest -> {
var data = putRequest.getData();
try (var arena = Arena.ofConfined()) {
@ -272,7 +279,7 @@ public class GrpcServer extends Server {
}
return prevBuilder.build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("putGetPrevious", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("putGetPrevious", request));
}
@Override
@ -295,7 +302,7 @@ public class GrpcServer extends Server {
}
return deltaBuilder.build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("putGetDelta", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("putGetDelta", request));
}
@Override
@ -311,7 +318,7 @@ public class GrpcServer extends Server {
);
return Changed.newBuilder().setChanged(changed).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("putGetChanged", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("putGetChanged", request));
}
@Override
@ -327,7 +334,7 @@ public class GrpcServer extends Server {
);
return PreviousPresence.newBuilder().setPresent(present).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("putGetPreviousPresence", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("putGetPreviousPresence", request));
}
@Override
@ -346,7 +353,7 @@ public class GrpcServer extends Server {
}
return responseBuilder.build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("get", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("get", request));
}
@Override
@ -366,7 +373,7 @@ public class GrpcServer extends Server {
}
return responseBuilder.build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("getForUpdate", request));
}, false).transform(this.onErrorMapMonoWithRequestInfo("getForUpdate", request));
}
@Override
@ -381,7 +388,7 @@ public class GrpcServer extends Server {
);
return PreviousPresence.newBuilder().setPresent(exists).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("exists", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("exists", request));
}
@Override
@ -398,7 +405,7 @@ public class GrpcServer extends Server {
);
return OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("openIterator", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("openIterator", request));
}
@Override
@ -406,7 +413,7 @@ public class GrpcServer extends Server {
return executeSync(() -> {
api.closeIterator(request.getIteratorId());
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("closeIterator", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("closeIterator", request));
}
@Override
@ -416,7 +423,7 @@ public class GrpcServer extends Server {
api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys));
}
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("seekTo", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("seekTo", request));
}
@Override
@ -429,7 +436,7 @@ public class GrpcServer extends Server {
new RequestNothing<>());
}
return Empty.getDefaultInstance();
}).transform(this.onErrorMapMonoWithRequestInfo("subsequent", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("subsequent", request));
}
@Override
@ -442,7 +449,7 @@ public class GrpcServer extends Server {
new RequestExists<>());
return PreviousPresence.newBuilder().setPresent(exists).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("subsequentExists", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("subsequentExists", request));
}
@Override
@ -510,7 +517,7 @@ public class GrpcServer extends Server {
);
return EntriesCount.newBuilder().setCount(entriesCount).build();
}
}).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeEntriesCount", request));
}, true).transform(this.onErrorMapMonoWithRequestInfo("reduceRangeEntriesCount", request));
}
@Override
@ -533,8 +540,8 @@ public class GrpcServer extends Server {
// utils
private <T> Mono<T> executeSync(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(executor);
private <T> Mono<T> executeSync(Callable<T> callable, boolean isReadOnly) {
return Mono.fromCallable(callable).subscribeOn(isReadOnly ? scheduler.read() : scheduler.write());
}
// mappers
@ -661,17 +668,12 @@ public class GrpcServer extends Server {
if (ex instanceof CompletionException exx) {
return handleError(exx.getCause());
} else {
if (ex instanceof RocksDBException e) {
return Status.INTERNAL
.withDescription(e.getLocalizedMessage())
.withCause(e);
} else if (ex instanceof StatusException ex2) {
return ex2.getStatus();
} else if (ex instanceof StatusRuntimeException ex3) {
return ex3.getStatus();
} else {
return Status.INTERNAL.withCause(ex);
}
return switch (ex) {
case RocksDBException e -> Status.INTERNAL.withDescription(e.getLocalizedMessage()).withCause(e);
case StatusException ex2 -> ex2.getStatus();
case StatusRuntimeException ex3 -> ex3.getStatus();
case null, default -> Status.INTERNAL.withCause(ex);
};
}
}
}
@ -686,9 +688,9 @@ public class GrpcServer extends Server {
throw new RuntimeException(e);
}
elg.close();
executor.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> {
scheduler.disposeGracefully().timeout(Duration.ofMinutes(2)).onErrorResume(ex -> {
LOG.error("Grpc server executor shutdown timed out, terminating...", ex);
executor.dispose();
scheduler.dispose();
return Mono.empty();
}).block();
super.close();