diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 600147c..462c871 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -99,7 +99,7 @@ public class EventBusFlux { var responseHandler = MonoUtils.toHandler(itemSink); eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler); })) - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe(response -> {}, error -> { if (error instanceof ReplyException) { var errorMessageCode = ((ReplyException) error).failureCode(); @@ -313,7 +313,7 @@ public class EventBusFlux { }))) .publishOn(Schedulers.boundedElastic()) .onBackpressureBuffer() - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, emitter::error); emitter.onDispose(() -> { diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index f75e89a..a7a812b 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -202,7 +202,7 @@ public class TDLibRemoteClient implements AutoCloseable { return Mono.empty(); }); }) - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe( v -> {}, ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex) diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 02832bc..cc01e95 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -102,7 +102,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { result -> logger.warn("Close result: {}", result), ex -> logger.error("Error when disposing td client", ex) ); - }).publishOn(Schedulers.parallel()).subscribe(); + }).subscribeOn(Schedulers.parallel()).subscribe(); }); }) .subscribeOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index dd81f83..770aa3c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -60,7 +60,7 @@ public class TestClient implements TelegramClient { .asFlux() .buffer(50) .doOnNext(ub -> logger.trace("Received update block of size {}", ub.size())) - .publishOn(testClientScheduler) + .subscribeOn(testClientScheduler) .subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException); for (String featureName : features) { @@ -70,7 +70,7 @@ public class TestClient implements TelegramClient { .repeat() .buffer(100) .doOnNext(updatesHandler::onUpdates) - .publishOn(testClientScheduler) + .subscribeOn(testClientScheduler) .subscribe(); break; default: diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index c54ff3d..7d8029a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -94,7 +94,7 @@ public class AsyncTdEasy { } }) .doOnComplete(() -> { - authState.asFlux().take(1).single().publishOn(scheduler).subscribe(authState -> { + authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has closed while" @@ -105,7 +105,7 @@ public class AsyncTdEasy { } }); }).doOnError(ex -> { - authState.asFlux().take(1).single().publishOn(scheduler).subscribe(authState -> { + authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has terminated with an error while" @@ -136,7 +136,7 @@ public class AsyncTdEasy { } // Register fatal error handler - fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).publishOn(scheduler).subscribe(); + fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribeOn(scheduler).subscribe(); return true; }) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index da0324b..b312a63 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -169,7 +169,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } }).subscribeOn(Schedulers.boundedElastic())) .then() - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex), () -> logger.trace("Finished handling execute requests") @@ -182,7 +182,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } BinlogUtils .readBinlogConsumer(vertx, readBinlogConsumer, botId, local) - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); @@ -255,7 +255,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .andThen(pingConsumer.rxCompletionHandler()) .as(MonoUtils::toMono) .doOnSuccess(s -> logger.trace("Finished preparing listeners")) - .publishOn(Schedulers.parallel()) + .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, registrationSink::error, registrationSink::success); }) .subscribeOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 10caab8..9932d88 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -343,7 +343,7 @@ public class MonoUtils { private static Future toVertxFuture(Mono toTransform) { var promise = Promise.promise(); - toTransform.publishOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete); + toTransform.subscribeOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete); return promise.future(); } @@ -445,7 +445,7 @@ public class MonoUtils { if (backpressureSize != null) { AtomicBoolean drained = new AtomicBoolean(true); var drainSubscription = backpressureSize - .publishOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(size -> { writeQueueFull = size >= this.writeQueueMaxSize; @@ -465,7 +465,7 @@ public class MonoUtils { termination .asMono() .doOnTerminate(drainSubscription::dispose) - .publishOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } } @@ -516,7 +516,7 @@ public class MonoUtils { @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - sink.asFlux().publishOn(scheduler).subscribe(new CoreSubscriber() { + sink.asFlux().subscribeOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) { @@ -656,7 +656,7 @@ public class MonoUtils { @SuppressWarnings("DuplicatedCode") @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - flux.publishOn(scheduler).subscribe(new CoreSubscriber() { + flux.subscribeOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) {