Add socket address

This commit is contained in:
Andrea Cavalli 2024-10-02 18:52:18 +02:00
parent 9ba2026265
commit a6320722a1
8 changed files with 59 additions and 26 deletions

View File

@ -29,8 +29,9 @@ module rockserver.core {
requires jdk.unsupported; requires jdk.unsupported;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams; requires org.reactivestreams;
requires io.netty.transport.unix.common;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;
exports it.cavallium.rockserver.core.config; exports it.cavallium.rockserver.core.config;
opens it.cavallium.rockserver.core.resources; opens it.cavallium.rockserver.core.resources;

View File

@ -27,6 +27,7 @@ module rockserver.core {
requires jdk.unsupported; requires jdk.unsupported;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams; requires org.reactivestreams;
requires io.netty.transport.unix.common;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -15,11 +15,12 @@ import it.unimi.dsi.fastutil.objects.ObjectList;
import java.io.IOException; import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
public class TestGrpcLoop { public class TestGrpcLoop {
public static void main(String[] args) throws IOException, InterruptedException { public static void main(String[] args) throws IOException, InterruptedException {
var embeddedDB = new EmbeddedConnection(null, "main", null); var embeddedDB = new EmbeddedConnection(null, "main", null);
var server = new GrpcServer(embeddedDB, "localhost", 12345); var server = new GrpcServer(embeddedDB, new InetSocketAddress("localhost", 12345));
var clientB = new ClientBuilder(); var clientB = new ClientBuilder();
clientB.setHttpAddress(new Utils.HostAndPort("localhost", 12345)); clientB.setHttpAddress(new Utils.HostAndPort("localhost", 12345));
clientB.setName("local"); clientB.setName("local");

View File

@ -54,12 +54,12 @@ public class ClientBuilder {
} else if (embeddedPath != null) { } else if (embeddedPath != null) {
return new EmbeddedConnection(embeddedPath, name, embeddedConfig); return new EmbeddedConnection(embeddedPath, name, embeddedConfig);
} else if (unixAddress != null) { } else if (unixAddress != null) {
throw new UnsupportedOperationException("Not implemented: unix socket"); return GrpcConnection.forPath(name, unixAddress.getPath());
} else if (httpAddress != null) { } else if (httpAddress != null) {
if (useThrift) { if (useThrift) {
throw new UnsupportedOperationException("Not implemented: thrift http2 address"); throw new UnsupportedOperationException("Not implemented: thrift http2 address");
} else { } else {
return new GrpcConnection(name, httpAddress); return GrpcConnection.forHostAndPort(name, httpAddress);
} }
} else if (iNetAddress != null) { } else if (iNetAddress != null) {
throw new UnsupportedOperationException("Not implemented: inet address"); throw new UnsupportedOperationException("Not implemented: inet address");

View File

@ -10,7 +10,13 @@ import com.google.protobuf.Empty;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.*; import io.grpc.stub.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.KVBatch;
@ -34,7 +40,11 @@ import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBS
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
@ -61,17 +71,40 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
private final RocksDBServiceFutureStub futureStub; private final RocksDBServiceFutureStub futureStub;
private final URI address; private final URI address;
public GrpcConnection(String name, HostAndPort address) { private GrpcConnection(String name, SocketAddress socketAddress, URI address) {
super(name); super(name);
var channelBuilder = ManagedChannelBuilder var channelBuilder = NettyChannelBuilder
.forAddress(address.host(), address.port()) .forAddress(socketAddress)
.directExecutor() .directExecutor()
.usePlaintext(); .usePlaintext();
if (socketAddress instanceof UnixDomainSocketAddress _) {
channelBuilder
.eventLoopGroup(new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2))
.channelType(EpollServerDomainSocketChannel.class);
} else {
channelBuilder
.eventLoopGroup(new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2))
.channelType(NioServerSocketChannel.class);
}
this.channel = channelBuilder.build(); this.channel = channelBuilder.build();
this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel); this.blockingStub = RocksDBServiceGrpc.newBlockingStub(channel);
this.asyncStub = RocksDBServiceGrpc.newStub(channel); this.asyncStub = RocksDBServiceGrpc.newStub(channel);
this.futureStub = RocksDBServiceGrpc.newFutureStub(channel); this.futureStub = RocksDBServiceGrpc.newFutureStub(channel);
this.address = URI.create("http://" + address.host() + ":" + address.port()); this.address = address;
}
public static GrpcConnection forHostAndPort(String name, HostAndPort address) {
return new GrpcConnection(name,
new InetSocketAddress(address.host(), address.port()),
URI.create("http://" + address.host() + ":" + address.port())
);
}
public static GrpcConnection forPath(String name, Path unixSocketPath) {
return new GrpcConnection(name,
new DomainSocketAddress(unixSocketPath.toFile()),
URI.create("unix://" + unixSocketPath)
);
} }
@Override @Override

View File

@ -16,6 +16,7 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.NettyRuntime; import io.netty.util.NettyRuntime;
import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
@ -45,6 +46,8 @@ import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
@ -67,27 +70,22 @@ public class GrpcServer extends Server {
private final ExecutorService executor; private final ExecutorService executor;
private final io.grpc.Server server; private final io.grpc.Server server;
public GrpcServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { public GrpcServer(RocksDBConnection client, SocketAddress socketAddress) throws IOException {
super(client); super(client);
this.grpc = new GrpcServerImpl(this.getClient()); this.grpc = new GrpcServerImpl(this.getClient());
EventLoopGroup elg; EventLoopGroup elg;
Class<? extends ServerChannel> channelType; Class<? extends ServerChannel> channelType;
if (http2Port != 0) { if (socketAddress instanceof DomainSocketAddress _) {
elg = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
channelType = EpollServerDomainSocketChannel.class;
} else {
elg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); elg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
channelType = NioServerSocketChannel.class; channelType = NioServerSocketChannel.class;
} else {
try {
elg = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
channelType = EpollServerDomainSocketChannel.class;
} catch (Throwable ex) {
LOG.warn("Can't load Epoll event loop group, the server will be slower", ex);
elg = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
channelType = NioServerSocketChannel.class;
}
} }
this.elg = elg; this.elg = elg;
this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2); this.executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors() * 2);
this.server = NettyServerBuilder.forAddress(new InetSocketAddress(http2Host, http2Port)) this.server = NettyServerBuilder
.forAddress(socketAddress)
.bossEventLoopGroup(elg) .bossEventLoopGroup(elg)
.workerEventLoopGroup(elg) .workerEventLoopGroup(elg)
.directExecutor() .directExecutor()
@ -97,7 +95,7 @@ public class GrpcServer extends Server {
.addService(grpc) .addService(grpc)
.build(); .build();
server.start(); server.start();
LOG.info("GRPC RocksDB server is listening at " + http2Host + ":" + http2Port); LOG.info("GRPC RocksDB server is listening at " + socketAddress);
} }
private final class GrpcServerImpl extends RocksDBServiceImplBase { private final class GrpcServerImpl extends RocksDBServiceImplBase {

View File

@ -1,13 +1,11 @@
package it.cavallium.rockserver.core.server; package it.cavallium.rockserver.core.server;
import it.cavallium.rockserver.core.client.ClientBuilder; import io.netty.channel.unix.DomainSocketAddress;
import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.common.Utils.HostAndPort;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnixDomainSocketAddress; import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
public class ServerBuilder { public class ServerBuilder {
@ -42,10 +40,10 @@ public class ServerBuilder {
if (useThrift) { if (useThrift) {
return new ThriftServer(client, http2Address.host(), http2Address.port()); return new ThriftServer(client, http2Address.host(), http2Address.port());
} else { } else {
return new GrpcServer(client, http2Address.host(), http2Address.port()); return new GrpcServer(client, new InetSocketAddress(http2Address.host(), http2Address.port()));
} }
} else if (unixAddress != null) { } else if (unixAddress != null) {
throw new UnsupportedOperationException("Not implemented: unix socket"); return new GrpcServer(client, new DomainSocketAddress(unixAddress.getPath().toFile()));
} else if (iNetAddress != null) { } else if (iNetAddress != null) {
throw new UnsupportedOperationException("Not implemented: inet address"); throw new UnsupportedOperationException("Not implemented: inet address");
} else { } else {

View File

@ -29,6 +29,7 @@ module rockserver.core {
requires io.netty.codec.http2; requires io.netty.codec.http2;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams; requires org.reactivestreams;
requires io.netty.transport.unix.common;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;