From a3a2893fb8b34919eb6563e3b5837e87fca14878 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 13 Jan 2021 19:46:46 +0100 Subject: [PATCH] Replace verticles with normal class --- .../it/tdlight/tdlibsession/EventBusFlux.java | 18 ++- .../remoteclient/TDLibRemoteClient.java | 87 ++++++------- .../tdlibsession/td/easy/AsyncTdEasy.java | 2 +- .../client/AsyncTdMiddleEventBusClient.java | 8 +- .../td/middle/direct/AsyncTdMiddleLocal.java | 11 +- .../server/AsyncTdMiddleEventBusServer.java | 115 ++++++++---------- 6 files changed, 109 insertions(+), 132 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 6a482c1..a8d61e2 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -56,11 +56,23 @@ public class EventBusFlux { MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); var subscription = flux.subscribe(item -> { - eventBus.send(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions); + var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); }, error -> { - eventBus.send(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions); + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); }, () -> { - eventBus.send(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions); + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); }); cancel.handler(msg3 -> { diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 87acbe6..fc6015b 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -4,7 +4,6 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; import io.vertx.core.shareddata.AsyncMap; import it.tdlight.common.Init; @@ -37,7 +36,7 @@ public class TDLibRemoteClient implements AutoCloseable { private final int port; private final Set membersAddresses; private final Many clusterManager = Sinks.many().replay().latest(); - private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false); + private final Scheduler deploymentScheduler = Schedulers.single(); public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) { this.securityInfo = securityInfo; @@ -155,7 +154,9 @@ public class TDLibRemoteClient implements AutoCloseable { }) .doOnError(ex -> { logger.error(ex.getLocalizedMessage(), ex); - }).subscribe(i -> {}, e -> {}, () -> startedEventHandler.handle(null)); + }).subscribe(i -> {}, e -> { + logger.error("Remote client error", e); + }, () -> startedEventHandler.handle(null)); } catch (IOException ex) { logger.error("Remote client error", ex); } @@ -230,56 +231,43 @@ public class TDLibRemoteClient implements AutoCloseable { private void deployBot(TdClusterManager clusterManager, String botAddress, Handler> deploymentHandler) { AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager); verticle.onBeforeStop(handler -> { - vertxStatusScheduler.schedule(() -> { - clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { - if (lockAcquisitionResult.succeeded()) { - var deploymentLock = lockAcquisitionResult.result(); - verticle.onAfterStop(handler2 -> { - vertxStatusScheduler.schedule(() -> { - deploymentLock.release(); - handler2.complete(); - }); - }); - clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult> mapResult) -> { - if (mapResult.succeeded()) { - var runningBotAddresses = mapResult.result(); - runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> { - if (putResult.succeeded()) { - if (putResult.result() != null) { - handler.complete(); - } else { - handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed"); - } + clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + if (lockAcquisitionResult.succeeded()) { + var deploymentLock = lockAcquisitionResult.result(); + verticle.onAfterStop(handler2 -> { + deploymentLock.release(); + handler2.complete(); + }); + clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult> mapResult) -> { + if (mapResult.succeeded()) { + var runningBotAddresses = mapResult.result(); + runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> { + if (putResult.succeeded()) { + if (putResult.result() != null) { + handler.complete(); } else { - handler.fail(putResult.cause()); + handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed"); } - }); - } else { - handler.fail(mapResult.cause()); - } - }); - } else { - handler.fail(lockAcquisitionResult.cause()); - } - }); + } else { + handler.fail(putResult.cause()); + } + }); + } else { + handler.fail(mapResult.cause()); + } + }); + } else { + handler.fail(lockAcquisitionResult.cause()); + } }); }); - clusterManager - .getVertx() - .deployVerticle(verticle, - clusterManager - .newDeploymentOpts() - .setConfig(new JsonObject() - .put("botAddress", botAddress) - .put("botAlias", botAddress) - .put("local", false)), - (deployed) -> { - if (deployed.failed()) { - logger.error("Can't deploy bot \"" + botAddress + "\"", deployed.cause()); - } - deploymentHandler.handle(deployed); - } - ); + verticle.start(botAddress, botAddress, false).doOnError(error -> { + logger.error("Can't deploy bot \"" + botAddress + "\"", error); + }).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> { + deploymentHandler.handle(Future.failedFuture(err)); + }, () -> { + deploymentHandler.handle(Future.succeededFuture()); + }); } private void putAllAsync(AsyncMap sharedMap, @@ -307,6 +295,5 @@ public class TDLibRemoteClient implements AutoCloseable { @Override public void close() { clusterManager.asFlux().blockFirst(); - vertxStatusScheduler.dispose(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index e276d30..79e14bf 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -58,7 +58,7 @@ public class AsyncTdEasy { private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); - private final Scheduler scheduler = Schedulers.newSingle("TdEasyUpdates"); + private final Scheduler scheduler = Schedulers.single(); private final ReplayProcessor authState = ReplayProcessor.create(1); private final ReplayProcessor requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); private final ReplayProcessor settings = ReplayProcessor.cacheLast(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 76cc6cb..343482d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -39,6 +39,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { @@ -47,6 +49,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy public static final boolean OUTPUT_REQUESTS = false; public static final byte[] EMPTY = new byte[0]; + private final Scheduler tdMiddleScheduler = Schedulers.single(); private final Many tdClosed = Sinks.many().replay().latestOrDefault(false); private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; @@ -146,6 +149,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy if (msg.succeeded()) { this.listen() .timeout(Duration.ofSeconds(30)) + .subscribeOn(tdMiddleScheduler) .subscribe(v -> {}, future::fail, future::complete); } else { future.fail(msg.cause()); @@ -241,7 +245,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy tdClosed.tryEmitNext(true); } } - })); + })).subscribeOn(tdMiddleScheduler); } @Override @@ -297,6 +301,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } }).switchIfEmpty(Mono.fromSupplier(() -> { return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); - })); + })).subscribeOn(tdMiddleScheduler); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index fe37da8..8b73f20 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -1,6 +1,5 @@ package it.tdlight.tdlibsession.td.middle.direct; -import io.vertx.core.json.JsonObject; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; @@ -10,7 +9,6 @@ import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient; import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; -import it.tdlight.utils.MonoUtils; import java.util.Objects; import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; @@ -35,14 +33,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { } public Mono start() { - return Mono.create(sink -> { - masterClusterManager - .getVertx() - .deployVerticle(srv, - masterClusterManager.newDeploymentOpts().setConfig(new JsonObject().put("botAddress", botAddress).put("botAlias", botAlias).put("local", true)), - MonoUtils.toHandler(sink) - ); - }).onErrorMap(InitializationException::new).flatMap(_x -> { + return srv.start(botAddress, botAlias, true).onErrorMap(InitializationException::new).flatMap(_x -> { try { return AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true).doOnNext(cli -> { this.cli.onNext(cli); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 4f7ca38..05c798b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -2,7 +2,6 @@ package it.tdlight.tdlibsession.td.middle.server; import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient.OUTPUT_REQUESTS; -import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -37,7 +36,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class AsyncTdMiddleEventBusServer extends AbstractVerticle { +public class AsyncTdMiddleEventBusServer { private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class); @@ -69,7 +68,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { this.cluster = clusterManager; this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000); - this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll"); + this.tdSrvPoll = Schedulers.single(); if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); @@ -79,57 +78,52 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } } - @Override - public void start(Promise startPromise) { - var botAddress = config().getString("botAddress"); - if (botAddress == null || botAddress.isEmpty()) { - throw new IllegalArgumentException("botAddress is not set!"); - } - this.botAddress = botAddress; - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - this.botAlias = botAlias; - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - this.local = local; - this.td = new AsyncTdDirectImpl(botAlias); + public Mono start(String botAddress, String botAlias, boolean local) { + return Mono.create(sink -> { + if (botAddress == null || botAddress.isEmpty()) { + sink.error(new IllegalArgumentException("botAddress is not set!")); + } + this.botAddress = botAddress; + if (botAlias == null || botAlias.isEmpty()) { + sink.error(new IllegalArgumentException("botAlias is not set!")); + } + this.botAlias = botAlias; + this.local = local; + this.td = new AsyncTdDirectImpl(botAlias); - AtomicBoolean alreadyDeployed = new AtomicBoolean(false); - this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { - if (alreadyDeployed.compareAndSet(false, true)) { - this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { - this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { - workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); - }); - this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - })).subscribeOn(this.tdSrvPoll) - .subscribe(v -> {}, ex -> { - logger.info(botAddress + " server deployed and started. succeeded: false"); - logger.error(ex.getLocalizedMessage(), ex); - msg.fail(500, ex.getLocalizedMessage()); - }, () -> { - logger.info(botAddress + " server deployed and started. succeeded: true"); - msg.reply(EMPTY); + AtomicBoolean alreadyDeployed = new AtomicBoolean(false); + this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { + if (alreadyDeployed.compareAndSet(false, true)) { + this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { + this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { + workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); }); - } else { - msg.reply(EMPTY); - } - }); - startConsumer.completionHandler(h -> { - logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); - if (h.succeeded()) { - logger.debug("Sending " + botAddress + ".readyToStart"); - cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> { - startPromise.complete(h.result()); - }); - } else { - startPromise.fail(h.cause()); - } - }); + this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); + })).subscribeOn(this.tdSrvPoll) + .subscribe(v -> {}, ex -> { + logger.info(botAddress + " server deployed and started. succeeded: false"); + logger.error(ex.getLocalizedMessage(), ex); + msg.fail(500, ex.getLocalizedMessage()); + }, () -> { + logger.info(botAddress + " server deployed and started. succeeded: true"); + msg.reply(EMPTY); + }); + } else { + msg.reply(EMPTY); + } + }); + startConsumer.completionHandler(h -> { + logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); + if (h.succeeded()) { + logger.debug("Sending " + botAddress + ".readyToStart"); + cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> { + sink.success(); + }); + } else { + sink.error(h.cause()); + } + }); + }).subscribeOn(tdSrvPoll); } public void onBeforeStop(Consumer> r) { @@ -140,11 +134,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { this.onAfterStopListeners.add(r); } - @Override - public void stop(Promise stopPromise) { - stopPromise.complete(); - } - private void runAll(List>> actions, Handler> resultHandler) { if (actions.isEmpty()) { resultHandler.handle(Future.succeededFuture()); @@ -202,8 +191,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } }); executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - - }); + }).subscribeOn(tdSrvPoll); } private void undeploy(Runnable whenUndeployed) { @@ -237,12 +225,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); } - vertx.undeploy(deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); - } - whenUndeployed.run(); - }); + whenUndeployed.run(); }); }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { logger.error("Error when stopping", ex); @@ -260,7 +243,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .replace(" ", "") .replace(" = ", "=")); } - }).bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) + }).bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) .windowTimeout(1, Duration.ofSeconds(5)) .flatMap(w -> w.defaultIfEmpty(Collections.emptyList())) .map(TdResultList::new).doFinally(s -> {