Code cleanup

This commit is contained in:
Andrea Cavalli 2022-10-08 03:12:50 +02:00
parent edb8cec873
commit 1eb1d7b95f
2 changed files with 4 additions and 10 deletions

View File

@ -58,11 +58,10 @@ public class TdlibChannelsSharedHost implements Closeable {
.subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex));
events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> {
Many<Flux<ClientBoundEvent>> sink = Sinks.many().replay().all(); Many<Flux<ClientBoundEvent>> sink = Sinks.many().replay().all();
var outputEventsFlux = Flux Flux<ClientBoundEvent> outputEventsFlux = Flux
.merge(sink.asFlux().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) .merge(sink.asFlux().map(flux -> flux.publishOn(Schedulers.parallel())), Integer.MAX_VALUE, 256)
.doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s));
Mono.defer(() -> tdServersChannels Mono.defer(() -> tdServersChannels.events(lane)
.events(lane)
.sendMessages(outputEventsFlux)) .sendMessages(outputEventsFlux))
.repeatWhen(REPEAT_STRATEGY) .repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY) .retryWhen(RETRY_STRATEGY)

View File

@ -46,8 +46,6 @@ public class TdlibChannelsSharedReceive implements Closeable {
//.log("responses", Level.FINE) //.log("responses", Level.FINE)
.repeatWhen(REPEAT_STRATEGY) .repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY) .retryWhen(RETRY_STRATEGY)
.publish()
.autoConnect()
.doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s)); .doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s));
this.events = tdClientsChannels.events().entrySet().stream() this.events = tdClientsChannels.events().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Entry::getKey, .collect(Collectors.toUnmodifiableMap(Entry::getKey,
@ -59,10 +57,7 @@ public class TdlibChannelsSharedReceive implements Closeable {
)); ));
this.requestsSub = tdClientsChannels this.requestsSub = tdClientsChannels
.request() .request()
.sendMessages(Flux .sendMessages(Flux.defer(() -> requests.asFlux().doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s))))
.defer(() -> requests
.asFlux()
.doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s))))
.doFinally(s -> LOG.debug("Output requests sender terminated with signal {}", s)) .doFinally(s -> LOG.debug("Output requests sender terminated with signal {}", s))
.repeatWhen(REPEAT_STRATEGY) .repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY) .retryWhen(RETRY_STRATEGY)