diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java index cc77481..e4f58d7 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java @@ -53,9 +53,9 @@ public interface ChannelFactory { String channelName) { var socketParameters = (RSocketParameters) channelsParameters; if (socketParameters.isClient()) { - return new RSocketConsumeAsClient<>(socketParameters.host(), channelCodec, channelName); + return new RSocketConsumeAsClient<>(socketParameters.channelHost(channelName), channelCodec, channelName); } else { - return new RSocketConsumeAsServer<>(socketParameters.host(), channelCodec, channelName); + return new RSocketConsumeAsServer<>(socketParameters.channelHost(channelName), channelCodec, channelName); } } @@ -65,9 +65,9 @@ public interface ChannelFactory { String channelName) { var socketParameters = (RSocketParameters) channelsParameters; if (socketParameters.isClient()) { - return new RSocketProduceAsServer<>(socketParameters.host(), channelCodec, channelName); + return new RSocketProduceAsClient<>(socketParameters.channelHost(channelName), channelCodec, channelName); } else { - return new RSocketProduceAsClient<>(socketParameters.host(), channelCodec, channelName); + return new RSocketProduceAsServer<>(socketParameters.channelHost(channelName), channelCodec, channelName); } } } diff --git a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java index a4578bd..ef2457a 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java @@ -33,11 +33,12 @@ public class ClientsSharedTdlib implements Closeable { public ClientsSharedTdlib(TdlibChannelsClients tdClientsChannels) { this.tdClientsChannels = tdClientsChannels; - this.responses = tdClientsChannels.response().consumeMessages(); + this.responses = tdClientsChannels.response().consumeMessages().repeat(); this.events = tdClientsChannels.events().entrySet().stream() - .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages())); + .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages().repeat())); this.requestsSub = tdClientsChannels.request() .sendMessages(requests.asFlux()) + .repeat() .subscribeOn(Schedulers.parallel()) .subscribe(); } diff --git a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java index 94bcd3e..44322c8 100644 --- a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java @@ -31,10 +31,35 @@ public final class RSocketParameters implements ChannelsParameters { return client; } - public HostAndPort host() { + public HostAndPort baseHost() { return host; } + public HostAndPort channelHost(String channelName) { + return switch (channelName) { + case "request" -> HostAndPort.fromParts(host.getHost(), host.getPort()); + case "response" -> HostAndPort.fromParts(host.getHost(), host.getPort() + 1); + default -> { + if (channelName.startsWith("event-")) { + var lane = channelName.substring("event-".length()); + int index; + if (lane.equals("main")) { + index = 0; + } else { + index = lanes.indexOf(lane); + if (index < 0) { + throw new IllegalArgumentException("Unknown lane: " + lane); + } + index++; + } + yield HostAndPort.fromParts(host.getHost(), host.getPort() + 2 + index); + } else { + throw new IllegalArgumentException("Unknown channel: " + channelName); + } + } + }; + } + @Override public boolean equals(Object obj) { if (obj == this) { diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java index 7f62399..2113353 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java @@ -29,9 +29,10 @@ public class TdlibChannelsSharedServer implements Closeable { this.tdServersChannels = tdServersChannels; this.responsesSub = tdServersChannels.response() .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) + .repeat() .subscribeOn(Schedulers.parallel()) .subscribe(); - this.requests = tdServersChannels.request().consumeMessages(); + this.requests = tdServersChannels.request().consumeMessages().repeat(); } public Flux>> requests() { @@ -43,6 +44,7 @@ public class TdlibChannelsSharedServer implements Closeable { public Disposable events(String lane, Flux eventFlux) { return tdServersChannels.events(lane) .sendMessages(eventFlux) + .repeat() .subscribeOn(Schedulers.parallel()) .subscribe(); } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java index 799ebf9..55c53aa 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java @@ -13,13 +13,17 @@ import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.Timestamped; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; import java.util.logging.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class RSocketConsumeAsClient implements EventConsumer { @@ -52,9 +56,14 @@ public class RSocketConsumeAsClient implements EventConsumer { var deserializer = channelCodec.getNewDeserializer(); return RSocketConnector.create() - .resume(new Resume()) + //.resume(new Resume()) .payloadDecoder(PayloadDecoder.ZERO_COPY) .connect(TcpClientTransport.create(host.getHost(), host.getPort())) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) .flatMapMany(socket -> socket .requestStream(DefaultPayload.create("", "consume")) .map(payload -> { @@ -64,12 +73,22 @@ public class RSocketConsumeAsClient implements EventConsumer { //noinspection unchecked return new Timestamped(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); }) - .doFinally(signalType -> socket - .fireAndForget(DefaultPayload.create("", "close")) - .then(socket.onClose()) - .doFinally(s -> socket.dispose()) - .subscribeOn(Schedulers.parallel()) - .subscribe())) + .doFinally(signalType -> { + socket + .fireAndForget(DefaultPayload.create("", "close")) + .then(socket.onClose().timeout(Duration.ofSeconds(5), Mono.empty())) + .doFinally(s -> socket.dispose()) + .onErrorResume(ex -> Mono.empty()) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + }) + ) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .filter(ex -> ex instanceof ClosedChannelException) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) .log("RSOCKET_CONSUMER_CLIENT", Level.FINE); } } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java index 3543b0c..437c09a 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java @@ -18,6 +18,7 @@ import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.Timestamped; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.ConcurrentLinkedDeque; @@ -30,6 +31,7 @@ import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Disposable; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -73,7 +75,7 @@ public class RSocketConsumeAsServer implements EventConsumer { return Mono .>>>create(sink -> { AtomicReference serverRef = new AtomicReference<>(); - var server = RSocketServer + var disposable = RSocketServer .create((setup, in) -> { var inRawFlux = in.requestStream(DefaultPayload.create("", "consume")); var inFlux = inRawFlux.map(payload -> { @@ -87,16 +89,30 @@ public class RSocketConsumeAsServer implements EventConsumer { return Mono.just(new RSocket() {}); }) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .resume(new Resume()) - .bindNow(TcpServerTransport.create(host.getHost(), host.getPort())); - serverRef.set(server); - sink.onCancel(server); + //.resume(new Resume()) + .bind(TcpServerTransport.create(host.getHost(), host.getPort())) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) + .subscribe(server -> { + serverRef.set(server); + sink.onCancel(server); + }); + sink.onDispose(disposable); }) .subscribeOn(Schedulers.boundedElastic()) .flatMapMany(t -> t.getT3().doFinally(s -> { t.getT2().dispose(); t.getT1().dispose(); })) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .filter(ex -> ex instanceof ClosedChannelException) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) .log("RSOCKET_CONSUMER_SERVER", Level.FINE); } /*return Flux.defer(() -> { diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java index 19308c5..e34e3e1 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java @@ -16,6 +16,7 @@ import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.ReactorUtils; import it.tdlight.reactiveapi.Timestamped; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.logging.Level; @@ -66,9 +67,21 @@ public final class RSocketProduceAsClient implements EventProducer { .payloadDecoder(PayloadDecoder.ZERO_COPY) .setupPayload(DefaultPayload.create("", "connect")) .acceptor(SocketAcceptor.forRequestStream(payload -> serializedEventsFlux)) + //.resume(new Resume()) .connect(TcpClientTransport.create(host.getHost(), host.getPort())) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) .flatMap(rSocket -> rSocket.onClose() .takeUntilOther(closeRequest.asMono().doFinally(s -> rSocket.dispose()))) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .filter(ex -> ex instanceof ClosedChannelException) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) .log("RSOCKET_PRODUCER_CLIENT_Y", Level.FINE); } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java index 6da58da..b9dd7c1 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java @@ -15,7 +15,9 @@ import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.ReactorUtils; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import org.apache.kafka.common.serialization.Serializer; @@ -26,6 +28,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Empty; +import reactor.util.retry.Retry; public final class RSocketProduceAsServer implements EventProducer { @@ -84,12 +87,23 @@ public final class RSocketProduceAsServer implements EventProducer { }); } }) - .resume(new Resume()) + //.resume(new Resume()) .payloadDecoder(PayloadDecoder.ZERO_COPY) .bind(TcpServerTransport.create(host.getHost(), host.getPort())) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) .doOnNext(serverRef::set) .flatMap(closeableChannel -> closeableChannel.onClose() - .takeUntilOther(closeRequest.asMono().doFinally(s -> closeableChannel.dispose()))); + .takeUntilOther(closeRequest.asMono().doFinally(s -> closeableChannel.dispose()))) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .filter(ex -> ex instanceof ClosedChannelException) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))); }); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 84b5936..906cd7a 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -177,7 +177,6 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .take(10, true) .collect(Collectors.toCollection(IntArrayList::new)); - var receiverWait = Flux.empty().delaySubscription(Duration.ofSeconds(4)); var receiver2 = consumer .consumeMessages() .limitRate(1) @@ -187,7 +186,7 @@ public abstract class TestChannel { Flux part1 = Flux .merge((isServerSender() ? sender : receiver1), isServerSender() ? receiver1 : sender); Flux part2 = Flux - .merge((isServerSender() ? sender : receiver2), receiverWait.then(isServerSender() ? receiver2 : sender)); + .merge((isServerSender() ? sender : receiver2), isServerSender() ? receiver2 : sender); var response = Flux.concat(part1, part2).reduce((a, b) -> { a.addAll(b); return a;