From 677ceb70a1e32a811dc23a43ddd913459cd76956 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 12 Oct 2022 18:31:44 +0200 Subject: [PATCH] Fix some connection errors --- .../reactiveapi/AtomixReactiveApi.java | 2 +- .../reactiveapi/ReactiveApiPublisher.java | 16 ++++- .../it/tdlight/reactiveapi/ReactorUtils.java | 14 +++- .../java/it/tdlight/reactiveapi/Stats.java | 26 ++++--- .../reactiveapi/TdlibChannelsSharedHost.java | 14 ++-- .../rsocket/ConsumerConnection.java | 72 +++++++++---------- .../rsocket/ProducerConnection.java | 12 ++-- .../reactiveapi/rsocket/RSocketUtils.java | 2 + .../tdlight/reactiveapi/test/TestChannel.java | 70 +++++++++--------- 9 files changed, 131 insertions(+), 97 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 0ed852b..c911c8e 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -148,7 +148,7 @@ public class AtomixReactiveApi implements ReactiveApi { if (publisher != null) { publisher.handleRequest(req.data()); } else { - LOG.error("Dropped request because no session is found: {}", req); + LOG.debug("Dropped request because no session is found: {}", req); } }) .subscribeOn(Schedulers.parallel()) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 3d4b322..d02bbb6 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -67,6 +67,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Scheduler.Worker; import reactor.core.scheduler.Schedulers; @@ -503,8 +504,21 @@ public abstract class ReactiveApiPublisher { public void handleRequest(OnRequest onRequestObj) { handleRequestInternal(onRequestObj, response -> { + EmitResult status; synchronized (this.responses) { - this.responses.emitNext(response, EmitFailureHandler.FAIL_FAST); + status = this.responses.tryEmitNext(response); + } + if (status.isFailure()) { + switch (status) { + case FAIL_ZERO_SUBSCRIBER -> + LOG.warn("Failed to send response of request {}, user {}, client {}: no subscribers", + onRequestObj.userId(), onRequestObj.userId(), onRequestObj.clientId()); + case FAIL_OVERFLOW -> + LOG.warn("Failed to send response of request {}, user {}, client {}: too many unsent responses", + onRequestObj.userId(), onRequestObj.userId(), onRequestObj.clientId()); + default -> LOG.error("Failed to send response of request {}, user {}, client {}: {}", + onRequestObj.userId(), onRequestObj.userId(), onRequestObj.clientId(), status); + } } }); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java index 77bb5bf..6e092d3 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -185,16 +185,28 @@ public class ReactorUtils { } } ); + AtomicReference startEx = new AtomicReference<>(); var disposable = flux .subscribeOn(Schedulers.parallel()) .publishOn(Schedulers.boundedElastic()) - .subscribe(queue::add); + .subscribe(queue::add, ex -> { + startEx.set(ex); + var refVal = ref.get(); + if (refVal != null) { + refVal.error(ex); + } + }); queue.startQueue(); return Flux.create(sink -> { sink.onDispose(() -> { disposable.dispose(); queue.close(); }); + var startExVal = startEx.get(); + if (startExVal != null) { + sink.error(startExVal); + return; + } ref.set(sink); sink.onCancel(() -> ref.set(null)); }); diff --git a/src/main/java/it/tdlight/reactiveapi/Stats.java b/src/main/java/it/tdlight/reactiveapi/Stats.java index c3d52b9..adb41a2 100644 --- a/src/main/java/it/tdlight/reactiveapi/Stats.java +++ b/src/main/java/it/tdlight/reactiveapi/Stats.java @@ -99,7 +99,9 @@ public class Stats extends Thread { processedUpdatesRateSum += processedUpdatesRate; clientBoundEventsRateSum += clientBoundEventsRate; sentClientBoundEventsRateSum += sentClientBoundEventsRate; - out.append(String.format("%d:\t", clientIds.getLong(i))); + if (LOG.isTraceEnabled()) { + out.append(String.format("%d:\t", clientIds.getLong(i))); + } } else { receivedUpdatesRate = receivedUpdatesRateSum; diskBuffered = diskBufferedSum; @@ -110,16 +112,18 @@ public class Stats extends Thread { sentClientBoundEventsRate = sentClientBoundEventsRateSum; out.append("Total:\t"); } - out.append(String.format( - "\tUpdates:\t[received %03.2fHz\tbuffered: %03.2fHz (RAM: %d HDD: %d)\tprocessed: %03.2fHz]\tClient bound events: %03.2fHz\tProcessed events: %03.2fHz\t%n", - receivedUpdatesRate, - bufferedUpdatesRate, - ramBuffered, - diskBuffered, - processedUpdatesRate, - clientBoundEventsRate, - sentClientBoundEventsRate - )); + if (i == currentClients || LOG.isTraceEnabled()) { + out.append(String.format( + "\tUpdates:\t[received %03.2fHz\tbuffered: %03.2fHz (RAM: %d HDD: %d)\tprocessed: %03.2fHz]\tClient bound events: %03.2fHz\tProcessed events: %03.2fHz\t%n", + receivedUpdatesRate, + bufferedUpdatesRate, + ramBuffered, + diskBuffered, + processedUpdatesRate, + clientBoundEventsRate, + sentClientBoundEventsRate + )); + } } out.append(String.format("%n")); diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index d63ce01..9cade62 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -47,14 +47,13 @@ public class TdlibChannelsSharedHost implements Closeable { private final TdlibChannelsServers tdServersChannels; private final Disposable responsesSub; private final AtomicReference requestsSub = new AtomicReference<>(); - private final Many> responses = Sinks.many().multicast().directAllOrNothing(); + private final Many> responses = Sinks.many().multicast().onBackpressureBuffer(65_535); private final Map>> events; private final Flux>> requests; public TdlibChannelsSharedHost(Set allLanes, TdlibChannelsServers tdServersChannels) { this.tdServersChannels = tdServersChannels; - this.responsesSub = Mono.defer(() -> tdServersChannels.response() - .sendMessages(responses.asFlux()/*.log("responses", Level.FINE)*/)) + this.responsesSub = tdServersChannels.response().sendMessages(responses.asFlux()/*.log("responses", Level.FINE)*/) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) @@ -64,8 +63,7 @@ public class TdlibChannelsSharedHost implements Closeable { Flux outputEventsFlux = Flux .merge(sink.asFlux().map(flux -> flux.publishOn(Schedulers.parallel())), Integer.MAX_VALUE, 256) .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); - Mono.defer(() -> tdServersChannels.events(lane) - .sendMessages(outputEventsFlux)) + tdServersChannels.events(lane).sendMessages(outputEventsFlux) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) @@ -92,9 +90,9 @@ public class TdlibChannelsSharedHost implements Closeable { if (eventsSink == null) { throw new IllegalArgumentException("Lane " + lane + " does not exist"); } - eventsSink.emitNext(eventFlux.takeUntilOther(canceller.asMono()), - EmitFailureHandler.busyLooping(Duration.ofMillis(100)) - ); + synchronized (events) { + eventsSink.emitNext(eventFlux.takeUntilOther(canceller.asMono()), EmitFailureHandler.FAIL_FAST); + } return () -> canceller.tryEmitEmpty(); } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java index dbbfbdb..fb27a3a 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -15,7 +15,6 @@ import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; -import reactor.util.concurrent.Queues; public class ConsumerConnection { @@ -61,20 +60,7 @@ public class ConsumerConnection { return remote.doOnError(ex -> { synchronized (ConsumerConnection.this) { if (remoteCount <= 1) { - if (remoteCount > 0 && localTerminationState == null) { - localTerminationState = Optional.of(ex); - if (LOG.isDebugEnabled()) { - LOG.debug("%s Local connection ended with failure".formatted(this.printStatus()), ex); - } - if (remoteCount <= 1) { - var sink = localTerminationSink; - reset(); - sink.emitError(ex, EmitFailureHandler.FAIL_FAST); - if (LOG.isDebugEnabled()) { - LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus())); - } - } - } + onRemoteLastError(ex); } else { remoteCount--; if (LOG.isDebugEnabled()) { @@ -88,17 +74,7 @@ public class ConsumerConnection { synchronized (ConsumerConnection.this) { if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ending with status {}", this.printStatus(), s); if (remoteCount <= 1) { - if (remoteCount > 0 && localTerminationState == null) { - assert connectedState; - localTerminationState = Optional.empty(); - if (s == SignalType.CANCEL) { - localTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.FAIL_FAST); - } else { - localTerminationSink.emitEmpty(EmitFailureHandler.FAIL_FAST); - } - } - reset(); - if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s); + onLastFinally(s); } else { remoteCount--; if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, but at least one remote is still online", this.printStatus(), s); @@ -122,17 +98,39 @@ public class ConsumerConnection { }) .map(element -> new Timestamped<>(System.currentTimeMillis(), element)); } - })).doFinally(s -> { + })).doOnError(this::onRemoteLastError).doFinally(this::onLastFinally); + } + + private synchronized void onLastFinally(SignalType s) { + if (remoteCount > 0 && localTerminationState == null) { + assert connectedState; + var ex = new CancelledChannelException(); + localTerminationState = Optional.of(ex); if (s == SignalType.CANCEL) { - synchronized (ConsumerConnection.this) { - local = null; - var ex = new InterruptedException(); - localTerminationState = Optional.of(ex); - if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus()); - localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + localTerminationSink.emitError(ex, EmitFailureHandler.FAIL_FAST); + } else { + localTerminationSink.emitEmpty(EmitFailureHandler.FAIL_FAST); + } + } + reset(); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s); + } + + private synchronized void onRemoteLastError(Throwable ex) { + if (remoteCount > 0 && localTerminationState == null) { + localTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) { + LOG.debug("%s Local connection ended with failure".formatted(this.printStatus()), ex); + } + if (remoteCount <= 1) { + var sink = localTerminationSink; + reset(); + sink.emitError(ex, EmitFailureHandler.FAIL_FAST); + if (LOG.isDebugEnabled()) { + LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus())); } } - }); + } } public synchronized Mono connectRemote() { @@ -154,7 +152,7 @@ public class ConsumerConnection { if (connectedState) { if (localTerminationState == null) { if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus()); - var ex = new InterruptedException(); + var ex = new InterruptedException("Interrupted this connection because a new one is being prepared"); localTerminationState = Optional.of(ex); localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); @@ -167,7 +165,7 @@ public class ConsumerConnection { } local = null; remoteCount = 0; - remotes.emitComplete(EmitFailureHandler.FAIL_FAST); + remotes.tryEmitComplete(); remotes = Sinks.many().replay().all(); connectedState = false; connectedSink = Sinks.empty(); @@ -179,7 +177,7 @@ public class ConsumerConnection { public synchronized void registerRemote(Flux remote) { if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); this.remoteCount++; - this.remotes.emitNext(remote, EmitFailureHandler.FAIL_FAST); + this.remotes.tryEmitNext(remote); if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java index 36dfa8b..b4a7da4 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java @@ -56,8 +56,12 @@ public class ProducerConnection { return remoteTerminationSink.asMono().publishOn(Schedulers.parallel()); } })).doFinally(s -> { - if (s == SignalType.CANCEL) { + if (s == SignalType.ON_ERROR || s == SignalType.CANCEL) { synchronized (ProducerConnection.this) { + if (connectedState) { + connectedState = false; + connectedSink = Sinks.empty(); + } local = null; if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus()); } @@ -126,7 +130,7 @@ public class ProducerConnection { if (connectedState) { if (remoteTerminationState == null) { if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus()); - var ex = new InterruptedException(); + var ex = new InterruptedException("Interrupted this connection because a new one is being prepared"); remoteTerminationState = Optional.of(ex); remoteTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); @@ -148,10 +152,6 @@ public class ProducerConnection { public synchronized void registerRemote() { if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); - if (this.remoteCount > 0) { - if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus()); - throw new IllegalStateException("Remote is already registered"); - } this.remoteCount++; if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java index bc5c14f..9aaa631 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java @@ -22,6 +22,8 @@ public class RSocketUtils { } } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + payload.release(); } }); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 46e21cd..4ea8434 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -7,12 +7,9 @@ import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.Timestamped; -import it.tdlight.reactiveapi.rsocket.CancelledChannelException; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.io.Closeable; -import java.io.IOException; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +22,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -449,7 +447,7 @@ public abstract class TestChannel { } @Test - public void testFailTwoSubscribers() { + public void testTwoSubscribers() { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); var exRef1 = new AtomicReference(); var exRef2 = new AtomicReference(); @@ -459,35 +457,43 @@ public abstract class TestChannel { .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .subscribe(n -> {}, ex -> exRef1.set(ex)); - Assertions.assertThrows(IllegalStateException.class, () -> { - try { - Mono - .when(consumer - .consumeMessages() - .limitRate(1) - .map(Timestamped::data) - .map(Integer::parseUnsignedInt) - .log("consumer-1", Level.INFO) - .doOnError(ex -> exRef2.set(ex)), - consumer - .consumeMessages() - .limitRate(1) - .map(Timestamped::data) - .map(Integer::parseUnsignedInt) - .log("consumer-2", Level.INFO) - .onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class, - ex -> Mono.error(new IllegalStateException(ex)) - ) - ) - .block(); - Assertions.assertNull(exRef1.get()); - Assertions.assertNull(exRef2.get()); - } catch (RuntimeException ex) { - throw Exceptions.unwrap(ex); - } finally { - eventProducer.dispose(); + var exe = new Executable() { + @Override + public void execute() throws Throwable { + try { + Mono + .when(consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .log("consumer-1", Level.INFO) + .doOnError(ex -> exRef2.set(ex)), + consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .log("consumer-2", Level.INFO) + .onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class, + ex -> Mono.error(new IllegalStateException(ex)) + ) + ) + .block(); + Assertions.assertNull(exRef1.get()); + Assertions.assertNull(exRef2.get()); + } catch (RuntimeException ex) { + throw Exceptions.unwrap(ex); + } finally { + eventProducer.dispose(); + } } - }); + }; + if (isConsumerClient()) { + Assertions.assertDoesNotThrow(exe); + } else { + Assertions.assertThrows(IllegalStateException.class, exe); + } } @Test