Properly manage cluster disconnection

This commit is contained in:
Andrea Cavalli 2021-01-15 23:31:10 +01:00
parent 52cfeada84
commit 5ca09460dc
5 changed files with 136 additions and 44 deletions

View File

@ -7,6 +7,7 @@ import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException; import io.vertx.core.eventbus.ReplyException;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.net.ConnectException;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -18,6 +19,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One; import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
@ -72,6 +74,7 @@ public class EventBusFlux {
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady");
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose"); MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
MessageConsumer<byte[]> ping = eventBus.consumer(subscriptionAddress + ".ping");
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel"); MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
subscriptionReady.<Long>handler(subscriptionReadyMsg -> { subscriptionReady.<Long>handler(subscriptionReadyMsg -> {
@ -91,17 +94,18 @@ public class EventBusFlux {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler); eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler);
})).subscribe(response -> {}, error -> { })).subscribe(response -> {}, error -> {
if (error instanceof ReplyException) { if (error instanceof ReplyException) {
var errorMessage = error.getMessage(); var errorMessageCode = ((ReplyException) error).failureCode();
if (errorMessage != null && errorMessage.contains("NO_HANDLERS")) { // -1 == NO_HANDLERS
if (errorMessageCode == -1) {
logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost"); logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost");
} else { } else {
logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.getLocalizedMessage()); logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.toString());
} }
} else { } else {
logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error); logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error);
} }
fatalErrorSink.tryEmitValue(error); fatalErrorSink.tryEmitValue(error);
disposeFlux(atomicSubscription.get(), fatalErrorSink, cancel, dispose, fluxAddress, () -> { disposeFlux(atomicSubscription.get(), fatalErrorSink, ping, cancel, dispose, fluxAddress, () -> {
logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error");
}); });
}, () -> { }, () -> {
@ -115,29 +119,49 @@ public class EventBusFlux {
}); });
atomicSubscription.set(subscription); atomicSubscription.set(subscription);
cancel.handler(msg3 -> { ping.handler(msg2 -> {
logger.trace("Client is still alive");
msg2.reply(EMPTY, deliveryOptions);
});
cancel.handler(msg2 -> {
logger.trace("Cancelling flux \"" + fluxAddress + "\""); logger.trace("Cancelling flux \"" + fluxAddress + "\"");
subscription.dispose(); subscription.dispose();
logger.debug("Cancelled flux \"" + fluxAddress + "\""); logger.debug("Cancelled flux \"" + fluxAddress + "\"");
msg3.reply(EMPTY, deliveryOptions); msg2.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
disposeFlux(subscription, fatalErrorSink, cancel, dispose, fluxAddress, () -> msg2.reply(EMPTY));
}); });
cancel.completionHandler(h -> { dispose.handler(msg2 -> {
if (h.succeeded()) { disposeFlux(subscription,
dispose.completionHandler(h2 -> { fatalErrorSink,
if (h2.succeeded()) { ping,
subscriptionReadyMsg.reply((Long) subscriptionId); cancel,
dispose,
fluxAddress,
() -> msg2.reply(EMPTY)
);
});
ping.completionHandler(h0 -> {
if (h0.succeeded()) {
cancel.completionHandler(h1 -> {
if (h1.succeeded()) {
dispose.completionHandler(h2 -> {
if (h2.succeeded()) {
subscriptionReadyMsg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register dispose", h1.cause());
subscriptionReadyMsg.fail(500, "Failed to register dispose");
}
});
} else { } else {
logger.error("Failed to register dispose", h.cause()); logger.error("Failed to register cancel", h1.cause());
subscriptionReadyMsg.fail(500, "Failed to register dispose"); subscriptionReadyMsg.fail(500, "Failed to register cancel");
} }
}); });
} else { } else {
logger.error("Failed to register cancel", h.cause()); logger.error("Failed to register ping", h0.cause());
subscriptionReadyMsg.fail(500, "Failed to register cancel"); subscriptionReadyMsg.fail(500, "Failed to register ping");
} }
}); });
} else { } else {
@ -174,6 +198,7 @@ public class EventBusFlux {
private static void disposeFlux(@Nullable Disposable subscription, private static void disposeFlux(@Nullable Disposable subscription,
One<Throwable> fatalErrorSink, One<Throwable> fatalErrorSink,
MessageConsumer<byte[]> ping,
MessageConsumer<byte[]> cancel, MessageConsumer<byte[]> cancel,
MessageConsumer<byte[]> dispose, MessageConsumer<byte[]> dispose,
String fluxAddress, String fluxAddress,
@ -183,16 +208,21 @@ public class EventBusFlux {
if (subscription != null) { if (subscription != null) {
subscription.dispose(); subscription.dispose();
} }
cancel.unregister(v -> { ping.unregister(v0 -> {
if (v.failed()) { if (v0.failed()) {
logger.error("Failed to unregister cancel", v.cause()); logger.error("Failed to unregister ping", v0.cause());
} }
dispose.unregister(v2 -> { cancel.unregister(v1 -> {
if (v.failed()) { if (v1.failed()) {
logger.error("Failed to unregister dispose", v2.cause()); logger.error("Failed to unregister cancel", v1.cause());
} }
logger.debug("Disposed flux \"" + fluxAddress + "\""); dispose.unregister(v2 -> {
after.run(); if (v2.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
}
logger.debug("Disposed flux \"" + fluxAddress + "\"");
after.run();
});
}); });
}); });
} }
@ -239,17 +269,49 @@ public class EventBusFlux {
} }
}); });
emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMap(n -> Mono.create(pingSink ->
if (msg.failed()) { eventBus.<byte[]>request(subscriptionAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> {
logger.error("Failed to tell that the subscription is disposed"); if (pingMsg.succeeded()) {
} pingSink.success(pingMsg.result().body());
})); } else {
var pingError = pingMsg.cause();
if (pingError instanceof ReplyException) {
var pingReplyException = (ReplyException) pingError;
// -1 = NO_HANDLERS
if (pingReplyException.failureCode() == -1) {
pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost"));
} else {
pingSink.error(new ConnectException("Ping failed:" + pingReplyException.toString()));
}
} else {
pingSink.error(new IllegalStateException("Ping failed", pingError));
}
}
})))
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, emitter::error);
emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { emitter.onDispose(() -> {
if (msg.failed()) { if (!pingSubscription.isDisposed()) {
logger.error("Failed to tell that the subscription is cancelled"); pingSubscription.dispose();
} }
})); eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is disposed");
}
});
});
emitter.onCancel(() -> {
if (!pingSubscription.isDisposed()) {
pingSubscription.dispose();
}
eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is cancelled");
}
});
});
} else { } else {
emitter.error(new IllegalStateException("Subscription failed", msg.cause())); emitter.error(new IllegalStateException("Subscription failed", msg.cause()));
} }

