This commit is contained in:
Andrea Cavalli 2024-09-19 00:47:07 +02:00
parent 759e37558f
commit 165e90636a
6 changed files with 413 additions and 309 deletions

View File

@ -20,6 +20,7 @@
<protobuf-plugin.version>0.6.1</protobuf-plugin.version> <protobuf-plugin.version>0.6.1</protobuf-plugin.version>
<protobuf.version>3.25.3</protobuf.version> <protobuf.version>3.25.3</protobuf.version>
<grpc.version>1.65.1</grpc.version> <grpc.version>1.65.1</grpc.version>
<netty.version>4.1.100.Final</netty.version>
<rainbowgum.version>0.7.0</rainbowgum.version> <rainbowgum.version>0.7.0</rainbowgum.version>
</properties> </properties>
@ -114,6 +115,11 @@
<artifactId>grpc-netty</artifactId> <artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version> <version>${grpc.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId> <artifactId>netty-tcnative-boringssl-static</artifactId>

View File

@ -24,6 +24,8 @@ module rockserver.core {
requires io.netty.codec.http; requires io.netty.codec.http;
requires io.netty.codec; requires io.netty.codec;
requires io.netty.codec.http2; requires io.netty.codec.http2;
requires jdk.unsupported;
requires io.netty.transport.classes.epoll;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -0,0 +1,49 @@
package it.cavallium.rockserver.core;
import it.cavallium.rockserver.core.client.ClientBuilder;
import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Keys;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.impl.EmbeddedDB;
import it.cavallium.rockserver.core.server.GrpcServer;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
public class TestGrpcLoop {
public static void main(String[] args) throws IOException, InterruptedException {
var embeddedDB = new EmbeddedConnection(null, "main", null);
var server = new GrpcServer(embeddedDB, "localhost", 12345);
var clientB = new ClientBuilder();
clientB.setHttpAddress(new Utils.HostAndPort("localhost", 12345));
clientB.setName("local");
clientB.setUseThrift(false);
var client = clientB.build();
var col = client.getSyncApi().createColumn("test", ColumnSchema.of(IntList.of(15), ObjectList.of(), true));
var parallelism = 4;
for (int i = 0; i < parallelism; i++) {
var t = new Thread(() -> {
while (true) {
try (var arena = Arena.ofConfined()) {
var delta = client.getSyncApi().put(arena, 0, col,
new Keys(new MemorySegment[]{MemorySegment.ofArray(new byte[15])}),
MemorySegment.ofArray(new byte[15]),
RequestType.delta());
}
}
});
t.setDaemon(true);
t.setName("test-requests-thread-" + i);
t.start();
if (i + 1 == parallelism) {
t.join();
}
}
}
}

View File

@ -119,7 +119,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
@Override @Override
public <R> R requestSync(RocksDBAPICommand<R> req) { public <R> R requestSync(RocksDBAPICommand<R> req) {
var asyncResponse = requestAsync(req); var asyncResponse = req.handleAsync(this);
return asyncResponse return asyncResponse
.toCompletableFuture() .toCompletableFuture()
.join(); .join();

View File

@ -4,13 +4,17 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Keys;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
@ -57,12 +61,10 @@ import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -71,25 +73,42 @@ public class GrpcServer extends Server {
private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class.getName()); private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class.getName());
private final GrpcServerImpl grpc; private final GrpcServerImpl grpc;
private final EventLoopGroup elg;
private final ExecutorService executor;
private final io.grpc.Server server; private final io.grpc.Server server;
public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client); super(client);
this.grpc = new GrpcServerImpl(this.getClient()); this.grpc = new GrpcServerImpl(this.getClient());
EventLoopGroup elg;
Class<? extends ServerChannel> channelType;
try {
elg = new EpollEventLoopGroup(1);
channelType = EpollServerDomainSocketChannel.class;
} catch (UnsatisfiedLinkError ex) {
elg = new NioEventLoopGroup();
channelType = NioServerSocketChannel.class;
}
this.elg = elg;
this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2);
this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port)) this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port))
.bossEventLoopGroup(elg)
.workerEventLoopGroup(elg)
.directExecutor()
.channelType(channelType)
.withChildOption(ChannelOption.SO_KEEPALIVE, false)
.addService(grpc) .addService(grpc)
.build(); .build();
server.start(); server.start();
LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port); LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port);
} }
private static final class GrpcServerImpl extends RocksDBServiceImplBase { private final class GrpcServerImpl extends RocksDBServiceImplBase {
private static final Function<? super Void, Empty> MAP_EMPTY = _ -> Empty.getDefaultInstance(); private final RocksDBSyncAPI api;
private final RocksDBConnection client;
public GrpcServerImpl(RocksDBConnection client) { public GrpcServerImpl(RocksDBConnection client) {
this.client = client; this.api = client.getSyncApi();
} }
// functions // functions
@ -97,72 +116,98 @@ public class GrpcServer extends Server {
@Override @Override
public void openTransaction(OpenTransactionRequest request, public void openTransaction(OpenTransactionRequest request,
StreamObserver<OpenTransactionResponse> responseObserver) { StreamObserver<OpenTransactionResponse> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.openTransactionAsync(request.getTimeoutMs()) try {
.whenComplete(handleResponseObserver( var txId = api.openTransaction(request.getTimeoutMs());
txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(), responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build());
responseObserver, null)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void closeTransaction(CloseTransactionRequest request, public void closeTransaction(CloseTransactionRequest request,
StreamObserver<CloseTransactionResponse> responseObserver) { StreamObserver<CloseTransactionResponse> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.closeTransactionAsync(request.getTransactionId(), request.getCommit()) try {
.whenComplete(handleResponseObserver( var committed = api.closeTransaction(request.getTransactionId(), request.getCommit());
committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(), var response = CloseTransactionResponse.newBuilder().setSuccessful(committed).build();
responseObserver, null)); responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver<Empty> responseObserver) { public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.closeFailedUpdateAsync(request.getUpdateId()) try {
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); api.closeFailedUpdate(request.getUpdateId());
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) { public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.createColumnAsync(request.getName(), mapColumnSchema(request.getSchema())) var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema()));
.whenComplete(handleResponseObserver( var response = CreateColumnResponse.newBuilder().setColumnId(colId).build();
colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(), responseObserver.onNext(response);
responseObserver, null)); responseObserver.onCompleted();
});
} }
@Override @Override
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) { public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.deleteColumnAsync(request.getColumnId()) api.deleteColumn(request.getColumnId());
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); responseObserver.onCompleted();
});
} }
@Override @Override
public void getColumnId(GetColumnIdRequest request, StreamObserver<GetColumnIdResponse> responseObserver) { public void getColumnId(GetColumnIdRequest request, StreamObserver<GetColumnIdResponse> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.getColumnIdAsync(request.getName()) try {
.whenComplete(handleResponseObserver( var colId = api.getColumnId(request.getName());
colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(), var response = GetColumnIdResponse.newBuilder().setColumnId(colId).build();
responseObserver, null)); responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void put(PutRequest request, StreamObserver<Empty> responseObserver) { public void put(PutRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.putAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), api.put(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), request.getColumnId(),
toMemorySegment(autoArena, request.getData().getValue()), mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
new RequestNothing<>() toMemorySegment(arena, request.getData().getValue()),
) new RequestNothing<>()
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); );
}
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) { public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared();
return new StreamObserver<>() { return new StreamObserver<>() {
private boolean initialRequestDone = false; private boolean initialRequestDone = false;
private long requestsCount = 0; private long requestsCount = 0;
@ -185,28 +230,28 @@ public class GrpcServer extends Server {
throw new UnsupportedOperationException("Initial request already done!"); throw new UnsupportedOperationException("Initial request already done!");
} }
++requestsCount; ++requestsCount;
client.getAsyncApi() executor.execute(() -> {
.putAsync(autoArena, try {
initialRequest.getTransactionOrUpdateId(), try (var arena = Arena.ofConfined()) {
initialRequest.getColumnId(), api.put(arena,
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), initialRequest.getTransactionOrUpdateId(),
toMemorySegment(autoArena, request.getData().getValue()), initialRequest.getColumnId(),
new RequestNothing<>() mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
) toMemorySegment(arena, request.getData().getValue()),
.whenComplete((_, error) -> { new RequestNothing<>());
if (error != null) { }
closeArenaSafe(autoArena); } catch (RocksDBException ex) {
responseObserver.onError(error); responseObserver.onError(ex);
} else { return;
var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); }
if (requestsCountFinalized) {
if (newProcessedRequestCount == requestsCount) { var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
closeArenaSafe(autoArena); if (requestsCountFinalized) {
responseObserver.onCompleted(); if (newProcessedRequestCount == requestsCount) {
} responseObserver.onCompleted();
} }
} }
}); });
} }
case null, default -> case null, default ->
throw new UnsupportedOperationException("Unsupported operation: " throw new UnsupportedOperationException("Unsupported operation: "
@ -216,7 +261,6 @@ public class GrpcServer extends Server {
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
closeArenaSafe(autoArena);
responseObserver.onError(t); responseObserver.onError(t);
} }
@ -224,7 +268,6 @@ public class GrpcServer extends Server {
public void onCompleted() { public void onCompleted() {
requestsCountFinalized = true; requestsCountFinalized = true;
if (requestsCount == 0) { if (requestsCount == 0) {
closeArenaSafe(autoArena);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
} }
@ -233,237 +276,288 @@ public class GrpcServer extends Server {
@Override @Override
public void putGetPrevious(PutRequest request, StreamObserver<Previous> responseObserver) { public void putGetPrevious(PutRequest request, StreamObserver<Previous> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.putAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var prev = api.put(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), request.getColumnId(),
toMemorySegment(autoArena, request.getData().getValue()), mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
new RequestPrevious<>() toMemorySegment(arena, request.getData().getValue()),
) new RequestPrevious<>()
.whenComplete(handleResponseObserver( );
prev -> { var prevBuilder = Previous.newBuilder();
var prevBuilder = Previous.newBuilder(); if (prev != null) {
if (prev != null) { prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer()));
prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer())); }
} var response = prevBuilder.build();
return prevBuilder.build(); responseObserver.onNext(response);
}, }
responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void putGetDelta(PutRequest request, StreamObserver<Delta> responseObserver) { public void putGetDelta(PutRequest request, StreamObserver<Delta> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.putAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var delta = api.put(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), request.getColumnId(),
toMemorySegment(autoArena, request.getData().getValue()), mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
new RequestDelta<>() toMemorySegment(arena, request.getData().getValue()),
) new RequestDelta<>()
.whenComplete(handleResponseObserver( );
delta -> { var deltaBuilder = Delta.newBuilder();
var deltaBuilder = Delta.newBuilder(); if (delta.previous() != null) {
if (delta.previous() != null) { deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer()));
deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer())); }
} if (delta.current() != null) {
if (delta.current() != null) { deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer()));
deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer())); }
} var response = deltaBuilder.build();
return deltaBuilder.build(); responseObserver.onNext(response);
}, }
responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void putGetChanged(PutRequest request, StreamObserver<Changed> responseObserver) { public void putGetChanged(PutRequest request, StreamObserver<Changed> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.putAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var changed = api.put(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), request.getColumnId(),
toMemorySegment(autoArena, request.getData().getValue()), mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
new RequestChanged<>() toMemorySegment(arena, request.getData().getValue()),
) new RequestChanged<>()
.whenComplete(handleResponseObserver( );
changed -> Changed.newBuilder().setChanged(changed).build(), var response = Changed.newBuilder().setChanged(changed).build();
responseObserver, autoArena)); responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void putGetPreviousPresence(PutRequest request, StreamObserver<PreviousPresence> responseObserver) { public void putGetPreviousPresence(PutRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.putAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var present = api.put(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getData().getKeysCount(), request.getData()::getKeys), request.getColumnId(),
toMemorySegment(autoArena, request.getData().getValue()), mapKeys(arena, request.getData().getKeysCount(), request.getData()::getKeys),
new RequestPreviousPresence<>() toMemorySegment(arena, request.getData().getValue()),
) new RequestPreviousPresence<>()
.whenComplete(handleResponseObserver( );
present -> PreviousPresence.newBuilder().setPresent(present).build(), var response = PreviousPresence.newBuilder().setPresent(present).build();
responseObserver, autoArena)); responseObserver.onNext(response);
}
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void get(GetRequest request, StreamObserver<GetResponse> responseObserver) { public void get(GetRequest request, StreamObserver<GetResponse> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.getAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var current = api.get(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getKeysCount(), request::getKeys), request.getColumnId(),
new RequestCurrent<>() mapKeys(arena, request.getKeysCount(), request::getKeys),
) new RequestCurrent<>()
.whenComplete(handleResponseObserver( );
current -> { var responseBuilder = GetResponse.newBuilder();
var response = GetResponse.newBuilder(); if (current != null) {
if (current != null) { responseBuilder.setValue(ByteString.copyFrom(current.asByteBuffer()));
response.setValue(ByteString.copyFrom(current.asByteBuffer())); }
} var response = responseBuilder.build();
return response.build(); responseObserver.onNext(response);
}, }
responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void getForUpdate(GetRequest request, StreamObserver<UpdateBegin> responseObserver) { public void getForUpdate(GetRequest request, StreamObserver<UpdateBegin> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.getAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var forUpdate = api.get(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getKeysCount(), request::getKeys), request.getColumnId(),
new RequestForUpdate<>() mapKeys(arena, request.getKeysCount(), request::getKeys),
) new RequestForUpdate<>()
.whenComplete(handleResponseObserver( );
forUpdate -> { var responseBuilder = UpdateBegin.newBuilder();
var response = UpdateBegin.newBuilder(); responseBuilder.setUpdateId(forUpdate.updateId());
response.setUpdateId(forUpdate.updateId()); if (forUpdate.previous() != null) {
if (forUpdate.previous() != null) { responseBuilder.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer()));
response.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer())); }
} var response = responseBuilder.build();
return response.build(); responseObserver.onNext(response);
}, }
responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void exists(GetRequest request, StreamObserver<PreviousPresence> responseObserver) { public void exists(GetRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.getAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionOrUpdateId(), var exists = api.get(arena,
request.getColumnId(), request.getTransactionOrUpdateId(),
mapKeys(autoArena, request.getKeysCount(), request::getKeys), request.getColumnId(),
new RequestExists<>() mapKeys(arena, request.getKeysCount(), request::getKeys),
) new RequestExists<>()
.whenComplete(handleResponseObserver( );
exists -> PreviousPresence.newBuilder().setPresent(exists).build(), responseObserver.onNext(PreviousPresence.newBuilder().setPresent(exists).build());
responseObserver, autoArena)); }
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void openIterator(OpenIteratorRequest request, StreamObserver<OpenIteratorResponse> responseObserver) { public void openIterator(OpenIteratorRequest request, StreamObserver<OpenIteratorResponse> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.openIteratorAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getTransactionId(), var iteratorId = api.openIterator(arena,
request.getColumnId(), request.getTransactionId(),
mapKeys(autoArena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive), request.getColumnId(),
mapKeys(autoArena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive), mapKeys(arena, request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
request.getReverse(), mapKeys(arena, request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getTimeoutMs() request.getReverse(),
) request.getTimeoutMs()
.whenComplete(handleResponseObserver( );
iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(), responseObserver.onNext(OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build());
responseObserver, autoArena)); }
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void closeIterator(CloseIteratorRequest request, StreamObserver<Empty> responseObserver) { public void closeIterator(CloseIteratorRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi() executor.execute(() -> {
.closeIteratorAsync(request.getIteratorId()) try {
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, null)); api.closeIterator(request.getIteratorId());
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) { public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.seekToAsync(autoArena, request.getIterationId(), mapKeys(autoArena, request.getKeysCount(), request::getKeys)) try (var arena = Arena.ofConfined()) {
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys));
}
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void subsequent(SubsequentRequest request, StreamObserver<Empty> responseObserver) { public void subsequent(SubsequentRequest request, StreamObserver<Empty> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.subsequentAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getIterationId(), api.subsequent(arena, request.getIterationId(),
request.getSkipCount(), request.getSkipCount(),
request.getTakeCount(), request.getTakeCount(),
new RequestNothing<>() new RequestNothing<>());
) }
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void subsequentExists(SubsequentRequest request, StreamObserver<PreviousPresence> responseObserver) { public void subsequentExists(SubsequentRequest request, StreamObserver<PreviousPresence> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
client.getAsyncApi() try {
.subsequentAsync(autoArena, try (var arena = Arena.ofConfined()) {
request.getIterationId(), var exists = api.subsequent(arena, request.getIterationId(),
request.getSkipCount(), request.getSkipCount(),
request.getTakeCount(), request.getTakeCount(),
new RequestExists<>() new RequestExists<>());
) var response = PreviousPresence.newBuilder().setPresent(exists).build();
.whenComplete(handleResponseObserver( responseObserver.onNext(response);
exists -> PreviousPresence.newBuilder().setPresent(exists).build(), }
responseObserver, autoArena)); responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
} }
@Override @Override
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) { public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
var autoArena = Arena.ofShared(); executor.execute(() -> {
subsequentMultiPage(request, responseObserver, 0, autoArena); try {
} try (var arena = Arena.ofConfined()) {
int pageIndex = 0;
public void subsequentMultiPage(SubsequentRequest request, StreamObserver<KV> responseObserver, int pageIndex, Arena autoArena) { final long pageSize = 16L;
final long pageSize = 16L; while (request.getTakeCount() > pageIndex * pageSize) {
if (request.getTakeCount() > pageIndex * pageSize) { var response = api.subsequent(arena,
client.getAsyncApi() request.getIterationId(),
.subsequentAsync(autoArena, pageIndex == 0 ? request.getSkipCount() : 0,
request.getIterationId(), Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize),
pageIndex == 0 ? request.getSkipCount() : 0, new RequestMulti<>()
Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize), );
new RequestMulti<>() for (MemorySegment entry : response) {
) Keys keys = null; // todo: implement
.whenComplete((response, ex) -> { MemorySegment value = entry;
if (ex != null) { responseObserver.onNext(KV.newBuilder()
closeArenaSafe(autoArena); .addAllKeys(null) // todo: implement
responseObserver.onError(ex); .setValue(ByteString.copyFrom(value.asByteBuffer()))
} else { .build());
for (MemorySegment entry : response) { }
Keys keys = null; // todo: implement pageIndex++;
MemorySegment value = entry; }
responseObserver.onNext(KV.newBuilder() }
.addAllKeys(null) // todo: implement responseObserver.onCompleted();
.setValue(ByteString.copyFrom(value.asByteBuffer())) } catch (Throwable ex) {
.build()); responseObserver.onError(ex);
} }
subsequentMultiPage(request, responseObserver, pageIndex + 1, autoArena); });
}
});
} else {
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
} }
private static void closeArenaSafe(Arena autoArena) { private static void closeArenaSafe(Arena autoArena) {
@ -514,55 +608,6 @@ public class GrpcServer extends Server {
} }
return new Keys(segments); return new Keys(segments);
} }
// utils
private static <T> BiConsumer<? super T, Throwable> handleResponseObserver(StreamObserver<T> responseObserver) {
return (value, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
} else {
if (value != null) {
responseObserver.onNext(value);
}
responseObserver.onCompleted();
}
};
}
private static <PREV, T> BiConsumer<? super PREV, Throwable> handleResponseObserver(Function<PREV, T> resultMapper,
StreamObserver<T> responseObserver, @Nullable Arena autoArena) {
return (value, ex) -> {
if (ex != null) {
closeArenaSafe(autoArena);
var cause = ex;
if (cause instanceof CompletionException completionException) {
cause = completionException;
}
if (cause instanceof it.cavallium.rockserver.core.common.RocksDBException rocksDBException) {
cause = rocksDBException;
}
var error = Status.INTERNAL.withCause(cause)
.withDescription(cause.toString())
.asException();
responseObserver.onError(error);
} else {
T mapped;
try {
mapped = resultMapper.apply(value);
} catch (Throwable ex2) {
closeArenaSafe(autoArena);
responseObserver.onError(ex2);
return;
}
if (mapped != null) {
responseObserver.onNext(mapped);
}
closeArenaSafe(autoArena);
responseObserver.onCompleted();
}
};
}
} }
@Override @Override
@ -574,6 +619,8 @@ public class GrpcServer extends Server {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
elg.close();
executor.close();
super.close(); super.close();
} }
} }

View File

@ -162,11 +162,11 @@ abstract class EmbeddedDBTest {
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, toMemorySegmentSimple(arena, 123), RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, toMemorySegmentSimple(arena, 123), RequestType.delta()));
} else { } else {
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, MemorySegment.NULL, RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, MemorySegment.NULL, RequestType.delta()));
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, null, RequestType.delta())); Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, key, null, RequestType.delta()));
} }
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, null, value1, RequestType.delta())); Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, null, value1, RequestType.delta()));
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, null, null, RequestType.delta())); Assertions.assertThrows(IllegalArgumentException.class, () -> db.put(arena, 0, colId, null, null, RequestType.delta()));
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, value1, null)); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, colId, key, value1, null));
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 1, colId, key, value1, RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 1, colId, key, value1, RequestType.delta()));
Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, 21203, key, value1, RequestType.delta())); Assertions.assertThrows(RocksDBException.class, () -> db.put(arena, 0, 21203, key, value1, RequestType.delta()));