diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 7d8029a..638387f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -58,7 +58,6 @@ public class AsyncTdEasy { private final Logger logger; - private static final Scheduler scheduler = Schedulers.parallel(); private final Many authState = Sinks.many().replay().latest(); private final Many requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); private final Many settings = Sinks.many().replay().latest(); @@ -67,6 +66,7 @@ public class AsyncTdEasy { private final AsyncTdMiddle td; private final String logName; private final Flux incomingUpdates; + private final Scheduler scheduler = Schedulers.parallel(); public AsyncTdEasy(AsyncTdMiddle td, String logName) { this.td = td; @@ -151,7 +151,7 @@ public class AsyncTdEasy { * Get TDLib state */ public Flux getState() { - return authState.asFlux().distinct().publishOn(scheduler); + return authState.asFlux().distinct(); } /** @@ -162,21 +162,21 @@ public class AsyncTdEasy { } private Flux getIncomingUpdates(boolean includePreAuthUpdates) { - return incomingUpdates.publishOn(scheduler); + return incomingUpdates; } /** * Get generic error updates from TDLib (When they are not linked to a precise request). */ public Flux getIncomingErrors() { - return Flux.from(globalErrors.asFlux()).publishOn(scheduler); + return Flux.from(globalErrors.asFlux()); } /** * Receives fatal errors from TDLib. */ public Mono getFatalErrors() { - return Mono.from(fatalError.asMono()).publishOn(scheduler); + return Mono.from(fatalError.asMono()); } /** @@ -188,7 +188,7 @@ public class AsyncTdEasy { } private Mono> sendDirectly(TdApi.Function obj, boolean synchronous) { - return td.execute(obj, synchronous).publishOn(scheduler); + return td.execute(obj, synchronous); } /** @@ -196,7 +196,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).publishOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); } /** @@ -204,7 +204,7 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).publishOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)); } /** @@ -213,7 +213,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).publishOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)); } /** @@ -222,7 +222,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).publishOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)); } /** @@ -231,7 +231,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).publishOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)); } /** @@ -250,7 +250,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).publishOn(scheduler); + }); } /** @@ -269,7 +269,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).publishOn(scheduler); + }); } /** @@ -288,7 +288,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).publishOn(scheduler); + }); } /** @@ -299,7 +299,7 @@ public class AsyncTdEasy { * @return The request response. */ public Mono> execute(TdApi.Function request) { - return td.execute(request, true).publishOn(scheduler); + return td.execute(request, true); } /** @@ -345,8 +345,7 @@ public class AsyncTdEasy { .doOnSuccess(s -> { logger.info("AsyncTdEasy closed successfully"); }) - .then() - .publishOn(scheduler); + .then(); } /** @@ -386,7 +385,7 @@ public class AsyncTdEasy { } else { return (Update) obj; } - }).publishOn(scheduler); + }); } private void analyzeFatalErrors(Object obj) { @@ -413,7 +412,7 @@ public class AsyncTdEasy { } public Mono isBot() { - return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet).publishOn(scheduler); + return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet); } private Publisher preprocessUpdates(TdApi.Object updateObj) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index aad1535..7ef71b8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -187,11 +187,12 @@ public class TdClusterManager { return Mono.just(Vertx.vertx(vertxOptions)); } }) - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(vertx -> Mono .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) - .publishOn(Schedulers.boundedElastic()) - ); + .subscribeOn(Schedulers.boundedElastic()) + ) + .publishOn(Schedulers.parallel()); } public Vertx getVertx() {