diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 83440db..600147c 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -99,7 +99,7 @@ public class EventBusFlux { var responseHandler = MonoUtils.toHandler(itemSink); eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler); })) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(response -> {}, error -> { if (error instanceof ReplyException) { var errorMessageCode = ((ReplyException) error).failureCode(); @@ -208,7 +208,7 @@ public class EventBusFlux { sink.success(); } }); - }).publishOn(Schedulers.single()).share(); + }).publishOn(Schedulers.parallel()).share(); return Tuples.of(servedMono, fatalErrorSink.asMono()); } @@ -313,7 +313,7 @@ public class EventBusFlux { }))) .publishOn(Schedulers.boundedElastic()) .onBackpressureBuffer() - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(v -> {}, emitter::error); emitter.onDispose(() -> { diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index c24e856..02832bc 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -102,12 +102,12 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { result -> logger.warn("Close result: {}", result), ex -> logger.error("Error when disposing td client", ex) ); - }).publishOn(Schedulers.single()).subscribe(); + }).publishOn(Schedulers.parallel()).subscribe(); }); }) .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.parallel()) ) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.parallel()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index a5a9672..50614bc 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -140,7 +140,7 @@ public class AsyncTdEasy { return true; }) - .publishOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(_v -> { this.settings.tryEmitNext(settings); return Mono.empty(); @@ -563,7 +563,7 @@ public class AsyncTdEasy { )) .filterWhen(file -> Mono .fromCallable(() -> Files.exists(file)) - .publishOn(Schedulers.boundedElastic())) + .subscribeOn(Schedulers.boundedElastic())) .doOnNext(directory -> { try { if (!Files.walk(directory) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index a7ab54e..aad1535 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -187,7 +187,7 @@ public class TdClusterManager { return Mono.just(Vertx.vertx(vertxOptions)); } }) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .flatMap(vertx -> Mono .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) .publishOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 47b7963..fe65c50 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -142,7 +142,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { })) .then(setupPing()); }) - .publishOn(Schedulers.single()); + .publishOn(Schedulers.parallel()); } private Mono setupPing() { @@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .doOnNext(s -> logger.debug("END PING")) .then(MonoUtils.emitEmpty(this.pingFail)) - .subscribeOn(Schedulers.single()) + .subscribeOn(Schedulers.parallel()) .subscribe(); } logger.trace("Ping setup success"); @@ -206,9 +206,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) - .then(updates.asMono().publishOn(Schedulers.single())) + .then(updates.asMono().publishOn(Schedulers.parallel())) .timeout(Duration.ofSeconds(5)) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .flatMap(MonoUtils::fromMessageConsumer) .flatMapMany(registration -> Mono .fromRunnable(() -> logger.trace("Registering updates flux")) @@ -240,7 +240,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); } }) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .flatMapSequential(this::interceptUpdate) // Redirect errors to crash sink .doOnError(crash::tryEmitError) @@ -250,7 +250,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .doOnTerminate(updatesStreamEnd::tryEmitEmpty) - .publishOn(Schedulers.single()); + .publishOn(Schedulers.parallel()); } private Mono interceptUpdate(TdApi.Object update) { @@ -262,11 +262,11 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) - .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic())) + .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.parallel()) .thenReturn(update); } break; @@ -291,13 +291,13 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } else { return resp.body().toTdResult(); } - }).subscribeOn(Schedulers.boundedElastic()) + }).subscribeOn(Schedulers.parallel()) ) .doOnSuccess(s -> logger.trace("Executed request")) ) .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); }))) - .publishOn(Schedulers.single()); + .publishOn(Schedulers.parallel()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index d414882..c33bc17 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -91,8 +91,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd .doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName())) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) - .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.single()); + .subscribeOn(Schedulers.parallel()); } @Override @@ -100,6 +99,6 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd return td .execute(requestFunction, executeDirectly) .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)) - .publishOn(Schedulers.single()); + .publishOn(Schedulers.parallel()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index f55492e..68cc6a5 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -107,9 +107,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) - .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) + .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel())) .doOnSuccess(s -> logger.trace("Stated verticle")) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) ); } @@ -141,7 +141,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { }) .flatMapSequential(msg -> Mono .fromCallable(() -> Tuples.of(msg, msg.body())) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.parallel()) ) .flatMapSequential(tuple -> { var msg = tuple.getT1(); @@ -169,7 +169,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } }).subscribeOn(Schedulers.boundedElastic())) .then() - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex), () -> logger.trace("Finished handling execute requests") @@ -182,7 +182,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } BinlogUtils .readBinlogConsumer(vertx, readBinlogConsumer, botId, local) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); @@ -255,11 +255,11 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .andThen(pingConsumer.rxCompletionHandler()) .as(MonoUtils::toMono) .doOnSuccess(s -> logger.trace("Finished preparing listeners")) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(v -> {}, registrationSink::error, registrationSink::success); }) .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.single()); + .publishOn(Schedulers.parallel()); } /** @@ -298,10 +298,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Since every consumer of ReadBinLog is identical, this should not pose a problem. .delay(Duration.ofMinutes(30)) .then(ec.rxUnregister().as(MonoUtils::toMono)) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) .subscribe(); return null; - }).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single())) + }).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel())) ) .then(readyToReceiveConsumer .asMono() @@ -314,7 +314,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")) ) - .publishOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) ); } diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 90ee70b..10caab8 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -343,7 +343,7 @@ public class MonoUtils { private static Future toVertxFuture(Mono toTransform) { var promise = Promise.promise(); - toTransform.publishOn(Schedulers.single()).subscribe(next -> {}, promise::fail, promise::complete); + toTransform.publishOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete); return promise.future(); } @@ -363,15 +363,17 @@ public class MonoUtils { sink.onDispose(messageConsumer::unregister); }) //.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) - .flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) - .subscribeOn(Schedulers.boundedElastic()); + .flatMapSequential(msg -> Mono + .fromCallable(msg::body) + .subscribeOn(Schedulers.parallel()) + ); } public static Mono, Flux>> fromMessageConsumer(MessageConsumer messageConsumer) { return fromReplyableMessageConsumer(messageConsumer) .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono .fromCallable(msg::body) - .subscribeOn(Schedulers.boundedElastic()))) + .subscribeOn(Schedulers.parallel()))) ); } @@ -379,7 +381,7 @@ public class MonoUtils { return fromReplyableMessageConsumer(messageConsumer) .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono .fromCallable(() -> Tuples., T>of(msg, msg.body())) - .subscribeOn(Schedulers.boundedElastic()))) + .subscribeOn(Schedulers.parallel()))) ); }