package it.tdlight.tdlibsession.td.middle.client; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.StartSessionMessage; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.utils.BinlogAsyncFile; import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.nio.file.Path; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import org.warp.commonutils.locks.LockUtils; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private Logger logger; public static final byte[] EMPTY = new byte[0]; private final TdClusterManager cluster; private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; private final DeliveryOptions pingDeliveryOptions; private final One binlog = Sinks.one(); private final One> updates = Sinks.one(); // This will only result in a successful completion, never completes in other ways private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways private final Empty crash = Sinks.one(); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.one(); // This will only result in a successful completion, never completes in other ways. // It will be called when UpdateAuthorizationStateClosing is intercepted. // If it's completed stop checking if the ping works or not private final Empty authStateClosing = Sinks.one(); private long botId; private String botAddress; private String botAlias; private boolean local; public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { this.logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class); this.cluster = clusterManager; this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local); this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000); this.pingDeliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(60000); } private Mono initializeEb() { return Mono.just(this); } @Override public Mono initialize() { // Do nothing here. return Mono.empty(); } public static Mono getAndDeployInstance(TdClusterManager clusterManager, long botId, String botAlias, boolean local, JsonObject implementationDetails, Path binlogsArchiveDirectory) { return new AsyncTdMiddleEventBusClient(clusterManager) .initializeEb() .flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId) .flatMap(binlog -> binlog .getLastModifiedTime() .filter(modTime -> modTime == 0) .doOnNext(v -> LoggerFactory .getLogger(AsyncTdMiddleEventBusClient.class) .error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one...")) .thenReturn(binlog)).flatMap(binlog -> instance .start(botId, botAlias, local, implementationDetails, binlog) .thenReturn(instance) ) .single() ) .single(); } /** * * @return optional result */ public static Mono retrieveBinlog(Vertx vertx, Path binlogsArchiveDirectory, long botId) { return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog")); } private Mono saveBinlog(Buffer data) { return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); } public Mono start(long botId, String botAlias, boolean local, JsonObject implementationDetails, BinlogAsyncFile binlog) { this.botId = botId; this.botAlias = botAlias; this.botAddress = "bots.bot." + this.botId; this.local = local; this.logger = LoggerFactory.getLogger(this.botId + " " + botAlias); return MonoUtils .fromBlockingEmpty(() -> { EmitResult result; while ((result = this.binlog.tryEmitValue(binlog)) == EmitResult.FAIL_NON_SERIALIZED) { // 10ms LockSupport.parkNanos(10000000); } result.orThrow(); }) .then(binlog.getLastModifiedTime()) .zipWith(binlog.readFully().map(Buffer::getDelegate)) .single() .flatMap(tuple -> { var binlogLastModifiedTime = tuple.getT1(); var binlogData = tuple.getT2(); var msg = new StartSessionMessage(this.botId, this.botAlias, Buffer.newInstance(binlogData), binlogLastModifiedTime, implementationDetails ); return setupUpdatesListener() .then(Mono.defer(() -> { if (local) { return Mono.empty(); } logger.trace("Requesting bots.start-bot"); return cluster.getEventBus() .rxRequest("bots.start-bot", msg).as(MonoUtils::toMono) .doOnSuccess(s -> logger.trace("bots.start-bot returned successfully")) .subscribeOn(Schedulers.boundedElastic()); })) .then(setupPing()); }); } private Mono setupPing() { return Mono.fromCallable(() -> { logger.trace("Setting up ping"); // Disable ping on local servers if (!local) { Mono .defer(() -> { logger.trace("Requesting ping..."); return cluster.getEventBus() .rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions) .as(MonoUtils::toMono); }) .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) .takeUntilOther(Mono.firstWithSignal( this.updatesStreamEnd .asMono() .doOnTerminate(() -> logger.trace("About to kill pinger because updates stream ended")), this.crash .asMono() .onErrorResume(ex -> Mono.empty()) .doOnTerminate(() -> logger.trace("About to kill pinger because it has seen a crash signal")) )) .doOnNext(s -> logger.trace("PING")) .then() .onErrorResume(ex -> { logger.warn("Ping failed: {}", ex.getMessage()); return Mono.empty(); }) .doOnNext(s -> logger.debug("END PING")) .then(MonoUtils.fromBlockingEmpty(() -> { while (this.pingFail.tryEmitEmpty() == EmitResult.FAIL_NON_SERIALIZED) { // 10ms LockSupport.parkNanos(10000000); } })) .subscribeOn(Schedulers.parallel()) .subscribe(); } logger.trace("Ping setup success"); return null; }).subscribeOn(Schedulers.boundedElastic()); } private Mono setupUpdatesListener() { return Mono .fromRunnable(() -> logger.trace("Setting up updates listener...")) .then(MonoUtils.>fromBlockingSingle(() -> MessageConsumer .newInstance(cluster.getEventBus().consumer(botAddress + ".updates") .setMaxBufferedMessages(5000) .getDelegate() )) ) .flatMap(updateConsumer -> { // Return when the registration of all the consumers has been done across the cluster return Mono .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) .then(MonoUtils.fromBlockingEmpty(() -> { EmitResult result; while ((result = this.updates.tryEmitValue(updateConsumer)) == EmitResult.FAIL_NON_SERIALIZED) { // 10ms LockSupport.parkNanos(10000000); } result.orThrow(); })) .doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) .doOnSuccess(s -> logger.trace("Registered update consumer across the cluster")); }) .doOnSuccess(s ->logger.trace("Set up updates listener")) .then(); } @SuppressWarnings("Convert2MethodRef") @Override public Flux receive() { // Here the updates will be received return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) .then(updates.asMono()) .publishOn(Schedulers.parallel()) .timeout(Duration.ofSeconds(30)) .doOnSuccess(s -> logger.trace("Registering updates flux")) .flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono .empty() .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout ).as(MonoUtils::toMono)) .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) .then(), updatesMessageConsumer) ) .takeUntilOther(Flux .merge( crash.asMono() .onErrorResume(ex -> { logger.error("TDLib crashed", ex); return Mono.empty(); }), pingFail.asMono() .then(Mono.fromCallable(() -> { var ex = new ConnectException("Server did not respond to ping"); ex.setStackTrace(new StackTraceElement[0]); throw ex; })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { EmitResult result; while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { // 10ms LockSupport.parkNanos(10000000); } return result; })) .takeUntilOther(Mono .firstWithSignal(crash.asMono(), authStateClosing.asMono()) .onErrorResume(e -> Mono.empty()) ) ) .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) ) .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { return ((UpdateAuthorizationState) item).authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR; } return false; })) .flatMapSequential(updates -> { if (updates.succeeded()) { return Flux.fromIterable(updates.value()); } else { return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); } }) .concatMap(update -> interceptUpdate(update)) // Redirect errors to crash sink .doOnError(error -> crash.tryEmitError(error)) .onErrorResume(ex -> { logger.trace("Absorbing the error, the error has been published using the crash sink", ex); return Mono.empty(); }) .doOnTerminate(updatesStreamEnd::tryEmitEmpty); } private Mono interceptUpdate(Object update) { logger.trace("Received update {}", update.getClass().getSimpleName()); if (update.getConstructor() == TdApi.UpdateAuthorizationState.CONSTRUCTOR) { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateClosing.CONSTRUCTOR: authStateClosing.tryEmitEmpty(); break; case TdApi.AuthorizationStateClosed.CONSTRUCTOR: logger.info("Received AuthorizationStateClosed from tdlib"); return cluster.getEventBus() .rxRequest(this.botAddress + ".read-binlog", EMPTY) .as(MonoUtils::toMono) .mapNotNull(Message::body) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}", BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) .thenReturn(update); } } return Mono.just(update); } @Override public Mono> execute(Function request, Duration timeout, boolean executeSync) { var req = new ExecuteObject(executeSync, request); var deliveryOptions = new DeliveryOptions(this.deliveryOptions).setSendTimeout(timeout.toMillis()); var crashMono = crash.asMono() .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request)) .then(Mono.>empty()); var executionMono = cluster.getEventBus() .rxRequest(botAddress + ".execute", req, deliveryOptions) .as(MonoUtils::toMono) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .>handle((resp, sink) -> { if (resp.body() == null) { var tdError = new TdError(500, "Response is empty"); sink.error(ResponseError.newResponseError(request, botAlias, tdError)); } else { sink.next(resp.body().toTdResult()); } }) .doFirst(() -> logger.trace("Executing request {}", request)) .doOnSuccess(s -> logger.trace("Executed request {}", request)) .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)); return Mono .firstWithSignal(crashMono, executionMono) .switchIfEmpty(Mono.error(() -> ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")))); } }