From a14f8e1f0ee7b66a59b06bc5f6c52221c88eea49 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 9 Nov 2022 18:56:28 +0100 Subject: [PATCH] Allow the use of local sockets --- pom.xml | 2 +- .../tdlight/reactiveapi/ChannelFactory.java | 5 +- .../reactiveapi/RSocketParameters.java | 66 +++++++------------ .../tdlight/reactiveapi/TransportFactory.java | 20 ++++++ .../reactiveapi/TransportFactoryLocal.java | 55 ++++++++++++++++ .../reactiveapi/TransportFactoryTcp.java | 56 ++++++++++++++++ .../reactiveapi/rsocket/MyRSocketClient.java | 8 ++- .../reactiveapi/rsocket/MyRSocketServer.java | 36 ++++++++-- .../tdlight/reactiveapi/test/TestChannel.java | 9 ++- .../tdlight/reactiveapi/test/TestRSocket.java | 28 ++++---- src/test/java/module-info.java | 1 + 11 files changed, 216 insertions(+), 70 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/TransportFactory.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TransportFactoryLocal.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TransportFactoryTcp.java diff --git a/pom.xml b/pom.xml index 301338f..4a46e53 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ it.cavallium filequeue - 3.1.2 + 3.1.3 org.ow2.asm diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java index 7111e9d..f2448c7 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java @@ -1,6 +1,5 @@ package it.tdlight.reactiveapi; -import io.rsocket.core.RSocketClient; import it.tdlight.reactiveapi.kafka.KafkaConsumer; import it.tdlight.reactiveapi.kafka.KafkaProducer; import it.tdlight.reactiveapi.rsocket.MyRSocketClient; @@ -52,9 +51,9 @@ public interface ChannelFactory { public RSocketChannelFactory(RSocketParameters channelsParameters) { this.channelsParameters = channelsParameters; if (channelsParameters.isClient()) { - this.manager = new MyRSocketClient(channelsParameters.baseHost()); + this.manager = new MyRSocketClient(channelsParameters.transportFactory()); } else { - this.manager = new MyRSocketServer(channelsParameters.baseHost()); + this.manager = new MyRSocketServer(channelsParameters.transportFactory()); } } diff --git a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java index 520713f..ba8ca0f 100644 --- a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java @@ -5,17 +5,24 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; -import org.jetbrains.annotations.Nullable; +import java.util.StringJoiner; public final class RSocketParameters implements ChannelsParameters { private final boolean client; - private final HostAndPort host; + private final TransportFactory transportFactory; private final List lanes; public RSocketParameters(boolean client, String host, List lanes) { this.client = client; - this.host = HostAndPort.fromString(host); + var hostAndPort = HostAndPort.fromString(host); + this.transportFactory = TransportFactory.tcp(hostAndPort); + this.lanes = lanes; + } + + public RSocketParameters(boolean client, TransportFactory transportFactory, List lanes) { + this.client = client; + this.transportFactory = transportFactory; this.lanes = lanes; } @@ -31,58 +38,35 @@ public final class RSocketParameters implements ChannelsParameters { return client; } - public HostAndPort baseHost() { - return host; - } - - public HostAndPort channelHost(String channelName) { - return switch (channelName) { - case "request" -> HostAndPort.fromParts(host.getHost(), host.getPort()); - case "response" -> HostAndPort.fromParts(host.getHost(), host.getPort() + 1); - default -> { - if (channelName.startsWith("event-")) { - var lane = channelName.substring("event-".length()); - int index; - if (lane.equals("main")) { - index = 0; - } else { - index = lanes.indexOf(lane); - if (index < 0) { - throw new IllegalArgumentException("Unknown lane: " + lane); - } - index++; - } - yield HostAndPort.fromParts(host.getHost(), host.getPort() + 2 + index); - } else { - throw new IllegalArgumentException("Unknown channel: " + channelName); - } - } - }; + public TransportFactory transportFactory() { + return transportFactory; } @Override - public boolean equals(Object obj) { - if (obj == this) { + public boolean equals(Object o) { + if (this == o) { return true; } - if (obj == null || obj.getClass() != this.getClass()) { + if (o == null || getClass() != o.getClass()) { return false; } - var that = (RSocketParameters) obj; - return Objects.equals(this.client, that.client) && Objects.equals( - this.host, - that.host) && Objects.equals(this.lanes, that.lanes); + RSocketParameters that = (RSocketParameters) o; + return client == that.client && Objects.equals(transportFactory, that.transportFactory) && Objects.equals(lanes, + that.lanes + ); } @Override public int hashCode() { - return Objects.hash(client, host, lanes); + return Objects.hash(client, transportFactory, lanes); } @Override public String toString() { - return "RSocketParameters[client=" + client + ", " + "host=" + host + ", " - + "lanes=" + lanes + ']'; + return new StringJoiner(", ", RSocketParameters.class.getSimpleName() + "[", "]") + .add("client=" + client) + .add("transportFactory=" + transportFactory) + .add("lanes=" + lanes) + .toString(); } - } diff --git a/src/main/java/it/tdlight/reactiveapi/TransportFactory.java b/src/main/java/it/tdlight/reactiveapi/TransportFactory.java new file mode 100644 index 0000000..21cafe0 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TransportFactory.java @@ -0,0 +1,20 @@ +package it.tdlight.reactiveapi; + +import com.google.common.net.HostAndPort; +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.ServerTransport; + +public interface TransportFactory { + + ClientTransport getClientTransport(int index); + + ServerTransport getServerTransport(int index); + + static TransportFactory tcp(HostAndPort baseHost) { + return new TransportFactoryTcp(baseHost); + } + + static TransportFactory local(String prefix) { + return new TransportFactoryLocal(prefix); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TransportFactoryLocal.java b/src/main/java/it/tdlight/reactiveapi/TransportFactoryLocal.java new file mode 100644 index 0000000..1fdcc28 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TransportFactoryLocal.java @@ -0,0 +1,55 @@ +package it.tdlight.reactiveapi; + +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.ServerTransport; +import io.rsocket.transport.local.LocalClientTransport; +import io.rsocket.transport.local.LocalServerTransport; +import java.util.Objects; +import java.util.StringJoiner; + +class TransportFactoryLocal implements TransportFactory { + + private final String prefix; + + TransportFactoryLocal(String prefix) { + this.prefix = prefix; + } + + @Override + public ClientTransport getClientTransport(int index) { + return LocalClientTransport.create(getLabel(index)); + } + + @Override + public ServerTransport getServerTransport(int index) { + return LocalServerTransport.create(getLabel(index)); + } + + private String getLabel(int index) { + return prefix + "-" + index; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransportFactoryLocal that = (TransportFactoryLocal) o; + return Objects.equals(prefix, that.prefix); + } + + @Override + public int hashCode() { + return Objects.hash(prefix); + } + + @Override + public String toString() { + return new StringJoiner(", ", TransportFactoryLocal.class.getSimpleName() + "[", "]") + .add("prefix='" + prefix + "'") + .toString(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TransportFactoryTcp.java b/src/main/java/it/tdlight/reactiveapi/TransportFactoryTcp.java new file mode 100644 index 0000000..bc050a8 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TransportFactoryTcp.java @@ -0,0 +1,56 @@ +package it.tdlight.reactiveapi; + +import com.google.common.net.HostAndPort; +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.ServerTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import java.util.Objects; +import java.util.StringJoiner; + +class TransportFactoryTcp implements TransportFactory { + + private final HostAndPort baseHost; + + TransportFactoryTcp(HostAndPort baseHost) { + this.baseHost = baseHost; + } + + @Override + public ClientTransport getClientTransport(int index) { + return TcpClientTransport.create(baseHost.getHost(), getPort(index)); + } + + @Override + public ServerTransport getServerTransport(int index) { + return TcpServerTransport.create(baseHost.getHost(), getPort(index)); + } + + private int getPort(int index) { + return baseHost.getPort() + index; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransportFactoryTcp that = (TransportFactoryTcp) o; + return Objects.equals(baseHost, that.baseHost); + } + + @Override + public int hashCode() { + return Objects.hash(baseHost); + } + + @Override + public String toString() { + return new StringJoiner(", ", TransportFactoryTcp.class.getSimpleName() + "[", "]") + .add("baseHost=" + baseHost) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java index b8796f4..f297b7d 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java @@ -7,6 +7,7 @@ import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; @@ -16,6 +17,7 @@ import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.Serializer; import it.tdlight.reactiveapi.SimpleEventProducer; import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.TransportFactory; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -37,12 +39,14 @@ public class MyRSocketClient implements RSocketChannelManager { private final Empty disposeRequest = Sinks.empty(); public MyRSocketClient(HostAndPort baseHost) { - var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort()); + this(TransportFactory.tcp(baseHost)); + } + public MyRSocketClient(TransportFactory transportFactory) { this.nextClient = RSocketConnector.create() .setupPayload(DefaultPayload.create("client", "setup-info")) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(transport) + .connect(transportFactory.getClientTransport(0)) .doOnNext(lastClient::set) .cacheInvalidateIf(RSocket::isDisposed); } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java index 439cff7..8648b12 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -3,12 +3,14 @@ package it.tdlight.reactiveapi.rsocket; import static reactor.util.concurrent.Queues.XS_BUFFER_SIZE; import com.google.common.net.HostAndPort; +import io.rsocket.Closeable; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.ServerTransport; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; @@ -18,6 +20,7 @@ import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.Serializer; import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.TransportFactory; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,19 +38,27 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { private final Logger logger = LogManager.getLogger(this.getClass()); private final int bufferSize; - private final Mono serverCloseable; + private final Mono serverCloseable; protected final Map> consumerRegistry = new ConcurrentHashMap<>(); protected final Map> producerRegistry = new ConcurrentHashMap<>(); public MyRSocketServer(HostAndPort baseHost) { - this(baseHost, XS_BUFFER_SIZE); + this(TransportFactory.tcp(baseHost)); } public MyRSocketServer(HostAndPort baseHost, int bufferSize) { + this(TransportFactory.tcp(baseHost), bufferSize); + } + + public MyRSocketServer(TransportFactory transportFactory) { + this(transportFactory, XS_BUFFER_SIZE); + } + + public MyRSocketServer(TransportFactory transportFactory, int bufferSize) { this.bufferSize = bufferSize; - var serverMono = RSocketServer + Mono serverMono = RSocketServer .create(new SocketAcceptor() { @Override public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { @@ -59,9 +70,20 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { } }) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort())) + .bind(transportFactory.getServerTransport(0)) + .cast(Object.class) .doOnNext(d -> logger.debug("Server up")) - .cacheInvalidateIf(CloseableChannel::isDisposed); + .cacheInvalidateIf(t -> t instanceof Closeable closeableChannel && closeableChannel.isDisposed()) + .filter(t -> t instanceof Closeable) + .cast(Closeable.class) + .defaultIfEmpty(new Closeable() { + @Override + public void dispose() {} + @Override + public @NotNull Mono onClose() { + return Mono.empty(); + } + }); serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server")); @@ -158,13 +180,13 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { @Override public @NotNull Mono onClose() { - return Mono.when(serverCloseable.flatMap(CloseableChannel::onClose)); + return Mono.when(serverCloseable.flatMap(Closeable::onClose)); } @Override public void dispose() { serverCloseable - .doOnNext(CloseableChannel::dispose) + .doOnNext(Closeable::dispose) .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Failed to dispose the server", ex)); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 4ea8434..3de9bf2 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -1,5 +1,7 @@ package it.tdlight.reactiveapi.test; +import com.google.common.net.HostAndPort; +import io.rsocket.transport.local.LocalClientTransport; import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.ChannelFactory; import it.tdlight.reactiveapi.ChannelFactory.RSocketChannelFactory; @@ -7,6 +9,7 @@ import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.TransportFactory; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.io.Closeable; import java.time.Duration; @@ -31,6 +34,8 @@ import reactor.util.retry.Retry; public abstract class TestChannel { + private static final TransportFactory TEST_TRANSPORT_FACTORY = TransportFactory.local("local"); + private static final Logger LOG = LogManager.getLogger(TestChannel.class); protected ChannelFactory channelFactory; @@ -43,8 +48,8 @@ public abstract class TestChannel { @BeforeEach public void beforeEach() { - var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "127.0.0.1:25689", List.of())); - var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "127.0.0.1:25689", List.of())); + var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), TEST_TRANSPORT_FACTORY, List.of())); + var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), TEST_TRANSPORT_FACTORY, List.of())); closeables.offer(consumerFactory); closeables.offer(producerFactory); diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java index 52a860f..14efdea 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java @@ -7,33 +7,33 @@ import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketConnector; import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.ClientTransport; -import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.local.LocalClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.TransportFactory; import it.tdlight.reactiveapi.rsocket.MyRSocketClient; import it.tdlight.reactiveapi.rsocket.MyRSocketServer; import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class TestRSocket { + private static final TransportFactory TEST_TRANSPORT_FACTORY = TransportFactory.tcp(HostAndPort.fromParts("127.0.0.1", 8085)); + @Test public void testClientOnClose() { Assertions.assertThrows(IllegalStateException.class, () -> { - var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY); try { client.onClose().block(Duration.ofSeconds(1)); } finally { @@ -44,12 +44,12 @@ public class TestRSocket { @Test public void testServerConsumer() { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); + var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1); try { var rawClient = RSocketConnector.create() .setupPayload(DefaultPayload.create("client", "setup-info")) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(TcpClientTransport.create("127.0.0.1", 8085)) + .connect(TEST_TRANSPORT_FACTORY.getClientTransport(0)) .block(Duration.ofSeconds(5)); Assertions.assertNotNull(rawClient); var outputSequence = Flux.just("a", "b", "c").map(DefaultPayload::create); @@ -73,12 +73,12 @@ public class TestRSocket { @Test public void testServerProducer() { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); + var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1); try { var rawClient = RSocketConnector.create() .setupPayload(DefaultPayload.create("client", "setup-info")) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(TcpClientTransport.create("127.0.0.1", 8085)) + .connect(TEST_TRANSPORT_FACTORY.getClientTransport(0)) .block(Duration.ofSeconds(5)); Assertions.assertNotNull(rawClient); EventProducer producer = server.registerProducer(ChannelCodec.UTF8_TEST, "test"); @@ -104,7 +104,7 @@ public class TestRSocket { @Test public void testClientConsumer() { - var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY); try { var rawServer = RSocketServer.create(SocketAcceptor.forRequestStream(payload -> { var metadata = payload.getMetadataUtf8(); @@ -114,7 +114,7 @@ public class TestRSocket { return Flux.just("a", "b", "c").map(DefaultPayload::create); })) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bindNow(TcpServerTransport.create("127.0.0.1", 8085)); + .bindNow(TEST_TRANSPORT_FACTORY.getServerTransport(0)); try { var events = client .registerConsumer(ChannelCodec.UTF8_TEST, "test") @@ -153,9 +153,9 @@ public class TestRSocket { } }))) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bindNow(TcpServerTransport.create("127.0.0.1", 8085)); + .bindNow(TEST_TRANSPORT_FACTORY.getServerTransport(0)); try { - var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + var client = new MyRSocketClient(TEST_TRANSPORT_FACTORY); try { client .registerProducer(ChannelCodec.UTF8_TEST, "test") @@ -173,7 +173,7 @@ public class TestRSocket { @Test public void testServerOnClose() { Assertions.assertThrows(IllegalStateException.class, () -> { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); + var server = new MyRSocketServer(TEST_TRANSPORT_FACTORY, 1); try { server.onClose().block(Duration.ofSeconds(1)); } finally { diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index 027cc39..1320a60 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -19,4 +19,5 @@ module tdlib.reactive.api.test { requires reactor.netty.core; requires org.apache.logging.log4j; requires filequeue; + requires rsocket.transport.local; } \ No newline at end of file