diff --git a/pom.xml b/pom.xml index dc8a54a..b8b2de7 100644 --- a/pom.xml +++ b/pom.xml @@ -112,12 +112,12 @@ io.projectreactor reactor-core - 3.4.8 + 3.4.9 io.projectreactor reactor-tools - 3.4.8 + 3.4.9 com.akaita.java diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index d4eba7a..f725be5 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -120,6 +120,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. closedFromTd.tryEmitValue(false); }) - .subscribeOn(Schedulers.parallel()); + .subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 8815eb2..198c0ea 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -97,7 +97,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd .doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName())) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) - .subscribeOn(Schedulers.parallel()); + .subscribeOn(Schedulers.boundedElastic()); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 240edd2..c306f42 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -107,7 +107,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) - .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic())) .doOnSuccess(s -> logger.trace("Stated verticle")) ); } diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index 8435614..f8006d0 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -38,8 +38,7 @@ public class BinlogUtils { ) // Open file .map(x -> new BinlogAsyncFile(vertxFilesystem, path)) - .single() - .publishOn(Schedulers.boundedElastic()); + .single(); } public static Mono saveBinlog(BinlogAsyncFile binlog, Buffer data) { @@ -66,8 +65,7 @@ public class BinlogUtils { .then(retrieveBinlog(vertxFilesystem, binlogPath)) ) .single() - .then() - .publishOn(Schedulers.boundedElastic()); + .then(); } public static Mono cleanSessionPath(FileSystem vertxFilesystem, @@ -87,8 +85,7 @@ public class BinlogUtils { .flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono)) .onErrorResume(ex -> Mono.empty()) .then() - ) - .publishOn(Schedulers.boundedElastic()); + ); } public static String humanReadableByteCountBin(long bytes) { @@ -126,7 +123,6 @@ public class BinlogUtils { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); }) - .then() - .publishOn(Schedulers.boundedElastic()); + .then(); } }