View File

@ -31,6 +31,7 @@ import it.tdlight.jni.TdApi.TdlibParameters;
import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.FatalErrorType; import it.tdlight.tdlibsession.FatalErrorType;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
@ -79,9 +80,22 @@ public class AsyncTdEasy {
.filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR) .filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
.map(upd -> (TdApi.Update) upd.getUpdate()) .map(upd -> (TdApi.Update) upd.getUpdate())
.doOnError(ex -> { .doOnError(ex -> {
logger.error(ex.getLocalizedMessage(), ex); if (ex instanceof TdError) {
var tdEx = (TdError) ex;
logger.error("Received an error update from telegram: " + tdEx.getTdCode() + " " + tdEx.getTdMessage());
FatalErrorType fatalErrorType;
try {
fatalErrorType = FatalErrorType.valueOf(tdEx.getTdMessage());
} catch (IllegalArgumentException ignored) {
fatalErrorType = FatalErrorType.INVALID_UPDATE;
}
this.fatalError.tryEmitValue(fatalErrorType);
} else {
logger.error(ex.getLocalizedMessage(), ex);
}
}).doOnComplete(() -> { }).doOnComplete(() -> {
authState.asFlux().take(1).single().subscribe(authState -> { authState.asFlux().take(1).single().subscribe(authState -> {
onUpdatesTerminated();
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
logger.warn("Updates stream has closed while" logger.warn("Updates stream has closed while"
+ " the current authorization state is" + " the current authorization state is"
@ -89,15 +103,26 @@ public class AsyncTdEasy {
this.authState.onNext(new AuthorizationStateClosed()); this.authState.onNext(new AuthorizationStateClosed());
} }
}); });
}) }).doOnError(ex -> {
.doOnTerminate(() -> { authState.asFlux().take(1).single().subscribe(authState -> {
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); onUpdatesTerminated();
requestedDefinitiveExit.onNext(true); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
logger.warn("Updates stream has terminated with an error while"
+ " the current authorization state is"
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
this.authState.onNext(new AuthorizationStateClosed());
}
});
}) })
.subscribeOn(scheduler) .subscribeOn(scheduler)
.publish().refCount(1); .publish().refCount(1);
} }
private void onUpdatesTerminated() {
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true");
requestedDefinitiveExit.onNext(true);
}
public Mono<Void> create(TdEasySettings settings) { public Mono<Void> create(TdEasySettings settings) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {

View File

@ -1,8 +1,9 @@
package it.tdlight.tdlibsession.td.easy; package it.tdlight.tdlibsession.td.easy;
import it.tdlight.tdlibsession.FatalErrorType; import it.tdlight.tdlibsession.FatalErrorType;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface FatalErrorHandler { public interface FatalErrorHandler {
Mono<Void> onFatalError(FatalErrorType error); @NotNull Mono<Void> onFatalError(FatalErrorType error);
} }

View File

@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td.easy;
import java.util.Objects; import java.util.Objects;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public class TdEasySettings { public class TdEasySettings {
public final boolean useTestDc; public final boolean useTestDc;
@ -69,7 +70,7 @@ public class TdEasySettings {
} }
this.parameterRequestHandler = parameterRequestHandler; this.parameterRequestHandler = parameterRequestHandler;
if (fatalErrorHandler == null) { if (fatalErrorHandler == null) {
fatalErrorHandler = error -> null; fatalErrorHandler = error -> Mono.empty();
} }
this.fatalErrorHandler = fatalErrorHandler; this.fatalErrorHandler = fatalErrorHandler;
} }

View File

@ -28,6 +28,7 @@ import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.net.ConnectException;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -238,10 +239,12 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.flatMap(block -> Flux.fromIterable(block.getValues())) .flatMap(block -> Flux.fromIterable(block.getValues()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.onErrorResume(error -> { .onErrorResume(error -> {
logger.error("Bot updates request failed! Marking as closed.", error); if (error instanceof ConnectException) {
if (error.getMessage().contains("Timed out")) { return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED")));
} else if (error.getMessage().contains("Timed out")) {
return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED")));
} else { } else {
logger.error("Bot updates request failed! Marking as closed.", error);
return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE"))); return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE")));
} }
}).flatMap(item -> Mono.fromCallable(item::orElseThrow)) }).flatMap(item -> Mono.fromCallable(item::orElseThrow))