diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index 5e7fa5e..7b5386f 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -58,11 +58,10 @@ public class TdlibChannelsSharedHost implements Closeable { .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { Many> sink = Sinks.many().replay().all(); - var outputEventsFlux = Flux - .merge(sink.asFlux().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) + Flux outputEventsFlux = Flux + .merge(sink.asFlux().map(flux -> flux.publishOn(Schedulers.parallel())), Integer.MAX_VALUE, 256) .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); - Mono.defer(() -> tdServersChannels - .events(lane) + Mono.defer(() -> tdServersChannels.events(lane) .sendMessages(outputEventsFlux)) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java index ad2e4d6..aa362ea 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java @@ -46,8 +46,6 @@ public class TdlibChannelsSharedReceive implements Closeable { //.log("responses", Level.FINE) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) - .publish() - .autoConnect() .doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s)); this.events = tdClientsChannels.events().entrySet().stream() .collect(Collectors.toUnmodifiableMap(Entry::getKey, @@ -59,10 +57,7 @@ public class TdlibChannelsSharedReceive implements Closeable { )); this.requestsSub = tdClientsChannels .request() - .sendMessages(Flux - .defer(() -> requests - .asFlux() - .doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s)))) + .sendMessages(Flux.defer(() -> requests.asFlux().doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s)))) .doFinally(s -> LOG.debug("Output requests sender terminated with signal {}", s)) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY)