From 0bb4856c7e5cd38ab1ff5dca98148435b45a6a83 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 9 Nov 2021 12:49:28 +0100 Subject: [PATCH] Bugfixes --- .../client/AsyncTdMiddleEventBusClient.java | 206 ++++++++------- .../client/NoClustersAvailableException.java | 13 + .../server/AsyncTdMiddleEventBusServer.java | 234 +++++++++--------- src/main/java/it/tdlight/utils/MonoUtils.java | 37 +-- 4 files changed, 231 insertions(+), 259 deletions(-) create mode 100644 src/main/java/it/tdlight/tdlibsession/td/middle/client/NoClustersAvailableException.java diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 04b4fa1..80a0a49 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -1,6 +1,8 @@ package it.tdlight.tdlibsession.td.middle.client; import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.buffer.Buffer; @@ -32,6 +34,8 @@ import java.util.concurrent.locks.LockSupport; import org.warp.commonutils.locks.LockUtils; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import reactor.adapter.rxjava.RxJava2Adapter; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -51,13 +55,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final DeliveryOptions deliveryOptionsWithTimeout; private final DeliveryOptions pingDeliveryOptions; - private final One binlog = Sinks.one(); + private final AtomicReference binlog = new AtomicReference<>(); + + private final AtomicReference pinger = new AtomicReference<>(); private final AtomicReference> updates = new AtomicReference<>(); // This will only result in a successful completion, never completes in other ways private final Empty updatesStreamEnd = Sinks.empty(); // This will only result in a crash, never completes in other ways - private final Empty crash = Sinks.empty(); + private final AtomicReference crash = new AtomicReference<>(); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.empty(); // This will only result in a successful completion, never completes in other ways. @@ -121,7 +127,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } private Mono saveBinlog(Buffer data) { - return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); + return Mono.fromSupplier(this.binlog::get).flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); } public Mono start(long botId, @@ -134,15 +140,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { this.botAddress = "bots.bot." + this.botId; this.local = local; this.logger = LoggerFactory.getLogger(this.botId + " " + botAlias); - return MonoUtils - .fromBlockingEmpty(() -> { - EmitResult result; - while ((result = this.binlog.tryEmitValue(binlog)) == EmitResult.FAIL_NON_SERIALIZED) { - // 10ms - LockSupport.parkNanos(10000000); - } - result.orThrow(); - }) + + return Mono + .fromRunnable(() -> this.binlog.set(binlog)) .then(binlog.getLastModifiedTime()) .zipWith(binlog.readFully().map(Buffer::getDelegate)) .single() @@ -156,69 +156,76 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { binlogLastModifiedTime, implementationDetails ); - return setupUpdatesListener() - .then(Mono.defer(() -> { - if (local) { - return Mono.empty(); - } - logger.trace("Requesting bots.start-bot"); - return cluster.getEventBus() - .rxRequest("bots.start-bot", msg).as(MonoUtils::toMono) - .doOnSuccess(s -> logger.trace("bots.start-bot returned successfully")) - .subscribeOn(Schedulers.boundedElastic()); - })) - .then(setupPing()); + + Mono startBotRequest; + + if (local) { + startBotRequest = Mono.empty(); + } else { + startBotRequest = cluster + .getEventBus() + .rxRequest("bots.start-bot", msg) + .to(RxJava2Adapter::singleToMono) + .doOnSuccess(s -> logger.trace("bots.start-bot returned successfully")) + .doFirst(() -> logger.trace("Requesting bots.start-bot")) + .onErrorMap(ex -> { + if (ex instanceof ReplyException) { + if (((ReplyException) ex).failureType() == ReplyFailure.NO_HANDLERS) { + return new NoClustersAvailableException("Can't start bot " + + botId + " " + botAlias); + } + } + return ex; + }) + .then() + .subscribeOn(Schedulers.boundedElastic()); + } + + return setupUpdatesListener().then(startBotRequest).then(setupPing()); }); } private Mono setupPing() { - return Mono.fromCallable(() -> { - logger.trace("Setting up ping"); - // Disable ping on local servers - if (!local) { - Mono - .defer(() -> { - logger.trace("Requesting ping..."); - return cluster.getEventBus() - .rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions) - .as(MonoUtils::toMono); - }) - .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) - .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) - .takeUntilOther(Mono.firstWithSignal( - this.updatesStreamEnd - .asMono() - .doOnTerminate(() -> logger.trace("About to kill pinger because updates stream ended")), - this.crash - .asMono() - .onErrorResume(ex -> Mono.empty()) - .doOnTerminate(() -> logger.trace("About to kill pinger because it has seen a crash signal")) - )) - .doOnNext(s -> logger.trace("PING")) - .then() - .onErrorResume(ex -> { - logger.warn("Ping failed: {}", ex.getMessage()); - return Mono.empty(); - }) - .doOnNext(s -> logger.debug("END PING")) - .then(MonoUtils.fromBlockingEmpty(() -> { - while (this.pingFail.tryEmitEmpty() == EmitResult.FAIL_NON_SERIALIZED) { - // 10ms - LockSupport.parkNanos(10000000); - } - })) - .subscribeOn(Schedulers.parallel()) - .subscribe(); - } - logger.trace("Ping setup success"); - return null; - }).subscribeOn(Schedulers.boundedElastic()); + // Disable ping on local servers + if (local) { + return Mono.empty(); + } + + var pingRequest = cluster.getEventBus() + .rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions) + .to(RxJava2Adapter::singleToMono) + .doFirst(() -> logger.trace("Requesting ping...")); + + return Mono + .fromRunnable(() -> pinger.set(pingRequest + .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) + .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) + .doOnNext(s -> logger.trace("PING")) + .then() + .onErrorResume(ex -> { + logger.warn("Ping failed: {}", ex.getMessage()); + return Mono.empty(); + }) + .doOnNext(s -> logger.debug("END PING")) + .then(Mono.fromRunnable(() -> { + while (this.pingFail.tryEmitEmpty() == EmitResult.FAIL_NON_SERIALIZED) { + // 10ms + LockSupport.parkNanos(10000000); + } + }).subscribeOn(Schedulers.boundedElastic())) + .subscribeOn(Schedulers.parallel()) + .subscribe()) + ) + .then() + .doFirst(() -> logger.trace("Setting up ping")) + .doOnSuccess(s -> logger.trace("Ping setup success")) + .subscribeOn(Schedulers.boundedElastic()); } private Mono setupUpdatesListener() { return Mono .fromRunnable(() -> logger.trace("Setting up updates listener...")) - .then(MonoUtils.>fromBlockingSingle(() -> MessageConsumer + .then(Mono.>fromSupplier(() -> MessageConsumer .newInstance(cluster.getEventBus().consumer(botAddress + ".updates") .setMaxBufferedMessages(5000) .getDelegate() @@ -228,7 +235,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { // Return when the registration of all the consumers has been done across the cluster return Mono .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) - .then(MonoUtils.fromBlockingEmpty(() -> { + .then(Mono.fromRunnable(() -> { var previous = this.updates.getAndSet(updateConsumer); if (previous != null) { logger.error("Already subscribed a consumer to the updates"); @@ -257,38 +264,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout - ).as(MonoUtils::toMono)) + ).to(RxJava2Adapter::singleToMono)) .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) .then(), updatesMessageConsumer) ) - .takeUntilOther(Flux - .merge( - crash.asMono() - .onErrorResume(ex -> { - logger.error("TDLib crashed", ex); - return Mono.empty(); - }), - pingFail.asMono() - .then(Mono.fromCallable(() -> { - var ex = new ConnectException("Server did not respond to ping"); - ex.setStackTrace(new StackTraceElement[0]); - throw ex; - })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { - EmitResult result; - while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { - // 10ms - LockSupport.parkNanos(10000000); - } - return result; - })) - .takeUntilOther(Mono - .firstWithSignal(crash.asMono(), authStateClosing.asMono()) - .onErrorResume(e -> Mono.empty()) - ) - ) - .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) - ) + .takeUntilOther(pingFail.asMono()) .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { return ((UpdateAuthorizationState) item).authorizationState.getConstructor() @@ -305,13 +286,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .concatMap(update -> interceptUpdate(update)) // Redirect errors to crash sink - .doOnError(error -> crash.tryEmitError(error)) + .doOnError(error -> crash.compareAndSet(null, error)) .onErrorResume(ex -> { logger.trace("Absorbing the error, the error has been published using the crash sink", ex); return Mono.empty(); }) - - .doOnTerminate(updatesStreamEnd::tryEmitEmpty); + .doOnCancel(() -> { + }) + .doFinally(s -> { + var pinger = this.pinger.get(); + if (pinger != null) { + pinger.dispose(); + } + updatesStreamEnd.tryEmitEmpty(); + }); } private Mono interceptUpdate(Object update) { @@ -326,7 +314,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { logger.info("Received AuthorizationStateClosed from tdlib"); return cluster.getEventBus() .rxRequest(this.botAddress + ".read-binlog", EMPTY) - .as(MonoUtils::toMono) + .to(RxJava2Adapter::singleToMono) .mapNotNull(Message::body) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: {}", BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) @@ -346,13 +334,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { // Timeout + 5s (5 seconds extra are used to wait the graceful server-side timeout response) .setSendTimeout(timeout.toMillis() + 5000); - var crashMono = crash.asMono() - .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request)) - .then(Mono.>empty()); - var executionMono = cluster.getEventBus() .rxRequest(botAddress + ".execute", req, deliveryOptions) - .as(MonoUtils::toMono) + .to(RxJava2Adapter::singleToMono) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .>handle((resp, sink) -> { if (resp.body() == null) { @@ -366,9 +350,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnSuccess(s -> logger.trace("Executed request {}", request)) .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)); - return Mono - .firstWithSignal(crashMono, executionMono) + return executionMono + .transformDeferred(mono -> { + var crash = this.crash.get(); + if (crash != null) { + logger.debug("Failed request {} because the TDLib session was already crashed", request); + return Mono.empty(); + } else { + return mono; + } + }) .switchIfEmpty(Mono.error(() -> ResponseError.newResponseError(request, botAlias, - new TdError(500, "Client is closed or response is empty")))); + new TdError(500, "The client is closed or the response is empty")))); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/NoClustersAvailableException.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/NoClustersAvailableException.java new file mode 100644 index 0000000..36d0201 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/NoClustersAvailableException.java @@ -0,0 +1,13 @@ +package it.tdlight.tdlibsession.td.middle.client; + +public class NoClustersAvailableException extends Throwable { + + public NoClustersAvailableException(String error) { + super(error); + } + + @Override + public String toString() { + return "No clusters are available. " + this.getMessage(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 9b1ad81..edd7de7 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -1,13 +1,21 @@ package it.tdlight.tdlibsession.td.middle.server; import io.reactivex.Completable; +import io.reactivex.processors.BehaviorProcessor; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.ReplyException; import io.vertx.core.eventbus.ReplyFailure; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.Pump; +import io.vertx.ext.reactivestreams.impl.ReactiveWriteStreamImpl; +import io.vertx.reactivex.RxHelper; +import io.vertx.reactivex.WriteStreamObserver; import io.vertx.reactivex.core.AbstractVerticle; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.eventbus.MessageProducer; +import io.vertx.reactivex.core.streams.WriteStream; +import io.vertx.reactivex.impl.FlowableReadStream; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; @@ -25,7 +33,6 @@ import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.utils.BinlogUtils; -import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.time.Duration; import java.util.List; @@ -33,6 +40,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import reactor.adapter.rxjava.RxJava2Adapter; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -56,10 +65,11 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Variables configured at startup private final AtomicReference td = new AtomicReference<>(); - private final AtomicReference>> executeConsumer = new AtomicReference<>(); - private final AtomicReference> readBinlogConsumer = new AtomicReference<>(); - private final AtomicReference> readyToReceiveConsumer = new AtomicReference<>(); - private final AtomicReference> pingConsumer = new AtomicReference<>(); + private final AtomicReference executeConsumer = new AtomicReference<>(); + private final AtomicReference readBinlogConsumer = new AtomicReference<>(); + private final AtomicReference readyToReceiveConsumer = new AtomicReference<>(); + private final AtomicReference pingConsumer = new AtomicReference<>(); + private final AtomicReference clusterPropagationWaiter = new AtomicReference<>(); private final AtomicReference> pipeFlux = new AtomicReference<>(); public AsyncTdMiddleEventBusServer() { @@ -69,38 +79,37 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public Completable rxStart() { - return MonoUtils - .toCompletable(MonoUtils - .fromBlockingMaybe(() -> { - logger.trace("Stating verticle"); - var botId = config().getInteger("botId"); - if (botId == null || botId <= 0) { - throw new IllegalArgumentException("botId is not set!"); - } - this.botId.set(botId); - var botAddress = "bots.bot." + botId; - this.botAddress.set(botAddress); - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - this.botAlias.set(botAlias); - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - var implementationDetails = config().getJsonObject("implementationDetails"); - if (implementationDetails == null) { - throw new IllegalArgumentException("implementationDetails is not set!"); - } + return Mono + .fromCallable(() -> { + logger.trace("Stating verticle"); + var botId = config().getInteger("botId"); + if (botId == null || botId <= 0) { + throw new IllegalArgumentException("botId is not set!"); + } + this.botId.set(botId); + var botAddress = "bots.bot." + botId; + this.botAddress.set(botAddress); + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias.set(botAlias); + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + var implementationDetails = config().getJsonObject("implementationDetails"); + if (implementationDetails == null) { + throw new IllegalArgumentException("implementationDetails is not set!"); + } - var td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); - this.td.set(td); - return new OnSuccessfulStartRequestInfo(td, botAddress, botAlias, botId, local); - }) - .flatMap(r -> onSuccessfulStartRequest(r.td, r.botAddress, r.botAlias, r.botId, r.local)) - .doOnSuccess(s -> logger.trace("Stated verticle")) - ); + var td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); + this.td.set(td); + return new OnSuccessfulStartRequestInfo(td, botAddress, botAlias, botId, local); + }) + .flatMap(r -> onSuccessfulStartRequest(r.td, r.botAddress, r.botAlias, r.botId, r.local)) + .doOnSuccess(s -> logger.trace("Started verticle")) + .as(RxJava2Adapter::monoToCompletable); } private static class OnSuccessfulStartRequestInfo { @@ -132,16 +141,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private Mono listen(AsyncTdDirectImpl td, String botAddress, int botId, boolean local) { - return Mono.create(registrationSink -> { + return Mono.create(registrationSink -> { logger.trace("Preparing listeners"); MessageConsumer> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); - this.executeConsumer.set(executeConsumer); - Flux - .>>create(sink -> { - executeConsumer.handler(sink::next); - executeConsumer.endHandler(h -> sink.complete()); - }) + this.executeConsumer.set(executeConsumer + .toFlowable() + .to(RxJava2Adapter::flowableToFlux) .flatMap(msg -> { var body = msg.body(); var request = overrideRequest(body.getRequest(), botId); @@ -154,7 +160,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .single() .doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request)) .onErrorResume(ex -> Mono.fromRunnable(() -> msg.fail(500, ex.getLocalizedMessage()))) - .flatMap(response -> Mono.fromCallable(() -> { + .map(response -> { var replyOpts = new DeliveryOptions().setLocalOnly(local); var replyValue = new TdResultMessage(response.result(), response.cause()); try { @@ -162,38 +168,32 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { msg.reply(replyValue, replyOpts); return response; } catch (Exception ex) { - logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), - request); + logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), request); msg.fail(500, ex.getLocalizedMessage()); throw ex; } - }).subscribeOn(Schedulers.boundedElastic())); + }); }) .then() - .subscribeOn(Schedulers.parallel()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(v -> {}, ex -> logger.error("Fatal error when processing an execute request." + " Can't process further requests since the subscription has been broken", ex), () -> logger.trace("Finished handling execute requests") - ); + )); MessageConsumer readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog"); - this.readBinlogConsumer.set(readBinlogConsumer); - BinlogUtils + this.readBinlogConsumer.set(BinlogUtils .readBinlogConsumer(vertx, readBinlogConsumer, botId, local) - .subscribeOn(Schedulers.parallel()) - .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex))); - MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress - + ".ready-to-receive"); - this.readyToReceiveConsumer.set(readyToReceiveConsumer); + MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); // Pipe the data - Flux - .>create(sink -> { - readyToReceiveConsumer.handler(sink::next); - readyToReceiveConsumer.endHandler(h -> sink.complete()); - }) + this.readyToReceiveConsumer.set(readyToReceiveConsumer + .toFlowable() + .to(RxJava2Adapter::flowableToFlux) .take(1, true) .single() .doOnNext(s -> logger.trace("Received ready-to-receive request from client")) @@ -201,8 +201,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) .doOnNext(s -> logger.trace("Replying to ready-to-receive request")) .flatMapMany(tuple -> { - var opts = new DeliveryOptions().setLocalOnly(local) - .setSendTimeout(Duration.ofSeconds(10).toMillis()); + var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); tuple.getT1().reply(EMPTY, opts); logger.trace("Replied to ready-to-receive"); @@ -216,39 +215,33 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)")) .subscribeOn(Schedulers.boundedElastic()) // Don't handle errors here. Handle them in pipeFlux - .subscribe(v -> {}); + .subscribe(v -> {})); MessageConsumer pingConsumer = vertx.eventBus().consumer(botAddress + ".ping"); - this.pingConsumer.set(pingConsumer); - Flux - .>create(sink -> { - pingConsumer.handler(sink::next); - pingConsumer.endHandler(h -> sink.complete()); - }) - .concatMap(msg -> Mono.fromCallable(() -> { - var opts = new DeliveryOptions().setLocalOnly(local) - .setSendTimeout(Duration.ofSeconds(10).toMillis()); - msg.reply(EMPTY, opts); - return null; - })) - .then() - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(v -> {}, - ex -> logger.error("Error when processing a ping request", ex), - () -> logger.trace("Finished handling ping requests") - ); - executeConsumer - .rxCompletionHandler() - .andThen(readBinlogConsumer.rxCompletionHandler()) - .andThen(readyToReceiveConsumer.rxCompletionHandler()) - .andThen(pingConsumer.rxCompletionHandler()) - .as(MonoUtils::toMono) + this.pingConsumer.set(pingConsumer + .toFlowable() + .to(RxJava2Adapter::flowableToFlux) + .doOnNext(msg -> { + var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + msg.reply(EMPTY, opts); + }) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(unused -> logger.trace("Finished handling ping requests"), + ex -> logger.error("Error when processing a ping request", ex) + )); + + var executorPropagated = executeConsumer.rxCompletionHandler().to(RxJava2Adapter::completableToMono); + var readyToReceivePropagated = executeConsumer.rxCompletionHandler().to(RxJava2Adapter::completableToMono); + var readBinLogPropagated = executeConsumer.rxCompletionHandler().to(RxJava2Adapter::completableToMono); + var pingPropagated = executeConsumer.rxCompletionHandler().to(RxJava2Adapter::completableToMono); + + var allPropagated = Mono.when(executorPropagated, readyToReceivePropagated, readBinLogPropagated, pingPropagated); + this.clusterPropagationWaiter.set(allPropagated .doOnSuccess(s -> logger.trace("Finished preparing listeners")) - .subscribeOn(Schedulers.parallel()) - .subscribe(v -> {}, registrationSink::error, registrationSink::success); - }) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(v -> {}, registrationSink::error, registrationSink::success)); + }); } /** @@ -266,35 +259,34 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public Completable rxStop() { - return MonoUtils.toCompletable(Mono - .fromRunnable(() -> logger.info("Undeploy of bot \"" + botAlias.get() + "\": stopping")) - .then(Mono - .fromCallable(executeConsumer::get) - .flatMap(executeConsumer -> executeConsumer.rxUnregister().as(MonoUtils::toMono)) - .doOnSuccess(s -> logger.trace("Unregistered execute consumer")) - ) - .then(MonoUtils.fromBlockingEmpty(() -> { + return Mono + .fromRunnable(() -> { + logger.info("Undeploy of bot \"" + botAlias.get() + "\": stopping"); + var executeConsumer = this.executeConsumer.get(); + if (executeConsumer != null) { + executeConsumer.dispose(); + logger.trace("Unregistered execute consumer"); + } + var pingConsumer = this.pingConsumer.get(); + if (pingConsumer != null) { + pingConsumer.dispose(); + } var readBinlogConsumer = this.readBinlogConsumer.get(); if (readBinlogConsumer != null) { - Mono - // ReadBinLog will live for another 10 minutes. - // Since every consumer of ReadBinLog is identical, this should not pose a problem. - .delay(Duration.ofMinutes(10)) - .then(readBinlogConsumer.rxUnregister().as(MonoUtils::toMono)) - .subscribe(); + readBinlogConsumer.dispose(); } - })) - .then(Mono - .fromCallable(readyToReceiveConsumer::get) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) - ) - .then(Mono - .fromCallable(pingConsumer::get) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) - ) + var readyToReceiveConsumer = this.readyToReceiveConsumer.get(); + if (readyToReceiveConsumer != null) { + readyToReceiveConsumer.dispose(); + } + var clusterPropagationWaiter = this.clusterPropagationWaiter.get(); + if (clusterPropagationWaiter != null) { + clusterPropagationWaiter.dispose(); + } + }) .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias.get() + "\": stop failed", ex)) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias.get() + "\": stopped")) - ); + .as(RxJava2Adapter::monoToCompletable); } private Mono pipe(AsyncTdDirectImpl td, String botAddress, boolean local) { @@ -342,7 +334,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { var pipeFlux = updatesFlux .concatMap(updatesList -> updatesSender .rxWrite(updatesList) - .as(MonoUtils::toMono) + .to(RxJava2Adapter::completableToMono) .thenReturn(updatesList) ) .concatMap(updatesList -> Flux @@ -355,13 +347,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (tdUpdateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { logger.info("Undeploying after receiving AuthorizationStateClosed"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); + return rxStop().to(RxJava2Adapter::completableToMono).thenReturn(item); } } } else if (item instanceof Error) { // An error in updates means that a fatal error occurred logger.info("Undeploying after receiving a fatal error"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); + return rxStop().to(RxJava2Adapter::completableToMono).thenReturn(item); } return Mono.just(item); }) @@ -393,10 +385,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .execute(new TdApi.Close(), Duration.ofDays(1), false) .doOnError(ex2 -> logger.error("Unexpected error", ex2)) .doOnSuccess(s -> logger.debug("Emergency Close() signal has been sent successfully")) - .then(rxStop().as(MonoUtils::toMono)); + .then(rxStop().to(RxJava2Adapter::completableToMono)); }); - return MonoUtils.fromBlockingEmpty(() -> { + return Mono.fromRunnable(() -> { this.pipeFlux.set(pipeFlux); logger.trace("Prepared piping requests successfully"); }); diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 83b246d..7861c1e 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -35,6 +35,7 @@ import org.warp.commonutils.concurrency.future.CompletableFutureUtils; import org.warp.commonutils.functional.IOConsumer; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import reactor.adapter.rxjava.RxJava2Adapter; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; @@ -171,38 +172,12 @@ public class MonoUtils { public static Flux> fromReplyableMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { - Mono endMono = Mono.create(sink -> { - AtomicBoolean alreadyRequested = new AtomicBoolean(); - sink.onRequest(n -> { - if (n > 0 && alreadyRequested.compareAndSet(false, true)) { - messageConsumer.endHandler(e -> sink.success()); - } - }); - }); - - Mono> registrationCompletionMono = Mono - .fromRunnable(() -> logger.trace("Waiting for consumer registration completion...")) - .then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) + var registration = messageConsumer + .rxCompletionHandler().to(RxJava2Adapter::completableToMono) + .doFirst(() -> logger.trace("Waiting for consumer registration completion...")) .doOnSuccess(s -> logger.trace("Consumer registered")) - .then(onRegistered) - .thenReturn(messageConsumer); - - messageConsumer.handler(s -> { - throw new IllegalStateException("Subscriber still didn't request any value!"); - }); - - Flux> dataFlux = Flux - .push(sink -> sink.onRequest(n -> messageConsumer.handler(sink::next)), OverflowStrategy.ERROR); - - Mono disposeMono = messageConsumer - .rxUnregister() - .as(MonoUtils::>toMono) - .doOnSuccess(s -> logger.trace("Unregistered message consumer")) - .then(); - - return Flux - .usingWhen(registrationCompletionMono, msgCons -> dataFlux, msgCons -> disposeMono) - .takeUntilOther(endMono); + .then(onRegistered); + return messageConsumer.toFlowable().to(RxJava2Adapter::flowableToFlux).mergeWith(registration.then(Mono.empty())); } public static Scheduler newBoundedSingle(String name) {