From 5ca09460dc3b71a6f734d268e516e3710b63b9eb Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 15 Jan 2021 23:31:10 +0100 Subject: [PATCH] Properly manage cluster disconnection --- .../it/tdlight/tdlibsession/EventBusFlux.java | 132 +++++++++++++----- .../tdlibsession/td/easy/AsyncTdEasy.java | 35 ++++- .../td/easy/FatalErrorHandler.java | 3 +- .../tdlibsession/td/easy/TdEasySettings.java | 3 +- .../client/AsyncTdMiddleEventBusClient.java | 7 +- 5 files changed, 136 insertions(+), 44 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index d5153e3..bfac79c 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -7,6 +7,7 @@ import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.ReplyException; import it.tdlight.utils.MonoUtils; +import java.net.ConnectException; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -18,6 +19,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -72,6 +74,7 @@ public class EventBusFlux { MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); + MessageConsumer ping = eventBus.consumer(subscriptionAddress + ".ping"); MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); subscriptionReady.handler(subscriptionReadyMsg -> { @@ -91,17 +94,18 @@ public class EventBusFlux { eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler); })).subscribe(response -> {}, error -> { if (error instanceof ReplyException) { - var errorMessage = error.getMessage(); - if (errorMessage != null && errorMessage.contains("NO_HANDLERS")) { + var errorMessageCode = ((ReplyException) error).failureCode(); + // -1 == NO_HANDLERS + if (errorMessageCode == -1) { logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost"); } else { - logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.getLocalizedMessage()); + logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.toString()); } } else { logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error); } fatalErrorSink.tryEmitValue(error); - disposeFlux(atomicSubscription.get(), fatalErrorSink, cancel, dispose, fluxAddress, () -> { + disposeFlux(atomicSubscription.get(), fatalErrorSink, ping, cancel, dispose, fluxAddress, () -> { logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); }); }, () -> { @@ -115,29 +119,49 @@ public class EventBusFlux { }); atomicSubscription.set(subscription); - cancel.handler(msg3 -> { + ping.handler(msg2 -> { + logger.trace("Client is still alive"); + msg2.reply(EMPTY, deliveryOptions); + }); + + cancel.handler(msg2 -> { logger.trace("Cancelling flux \"" + fluxAddress + "\""); subscription.dispose(); logger.debug("Cancelled flux \"" + fluxAddress + "\""); - msg3.reply(EMPTY, deliveryOptions); - }); - dispose.handler(msg2 -> { - disposeFlux(subscription, fatalErrorSink, cancel, dispose, fluxAddress, () -> msg2.reply(EMPTY)); + msg2.reply(EMPTY, deliveryOptions); }); - cancel.completionHandler(h -> { - if (h.succeeded()) { - dispose.completionHandler(h2 -> { - if (h2.succeeded()) { - subscriptionReadyMsg.reply((Long) subscriptionId); + dispose.handler(msg2 -> { + disposeFlux(subscription, + fatalErrorSink, + ping, + cancel, + dispose, + fluxAddress, + () -> msg2.reply(EMPTY) + ); + }); + + ping.completionHandler(h0 -> { + if (h0.succeeded()) { + cancel.completionHandler(h1 -> { + if (h1.succeeded()) { + dispose.completionHandler(h2 -> { + if (h2.succeeded()) { + subscriptionReadyMsg.reply((Long) subscriptionId); + } else { + logger.error("Failed to register dispose", h1.cause()); + subscriptionReadyMsg.fail(500, "Failed to register dispose"); + } + }); } else { - logger.error("Failed to register dispose", h.cause()); - subscriptionReadyMsg.fail(500, "Failed to register dispose"); + logger.error("Failed to register cancel", h1.cause()); + subscriptionReadyMsg.fail(500, "Failed to register cancel"); } }); } else { - logger.error("Failed to register cancel", h.cause()); - subscriptionReadyMsg.fail(500, "Failed to register cancel"); + logger.error("Failed to register ping", h0.cause()); + subscriptionReadyMsg.fail(500, "Failed to register ping"); } }); } else { @@ -174,6 +198,7 @@ public class EventBusFlux { private static void disposeFlux(@Nullable Disposable subscription, One fatalErrorSink, + MessageConsumer ping, MessageConsumer cancel, MessageConsumer dispose, String fluxAddress, @@ -183,16 +208,21 @@ public class EventBusFlux { if (subscription != null) { subscription.dispose(); } - cancel.unregister(v -> { - if (v.failed()) { - logger.error("Failed to unregister cancel", v.cause()); + ping.unregister(v0 -> { + if (v0.failed()) { + logger.error("Failed to unregister ping", v0.cause()); } - dispose.unregister(v2 -> { - if (v.failed()) { - logger.error("Failed to unregister dispose", v2.cause()); + cancel.unregister(v1 -> { + if (v1.failed()) { + logger.error("Failed to unregister cancel", v1.cause()); } - logger.debug("Disposed flux \"" + fluxAddress + "\""); - after.run(); + dispose.unregister(v2 -> { + if (v2.failed()) { + logger.error("Failed to unregister dispose", v2.cause()); + } + logger.debug("Disposed flux \"" + fluxAddress + "\""); + after.run(); + }); }); }); } @@ -239,17 +269,49 @@ public class EventBusFlux { } }); - emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { - if (msg.failed()) { - logger.error("Failed to tell that the subscription is disposed"); - } - })); + var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMap(n -> Mono.create(pingSink -> + eventBus.request(subscriptionAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> { + if (pingMsg.succeeded()) { + pingSink.success(pingMsg.result().body()); + } else { + var pingError = pingMsg.cause(); + if (pingError instanceof ReplyException) { + var pingReplyException = (ReplyException) pingError; + // -1 = NO_HANDLERS + if (pingReplyException.failureCode() == -1) { + pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost")); + } else { + pingSink.error(new ConnectException("Ping failed:" + pingReplyException.toString())); + } + } else { + pingSink.error(new IllegalStateException("Ping failed", pingError)); + } + } + }))) + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, emitter::error); - emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { - if (msg.failed()) { - logger.error("Failed to tell that the subscription is cancelled"); + emitter.onDispose(() -> { + if (!pingSubscription.isDisposed()) { + pingSubscription.dispose(); } - })); + eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { + if (msg.failed()) { + logger.error("Failed to tell that the subscription is disposed"); + } + }); + }); + + emitter.onCancel(() -> { + if (!pingSubscription.isDisposed()) { + pingSubscription.dispose(); + } + eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { + if (msg.failed()) { + logger.error("Failed to tell that the subscription is cancelled"); + } + }); + }); } else { emitter.error(new IllegalStateException("Subscription failed", msg.cause())); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 801c7e0..fe9fca4 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -31,6 +31,7 @@ import it.tdlight.jni.TdApi.TdlibParameters; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.FatalErrorType; +import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.utils.MonoUtils; @@ -79,9 +80,22 @@ public class AsyncTdEasy { .filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR) .map(upd -> (TdApi.Update) upd.getUpdate()) .doOnError(ex -> { - logger.error(ex.getLocalizedMessage(), ex); + if (ex instanceof TdError) { + var tdEx = (TdError) ex; + logger.error("Received an error update from telegram: " + tdEx.getTdCode() + " " + tdEx.getTdMessage()); + FatalErrorType fatalErrorType; + try { + fatalErrorType = FatalErrorType.valueOf(tdEx.getTdMessage()); + } catch (IllegalArgumentException ignored) { + fatalErrorType = FatalErrorType.INVALID_UPDATE; + } + this.fatalError.tryEmitValue(fatalErrorType); + } else { + logger.error(ex.getLocalizedMessage(), ex); + } }).doOnComplete(() -> { authState.asFlux().take(1).single().subscribe(authState -> { + onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has closed while" + " the current authorization state is" @@ -89,15 +103,26 @@ public class AsyncTdEasy { this.authState.onNext(new AuthorizationStateClosed()); } }); - }) - .doOnTerminate(() -> { - logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); - requestedDefinitiveExit.onNext(true); + }).doOnError(ex -> { + authState.asFlux().take(1).single().subscribe(authState -> { + onUpdatesTerminated(); + if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { + logger.warn("Updates stream has terminated with an error while" + + " the current authorization state is" + + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); + this.authState.onNext(new AuthorizationStateClosed()); + } + }); }) .subscribeOn(scheduler) .publish().refCount(1); } + private void onUpdatesTerminated() { + logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); + requestedDefinitiveExit.onNext(true); + } + public Mono create(TdEasySettings settings) { return Mono .fromCallable(() -> { diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/FatalErrorHandler.java b/src/main/java/it/tdlight/tdlibsession/td/easy/FatalErrorHandler.java index ec751b9..3ba8a3b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/FatalErrorHandler.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/FatalErrorHandler.java @@ -1,8 +1,9 @@ package it.tdlight.tdlibsession.td.easy; import it.tdlight.tdlibsession.FatalErrorType; +import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; public interface FatalErrorHandler { - Mono onFatalError(FatalErrorType error); + @NotNull Mono onFatalError(FatalErrorType error); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java b/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java index 72d8847..5725b7e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td.easy; import java.util.Objects; import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; public class TdEasySettings { public final boolean useTestDc; @@ -69,7 +70,7 @@ public class TdEasySettings { } this.parameterRequestHandler = parameterRequestHandler; if (fatalErrorHandler == null) { - fatalErrorHandler = error -> null; + fatalErrorHandler = error -> Mono.empty(); } this.fatalErrorHandler = fatalErrorHandler; } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 89aefbb..29b0065 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -28,6 +28,7 @@ import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; +import java.net.ConnectException; import java.time.Duration; import java.util.Objects; import java.util.StringJoiner; @@ -238,10 +239,12 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .flatMap(block -> Flux.fromIterable(block.getValues())) .filter(Objects::nonNull) .onErrorResume(error -> { - logger.error("Bot updates request failed! Marking as closed.", error); - if (error.getMessage().contains("Timed out")) { + if (error instanceof ConnectException) { + return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); + } else if (error.getMessage().contains("Timed out")) { return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); } else { + logger.error("Bot updates request failed! Marking as closed.", error); return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE"))); } }).flatMap(item -> Mono.fromCallable(item::orElseThrow))