Update AsyncTdEasy.java and TdClusterManager.java

This commit is contained in:
Andrea Cavalli 2021-02-14 01:55:07 +01:00
parent faabbc59e8
commit 601c15d8ee
2 changed files with 22 additions and 22 deletions

View File

@ -58,7 +58,6 @@ public class AsyncTdEasy {
private final Logger logger; private final Logger logger;
private static final Scheduler scheduler = Schedulers.parallel();
private final Many<AuthorizationState> authState = Sinks.many().replay().latest(); private final Many<AuthorizationState> authState = Sinks.many().replay().latest();
private final Many<Boolean> requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); private final Many<Boolean> requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false);
private final Many<TdEasySettings> settings = Sinks.many().replay().latest(); private final Many<TdEasySettings> settings = Sinks.many().replay().latest();
@ -67,6 +66,7 @@ public class AsyncTdEasy {
private final AsyncTdMiddle td; private final AsyncTdMiddle td;
private final String logName; private final String logName;
private final Flux<Update> incomingUpdates; private final Flux<Update> incomingUpdates;
private final Scheduler scheduler = Schedulers.parallel();
public AsyncTdEasy(AsyncTdMiddle td, String logName) { public AsyncTdEasy(AsyncTdMiddle td, String logName) {
this.td = td; this.td = td;
@ -151,7 +151,7 @@ public class AsyncTdEasy {
* Get TDLib state * Get TDLib state
*/ */
public Flux<AuthorizationState> getState() { public Flux<AuthorizationState> getState() {
return authState.asFlux().distinct().publishOn(scheduler); return authState.asFlux().distinct();
} }
/** /**
@ -162,21 +162,21 @@ public class AsyncTdEasy {
} }
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) { private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
return incomingUpdates.publishOn(scheduler); return incomingUpdates;
} }
/** /**
* Get generic error updates from TDLib (When they are not linked to a precise request). * Get generic error updates from TDLib (When they are not linked to a precise request).
*/ */
public Flux<TdApi.Error> getIncomingErrors() { public Flux<TdApi.Error> getIncomingErrors() {
return Flux.from(globalErrors.asFlux()).publishOn(scheduler); return Flux.from(globalErrors.asFlux());
} }
/** /**
* Receives fatal errors from TDLib. * Receives fatal errors from TDLib.
*/ */
public Mono<FatalErrorType> getFatalErrors() { public Mono<FatalErrorType> getFatalErrors() {
return Mono.from(fatalError.asMono()).publishOn(scheduler); return Mono.from(fatalError.asMono());
} }
/** /**
@ -188,7 +188,7 @@ public class AsyncTdEasy {
} }
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) { private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) {
return td.<T>execute(obj, synchronous).publishOn(scheduler); return td.<T>execute(obj, synchronous);
} }
/** /**
@ -196,7 +196,7 @@ public class AsyncTdEasy {
* @param i level * @param i level
*/ */
public Mono<Void> setVerbosityLevel(int i) { public Mono<Void> setVerbosityLevel(int i) {
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).publishOn(scheduler); return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true));
} }
/** /**
@ -204,7 +204,7 @@ public class AsyncTdEasy {
* @param name option name * @param name option name
*/ */
public Mono<Void> clearOption(String name) { public Mono<Void> clearOption(String name) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).publishOn(scheduler); return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false));
} }
/** /**
@ -213,7 +213,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionString(String name, String value) { public Mono<Void> setOptionString(String name, String value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).publishOn(scheduler); return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false));
} }
/** /**
@ -222,7 +222,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionInteger(String name, long value) { public Mono<Void> setOptionInteger(String name, long value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).publishOn(scheduler); return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false));
} }
/** /**
@ -231,7 +231,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionBoolean(String name, boolean value) { public Mono<Void> setOptionBoolean(String name, boolean value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).publishOn(scheduler); return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false));
} }
/** /**
@ -250,7 +250,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName())); + value.getClass().getSimpleName()));
} }
}).publishOn(scheduler); });
} }
/** /**
@ -269,7 +269,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName())); + value.getClass().getSimpleName()));
} }
}).publishOn(scheduler); });
} }
/** /**
@ -288,7 +288,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName())); + value.getClass().getSimpleName()));
} }
}).publishOn(scheduler); });
} }
/** /**
@ -299,7 +299,7 @@ public class AsyncTdEasy {
* @return The request response. * @return The request response.
*/ */
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request) { public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request) {
return td.<T>execute(request, true).publishOn(scheduler); return td.<T>execute(request, true);
} }
/** /**
@ -345,8 +345,7 @@ public class AsyncTdEasy {
.doOnSuccess(s -> { .doOnSuccess(s -> {
logger.info("AsyncTdEasy closed successfully"); logger.info("AsyncTdEasy closed successfully");
}) })
.then() .then();
.publishOn(scheduler);
} }
/** /**
@ -386,7 +385,7 @@ public class AsyncTdEasy {
} else { } else {
return (Update) obj; return (Update) obj;
} }
}).publishOn(scheduler); });
} }
private void analyzeFatalErrors(Object obj) { private void analyzeFatalErrors(Object obj) {
@ -413,7 +412,7 @@ public class AsyncTdEasy {
} }
public Mono<Boolean> isBot() { public Mono<Boolean> isBot() {
return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet).publishOn(scheduler); return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet);
} }
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) { private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {

View File

@ -187,11 +187,12 @@ public class TdClusterManager {
return Mono.just(Vertx.vertx(vertxOptions)); return Mono.just(Vertx.vertx(vertxOptions));
} }
}) })
.publishOn(Schedulers.parallel()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(vertx -> Mono .flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.publishOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
); )
.publishOn(Schedulers.parallel());
} }
public Vertx getVertx() { public Vertx getVertx() {