diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index f61ab28..4f7ca38 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -56,7 +56,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected AsyncTdDirectImpl td; private final Scheduler tdSrvPoll; - private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false); /** * Value is not important, emits when a request is received */ @@ -82,56 +81,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public void start(Promise startPromise) { - vertxStatusScheduler.schedule(() -> { - var botAddress = config().getString("botAddress"); - if (botAddress == null || botAddress.isEmpty()) { - throw new IllegalArgumentException("botAddress is not set!"); - } - this.botAddress = botAddress; - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - this.botAlias = botAlias; - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - this.local = local; - this.td = new AsyncTdDirectImpl(botAlias); + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + this.local = local; + this.td = new AsyncTdDirectImpl(botAlias); - AtomicBoolean alreadyDeployed = new AtomicBoolean(false); - this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { - if (alreadyDeployed.compareAndSet(false, true)) { - this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { - this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { - workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); - }); - this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - })).subscribeOn(this.tdSrvPoll) - .subscribe(v -> {}, ex -> { - logger.info(botAddress + " server deployed and started. succeeded: false"); - logger.error(ex.getLocalizedMessage(), ex); - msg.fail(500, ex.getLocalizedMessage()); - }, () -> { - logger.info(botAddress + " server deployed and started. succeeded: true"); - msg.reply(EMPTY); - }); - } else { - msg.reply(EMPTY); - } - }); - startConsumer.completionHandler(h -> { - logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); - if (h.succeeded()) { - logger.debug("Sending " + botAddress + ".readyToStart"); - cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> { - startPromise.complete(h.result()); + AtomicBoolean alreadyDeployed = new AtomicBoolean(false); + this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { + if (alreadyDeployed.compareAndSet(false, true)) { + this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { + this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { + workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); }); - } else { - startPromise.fail(h.cause()); - } - }); + this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); + })).subscribeOn(this.tdSrvPoll) + .subscribe(v -> {}, ex -> { + logger.info(botAddress + " server deployed and started. succeeded: false"); + logger.error(ex.getLocalizedMessage(), ex); + msg.fail(500, ex.getLocalizedMessage()); + }, () -> { + logger.info(botAddress + " server deployed and started. succeeded: true"); + msg.reply(EMPTY); + }); + } else { + msg.reply(EMPTY); + } + }); + startConsumer.completionHandler(h -> { + logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); + if (h.succeeded()) { + logger.debug("Sending " + botAddress + ".readyToStart"); + cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> { + startPromise.complete(h.result()); + }); + } else { + startPromise.fail(h.cause()); + } }); } @@ -145,43 +142,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public void stop(Promise stopPromise) { - vertxStatusScheduler.schedule(() -> { - runAll(onBeforeStopListeners, onBeforeStopHandler -> { - if (onBeforeStopHandler.failed()) { - logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause()); - } - - Mono.create(sink -> this.isWorkingConsumer.unregister(result -> { - if (result.failed()) { - logger.error("Can't unregister consumer", result.cause()); - } - this.startConsumer.unregister(result2 -> { - if (result2.failed()) { - logger.error("Can't unregister consumer", result2.cause()); - } - - this.executeConsumer.unregister(result4 -> { - if (result4.failed()) { - logger.error("Can't unregister consumer", result4.cause()); - } - - sink.success(); - }); - }); - })).doFinally(signalType -> { - logger.info("TdMiddle verticle \"" + botAddress + "\" stopped"); - - runAll(onAfterStopListeners, onAfterStopHandler -> { - if (onAfterStopHandler.failed()) { - logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); - } - stopPromise.complete(); - }); - }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { - logger.error("Error when stopping", ex); - }, () -> {}); - }); - }); + stopPromise.complete(); } private void runAll(List>> actions, Handler> resultHandler) { @@ -246,11 +207,46 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private void undeploy(Runnable whenUndeployed) { - vertx.undeploy(deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); + runAll(onBeforeStopListeners, onBeforeStopHandler -> { + if (onBeforeStopHandler.failed()) { + logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause()); } - whenUndeployed.run(); + + Mono.create(sink -> this.isWorkingConsumer.unregister(result -> { + if (result.failed()) { + logger.error("Can't unregister consumer", result.cause()); + } + this.startConsumer.unregister(result2 -> { + if (result2.failed()) { + logger.error("Can't unregister consumer", result2.cause()); + } + + this.executeConsumer.unregister(result4 -> { + if (result4.failed()) { + logger.error("Can't unregister consumer", result4.cause()); + } + + sink.success(); + }); + }); + })).doFinally(signalType -> { + logger.info("TdMiddle verticle \"" + botAddress + "\" stopped"); + + runAll(onAfterStopListeners, onAfterStopHandler -> { + if (onAfterStopHandler.failed()) { + logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); + } + + vertx.undeploy(deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } + whenUndeployed.run(); + }); + }); + }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { + logger.error("Error when stopping", ex); + }, () -> {}); }); }