Fix grpc server

This commit is contained in:
Andrea Cavalli 2024-09-13 23:15:22 +02:00
parent d29b4e45fe
commit ba636d8ba6
7 changed files with 121 additions and 76 deletions

View File

@ -20,7 +20,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jetbrains.annotations.NotNull;
@ -95,7 +94,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public <R> CompletionStage<R> requestAsync(RocksDBAPICommand<R> req) {
public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) {
return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor);
}

View File

@ -10,6 +10,7 @@ import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Keys;
@ -63,6 +64,7 @@ import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBS
import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.URI;
@ -70,14 +72,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private static final Logger LOG = LoggerFactory.getLogger(GrpcConnection.class);
private static final Executor DIRECT_EXECUTOR = MoreExecutors.directExecutor();
private final ManagedChannel channel;
private final RocksDBServiceBlockingStub blockingStub;
@ -121,7 +126,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Long> openTransactionAsync(long timeoutMs) throws RocksDBException {
public CompletableFuture<Long> openTransactionAsync(long timeoutMs) throws RocksDBException {
var request = OpenTransactionRequest.newBuilder()
.setTimeoutMs(timeoutMs)
.build();
@ -129,7 +134,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Boolean> closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException {
public CompletableFuture<Boolean> closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException {
var request = CloseTransactionRequest.newBuilder()
.setTransactionId(transactionId)
.setCommit(commit)
@ -138,7 +143,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Void> closeFailedUpdateAsync(long updateId) throws RocksDBException {
public CompletableFuture<Void> closeFailedUpdateAsync(long updateId) throws RocksDBException {
var request = CloseFailedUpdateRequest.newBuilder()
.setUpdateId(updateId)
.build();
@ -146,7 +151,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Long> createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException {
public CompletableFuture<Long> createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException {
var request = CreateColumnRequest.newBuilder()
.setName(name)
.setSchema(mapColumnSchema(schema))
@ -155,7 +160,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Void> deleteColumnAsync(long columnId) throws RocksDBException {
public CompletableFuture<Void> deleteColumnAsync(long columnId) throws RocksDBException {
var request = DeleteColumnRequest.newBuilder()
.setColumnId(columnId)
.build();
@ -163,7 +168,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Long> getColumnIdAsync(@NotNull String name) throws RocksDBException {
public CompletableFuture<Long> getColumnIdAsync(@NotNull String name) throws RocksDBException {
var request = GetColumnIdRequest.newBuilder()
.setName(name)
.build();
@ -172,7 +177,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@SuppressWarnings("unchecked")
@Override
public <T> CompletionStage<T> putAsync(Arena arena,
public <T> CompletableFuture<T> putAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull Keys keys,
@ -183,7 +188,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.setColumnId(columnId)
.setData(mapKV(keys, value))
.build();
return (CompletionStage<T>) switch (requestType) {
return (CompletableFuture<T>) switch (requestType) {
case RequestNothing<?> _ -> toResponse(this.futureStub.put(request), _ -> null);
case RequestPrevious<?> _ ->
toResponse(this.futureStub.putGetPrevious(request), GrpcConnection::mapPrevious);
@ -197,7 +202,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public <T> CompletionStage<List<T>> putMultiAsync(Arena arena,
public <T> CompletableFuture<List<T>> putMultiAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull Keys> allKeys,
@ -270,7 +275,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@SuppressWarnings("unchecked")
@Override
public <T> CompletionStage<T> getAsync(Arena arena,
public <T> CompletableFuture<T> getAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull Keys keys,
@ -296,7 +301,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Long> openIteratorAsync(Arena arena,
public CompletableFuture<Long> openIteratorAsync(Arena arena,
long transactionId,
long columnId,
@NotNull Keys startKeysInclusive,
@ -315,7 +320,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Void> closeIteratorAsync(long iteratorId) throws RocksDBException {
public CompletableFuture<Void> closeIteratorAsync(long iteratorId) throws RocksDBException {
var request = CloseIteratorRequest.newBuilder()
.setIteratorId(iteratorId)
.build();
@ -323,7 +328,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
}
@Override
public CompletionStage<Void> seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException {
public CompletableFuture<Void> seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException {
var request = SeekToRequest.newBuilder()
.setIterationId(iterationId)
.addAllKeys(mapKeys(keys))
@ -333,7 +338,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@SuppressWarnings("unchecked")
@Override
public <T> CompletionStage<T> subsequentAsync(Arena arena,
public <T> CompletableFuture<T> subsequentAsync(Arena arena,
long iterationId,
long skipCount,
long takeCount,
@ -346,12 +351,12 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
return switch (requestType) {
case RequestNothing<?> _ -> toResponse(this.futureStub.subsequent(request), _ -> null);
case RequestExists<?> _ ->
(CompletionStage<T>) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent);
(CompletableFuture<T>) toResponse(this.futureStub.subsequentExists(request), PreviousPresence::getPresent);
case RequestMulti<?> _ -> {
CollectListMappedStreamObserver<KV, MemorySegment> responseObserver
= new CollectListMappedStreamObserver<>(kv -> mapByteString(kv.getValue()));
this.asyncStub.subsequentMultiGet(request, responseObserver);
yield (CompletionStage<T>) responseObserver;
yield (CompletableFuture<T>) responseObserver;
}
};
}
@ -465,4 +470,27 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
return cf;
}
@Override
public void close() {
try {
if (this.channel != null) {
this.channel.shutdown();
}
} catch (Exception ex) {
LOG.error("Failed to close channel", ex);
}
try {
if (this.channel != null) {
this.channel.awaitTermination(1, TimeUnit.MINUTES);
}
} catch (InterruptedException e) {
LOG.error("Failed to wait channel termination", e);
try {
this.channel.shutdownNow();
} catch (Exception ex) {
LOG.error("Failed to close channel", ex);
}
}
}
}

View File

@ -5,6 +5,7 @@ import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.logging.Level;
@ -76,7 +77,7 @@ public class LoggingClient implements RocksDBConnection {
}
@Override
public <R> CompletionStage<R> requestAsync(RocksDBAPICommand<R> req) {
public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) {
return asyncApi.requestAsync(req).whenComplete((result, e) -> {
if (e != null) {
logger.trace("Request failed: {} Error: {}", req, e.getMessage());

View File

@ -18,44 +18,44 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
/** See: {@link OpenTransaction}. */
default CompletionStage<Long> openTransactionAsync(long timeoutMs) throws RocksDBException {
default CompletableFuture<Long> openTransactionAsync(long timeoutMs) throws RocksDBException {
return requestAsync(new OpenTransaction(timeoutMs));
}
/** See: {@link CloseTransaction}. */
default CompletionStage<Boolean> closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException {
default CompletableFuture<Boolean> closeTransactionAsync(long transactionId, boolean commit) throws RocksDBException {
return requestAsync(new CloseTransaction(transactionId, commit));
}
/** See: {@link CloseFailedUpdate}. */
default CompletionStage<Void> closeFailedUpdateAsync(long updateId) throws RocksDBException {
default CompletableFuture<Void> closeFailedUpdateAsync(long updateId) throws RocksDBException {
return requestAsync(new CloseFailedUpdate(updateId));
}
/** See: {@link CreateColumn}. */
default CompletionStage<Long> createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException {
default CompletableFuture<Long> createColumnAsync(String name, @NotNull ColumnSchema schema) throws RocksDBException {
return requestAsync(new CreateColumn(name, schema));
}
/** See: {@link DeleteColumn}. */
default CompletionStage<Void> deleteColumnAsync(long columnId) throws RocksDBException {
default CompletableFuture<Void> deleteColumnAsync(long columnId) throws RocksDBException {
return requestAsync(new DeleteColumn(columnId));
}
/** See: {@link GetColumnId}. */
default CompletionStage<Long> getColumnIdAsync(@NotNull String name) throws RocksDBException {
default CompletableFuture<Long> getColumnIdAsync(@NotNull String name) throws RocksDBException {
return requestAsync(new GetColumnId(name));
}
/** See: {@link Put}. */
default <T> CompletionStage<T> putAsync(Arena arena,
default <T> CompletableFuture<T> putAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull Keys keys,
@ -65,7 +65,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
}
/** See: {@link PutMulti}. */
default <T> CompletionStage<List<T>> putMultiAsync(Arena arena,
default <T> CompletableFuture<List<T>> putMultiAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull List<@NotNull Keys> keys,
@ -75,7 +75,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
}
/** See: {@link Get}. */
default <T> CompletionStage<T> getAsync(Arena arena,
default <T> CompletableFuture<T> getAsync(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull Keys keys,
@ -84,7 +84,7 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
}
/** See: {@link OpenIterator}. */
default CompletionStage<Long> openIteratorAsync(Arena arena,
default CompletableFuture<Long> openIteratorAsync(Arena arena,
long transactionId,
long columnId,
@NotNull Keys startKeysInclusive,
@ -102,17 +102,17 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
}
/** See: {@link CloseIterator}. */
default CompletionStage<Void> closeIteratorAsync(long iteratorId) throws RocksDBException {
default CompletableFuture<Void> closeIteratorAsync(long iteratorId) throws RocksDBException {
return requestAsync(new CloseIterator(iteratorId));
}
/** See: {@link SeekTo}. */
default CompletionStage<Void> seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException {
default CompletableFuture<Void> seekToAsync(Arena arena, long iterationId, @NotNull Keys keys) throws RocksDBException {
return requestAsync(new SeekTo(arena, iterationId, keys));
}
/** See: {@link Subsequent}. */
default <T> CompletionStage<T> subsequentAsync(Arena arena,
default <T> CompletableFuture<T> subsequentAsync(Arena arena,
long iterationId,
long skipCount,
long takeCount,

View File

@ -1,29 +1,10 @@
package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CreateColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.DeleteColumn;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Get;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.GetColumnId;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public interface RocksDBAsyncAPIRequestHandler {
default <R> CompletionStage<R> requestAsync(RocksDBAPICommand<R> req) {
default <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) {
return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported request type: " + req));
}
}

View File

@ -4,6 +4,7 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static java.lang.foreign.MemorySegment.NULL;
import static java.util.Objects.requireNonNullElse;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
@ -68,6 +69,15 @@ public class Utils {
}
}
public static @NotNull MemorySegment toMemorySegment(Arena arena, ByteString value) {
var buf = value.asReadOnlyByteBuffer();
if (buf.isDirect()) {
return MemorySegment.ofBuffer(buf);
} else {
return arena.allocate(value.size()).copyFrom(MemorySegment.ofBuffer(buf));
}
}
@Contract(value = "!null -> !null; null -> null", pure = true)
public static MemorySegment toMemorySegment(byte... array) {
if (array != null) {

View File

@ -1,8 +1,14 @@
package it.cavallium.rockserver.core.server;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.rpc.DebugInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.StreamObserver;
import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.ColumnHashType;
@ -17,6 +23,7 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.common.api.proto.Changed;
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
@ -54,9 +61,11 @@ import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -149,8 +158,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestNothing<>()
)
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
@ -184,8 +193,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
initialRequest.getTransactionOrUpdateId(),
initialRequest.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestNothing<>()
)
.whenComplete((_, error) -> {
@ -228,8 +237,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestPrevious<>()
)
.whenComplete(handleResponseObserver(
@ -249,8 +258,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestDelta<>()
)
.whenComplete(handleResponseObserver(
@ -273,8 +282,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestChanged<>()
)
.whenComplete(handleResponseObserver(
@ -288,8 +297,8 @@ public class GrpcServer extends Server {
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys),
toMemorySegment(autoArena, request.getData().getValue()),
new RequestPreviousPresence<>()
)
.whenComplete(handleResponseObserver(
@ -303,7 +312,7 @@ public class GrpcServer extends Server {
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
mapKeys(autoArena, request.getKeysCount(), request::getKeys),
new RequestCurrent<>()
)
.whenComplete(handleResponseObserver(
@ -323,7 +332,7 @@ public class GrpcServer extends Server {
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
mapKeys(autoArena, request.getKeysCount(), request::getKeys),
new RequestForUpdate<>()
)
.whenComplete(handleResponseObserver(
@ -344,7 +353,7 @@ public class GrpcServer extends Server {
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
mapKeys(autoArena, request.getKeysCount(), request::getKeys),
new RequestExists<>()
)
.whenComplete(handleResponseObserver(
@ -358,8 +367,8 @@ public class GrpcServer extends Server {
.openIteratorAsync(autoArena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
mapKeys(autoArena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(autoArena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getReverse(),
request.getTimeoutMs()
)
@ -378,7 +387,7 @@ public class GrpcServer extends Server {
@Override
public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.seekToAsync(autoArena, request.getIterationId(), mapKeys(request.getKeysCount(), request::getKeys))
.seekToAsync(autoArena, request.getIterationId(), mapKeys(autoArena, request.getKeysCount(), request::getKeys))
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@ -455,7 +464,7 @@ public class GrpcServer extends Server {
private static IntList mapKeysLength(int count, Int2IntFunction keyGetterAt) {
var l = new IntArrayList(count);
for (int i = 0; i < count; i++) {
l.add(keyGetterAt.apply(i));
l.add((int) keyGetterAt.apply(i));
}
return l;
}
@ -474,10 +483,10 @@ public class GrpcServer extends Server {
return l;
}
private static Keys mapKeys(int count, Int2ObjectFunction<ByteString> keyGetterAt) {
private static Keys mapKeys(Arena arena, int count, Int2ObjectFunction<ByteString> keyGetterAt) {
var segments = new MemorySegment[count];
for (int i = 0; i < count; i++) {
segments[i] = MemorySegment.ofBuffer(keyGetterAt.apply(i).asReadOnlyByteBuffer());
segments[i] = toMemorySegment(arena, keyGetterAt.apply(i));
}
return new Keys(segments);
}
@ -497,11 +506,28 @@ public class GrpcServer extends Server {
};
}
private static final Metadata.Key<DebugInfo> DEBUG_INFO_TRAILER_KEY =
ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());
private static <PREV, T> BiConsumer<? super PREV, Throwable> handleResponseObserver(Function<PREV, T> resultMapper,
StreamObserver<T> responseObserver) {
return (value, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
Metadata trailers = new Metadata();
trailers.put(DEBUG_INFO_TRAILER_KEY, DebugInfo.newBuilder()
.setDetail("rockserver grpc execution failed")
.build());
var cause = ex;
if (cause instanceof CompletionException completionException) {
cause = completionException;
}
if (cause instanceof it.cavallium.rockserver.core.common.RocksDBException rocksDBException) {
cause = rocksDBException;
}
var error = Status.INTERNAL.withCause(cause)
.withDescription(cause.toString())
.asException(trailers);
responseObserver.onError(error);
} else {
T mapped;
try {