Allow the use of local sockets

This commit is contained in:
Andrea Cavalli 2022-11-09 18:56:28 +01:00
parent e5712361e0
commit a14f8e1f0e
11 changed files with 216 additions and 70 deletions

View File

@ -77,7 +77,7 @@
<dependency>
<groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>

View File

@ -1,6 +1,5 @@
package it.tdlight.reactiveapi;
import io.rsocket.core.RSocketClient;
import it.tdlight.reactiveapi.kafka.KafkaConsumer;
import it.tdlight.reactiveapi.kafka.KafkaProducer;
import it.tdlight.reactiveapi.rsocket.MyRSocketClient;
@ -52,9 +51,9 @@ public interface ChannelFactory {
public RSocketChannelFactory(RSocketParameters channelsParameters) {
this.channelsParameters = channelsParameters;
if (channelsParameters.isClient()) {
this.manager = new MyRSocketClient(channelsParameters.baseHost());
this.manager = new MyRSocketClient(channelsParameters.transportFactory());
} else {
this.manager = new MyRSocketServer(channelsParameters.baseHost());
this.manager = new MyRSocketServer(channelsParameters.transportFactory());
}
}

View File

@ -5,17 +5,24 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
import java.util.StringJoiner;
public final class RSocketParameters implements ChannelsParameters {
private final boolean client;
private final HostAndPort host;
private final TransportFactory transportFactory;
private final List<String> lanes;
public RSocketParameters(boolean client, String host, List<String> lanes) {
this.client = client;
this.host = HostAndPort.fromString(host);
var hostAndPort = HostAndPort.fromString(host);
this.transportFactory = TransportFactory.tcp(hostAndPort);
this.lanes = lanes;
}
public RSocketParameters(boolean client, TransportFactory transportFactory, List<String> lanes) {
this.client = client;
this.transportFactory = transportFactory;
this.lanes = lanes;
}
@ -31,58 +38,35 @@ public final class RSocketParameters implements ChannelsParameters {
return client;
}
public HostAndPort baseHost() {
return host;
}
public HostAndPort channelHost(String channelName) {
return switch (channelName) {
case "request" -> HostAndPort.fromParts(host.getHost(), host.getPort());
case "response" -> HostAndPort.fromParts(host.getHost(), host.getPort() + 1);
default -> {
if (channelName.startsWith("event-")) {
var lane = channelName.substring("event-".length());
int index;
if (lane.equals("main")) {
index = 0;
} else {
index = lanes.indexOf(lane);
if (index < 0) {
throw new IllegalArgumentException("Unknown lane: " + lane);
}
index++;
}
yield HostAndPort.fromParts(host.getHost(), host.getPort() + 2 + index);
} else {
throw new IllegalArgumentException("Unknown channel: " + channelName);
}
}
};
public TransportFactory transportFactory() {
return transportFactory;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
if (o == null || getClass() != o.getClass()) {
return false;
}
var that = (RSocketParameters) obj;
return Objects.equals(this.client, that.client) && Objects.equals(
this.host,
that.host) && Objects.equals(this.lanes, that.lanes);
RSocketParameters that = (RSocketParameters) o;
return client == that.client && Objects.equals(transportFactory, that.transportFactory) && Objects.equals(lanes,
that.lanes
);
}
@Override
public int hashCode() {
return Objects.hash(client, host, lanes);
return Objects.hash(client, transportFactory, lanes);
}
@Override
public String toString() {
return "RSocketParameters[client=" + client + ", " + "host=" + host + ", "
+ "lanes=" + lanes + ']';
return new StringJoiner(", ", RSocketParameters.class.getSimpleName() + "[", "]")
.add("client=" + client)
.add("transportFactory=" + transportFactory)
.add("lanes=" + lanes)
.toString();
}
}

View File

@ -0,0 +1,20 @@
package it.tdlight.reactiveapi;
import com.google.common.net.HostAndPort;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
public interface TransportFactory {
ClientTransport getClientTransport(int index);
ServerTransport<?> getServerTransport(int index);
static TransportFactory tcp(HostAndPort baseHost) {
return new TransportFactoryTcp(baseHost);
}
static TransportFactory local(String prefix) {
return new TransportFactoryLocal(prefix);
}
}

View File

@ -0,0 +1,55 @@
package it.tdlight.reactiveapi;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import java.util.Objects;
import java.util.StringJoiner;
class TransportFactoryLocal implements TransportFactory {
private final String prefix;
TransportFactoryLocal(String prefix) {
this.prefix = prefix;
}
@Override
public ClientTransport getClientTransport(int index) {
return LocalClientTransport.create(getLabel(index));
}
@Override
public ServerTransport<?> getServerTransport(int index) {
return LocalServerTransport.create(getLabel(index));
}
private String getLabel(int index) {
return prefix + "-" + index;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TransportFactoryLocal that = (TransportFactoryLocal) o;
return Objects.equals(prefix, that.prefix);
}
@Override
public int hashCode() {
return Objects.hash(prefix);
}
@Override
public String toString() {
return new StringJoiner(", ", TransportFactoryLocal.class.getSimpleName() + "[", "]")
.add("prefix='" + prefix + "'")
.toString();
}
}

View File

@ -0,0 +1,56 @@
package it.tdlight.reactiveapi;
import com.google.common.net.HostAndPort;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import java.util.Objects;
import java.util.StringJoiner;
class TransportFactoryTcp implements TransportFactory {
private final HostAndPort baseHost;
TransportFactoryTcp(HostAndPort baseHost) {
this.baseHost = baseHost;
}
@Override
public ClientTransport getClientTransport(int index) {
return TcpClientTransport.create(baseHost.getHost(), getPort(index));
}
@Override
public ServerTransport<?> getServerTransport(int index) {
return TcpServerTransport.create(baseHost.getHost(), getPort(index));
}
private int getPort(int index) {
return baseHost.getPort() + index;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TransportFactoryTcp that = (TransportFactoryTcp) o;
return Objects.equals(baseHost, that.baseHost);
}
@Override
public int hashCode() {
return Objects.hash(baseHost);
}
@Override
public String toString() {
return new StringJoiner(", ", TransportFactoryTcp.class.getSimpleName() + "[", "]")
.add("baseHost=" + baseHost)
.toString();
}
}

View File

@ -7,6 +7,7 @@ import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import it.tdlight.reactiveapi.ChannelCodec;
@ -16,6 +17,7 @@ import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Serializer;
import it.tdlight.reactiveapi.SimpleEventProducer;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.TransportFactory;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -37,12 +39,14 @@ public class MyRSocketClient implements RSocketChannelManager {
private final Empty<Void> disposeRequest = Sinks.empty();
public MyRSocketClient(HostAndPort baseHost) {
var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort());
this(TransportFactory.tcp(baseHost));
}
public MyRSocketClient(TransportFactory transportFactory) {
this.nextClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(transport)
.connect(transportFactory.getClientTransport(0))
.doOnNext(lastClient::set)
.cacheInvalidateIf(RSocket::isDisposed);
}

View File

@ -3,12 +3,14 @@ package it.tdlight.reactiveapi.rsocket;
import static reactor.util.concurrent.Queues.XS_BUFFER_SIZE;
import com.google.common.net.HostAndPort;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
@ -18,6 +20,7 @@ import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Serializer;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.TransportFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -35,19 +38,27 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
private final Logger logger = LogManager.getLogger(this.getClass());
private final int bufferSize;
private final Mono<CloseableChannel> serverCloseable;
private final Mono<Closeable> serverCloseable;
protected final Map<String, ConsumerConnection<?>> consumerRegistry = new ConcurrentHashMap<>();
protected final Map<String, ProducerConnection<?>> producerRegistry = new ConcurrentHashMap<>();
public MyRSocketServer(HostAndPort baseHost) {
this(baseHost, XS_BUFFER_SIZE);
this(TransportFactory.tcp(baseHost));
}
public MyRSocketServer(HostAndPort baseHost, int bufferSize) {
this(TransportFactory.tcp(baseHost), bufferSize);
}
public MyRSocketServer(TransportFactory transportFactory) {
this(transportFactory, XS_BUFFER_SIZE);
}
public MyRSocketServer(TransportFactory transportFactory, int bufferSize) {
this.bufferSize = bufferSize;
var serverMono = RSocketServer
Mono<Closeable> serverMono = RSocketServer
.create(new SocketAcceptor() {
@Override
public @NotNull Mono<RSocket> accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) {
@ -59,9 +70,20 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
}
})
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort()))
.bind(transportFactory.getServerTransport(0))
.cast(Object.class)
.doOnNext(d -> logger.debug("Server up"))
.cacheInvalidateIf(CloseableChannel::isDisposed);
.cacheInvalidateIf(t -> t instanceof Closeable closeableChannel && closeableChannel.isDisposed())
.filter(t -> t instanceof Closeable)
.cast(Closeable.class)
.defaultIfEmpty(new Closeable() {
@Override
public void dispose() {}
@Override
public @NotNull Mono<Void> onClose() {
return Mono.empty();
}
});
serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server"));
@ -158,13 +180,13 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
@Override
public @NotNull Mono<Void> onClose() {
return Mono.when(serverCloseable.flatMap(CloseableChannel::onClose));
return Mono.when(serverCloseable.flatMap(Closeable::onClose));
}
@Override
public void dispose() {
serverCloseable
.doOnNext(CloseableChannel::dispose)
.doOnNext(Closeable::dispose)
.subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> logger.error("Failed to dispose the server", ex));
}

