diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index ba572a4..d17fa9c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -92,10 +92,11 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { // Send close if the stream is disposed before tdlib is closed updatesSink.onDispose(() -> { - // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. - closedFromTd.tryEmitValue(false); - closedFromTd.asMono().filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> { + Mono.firstWithValue(closedFromTd.asMono(), Mono.empty()).switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> { + // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. + closedFromTd.tryEmitValue(false); + }))).filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> { logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()..."); client.send(new Close(), result -> logger.warn("Close result: {}", result), diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java index aff3f80..e1b3f8f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TelegramClientFactory.java @@ -19,7 +19,7 @@ public class TelegramClientFactory { case "native-client": return ClientManager.create(); case "test-client": - //todo: create a noop test client with optional behaviours + return new TestClient(implementationDetails.getJsonObject("test-client-settings")); default: return null; } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java new file mode 100644 index 0000000..770aa3c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -0,0 +1,106 @@ +package it.tdlight.tdlibsession.td.direct; + +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import it.tdlight.common.ExceptionHandler; +import it.tdlight.common.ResultHandler; +import it.tdlight.common.TelegramClient; +import it.tdlight.common.UpdatesHandler; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateWaitTdlibParameters; +import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.Ok; +import it.tdlight.jni.TdApi.SetLogTagVerbosityLevel; +import it.tdlight.jni.TdApi.SetLogVerbosityLevel; +import it.tdlight.jni.TdApi.SetOption; +import it.tdlight.jni.TdApi.SetTdlibParameters; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.TdError; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class TestClient implements TelegramClient { + + private static final Logger logger = LoggerFactory.getLogger(TestClient.class); + + private final Many updates = Sinks.many().unicast().onBackpressureError(); + private final Scheduler testClientScheduler = Schedulers.newSingle("test-client", true); + private final List features; + private UpdatesHandler updatesHandler; + private ExceptionHandler updateExceptionHandler; + private ExceptionHandler defaultExceptionHandler; + + public TestClient(JsonObject testClientSettings) { + JsonArray features = testClientSettings.getJsonArray("features", new JsonArray()); + this.features = new ArrayList<>(); + for (java.lang.Object feature : features) { + var featureName = (String) feature; + this.features.add(featureName); + } + } + + @Override + public void initialize(UpdatesHandler updatesHandler, + ExceptionHandler updateExceptionHandler, + ExceptionHandler defaultExceptionHandler) { + this.updatesHandler = updatesHandler; + this.updateExceptionHandler = updateExceptionHandler; + this.defaultExceptionHandler = defaultExceptionHandler; + + updates + .asFlux() + .buffer(50) + .doOnNext(ub -> logger.trace("Received update block of size {}", ub.size())) + .subscribeOn(testClientScheduler) + .subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException); + + for (String featureName : features) { + switch (featureName) { + case "infinite-status-update": + Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitTdlibParameters())) + .repeat() + .buffer(100) + .doOnNext(updatesHandler::onUpdates) + .subscribeOn(testClientScheduler) + .subscribe(); + break; + default: + throw new IllegalArgumentException("Unknown feature name: " + featureName); + } + } + } + + @Override + public void send(Function query, ResultHandler resultHandler, ExceptionHandler exceptionHandler) { + switch (query.getConstructor()) { + case SetLogVerbosityLevel.CONSTRUCTOR: + case SetLogTagVerbosityLevel.CONSTRUCTOR: + case SetTdlibParameters.CONSTRUCTOR: + case SetOption.CONSTRUCTOR: + resultHandler.onResult(new Ok()); + return; + } + exceptionHandler.onException(new TdError(500, "Unsupported")); + } + + @Override + public Object execute(Function query) { + switch (query.getConstructor()) { + case SetLogVerbosityLevel.CONSTRUCTOR: + case SetLogTagVerbosityLevel.CONSTRUCTOR: + case SetTdlibParameters.CONSTRUCTOR: + case SetOption.CONSTRUCTOR: + return new Ok(); + } + return new Error(500, "Unsupported"); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index d8ebea8..654cad3 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -46,11 +46,10 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -59,11 +58,14 @@ public class AsyncTdEasy { private final Logger logger; - private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false); - private final ReplayProcessor authState = ReplayProcessor.create(1); - private final ReplayProcessor requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); - private final ReplayProcessor settings = ReplayProcessor.cacheLast(); - private final EmitterProcessor globalErrors = EmitterProcessor.create(); + private static final Scheduler scheduler = Schedulers.newParallel("AsyncTdEasy", + Runtime.getRuntime().availableProcessors(), + false + ); + private final Many authState = Sinks.many().replay().latest(); + private final Many requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); + private final Many settings = Sinks.many().replay().latest(); + private final Many globalErrors = Sinks.many().multicast().onBackpressureBuffer(); private final One fatalError = Sinks.one(); private final AsyncTdMiddle td; private final String logName; @@ -78,7 +80,6 @@ public class AsyncTdEasy { this.incomingUpdates = td.receive() .flatMap(this::preprocessUpdates) .flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update))) - .filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR) .map(upd -> (TdApi.Update) upd.getUpdate()) .doOnError(ex -> { if (ex instanceof TdError) { @@ -96,33 +97,31 @@ public class AsyncTdEasy { } }) .doOnComplete(() -> { - authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> { + authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has closed while" + " the current authorization state is" + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); - this.authState.onNext(new AuthorizationStateClosed()); + this.authState.tryEmitNext(new AuthorizationStateClosed()); } }); }).doOnError(ex -> { - authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> { + authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { onUpdatesTerminated(); if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has terminated with an error while" + " the current authorization state is" + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); - this.authState.onNext(new AuthorizationStateClosed()); + this.authState.tryEmitNext(new AuthorizationStateClosed()); } }); - }) - .subscribeOn(scheduler) - .publish().refCount(1); + }); } private void onUpdatesTerminated() { logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); - requestedDefinitiveExit.onNext(true); + requestedDefinitiveExit.tryEmitNext(true); } public Mono create(TdEasySettings settings) { @@ -138,13 +137,13 @@ public class AsyncTdEasy { } // Register fatal error handler - fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribeOn(scheduler).subscribe(); + fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).publishOn(scheduler).subscribe(); return true; }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(_v -> { - this.settings.onNext(settings); + this.settings.tryEmitNext(settings); return Mono.empty(); }); } @@ -153,7 +152,7 @@ public class AsyncTdEasy { * Get TDLib state */ public Flux getState() { - return authState.distinct().subscribeOn(scheduler); + return authState.asFlux().distinct().publishOn(scheduler); } /** @@ -164,21 +163,21 @@ public class AsyncTdEasy { } private Flux getIncomingUpdates(boolean includePreAuthUpdates) { - return incomingUpdates.subscribeOn(scheduler); + return incomingUpdates.publishOn(scheduler); } /** * Get generic error updates from TDLib (When they are not linked to a precise request). */ public Flux getIncomingErrors() { - return Flux.from(globalErrors).subscribeOn(scheduler); + return Flux.from(globalErrors.asFlux()).publishOn(scheduler); } /** * Receives fatal errors from TDLib. */ public Mono getFatalErrors() { - return Mono.from(fatalError.asMono()).subscribeOn(scheduler); + return Mono.from(fatalError.asMono()).publishOn(scheduler); } /** @@ -190,7 +189,7 @@ public class AsyncTdEasy { } private Mono> sendDirectly(TdApi.Function obj, boolean synchronous) { - return td.execute(obj, synchronous).subscribeOn(scheduler); + return td.execute(obj, synchronous).publishOn(scheduler); } /** @@ -198,7 +197,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).subscribeOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).publishOn(scheduler); } /** @@ -206,7 +205,7 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).subscribeOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).publishOn(scheduler); } /** @@ -215,7 +214,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).subscribeOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).publishOn(scheduler); } /** @@ -224,7 +223,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).subscribeOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).publishOn(scheduler); } /** @@ -233,7 +232,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).subscribeOn(scheduler); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).publishOn(scheduler); } /** @@ -252,7 +251,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).subscribeOn(scheduler); + }).publishOn(scheduler); } /** @@ -271,7 +270,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).subscribeOn(scheduler); + }).publishOn(scheduler); } /** @@ -290,7 +289,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }).subscribeOn(scheduler); + }).publishOn(scheduler); } /** @@ -301,7 +300,7 @@ public class AsyncTdEasy { * @return The request response. */ public Mono> execute(TdApi.Function request) { - return td.execute(request, true).subscribeOn(scheduler); + return td.execute(request, true).publishOn(scheduler); } /** @@ -329,7 +328,7 @@ public class AsyncTdEasy { .filter(closeRequested -> !closeRequested) .doOnSuccess(s -> { logger.debug("Setting requestedDefinitiveExit: true"); - requestedDefinitiveExit.onNext(true); + requestedDefinitiveExit.tryEmitNext(true); }) .then(td.execute(new TdApi.Close(), false).doOnSubscribe(s -> { logger.debug("Sending TdApi.Close"); @@ -337,7 +336,7 @@ public class AsyncTdEasy { .doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"", closeResponse.toString().replace('\n', ' ') )) - .then(authState + .then(authState.asFlux() .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) .take(1) .singleOrEmpty()) @@ -348,7 +347,7 @@ public class AsyncTdEasy { logger.info("AsyncTdEasy closed successfully"); }) .then() - .subscribeOn(scheduler); + .publishOn(scheduler); } /** @@ -374,13 +373,13 @@ public class AsyncTdEasy { switch (error.message) { case "PHONE_CODE_INVALID": - globalErrors.onNext(error); + globalErrors.tryEmitNext(error); return new UpdateAuthorizationState(new AuthorizationStateWaitCode()); case "PASSWORD_HASH_INVALID": - globalErrors.onNext(error); + globalErrors.tryEmitNext(error); return new UpdateAuthorizationState(new AuthorizationStateWaitPassword()); default: - globalErrors.onNext(error); + globalErrors.tryEmitNext(error); break; } analyzeFatalErrors(error); @@ -388,7 +387,7 @@ public class AsyncTdEasy { } else { return (Update) obj; } - }).subscribeOn(scheduler); + }).publishOn(scheduler); } private void analyzeFatalErrors(Object obj) { @@ -415,7 +414,7 @@ public class AsyncTdEasy { } public Mono isBot() { - return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet).subscribeOn(scheduler); + return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet).publishOn(scheduler); } private Publisher preprocessUpdates(TdApi.Object updateObj) { @@ -427,7 +426,7 @@ public class AsyncTdEasy { .flatMap(obj -> { switch (obj.getConstructor()) { case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: - return thenOrFatalError(Mono.from(this.settings).map(settings -> { + return thenOrFatalError(Mono.from(this.settings.asFlux()).map(settings -> { var parameters = new TdlibParameters(); parameters.useTestDc = settings.useTestDc; parameters.databaseDirectory = settings.databaseDirectory; @@ -453,7 +452,7 @@ public class AsyncTdEasy { return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: - return thenOrFatalError(Mono.from(this.settings).flatMap(settings -> { + return thenOrFatalError(Mono.from(this.settings.asFlux()).flatMap(settings -> { if (settings.isPhoneNumberSet()) { return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), new PhoneNumberAuthenticationSettings(false, false, false) @@ -476,7 +475,7 @@ public class AsyncTdEasy { } return Mono - .from(settings) + .from(settings.asFlux()) .map(TdEasySettings::getParameterRequestHandler) .flatMap(handler -> { return MonoUtils.thenOrLogRepeatError(() -> handler @@ -502,7 +501,7 @@ public class AsyncTdEasy { case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR: var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj; return Mono - .from(settings) + .from(settings.asFlux()) .map(TdEasySettings::getParameterRequestHandler) .flatMap(handler -> { return handler.onParameterRequest(Parameter.NOTIFY_LINK, @@ -512,7 +511,7 @@ public class AsyncTdEasy { case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR: var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj; return Mono - .from(settings) + .from(settings.asFlux()) .map(TdEasySettings::getParameterRequestHandler) .flatMap(handler -> { return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_CODE, @@ -526,7 +525,7 @@ public class AsyncTdEasy { case AuthorizationStateWaitPassword.CONSTRUCTOR: var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; return Mono - .from(settings) + .from(settings.asFlux()) .map(TdEasySettings::getParameterRequestHandler) .flatMap(handler -> { return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD, @@ -534,7 +533,7 @@ public class AsyncTdEasy { ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false))); }); case AuthorizationStateReady.CONSTRUCTOR: { - this.authState.onNext(new AuthorizationStateReady()); + this.authState.tryEmitNext(new AuthorizationStateReady()); return Mono.empty(); } case AuthorizationStateClosing.CONSTRUCTOR: @@ -542,17 +541,17 @@ public class AsyncTdEasy { return Mono.empty(); case AuthorizationStateClosed.CONSTRUCTOR: logger.debug("Received AuthorizationStateClosed from td"); - return Mono.from(requestedDefinitiveExit).doOnNext(closeRequested -> { + return Mono.from(requestedDefinitiveExit.asFlux()).doOnNext(closeRequested -> { if (closeRequested) { logger.debug("td closed successfully"); } else { logger.warn("td closed unexpectedly: {}", logName); } - authState.onNext(obj); + authState.tryEmitNext(obj); }).flatMap(closeRequested -> { if (closeRequested) { return Mono - .from(settings) + .from(settings.asFlux()) .map(settings -> settings.databaseDirectory) .map(Path::of) .flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"), @@ -587,8 +586,7 @@ public class AsyncTdEasy { return Mono.empty(); } }) - .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)) - .subscribeOn(scheduler); + .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)); } public Mono thenOrFatalError(Mono> optionalMono) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 9f53043..86e8df8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -21,6 +21,7 @@ import it.tdlight.utils.BinlogAsyncFile; import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils.SinkRWStream; +import java.net.ConnectException; import java.nio.file.Path; import java.time.Duration; import org.slf4j.Logger; @@ -44,7 +45,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final One binlog = Sinks.one(); - SinkRWStream> updates = MonoUtils.unicastBackpressureStream(10000); + SinkRWStream> updates = MonoUtils.unicastBackpressureSinkStreak(); // This will only result in a successful completion, never completes in other ways private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways @@ -131,7 +132,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @SuppressWarnings("CallingSubscribeInNonBlockingScope") private Mono setupUpdatesListener() { - MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate()); + MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer( + botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate()); updateConsumer.endHandler(h -> { logger.error("<<<<<<<<<<<<<<<>>>>>>>>>>>>"); }); @@ -152,14 +154,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono)) + .then() .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) .thenMany(updates.readAsFlux()) .cast(io.vertx.core.eventbus.Message.class) .timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> { - throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)"); + var ex = new ConnectException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)"); + ex.setStackTrace(new StackTraceElement[0]); + throw ex; })) + .doOnSubscribe(s -> cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout)) .flatMap(updates -> { var result = (TdResultList) updates.body(); if (result.succeeded()) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index a3ad09a..7743b7d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -2,9 +2,9 @@ package it.tdlight.tdlibsession.td.middle.direct; import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer.WAIT_DURATION; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; +import io.reactivex.Completable; import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.AbstractVerticle; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; @@ -22,6 +22,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Empty; +import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMiddle { @@ -55,7 +56,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd } @Override - public void start(Promise startPromise) { + public Completable rxStart() { var botAddress = config().getString("botAddress"); if (botAddress == null || botAddress.isEmpty()) { throw new IllegalArgumentException("botAddress is not set!"); @@ -73,13 +74,13 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd this.td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); - startPromise.complete(); + return Completable.complete(); } @Override - public void stop(Promise stopPromise) { + public Completable rxStop() { closeRequest.tryEmitEmpty(); - stopPromise.complete(); + return Completable.complete(); } @Override @@ -87,8 +88,10 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd return td .receive(new AsyncTdDirectOptions(WAIT_DURATION, 100)) .takeUntilOther(closeRequest.asMono()) + .doOnNext(s -> logger.trace("Received update from tdlib: {}", s)) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) - .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")); + .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) + .publishOn(Schedulers.single()); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 419dbe4..522dacf 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -2,6 +2,8 @@ package it.tdlight.tdlibsession.td.middle.server; import io.reactivex.Completable; import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; import io.vertx.reactivex.core.AbstractVerticle; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; @@ -24,6 +26,7 @@ import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.MonoUtils; +import java.net.ConnectException; import java.time.Duration; import java.util.Collections; import org.slf4j.Logger; @@ -377,11 +380,26 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } })) .onErrorResume(ex -> { - logger.warn("Undeploying after a fatal error in a served flux", ex); + boolean printDefaultException = true; + if (ex instanceof ReplyException) { + ReplyException replyException = (ReplyException) ex; + if (replyException.failureCode() == -1 && replyException.failureType() == ReplyFailure.NO_HANDLERS) { + logger.warn("Undeploying, the flux has been terminated because no more handlers are available on the event bus. {}", replyException.getMessage()); + printDefaultException = false; + } + } else if (ex instanceof ConnectException || ex instanceof java.nio.channels.ClosedChannelException) { + logger.warn("Undeploying, the flux has been terminated because the consumer disconnected from the event bus. {}", ex.getMessage()); + printDefaultException = false; + } + if (printDefaultException) { + logger.warn("Undeploying after a fatal error in a served flux", ex); + } return td.execute(new TdApi.Close(), false) .doOnError(ex2 -> logger.error("Unexpected error", ex2)) - .then(); + .doOnSuccess(s -> logger.debug("Emergency Close() signal has been sent successfully")) + .then(rxStop().as(MonoUtils::toMono)); }); + return MonoUtils.emitValue(this.pipeFlux, pipeFlux) .doOnSuccess(s -> logger.trace("Prepared piping requests successfully")); }