2020-10-14 01:38:44 +02:00
|
|
|
package it.tdlight.tdlibsession.td.middle.client;
|
|
|
|
|
|
|
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
|
|
|
import io.vertx.circuitbreaker.CircuitBreakerOptions;
|
|
|
|
import io.vertx.core.AbstractVerticle;
|
|
|
|
import io.vertx.core.AsyncResult;
|
|
|
|
import io.vertx.core.Promise;
|
2020-10-17 18:28:54 +02:00
|
|
|
import io.vertx.core.eventbus.DeliveryOptions;
|
2020-10-14 01:38:44 +02:00
|
|
|
import io.vertx.core.eventbus.Message;
|
|
|
|
import io.vertx.core.json.JsonObject;
|
|
|
|
import it.tdlight.common.ConstructorDetector;
|
|
|
|
import it.tdlight.jni.TdApi;
|
|
|
|
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
|
2020-10-28 12:04:42 +01:00
|
|
|
import it.tdlight.jni.TdApi.Error;
|
2020-10-14 01:38:44 +02:00
|
|
|
import it.tdlight.jni.TdApi.Function;
|
|
|
|
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
|
2021-01-13 17:22:14 +01:00
|
|
|
import it.tdlight.tdlibsession.EventBusFlux;
|
2020-10-14 01:38:44 +02:00
|
|
|
import it.tdlight.tdlibsession.td.ResponseError;
|
|
|
|
import it.tdlight.tdlibsession.td.TdResult;
|
|
|
|
import it.tdlight.tdlibsession.td.TdResultMessage;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.TdMessageCodec;
|
2021-01-13 17:22:14 +01:00
|
|
|
import it.tdlight.tdlibsession.td.middle.TdResultList;
|
|
|
|
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
|
2020-10-14 01:38:44 +02:00
|
|
|
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
|
|
|
|
import it.tdlight.utils.MonoUtils;
|
2021-01-15 23:31:10 +01:00
|
|
|
import java.net.ConnectException;
|
2020-10-14 01:38:44 +02:00
|
|
|
import java.time.Duration;
|
|
|
|
import java.util.Objects;
|
|
|
|
import java.util.StringJoiner;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.warp.commonutils.error.InitializationException;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
2021-01-15 22:01:56 +01:00
|
|
|
import reactor.core.publisher.SignalType;
|
2021-01-13 17:22:14 +01:00
|
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
import reactor.core.publisher.Sinks.Many;
|
2021-01-22 12:25:04 +01:00
|
|
|
import reactor.core.publisher.Sinks.One;
|
2021-01-19 03:18:00 +01:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
2020-10-14 01:38:44 +02:00
|
|
|
|
|
|
|
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
|
|
|
|
|
2020-10-28 12:04:42 +01:00
|
|
|
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class );
|
|
|
|
|
2020-10-14 01:38:44 +02:00
|
|
|
public static final boolean OUTPUT_REQUESTS = false;
|
|
|
|
public static final byte[] EMPTY = new byte[0];
|
|
|
|
|
2021-01-15 22:01:56 +01:00
|
|
|
private final Many<Boolean> tdCloseRequested = Sinks.many().replay().latestOrDefault(false);
|
2021-01-13 17:22:14 +01:00
|
|
|
private final Many<Boolean> tdClosed = Sinks.many().replay().latestOrDefault(false);
|
2021-01-22 12:25:04 +01:00
|
|
|
private final One<Error> tdCrash = Sinks.one();
|
2020-10-17 18:28:54 +02:00
|
|
|
private final DeliveryOptions deliveryOptions;
|
|
|
|
private final DeliveryOptions deliveryOptionsWithTimeout;
|
2020-10-14 01:38:44 +02:00
|
|
|
|
|
|
|
private TdClusterManager cluster;
|
|
|
|
|
|
|
|
private String botAddress;
|
|
|
|
private String botAlias;
|
|
|
|
private boolean local;
|
|
|
|
private long initTime;
|
|
|
|
|
|
|
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
|
|
|
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
|
|
|
|
cluster = clusterManager;
|
2021-01-13 17:22:14 +01:00
|
|
|
if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) {
|
2020-10-14 01:38:44 +02:00
|
|
|
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
|
|
|
|
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
|
|
|
|
for (Class<?> value : ConstructorDetector.getTDConstructorsUnsafe().values()) {
|
|
|
|
cluster.registerDefaultCodec(value, new TdMessageCodec(value));
|
|
|
|
}
|
|
|
|
}
|
2020-10-17 18:28:54 +02:00
|
|
|
this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local);
|
2020-11-15 22:56:54 +01:00
|
|
|
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public static Mono<AsyncTdMiddleEventBusClient> getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException {
|
|
|
|
try {
|
|
|
|
var instance = new AsyncTdMiddleEventBusClient(clusterManager);
|
2020-10-19 00:50:27 +02:00
|
|
|
var options = clusterManager.newDeploymentOpts().setConfig(new JsonObject()
|
2020-10-14 01:38:44 +02:00
|
|
|
.put("botAddress", botAddress)
|
|
|
|
.put("botAlias", botAlias)
|
|
|
|
.put("local", local));
|
|
|
|
return MonoUtils.<String>executeAsFuture(promise -> {
|
|
|
|
clusterManager.getVertx().deployVerticle(instance, options, promise);
|
|
|
|
}).doOnNext(_v -> {
|
|
|
|
logger.trace("Deployed verticle for bot address: " + botAddress);
|
|
|
|
}).thenReturn(instance);
|
|
|
|
} catch (RuntimeException e) {
|
|
|
|
throw new InitializationException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void start(Promise<Void> startPromise) {
|
|
|
|
var botAddress = config().getString("botAddress");
|
|
|
|
if (botAddress == null || botAddress.isEmpty()) {
|
|
|
|
throw new IllegalArgumentException("botAddress is not set!");
|
|
|
|
}
|
|
|
|
this.botAddress = botAddress;
|
|
|
|
var botAlias = config().getString("botAlias");
|
|
|
|
if (botAlias == null || botAlias.isEmpty()) {
|
|
|
|
throw new IllegalArgumentException("botAlias is not set!");
|
|
|
|
}
|
|
|
|
this.botAlias = botAlias;
|
|
|
|
var local = config().getBoolean("local");
|
|
|
|
if (local == null) {
|
|
|
|
throw new IllegalArgumentException("local is not set!");
|
|
|
|
}
|
|
|
|
this.local = local;
|
|
|
|
this.initTime = System.currentTimeMillis();
|
|
|
|
|
|
|
|
CircuitBreaker startBreaker = CircuitBreaker.create("bot-" + botAddress + "-server-online-check-circuit-breaker", vertx,
|
2020-11-15 22:56:54 +01:00
|
|
|
new CircuitBreakerOptions().setMaxFailures(1).setMaxRetries(4).setTimeout(30000)
|
2020-10-14 01:38:44 +02:00
|
|
|
)
|
|
|
|
.retryPolicy(policy -> 4000L)
|
|
|
|
.openHandler(closed -> {
|
|
|
|
logger.error("Circuit opened! " + botAddress);
|
|
|
|
})
|
|
|
|
.closeHandler(closed -> {
|
|
|
|
logger.error("Circuit closed! " + botAddress);
|
|
|
|
});
|
|
|
|
|
|
|
|
startBreaker.execute(future -> {
|
|
|
|
try {
|
2020-10-28 12:04:42 +01:00
|
|
|
logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing");
|
|
|
|
cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions);
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Waiting for " + botAddress + ".readyToStart");
|
2021-01-22 12:25:04 +01:00
|
|
|
var readyToStartConsumer = cluster.getEventBus().<byte[]>consumer(botAddress + ".readyToStart");
|
|
|
|
readyToStartConsumer.handler((Message<byte[]> pingMsg) -> {
|
|
|
|
logger.debug("Received ping reply (succeeded)");
|
|
|
|
readyToStartConsumer.unregister(unregistered -> {
|
|
|
|
// Reply instantly
|
|
|
|
pingMsg.reply(new byte[0]);
|
|
|
|
if (unregistered.succeeded()) {
|
|
|
|
logger.debug("Requesting " + botAddress + ".start");
|
|
|
|
cluster
|
|
|
|
.getEventBus()
|
|
|
|
.request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
|
|
|
|
if (startMsg.succeeded()) {
|
|
|
|
logger.debug("Requesting " + botAddress + ".isWorking");
|
|
|
|
cluster
|
|
|
|
.getEventBus()
|
|
|
|
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
|
|
|
|
if (msg.succeeded()) {
|
|
|
|
this.listen()
|
|
|
|
.timeout(Duration.ofSeconds(30))
|
|
|
|
.subscribeOn(Schedulers.single())
|
|
|
|
.subscribe(v -> {}, future::fail, future::complete);
|
|
|
|
} else {
|
|
|
|
future.fail(msg.cause());
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
future.fail(startMsg.cause());
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
logger.error("Failed to unregister readyToStartConsumer", unregistered.cause());
|
|
|
|
}
|
|
|
|
});
|
2020-10-28 12:04:42 +01:00
|
|
|
});
|
2020-10-14 01:38:44 +02:00
|
|
|
} catch (Exception ex) {
|
|
|
|
future.fail(ex);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.onFailure(ex -> {
|
|
|
|
logger.error("Failure when starting bot " + botAddress, ex);
|
|
|
|
startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!"));
|
|
|
|
})
|
|
|
|
.onSuccess(v -> startPromise.complete());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void stop(Promise<Void> stopPromise) {
|
2021-01-15 22:01:56 +01:00
|
|
|
logger.debug("Stopping AsyncTdMiddle client verticle...");
|
2021-01-22 12:25:04 +01:00
|
|
|
tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> {
|
|
|
|
if (!closeRequested) {
|
|
|
|
return tdCrash.asMono().switchIfEmpty(Mono
|
|
|
|
.fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping..."))
|
|
|
|
.then(this.execute(new TdApi.Close(), false)
|
|
|
|
).then()
|
|
|
|
.cast(TdApi.Error.class)
|
|
|
|
).doOnTerminate(() -> {
|
|
|
|
logger.debug("Close() sent to td");
|
|
|
|
markCloseRequested();
|
|
|
|
}).then();
|
2021-01-15 22:01:56 +01:00
|
|
|
} else {
|
2021-01-22 12:25:04 +01:00
|
|
|
return Mono.empty();
|
2021-01-15 22:01:56 +01:00
|
|
|
}
|
2021-01-22 12:25:04 +01:00
|
|
|
}).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> {
|
|
|
|
logger.debug("Failed to stop AsyncTdMiddle client verticle");
|
|
|
|
stopPromise.fail(cause);
|
|
|
|
}, () -> {
|
|
|
|
logger.debug("Stopped AsyncTdMiddle client verticle");
|
|
|
|
stopPromise.complete();
|
2020-10-28 12:04:42 +01:00
|
|
|
});
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private Mono<Void> listen() {
|
|
|
|
// Nothing to listen for now
|
|
|
|
return Mono.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
private static class UpdatesBatchResult {
|
2020-10-28 12:04:42 +01:00
|
|
|
public final Flux<TdApi.Object> updatesFlux;
|
2020-10-14 01:38:44 +02:00
|
|
|
public final boolean completed;
|
|
|
|
|
2020-10-28 12:04:42 +01:00
|
|
|
private UpdatesBatchResult(Flux<TdApi.Object> updatesFlux, boolean completed) {
|
2020-10-14 01:38:44 +02:00
|
|
|
this.updatesFlux = updatesFlux;
|
|
|
|
this.completed = completed;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String toString() {
|
|
|
|
return new StringJoiner(", ", UpdatesBatchResult.class.getSimpleName() + "[", "]")
|
|
|
|
.add("updatesFlux=" + updatesFlux)
|
|
|
|
.add("completed=" + completed)
|
|
|
|
.toString();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-13 04:00:43 +01:00
|
|
|
public Flux<TdApi.Object> receive() {
|
2021-01-13 17:22:14 +01:00
|
|
|
var fluxCodec = new TdResultListMessageCodec();
|
2021-01-15 22:01:56 +01:00
|
|
|
return tdCloseRequested.asFlux().take(1).single().filter(close -> !close).flatMapMany(_closed -> EventBusFlux
|
2021-01-13 17:22:14 +01:00
|
|
|
.<TdResultList>connect(cluster.getEventBus(),
|
|
|
|
botAddress + ".updates",
|
|
|
|
deliveryOptions,
|
|
|
|
fluxCodec,
|
|
|
|
Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout())
|
|
|
|
)
|
|
|
|
.filter(Objects::nonNull)
|
|
|
|
.flatMap(block -> Flux.fromIterable(block.getValues()))
|
|
|
|
.filter(Objects::nonNull)
|
|
|
|
.onErrorResume(error -> {
|
2021-01-22 12:25:04 +01:00
|
|
|
TdApi.Error theError;
|
2021-01-15 23:31:10 +01:00
|
|
|
if (error instanceof ConnectException) {
|
2021-01-22 12:25:04 +01:00
|
|
|
theError = new Error(444, "CONNECTION_KILLED");
|
2021-01-15 23:31:10 +01:00
|
|
|
} else if (error.getMessage().contains("Timed out")) {
|
2021-01-22 12:25:04 +01:00
|
|
|
theError = new Error(444, "CONNECTION_KILLED");
|
2021-01-13 17:22:14 +01:00
|
|
|
} else {
|
2021-01-22 12:25:04 +01:00
|
|
|
theError = new Error(406, "INVALID_UPDATE");
|
2021-01-15 23:31:10 +01:00
|
|
|
logger.error("Bot updates request failed! Marking as closed.", error);
|
2021-01-13 17:22:14 +01:00
|
|
|
}
|
2021-01-22 12:25:04 +01:00
|
|
|
tdCrash.tryEmitValue(theError);
|
|
|
|
return Flux.just(TdResult.failed(theError));
|
2021-01-13 17:22:14 +01:00
|
|
|
}).flatMap(item -> Mono.fromCallable(item::orElseThrow))
|
|
|
|
.filter(Objects::nonNull)
|
|
|
|
.doOnNext(item -> {
|
|
|
|
if (OUTPUT_REQUESTS) {
|
|
|
|
System.out.println(" <- " + item.toString()
|
|
|
|
.replace("\n", " ")
|
|
|
|
.replace("\t", "")
|
|
|
|
.replace(" ", "")
|
|
|
|
.replace(" = ", "=")
|
|
|
|
);
|
|
|
|
}
|
|
|
|
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
|
|
|
|
var state = (UpdateAuthorizationState) item;
|
|
|
|
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
|
|
|
|
// Send tdClosed early to avoid errors
|
2021-01-15 22:01:56 +01:00
|
|
|
logger.debug("Received AuthorizationStateClosed from td. Marking td as closed");
|
|
|
|
markCloseRequested();
|
|
|
|
markClosed();
|
2021-01-13 17:22:14 +01:00
|
|
|
}
|
|
|
|
}
|
2021-01-15 22:01:56 +01:00
|
|
|
})).doFinally(s -> {
|
|
|
|
if (s == SignalType.ON_ERROR) {
|
|
|
|
// Send tdClosed early to avoid errors
|
|
|
|
logger.debug("Updates flux terminated with an error signal. Marking td as closed");
|
|
|
|
markCloseRequested();
|
|
|
|
markClosed();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
private void markCloseRequested() {
|
|
|
|
if (tdCloseRequested.tryEmitNext(true).isFailure()) {
|
|
|
|
logger.error("Failed to set tdCloseRequested");
|
|
|
|
if (tdCloseRequested.tryEmitComplete().isFailure()) {
|
|
|
|
logger.error("Failed to complete tdCloseRequested");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void markClosed() {
|
2021-01-22 12:25:04 +01:00
|
|
|
if (tdCrash.tryEmitEmpty().isFailure()) {
|
|
|
|
logger.debug("TDLib already crashed");
|
|
|
|
}
|
2021-01-15 22:01:56 +01:00
|
|
|
if (tdClosed.tryEmitNext(true).isFailure()) {
|
|
|
|
logger.error("Failed to set tdClosed");
|
|
|
|
if (tdClosed.tryEmitComplete().isFailure()) {
|
|
|
|
logger.error("Failed to complete tdClosed");
|
|
|
|
}
|
|
|
|
}
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
|
|
|
|
|
|
|
|
var req = new ExecuteObject(executeDirectly, request);
|
|
|
|
if (OUTPUT_REQUESTS) {
|
|
|
|
System.out.println(" -> " + request.toString()
|
|
|
|
.replace("\n", " ")
|
|
|
|
.replace("\t", "")
|
|
|
|
.replace(" ", "")
|
|
|
|
.replace(" = ", "="));
|
|
|
|
}
|
|
|
|
|
2021-01-22 12:25:04 +01:00
|
|
|
var crashMono = tdCrash.asMono().<TdResult<T>>map(TdResult::failed);
|
|
|
|
var executeMono = tdCloseRequested.asFlux().take(1).single().filter(close -> !close).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
|
2020-10-14 20:04:23 +02:00
|
|
|
try {
|
|
|
|
cluster
|
|
|
|
.getEventBus()
|
|
|
|
.request(botAddress + ".execute",
|
|
|
|
req,
|
2020-10-17 18:28:54 +02:00
|
|
|
deliveryOptions,
|
2020-10-14 20:04:23 +02:00
|
|
|
(AsyncResult<Message<TdResultMessage>> event) -> {
|
|
|
|
try {
|
|
|
|
if (event.succeeded()) {
|
|
|
|
if (event.result().body() == null) {
|
|
|
|
sink.error(new NullPointerException("Response is empty"));
|
|
|
|
} else {
|
|
|
|
sink.success(Objects.requireNonNull(event.result().body()).toTdResult());
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
sink.error(ResponseError.newResponseError(request, botAlias, event.cause()));
|
|
|
|
}
|
|
|
|
} catch (Throwable t) {
|
|
|
|
sink.error(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
sink.error(t);
|
|
|
|
}
|
|
|
|
})).<TdResult<T>>switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.<T>failed(new TdApi.Error(500,
|
|
|
|
"Client is closed or response is empty"
|
|
|
|
))))).<TdResult<T>>handle((response, sink) -> {
|
2020-10-14 01:38:44 +02:00
|
|
|
try {
|
|
|
|
Objects.requireNonNull(response);
|
|
|
|
if (OUTPUT_REQUESTS) {
|
2020-10-14 20:04:23 +02:00
|
|
|
System.out.println(
|
|
|
|
" <- " + response.toString().replace("\n", " ").replace("\t", "").replace(" ", "").replace(" = ", "="));
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
2020-10-14 12:19:14 +02:00
|
|
|
sink.next(response);
|
2020-10-14 20:04:23 +02:00
|
|
|
} catch (Exception e) {
|
2020-10-14 12:19:14 +02:00
|
|
|
sink.error(e);
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
2020-10-14 15:14:54 +02:00
|
|
|
}).switchIfEmpty(Mono.fromSupplier(() -> {
|
|
|
|
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
|
2021-01-13 22:05:34 +01:00
|
|
|
}));
|
2021-01-22 12:25:04 +01:00
|
|
|
return Mono.firstWithValue(crashMono, executeMono);
|
2020-10-14 01:38:44 +02:00
|
|
|
}
|
|
|
|
}
|