Optimize AsyncTdMiddleDirect

This commit is contained in:
Andrea Cavalli 2020-10-19 16:23:40 +02:00
parent 4fe39ae7d1
commit d57c5ef8ae

View File

@ -14,9 +14,11 @@ import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.utils.MonoUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@ -29,6 +31,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
protected AsyncTdDirectImpl td;
private String botAddress;
private String botAlias;
private Flux<Update> updatesFluxCo;
public AsyncTdMiddleDirect() {
}
@ -66,7 +69,24 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
this.td = new AsyncTdDirectImpl(botAlias);
td.initializeClient().doOnSuccess(v -> startPromise.complete()).subscribe(success -> {
td.initializeClient().doOnSuccess(v -> {
updatesFluxCo = Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> {
if (result.succeeded()) {
if (result.result().succeeded()) {
return Mono.just(result.result().result());
} else {
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, result.result().cause())
);
return Mono.<Update>empty();
}
} else {
logger.error("Received an errored update", result.cause());
return Mono.<Update>empty();
}
})).publish().refCount(1);
startPromise.complete();
}).subscribe(success -> {
}, (ex) -> {
logger.error("Failure when starting bot " + botAlias + ", address " + botAddress, ex);
startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!"));
@ -86,21 +106,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
@Override
public Flux<Update> getUpdates() {
return Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> {
if (result.succeeded()) {
if (result.result().succeeded()) {
return Mono.just(result.result().result());
} else {
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, result.result().cause())
);
return Mono.empty();
}
} else {
logger.error("Received an errored update", result.cause());
return Mono.empty();
}
}));
return Flux.from(updatesFluxCo);
}
@Override