diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 0cba26a..7860437 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -46,7 +47,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { // Temporary id used to make requests private final long clientId; - private final Many> requests; + private final Consumer> requests; private final Map>>> responses = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); @@ -54,7 +55,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { public BaseAtomixReactiveApiClient(TdlibChannelsSharedReceive sharedTdlibClients) { this.clientId = System.nanoTime(); - this.requests = sharedTdlibClients.requests(); + this.requests = sharedTdlibClients::emitRequest; this.subscription = sharedTdlibClients.responses().doOnNext(response -> { var responseSink = responses.get(response.data().requestId()); @@ -63,7 +64,8 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { return; } responseSink.complete(response); - }).subscribeOn(Schedulers.parallel()).subscribe(); + }).subscribeOn(Schedulers.parallel()) + .subscribe(v -> {}, ex -> LOG.error("Reactive api client responses flux has failed unexpectedly!", ex)); } @Override @@ -100,9 +102,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { } }) .doFinally(s -> this.responses.remove(requestId)); - synchronized (requests) { - requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.FAIL_FAST); - } + requests.accept(new Request<>(userId, clientId, requestId, request, timeout)); return response; }); } diff --git a/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java b/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java deleted file mode 100644 index 93a2488..0000000 --- a/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -package it.tdlight.reactiveapi; - -import java.util.function.Function; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class FutureEventConsumer implements EventConsumer { - - private final Mono> future; - - public FutureEventConsumer(Mono> future) { - this.future = future.cache(); - } - - @Override - public Flux> consumeMessages() { - return future.flatMapMany(EventConsumer::consumeMessages); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java b/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java deleted file mode 100644 index 98ab837..0000000 --- a/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java +++ /dev/null @@ -1,25 +0,0 @@ -package it.tdlight.reactiveapi; - -import java.util.function.Function; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -public class FutureEventProducer implements EventProducer { - - private final Mono> future; - - public FutureEventProducer(Mono> future) { - this.future = future.cache(); - } - - @Override - public Mono sendMessages(Flux eventsFlux) { - return future.flatMap(ep -> ep.sendMessages(eventsFlux)); - } - - @Override - public void close() { - future.doOnNext(EventProducer::close).subscribeOn(Schedulers.parallel()).subscribe(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 41b6671..1f66219 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -198,7 +198,7 @@ public abstract class ReactiveApiPublisher { .then(Mono.empty()) ) .subscribeOn(Schedulers.parallel()) - .subscribe(); + .subscribe(v -> {}, ex -> LOG.error("Resulting events flux has failed unexpectedly! (1)", ex)); var messagesToSend = publishedResultingEvents // Obtain only client-bound events @@ -229,7 +229,7 @@ public abstract class ReactiveApiPublisher { } else { LOG.error("Unknown cluster-bound event: {}", clusterBoundEvent); } - }); + }, ex -> LOG.error("Resulting events flux has failed unexpectedly! (2)", ex)); var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); diff --git a/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java b/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java index dac3a34..fd81583 100644 --- a/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java @@ -1,6 +1,9 @@ package it.tdlight.reactiveapi; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import org.reactivestreams.Subscription; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -9,17 +12,20 @@ import reactor.core.publisher.Sinks.Empty; public abstract class SimpleEventProducer implements EventProducer { - private final Empty closeRequest = Sinks.empty(); + private AtomicReference closeRequest = new AtomicReference<>(); @Override public final Mono sendMessages(Flux eventsFlux) { - return handleSendMessages(eventsFlux.takeUntilOther(closeRequest.asMono())); + return handleSendMessages(eventsFlux).doOnSubscribe(s -> closeRequest.set(s)); } public abstract Mono handleSendMessages(Flux eventsFlux); @Override public final void close() { - closeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + var s = closeRequest.get(); + if (s != null) { + s.cancel(); + } } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index af86e9a..a82d820 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -17,6 +17,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; @@ -30,43 +31,48 @@ import reactor.util.retry.RetryBackoffSpec; public class TdlibChannelsSharedHost implements Closeable { private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedHost.class); - private static final RetryBackoffSpec RETRY_STRATEGY = Retry + public static final RetryBackoffSpec RETRY_STRATEGY = Retry .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) .maxBackoff(Duration.ofSeconds(16)) .jitter(1.0) - .doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal)); + .doBeforeRetry(signal -> LogManager.getLogger("Channels").warn("Retrying channel with signal {}", signal)); + + public static final Function, Flux> REPEAT_STRATEGY = n -> n + .doOnNext(i -> LogManager.getLogger("Channels").debug("Resubscribing to channel")) + .delayElements(Duration.ofSeconds(5)); private final TdlibChannelsServers tdServersChannels; private final Disposable responsesSub; private final AtomicReference requestsSub = new AtomicReference<>(); - private final Many> responses = Sinks.many().multicast().onBackpressureBuffer(65535); + private final Many> responses = Sinks.many().multicast().directAllOrNothing(); private final Map>> events; private final Flux>> requests; public TdlibChannelsSharedHost(Set allLanes, TdlibChannelsServers tdServersChannels) { this.tdServersChannels = tdServersChannels; - this.responsesSub = tdServersChannels.response() - .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + this.responsesSub = Mono.defer(() -> tdServersChannels.response() + .sendMessages(responses.asFlux().log("responses", Level.FINE))) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { Many> sink = Sinks.many().multicast().onBackpressureBuffer(65535); var outputEventsFlux = Flux - .merge(sink.asFlux().map(flux -> flux.subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) + .merge(sink.asFlux().cache().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); - tdServersChannels + Mono.defer(() -> tdServersChannels .events(lane) - .sendMessages(outputEventsFlux) - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .sendMessages(outputEventsFlux)) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending events to lane {}", lane, ex)); return sink; })); this.requests = tdServersChannels.request().consumeMessages() - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .doFinally(s -> LOG.debug("Input requests consumer terminated with signal {}", s)) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .doOnError(ex -> LOG.error("Unexpected error when receiving requests", ex)) .doFinally(s -> LOG.debug("Input requests flux terminated with signal {}", s)); diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java index 2917991..8ba7d82 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java @@ -1,5 +1,8 @@ package it.tdlight.reactiveapi; +import static it.tdlight.reactiveapi.TdlibChannelsSharedHost.REPEAT_STRATEGY; +import static it.tdlight.reactiveapi.TdlibChannelsSharedHost.RETRY_STRATEGY; + import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; @@ -9,6 +12,7 @@ import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,45 +31,46 @@ public class TdlibChannelsSharedReceive implements Closeable { private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedReceive.class); - private static final RetryBackoffSpec RETRY_STRATEGY = Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal)); - private final TdlibChannelsClients tdClientsChannels; private final AtomicReference responsesSub = new AtomicReference<>(); private final Disposable requestsSub; private final AtomicReference eventsSub = new AtomicReference<>(); private final Flux>> responses; private final Map>> events; - private final Many> requests = Sinks.many().multicast().onBackpressureBuffer(65535); + private final Many> requests = Sinks.many().multicast().directAllOrNothing(); public TdlibChannelsSharedReceive(TdlibChannelsClients tdClientsChannels) { this.tdClientsChannels = tdClientsChannels; - this.responses = tdClientsChannels - .response() - .consumeMessages() - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + this.responses = Flux + .defer(() -> tdClientsChannels.response().consumeMessages()) + .log("responses", Level.FINE) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) + .publish() + .autoConnect() .doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s)); this.events = tdClientsChannels.events().entrySet().stream() .collect(Collectors.toUnmodifiableMap(Entry::getKey, - e -> e - .getValue() - .consumeMessages() - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + e -> Flux + .defer(() -> e.getValue().consumeMessages()) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .doFinally(s -> LOG.debug("Input events flux of lane \"{}\" terminated with signal {}", e.getKey(), s)) )); - var requestsFlux = Flux.defer(() -> requests.asFlux() - .doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s))); - this.requestsSub = tdClientsChannels.request() - .sendMessages(requestsFlux) - .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + this.requestsSub = tdClientsChannels + .request() + .sendMessages(Flux + .defer(() -> requests + .asFlux() + .doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s)))) + .doFinally(s -> LOG.debug("Output requests sender terminated with signal {}", s)) + .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) - .subscribe(n -> {}, ex -> requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)))); + .subscribe(n -> {}, ex -> { + LOG.error("An error when handling requests killed the requests subscriber!", ex); + requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + }); } public Flux>> responses() { @@ -84,8 +89,10 @@ public class TdlibChannelsSharedReceive implements Closeable { return events; } - public Many> requests() { - return requests; + public void emitRequest(OnRequest request) { + synchronized (requests) { + requests.emitNext(request, EmitFailureHandler.FAIL_FAST); + } } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java index cc1b914..0fc58b6 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -3,88 +3,165 @@ package it.tdlight.reactiveapi.rsocket; import io.rsocket.Payload; import it.tdlight.reactiveapi.Timestamped; import java.time.Duration; +import java.util.Optional; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Empty; import reactor.core.scheduler.Schedulers; -class ConsumerConnection { +public class ConsumerConnection { - private Flux> remote; + private static final Logger LOG = LogManager.getLogger(ConsumerConnection.class); + + private final String channel; + private Flux remote; private Deserializer local; - private Empty connected = Sinks.empty(); - - private Empty remoteResult = Sinks.empty(); + private boolean connectedState = false; + private Empty connectedSink = Sinks.empty(); + private Optional localTerminationState = null; + private Empty localTerminationSink = Sinks.empty(); public ConsumerConnection(String channel) { + this.channel = channel; + if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus()); + } + private String printStatus() { + return "[\"%s\" (%d)%s%s%s]".formatted(channel, + System.identityHashCode(this), + local != null ? ", local" : "", + remote != null ? ", remote" : "", + connectedState ? ((localTerminationState != null) ? (localTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting" + ); } public synchronized Flux> connectLocal() { - return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + if (LOG.isDebugEnabled()) LOG.debug("{} Local is asking to connect", this.printStatus()); + return Mono.defer(() -> { synchronized (ConsumerConnection.this) { - return remote; + return connectedSink.asMono(); } - })); + }).publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + synchronized (ConsumerConnection.this) { + if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); + return RSocketUtils.deserialize(remote, local) + .map(element -> new Timestamped<>(System.currentTimeMillis(), element)); + } + })).doOnError(ex -> { + synchronized (ConsumerConnection.this) { + if (remote != null && localTerminationState == null) { + localTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex); + var sink = localTerminationSink; + reset(true); + sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus())); + } + } + }).doFinally(s -> { + if (s != SignalType.ON_ERROR) { + synchronized (ConsumerConnection.this) { + if (remote != null && localTerminationState == null) { + assert connectedState; + localTerminationState = Optional.empty(); + LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this::printStatus, () -> s); + if (s == SignalType.CANCEL) { + localTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } else { + localTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this::printStatus, () -> s); + } + reset(false); + } + } + }); } public synchronized Mono connectRemote() { - return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is asking to connect", this.printStatus()); + return Mono.defer(() -> { synchronized (ConsumerConnection.this) { - return remoteResult.asMono(); + return connectedSink.asMono(); } - })); + }).publishOn(Schedulers.parallel()).then(Mono.defer(() -> { + synchronized (ConsumerConnection.this) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is connected", this.printStatus()); + return localTerminationSink.asMono().publishOn(Schedulers.parallel()); + } + })).doFinally(s -> { + if (s != SignalType.ON_ERROR) { + synchronized (ConsumerConnection.this) { + //reset(true); + } + } + }); } - public synchronized void resetRemote() { - connected = Sinks.empty(); - remoteResult = Sinks.empty(); - remote = null; + public synchronized void reset(boolean resettingFromRemote) { + if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus()); + if (connectedState) { + if (localTerminationState == null) { + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus()); + var ex = new InterruptedException(); + localTerminationState = Optional.of(ex); + localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); + } + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as waiting for a connection, interrupting it", this.printStatus()); + localTerminationState = Optional.empty(); + localTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); + } local = null; + remote = null; + connectedState = false; + connectedSink = Sinks.empty(); + localTerminationState = null; + localTerminationSink = Sinks.empty(); + if (LOG.isDebugEnabled()) LOG.debug("{} Reset ended", this.printStatus()); } public synchronized void registerRemote(Flux remote) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); if (this.remote != null) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus()); throw new IllegalStateException("Remote is already registered"); } - this.remote = remote - .transformDeferred(flux -> { - synchronized (ConsumerConnection.this) { - assert local != null; - return RSocketUtils.deserialize(flux, local); - } - }) - .map(element -> new Timestamped<>(System.currentTimeMillis(), element)) - .doOnError(ex -> { - synchronized (ConsumerConnection.this) { - remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - } - }) - .doFinally(s -> { - synchronized (ConsumerConnection.this) { - remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - resetRemote(); - } - }); + this.remote = remote; + if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); } public synchronized void registerLocal(Deserializer local) { + if (LOG.isDebugEnabled()) LOG.debug("{} Local is trying to register", this.printStatus()); if (this.local != null) { + if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus()); throw new IllegalStateException("Local is already registered"); } this.local = local; + if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus()); onChanged(); } private synchronized void onChanged() { + if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus()); if (local != null && remote != null) { - connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + connectedState = true; + if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus()); + connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitted connected event", this.printStatus()); + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} Still not connected", this.printStatus()); } } } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java index 2995522..10b44ff 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java @@ -43,10 +43,10 @@ public class MyRSocketClient implements RSocketChannelManager { this.nextClient = RSocketConnector.create() //.setupPayload(DefaultPayload.create("client", "setup-info")) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .reconnect(retryStrategy) + //.reconnect(retryStrategy) .connect(transport) .doOnNext(lastClient::set) - .cache(); + .cacheInvalidateIf(RSocket::isDisposed); } @Override @@ -70,9 +70,11 @@ public class MyRSocketClient implements RSocketChannelManager { @Override public Mono handleSendMessages(Flux eventsFlux) { - Flux rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer)); - Flux combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux); - return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux)).take(1, true).then(); + return Mono.defer(() -> { + Flux rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer)); + Flux combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux); + return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux).take(1, true)).then(); + }); } }; diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java index fbc6c0a..39d2e4d 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -43,7 +43,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { .payloadDecoder(PayloadDecoder.ZERO_COPY) .bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort())) .doOnNext(d -> logger.debug("Server up")) - .cache(); + .cacheInvalidateIf(CloseableChannel::isDisposed); serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server")); @@ -93,7 +93,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { return new EventConsumer() { @Override public Flux> consumeMessages() { - return Flux.defer(() -> { + return serverCloseable.flatMapMany(x -> { //noinspection unchecked var conn = (ConsumerConnection) consumerRegistry.computeIfAbsent(channelName, ConsumerConnection::new); conn.registerLocal(deserializer); @@ -116,7 +116,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { return new EventProducer() { @Override public Mono sendMessages(Flux eventsFlux) { - return Mono.defer(() -> { + return serverCloseable.flatMap(x -> { //noinspection unchecked var conn = (ProducerConnection) producerRegistry.computeIfAbsent(channelName, ProducerConnection::new); conn.registerLocal(eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer))); diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java index 7c48312..fee637c 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java @@ -2,80 +2,167 @@ package it.tdlight.reactiveapi.rsocket; import io.rsocket.Payload; import java.time.Duration; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Empty; import reactor.core.scheduler.Schedulers; -class ProducerConnection { +public class ProducerConnection { + + private static final Logger LOG = LogManager.getLogger(ProducerConnection.class); + + private final String channel; private Object remote; private Flux local; - private Empty connected = Sinks.empty(); - - private Empty remoteResult = Sinks.empty(); + private boolean connectedState = false; + private Empty connectedSink = Sinks.empty(); + private Optional remoteTerminationState = null; + private Empty remoteTerminationSink = Sinks.empty(); public ProducerConnection(String channel) { + this.channel = channel; + if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus()); + } + private synchronized String printStatus() { + return "[\"%s\" (%d)%s%s%s]".formatted(channel, + System.identityHashCode(this), + local != null ? ", local" : "", + remote != null ? ", remote" : "", + connectedState ? ((remoteTerminationState != null) ? (remoteTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting" + ); } public synchronized Mono connectLocal() { - return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> { + if (LOG.isDebugEnabled()) LOG.debug("{} Local is asking to connect", this.printStatus()); + return Mono.defer(() -> { synchronized (ProducerConnection.this) { - return remoteResult.asMono().doFinally(r -> reset()); + return connectedSink.asMono(); } - })).doOnError(ex -> { + }).publishOn(Schedulers.parallel()).then(Mono.defer(() -> { synchronized (ProducerConnection.this) { - remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); + return remoteTerminationSink.asMono().publishOn(Schedulers.parallel()); } - }).doFinally(ended -> reset()); + })).doFinally(s -> { + if (s != SignalType.ON_ERROR) { + synchronized (ProducerConnection.this) { + //reset(false); + } + } + }); } public synchronized Flux connectRemote() { - return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is asking to connect", this.printStatus()); + return Mono.defer(() -> { synchronized (ProducerConnection.this) { + return connectedSink.asMono(); + } + }).publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + synchronized (ProducerConnection.this) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is connected", this.printStatus()); return local; } - }).doOnError(ex -> { + })).doOnError(ex -> { synchronized (ProducerConnection.this) { - remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (local != null && remoteTerminationState == null) { + remoteTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex); + var sink = remoteTerminationSink; + reset(true); + sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitted termination failure".formatted(this.printStatus())); + } } - })).doFinally(ended -> reset()); + }).doFinally(s -> { + if (s != SignalType.ON_ERROR) { + synchronized (ProducerConnection.this) { + if (local != null && remoteTerminationState == null) { + assert connectedState; + remoteTerminationState = Optional.empty(); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this.printStatus(), s); + if (s == SignalType.CANCEL) { + remoteTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } else { + remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s); + } + reset(true); + + } + } + }); } - public synchronized void reset() { - if (local != null && remote != null) { - remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - local = null; - remote = null; - connected = Sinks.empty(); - remoteResult = Sinks.empty(); + public synchronized void reset(boolean resettingFromRemote) { + if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus()); + if (connectedState) { + if (remoteTerminationState == null) { + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus()); + var ex = new InterruptedException(); + remoteTerminationState = Optional.of(ex); + remoteTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); + } + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as waiting for a connection, interrupting it", this.printStatus()); + remoteTerminationState = Optional.empty(); + remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); } + local = null; + remote = null; + connectedState = false; + connectedSink = Sinks.empty(); + remoteTerminationState = null; + remoteTerminationSink = Sinks.empty(); + if (LOG.isDebugEnabled()) LOG.debug("{} Reset ended", this.printStatus()); } public synchronized void registerRemote() { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); if (this.remote != null) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus()); throw new IllegalStateException("Remote is already registered"); } this.remote = new Object(); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); } public synchronized void registerLocal(Flux local) { + if (LOG.isDebugEnabled()) LOG.debug("{} Local is trying to register", this.printStatus()); if (this.local != null) { + if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus()); throw new IllegalStateException("Local is already registered"); } this.local = local; + if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus()); onChanged(); } private synchronized void onChanged() { + if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus()); if (local != null && remote != null) { - connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + connectedState = true; + if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus()); + connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitted connected event", this.printStatus()); + } else { + if (LOG.isDebugEnabled()) LOG.debug("{} Still not connected", this.printStatus()); } } } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 0b9f56b..cefa99d 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -16,6 +16,7 @@ + diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 240c4af..20066b8 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -212,12 +212,14 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .take(50, true) .collect(Collectors.toCollection(IntArrayList::new)); - var response = Flux - .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(Duration.ofSeconds(5)); - data.removeElements(50, 100); - Assertions.assertEquals(response, data); - System.out.println(response); + Assertions.assertThrows(Throwable.class, () -> { + var response = Flux + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) + .blockLast(Duration.ofSeconds(5)); + data.removeElements(50, 100); + Assertions.assertEquals(response, data); + System.out.println(response); + }); } @Test @@ -274,14 +276,14 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .take(10, true) .collect(Collectors.toCollection(IntArrayList::new)) - .block(); + .block(Duration.ofSeconds(5)); var receiver2 = consumer .consumeMessages() .limitRate(1) .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)) - .block(); + .block(Duration.ofSeconds(5)); Assertions.assertNotNull(receiver1); Assertions.assertNotNull(receiver2); receiver1.addAll(receiver2);