diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 905ad88..40c2b90 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -45,6 +45,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.reactivestreams.Publisher; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; @@ -52,8 +53,11 @@ import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitResult; +import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; +import reactor.core.publisher.SynchronousSink; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -63,8 +67,11 @@ public class AsyncTdEasy { private final Logger logger; private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); - private final Many authState = Sinks.many().replay().latest(); + private final Empty closed = Sinks.empty(); + private final Many authStateSink = Sinks.many().replay().latest(); + private final AtomicReference authState = new AtomicReference<>(new AuthorizationStateClosed()); private final AtomicBoolean requestedDefinitiveExit = new AtomicBoolean(); + private final AtomicBoolean canSendCloseRequest = new AtomicBoolean(); private final AtomicReference settings = new AtomicReference<>(null); private final Many globalErrors = Sinks.many().multicast().onBackpressureBuffer(); private final One fatalError = Sinks.one(); @@ -79,9 +86,28 @@ public class AsyncTdEasy { this.logger = LoggerFactory.getLogger("AsyncTdEasy " + logName); this.incomingUpdates = td.receive() + .doFirst(() -> { + canSendCloseRequest.set(true); + logger.debug("From now onwards TdApi.Close cannot be called"); + }) + .doOnTerminate(() -> { + canSendCloseRequest.set(false); + logger.debug("From now onwards TdApi.Close can be called"); + }) .flatMapSequential(this::preprocessUpdates) - .flatMapSequential(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update))) + .map(update -> { + var state = authState.get(); + Objects.requireNonNull(state, "State is not set"); + return new AsyncTdUpdateObj(state, update); + }) .map(upd -> (TdApi.Update) upd.getUpdate()) + .takeUntil(update -> { + if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var state = ((UpdateAuthorizationState) update).authorizationState; + return state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR; + } + return false; + }) .doOnError(ex -> { if (ex instanceof TdError) { var tdEx = (TdError) ex; @@ -97,35 +123,24 @@ public class AsyncTdEasy { logger.error(ex.getLocalizedMessage(), ex); } }) - .doOnComplete(() -> authState.asFlux().take(1, true).single().subscribeOn(scheduler).subscribe(authState -> { + .doFinally(s -> { + var state = authState.get(); onUpdatesTerminated(); - if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { + if (state.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { logger.warn("Updates stream has closed while" + " the current authorization state is" - + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); + + " still {}. Setting authorization state as closed!", state.getClass().getSimpleName()); this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); - this.authState.tryEmitNext(new AuthorizationStateClosed()); } - })).doOnError(ex -> authState.asFlux() - .take(1, true) - .single() - .subscribeOn(scheduler) - .subscribe(authState -> { - onUpdatesTerminated(); - if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) { - logger.warn("Updates stream has terminated with an error while" - + " the current authorization state is" - + " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName()); - this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED); - this.authState.tryEmitNext(new AuthorizationStateClosed()); - } - }) - ); + }); } private void onUpdatesTerminated() { logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true"); requestedDefinitiveExit.set(true); + + var newState = new AuthorizationStateClosed(); + emitState(newState); } public Mono create(TdEasySettings settings) { @@ -156,8 +171,8 @@ public class AsyncTdEasy { /** * Get TDLib state */ - public Flux getState() { - return authState.asFlux().distinct(); + public Flux state() { + return authStateSink.asFlux().distinct(); } /** @@ -199,7 +214,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); + return sendDirectly(new TdApi.SetLogVerbosityLevel(i), true).transform(this::thenOrFatalError); } /** @@ -207,8 +222,8 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false - )); + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false) + .transform(this::thenOrFatalError); } /** @@ -217,8 +232,8 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false - )); + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false) + .transform(this::thenOrFatalError); } /** @@ -227,8 +242,8 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false - )); + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false) + .transform(this::thenOrFatalError); } /** @@ -237,8 +252,8 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false - )); + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false) + .transform(this::thenOrFatalError); } /** @@ -292,7 +307,7 @@ public class AsyncTdEasy { */ public Mono getOptionBoolean(String name) { return this - .sendDirectly(new TdApi.GetOption(name), false) + .sendDirectly(new TdApi.GetOption(name), false) .handle(MonoUtils::orElseThrow) .flatMap(value -> { switch (value.getConstructor()) { @@ -323,7 +338,19 @@ public class AsyncTdEasy { * Closes the client gracefully by sending {@link TdApi.Close}. */ public Mono close() { - return Mono.from(getState()) + var waitClosed = closed.asMono() + .doFirst(() -> logger.debug("Waiting for AuthorizationStateClosed...")) + .doOnSuccess(s -> logger.debug("Received AuthorizationStateClosed after TdApi.Close")) + .transformDeferred(mono -> { + if (canSendCloseRequest.get()) { + return mono; + } else { + return Mono.fromRunnable(() -> emitState(new AuthorizationStateClosed())); + } + }); + + return Mono + .fromSupplier(authState::get) .filter(state -> { switch (state.getConstructor()) { case AuthorizationStateClosing.CONSTRUCTOR: @@ -339,16 +366,23 @@ public class AsyncTdEasy { logger.debug("Setting requestedDefinitiveExit: true"); requestedDefinitiveExit.set(true); }) - .doOnSuccess(s -> logger.debug("Sending TdApi.Close")) - .then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false)) - .doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"", - closeResponse.toString().replace('\n', ' ') - )) - .then(authState.asFlux() - .filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) - .take(1) - .singleOrEmpty()) - .doOnNext(ok -> logger.debug("Received AuthorizationStateClosed after TdApi.Close")) + .then(td + .execute(new TdApi.Close(), Duration.ofSeconds(5), false) + .doFirst(() -> logger.debug("Sending TdApi.Close")) + .doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"", + closeResponse.toString().replace('\n', ' ') + )) + .doOnSuccess(s -> logger.debug("Sent TdApi.Close")) + .transformDeferred(closeMono -> { + if (canSendCloseRequest.get()) { + return closeMono; + } else { + return Mono.empty(); + } + }) + ) + + .then(waitClosed) .doOnSuccess(s -> logger.info("AsyncTdEasy closed successfully")) .then(); } @@ -409,51 +443,61 @@ public class AsyncTdEasy { .flatMap(obj -> { switch (obj.getConstructor()) { case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: - return thenOrFatalError(Mono.fromCallable(this.settings::get).single().map(settings -> { - var parameters = new TdlibParameters(); - parameters.useTestDc = settings.useTestDc; - parameters.databaseDirectory = settings.databaseDirectory; - parameters.filesDirectory = settings.filesDirectory; - parameters.useFileDatabase = settings.useFileDatabase; - parameters.useChatInfoDatabase = settings.useChatInfoDatabase; - parameters.useMessageDatabase = settings.useMessageDatabase; - parameters.useSecretChats = false; - parameters.apiId = settings.apiId; - parameters.apiHash = settings.apiHash; - parameters.systemLanguageCode = settings.systemLanguageCode; - parameters.deviceModel = settings.deviceModel; - parameters.systemVersion = settings.systemVersion; - parameters.applicationVersion = settings.applicationVersion; - parameters.enableStorageOptimizer = settings.enableStorageOptimizer; - parameters.ignoreFileNames = settings.ignoreFileNames; - return new SetTdlibParameters(parameters); - }).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false))); + return Mono + .fromCallable(this.settings::get) + .single() + .map(settings -> { + var parameters = new TdlibParameters(); + parameters.useTestDc = settings.useTestDc; + parameters.databaseDirectory = settings.databaseDirectory; + parameters.filesDirectory = settings.filesDirectory; + parameters.useFileDatabase = settings.useFileDatabase; + parameters.useChatInfoDatabase = settings.useChatInfoDatabase; + parameters.useMessageDatabase = settings.useMessageDatabase; + parameters.useSecretChats = false; + parameters.apiId = settings.apiId; + parameters.apiHash = settings.apiHash; + parameters.systemLanguageCode = settings.systemLanguageCode; + parameters.deviceModel = settings.deviceModel; + parameters.systemVersion = settings.systemVersion; + parameters.applicationVersion = settings.applicationVersion; + parameters.enableStorageOptimizer = settings.enableStorageOptimizer; + parameters.ignoreFileNames = settings.ignoreFileNames; + return new SetTdlibParameters(parameters); + }) + .flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false)) + .transform(this::thenOrFatalError); case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: - return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), false)) + return sendDirectly(new CheckDatabaseEncryptionKey(), false) + .transform(this::thenOrFatalError) .onErrorResume((error) -> { logger.error("Error while checking TDLib encryption key", error); return sendDirectly(new TdApi.Close(), false).then(); }); case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: - return thenOrFatalError(Mono.fromCallable(this.settings::get).single().flatMap(settings -> { - if (settings.isPhoneNumberSet()) { - return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), - new PhoneNumberAuthenticationSettings(false, false, false) - ), false); - } else if (settings.isBotTokenSet()) { - return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false); - } else { - return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); - } - })).onErrorResume((error) -> { - logger.error("Error while waiting for phone number", error); - return sendDirectly(new TdApi.Close(), false).then(); - }); + return Mono + .fromCallable(this.settings::get).single().flatMap(settings -> { + if (settings.isPhoneNumberSet()) { + return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), + new PhoneNumberAuthenticationSettings(false, false, false) + ), false); + } else if (settings.isBotTokenSet()) { + return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false); + } else { + return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); + } + }) + .transform(this::thenOrFatalError) + .onErrorResume((error) -> { + logger.error("Error while waiting for phone number", error); + return sendDirectly(new TdApi.Close(), false).then(); + }); case AuthorizationStateWaitRegistration.CONSTRUCTOR: var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; RegisterUser registerUser = new RegisterUser(); if (authorizationStateWaitRegistration.termsOfService != null - && authorizationStateWaitRegistration.termsOfService.text != null && !authorizationStateWaitRegistration.termsOfService.text.text.isBlank()) { + && authorizationStateWaitRegistration.termsOfService.text != null + && !authorizationStateWaitRegistration.termsOfService.text.text.isBlank()) { logger.info("Telegram Terms of Service:\n" + authorizationStateWaitRegistration.termsOfService.text.text); } @@ -461,7 +505,7 @@ public class AsyncTdEasy { .fromCallable(this.settings::get) .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .flatMap(handler -> handler .onParameterRequest(Parameter.ASK_FIRST_NAME, new ParameterInfoEmpty()) .filter(Objects::nonNull) .map(String::trim) @@ -480,7 +524,8 @@ public class AsyncTdEasy { .doOnNext(lastName -> registerUser.lastName = lastName) ) .then(sendDirectly(registerUser, false)) - )); + .transform(this::thenOrLogRepeatError) + ); case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR: var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj; return Mono @@ -496,26 +541,29 @@ public class AsyncTdEasy { .fromCallable(this.settings::get) .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .flatMap(handler -> handler .onParameterRequest(Parameter.ASK_CODE, new ParameterInfoCode(authorizationStateWaitCode.codeInfo.phoneNumber, authorizationStateWaitCode.codeInfo.nextType, authorizationStateWaitCode.codeInfo.timeout, authorizationStateWaitCode.codeInfo.type)) .flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false)) - )); + .transform(this::thenOrLogRepeatError) + ); case AuthorizationStateWaitPassword.CONSTRUCTOR: var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; return Mono .fromCallable(this.settings::get) .single() .map(TdEasySettings::getParameterRequestHandler) - .flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler + .flatMap(handler -> handler .onParameterRequest(Parameter.ASK_PASSWORD, new ParameterInfoPasswordHint( authorizationStateWaitPassword.passwordHint)) .flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false)) - )); + ) + .transform(this::thenOrLogRepeatError); case AuthorizationStateReady.CONSTRUCTOR: { - this.authState.tryEmitNext(new AuthorizationStateReady()); + var state = new AuthorizationStateReady(); + emitState(state); return Mono.empty(); } case AuthorizationStateClosing.CONSTRUCTOR: @@ -530,7 +578,7 @@ public class AsyncTdEasy { } else { logger.warn("td closed unexpectedly: {}", logName); } - authState.tryEmitNext(obj); + emitState(obj); return closeRequested; }).flatMap(closeRequested -> { if (closeRequested) { @@ -574,11 +622,36 @@ public class AsyncTdEasy { .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)); } - public Mono thenOrFatalError(Mono> optionalMono) { - return MonoUtils.thenOrError(optionalMono.doOnNext(result -> { + private void emitState(AuthorizationState state) { + if (state.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + this.closed.tryEmitEmpty(); + } + this.authState.set(state); + EmitResult emitResult; + while ((emitResult = this.authStateSink.tryEmitNext(state)) == EmitResult.FAIL_NON_SERIALIZED) { + // Wait 10ms + LockSupport.parkNanos(10L * 1000000L); + } + emitResult.orThrow(); + } + + private Mono thenOrFatalError(Mono> mono) { + return mono.doOnNext(result -> { if (result.failed()) { analyzeFatalErrors(result.cause()); } - })); + }).transform(MonoUtils::thenOrError); + } + + + private Mono thenOrLogRepeatError(Mono> mono) { + return mono.handle((TdResult optional, SynchronousSink sink) -> { + if (optional.succeeded()) { + sink.complete(); + } else { + logger.error("Received TDLib error: {}", optional.cause()); + sink.error(new TdError(optional.cause().code, optional.cause().message)); + } + }).retry(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 80a0a49..dddce3a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -66,10 +66,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final AtomicReference crash = new AtomicReference<>(); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.empty(); - // This will only result in a successful completion, never completes in other ways. - // It will be called when UpdateAuthorizationStateClosing is intercepted. - // If it's completed stop checking if the ping works or not - private final Empty authStateClosing = Sinks.empty(); private long botId; private String botAddress; @@ -306,21 +302,25 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { logger.trace("Received update {}", update.getClass().getSimpleName()); if (update.getConstructor() == TdApi.UpdateAuthorizationState.CONSTRUCTOR) { var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; - switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateClosing.CONSTRUCTOR: - authStateClosing.tryEmitEmpty(); - break; - case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - logger.info("Received AuthorizationStateClosed from tdlib"); - return cluster.getEventBus() - .rxRequest(this.botAddress + ".read-binlog", EMPTY) - .to(RxJava2Adapter::singleToMono) - .mapNotNull(Message::body) - .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}", - BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) - .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) - .doOnSuccess(s -> logger.info("Overwritten binlog from server")) - .thenReturn(update); + if (updateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + logger.info("Received AuthorizationStateClosed from tdlib"); + + var pinger = this.pinger.get(); + if (pinger != null) { + pinger.dispose(); + } + + return cluster.getEventBus() + .rxRequest(this.botAddress + ".read-binlog", EMPTY) + .to(RxJava2Adapter::singleToMono) + .mapNotNull(Message::body) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}", + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()) + )) + .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) + .doOnSuccess(s -> logger.info("Overwritten binlog from server")) + .doFirst(() -> logger.info("Asking binlog to server")) + .thenReturn(update); } } return Mono.just(update); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index edd7de7..b281a4f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -37,6 +37,7 @@ import java.net.ConnectException; import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; @@ -82,6 +83,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { return Mono .fromCallable(() -> { logger.trace("Stating verticle"); + + System.setProperty("it.tdlight.enableShutdownHooks", "false"); + var botId = config().getInteger("botId"); if (botId == null || botId <= 0) { throw new IllegalArgumentException("botId is not set!"); @@ -271,10 +275,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (pingConsumer != null) { pingConsumer.dispose(); } - var readBinlogConsumer = this.readBinlogConsumer.get(); - if (readBinlogConsumer != null) { - readBinlogConsumer.dispose(); - } + var readBinlogConsumer = this.readBinlogConsumer.getAndSet(null); + Schedulers.boundedElastic().schedule(() -> { + if (readBinlogConsumer != null) { + readBinlogConsumer.dispose(); + } + }, 10, TimeUnit.MINUTES); var readyToReceiveConsumer = this.readyToReceiveConsumer.get(); if (readyToReceiveConsumer != null) { readyToReceiveConsumer.dispose(); diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 7861c1e..053b82d 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -9,6 +9,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.reactivex.RxHelper; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.streams.Pipe; @@ -27,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.commons.lang3.NotImplementedException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -59,7 +61,7 @@ public class MonoUtils { public static Mono notImplemented() { return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Method not implemented"); + throw new NotImplementedException(); }); } @@ -78,14 +80,6 @@ public class MonoUtils { return fromBlockingMaybe(callable).single(); } - public static void orElseThrowFuture(TdResult value, SynchronousSink> sink) { - if (value.succeeded()) { - sink.next(CompletableFuture.completedFuture(value.result())); - } else { - sink.next(CompletableFuture.failedFuture(new TdError(value.cause().code, value.cause().message))); - } - } - public static void orElseThrow(TdResult value, SynchronousSink sink) { if (value.succeeded()) { sink.next(value.result()); @@ -104,66 +98,28 @@ public class MonoUtils { }); } - public static Mono thenOrLogSkipError(Mono> optionalMono) { - return optionalMono.handle((optional, sink) -> { - if (optional.failed()) { - logger.error("Received TDLib error: {}", optional.cause()); - } - sink.complete(); - }); - } - - public static Mono orElseLogSkipError(TdResult optional) { - if (optional.failed()) { - logger.error("Received TDLib error: {}", optional.cause()); - return Mono.empty(); - } - return Mono.just(optional.result()); - } - - public static Mono thenOrLogRepeatError(Supplier>> optionalMono) { - return Mono.defer(() -> optionalMono.get().handle((TdResult optional, SynchronousSink sink) -> { - if (optional.succeeded()) { - sink.complete(); - } else { - logger.error("Received TDLib error: {}", optional.cause()); - sink.error(new TdError(optional.cause().code, optional.cause().message)); - } - })).retry(); - } - + @Deprecated public static Mono toMono(Future future) { - return Mono.create(sink -> future.onComplete(result -> { - if (result.succeeded()) { - sink.success(result.result()); - } else { - sink.error(result.cause()); - } - })); + return Mono.fromCompletionStage(future.toCompletionStage()); } + @Deprecated @NotNull public static Mono toMono(Single single) { - return Mono.from(single.toFlowable()); + return RxJava2Adapter.singleToMono(single); } + @Deprecated @NotNull - public static Mono toMono(Maybe single) { - return Mono.from(single.toFlowable()); + public static Mono toMono(Maybe maybe) { + return RxJava2Adapter.maybeToMono(maybe); } + @Deprecated @NotNull public static Mono toMono(Completable completable) { - return Mono.from(completable.toFlowable()); - } - - public static Completable toCompletable(Mono s) { - return Completable.fromPublisher(s); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - public static Mono castVoid(Mono mono) { - return (Mono) mono; + //noinspection unchecked + return (Mono) RxJava2Adapter.completableToMono(completable); } public static Flux fromMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { @@ -177,7 +133,9 @@ public class MonoUtils { .doFirst(() -> logger.trace("Waiting for consumer registration completion...")) .doOnSuccess(s -> logger.trace("Consumer registered")) .then(onRegistered); - return messageConsumer.toFlowable().to(RxJava2Adapter::flowableToFlux).mergeWith(registration.then(Mono.empty())); + var messages = messageConsumer.toFlowable().to(RxJava2Adapter::flowableToFlux); + + return messages.mergeWith(registration.then(Mono.empty())); } public static Scheduler newBoundedSingle(String name) { @@ -202,16 +160,4 @@ public class MonoUtils { .map(res -> true) .defaultIfEmpty(false); } - - @FunctionalInterface - public interface VoidCallable { - void call() throws Exception; - } - - public static Mono fromVoidCallable(VoidCallable callable) { - return Mono.fromCallable(() -> { - callable.call(); - return null; - }); - } }