diff --git a/pom.xml b/pom.xml index 7ab9b69..dcf0e49 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,7 @@ it.tdlight tdlight-java - 3.169.111 + 3.169.112 it.cavallium diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 3273b18..cbe553f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -33,7 +33,6 @@ import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; @@ -186,23 +185,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy private Mono pipe() { var updates = this.requestUpdatesBatchFromNetwork() - .repeatWhen(nFlux -> { - return Flux.push(emitter -> { - var dispos = Flux.combineLatest(nFlux, tdClosed.distinct(), Pair::of).subscribe(val -> { - //noinspection PointlessBooleanExpression - if (val.getRight() == true) { - emitter.complete(); - } else { - if (val.getLeft() == 0) { - emitter.complete(); - } else { - emitter.next(val); - } - } - }); - emitter.onDispose(dispos); - }); - }) // Repeat when there is one batch with a flux of updates + .repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) // Repeat when there is one batch with a flux of updates + .takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed .flatMap(batch -> batch) .onErrorResume(error -> { logger.error("Bot updates request failed! Marking as closed.", error); @@ -217,6 +201,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { var state = (UpdateAuthorizationState) update; if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + + // Send tdClosed early to avoid errors + tdClosed.onNext(true); + this.getVertx().undeploy(this.deploymentID(), undeployed -> { if (undeployed.failed()) { logger.error("Error when undeploying td verticle", undeployed.cause()); @@ -231,7 +219,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } }); }) - .log("TdMiddle", Level.FINEST).publish().autoConnect(1); + .log("TdMiddle", Level.FINEST) + .takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed + .publish() + .autoConnect(1); updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)), incomingUpdatesCo::onError,