From a157f6f6ecb49794a57b1854e2703b511a93d339 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 17 Oct 2020 18:28:54 +0200 Subject: [PATCH] Reduce allocations --- .../client/AsyncTdMiddleEventBusClient.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 01b9b5c..5fcfaa7 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -6,6 +6,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.DeploymentOptions; import io.vertx.core.Promise; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import it.tdlight.common.ConstructorDetector; @@ -46,6 +47,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy public static final byte[] EMPTY = new byte[0]; private final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); + private final DeliveryOptions deliveryOptions; + private final DeliveryOptions deliveryOptionsWithTimeout; private ReplayProcessor> incomingUpdatesCo = ReplayProcessor.cacheLast(); @@ -66,6 +69,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy cluster.registerDefaultCodec(value, new TdMessageCodec(value)); } } + this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local); + this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000); } public static Mono getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException { @@ -120,18 +125,18 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy logger.error("Requesting " + botAddress + ".ping"); cluster .getEventBus() - .request(botAddress + ".ping", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local), pingMsg -> { + .request(botAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> { if (pingMsg.succeeded()) { logger.error("Received ping reply (succeeded)"); logger.error("Requesting " + botAddress + ".start"); cluster .getEventBus() - .request(botAddress + ".start", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), startMsg -> { + .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> { if (startMsg.succeeded()) { logger.error("Requesting " + botAddress + ".isWorking"); cluster .getEventBus() - .request(botAddress + ".isWorking", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), msg -> { + .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { if (msg.succeeded()) { this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete); } else { @@ -246,7 +251,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .flatMap(_x -> Mono.>create(sink -> { cluster.getEventBus().request(botAddress + ".getNextUpdatesBlock", EMPTY, - cluster.newDeliveryOpts().setLocalOnly(local), + deliveryOptions, msg -> { if (msg.failed()) { //if (System.currentTimeMillis() - initTime <= 30000) { @@ -315,7 +320,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .getEventBus() .request(botAddress + ".execute", req, - cluster.newDeliveryOpts().setLocalOnly(local), + deliveryOptions, (AsyncResult> event) -> { try { if (event.succeeded()) {