package it.tdlight.tdlibsession.td.middle.server; import io.reactivex.Completable; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.reactivex.core.AbstractVerticle; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.eventbus.MessageProducer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.SetTdlibParameters; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.utils.BinlogAsyncFile; import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import java.time.Duration; import java.util.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Static values protected static final Logger logger = LoggerFactory.getLogger("TdMiddleServer"); public static final byte[] EMPTY = new byte[0]; public static final Duration WAIT_DURATION = Duration.ofSeconds(1); // Values configured from constructor private final AsyncTdDirectOptions tdOptions; // Variables configured by the user at startup private final One botId = Sinks.one(); private final One botAddress = Sinks.one(); private final One botAlias = Sinks.one(); private final One local = Sinks.one(); // Variables configured at startup private final One td = Sinks.one(); private final One> executeConsumer = Sinks.one(); private final One> readBinlogConsumer = Sinks.one(); public AsyncTdMiddleEventBusServer() { this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100); } @Override public Completable rxStart() { return MonoUtils.toCompletable(Mono .fromCallable(() -> { var botId = config().getInteger("botId"); if (botId == null || botId <= 0) { throw new IllegalArgumentException("botId is not set!"); } if (this.botId.tryEmitValue(botId).isFailure()) { throw new IllegalStateException("Failed to set botId"); } var botAddress = "bots.bot." + botId; if (this.botAddress.tryEmitValue(botAddress).isFailure()) { throw new IllegalStateException("Failed to set botAddress"); } var botAlias = config().getString("botAlias"); if (botAlias == null || botAlias.isEmpty()) { throw new IllegalArgumentException("botAlias is not set!"); } if (this.botAlias.tryEmitValue(botAlias).isFailure()) { throw new IllegalStateException("Failed to set botAlias"); } var local = config().getBoolean("local"); if (local == null) { throw new IllegalArgumentException("local is not set!"); } if (this.local.tryEmitValue(local).isFailure()) { throw new IllegalStateException("Failed to set local"); } var td = new AsyncTdDirectImpl(botAlias); if (this.td.tryEmitValue(td).isFailure()) { throw new IllegalStateException("Failed to set td instance"); } return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) .flatMap(Mono::hide)); } private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { return this .listen(td, botAddress, botAlias, botId, local) .then(this.pipe(td, botAddress, botAlias, botId, local)) .doOnSuccess(s -> { logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded"); }) .doOnError(ex -> { logger.error("Deploy and start of bot \"" + botAlias + "\": ❌ Failed", ex); }); } private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { return Mono.create(registrationSink -> { MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) { registrationSink.error(new IllegalStateException("Failed to set executeConsumer")); return; } Flux .>create(sink -> { executeConsumer.handler(sink::next); executeConsumer.endHandler(h -> sink.complete()); }) .flatMap(msg -> { var request = overrideRequest(msg.body().getRequest(), botId); return td .execute(request, msg.body().isExecuteDirectly()) .map(result -> Tuples.of(msg, result)); }) .handle((tuple, sink) -> { var msg = tuple.getT1(); var response = tuple.getT2(); var replyOpts = new DeliveryOptions().setLocalOnly(local); var replyValue = new TdResultMessage(response.result(), response.cause()); try { msg.reply(replyValue, replyOpts); sink.next(response); } catch (Exception ex) { msg.fail(500, ex.getLocalizedMessage()); sink.error(ex); } }) .then() .subscribeOn(Schedulers.single()) .subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex)); MessageConsumer readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog"); if (this.readBinlogConsumer.tryEmitValue(readBinlogConsumer).isFailure()) { registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer")); return; } Flux .>create(sink -> { readBinlogConsumer.handler(sink::next); readBinlogConsumer.endHandler(h -> sink.complete()); }) .flatMap(req -> BinlogUtils .retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId)) .flatMap(BinlogAsyncFile::readFullyBytes) .single() .map(binlog -> Tuples.of(req, binlog)) ) .doOnNext(tuple -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); }) .then() .subscribeOn(Schedulers.single()) .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); //noinspection ResultOfMethodCallIgnored executeConsumer .rxCompletionHandler() .andThen(readBinlogConsumer.rxCompletionHandler()) .subscribeOn(io.reactivex.schedulers.Schedulers.single()) .subscribe(registrationSink::success, registrationSink::error); }); } /** * Override some requests */ private Function overrideRequest(Function request, int botId) { switch (request.getConstructor()) { case SetTdlibParameters.CONSTRUCTOR: // Fix session directory locations var setTdlibParamsObj = (SetTdlibParameters) request; setTdlibParamsObj.parameters.databaseDirectory = TDLibRemoteClient.getSessionDirectory(botId).toString(); setTdlibParamsObj.parameters.filesDirectory = TDLibRemoteClient.getMediaDirectory(botId).toString(); return request; default: return request; } } @Override public Completable rxStop() { return MonoUtils.toCompletable(botAlias .asMono() .timeout(Duration.ofSeconds(1), Mono.just("???")) .flatMap(botAlias -> Mono .fromRunnable(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopping")) .then(executeConsumer .asMono() .timeout(Duration.ofSeconds(5), Mono.empty()) .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")))); } private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { Flux updatesFlux = td .receive(tdOptions) .flatMap(item -> Mono.defer(() -> { if (item.succeeded()) { var tdObject = item.result(); if (tdObject instanceof Update) { var tdUpdate = (Update) tdObject; if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; if (tdUpdateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { logger.debug("Undeploying after receiving AuthorizationStateClosed"); return rxStop().as(MonoUtils::toMono).thenReturn(item); } } } else if (tdObject instanceof Error) { // An error in updates means that a fatal error occurred logger.debug("Undeploying after receiving a fatal error"); return rxStop().as(MonoUtils::toMono).thenReturn(item); } return Mono.just(item); } else { return Mono.just(item); } })) .bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) .windowTimeout(1, Duration.ofSeconds(5)) .flatMap(w -> w.defaultIfEmpty(Collections.emptyList())) .map(TdResultList::new); var fluxCodec = new TdResultListMessageCodec(); var opts = new DeliveryOptions() .setLocalOnly(local) .setSendTimeout(Duration.ofSeconds(30).toMillis()) .setCodecName(fluxCodec.name()); MessageProducer updatesSender = vertx .eventBus() .sender(botAddress + ".updates", opts); //noinspection CallingSubscribeInNonBlockingScope updatesFlux .flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono)) .doOnTerminate(() -> updatesSender.close(h -> { if (h.failed()) { logger.error("Failed to close \"updates\" message sender"); } })) .onErrorResume(ex -> { logger.warn("Undeploying after a fatal error in a served flux", ex); return td.execute(new TdApi.Close(), false) .doOnError(ex2 -> logger.error("Unexpected error", ex2)) .then(); }) .subscribeOn(Schedulers.single()) .subscribe(); return Mono.empty(); } }