Implement GRPC server

This commit is contained in:
Andrea Cavalli 2024-07-01 23:46:24 +02:00
parent 9ab08c54a3
commit aeadadd7b4
13 changed files with 756 additions and 22 deletions

48
pom.xml
View File

@ -17,6 +17,9 @@
<slf4j.version>2.0.12</slf4j.version> <slf4j.version>2.0.12</slf4j.version>
<imageName>rockserver-core</imageName> <imageName>rockserver-core</imageName>
<mainClass>it.cavallium.rockserver.core.Main</mainClass> <mainClass>it.cavallium.rockserver.core.Main</mainClass>
<protobuf-plugin.version>0.6.1</protobuf-plugin.version>
<protobuf.version>3.25.3</protobuf.version>
<grpc.version>1.65.0</grpc.version>
</properties> </properties>
<repositories> <repositories>
@ -80,6 +83,21 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version> <version>${slf4j.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.lz4</groupId> <groupId>org.lz4</groupId>
@ -106,6 +124,13 @@
<filtering>false</filtering> <filtering>false</filtering>
</resource> </resource>
</resources> </resources>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.1</version>
</extension>
</extensions>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
@ -171,6 +196,29 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<configuration>
<pluginParameter>
@generated=omit
</pluginParameter>
</configuration>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>

View File

@ -5,6 +5,7 @@ import static java.util.Objects.requireNonNull;
import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader; import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import it.cavallium.rockserver.core.server.ServerBuilder;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -30,10 +31,14 @@ public class Main {
.type(String.class) .type(String.class)
.setDefault(PRIVATE_MEMORY_URL.toString()) .setDefault(PRIVATE_MEMORY_URL.toString())
.help("Specify database rocksdb://hostname:port, or unix://<path>, or file://<path>"); .help("Specify database rocksdb://hostname:port, or unix://<path>, or file://<path>");
parser.addArgument("-l", "--listen-url") parser.addArgument("-l", "--thrift-listen-url")
.type(String.class) .type(String.class)
.setDefault("http://127.0.0.1:5332") .setDefault("http://127.0.0.1:5332")
.help("Specify database http://hostname:port, or unix://<path>, or file://<path>"); .help("Specify database http://hostname:port, or unix://<path>, or file://<path>");
parser.addArgument("-L", "--grpc-listen-url")
.type(String.class)
.setDefault("http://127.0.0.1:5333")
.help("Specify database http://hostname:port, or unix://<path>, or file://<path>");
parser.addArgument("-n", "--name") parser.addArgument("-n", "--name")
.type(String.class) .type(String.class)
.setDefault("main") .setDefault("main")
@ -53,7 +58,6 @@ public class Main {
System.exit(1); System.exit(1);
} }
var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder(); var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder();
var serverBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
if (ns.getBoolean("print_default_config")) { if (ns.getBoolean("print_default_config")) {
requireNonNull(Main.class.getClassLoader() requireNonNull(Main.class.getClassLoader()
@ -67,12 +71,14 @@ public class Main {
RocksDBLoader.loadLibrary(); RocksDBLoader.loadLibrary();
var rawDatabaseUrl = ns.getString("database_url"); var rawDatabaseUrl = ns.getString("database_url");
var rawListenUrl = ns.getString("listen_url"); var rawThriftListenUrl = ns.getString("thrift_listen_url");
var rawGrpcListenUrl = ns.getString("grpc_listen_url");
var name = ns.getString("name"); var name = ns.getString("name");
var config = ns.getString("config"); var config = ns.getString("config");
var databaseUrl = new URI(rawDatabaseUrl); var databaseUrl = new URI(rawDatabaseUrl);
var listenUrl = new URI(rawListenUrl); var thriftListenUrl = new URI(rawThriftListenUrl);
var grpcListenUrl = new URI(rawGrpcListenUrl);
if (config != null) { if (config != null) {
if (!databaseUrl.getScheme().equals("file")) { if (!databaseUrl.getScheme().equals("file")) {
@ -93,24 +99,23 @@ public class Main {
case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + databaseUrlScheme + "\" for database url url: " + databaseUrl); case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + databaseUrlScheme + "\" for database url url: " + databaseUrl);
} }
var listenUrlScheme = listenUrl.getScheme(); var thriftServerBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
switch (listenUrlScheme) { buildServerAddress(thriftServerBuilder, thriftListenUrl, true);
case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath()))); var grpcServerBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl)); buildServerAddress(grpcServerBuilder, grpcListenUrl, false);
case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + listenUrlScheme + "\" for listen url: " + listenUrl);
}
clientBuilder.setName(name); clientBuilder.setName(name);
try (var connection = clientBuilder.build()) { try (var connection = clientBuilder.build()) {
LOG.log(Level.INFO, "Connected to {0}", connection); LOG.log(Level.INFO, "Connected to {0}", connection);
serverBuilder.setClient(connection); thriftServerBuilder.setClient(connection);
grpcServerBuilder.setClient(connection);
CountDownLatch shutdownLatch = new CountDownLatch(1); CountDownLatch shutdownLatch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown)); Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));
try (var server = serverBuilder.build()) { try (var _ = thriftServerBuilder.build();
var _ = grpcServerBuilder.build()) {
shutdownLatch.await(); shutdownLatch.await();
LOG.info("Shutting down..."); LOG.info("Shutting down...");
} }
@ -119,4 +124,17 @@ public class Main {
} }
LOG.info("Shut down successfully"); LOG.info("Shut down successfully");
} }
private static void buildServerAddress(ServerBuilder serverBuilder, URI listenUrl, boolean useThrift) {
var thriftListenUrlScheme = listenUrl.getScheme();
switch (thriftListenUrlScheme) {
case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
case "http" -> {
serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
serverBuilder.setUseThrift(useThrift);
}
case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
case null, default -> throw new IllegalArgumentException("Invalid scheme \"" + thriftListenUrlScheme + "\" for listen url: " + listenUrl);
}
}
} }

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -1,5 +1,5 @@
/** /**
* Autogenerated by Thrift Compiler (0.19.0) * Autogenerated by Thrift Compiler (0.20.0)
* *
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated * @generated

View File

@ -0,0 +1,532 @@
package it.cavallium.rockserver.core.server;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Keys;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
import it.cavallium.rockserver.core.common.RequestType.RequestExists;
import it.cavallium.rockserver.core.common.RequestType.RequestForUpdate;
import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.Changed;
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse;
import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse;
import it.cavallium.rockserver.core.common.api.proto.GetRequest;
import it.cavallium.rockserver.core.common.api.proto.GetResponse;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.Previous;
import it.cavallium.rockserver.core.common.api.proto.PreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest;
import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest;
import it.cavallium.rockserver.core.common.api.proto.PutRequest;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase;
import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
import it.cavallium.rockserver.core.common.api.proto.UpdateBegin;
import it.unimi.dsi.fastutil.ints.Int2IntFunction;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.logging.Logger;
public class GrpcServer extends Server {
private static final Logger LOG = Logger.getLogger(GrpcServer.class.getName());
private final GrpcServerImpl grpc;
private final io.grpc.Server server;
public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client);
this.grpc = new GrpcServerImpl(this.getClient());
this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port))
.addService(grpc)
.build();
server.start();
LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port);
}
private static final class GrpcServerImpl extends RocksDBServiceImplBase {
private static final Function<? super Void, Empty> MAP_EMPTY = _ -> Empty.getDefaultInstance();
private final RocksDBConnection client;
private final Arena autoArena;
public GrpcServerImpl(RocksDBConnection client) {
this.client = client;
this.autoArena = Arena.ofAuto();
}
// functions
@Override
public void openTransaction(OpenTransactionRequest request,
StreamObserver<OpenTransactionResponse> responseObserver) {
client.getAsyncApi()
.openTransactionAsync(request.getTimeoutMs())
.whenComplete(handleResponseObserver(
txId -> OpenTransactionResponse.newBuilder().setTransactionId(txId).build(),
responseObserver));
}
@Override
public void closeTransaction(CloseTransactionRequest request,
StreamObserver<CloseTransactionResponse> responseObserver) {
client.getAsyncApi()
.closeTransactionAsync(request.getTransactionId(), request.getCommit())
.whenComplete(handleResponseObserver(
committed -> CloseTransactionResponse.newBuilder().setSuccessful(committed).build(),
responseObserver
));
}
@Override
public void closeFailedUpdate(CloseFailedUpdateRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.closeFailedUpdateAsync(request.getUpdateId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) {
client.getAsyncApi()
.createColumnAsync(request.getName(), mapColumnSchema(request.getSchema()))
.whenComplete(handleResponseObserver(
colId -> CreateColumnResponse.newBuilder().setColumnId(colId).build(),
responseObserver));
}
@Override
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.deleteColumnAsync(request.getColumnId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public void getColumnId(GetColumnIdRequest request, StreamObserver<GetColumnIdResponse> responseObserver) {
client.getAsyncApi()
.getColumnIdAsync(request.getName())
.whenComplete(handleResponseObserver(
colId -> GetColumnIdResponse.newBuilder().setColumnId(colId).build(),
responseObserver));
}
@Override
public void put(PutRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestNothing<>()
)
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) {
return new StreamObserver<>() {
private boolean initialRequestDone = false;
private long requestsCount = 0;
private boolean requestsCountFinalized;
private final AtomicLong processedRequestsCount = new AtomicLong();
private PutMultiInitialRequest initialRequest;
@Override
public void onNext(PutMultiRequest request) {
switch (request.getPutMultiRequestTypeCase()) {
case INITIALREQUEST -> {
if (initialRequestDone) {
throw new UnsupportedOperationException("Initial request already done!");
}
this.initialRequest = request.getInitialRequest();
this.initialRequestDone = true;
}
case DATA -> {
if (!initialRequestDone) {
throw new UnsupportedOperationException("Initial request already done!");
}
++requestsCount;
client.getAsyncApi()
.putAsync(autoArena,
initialRequest.getTransactionOrUpdateId(),
initialRequest.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestNothing<>()
)
.whenComplete((_, error) -> {
if (error != null) {
responseObserver.onError(error);
} else {
var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
if (requestsCountFinalized) {
if (newProcessedRequestCount == requestsCount) {
responseObserver.onCompleted();
}
}
}
});
}
case null, default ->
throw new UnsupportedOperationException("Unsupported operation: "
+ request.getPutMultiRequestTypeCase());
}
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
requestsCountFinalized = true;
if (requestsCount == 0) {
responseObserver.onCompleted();
}
}
};
}
@Override
public void putGetPrevious(PutRequest request, StreamObserver<Previous> responseObserver) {
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestPrevious<>()
)
.whenComplete(handleResponseObserver(
prev -> {
var prevBuilder = Previous.newBuilder();
if (prev != null) {
prevBuilder.setPrevious(ByteString.copyFrom(prev.asByteBuffer()));
}
return prevBuilder.build();
},
responseObserver));
}
@Override
public void putGetDelta(PutRequest request, StreamObserver<Delta> responseObserver) {
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestDelta<>()
)
.whenComplete(handleResponseObserver(
delta -> {
var deltaBuilder = Delta.newBuilder();
if (delta.previous() != null) {
deltaBuilder.setPrevious(ByteString.copyFrom(delta.previous().asByteBuffer()));
}
if (delta.current() != null) {
deltaBuilder.setCurrent(ByteString.copyFrom(delta.current().asByteBuffer()));
}
return deltaBuilder.build();
},
responseObserver));
}
@Override
public void putGetChanged(PutRequest request, StreamObserver<Changed> responseObserver) {
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestChanged<>()
)
.whenComplete(handleResponseObserver(
changed -> Changed.newBuilder().setChanged(changed).build(),
responseObserver));
}
@Override
public void putGetPreviousPresence(PutRequest request, StreamObserver<PreviousPresence> responseObserver) {
client.getAsyncApi()
.putAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getData().getKeysCount(), request.getData()::getKeys),
MemorySegment.ofBuffer(request.getData().getValue().asReadOnlyByteBuffer()),
new RequestPreviousPresence<>()
)
.whenComplete(handleResponseObserver(
present -> PreviousPresence.newBuilder().setPresent(present).build(),
responseObserver));
}
@Override
public void get(GetRequest request, StreamObserver<GetResponse> responseObserver) {
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
new RequestCurrent<>()
)
.whenComplete(handleResponseObserver(
current -> {
var response = GetResponse.newBuilder();
if (current != null) {
response.setValue(ByteString.copyFrom(current.asByteBuffer()));
}
return response.build();
},
responseObserver));
}
@Override
public void getForUpdate(GetRequest request, StreamObserver<UpdateBegin> responseObserver) {
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
new RequestForUpdate<>()
)
.whenComplete(handleResponseObserver(
forUpdate -> {
var response = UpdateBegin.newBuilder();
response.setUpdateId(forUpdate.updateId());
if (forUpdate.previous() != null) {
response.setPrevious(ByteString.copyFrom(forUpdate.previous().asByteBuffer()));
}
return response.build();
},
responseObserver));
}
@Override
public void exists(GetRequest request, StreamObserver<PreviousPresence> responseObserver) {
client.getAsyncApi()
.getAsync(autoArena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeys(request.getKeysCount(), request::getKeys),
new RequestExists<>()
)
.whenComplete(handleResponseObserver(
exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
responseObserver));
}
@Override
public void openIterator(OpenIteratorRequest request, StreamObserver<OpenIteratorResponse> responseObserver) {
client.getAsyncApi()
.openIteratorAsync(autoArena,
request.getTransactionId(),
request.getColumnId(),
mapKeys(request.getStartKeysInclusiveCount(), request::getStartKeysInclusive),
mapKeys(request.getEndKeysExclusiveCount(), request::getEndKeysExclusive),
request.getReverse(),
request.getTimeoutMs()
)
.whenComplete(handleResponseObserver(
iteratorId -> OpenIteratorResponse.newBuilder().setIteratorId(iteratorId).build(),
responseObserver));
}
@Override
public void closeIterator(CloseIteratorRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.closeIteratorAsync(request.getIteratorId())
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public void seekTo(SeekToRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.seekToAsync(autoArena, request.getIterationId(), mapKeys(request.getKeysCount(), request::getKeys))
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public void subsequent(SubsequentRequest request, StreamObserver<Empty> responseObserver) {
client.getAsyncApi()
.subsequentAsync(autoArena,
request.getIterationId(),
request.getSkipCount(),
request.getTakeCount(),
new RequestNothing<>()
)
.whenComplete(handleResponseObserver(MAP_EMPTY, responseObserver));
}
@Override
public void subsequentExists(SubsequentRequest request, StreamObserver<PreviousPresence> responseObserver) {
client.getAsyncApi()
.subsequentAsync(autoArena,
request.getIterationId(),
request.getSkipCount(),
request.getTakeCount(),
new RequestExists<>()
)
.whenComplete(handleResponseObserver(
exists -> PreviousPresence.newBuilder().setPresent(exists).build(),
responseObserver));
}
public void subsequentMultiPage(SubsequentRequest request, StreamObserver<KV> responseObserver, int pageIndex) {
final long pageSize = 16L;
if (request.getTakeCount() > pageIndex * pageSize) {
client.getAsyncApi()
.subsequentAsync(autoArena,
request.getIterationId(),
pageIndex == 0 ? request.getSkipCount() : 0,
Math.min(request.getTakeCount() - pageIndex * pageSize, pageSize),
new RequestMulti<>()
)
.whenComplete((response, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
} else {
for (MemorySegment entry : response) {
Keys keys = null; // todo: implement
MemorySegment value = entry;
responseObserver.onNext(KV.newBuilder()
.addAllKeys(null) // todo: implement
.setValue(ByteString.copyFrom(value.asByteBuffer()))
.build());
}
subsequentMultiPage(request, responseObserver, pageIndex + 1);
}
});
} else {
responseObserver.onCompleted();
}
}
@Override
public void subsequentMultiGet(SubsequentRequest request, StreamObserver<KV> responseObserver) {
subsequentMultiPage(request, responseObserver, 0);
}
// mappers
private static ColumnSchema mapColumnSchema(it.cavallium.rockserver.core.common.api.proto.ColumnSchema schema) {
return ColumnSchema.of(mapKeysLength(schema.getFixedKeysCount(), schema::getFixedKeys),
mapVariableTailKeys(schema.getVariableTailKeysCount(), schema::getVariableTailKeys),
schema.getHasValue()
);
}
private static IntList mapKeysLength(int count, Int2IntFunction keyGetterAt) {
var l = new IntArrayList(count);
for (int i = 0; i < count; i++) {
l.add(keyGetterAt.apply(i));
}
return l;
}
private static ObjectList<ColumnHashType> mapVariableTailKeys(int count,
Int2ObjectFunction<it.cavallium.rockserver.core.common.api.proto.ColumnHashType> variableTailKeyGetterAt) {
var l = new ObjectArrayList<ColumnHashType>(count);
for (int i = 0; i < count; i++) {
l.add(switch (variableTailKeyGetterAt.apply(i)) {
case XXHASH32 -> ColumnHashType.XXHASH32;
case XXHASH8 -> ColumnHashType.XXHASH8;
case ALLSAME8 -> ColumnHashType.ALLSAME8;
case UNRECOGNIZED -> throw new UnsupportedOperationException();
});
}
return l;
}
private static Keys mapKeys(int count, Int2ObjectFunction<ByteString> keyGetterAt) {
var segments = new MemorySegment[count];
for (int i = 0; i < count; i++) {
segments[i] = MemorySegment.ofBuffer(keyGetterAt.apply(i).asReadOnlyByteBuffer());
}
return new Keys(segments);
}
// utils
private static <T> BiConsumer<? super T, Throwable> handleResponseObserver(StreamObserver<T> responseObserver) {
return (value, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
} else {
if (value != null) {
responseObserver.onNext(value);
}
responseObserver.onCompleted();
}
};
}
private static <PREV, T> BiConsumer<? super PREV, Throwable> handleResponseObserver(Function<PREV, T> resultMapper,
StreamObserver<T> responseObserver) {
return (value, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
} else {
T mapped;
try {
mapped = resultMapper.apply(value);
} catch (Throwable ex2) {
responseObserver.onError(ex2);
return;
}
if (mapped != null) {
responseObserver.onNext(mapped);
}
responseObserver.onCompleted();
}
};
}
}
@Override
public void close() throws IOException {
LOG.info("GRPC server is shutting down...");
server.shutdown();
try {
server.awaitTermination();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
super.close();
}
}

View File

@ -14,6 +14,7 @@ public class ServerBuilder {
private UnixDomainSocketAddress unixAddress; private UnixDomainSocketAddress unixAddress;
private String http2Host; private String http2Host;
private int http2Port; private int http2Port;
private boolean useThrift;
private RocksDBConnection client; private RocksDBConnection client;
public void setUnixSocket(UnixDomainSocketAddress address) { public void setUnixSocket(UnixDomainSocketAddress address) {
@ -29,13 +30,21 @@ public class ServerBuilder {
this.http2Port = port; this.http2Port = port;
} }
public void setUseThrift(boolean useThrift) {
this.useThrift = useThrift;
}
public void setClient(RocksDBConnection client) { public void setClient(RocksDBConnection client) {
this.client = client; this.client = client;
} }
public Server build() throws IOException { public Server build() throws IOException {
if (http2Host != null) { if (http2Host != null) {
if (useThrift) {
return new ThriftServer(client, http2Host, http2Port); return new ThriftServer(client, http2Host, http2Port);
} else {
return new GrpcServer(client, http2Host, http2Port);
}
} else if (unixAddress != null) { } else if (unixAddress != null) {
throw new UnsupportedOperationException("Not implemented: unix socket"); throw new UnsupportedOperationException("Not implemented: unix socket");
} else if (iNetAddress != null) { } else if (iNetAddress != null) {

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.List; import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException; import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerSocket;
@ -30,19 +31,26 @@ import org.jetbrains.annotations.NotNull;
public class ThriftServer extends Server { public class ThriftServer extends Server {
private static final Logger LOG = Logger.getLogger(ThriftServer.class.getName());
private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN); private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN);
private final Thread thriftThread;
private final TThreadedSelectorServer server;
public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client); super(client);
var handler = new ThriftHandler(this.getClient()); var handler = new ThriftHandler(this.getClient());
try { try {
var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port)); var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port));
var server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport) this.server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
.processor(new Processor<>(handler)) .processor(new Processor<>(handler))
); );
server.serve(); var thriftThread = new Thread(server::serve);
thriftThread.setName("Thrift server thread");
this.thriftThread = thriftThread;
LOG.info("Thrift RocksDB server is listening at " + http2Host + ":" + http2Port);
} catch (TTransportException e) { } catch (TTransportException e) {
throw new IOException("Can't open server socket", e); throw new IOException("Can't open server socket", e);
} }
@ -300,4 +308,11 @@ public class ThriftServer extends Server {
} }
} }
} }
@Override
public void close() throws IOException {
LOG.info("Thrift server is shutting down...");
this.server.stop();
super.close();
}
} }

View File

@ -9,6 +9,13 @@ module rockserver.core {
requires it.unimi.dsi.fastutil; requires it.unimi.dsi.fastutil;
requires org.apache.thrift; requires org.apache.thrift;
requires org.slf4j; requires org.slf4j;
requires protobuf.java;
requires io.grpc.protobuf;
requires io.grpc.stub;
requires io.grpc;
requires jsr305;
requires com.google.common;
requires io.grpc.netty;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -0,0 +1,105 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
option java_multiple_files = true;
package it.cavallium.rockserver.core.common.api.proto;
message ColumnSchema {
repeated int32 fixedKeys = 1;
repeated ColumnHashType variableTailKeys = 2;
bool hasValue = 3;
}
enum ColumnHashType {
XXHASH32 = 0;
XXHASH8 = 1;
ALLSAME8 = 2;
}
enum Operation {
NOTHING = 0;
PREVIOUS = 1;
CURRENT = 2;
FOR_UPDATE = 3;
EXISTS = 4;
DELTA = 5;
MULTI = 6;
CHANGED = 7;
PREVIOUS_PRESENCE = 8;
}
message Delta {
optional bytes previous = 1;
optional bytes current = 2;
}
message Previous {optional bytes previous = 1;}
message Changed {bool changed = 1;}
message PreviousPresence {bool present = 1;}
message UpdateBegin {
optional bytes previous = 1;
optional int64 updateId = 2;
}
message KV {
repeated bytes keys = 1;
bytes value = 2;
}
message OpenTransactionRequest {int64 timeoutMs = 1;}
message OpenTransactionResponse {int64 transactionId = 1;}
message CloseTransactionRequest {int64 transactionId = 1; int64 timeoutMs = 2; bool commit = 3;}
message CloseTransactionResponse {bool successful = 1;}
message CloseFailedUpdateRequest {int64 updateId = 1;}
message CreateColumnRequest {string name = 1; ColumnSchema schema = 2;}
message CreateColumnResponse {int64 columnId = 1;}
message DeleteColumnRequest {int64 columnId = 1;}
message GetColumnIdRequest {string name = 1;}
message GetColumnIdResponse {int64 columnId = 1;}
message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;}
message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;}
message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}}
message GetRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated bytes keys = 3;}
message GetResponse {optional bytes value = 1;}
message OpenIteratorRequest {int64 transactionId = 1; int64 columnId = 2; repeated bytes startKeysInclusive = 3; repeated bytes endKeysExclusive = 4; bool reverse = 5; int64 timeoutMs = 6;}
message OpenIteratorResponse {int64 iteratorId = 1;}
message CloseIteratorRequest {int64 iteratorId = 1;}
message SeekToRequest {int64 iterationId = 1; repeated bytes keys = 2;}
message SubsequentRequest {int64 iterationId = 1; int64 skipCount = 2; int64 takeCount = 3;}
service RocksDBService {
rpc openTransaction(OpenTransactionRequest) returns (OpenTransactionResponse);
rpc closeTransaction(CloseTransactionRequest) returns (CloseTransactionResponse);
rpc closeFailedUpdate(CloseFailedUpdateRequest) returns (google.protobuf.Empty);
rpc createColumn(CreateColumnRequest) returns (CreateColumnResponse);
rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty);
rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse);
rpc put(PutRequest) returns (google.protobuf.Empty);
rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty);
rpc putGetPrevious(PutRequest) returns (Previous);
rpc putGetDelta(PutRequest) returns (Delta);
rpc putGetChanged(PutRequest) returns (Changed);
rpc putGetPreviousPresence(PutRequest) returns (PreviousPresence);
rpc get(GetRequest) returns (GetResponse);
rpc getForUpdate(GetRequest) returns (UpdateBegin);
rpc exists(GetRequest) returns (PreviousPresence);
rpc openIterator(OpenIteratorRequest) returns (OpenIteratorResponse);
rpc closeIterator(CloseIteratorRequest) returns (google.protobuf.Empty);
rpc seekTo(SeekToRequest) returns (google.protobuf.Empty);
rpc subsequent(SubsequentRequest) returns (google.protobuf.Empty);
rpc subsequentExists(SubsequentRequest) returns (PreviousPresence);
rpc subsequentMultiGet(SubsequentRequest) returns (stream KV);
}