diff --git a/pom.xml b/pom.xml
index 1e2b7f0..f19915d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,9 @@
2.0.12
rockserver-core
it.cavallium.rockserver.core.Main
+ 0.6.1
+ 3.25.3
+ 1.65.0
@@ -80,6 +83,21 @@
slf4j-api
${slf4j.version}
+
+ io.grpc
+ grpc-netty
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
org.lz4
@@ -106,6 +124,13 @@
false
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.1
+
+
org.codehaus.mojo
@@ -171,6 +196,29 @@
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ ${protobuf-plugin.version}
+
+ com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+
+ @generated=omit
+
+
+
+ compile
+ compile-custom
+
+
+
+
diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java
index 3c16a7f..ad37b3d 100644
--- a/src/main/java/it/cavallium/rockserver/core/Main.java
+++ b/src/main/java/it/cavallium/rockserver/core/Main.java
@@ -5,6 +5,7 @@ import static java.util.Objects.requireNonNull;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
+import it.cavallium.rockserver.core.server.ServerBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -30,10 +31,14 @@ public class Main {
.type(String.class)
.setDefault(PRIVATE_MEMORY_URL.toString())
.help("Specify database rocksdb://hostname:port, or unix://, or file://");
- parser.addArgument("-l", "--listen-url")
+ parser.addArgument("-l", "--thrift-listen-url")
.type(String.class)
.setDefault("http://127.0.0.1:5332")
.help("Specify database http://hostname:port, or unix://, or file://");
+ parser.addArgument("-L", "--grpc-listen-url")
+ .type(String.class)
+ .setDefault("http://127.0.0.1:5333")
+ .help("Specify database http://hostname:port, or unix://, or file://");
parser.addArgument("-n", "--name")
.type(String.class)
.setDefault("main")
@@ -53,7 +58,6 @@ public class Main {
System.exit(1);
}
var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder();
- var serverBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
if (ns.getBoolean("print_default_config")) {
requireNonNull(Main.class.getClassLoader()
@@ -67,12 +71,14 @@ public class Main {
RocksDBLoader.loadLibrary();
var rawDatabaseUrl = ns.getString("database_url");
- var rawListenUrl = ns.getString("listen_url");
+ var rawThriftListenUrl = ns.getString("thrift_listen_url");
+ var rawGrpcListenUrl = ns.getString("grpc_listen_url");
var name = ns.getString("name");
var config = ns.getString("config");
var databaseUrl = new URI(rawDatabaseUrl);
- var listenUrl = new URI(rawListenUrl);
+ var thriftListenUrl = new URI(rawThriftListenUrl);
+ var grpcListenUrl = new URI(rawGrpcListenUrl);
if (config != null) {
if (!databaseUrl.getScheme().equals("file")) {
@@ -93,24 +99,23 @@ public class Main {
case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + databaseUrlScheme + "\" for database url url: " + databaseUrl);
}
- var listenUrlScheme = listenUrl.getScheme();
- switch (listenUrlScheme) {
- case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
- case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
- case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
- case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + listenUrlScheme + "\" for listen url: " + listenUrl);
- }
+ var thriftServerBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
+ buildServerAddress(thriftServerBuilder, thriftListenUrl, true);
+ var grpcServerBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
+ buildServerAddress(grpcServerBuilder, grpcListenUrl, false);
clientBuilder.setName(name);
try (var connection = clientBuilder.build()) {
LOG.log(Level.INFO, "Connected to {0}", connection);
- serverBuilder.setClient(connection);
+ thriftServerBuilder.setClient(connection);
+ grpcServerBuilder.setClient(connection);
CountDownLatch shutdownLatch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));
- try (var server = serverBuilder.build()) {
+ try (var _ = thriftServerBuilder.build();
+ var _ = grpcServerBuilder.build()) {
shutdownLatch.await();
LOG.info("Shutting down...");
}
@@ -119,4 +124,17 @@ public class Main {
}
LOG.info("Shut down successfully");
}
+
+ private static void buildServerAddress(ServerBuilder serverBuilder, URI listenUrl, boolean useThrift) {
+ var thriftListenUrlScheme = listenUrl.getScheme();
+ switch (thriftListenUrlScheme) {
+ case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
+ case "http" -> {
+ serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
+ serverBuilder.setUseThrift(useThrift);
+ }
+ case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
+ case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + thriftListenUrlScheme + "\" for listen url: " + listenUrl);
+ }
+ }
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/ColumnHashType.java b/src/main/java/it/cavallium/rockserver/core/common/api/ColumnHashType.java
index 4070c2d..c2d6321 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/ColumnHashType.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/ColumnHashType.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/ColumnSchema.java b/src/main/java/it/cavallium/rockserver/core/common/api/ColumnSchema.java
index 0fa6203..cc6c2d6 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/ColumnSchema.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/ColumnSchema.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/Delta.java b/src/main/java/it/cavallium/rockserver/core/common/api/Delta.java
index 5ac5e61..1dbcdb6 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/Delta.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/Delta.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/Operation.java b/src/main/java/it/cavallium/rockserver/core/common/api/Operation.java
index ad5a310..60e6de6 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/Operation.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/Operation.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/OptionalBinary.java b/src/main/java/it/cavallium/rockserver/core/common/api/OptionalBinary.java
index 781aec3..b90b55a 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/OptionalBinary.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/OptionalBinary.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/common/api/UpdateBegin.java b/src/main/java/it/cavallium/rockserver/core/common/api/UpdateBegin.java
index c9ff4a1..44eabe2 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/api/UpdateBegin.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/api/UpdateBegin.java
@@ -1,5 +1,5 @@
/**
- * Autogenerated by Thrift Compiler (0.19.0)
+ * Autogenerated by Thrift Compiler (0.20.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java
new file mode 100644
index 0000000..3b7b193
--- /dev/null
+++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java
@@ -0,0 +1,532 @@
+package it.cavallium.rockserver.core.server;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.StreamObserver;
+import it.cavallium.rockserver.core.client.RocksDBConnection;
+import it.cavallium.rockserver.core.common.ColumnHashType;
+import it.cavallium.rockserver.core.common.ColumnSchema;
+import it.cavallium.rockserver.core.common.Keys;
+import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
+import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
+import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
+import it.cavallium.rockserver.core.common.RequestType.RequestExists;
+import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate;
+import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
+import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
+import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
+import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
+import it.cavallium.rockserver.core.common.api.proto.Changed;
+import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
+import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
+import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest;
+import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse;
+import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest;
+import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse;
+import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest;
+import it.cavallium.rockserver.core.common.api.proto.Delta;
+import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest;
+import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse;
+import it.cavallium.rockserver.core.common.api.proto.GetRequest;
+import it.cavallium.rockserver.core.common.api.proto.GetResponse;
+import it.cavallium.rockserver.core.common.api.proto.KV;
+import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest;
+import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse;
+import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
+import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse;
+import it.cavallium.rockserver.core.common.api.proto.Previous;
+import it.cavallium.rockserver.core.common.api.proto.PreviousPresence;
+import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest;
+import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest;
+import it.cavallium.rockserver.core.common.api.proto.PutRequest;
+import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase;
+import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
+import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
+import it.cavallium.rockserver.core.common.api.proto.UpdateBegin;
+import it.unimi.dsi.fastutil.ints.Int2IntFunction;
+import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectList;
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+public class GrpcServer extends Server {
+
+ private static final Logger LOG = Logger.getLogger(GrpcServer.class.getName());
+
+ private final GrpcServerImpl grpc;
+ private final io.grpc.Server server;
+
+ public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
+ super(client);
+ this.grpc = new GrpcServerImpl(this.getClient());
+ this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port))
+ .addService(grpc)
+ .build();
+ server.start();
+ LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port);
+ }
+
+ private static final class GrpcServerImpl extends RocksDBServiceImplBase {
+
+ private static final Function super Void, Empty> MAP_EMPTY = _ -> Empty.getDefaultInstance();
+ private final RocksDBConnection client;
+ private final Arena autoArena;
+
+ public GrpcServerImpl(RocksDBConnection client) {
+ this.client = client;
+ this.autoArena = Arena.ofAuto();
+ }
+
+ // functions
+
+ @Override
+ public void openTransaction(OpenTransactionRequest request,
+ StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .openTransactionAsync(request.getTimeoutMs())
+ .whenComplete(handleResponseObserver(
+ txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void closeTransaction(CloseTransactionRequest request,
+ StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .closeTransactionAsync(request.getTransactionId(), request.getCommit())
+ .whenComplete(handleResponseObserver(
+ committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(),
+ responseObserver
+ ));
+ }
+
+ @Override
+ public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .closeFailedUpdateAsync(request.getUpdateId())
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public void createColumn(CreateColumnRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .createColumnAsync(request.getName(), mapColumnSchema(request.getSchema()))
+ .whenComplete(handleResponseObserver(
+ colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .deleteColumnAsync(request.getColumnId())
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public void getColumnId(GetColumnIdRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .getColumnIdAsync(request.getName())
+ .whenComplete(handleResponseObserver(
+ colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void put(PutRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestNothing<>()
+ )
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public StreamObserver putMulti(StreamObserver responseObserver) {
+ return new StreamObserver<>() {
+ private boolean initialRequestDone = false;
+ private long requestsCount = 0;
+ private boolean requestsCountFinalized;
+ private final AtomicLong processedRequestsCount = new AtomicLong();
+ private PutMultiInitialRequest initialRequest;
+
+ @Override
+ public void onNext(PutMultiRequest request) {
+ switch (request.getPutMultiRequestTypeCase()) {
+ case INITIALREQUEST -> {
+ if (initialRequestDone) {
+ throw new UnsupportedOperationException("Initial request already done!");
+ }
+ this.initialRequest = request.getInitialRequest();
+ this.initialRequestDone = true;
+ }
+ case DATA -> {
+ if (!initialRequestDone) {
+ throw new UnsupportedOperationException("Initial request already done!");
+ }
+ ++requestsCount;
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ initialRequest.getTransactionOrUpdateId(),
+ initialRequest.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestNothing<>()
+ )
+ .whenComplete((_, error) -> {
+ if (error != null) {
+ responseObserver.onError(error);
+ } else {
+ var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
+ if (requestsCountFinalized) {
+ if (newProcessedRequestCount == requestsCount) {
+ responseObserver.onCompleted();
+ }
+ }
+ }
+ });
+ }
+ case null, default ->
+ throw new UnsupportedOperationException("Unsupported operation: "
+ + request.getPutMultiRequestTypeCase());
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ responseObserver.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ requestsCountFinalized = true;
+ if (requestsCount == 0) {
+ responseObserver.onCompleted();
+ }
+ }
+ };
+ }
+
+ @Override
+ public void putGetPrevious(PutRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestPrevious<>()
+ )
+ .whenComplete(handleResponseObserver(
+ prev -> {
+ var prevBuilder = Previous.newBuilder();
+ if (prev != null) {
+ prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer()));
+ }
+ return prevBuilder.build();
+ },
+ responseObserver));
+ }
+
+ @Override
+ public void putGetDelta(PutRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestDelta<>()
+ )
+ .whenComplete(handleResponseObserver(
+ delta -> {
+ var deltaBuilder = Delta.newBuilder();
+ if (delta.previous() != null) {
+ deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer()));
+ }
+ if (delta.current() != null) {
+ deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer()));
+ }
+ return deltaBuilder.build();
+ },
+ responseObserver));
+ }
+
+ @Override
+ public void putGetChanged(PutRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestChanged<>()
+ )
+ .whenComplete(handleResponseObserver(
+ changed -> Changed.newBuilder().setChanged(changed).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void putGetPreviousPresence(PutRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .putAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
+ MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
+ new RequestPreviousPresence<>()
+ )
+ .whenComplete(handleResponseObserver(
+ present -> PreviousPresence.newBuilder().setPresent(present).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void get(GetRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .getAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getKeysCount(), request::getKeys),
+ new RequestCurrent<>()
+ )
+ .whenComplete(handleResponseObserver(
+ current -> {
+ var response = GetResponse.newBuilder();
+ if (current != null) {
+ response.setValue(ByteString.copyFrom(current.asByteBuffer()));
+ }
+ return response.build();
+ },
+ responseObserver));
+ }
+
+ @Override
+ public void getForUpdate(GetRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .getAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getKeysCount(), request::getKeys),
+ new RequestForUpdate<>()
+ )
+ .whenComplete(handleResponseObserver(
+ forUpdate -> {
+ var response = UpdateBegin.newBuilder();
+ response.setUpdateId(forUpdate.updateId());
+ if (forUpdate.previous() != null) {
+ response.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer()));
+ }
+ return response.build();
+ },
+ responseObserver));
+ }
+
+ @Override
+ public void exists(GetRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .getAsync(autoArena,
+ request.getTransactionOrUpdateId(),
+ request.getColumnId(),
+ mapKeys(request.getKeysCount(), request::getKeys),
+ new RequestExists<>()
+ )
+ .whenComplete(handleResponseObserver(
+ exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void openIterator(OpenIteratorRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .openIteratorAsync(autoArena,
+ request.getTransactionId(),
+ request.getColumnId(),
+ mapKeys(request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
+ mapKeys(request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
+ request.getReverse(),
+ request.getTimeoutMs()
+ )
+ .whenComplete(handleResponseObserver(
+ iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(),
+ responseObserver));
+ }
+
+ @Override
+ public void closeIterator(CloseIteratorRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .closeIteratorAsync(request.getIteratorId())
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public void seekTo(SeekToRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .seekToAsync(autoArena, request.getIterationId(), mapKeys(request.getKeysCount(), request::getKeys))
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public void subsequent(SubsequentRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .subsequentAsync(autoArena,
+ request.getIterationId(),
+ request.getSkipCount(),
+ request.getTakeCount(),
+ new RequestNothing<>()
+ )
+ .whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
+ }
+
+ @Override
+ public void subsequentExists(SubsequentRequest request, StreamObserver responseObserver) {
+ client.getAsyncApi()
+ .subsequentAsync(autoArena,
+ request.getIterationId(),
+ request.getSkipCount(),
+ request.getTakeCount(),
+ new RequestExists<>()
+ )
+ .whenComplete(handleResponseObserver(
+ exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
+ responseObserver));
+ }
+
+ public void subsequentMultiPage(SubsequentRequest request, StreamObserver responseObserver, int pageIndex) {
+ final long pageSize = 16L;
+ if (request.getTakeCount() > pageIndex * pageSize) {
+ client.getAsyncApi()
+ .subsequentAsync(autoArena,
+ request.getIterationId(),
+ pageIndex == 0 ? request.getSkipCount() : 0,
+ Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize),
+ new RequestMulti<>()
+ )
+ .whenComplete((response, ex) -> {
+ if (ex != null) {
+ responseObserver.onError(ex);
+ } else {
+ for (MemorySegment entry : response) {
+ Keys keys = null; // todo: implement
+ MemorySegment value = entry;
+ responseObserver.onNext(KV.newBuilder()
+ .addAllKeys(null) // todo: implement
+ .setValue(ByteString.copyFrom(value.asByteBuffer()))
+ .build());
+ }
+ subsequentMultiPage(request, responseObserver, pageIndex + 1);
+ }
+ });
+ } else {
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public void subsequentMultiGet(SubsequentRequest request, StreamObserver responseObserver) {
+ subsequentMultiPage(request, responseObserver, 0);
+ }
+
+ // mappers
+
+ private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) {
+ return ColumnSchema.of(mapKeysLength(schema.getFixedKeysCount(), schema::getFixedKeys),
+ mapVariableTailKeys(schema.getVariableTailKeysCount(), schema::getVariableTailKeys),
+ schema.getHasValue()
+ );
+ }
+
+ private static IntList mapKeysLength(int count, Int2IntFunction keyGetterAt) {
+ var l = new IntArrayList(count);
+ for (int i = 0; i < count; i++) {
+ l.add(keyGetterAt.apply(i));
+ }
+ return l;
+ }
+
+ private static ObjectList mapVariableTailKeys(int count,
+ Int2ObjectFunction variableTailKeyGetterAt) {
+ var l = new ObjectArrayList(count);
+ for (int i = 0; i < count; i++) {
+ l.add(switch (variableTailKeyGetterAt.apply(i)) {
+ case XXHASH32 -> ColumnHashType.XXHASH32;
+ case XXHASH8 -> ColumnHashType.XXHASH8;
+ case ALLSAME8 -> ColumnHashType.ALLSAME8;
+ case UNRECOGNIZED -> throw new UnsupportedOperationException();
+ });
+ }
+ return l;
+ }
+
+ private static Keys mapKeys(int count, Int2ObjectFunction keyGetterAt) {
+ var segments = new MemorySegment[count];
+ for (int i = 0; i < count; i++) {
+ segments[i] = MemorySegment.ofBuffer(keyGetterAt.apply(i).asReadOnlyByteBuffer());
+ }
+ return new Keys(segments);
+ }
+
+ // utils
+
+ private static BiConsumer super T, Throwable> handleResponseObserver(StreamObserver responseObserver) {
+ return (value, ex) -> {
+ if (ex != null) {
+ responseObserver.onError(ex);
+ } else {
+ if (value != null) {
+ responseObserver.onNext(value);
+ }
+ responseObserver.onCompleted();
+ }
+ };
+ }
+
+ private static BiConsumer super PREV, Throwable> handleResponseObserver(Function resultMapper,
+ StreamObserver responseObserver) {
+ return (value, ex) -> {
+ if (ex != null) {
+ responseObserver.onError(ex);
+ } else {
+ T mapped;
+ try {
+ mapped = resultMapper.apply(value);
+ } catch (Throwable ex2) {
+ responseObserver.onError(ex2);
+ return;
+ }
+ if (mapped != null) {
+ responseObserver.onNext(mapped);
+ }
+ responseObserver.onCompleted();
+ }
+ };
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("GRPC server is shutting down...");
+ server.shutdown();
+ try {
+ server.awaitTermination();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.close();
+ }
+}
diff --git a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java
index 840c23a..24915c3 100644
--- a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java
+++ b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java
@@ -14,6 +14,7 @@ public class ServerBuilder {
private UnixDomainSocketAddress unixAddress;
private String http2Host;
private int http2Port;
+ private boolean useThrift;
private RocksDBConnection client;
public void setUnixSocket(UnixDomainSocketAddress address) {
@@ -29,13 +30,21 @@ public class ServerBuilder {
this.http2Port = port;
}
+ public void setUseThrift(boolean useThrift) {
+ this.useThrift = useThrift;
+ }
+
public void setClient(RocksDBConnection client) {
this.client = client;
}
public Server build() throws IOException {
if (http2Host != null) {
- return new ThriftServer(client, http2Host, http2Port);
+ if (useThrift) {
+ return new ThriftServer(client, http2Host, http2Port);
+ } else {
+ return new GrpcServer(client, http2Host, http2Port);
+ }
} else if (unixAddress != null) {
throw new UnsupportedOperationException("Not implemented: unix socket");
} else if (iNetAddress != null) {
diff --git a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
index b12e6be..80b7f30 100644
--- a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
+++ b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
+import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
@@ -30,19 +31,26 @@ import org.jetbrains.annotations.NotNull;
public class ThriftServer extends Server {
+ private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN);
+ private final Thread thriftThread;
+ private final TThreadedSelectorServer server;
+
public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client);
var handler = new ThriftHandler(this.getClient());
try {
var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port));
- var server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
+ this.server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
.processor(new Processor<>(handler))
);
- server.serve();
+ var thriftThread = new Thread(server::serve);
+ thriftThread.setName("Thrift server thread");
+ this.thriftThread = thriftThread;
+ LOG.info("Thrift RocksDB server is listening at " + http2Host + ":" + http2Port);
} catch (TTransportException e) {
throw new IOException("Can't open server socket", e);
}
@@ -300,4 +308,11 @@ public class ThriftServer extends Server {
}
}
}
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Thrift server is shutting down...");
+ this.server.stop();
+ super.close();
+ }
}
diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java
index 918e4f1..097cc99 100644
--- a/src/main/java/module-info.java
+++ b/src/main/java/module-info.java
@@ -9,6 +9,13 @@ module rockserver.core {
requires it.unimi.dsi.fastutil;
requires org.apache.thrift;
requires org.slf4j;
+ requires protobuf.java;
+ requires io.grpc.protobuf;
+ requires io.grpc.stub;
+ requires io.grpc;
+ requires jsr305;
+ requires com.google.common;
+ requires io.grpc.netty;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;
diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto
new file mode 100644
index 0000000..32094c9
--- /dev/null
+++ b/src/main/proto/rocksdb.proto
@@ -0,0 +1,105 @@
+syntax = "proto3";
+
+import "google/protobuf/empty.proto";
+
+option java_multiple_files = true;
+package it.cavallium.rockserver.core.common.api.proto;
+
+message ColumnSchema {
+ repeated int32 fixedKeys = 1;
+ repeated ColumnHashType variableTailKeys = 2;
+ bool hasValue = 3;
+}
+
+enum ColumnHashType {
+ XXHASH32 = 0;
+ XXHASH8 = 1;
+ ALLSAME8 = 2;
+}
+
+enum Operation {
+ NOTHING = 0;
+ PREVIOUS = 1;
+ CURRENT = 2;
+ FOR_UPDATE = 3;
+ EXISTS = 4;
+ DELTA = 5;
+ MULTI = 6;
+ CHANGED = 7;
+ PREVIOUS_PRESENCE = 8;
+}
+
+message Delta {
+ optional bytes previous = 1;
+ optional bytes current = 2;
+}
+message Previous {optional bytes previous = 1;}
+message Changed {bool changed = 1;}
+message PreviousPresence {bool present = 1;}
+
+message UpdateBegin {
+ optional bytes previous = 1;
+ optional int64 updateId = 2;
+}
+
+message KV {
+ repeated bytes keys = 1;
+ bytes value = 2;
+}
+
+message OpenTransactionRequest {int64 timeoutMs = 1;}
+message OpenTransactionResponse {int64 transactionId = 1;}
+
+message CloseTransactionRequest {int64 transactionId = 1; int64 timeoutMs = 2; bool commit = 3;}
+message CloseTransactionResponse {bool successful = 1;}
+
+message CloseFailedUpdateRequest {int64 updateId = 1;}
+
+message CreateColumnRequest {string name = 1; ColumnSchema schema = 2;}
+message CreateColumnResponse {int64 columnId = 1;}
+
+message DeleteColumnRequest {int64 columnId = 1;}
+
+message GetColumnIdRequest {string name = 1;}
+message GetColumnIdResponse {int64 columnId = 1;}
+
+message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;}
+
+message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;}
+message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}}
+
+message GetRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated bytes keys = 3;}
+message GetResponse {optional bytes value = 1;}
+
+message OpenIteratorRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;}
+message OpenIteratorResponse {int64 iteratorId = 1;}
+
+message CloseIteratorRequest {int64 iteratorId = 1;}
+
+message SeekToRequest {int64 iterationId = 1; repeated bytes keys = 2;}
+
+message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 takeCount = 3;}
+
+service RocksDBService {
+ rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse);
+ rpc closeTransaction(CloseTransactionRequest) returns (CloseTransactionResponse);
+ rpc closeFailedUpdate(CloseFailedUpdateRequest) returns (google.protobuf.Empty);
+ rpc createColumn(CreateColumnRequest) returns (CreateColumnResponse);
+ rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty);
+ rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse);
+ rpc put(PutRequest) returns (google.protobuf.Empty);
+ rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty);
+ rpc putGetPrevious(PutRequest) returns (Previous);
+ rpc putGetDelta(PutRequest) returns (Delta);
+ rpc putGetChanged(PutRequest) returns (Changed);
+ rpc putGetPreviousPresence(PutRequest) returns (PreviousPresence);
+ rpc get(GetRequest) returns (GetResponse);
+ rpc getForUpdate(GetRequest) returns (UpdateBegin);
+ rpc exists(GetRequest) returns (PreviousPresence);
+ rpc openIterator(OpenIteratorRequest) returns (OpenIteratorResponse);
+ rpc closeIterator(CloseIteratorRequest) returns (google.protobuf.Empty);
+ rpc seekTo(SeekToRequest) returns (google.protobuf.Empty);
+ rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty);
+ rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
+ rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
+}