View File

@ -1,5 +1,7 @@
package it.tdlight.reactiveapi.test;
import com.google.common.net.HostAndPort;
import io.rsocket.transport.local.LocalClientTransport;
import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.ChannelFactory;
import it.tdlight.reactiveapi.ChannelFactory.RSocketChannelFactory;
@ -7,6 +9,7 @@ import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.RSocketParameters;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.TransportFactory;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.Closeable;
import java.time.Duration;
@ -31,6 +34,8 @@ import reactor.util.retry.Retry;
public abstract class TestChannel {
private static final TransportFactory TEST_TRANSPORT_FACTORY = TransportFactory.local("local");
private static final Logger LOG = LogManager.getLogger(TestChannel.class);
protected ChannelFactory channelFactory;
@ -43,8 +48,8 @@ public abstract class TestChannel {
@BeforeEach
public void beforeEach() {
var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "127.0.0.1:25689", List.of()));
var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "127.0.0.1:25689", List.of()));
var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), TEST_TRANSPORT_FACTORY, List.of()));
var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), TEST_TRANSPORT_FACTORY, List.of()));
closeables.offer(consumerFactory);
closeables.offer(producerFactory);

View File

@ -7,33 +7,33 @@ import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.TransportFactory;
import it.tdlight.reactiveapi.rsocket.MyRSocketClient;
import it.tdlight.reactiveapi.rsocket.MyRSocketServer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TestRSocket {
private static final TransportFactory TEST_TRANSPORT_FACTORY = TransportFactory.tcp(HostAndPort.fromParts("127.0.0.1", 8085));
@Test
public void testClientOnClose() {
Assertions.assertThrows(IllegalStateException.class, () -> {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY);
try {
client.onClose().block(Duration.ofSeconds(1));
} finally {
@ -44,12 +44,12 @@ public class TestRSocket {
@Test
public void testServerConsumer() {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1);
var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1);
try {
var rawClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("127.0.0.1", 8085))
.connect(TEST_TRANSPORT_FACTORY.getClientTransport(0))
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(rawClient);
var outputSequence = Flux.just("a", "b", "c").map(DefaultPayload::create);
@ -73,12 +73,12 @@ public class TestRSocket {
@Test
public void testServerProducer() {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1);
var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1);
try {
var rawClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("127.0.0.1", 8085))
.connect(TEST_TRANSPORT_FACTORY.getClientTransport(0))
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(rawClient);
EventProducer<String> producer = server.registerProducer(ChannelCodec.UTF8_TEST, "test");
@ -104,7 +104,7 @@ public class TestRSocket {
@Test
public void testClientConsumer() {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY);
try {
var rawServer = RSocketServer.create(SocketAcceptor.forRequestStream(payload -> {
var metadata = payload.getMetadataUtf8();
@ -114,7 +114,7 @@ public class TestRSocket {
return Flux.just("a", "b", "c").map(DefaultPayload::create);
}))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bindNow(TcpServerTransport.create("127.0.0.1", 8085));
.bindNow(TEST_TRANSPORT_FACTORY.getServerTransport(0));
try {
var events = client
.<String>registerConsumer(ChannelCodec.UTF8_TEST, "test")
@ -153,9 +153,9 @@ public class TestRSocket {
}
})))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bindNow(TcpServerTransport.create("127.0.0.1", 8085));
.bindNow(TEST_TRANSPORT_FACTORY.getServerTransport(0));
try {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY);
try {
client
.<String>registerProducer(ChannelCodec.UTF8_TEST, "test")
@ -173,7 +173,7 @@ public class TestRSocket {
@Test
public void testServerOnClose() {
Assertions.assertThrows(IllegalStateException.class, () -> {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1);
var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1);
try {
server.onClose().block(Duration.ofSeconds(1));
} finally {

View File

@ -19,4 +19,5 @@ module tdlib.reactive.api.test {
requires reactor.netty.core;
requires org.apache.logging.log4j;
requires filequeue;
requires rsocket.transport.local;
}