diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 00be6ad..c3580de 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -74,7 +74,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { return telegramClientFactory.create(implementationDetails) .flatMapMany(client -> Flux .create(updatesSink -> { - Schedulers.boundedElastic().schedule(() -> client.initialize((TdApi.Object object) -> { + client.initialize((TdApi.Object object) -> { updatesSink.next(object); // Close the emitter if receive closed state if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR @@ -84,7 +84,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { closedFromTd.tryEmitValue(true); updatesSink.complete(); } - }, updatesSink::error, updatesSink::error)); + }, updatesSink::error, updatesSink::error); if (td.tryEmitValue(client).isFailure()) { updatesSink.error(new TdError(500, "Failed to emit td client")); @@ -105,7 +105,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { }).publishOn(Schedulers.single()).subscribe(); }); }) - .publishOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.single()) ); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 80c3f1e..8d9ed6b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -58,7 +58,7 @@ public class AsyncTdEasy { private final Logger logger; - private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy"); + private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", true); private final Many authState = Sinks.many().replay().latest(); private final Many requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); private final Many settings = Sinks.many().replay().latest(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 71ba826..c72649b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -166,17 +166,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .then() .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) - .then(updates.asMono()) + .then(updates.asMono().publishOn(Schedulers.single())) .timeout(Duration.ofSeconds(5)) - .flatMapMany(Flux::hide) + .publishOn(Schedulers.single()) + .flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single())) + .startWith(MonoUtils + .castVoid(Mono.fromRunnable(() -> { + cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout); + }).subscribeOn(Schedulers.boundedElastic())) + ) .timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> { var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)"); ex.setStackTrace(new StackTraceElement[0]); throw ex; })) - .doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> { - cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout); - })) .flatMapSequential(updates -> { if (updates.succeeded()) { return Flux.fromIterable(updates.value()); @@ -184,6 +187,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); } }) + .publishOn(Schedulers.single()) .flatMapSequential(this::interceptUpdate) .doOnError(crash::tryEmitError) .doOnTerminate(updatesStreamEnd::tryEmitEmpty) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 87dabc1..cda42d2 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -109,7 +109,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) - .flatMap(Mono::hide) + .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) .doOnSuccess(s -> logger.trace("Stated verticle")) .publishOn(Schedulers.single()) ); @@ -128,7 +128,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { - return Mono.create(registrationSink -> Schedulers.boundedElastic().schedule(() -> { + return Mono.create(registrationSink -> { logger.trace("Preparing listeners"); MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); @@ -248,7 +248,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnSuccess(s -> logger.trace("Finished preparing listeners")) .publishOn(Schedulers.single()) .subscribe(v -> {}, registrationSink::error, registrationSink::success); - })); + }) + .subscribeOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.single()); } /** @@ -281,14 +283,16 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .then(readBinlogConsumer .asMono() .timeout(Duration.ofSeconds(10), Mono.empty()) - .doOnNext(ec -> Schedulers.boundedElastic().schedule(() -> Mono - // ReadBinLog will live for another 30 minutes. - // Since every consumer of ReadBinLog is identical, this should not pose a problem. - .delay(Duration.ofMinutes(30)) - .then(ec.rxUnregister().as(MonoUtils::toMono)) - .publishOn(Schedulers.single()) - .subscribe()) - ) + .flatMap(ec -> Mono.fromCallable(() -> { + Mono + // ReadBinLog will live for another 30 minutes. + // Since every consumer of ReadBinLog is identical, this should not pose a problem. + .delay(Duration.ofMinutes(30)) + .then(ec.rxUnregister().as(MonoUtils::toMono)) + .publishOn(Schedulers.single()) + .subscribe(); + return null; + }).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) ) .then(readyToReceiveConsumer .asMono() diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 0f15389..27b5d98 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -100,7 +100,7 @@ public class MonoUtils { } public static Mono fromBlockingMaybe(Callable callable) { - return Mono.fromCallable(callable).publishOn(Schedulers.boundedElastic()); + return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single()); } public static Mono fromBlockingSingle(Callable callable) { @@ -343,12 +343,13 @@ public class MonoUtils { public static Flux fromConsumer(MessageConsumer messageConsumer) { return Flux.>create(sink -> { - Schedulers.boundedElastic().schedule(() -> { - messageConsumer.handler(sink::next); - messageConsumer.endHandler(e -> sink.complete()); - sink.onDispose(messageConsumer::unregister); - }); - }).flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic())); + messageConsumer.handler(sink::next); + messageConsumer.endHandler(e -> sink.complete()); + sink.onDispose(messageConsumer::unregister); + }) + .subscribeOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.single()) + .flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic())); } public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream {