From c30ac9bec65ff7da3cd601a056e855beb7704339 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 2 Oct 2021 22:51:47 +0200 Subject: [PATCH] Use AtomicReference when needed --- .../td/direct/AsyncTdDirectImpl.java | 71 +++--- .../tdlibsession/td/easy/AsyncTdEasy.java | 229 ++++++++---------- .../td/middle/direct/AsyncTdMiddleLocal.java | 26 +- 3 files changed, 157 insertions(+), 169 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 37b9b94..67cc8b2 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -12,6 +12,8 @@ import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.utils.MonoUtils; import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; @@ -28,7 +30,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private final JsonObject implementationDetails; private final String botAlias; - private final One td = Sinks.one(); + private final AtomicReference td = new AtomicReference<>(null); public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory, JsonObject implementationDetails, @@ -41,43 +43,38 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Mono> execute(Function request, Duration timeout, boolean synchronous) { if (synchronous) { - return Mono - .firstWithSignal(td.asMono(), Mono.empty()) - .single() - .timeout(Duration.ofSeconds(5)) - .flatMap(td -> MonoUtils.fromBlockingSingle(() -> { + return MonoUtils.fromBlockingSingle(() -> { + var td = this.td.get(); logger.trace("Sending execute to TDLib {}", request); + Objects.requireNonNull(td, "td is null"); TdResult result = TdResult.of(td.execute(request)); logger.trace("Received execute response from TDLib. Request was {}", request); return result; - })) - .single(); - } else { - return Mono - .firstWithSignal(td.asMono(), Mono.empty()) - .single() - .timeout(Duration.ofSeconds(5)) - .>flatMap(td -> { - if (td != null) { - return Mono - .fromRunnable(() -> logger.trace("Sending request to TDLib {}", request)) - .then(td.send(request, timeout)) - .single() - .>map(TdResult::of) - .doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request)); - } else { - return Mono.fromCallable(() -> { - if (request.getConstructor() == Close.CONSTRUCTOR) { - logger.trace("Sending close success to request {}", request); - return TdResult.of(new Ok()); - } else { - logger.trace("Sending close error to request {} ", request); - throw new IllegalStateException("TDLib client is destroyed"); - } - }); - } }) .single(); + } else { + return Mono.defer(() -> { + var td = this.td.get(); + + if (td != null) { + return Mono + .fromRunnable(() -> logger.trace("Sending request to TDLib {}", request)) + .then(td.send(request, timeout)) + .single() + .>map(TdResult::of) + .doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request)); + } else { + return Mono.fromCallable(() -> { + if (request.getConstructor() == Close.CONSTRUCTOR) { + logger.trace("Sending close success to request {}", request); + return TdResult.of(new Ok()); + } else { + logger.trace("Sending close error to request {} ", request); + throw new IllegalStateException("TDLib client is destroyed"); + } + }); + } + }); } } @@ -87,12 +84,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { .fromRunnable(() -> logger.trace("Initializing")) .then(telegramClientFactory.create(implementationDetails)) .flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient)) - .flatMap(client -> { - if (td.tryEmitValue(client).isFailure()) { - return Mono.error(new TdError(500, "Failed to emit td client")); - } - return Mono.just(client); - }) + .doOnNext(td::set) .doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1))) .doOnSuccess(s -> logger.trace("Initialized")) .then(); @@ -103,9 +95,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { // If closed it will be either true or false final One closedFromTd = Sinks.one(); return Mono - .firstWithSignal(td.asMono(), Mono.empty()) + .fromCallable(td::get) .single() - .timeout(Duration.ofSeconds(5)) .flatMapMany(ReactorTelegramClient::receive) .doOnNext(update -> { // Close the emitter if receive closed state diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index cbf2a3f..2424bf0 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -17,8 +17,8 @@ import it.tdlight.jni.TdApi.CheckAuthenticationCode; import it.tdlight.jni.TdApi.CheckAuthenticationPassword; import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; -import it.tdlight.jni.TdApi.OptionValue; import it.tdlight.jni.TdApi.OptionValueBoolean; import it.tdlight.jni.TdApi.OptionValueEmpty; import it.tdlight.jni.TdApi.OptionValueInteger; @@ -43,6 +43,8 @@ import java.time.Duration; import java.util.Comparator; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Publisher; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; @@ -55,14 +57,15 @@ import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +@SuppressWarnings("unused") public class AsyncTdEasy { private final Logger logger; private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); private final Many authState = Sinks.many().replay().latest(); - private final Many requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false); - private final Many settings = Sinks.many().replay().latest(); + private final AtomicBoolean requestedDefinitiveExit = new AtomicBoolean(); + private final AtomicReference settings = new AtomicReference<>(null); private final Many globalErrors = Sinks.many().multicast().onBackpressureBuffer(); private final One fatalError = Sinks.one(); private final AsyncTdMiddle td; @@ -94,34 +97,35 @@ public class AsyncTdEasy { logger.error(ex.getLocalizedMessage(), ex); } }) - .doOnComplete(() -> { - authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { - onUpdatesTerminated(); - if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { - logger.warn("Updates stream has closed while" - + " the current authorization state is" - + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); - this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); - this.authState.tryEmitNext(new AuthorizationStateClosed()); - } - }); - }).doOnError(ex -> { - authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> { - onUpdatesTerminated(); - if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { - logger.warn("Updates stream has terminated with an error while" - + " the current authorization state is" - + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); - this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); - this.authState.tryEmitNext(new AuthorizationStateClosed()); - } - }); - }); + .doOnComplete(() -> authState.asFlux().take(1, true).single().subscribeOn(scheduler).subscribe(authState -> { + onUpdatesTerminated(); + if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { + logger.warn("Updates stream has closed while" + + " the current authorization state is" + + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); + this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); + this.authState.tryEmitNext(new AuthorizationStateClosed()); + } + })).doOnError(ex -> authState.asFlux() + .take(1, true) + .single() + .subscribeOn(scheduler) + .subscribe(authState -> { + onUpdatesTerminated(); + if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { + logger.warn("Updates stream has terminated with an error while" + + " the current authorization state is" + + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); + this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); + this.authState.tryEmitNext(new AuthorizationStateClosed()); + } + }) + ); } private void onUpdatesTerminated() { logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); - requestedDefinitiveExit.tryEmitNext(true); + requestedDefinitiveExit.set(true); } public Mono create(TdEasySettings settings) { @@ -143,7 +147,7 @@ public class AsyncTdEasy { }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(_v -> { - this.settings.tryEmitNext(settings); + this.settings.set(settings); return Mono.empty(); }) .then(td.initialize()); @@ -160,10 +164,6 @@ public class AsyncTdEasy { * Get incoming updates from TDLib. */ public Flux getIncomingUpdates() { - return getIncomingUpdates(false); - } - - private Flux getIncomingUpdates(boolean includePreAuthUpdates) { return incomingUpdates; } @@ -190,10 +190,8 @@ public class AsyncTdEasy { return td.execute(request, timeout, false); } - private Mono> sendDirectly(TdApi.Function obj, - Duration timeout, - boolean synchronous) { - return td.execute(obj, timeout, synchronous); + private Mono> sendDirectly(Function obj, boolean synchronous) { + return td.execute(obj, AsyncTdEasy.DEFAULT_TIMEOUT, synchronous); } /** @@ -201,7 +199,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), DEFAULT_TIMEOUT, true)); + return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); } /** @@ -209,9 +207,7 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), - DEFAULT_TIMEOUT, - false + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false )); } @@ -221,9 +217,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), - DEFAULT_TIMEOUT, - false + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false )); } @@ -233,9 +227,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), - DEFAULT_TIMEOUT, - false + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false )); } @@ -245,9 +237,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), - DEFAULT_TIMEOUT, - false + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false )); } @@ -258,7 +248,7 @@ public class AsyncTdEasy { */ public Mono getOptionString(String name) { return this - .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .sendDirectly(new TdApi.GetOption(name), false) .handle(MonoUtils::orElseThrow) .flatMap(value -> { switch (value.getConstructor()) { @@ -280,7 +270,7 @@ public class AsyncTdEasy { */ public Mono getOptionInteger(String name) { return this - .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .sendDirectly(new TdApi.GetOption(name), false) .handle(MonoUtils::orElseThrow) .flatMap(value -> { switch (value.getConstructor()) { @@ -302,7 +292,7 @@ public class AsyncTdEasy { */ public Mono getOptionBoolean(String name) { return this - .sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false) + .sendDirectly(new TdApi.GetOption(name), false) .handle(MonoUtils::orElseThrow) .flatMap(value -> { switch (value.getConstructor()) { @@ -343,15 +333,14 @@ public class AsyncTdEasy { return true; } }) - .then(requestedDefinitiveExit.asFlux().take(1).single()) + .then(Mono.fromCallable(requestedDefinitiveExit::get).single()) .filter(closeRequested -> !closeRequested) .doOnSuccess(s -> { logger.debug("Setting requestedDefinitiveExit: true"); - requestedDefinitiveExit.tryEmitNext(true); + requestedDefinitiveExit.set(true); }) - .then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false).doOnSubscribe(s -> { - logger.debug("Sending TdApi.Close"); - })) + .doOnSuccess(s -> logger.debug("Sending TdApi.Close")) + .then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false)) .doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"", closeResponse.toString().replace('\n', ' ') )) @@ -359,12 +348,8 @@ public class AsyncTdEasy { .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) .take(1) .singleOrEmpty()) - .doOnNext(ok -> { - logger.debug("Received AuthorizationStateClosed after TdApi.Close"); - }) - .doOnSuccess(s -> { - logger.info("AsyncTdEasy closed successfully"); - }) + .doOnNext(ok -> logger.debug("Received AuthorizationStateClosed after TdApi.Close")) + .doOnSuccess(s -> logger.info("AsyncTdEasy closed successfully")) .then(); } @@ -415,10 +400,6 @@ public class AsyncTdEasy { } } - public Mono isBot() { - return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet); - } - private Publisher preprocessUpdates(TdApi.Object updateObj) { return Mono .just(updateObj) @@ -428,7 +409,7 @@ public class AsyncTdEasy { .flatMap(obj -> { switch (obj.getConstructor()) { case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: - return thenOrFatalError(Mono.from(this.settings.asFlux()).map(settings -> { + return thenOrFatalError(Mono.fromCallable(this.settings::get).single().map(settings -> { var parameters = new TdlibParameters(); parameters.useTestDc = settings.useTestDc; parameters.databaseDirectory = settings.databaseDirectory; @@ -446,27 +427,27 @@ public class AsyncTdEasy { parameters.enableStorageOptimizer = settings.enableStorageOptimizer; parameters.ignoreFileNames = settings.ignoreFileNames; return new SetTdlibParameters(parameters); - }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, DEFAULT_TIMEOUT, false))); + }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false))); case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: - return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), DEFAULT_TIMEOUT, false)) + return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), false)) .onErrorResume((error) -> { logger.error("Error while checking TDLib encryption key", error); - return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then(); + return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: - return thenOrFatalError(Mono.from(this.settings.asFlux()).flatMap(settings -> { + return thenOrFatalError(Mono.fromCallable(this.settings::get).single().flatMap(settings -> { if (settings.isPhoneNumberSet()) { return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), new PhoneNumberAuthenticationSettings(false, false, false) - ), DEFAULT_TIMEOUT, false); + ), false); } else if (settings.isBotTokenSet()) { - return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), DEFAULT_TIMEOUT, false); + return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false); } else { return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); } })).onErrorResume((error) -> { logger.error("Error while waiting for phone number", error); - return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then(); + return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitRegistration.CONSTRUCTOR: var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; @@ -477,63 +458,62 @@ public class AsyncTdEasy { } return Mono - .from(settings.asFlux()) + .fromCallable(this.settings::get) + .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> { - return MonoUtils.thenOrLogRepeatError(() -> handler - .onParameterRequest(Parameter.ASK_FIRST_NAME, new ParameterInfoEmpty()) - .filter(Objects::nonNull) - .map(String::trim) - .filter(firstName -> !firstName.isBlank() && firstName.length() <= 64 && firstName.length() > 0) - .repeatWhen(s -> s.takeWhile(n -> n == 0)) - .last() - .doOnNext(firstName -> registerUser.firstName = firstName) - .then(handler - .onParameterRequest(Parameter.ASK_LAST_NAME, new ParameterInfoEmpty()) - .filter(Objects::nonNull) - .map(String::trim) - .filter(lastName -> lastName.length() <= 64) - .repeatWhen(s -> s.takeWhile(n -> n == 0)) - .last() - .defaultIfEmpty("") - .doOnNext(lastName -> registerUser.lastName = lastName) - ) - .then(sendDirectly(registerUser, DEFAULT_TIMEOUT, false))); - }); + .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .onParameterRequest(Parameter.ASK_FIRST_NAME, new ParameterInfoEmpty()) + .filter(Objects::nonNull) + .map(String::trim) + .filter(firstName -> !firstName.isBlank() && firstName.length() <= 64 && firstName.length() > 0) + .repeatWhen(s -> s.takeWhile(n -> n == 0)) + .last() + .doOnNext(firstName -> registerUser.firstName = firstName) + .then(handler + .onParameterRequest(Parameter.ASK_LAST_NAME, new ParameterInfoEmpty()) + .filter(Objects::nonNull) + .map(String::trim) + .filter(lastName -> lastName.length() <= 64) + .repeatWhen(s -> s.takeWhile(n -> n == 0)) + .last() + .defaultIfEmpty("") + .doOnNext(lastName -> registerUser.lastName = lastName) + ) + .then(sendDirectly(registerUser, false)) + )); case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR: var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj; return Mono - .from(settings.asFlux()) + .fromCallable(this.settings::get) + .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> { - return handler.onParameterRequest(Parameter.NOTIFY_LINK, - new ParameterInfoNotifyLink(authorizationStateWaitOtherDeviceConfirmation.link) - ); - }); + .flatMap(handler -> handler.onParameterRequest(Parameter.NOTIFY_LINK, + new ParameterInfoNotifyLink(authorizationStateWaitOtherDeviceConfirmation.link) + )); case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR: var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj; return Mono - .from(settings.asFlux()) + .fromCallable(this.settings::get) + .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> { - return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_CODE, - new ParameterInfoCode(authorizationStateWaitCode.codeInfo.phoneNumber, - authorizationStateWaitCode.codeInfo.nextType, - authorizationStateWaitCode.codeInfo.timeout, - authorizationStateWaitCode.codeInfo.type - ) - ).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), DEFAULT_TIMEOUT, false))); - }); + .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .onParameterRequest(Parameter.ASK_CODE, new ParameterInfoCode(authorizationStateWaitCode.codeInfo.phoneNumber, + authorizationStateWaitCode.codeInfo.nextType, + authorizationStateWaitCode.codeInfo.timeout, + authorizationStateWaitCode.codeInfo.type)) + .flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false)) + )); case AuthorizationStateWaitPassword.CONSTRUCTOR: var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; return Mono - .from(settings.asFlux()) + .fromCallable(this.settings::get) + .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> { - return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD, - new ParameterInfoPasswordHint(authorizationStateWaitPassword.passwordHint) - ).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), DEFAULT_TIMEOUT, false))); - }); + .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .onParameterRequest(Parameter.ASK_PASSWORD, new ParameterInfoPasswordHint( + authorizationStateWaitPassword.passwordHint)) + .flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false)) + )); case AuthorizationStateReady.CONSTRUCTOR: { this.authState.tryEmitNext(new AuthorizationStateReady()); return Mono.empty(); @@ -543,17 +523,20 @@ public class AsyncTdEasy { return Mono.empty(); case AuthorizationStateClosed.CONSTRUCTOR: logger.debug("Received AuthorizationStateClosed from td"); - return Mono.from(requestedDefinitiveExit.asFlux()).doOnNext(closeRequested -> { + return Mono.fromCallable(() -> { + var closeRequested = this.requestedDefinitiveExit.get(); if (closeRequested) { logger.debug("td closed successfully"); } else { logger.warn("td closed unexpectedly: {}", logName); } authState.tryEmitNext(obj); + return closeRequested; }).flatMap(closeRequested -> { if (closeRequested) { return Mono - .from(settings.asFlux()) + .fromCallable(settings::get) + .single() .map(settings -> settings.databaseDirectory) .map(Path::of) .flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"), @@ -579,9 +562,9 @@ public class AsyncTdEasy { logger.error("Can't delete temporary session subdirectory", e); } }) - .then(Mono.just(closeRequested)); + .then(Mono.just(true)); } else { - return Mono.just(closeRequested); + return Mono.just(false); } }).then(); default: diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index b0ca0e8..ad159bd 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -14,6 +14,7 @@ import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; import it.tdlight.utils.MonoUtils; import java.nio.file.Path; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -29,7 +30,8 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { private final Vertx vertx; private final AsyncTdMiddleEventBusServer srv; - private final One cli = Sinks.one(); + private final AtomicReference cli = new AtomicReference<>(null); + private final AtomicReference startError = new AtomicReference<>(null); private final JsonObject implementationDetails; @@ -64,23 +66,35 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { .start(botId, botAlias, true, implementationDetails, tuple.getT2()) .thenReturn(tuple.getT1())) .onErrorMap(InitializationException::new) - .doOnNext(this.cli::tryEmitValue) - .doOnError(this.cli::tryEmitError) + .doOnNext(this.cli::set) + .doOnError(this.startError::set) .thenReturn(this); } @Override public Mono initialize() { - return cli.asMono().single().flatMap(AsyncTdMiddle::initialize); + var startError = this.startError.get(); + if (startError != null) { + return Mono.error(startError); + } + return Mono.fromCallable(cli::get).single().flatMap(AsyncTdMiddle::initialize); } @Override public Flux receive() { - return cli.asMono().single().flatMapMany(AsyncTdMiddle::receive); + var startError = this.startError.get(); + if (startError != null) { + return Flux.error(startError); + } + return Mono.fromCallable(cli::get).single().flatMapMany(AsyncTdMiddle::receive); } @Override public Mono> execute(Function request, Duration timeout, boolean executeDirectly) { - return cli.asMono().single().flatMap(c -> c.execute(request, timeout, executeDirectly)); + var startError = this.startError.get(); + if (startError != null) { + return Mono.error(startError); + } + return Mono.fromCallable(cli::get).single().flatMap(c -> c.execute(request, timeout, executeDirectly)); } }