From a6320722a1acdb9712d75d215a6a6b5a5bd8d382 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 2 Oct 2024 18:52:18 +0200 Subject: [PATCH] Add socket address --- src/fatjar/java/module-info.java | 3 +- src/library/java/module-info.java | 1 + .../rockserver/core/TestGrpcLoop.java | 3 +- .../rockserver/core/client/ClientBuilder.java | 4 +- .../core/client/GrpcConnection.java | 41 +++++++++++++++++-- .../rockserver/core/server/GrpcServer.java | 24 +++++------ .../rockserver/core/server/ServerBuilder.java | 8 ++-- src/native/java/module-info.java | 1 + 8 files changed, 59 insertions(+), 26 deletions(-) diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index f0e1a23..b390eac 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -29,8 +29,9 @@ module rockserver.core { requires jdk.unsupported; requires io.netty.transport.classes.epoll; requires org.reactivestreams; + requires io.netty.transport.unix.common; - exports it.cavallium.rockserver.core.client; + exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.config; opens it.cavallium.rockserver.core.resources; diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index 641528f..bb8c172 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -27,6 +27,7 @@ module rockserver.core { requires jdk.unsupported; requires io.netty.transport.classes.epoll; requires org.reactivestreams; + requires io.netty.transport.unix.common; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java b/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java index 0f10db0..366c887 100644 --- a/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java +++ b/src/main/java/it/cavallium/rockserver/core/TestGrpcLoop.java @@ -15,11 +15,12 @@ import it.unimi.dsi.fastutil.objects.ObjectList; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; +import java.net.InetSocketAddress; public class TestGrpcLoop { public static void main(String[] args) throws IOException, InterruptedException { var embeddedDB = new EmbeddedConnection(null, "main", null); - var server = new GrpcServer(embeddedDB, "localhost", 12345); + var server = new GrpcServer(embeddedDB, new InetSocketAddress("localhost", 12345)); var clientB = new ClientBuilder(); clientB.setHttpAddress(new Utils.HostAndPort("localhost", 12345)); clientB.setName("local"); diff --git a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java index c339362..20b70b6 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java +++ b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java @@ -54,12 +54,12 @@ public class ClientBuilder { } else if (embeddedPath != null) { return new EmbeddedConnection(embeddedPath, name, embeddedConfig); } else if (unixAddress != null) { - throw new UnsupportedOperationException("Not implemented: unix socket"); + return GrpcConnection.forPath(name, unixAddress.getPath()); } else if (httpAddress != null) { if (useThrift) { throw new UnsupportedOperationException("Not implemented: thrift http2 address"); } else { - return new GrpcConnection(name, httpAddress); + return GrpcConnection.forHostAndPort(name, httpAddress); } } else if (iNetAddress != null) { throw new UnsupportedOperationException("Not implemented: inet address"); diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index f5cf429..e7a0a1c 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -10,7 +10,13 @@ import com.google.protobuf.Empty; import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.*; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerDomainSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.KVBatch; @@ -34,7 +40,11 @@ import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBS import it.unimi.dsi.fastutil.ints.IntArrayList; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; +import java.net.UnixDomainSocketAddress; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -61,17 +71,40 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { private final RocksDBServiceFutureStub futureStub; private final URI address; - public GrpcConnection(String name, HostAndPort address) { + private GrpcConnection(String name, SocketAddress socketAddress, URI address) { super(name); - var channelBuilder = ManagedChannelBuilder - .forAddress(address.host(), address.port()) + var channelBuilder = NettyChannelBuilder + .forAddress(socketAddress) .directExecutor() .usePlaintext(); + if (socketAddress instanceof UnixDomainSocketAddress _) { + channelBuilder + .eventLoopGroup(new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2)) + .channelType(EpollServerDomainSocketChannel.class); + } else { + channelBuilder + .eventLoopGroup(new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2)) + .channelType(NioServerSocketChannel.class); + } this.channel = channelBuilder.build(); this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel); this.asyncStub = RocksDBServiceGrpc.newStub(channel); this.futureStub = RocksDBServiceGrpc.newFutureStub(channel); - this.address = URI.create("http://" + address.host() + ":" + address.port()); + this.address = address; + } + + public static GrpcConnection forHostAndPort(String name, HostAndPort address) { + return new GrpcConnection(name, + new InetSocketAddress(address.host(), address.port()), + URI.create("http://" + address.host() + ":" + address.port()) + ); + } + + public static GrpcConnection forPath(String name, Path unixSocketPath) { + return new GrpcConnection(name, + new DomainSocketAddress(unixSocketPath.toFile()), + URI.create("unix://" + unixSocketPath) + ); } @Override diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 6623eef..edc4ab3 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -16,6 +16,7 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.util.NettyRuntime; import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.*; @@ -45,6 +46,8 @@ import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnixDomainSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionException; @@ -67,27 +70,22 @@ public class GrpcServer extends Server { private final ExecutorService executor; private final io.grpc.Server server; - public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { + public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException { super(client); this.grpc = new GrpcServerImpl(this.getClient()); EventLoopGroup elg; Class channelType; - if (http2Port != 0) { + if (socketAddress instanceof DomainSocketAddress _) { + elg = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); + channelType = EpollServerDomainSocketChannel.class; + } else { elg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); channelType = NioServerSocketChannel.class; - } else { - try { - elg = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); - channelType = EpollServerDomainSocketChannel.class; - } catch (Throwable ex) { - LOG.warn("Can't load Epoll event loop group, the server will be slower", ex); - elg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); - channelType = NioServerSocketChannel.class; - } } this.elg = elg; this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2); - this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port)) + this.server = NettyServerBuilder + .forAddress(socketAddress) .bossEventLoopGroup(elg) .workerEventLoopGroup(elg) .directExecutor() @@ -97,7 +95,7 @@ public class GrpcServer extends Server { .addService(grpc) .build(); server.start(); - LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port); + LOG.info("GRPC RocksDB server is listening at " + socketAddress); } private final class GrpcServerImpl extends RocksDBServiceImplBase { diff --git a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java index d819128..ebbb18e 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java +++ b/src/main/java/it/cavallium/rockserver/core/server/ServerBuilder.java @@ -1,13 +1,11 @@ package it.cavallium.rockserver.core.server; -import it.cavallium.rockserver.core.client.ClientBuilder; -import it.cavallium.rockserver.core.client.EmbeddedConnection; +import io.netty.channel.unix.DomainSocketAddress; import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.Utils.HostAndPort; import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnixDomainSocketAddress; -import java.nio.file.Path; public class ServerBuilder { @@ -42,10 +40,10 @@ public class ServerBuilder { if (useThrift) { return new ThriftServer(client, http2Address.host(), http2Address.port()); } else { - return new GrpcServer(client, http2Address.host(), http2Address.port()); + return new GrpcServer(client, new InetSocketAddress(http2Address.host(), http2Address.port())); } } else if (unixAddress != null) { - throw new UnsupportedOperationException("Not implemented: unix socket"); + return new GrpcServer(client, new DomainSocketAddress(unixAddress.getPath().toFile())); } else if (iNetAddress != null) { throw new UnsupportedOperationException("Not implemented: inet address"); } else { diff --git a/src/native/java/module-info.java b/src/native/java/module-info.java index a95a2e3..695d8c1 100644 --- a/src/native/java/module-info.java +++ b/src/native/java/module-info.java @@ -29,6 +29,7 @@ module rockserver.core { requires io.netty.codec.http2; requires io.netty.transport.classes.epoll; requires org.reactivestreams; + requires io.netty.transport.unix.common; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common;