From 714081a93cc0a10ffcf6aaae65a89e65193fb9ee Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 2 Oct 2021 16:48:56 +0200 Subject: [PATCH] Use timeouts --- pom.xml | 6 +- .../td/ReactorTelegramClient.java | 3 +- .../td/WrappedReactorTelegramClient.java | 10 +- .../tdlibsession/td/direct/AsyncTdDirect.java | 4 +- .../td/direct/AsyncTdDirectImpl.java | 4 +- .../tdlibsession/td/direct/TestClient.java | 3 +- .../tdlibsession/td/easy/AsyncTdEasy.java | 117 +++++++++++------- .../tdlibsession/td/middle/AsyncTdMiddle.java | 6 +- .../client/AsyncTdMiddleEventBusClient.java | 5 +- .../td/middle/direct/AsyncTdMiddleDirect.java | 7 +- .../td/middle/direct/AsyncTdMiddleLocal.java | 5 +- .../server/AsyncTdMiddleEventBusServer.java | 6 +- 12 files changed, 110 insertions(+), 66 deletions(-) diff --git a/pom.xml b/pom.xml index a4f4a11..51acf16 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,11 @@ fastutil 8.5.6 + + it.tdlight + tdlight-java + 2.7.8.11 + @@ -154,7 +159,6 @@ it.tdlight tdlight-java - 2.7.8.9 it.cavallium diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java index 5a4fd73..6726c7d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; +import java.time.Duration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,7 +12,7 @@ public interface ReactorTelegramClient { Flux receive(); - Mono send(TdApi.Function query); + Mono send(TdApi.Function query, Duration timeout); TdApi.Object execute(TdApi.Function query); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 5e86a2a..685c54a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -4,6 +4,7 @@ import it.tdlight.common.ReactiveItem; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.jni.TdApi; import it.tdlight.utils.MonoUtils; +import java.time.Duration; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -67,13 +68,14 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { /** * Sends a request to the TDLib. * - * @param query Object representing a query to the TDLib. - * @throws NullPointerException if query is null. + * @param query Object representing a query to the TDLib. + * @param timeout Response timeout. * @return a publisher that will emit exactly one item, or an error + * @throws NullPointerException if query is null. */ @Override - public Mono send(TdApi.Function query) { - return Mono.from(reactiveTelegramClient.send(query)).single(); + public Mono send(TdApi.Function query, Duration timeout) { + return Mono.from(reactiveTelegramClient.send(query, timeout)).single(); } /** diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java index 9c74f04..aaa095e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.direct; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.tdlibsession.td.TdResult; +import java.time.Duration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -22,9 +23,10 @@ public interface AsyncTdDirect { * Should be called after receive. * * @param request Request to TDLib. + * @param timeout Response timeout. * @param synchronous Execute synchronously. * @return The request response or {@link it.tdlight.jni.TdApi.Error}. */ - Mono> execute(Function request, boolean synchronous); + Mono> execute(Function request, Duration timeout, boolean synchronous); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index f725be5..37b9b94 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -39,7 +39,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } @Override - public Mono> execute(Function request, boolean synchronous) { + public Mono> execute(Function request, Duration timeout, boolean synchronous) { if (synchronous) { return Mono .firstWithSignal(td.asMono(), Mono.empty()) @@ -61,7 +61,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { if (td != null) { return Mono .fromRunnable(() -> logger.trace("Sending request to TDLib {}", request)) - .then(td.send(request)) + .then(td.send(request, timeout)) .single() .>map(TdResult::of) .doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request)); diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index 229a483..c65f143 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -25,6 +25,7 @@ import it.tdlight.jni.TdApi.UpdateConnectionState; import it.tdlight.jni.TdApi.UpdateNewMessage; import it.tdlight.jni.TdApi.User; import it.tdlight.tdlibsession.td.ReactorTelegramClient; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -112,7 +113,7 @@ public class TestClient implements ReactorTelegramClient { } @Override - public Mono send(Function query) { + public Mono send(Function query, Duration timeout) { return Mono.fromCallable(() -> { TdApi.Object result = executeCommon(query); if (result != null) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 35076c6..6f03799 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -39,6 +39,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Comparator; import java.util.Objects; import java.util.Set; @@ -57,6 +58,7 @@ import reactor.core.scheduler.Schedulers; public class AsyncTdEasy { private final Logger logger; + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); private final Many authState = Sinks.many().replay().latest(); private final Many requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); @@ -181,14 +183,17 @@ public class AsyncTdEasy { /** * Sends request to TDLib. + * @param timeout Timeout duration. * @return The response or {@link TdApi.Error}. */ - public Mono> send(TdApi.Function request) { - return td.execute(request, false); + public Mono> send(TdApi.Function request, Duration timeout) { + return td.execute(request, timeout, false); } - private Mono> sendDirectly(TdApi.Function obj, boolean synchronous) { - return td.execute(obj, synchronous); + private Mono> sendDirectly(TdApi.Function obj, + Duration timeout, + boolean synchronous) { + return td.execute(obj, timeout, synchronous); } /** @@ -196,7 +201,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); + return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), DEFAULT_TIMEOUT, true)); } /** @@ -204,7 +209,10 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), + DEFAULT_TIMEOUT, + false + )); } /** @@ -213,7 +221,10 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), + DEFAULT_TIMEOUT, + false + )); } /** @@ -222,7 +233,10 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), + DEFAULT_TIMEOUT, + false + )); } /** @@ -231,7 +245,10 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), + DEFAULT_TIMEOUT, + false + )); } /** @@ -240,7 +257,10 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionString(String name) { - return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this + .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .flatMap(MonoUtils::orElseThrow) + .flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueString.CONSTRUCTOR: return Mono.just(((OptionValueString) value).value); @@ -259,17 +279,20 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionInteger(String name) { - return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { - switch (value.getConstructor()) { - case OptionValueInteger.CONSTRUCTOR: - return Mono.just(((OptionValueInteger) value).value); - case OptionValueEmpty.CONSTRUCTOR: - return Mono.empty(); - default: - return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " - + value.getClass().getSimpleName())); - } - }); + return this + .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .flatMap(MonoUtils::orElseThrow) + .flatMap((TdApi.OptionValue value) -> { + switch (value.getConstructor()) { + case OptionValueInteger.CONSTRUCTOR: + return Mono.just(((OptionValueInteger) value).value); + case OptionValueEmpty.CONSTRUCTOR: + return Mono.empty(); + default: + return Mono.error(new UnsupportedOperationException( + "The option " + name + " is of type " + value.getClass().getSimpleName())); + } + }); } /** @@ -278,17 +301,20 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionBoolean(String name) { - return this.sendDirectly(new TdApi.GetOption(name), false).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { - switch (value.getConstructor()) { - case OptionValueBoolean.CONSTRUCTOR: - return Mono.just(((OptionValueBoolean) value).value); - case OptionValueEmpty.CONSTRUCTOR: - return Mono.empty(); - default: - return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " - + value.getClass().getSimpleName())); - } - }); + return this + .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .flatMap(MonoUtils::orElseThrow) + .flatMap((TdApi.OptionValue value) -> { + switch (value.getConstructor()) { + case OptionValueBoolean.CONSTRUCTOR: + return Mono.just(((OptionValueBoolean) value).value); + case OptionValueEmpty.CONSTRUCTOR: + return Mono.empty(); + default: + return Mono.error(new UnsupportedOperationException( + "The option " + name + " is of type " + value.getClass().getSimpleName())); + } + }); } /** @@ -296,10 +322,11 @@ public class AsyncTdEasy { * be called from any thread. * * @param request Request to the TDLib. + * @param timeout Timeout. * @return The request response. */ - public Mono> execute(TdApi.Function request) { - return td.execute(request, true); + public Mono> execute(TdApi.Function request, Duration timeout) { + return td.execute(request, timeout, true); } /** @@ -322,7 +349,7 @@ public class AsyncTdEasy { logger.debug("Setting requestedDefinitiveExit: true"); requestedDefinitiveExit.tryEmitNext(true); }) - .then(td.execute(new TdApi.Close(), false).doOnSubscribe(s -> { + .then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false).doOnSubscribe(s -> { logger.debug("Sending TdApi.Close"); })) .doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"", @@ -342,7 +369,7 @@ public class AsyncTdEasy { } private Mono catchErrors(Object obj) { - return Mono.fromCallable(() -> { + return Mono.fromCallable(() -> { if (obj.getConstructor() == Error.CONSTRUCTOR) { var error = (Error) obj; @@ -419,27 +446,27 @@ public class AsyncTdEasy { parameters.enableStorageOptimizer = settings.enableStorageOptimizer; parameters.ignoreFileNames = settings.ignoreFileNames; return new SetTdlibParameters(parameters); - }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false))); + }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, DEFAULT_TIMEOUT, false))); case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: - return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), false)) + return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), DEFAULT_TIMEOUT, false)) .onErrorResume((error) -> { logger.error("Error while checking TDLib encryption key", error); - return sendDirectly(new TdApi.Close(), false).then(); + return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then(); }); case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: return thenOrFatalError(Mono.from(this.settings.asFlux()).flatMap(settings -> { if (settings.isPhoneNumberSet()) { return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), new PhoneNumberAuthenticationSettings(false, false, false) - ), false); + ), DEFAULT_TIMEOUT, false); } else if (settings.isBotTokenSet()) { - return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false); + return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), DEFAULT_TIMEOUT, false); } else { return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); } })).onErrorResume((error) -> { logger.error("Error while waiting for phone number", error); - return sendDirectly(new TdApi.Close(), false).then(); + return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then(); }); case AuthorizationStateWaitRegistration.CONSTRUCTOR: var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; @@ -471,7 +498,7 @@ public class AsyncTdEasy { .defaultIfEmpty("") .doOnNext(lastName -> registerUser.lastName = lastName) ) - .then(sendDirectly(registerUser, false))); + .then(sendDirectly(registerUser, DEFAULT_TIMEOUT, false))); }); case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR: var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj; @@ -495,7 +522,7 @@ public class AsyncTdEasy { authorizationStateWaitCode.codeInfo.timeout, authorizationStateWaitCode.codeInfo.type ) - ).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false))); + ).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), DEFAULT_TIMEOUT, false))); }); case AuthorizationStateWaitPassword.CONSTRUCTOR: var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; @@ -505,7 +532,7 @@ public class AsyncTdEasy { .flatMap(handler -> { return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD, new ParameterInfoPasswordHint(authorizationStateWaitPassword.passwordHint) - ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false))); + ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), DEFAULT_TIMEOUT, false))); }); case AuthorizationStateReady.CONSTRUCTOR: { this.authState.tryEmitNext(new AuthorizationStateReady()); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java index 2c2c679..0b5231d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td.middle; import it.tdlight.jni.TdApi; import it.tdlight.tdlibsession.td.TdResult; +import java.time.Duration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -20,7 +21,8 @@ public interface AsyncTdMiddle { * Sends request to TDLib. May be called from any thread. * * @param request Request to TDLib. - * @param executeDirectly Execute the function synchronously. + * @param timeout Timeout. + * @param executeSync Execute the function synchronously. */ - Mono> execute(TdApi.Function request, boolean executeDirectly); + Mono> execute(TdApi.Function request, Duration timeout, boolean executeSync); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index b6d4798..7d6d77c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -343,8 +343,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } @Override - public Mono> execute(Function request, boolean executeDirectly) { - var req = new ExecuteObject(executeDirectly, request); + public Mono> execute(Function request, Duration timeout, boolean executeSync) { + var req = new ExecuteObject(executeSync, request); + var deliveryOptions = new DeliveryOptions(this.deliveryOptions).setSendTimeout(timeout.toMillis()); var crashMono = crash.asMono() .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request)) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 198c0ea..1ecb684 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -16,6 +16,7 @@ import it.tdlight.tdlibsession.td.direct.TelegramClientFactory; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; +import java.time.Duration; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; @@ -101,9 +102,11 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd } @Override - public Mono> execute(Function requestFunction, boolean executeDirectly) { + public Mono> execute(Function requestFunction, + Duration timeout, + boolean executeDirectly) { return td - .execute(requestFunction, executeDirectly) + .execute(requestFunction, timeout, executeDirectly) .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index 9521be7..b0ca0e8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -13,6 +13,7 @@ import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient; import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; import it.tdlight.utils.MonoUtils; import java.nio.file.Path; +import java.time.Duration; import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -79,7 +80,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { } @Override - public Mono> execute(Function request, boolean executeDirectly) { - return cli.asMono().single().flatMap(c -> c.execute(request, executeDirectly)); + public Mono> execute(Function request, Duration timeout, boolean executeDirectly) { + return cli.asMono().single().flatMap(c -> c.execute(request, timeout, executeDirectly)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index ea3f683..cae652e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -149,9 +149,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { logger.trace("Received execute request {}", request); } return td - .execute(request, body.isExecuteDirectly()) + .execute(request, Duration.ofSeconds(60 + 30), body.isExecuteDirectly()) .single() - .timeout(Duration.ofSeconds(60 + 30)) .doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request)) .onErrorResume(ex -> Mono.fromRunnable(() -> msg.fail(500, ex.getLocalizedMessage()))) .flatMap(response -> Mono.fromCallable(() -> { @@ -389,7 +388,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (printDefaultException) { logger.warn("Undeploying after a fatal error in a served flux", ex); } - return td.execute(new TdApi.Close(), false) + return td + .execute(new TdApi.Close(), Duration.ofDays(1), false) .doOnError(ex2 -> logger.error("Unexpected error", ex2)) .doOnSuccess(s -> logger.debug("Emergency Close() signal has been sent successfully")) .then(rxStop().as(MonoUtils::toMono));