Update AsyncTdMiddleEventBusServer.java

This commit is contained in:
Andrea Cavalli 2021-01-13 18:21:34 +01:00
parent 449d513f1a
commit ed7d12e9ba

View File

@ -56,7 +56,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
protected AsyncTdDirectImpl td; protected AsyncTdDirectImpl td;
private final Scheduler tdSrvPoll; private final Scheduler tdSrvPoll;
private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false);
/** /**
* Value is not important, emits when a request is received * Value is not important, emits when a request is received
*/ */
@ -82,56 +81,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@Override @Override
public void start(Promise<Void> startPromise) { public void start(Promise<Void> startPromise) {
vertxStatusScheduler.schedule(() -> { var botAddress = config().getString("botAddress");
var botAddress = config().getString("botAddress"); if (botAddress == null || botAddress.isEmpty()) {
if (botAddress == null || botAddress.isEmpty()) { throw new IllegalArgumentException("botAddress is not set!");
throw new IllegalArgumentException("botAddress is not set!"); }
} this.botAddress = botAddress;
this.botAddress = botAddress; var botAlias = config().getString("botAlias");
var botAlias = config().getString("botAlias"); if (botAlias == null || botAlias.isEmpty()) {
if (botAlias == null || botAlias.isEmpty()) { throw new IllegalArgumentException("botAlias is not set!");
throw new IllegalArgumentException("botAlias is not set!"); }
} this.botAlias = botAlias;
this.botAlias = botAlias; var local = config().getBoolean("local");
var local = config().getBoolean("local"); if (local == null) {
if (local == null) { throw new IllegalArgumentException("local is not set!");
throw new IllegalArgumentException("local is not set!"); }
} this.local = local;
this.local = local; this.td = new AsyncTdDirectImpl(botAlias);
this.td = new AsyncTdDirectImpl(botAlias);
AtomicBoolean alreadyDeployed = new AtomicBoolean(false); AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> { this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) { if (alreadyDeployed.compareAndSet(false, true)) {
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> { this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> { this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> {
startPromise.complete(h.result());
}); });
} else { this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
startPromise.fail(h.cause()); })).subscribeOn(this.tdSrvPoll)
} .subscribe(v -> {}, ex -> {
}); logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> {
startPromise.complete(h.result());
});
} else {
startPromise.fail(h.cause());
}
}); });
} }
@ -145,43 +142,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@Override @Override
public void stop(Promise<Void> stopPromise) { public void stop(Promise<Void> stopPromise) {
vertxStatusScheduler.schedule(() -> { stopPromise.complete();
runAll(onBeforeStopListeners, onBeforeStopHandler -> {
if (onBeforeStopHandler.failed()) {
logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
}
Mono.create(sink -> this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
this.executeConsumer.unregister(result4 -> {
if (result4.failed()) {
logger.error("Can't unregister consumer", result4.cause());
}
sink.success();
});
});
})).doFinally(signalType -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
stopPromise.complete();
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
});
});
} }
private void runAll(List<Consumer<Promise<Void>>> actions, Handler<AsyncResult<Void>> resultHandler) { private void runAll(List<Consumer<Promise<Void>>> actions, Handler<AsyncResult<Void>> resultHandler) {
@ -246,11 +207,46 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
private void undeploy(Runnable whenUndeployed) { private void undeploy(Runnable whenUndeployed) {
vertx.undeploy(deploymentID(), undeployed -> { runAll(onBeforeStopListeners, onBeforeStopHandler -> {
if (undeployed.failed()) { if (onBeforeStopHandler.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause()); logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
} }
whenUndeployed.run();
Mono.create(sink -> this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
this.executeConsumer.unregister(result4 -> {
if (result4.failed()) {
logger.error("Can't unregister consumer", result4.cause());
}
sink.success();
});
});
})).doFinally(signalType -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
vertx.undeploy(deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
whenUndeployed.run();
});
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
}); });
} }