diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index dcb73f3..346f817 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -14,9 +14,11 @@ import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; +import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -29,6 +31,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd protected AsyncTdDirectImpl td; private String botAddress; private String botAlias; + private Flux updatesFluxCo; public AsyncTdMiddleDirect() { } @@ -66,7 +69,24 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd this.td = new AsyncTdDirectImpl(botAlias); - td.initializeClient().doOnSuccess(v -> startPromise.complete()).subscribe(success -> { + td.initializeClient().doOnSuccess(v -> { + updatesFluxCo = Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> { + if (result.succeeded()) { + if (result.result().succeeded()) { + return Mono.just(result.result().result()); + } else { + logger.error("Received an errored update", + ResponseError.newResponseError("incoming update", botAlias, result.result().cause()) + ); + return Mono.empty(); + } + } else { + logger.error("Received an errored update", result.cause()); + return Mono.empty(); + } + })).publish().refCount(1); + startPromise.complete(); + }).subscribe(success -> { }, (ex) -> { logger.error("Failure when starting bot " + botAlias + ", address " + botAddress, ex); startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!")); @@ -86,21 +106,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd @Override public Flux getUpdates() { - return Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> { - if (result.succeeded()) { - if (result.result().succeeded()) { - return Mono.just(result.result().result()); - } else { - logger.error("Received an errored update", - ResponseError.newResponseError("incoming update", botAlias, result.result().cause()) - ); - return Mono.empty(); - } - } else { - logger.error("Received an errored update", result.cause()); - return Mono.empty(); - } - })); + return Flux.from(updatesFluxCo); } @Override