From cc56dd45985d057fcefded2e3c61acd4518fa394 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 22 Jan 2021 12:25:04 +0100 Subject: [PATCH] Improve reliability --- .../it/tdlight/tdlibsession/EventBusFlux.java | 49 +++++-- .../tdlibsession/td/easy/AsyncTdEasy.java | 3 +- .../client/AsyncTdMiddleEventBusClient.java | 125 +++++++++--------- 3 files changed, 101 insertions(+), 76 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index bfac79c..91785c7 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -40,6 +40,12 @@ public class EventBusFlux { } /** + * If the flux is fast and you are on a network, please do this: + * + *
flux
+  .bufferTimeout(Duration.ofMillis(100))
+  .windowTimeout(1, Duration.ofSeconds(5))
+  .flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
* * @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription */ @@ -72,7 +78,7 @@ public class EventBusFlux { long subscriptionId = 0; var subscriptionAddress = fluxAddress + "." + subscriptionId; - MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); + MessageConsumer subscriptionReady = eventBus.consumer(subscriptionAddress + ".subscriptionReady"); MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); MessageConsumer ping = eventBus.consumer(subscriptionAddress + ".ping"); MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); @@ -105,9 +111,17 @@ public class EventBusFlux { logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error); } fatalErrorSink.tryEmitValue(error); - disposeFlux(atomicSubscription.get(), fatalErrorSink, ping, cancel, dispose, fluxAddress, () -> { - logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); - }); + disposeFlux(atomicSubscription.get(), + fatalErrorSink, + subscriptionReady, + ping, + cancel, + dispose, + fluxAddress, + () -> { + logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); + } + ); }, () -> { eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { logger.info("Completed flux \"" + fluxAddress + "\""); @@ -134,6 +148,7 @@ public class EventBusFlux { dispose.handler(msg2 -> { disposeFlux(subscription, fatalErrorSink, + subscriptionReady, ping, cancel, dispose, @@ -191,13 +206,14 @@ public class EventBusFlux { sink.success(); } }); - }); + }).subscribeOn(Schedulers.single()).share(); return Tuples.of(servedMono, fatalErrorSink.asMono()); } private static void disposeFlux(@Nullable Disposable subscription, One fatalErrorSink, + MessageConsumer subscriptionReady, MessageConsumer ping, MessageConsumer cancel, MessageConsumer dispose, @@ -208,20 +224,25 @@ public class EventBusFlux { if (subscription != null) { subscription.dispose(); } - ping.unregister(v0 -> { + subscriptionReady.unregister(v0 -> { if (v0.failed()) { - logger.error("Failed to unregister ping", v0.cause()); + logger.error("Failed to unregister subscriptionReady", v0.cause()); } - cancel.unregister(v1 -> { + ping.unregister(v1 -> { if (v1.failed()) { - logger.error("Failed to unregister cancel", v1.cause()); + logger.error("Failed to unregister ping", v1.cause()); } - dispose.unregister(v2 -> { + cancel.unregister(v2 -> { if (v2.failed()) { - logger.error("Failed to unregister dispose", v2.cause()); + logger.error("Failed to unregister cancel", v2.cause()); } - logger.debug("Disposed flux \"" + fluxAddress + "\""); - after.run(); + dispose.unregister(v3 -> { + if (v3.failed()) { + logger.error("Failed to unregister dispose", v3.cause()); + } + logger.debug("Disposed flux \"" + fluxAddress + "\""); + after.run(); + }); }); }); }); @@ -259,7 +280,7 @@ public class EventBusFlux { }); signalConsumer.completionHandler(h -> { if (h.succeeded()) { - eventBus.request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { + eventBus.request(subscriptionAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { if (msg2.failed()) { logger.error("Failed to tell that the subscription is ready"); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 5ad713c..9fb0113 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -59,7 +59,7 @@ public class AsyncTdEasy { private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); - private final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false); + private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false); private final ReplayProcessor authState = ReplayProcessor.create(1); private final ReplayProcessor requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); private final ReplayProcessor settings = ReplayProcessor.cacheLast(); @@ -141,7 +141,6 @@ public class AsyncTdEasy { return true; }) .subscribeOn(Schedulers.boundedElastic()) - .publishOn(scheduler) .flatMap(_v -> { this.settings.onNext(settings); return Mono.empty(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 641c0fc..835c55e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -7,7 +7,6 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; -import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.JsonObject; import it.tdlight.common.ConstructorDetector; import it.tdlight.jni.TdApi; @@ -32,7 +31,6 @@ import java.net.ConnectException; import java.time.Duration; import java.util.Objects; import java.util.StringJoiner; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; @@ -41,6 +39,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; +import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { @@ -52,6 +51,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy private final Many tdCloseRequested = Sinks.many().replay().latestOrDefault(false); private final Many tdClosed = Sinks.many().replay().latestOrDefault(false); + private final One tdCrash = Sinks.one(); private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; @@ -61,7 +61,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy private String botAlias; private boolean local; private long initTime; - private MessageConsumer readyToStartConsumer; @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { @@ -131,38 +130,39 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy logger.debug("Waiting for " + botAddress + ".readyToStart"); - AtomicBoolean alreadyReceived = new AtomicBoolean(false); - this.readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart", (Message pingMsg) -> { - // Reply instantly - pingMsg.reply(new byte[0]); - - if (!alreadyReceived.getAndSet(true)) { - logger.debug("Received ping reply (succeeded)"); - logger.debug("Requesting " + botAddress + ".start"); - cluster - .getEventBus() - .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { - if (startMsg.succeeded()) { - logger.debug("Requesting " + botAddress + ".isWorking"); - cluster - .getEventBus() - .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { - if (msg.succeeded()) { - this.listen() - .timeout(Duration.ofSeconds(30)) - .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, future::fail, future::complete); - } else { - future.fail(msg.cause()); - } - }); - } else { - future.fail(startMsg.cause()); - } - }); - } else { - // Already received - } + var readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart"); + readyToStartConsumer.handler((Message pingMsg) -> { + logger.debug("Received ping reply (succeeded)"); + readyToStartConsumer.unregister(unregistered -> { + // Reply instantly + pingMsg.reply(new byte[0]); + if (unregistered.succeeded()) { + logger.debug("Requesting " + botAddress + ".start"); + cluster + .getEventBus() + .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { + if (startMsg.succeeded()) { + logger.debug("Requesting " + botAddress + ".isWorking"); + cluster + .getEventBus() + .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { + if (msg.succeeded()) { + this.listen() + .timeout(Duration.ofSeconds(30)) + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, future::fail, future::complete); + } else { + future.fail(msg.cause()); + } + }); + } else { + future.fail(startMsg.cause()); + } + }); + } else { + logger.error("Failed to unregister readyToStartConsumer", unregistered.cause()); + } + }); }); } catch (Exception ex) { future.fail(ex); @@ -178,29 +178,26 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @Override public void stop(Promise stopPromise) { logger.debug("Stopping AsyncTdMiddle client verticle..."); - readyToStartConsumer.unregister(result -> { - if (result.failed()) { - logger.error("Failed to unregister readyToStart consumer"); + tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> { + if (!closeRequested) { + return tdCrash.asMono().switchIfEmpty(Mono + .fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping...")) + .then(this.execute(new TdApi.Close(), false) + ).then() + .cast(TdApi.Error.class) + ).doOnTerminate(() -> { + logger.debug("Close() sent to td"); + markCloseRequested(); + }).then(); } else { - logger.debug("Unregistered readyToStart consumer"); + return Mono.empty(); } - tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> { - if (!closeRequested) { - logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping..."); - return this.execute(new TdApi.Close(), false).doOnTerminate(() -> { - logger.debug("Close() sent to td"); - markCloseRequested(); - }).then(); - } else { - return Mono.empty(); - } - }).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> { - logger.debug("Failed to stop AsyncTdMiddle client verticle"); - stopPromise.fail(cause); - }, () -> { - logger.debug("Stopped AsyncTdMiddle client verticle"); - stopPromise.complete(); - }); + }).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> { + logger.debug("Failed to stop AsyncTdMiddle client verticle"); + stopPromise.fail(cause); + }, () -> { + logger.debug("Stopped AsyncTdMiddle client verticle"); + stopPromise.complete(); }); } @@ -241,14 +238,17 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .flatMap(block -> Flux.fromIterable(block.getValues())) .filter(Objects::nonNull) .onErrorResume(error -> { + TdApi.Error theError; if (error instanceof ConnectException) { - return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); + theError = new Error(444, "CONNECTION_KILLED"); } else if (error.getMessage().contains("Timed out")) { - return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); + theError = new Error(444, "CONNECTION_KILLED"); } else { + theError = new Error(406, "INVALID_UPDATE"); logger.error("Bot updates request failed! Marking as closed.", error); - return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE"))); } + tdCrash.tryEmitValue(theError); + return Flux.just(TdResult.failed(theError)); }).flatMap(item -> Mono.fromCallable(item::orElseThrow)) .filter(Objects::nonNull) .doOnNext(item -> { @@ -289,6 +289,9 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } private void markClosed() { + if (tdCrash.tryEmitEmpty().isFailure()) { + logger.debug("TDLib already crashed"); + } if (tdClosed.tryEmitNext(true).isFailure()) { logger.error("Failed to set tdClosed"); if (tdClosed.tryEmitComplete().isFailure()) { @@ -309,7 +312,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .replace(" = ", "=")); } - return tdCloseRequested.asFlux().take(1).single().filter(close -> !close).>flatMap((_x) -> Mono.create(sink -> { + var crashMono = tdCrash.asMono().>map(TdResult::failed); + var executeMono = tdCloseRequested.asFlux().take(1).single().filter(close -> !close).>flatMap((_x) -> Mono.create(sink -> { try { cluster .getEventBus() @@ -351,5 +355,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy }).switchIfEmpty(Mono.fromSupplier(() -> { return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); })); + return Mono.firstWithValue(crashMono, executeMono); } }