From b51fcbbf9077c39631aabaab2ccec55e2febb2a4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 22 Jan 2021 17:31:09 +0100 Subject: [PATCH] Use RxJava2 --- pom.xml | 10 + .../it/tdlight/tdlibsession/EventBusFlux.java | 8 +- .../remoteclient/TDLibRemoteClient.java | 6 +- .../td/middle/TdClusterManager.java | 29 +- .../client/AsyncTdMiddleEventBusClient.java | 462 ++++++++---------- .../td/middle/direct/AsyncTdMiddleDirect.java | 52 +- .../td/middle/direct/AsyncTdMiddleLocal.java | 37 +- .../server/AsyncTdMiddleEventBusServer.java | 15 +- src/main/java/it/tdlight/utils/MonoUtils.java | 48 +- 9 files changed, 302 insertions(+), 365 deletions(-) diff --git a/pom.xml b/pom.xml index 87885fb..5026b3f 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,16 @@ ${vertx.version} test + + io.vertx + vertx-reactive-streams + ${vertx.version} + + + io.vertx + vertx-rx-java2 + ${vertx.version} + org.junit.jupiter junit-jupiter-api diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 91785c7..e388aa5 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -1,11 +1,11 @@ package it.tdlight.tdlibsession; import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; -import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.ReplyException; +import io.vertx.reactivex.core.eventbus.EventBus; +import io.vertx.reactivex.core.eventbus.Message; +import io.vertx.core.eventbus.MessageCodec; +import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.time.Duration; diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index fbd2d11..d449668 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -3,10 +3,10 @@ package it.tdlight.tdlibsession.remoteclient; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.eventbus.Message; +import io.vertx.reactivex.core.Promise; +import io.vertx.reactivex.core.eventbus.Message; import io.vertx.core.net.JksOptions; -import io.vertx.core.shareddata.AsyncMap; +import io.vertx.reactivex.core.shareddata.AsyncMap; import it.tdlight.common.Init; import it.tdlight.common.utils.CantLoadLibrary; import it.tdlight.tdlibsession.td.middle.TdClusterManager; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 787b2b4..8978f1c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -9,18 +9,20 @@ import com.hazelcast.config.MergePolicyConfig; import com.hazelcast.config.cp.SemaphoreConfig; import io.vertx.core.DeploymentOptions; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.ClientAuth; import io.vertx.core.net.JksOptions; -import io.vertx.core.shareddata.SharedData; import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.eventbus.EventBus; +import io.vertx.reactivex.core.eventbus.Message; +import io.vertx.reactivex.core.eventbus.MessageConsumer; +import io.vertx.reactivex.core.shareddata.SharedData; import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; +import it.tdlight.common.ConstructorDetector; +import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.utils.MonoUtils; import java.nio.channels.AlreadyBoundException; import java.util.ArrayList; @@ -45,6 +47,18 @@ public class TdClusterManager { this.mgr = mgr; this.vertxOptions = vertxOptions; this.vertx = vertx; + + if (vertx != null && vertx.eventBus() != null) { + vertx + .eventBus() + .getDelegate() + .registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec()) + .registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()) + .registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); + for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { + vertx.eventBus().getDelegate().registerDefaultCodec(value, new TdMessageCodec(value)); + } + } } public static Mono ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { @@ -169,14 +183,13 @@ public class TdClusterManager { /** * - * @param objectClass * @param messageCodec * @param * @return true if registered, false if already registered */ - public boolean registerDefaultCodec(Class objectClass, MessageCodec messageCodec) { + public boolean registerCodec(MessageCodec messageCodec) { try { - vertx.eventBus().registerDefaultCodec(objectClass, messageCodec); + vertx.eventBus().registerCodec(messageCodec); return true; } catch (IllegalStateException ex) { if (ex.getMessage().startsWith("Already a default codec registered for class")) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 835c55e..744cae9 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -1,14 +1,12 @@ package it.tdlight.tdlibsession.td.middle.client; -import io.vertx.circuitbreaker.CircuitBreaker; +import io.reactivex.Completable; import io.vertx.circuitbreaker.CircuitBreakerOptions; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.AsyncResult; -import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; -import it.tdlight.common.ConstructorDetector; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.core.eventbus.Message; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; @@ -21,16 +19,12 @@ import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdClusterManager; -import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.time.Duration; import java.util.Objects; -import java.util.StringJoiner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; @@ -38,7 +32,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; +import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; @@ -49,69 +43,43 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy public static final boolean OUTPUT_REQUESTS = false; public static final byte[] EMPTY = new byte[0]; - private final Many tdCloseRequested = Sinks.many().replay().latestOrDefault(false); - private final Many tdClosed = Sinks.many().replay().latestOrDefault(false); - private final One tdCrash = Sinks.one(); + private final TdClusterManager cluster; private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; - private TdClusterManager cluster; + private final Empty tdCloseRequested = Sinks.empty(); + private final Empty tdClosed = Sinks.empty(); + private final One tdCrashed = Sinks.one(); private String botAddress; private String botAlias; private boolean local; private long initTime; - @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { cluster = clusterManager; - if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { - cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); - cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); - for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { - cluster.registerDefaultCodec(value, new TdMessageCodec(value)); - } - } this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local); this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000); } - public static Mono getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException { - try { - var instance = new AsyncTdMiddleEventBusClient(clusterManager); - var options = clusterManager.newDeploymentOpts().setConfig(new JsonObject() - .put("botAddress", botAddress) - .put("botAlias", botAlias) - .put("local", local)); - return MonoUtils.executeAsFuture(promise -> { - clusterManager.getVertx().deployVerticle(instance, options, promise); - }).doOnNext(_v -> { - logger.trace("Deployed verticle for bot address: " + botAddress); - }).thenReturn(instance); - } catch (RuntimeException e) { - throw new InitializationException(e); - } + public static Mono getAndDeployInstance(TdClusterManager clusterManager, + String botAlias, + String botAddress, + boolean local) { + var instance = new AsyncTdMiddleEventBusClient(clusterManager); + var options = clusterManager + .newDeploymentOpts() + .setConfig(new JsonObject().put("botAddress", botAddress).put("botAlias", botAlias).put("local", local)); + return clusterManager + .getVertx() + .rxDeployVerticle(instance, options) + .as(MonoUtils::toMono) + .doOnSuccess(s -> logger.trace("Deployed verticle for bot address: " + botAddress)) + .thenReturn(instance); } @Override - public void start(Promise startPromise) { - var botAddress = config().getString("botAddress"); - if (botAddress == null || botAddress.isEmpty()) { - throw new IllegalArgumentException("botAddress is not set!"); - } - this.botAddress = botAddress; - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - this.botAlias = botAlias; - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - this.local = local; - this.initTime = System.currentTimeMillis(); - + public Completable rxStart() { CircuitBreaker startBreaker = CircuitBreaker.create("bot-" + botAddress + "-server-online-check-circuit-breaker", vertx, new CircuitBreakerOptions().setMaxFailures(1).setMaxRetries(4).setTimeout(30000) ) @@ -123,238 +91,204 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy logger.error("Circuit closed! " + botAddress); }); - startBreaker.execute(future -> { - try { - logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing"); - cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions); - - - logger.debug("Waiting for " + botAddress + ".readyToStart"); - var readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart"); - readyToStartConsumer.handler((Message pingMsg) -> { - logger.debug("Received ping reply (succeeded)"); - readyToStartConsumer.unregister(unregistered -> { - // Reply instantly - pingMsg.reply(new byte[0]); - if (unregistered.succeeded()) { - logger.debug("Requesting " + botAddress + ".start"); - cluster - .getEventBus() - .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { - if (startMsg.succeeded()) { - logger.debug("Requesting " + botAddress + ".isWorking"); - cluster - .getEventBus() - .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { - if (msg.succeeded()) { - this.listen() - .timeout(Duration.ofSeconds(30)) - .subscribeOn(Schedulers.single()) - .subscribe(v -> {}, future::fail, future::complete); - } else { - future.fail(msg.cause()); - } - }); - } else { - future.fail(startMsg.cause()); - } - }); - } else { - logger.error("Failed to unregister readyToStartConsumer", unregistered.cause()); - } - }); - }); - } catch (Exception ex) { - future.fail(ex); - } - }) - .onFailure(ex -> { - logger.error("Failure when starting bot " + botAddress, ex); - startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!")); + return Mono + .fromCallable(() -> { + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + this.local = local; + this.initTime = System.currentTimeMillis(); + return null; }) - .onSuccess(v -> startPromise.complete()); + .then(startBreaker.rxExecute(future -> { + logger.debug("Waiting for " + botAddress + ".readyToStart"); + var readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart"); + readyToStartConsumer.handler((Message pingMsg) -> { + logger.debug("Received ping reply (succeeded)"); + readyToStartConsumer + .rxUnregister() + .as(MonoUtils::toMono) + .doOnError(ex -> { + logger.error("Failed to unregister readyToStartConsumer", ex); + }) + .then(Mono.fromCallable(() -> { + pingMsg.reply(new byte[0]); + logger.debug("Requesting " + botAddress + ".start"); + return null; + })) + .then(cluster + .getEventBus() + .rxRequest(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout) + .as(MonoUtils::toMono) + .doOnError(ex -> logger.error("Failed to request bot start", ex))) + .doOnNext(msg -> logger.debug("Requesting " + botAddress + ".isWorking")) + .then(cluster.getEventBus() + .rxRequest(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout) + .as(MonoUtils::toMono) + .doOnError(ex -> logger.error("Failed to request isWorking", ex))) + .subscribe(v -> {}, future::fail, future::complete); + }); + + readyToStartConsumer + .rxCompletionHandler() + .as(MonoUtils::toMono) + .doOnSuccess(s -> logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing")) + .then(Mono.fromCallable(() -> cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions))) + .subscribeOn(Schedulers.single()) + .subscribe(v -> {}, future::fail); + }).as(MonoUtils::toMono) + ) + .onErrorMap(ex -> { + logger.error("Failure when starting bot " + botAddress, ex); + return new InitializationException("Can't connect tdlib middle client to tdlib middle server!"); + }) + .as(MonoUtils::toCompletable); } @Override - public void stop(Promise stopPromise) { - logger.debug("Stopping AsyncTdMiddle client verticle..."); - tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> { - if (!closeRequested) { - return tdCrash.asMono().switchIfEmpty(Mono - .fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping...")) - .then(this.execute(new TdApi.Close(), false) - ).then() - .cast(TdApi.Error.class) - ).doOnTerminate(() -> { - logger.debug("Close() sent to td"); - markCloseRequested(); - }).then(); - } else { - return Mono.empty(); - } - }).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> { - logger.debug("Failed to stop AsyncTdMiddle client verticle"); - stopPromise.fail(cause); - }, () -> { - logger.debug("Stopped AsyncTdMiddle client verticle"); - stopPromise.complete(); - }); - } - - private Mono listen() { - // Nothing to listen for now - return Mono.empty(); - } - - private static class UpdatesBatchResult { - public final Flux updatesFlux; - public final boolean completed; - - private UpdatesBatchResult(Flux updatesFlux, boolean completed) { - this.updatesFlux = updatesFlux; - this.completed = completed; - } - - @Override - public String toString() { - return new StringJoiner(", ", UpdatesBatchResult.class.getSimpleName() + "[", "]") - .add("updatesFlux=" + updatesFlux) - .add("completed=" + completed) - .toString(); - } + public Completable rxStop() { + return Mono + .fromRunnable(() -> logger.debug("Stopping AsyncTdMiddle client verticle...")) + .then(Mono + .firstWithSignal( + tdCloseRequested.asMono(), + tdClosed.asMono(), + Mono.firstWithSignal( + tdCrashed.asMono(), + Mono + .fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping...")) + .then(this.execute(new TdApi.Close(), false)) + .then() + ) + .doOnTerminate(() -> { + logger.debug("Close() sent to td"); + markCloseRequested(); + }) + .then(tdClosed.asMono()) + ) + ) + .doOnError(ex -> logger.debug("Failed to stop AsyncTdMiddle client verticle")) + .doOnSuccess(s -> logger.debug("Stopped AsyncTdMiddle client verticle")) + .as(MonoUtils::toCompletable); } @Override public Flux receive() { var fluxCodec = new TdResultListMessageCodec(); - return tdCloseRequested.asFlux().take(1).single().filter(close -> !close).flatMapMany(_closed -> EventBusFlux - .connect(cluster.getEventBus(), - botAddress + ".updates", - deliveryOptions, - fluxCodec, - Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout()) - ) - .filter(Objects::nonNull) - .flatMap(block -> Flux.fromIterable(block.getValues())) - .filter(Objects::nonNull) - .onErrorResume(error -> { - TdApi.Error theError; - if (error instanceof ConnectException) { - theError = new Error(444, "CONNECTION_KILLED"); - } else if (error.getMessage().contains("Timed out")) { - theError = new Error(444, "CONNECTION_KILLED"); - } else { - theError = new Error(406, "INVALID_UPDATE"); - logger.error("Bot updates request failed! Marking as closed.", error); - } - tdCrash.tryEmitValue(theError); - return Flux.just(TdResult.failed(theError)); - }).flatMap(item -> Mono.fromCallable(item::orElseThrow)) - .filter(Objects::nonNull) - .doOnNext(item -> { - if (OUTPUT_REQUESTS) { - System.out.println(" <- " + item.toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=") - ); - } - if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var state = (UpdateAuthorizationState) item; - if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - // Send tdClosed early to avoid errors - logger.debug("Received AuthorizationStateClosed from td. Marking td as closed"); - markCloseRequested(); - markClosed(); - } - } - })).doFinally(s -> { - if (s == SignalType.ON_ERROR) { - // Send tdClosed early to avoid errors - logger.debug("Updates flux terminated with an error signal. Marking td as closed"); - markCloseRequested(); - markClosed(); - } - }); + return Flux + .firstWithSignal( + tdCloseRequested.asMono().flux().cast(TdApi.Object.class), + tdClosed.asMono().flux().cast(TdApi.Object.class), + EventBusFlux + .connect(cluster.getEventBus(), + botAddress + ".updates", + deliveryOptions, + fluxCodec, + Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout()) + ) + .filter(Objects::nonNull) + .flatMap(block -> Flux.fromIterable(block.getValues())) + .filter(Objects::nonNull) + .onErrorResume(error -> { + TdApi.Error theError; + if (error instanceof ConnectException) { + theError = new Error(444, "CONNECTION_KILLED"); + } else if (error.getMessage().contains("Timed out")) { + theError = new Error(444, "CONNECTION_KILLED"); + } else { + theError = new Error(406, "INVALID_UPDATE"); + logger.error("Bot updates request failed! Marking as closed.", error); + } + tdCrashed.tryEmitValue(theError); + return Flux.just(TdResult.failed(theError)); + }).flatMap(item -> Mono.fromCallable(item::orElseThrow)) + .filter(Objects::nonNull) + .doOnNext(item -> { + if (OUTPUT_REQUESTS) { + System.out.println(" <- " + item.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=") + ); + } + if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var state = (UpdateAuthorizationState) item; + if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + // Send tdClosed early to avoid errors + logger.debug("Received AuthorizationStateClosed from td. Marking td as closed"); + markCloseRequested(); + markClosed(); + } + } + }).doFinally(s -> { + if (s == SignalType.ON_ERROR) { + // Send tdClosed early to avoid errors + logger.debug("Updates flux terminated with an error signal. Marking td as closed"); + markCloseRequested(); + markClosed(); + } + }) + ); } private void markCloseRequested() { - if (tdCloseRequested.tryEmitNext(true).isFailure()) { + if (tdCloseRequested.tryEmitEmpty().isFailure()) { logger.error("Failed to set tdCloseRequested"); - if (tdCloseRequested.tryEmitComplete().isFailure()) { - logger.error("Failed to complete tdCloseRequested"); - } } } private void markClosed() { - if (tdCrash.tryEmitEmpty().isFailure()) { - logger.debug("TDLib already crashed"); - } - if (tdClosed.tryEmitNext(true).isFailure()) { + if (tdClosed.tryEmitEmpty().isFailure()) { logger.error("Failed to set tdClosed"); - if (tdClosed.tryEmitComplete().isFailure()) { - logger.error("Failed to complete tdClosed"); - } + } + if (tdCrashed.tryEmitEmpty().isFailure()) { + logger.debug("TDLib already crashed"); } } @Override public Mono> execute(Function request, boolean executeDirectly) { - var req = new ExecuteObject(executeDirectly, request); - if (OUTPUT_REQUESTS) { - System.out.println(" -> " + request.toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); - } - - var crashMono = tdCrash.asMono().>map(TdResult::failed); - var executeMono = tdCloseRequested.asFlux().take(1).single().filter(close -> !close).>flatMap((_x) -> Mono.create(sink -> { - try { - cluster - .getEventBus() - .request(botAddress + ".execute", - req, - deliveryOptions, - (AsyncResult> event) -> { - try { - if (event.succeeded()) { - if (event.result().body() == null) { - sink.error(new NullPointerException("Response is empty")); - } else { - sink.success(Objects.requireNonNull(event.result().body()).toTdResult()); - } - } else { - sink.error(ResponseError.newResponseError(request, botAlias, event.cause())); - } - } catch (Throwable t) { - sink.error(t); + return Mono + .fromRunnable(() -> { + if (OUTPUT_REQUESTS) { + System.out.println(" -> " + request.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + }) + .then(Mono.firstWithSignal( + tdCloseRequested.asMono().flatMap(t -> Mono.empty()), + tdClosed.asMono().flatMap(t -> Mono.empty()), + cluster.getEventBus() + .rxRequest(botAddress + ".execute", req, deliveryOptions) + .as(MonoUtils::toMono) + .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) + .>flatMap(resp -> resp.body() == null ? Mono.>error(new NullPointerException("Response is empty")) : Mono.just(resp.body().toTdResult())) + .switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"))))) + .doOnNext(response -> { + if (OUTPUT_REQUESTS) { + System.out.println(" <- " + response.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); } - } - ); - } catch (Throwable t) { - sink.error(t); - } - })).>switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.failed(new TdApi.Error(500, - "Client is closed or response is empty" - ))))).>handle((response, sink) -> { - try { - Objects.requireNonNull(response); - if (OUTPUT_REQUESTS) { - System.out.println( - " <- " + response.toString().replace("\n", " ").replace("\t", "").replace(" ", "").replace(" = ", "=")); - } - sink.next(response); - } catch (Exception e) { - sink.error(e); - } - }).switchIfEmpty(Mono.fromSupplier(() -> { - return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); - })); - return Mono.firstWithValue(crashMono, executeMono); + }) + )); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index f1d9494..dee4bbb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -17,7 +17,6 @@ import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -30,27 +29,23 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd protected AsyncTdDirectImpl td; private String botAddress; private String botAlias; - private Empty closeRequest = Sinks.empty(); + private final Empty closeRequest = Sinks.empty(); public AsyncTdMiddleDirect() { } - public static Mono getAndDeployInstance(TdClusterManager clusterManager, + public static Mono getAndDeployInstance(TdClusterManager clusterManager, String botAlias, - String botAddress) throws InitializationException { - try { + String botAddress) { var instance = new AsyncTdMiddleDirect(); var options = clusterManager.newDeploymentOpts().setConfig(new JsonObject() .put("botAlias", botAlias) .put("botAddress", botAddress)); - return MonoUtils.executeAsFuture(promise -> { - clusterManager.getVertx().deployVerticle(instance, options, promise); - }).doOnNext(_v -> { - logger.trace("Deployed verticle for bot " + botAlias + ", address: " + botAddress); - }).thenReturn(instance); - } catch (RuntimeException e) { - throw new InitializationException(e); - } + return clusterManager.getVertx() + .rxDeployVerticle(instance, options) + .as(MonoUtils::toMono) + .doOnNext(_v -> logger.trace("Deployed verticle for bot " + botAlias + ", address: " + botAddress)) + .thenReturn(instance); } @Override @@ -82,30 +77,21 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd return td .receive(new AsyncTdDirectOptions(WAIT_DURATION, 1000)) .takeUntilOther(closeRequest.asMono()) - .doOnError(ex -> { - logger.info("TdMiddle verticle error", ex); - }) - .doOnTerminate(() -> { - logger.debug("TdMiddle verticle stopped"); - }).flatMap(result -> { - if (result.succeeded()) { - return Mono.just(result.result()); - } else { - logger.error("Received an errored update", - ResponseError.newResponseError("incoming update", botAlias, result.cause())); - return Mono.empty(); + .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) + .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) + .doOnNext(result -> { + if (result.failed()) { + logger.error("Received an errored update: {}", result.cause()); } - }); + }) + .filter(TdResult::succeeded) + .map(TdResult::result); } @Override public Mono> execute(Function requestFunction, boolean executeDirectly) { - return td.execute(requestFunction, executeDirectly).onErrorMap(error -> { - return ResponseError.newResponseError( - requestFunction, - botAlias, - error - ); - }); + return td + .execute(requestFunction, executeDirectly) + .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index e21a339..f85e375 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -4,28 +4,25 @@ import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; import it.tdlight.tdlibsession.td.TdResult; -import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient; import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; -import java.util.Objects; import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.One; public class AsyncTdMiddleLocal implements AsyncTdMiddle { - private final AsyncTdDirectImpl td; private final AsyncTdMiddleEventBusServer srv; private final TdClusterManager masterClusterManager; - private ReplayProcessor cli = ReplayProcessor.cacheLast(); + private final One cli = Sinks.one(); private final String botAlias; private final String botAddress; - public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, String botAddress) throws InitializationException { - this.td = new AsyncTdDirectImpl(botAlias); + public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, String botAddress) { this.srv = new AsyncTdMiddleEventBusServer(masterClusterManager); this.masterClusterManager = masterClusterManager; this.botAlias = botAlias; @@ -33,28 +30,24 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { } public Mono start() { - return srv.start(botAddress, botAlias, true).onErrorMap(InitializationException::new).flatMap(_x -> { - try { - return AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true).doOnNext(cli -> { - this.cli.onNext(cli); - }).doOnError(error -> this.cli.onError(error)).doOnSuccess(_v -> this.cli.onComplete()); - } catch (InitializationException e) { - this.cli.onError(e); - return Mono.error(e); - } - }).map(v -> this); + return srv + .start(botAddress, botAlias, true) + .onErrorMap(InitializationException::new) + .single() + .then(AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true)) + .single() + .doOnNext(this.cli::tryEmitValue) + .doOnError(this.cli::tryEmitError) + .thenReturn(this); } @Override public Flux receive() { - return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::receive); + return cli.asMono().single().flatMapMany(AsyncTdMiddle::receive); } @Override public Mono> execute(Function request, boolean executeDirectly) { - return cli - .filter(obj -> Objects.nonNull(obj)) - .single() - .flatMap(c -> c.execute(request, executeDirectly)); + return cli.asMono().single().flatMap(c -> c.execute(request, executeDirectly)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index daecdde..7744196 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -6,9 +6,8 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; -import io.vertx.core.eventbus.Message; -import io.vertx.core.eventbus.MessageConsumer; -import it.tdlight.common.ConstructorDetector; +import io.vertx.reactivex.core.eventbus.Message; +import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; @@ -21,11 +20,8 @@ import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdClusterManager; -import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.time.Duration; import java.util.ArrayList; @@ -71,13 +67,6 @@ public class AsyncTdMiddleEventBusServer { public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { this.cluster = clusterManager; this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000); - if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { - cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); - cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); - for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { - cluster.registerDefaultCodec(value, new TdMessageCodec(value)); - } - } } public Mono start(String botAddress, String botAlias, boolean local) { diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 4b566a1..c631e55 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -1,16 +1,18 @@ package it.tdlight.utils; +import io.reactivex.Completable; +import io.reactivex.Maybe; +import io.reactivex.Single; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; -import io.vertx.core.Vertx; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import org.reactivestreams.Subscription; @@ -59,22 +61,6 @@ public class MonoUtils { return PromiseSink.of(context, promise); } - public static BiConsumer> executeBlockingSink(Vertx vertx, BiConsumer> handler) { - return (value, sink) -> { - vertx.executeBlocking((Promise finished) -> { - handler.accept(value, PromiseSink.of(sink.currentContext(), finished)); - }, toHandler(sink)); - }; - } - - public static Mono executeBlocking(Vertx vertx, Consumer> action) { - return Mono.create((MonoSink sink) -> { - vertx.executeBlocking((Promise finished) -> { - action.accept(toSink(sink.currentContext(), finished)); - }, toHandler(sink)); - }); - } - public static Mono executeAsFuture(Consumer>> action) { return Mono.fromFuture(() -> { return CompletableFutureUtils.getCompletableFuture(() -> { @@ -206,4 +192,30 @@ public class MonoUtils { }, () -> cf.complete(null)); return cf; } + + public static Mono toMono(Future future) { + return Mono.create(sink -> future.onComplete(result -> { + if (result.succeeded()) { + sink.success(result.result()); + } else { + sink.error(result.cause()); + } + })); + } + + public static Mono toMono(Single single) { + return Mono.fromDirect(single.toFlowable()); + } + + public static Mono toMono(Maybe single) { + return Mono.fromDirect(single.toFlowable()); + } + + public static Mono toMono(Completable completable) { + return Mono.fromDirect(completable.toFlowable()); + } + + public static Completable toCompletable(Mono s) { + return Completable.fromPublisher(s); + } }