Update pom.xml and AsyncTdMiddleEventBusClient.java

This commit is contained in:
Andrea Cavalli 2020-11-12 13:08:39 +01:00
parent b935a345a3
commit 0ab45ce211
2 changed files with 11 additions and 20 deletions

View File

@ -102,7 +102,7 @@
<dependency> <dependency>
<groupId>it.tdlight</groupId> <groupId>it.tdlight</groupId>
<artifactId>tdlight-java</artifactId> <artifactId>tdlight-java</artifactId>
<version>3.169.111</version> <version>3.169.112</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>

View File

@ -33,7 +33,6 @@ import java.util.Objects;
import java.util.StringJoiner; import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException; import org.warp.commonutils.error.InitializationException;
@ -186,23 +185,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
private Mono<Void> pipe() { private Mono<Void> pipe() {
var updates = this.requestUpdatesBatchFromNetwork() var updates = this.requestUpdatesBatchFromNetwork()
.repeatWhen(nFlux -> { .repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) // Repeat when there is one batch with a flux of updates
return Flux.push(emitter -> { .takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed
var dispos = Flux.combineLatest(nFlux, tdClosed.distinct(), Pair::of).subscribe(val -> {
//noinspection PointlessBooleanExpression
if (val.getRight() == true) {
emitter.complete();
} else {
if (val.getLeft() == 0) {
emitter.complete();
} else {
emitter.next(val);
}
}
});
emitter.onDispose(dispos);
});
}) // Repeat when there is one batch with a flux of updates
.flatMap(batch -> batch) .flatMap(batch -> batch)
.onErrorResume(error -> { .onErrorResume(error -> {
logger.error("Bot updates request failed! Marking as closed.", error); logger.error("Bot updates request failed! Marking as closed.", error);
@ -217,6 +201,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var state = (UpdateAuthorizationState) update; var state = (UpdateAuthorizationState) update;
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
// Send tdClosed early to avoid errors
tdClosed.onNext(true);
this.getVertx().undeploy(this.deploymentID(), undeployed -> { this.getVertx().undeploy(this.deploymentID(), undeployed -> {
if (undeployed.failed()) { if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause()); logger.error("Error when undeploying td verticle", undeployed.cause());
@ -231,7 +219,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
} }
}); });
}) })
.log("TdMiddle", Level.FINEST).publish().autoConnect(1); .log("TdMiddle", Level.FINEST)
.takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed
.publish()
.autoConnect(1);
updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)), updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)),
incomingUpdatesCo::onError, incomingUpdatesCo::onError,