From d093b680f8da2b41ffbad163725ef65ce0819c97 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 23 Jan 2021 11:00:14 +0100 Subject: [PATCH] Update AsyncTdMiddleEventBusClient.java --- .../client/AsyncTdMiddleEventBusClient.java | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 744cae9..7720ddc 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -1,10 +1,8 @@ package it.tdlight.tdlibsession.td.middle.client; import io.reactivex.Completable; -import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.json.JsonObject; -import io.vertx.reactivex.circuitbreaker.CircuitBreaker; import io.vertx.reactivex.core.AbstractVerticle; import io.vertx.reactivex.core.eventbus.Message; import it.tdlight.jni.TdApi; @@ -80,17 +78,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @Override public Completable rxStart() { - CircuitBreaker startBreaker = CircuitBreaker.create("bot-" + botAddress + "-server-online-check-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(1).setMaxRetries(4).setTimeout(30000) - ) - .retryPolicy(policy -> 4000L) - .openHandler(closed -> { - logger.error("Circuit opened! " + botAddress); - }) - .closeHandler(closed -> { - logger.error("Circuit closed! " + botAddress); - }); - return Mono .fromCallable(() -> { var botAddress = config().getString("botAddress"); @@ -111,7 +98,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy this.initTime = System.currentTimeMillis(); return null; }) - .then(startBreaker.rxExecute(future -> { + .then(Mono.create(future -> { logger.debug("Waiting for " + botAddress + ".readyToStart"); var readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart"); readyToStartConsumer.handler((Message pingMsg) -> { @@ -137,7 +124,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .rxRequest(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout) .as(MonoUtils::toMono) .doOnError(ex -> logger.error("Failed to request isWorking", ex))) - .subscribe(v -> {}, future::fail, future::complete); + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, future::error, future::success); }); readyToStartConsumer @@ -145,9 +133,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .as(MonoUtils::toMono) .doOnSuccess(s -> logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing")) .then(Mono.fromCallable(() -> cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions))) - .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, future::fail); - }).as(MonoUtils::toMono) + .subscribe(v -> {}, future::error); + }) ) .onErrorMap(ex -> { logger.error("Failure when starting bot " + botAddress, ex);