diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 60d6665..2195d34 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -30,7 +30,6 @@ import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; import reactor.tools.agent.ReactorDebugAgent; -import reactor.util.function.Tuple2; public class TDLibRemoteClient implements AutoCloseable { @@ -168,21 +167,23 @@ public class TDLibRemoteClient implements AutoCloseable { }) .single() .flatMap(clusterManager -> { - MessageConsumer startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot"); - return MonoUtils - .fromReplyableResolvedMessageConsumer(startBotConsumer) - .flatMap(tuple -> this.listenForStartBotsCommand(clusterManager, tuple.getT1(), tuple.getT2())); + MessageConsumer startBotConsumer + = clusterManager.getEventBus().consumer("bots.start-bot"); + + return this.listenForStartBotsCommand( + clusterManager, + MonoUtils.fromReplyableMessageConsumer(Mono.empty(), startBotConsumer) + ); }) .then(); } private Mono listenForStartBotsCommand(TdClusterManager clusterManager, - Mono completion, - Flux, StartSessionMessage>> messages) { + Flux> messages) { return MonoUtils .fromBlockingEmpty(() -> messages .flatMapSequential(msg -> { - StartSessionMessage req = msg.getT2(); + StartSessionMessage req = msg.body(); DeploymentOptions deploymentOptions = clusterManager .newDeploymentOpts() .setConfig(new JsonObject() @@ -201,9 +202,9 @@ public class TDLibRemoteClient implements AutoCloseable { .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate()) .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath)) .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono)) - .then(MonoUtils.fromBlockingEmpty(() -> msg.getT1().reply(new byte[0]))) + .then(MonoUtils.fromBlockingEmpty(() -> msg.reply(new byte[0]))) .onErrorResume(ex -> { - msg.getT1().fail(500, "Failed to deploy bot verticle: " + ex.getMessage()); + msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage()); logger.error("Failed to deploy bot verticle", ex); return Mono.empty(); }); @@ -213,8 +214,7 @@ public class TDLibRemoteClient implements AutoCloseable { v -> {}, ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex) ) - ) - .then(completion); + ); } public static Path getSessionDirectory(long botId) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 1c0e745..c1853f7 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -226,68 +226,64 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .then(updates.asMono()) .publishOn(Schedulers.parallel()) .timeout(Duration.ofSeconds(30)) - .flatMapMany(updatesMessageConsumer -> MonoUtils - .fromMessageConsumer(updatesMessageConsumer) - .flatMapMany(registration -> Mono - .fromRunnable(() -> logger.trace("Registering updates flux")) - .then(registration.getT1()) - .doOnSuccess(s -> logger.trace("Registered updates flux")) - .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", - EMPTY, - deliveryOptionsWithTimeout - ).as(MonoUtils::toMono)) - .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) - .doOnSuccess(s -> logger.trace("About to read updates flux")) - .thenMany(registration.getT2()) + .doOnSuccess(s -> logger.trace("Registering updates flux")) + .flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono + .empty() + .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) + .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", + EMPTY, + deliveryOptionsWithTimeout + ).as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) + .doOnSuccess(s -> logger.trace("About to read updates flux")) + .then(), updatesMessageConsumer) + ) + .takeUntilOther(Flux + .merge( + crash.asMono() + .onErrorResume(ex -> { + logger.error("TDLib crashed", ex); + return Mono.empty(); + }), + pingFail.asMono() + .then(Mono.fromCallable(() -> { + var ex = new ConnectException("Server did not respond to ping"); + ex.setStackTrace(new StackTraceElement[0]); + throw ex; + }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) + .takeUntilOther(Mono + .firstWithSignal(crash.asMono(), authStateClosing.asMono()) + .onErrorResume(e -> Mono.empty()) + ) ) - .takeUntilOther(Flux - .merge( - crash.asMono() - .onErrorResume(ex -> { - logger.error("TDLib crashed", ex); - return Mono.empty(); - }), - pingFail.asMono() - .then(Mono.fromCallable(() -> { - var ex = new ConnectException("Server did not respond to ping"); - ex.setStackTrace(new StackTraceElement[0]); - throw ex; - }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) - .takeUntilOther(Mono - .firstWithSignal(crash.asMono(), authStateClosing.asMono()) - .onErrorResume(e -> Mono.empty()) - ) - ) - .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) - ) - .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { - if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - return ((UpdateAuthorizationState) item).authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR; - } - return false; - })) - .flatMapSequential(updates -> { - if (updates.succeeded()) { - return Flux.fromIterable(updates.value()); - } else { - return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); - } - }) - .flatMapSequential(update -> interceptUpdate(updatesMessageConsumer, update)) - // Redirect errors to crash sink - .doOnError(error -> crash.tryEmitError(error)) - .onErrorResume(ex -> { - logger.trace("Absorbing the error, the error has been published using the crash sink", ex); - return Mono.empty(); - }) + .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) + ) + .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { + if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + return ((UpdateAuthorizationState) item).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR; + } + return false; + })) + .flatMapSequential(updates -> { + if (updates.succeeded()) { + return Flux.fromIterable(updates.value()); + } else { + return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); + } + }) + .concatMap(update -> interceptUpdate(update)) + // Redirect errors to crash sink + .doOnError(error -> crash.tryEmitError(error)) + .onErrorResume(ex -> { + logger.trace("Absorbing the error, the error has been published using the crash sink", ex); + return Mono.empty(); + }) - .doOnTerminate(updatesStreamEnd::tryEmitEmpty) - ); + .doOnTerminate(updatesStreamEnd::tryEmitEmpty); } - private Mono interceptUpdate(MessageConsumer updatesMessageConsumer, Object update) { + private Mono interceptUpdate(Object update) { logger.trace("Received update {}", update.getClass().getSimpleName()); switch (update.getConstructor()) { case TdApi.UpdateAuthorizationState.CONSTRUCTOR: @@ -298,7 +294,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { break; case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) - .then(updatesMessageConsumer.rxUnregister().as(MonoUtils::toMono)) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index a14fdde..af3ec0b 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -29,11 +29,12 @@ import java.util.function.Supplier; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; +import org.warp.commonutils.concurrency.future.CompletableFutureUtils; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; -import org.warp.commonutils.concurrency.future.CompletableFutureUtils; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.core.publisher.Sinks; @@ -47,8 +48,6 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; import reactor.util.context.Context; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; public class MonoUtils { @@ -375,53 +374,44 @@ public class MonoUtils { ); } - public static Mono, Flux>> fromMessageConsumer(MessageConsumer messageConsumer) { - return fromReplyableMessageConsumer(messageConsumer) - .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono - .fromCallable(msg::body) - .subscribeOn(Schedulers.parallel()))) - ); + public static Flux fromMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { + return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body); } - public static Mono, Flux, T>>>> fromReplyableResolvedMessageConsumer(MessageConsumer messageConsumer) { - return fromReplyableMessageConsumer(messageConsumer) - .map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono - .fromCallable(() -> Tuples., T>of(msg, msg.body())) - .subscribeOn(Schedulers.parallel()))) - ); - } - - public static Mono, Flux>>> fromReplyableMessageConsumer(MessageConsumer messageConsumer) { - return Mono., Flux>>>fromCallable(() -> { - Many> messages = Sinks.many().unicast().onBackpressureError(); - Empty registrationRequested = Sinks.empty(); - Empty registrationCompletion = Sinks.empty(); - messageConsumer.endHandler(e -> { - messages.tryEmitComplete(); - registrationCompletion.tryEmitEmpty(); + public static Flux> fromReplyableMessageConsumer(Mono onRegistered, + MessageConsumer messageConsumer) { + Mono endMono = Mono.create(sink -> { + AtomicBoolean alreadyRequested = new AtomicBoolean(); + sink.onRequest(n -> { + if (n > 0 && alreadyRequested.compareAndSet(false, true)) { + messageConsumer.endHandler(e -> sink.success()); + } }); - messageConsumer.>handler(messages::tryEmitNext); + }); - Flux> dataFlux = Flux - .concatDelayError( - messages.asFlux(), - messageConsumer - .rxUnregister() - .as(MonoUtils::>toMono) - .doOnSuccess(s -> logger.trace("Unregistered message consumer")) - ) - .doOnSubscribe(s -> registrationRequested.tryEmitEmpty()); + Mono> registrationCompletionMono = Mono + .fromRunnable(() -> logger.trace("Waiting for consumer registration completion...")) + .then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Consumer registered")) + .then(onRegistered) + .thenReturn(messageConsumer); - Mono registrationCompletionMono = Mono.empty() - .doOnSubscribe(s -> registrationRequested.tryEmitEmpty()) - .then(registrationRequested.asMono()) - .doOnSuccess(s -> logger.trace("Subscribed to registration completion mono")) - .doOnSuccess(s -> logger.trace("Waiting for consumer registration completion...")) - .then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) - .doOnSuccess(s -> logger.trace("Consumer registered")) - .share(); - return Tuples.of(registrationCompletionMono, dataFlux); - }).subscribeOn(Schedulers.boundedElastic()); + messageConsumer.handler(s -> { + throw new IllegalStateException("Subscriber still didn't request any value!"); + }); + + Flux> dataFlux = Flux + .push(sink -> sink.onRequest(n -> messageConsumer.handler(sink::next)), OverflowStrategy.ERROR); + + Mono disposeMono = messageConsumer + .rxUnregister() + .as(MonoUtils::>toMono) + .doOnSuccess(s -> logger.trace("Unregistered message consumer")) + .then(); + + return Flux + .usingWhen(registrationCompletionMono, msgCons -> dataFlux, msgCons -> disposeMono) + .takeUntilOther(endMono); } public static Scheduler newBoundedSingle(String name) {