Reduce allocations

This commit is contained in:
Andrea Cavalli 2020-10-17 18:28:54 +02:00
parent a7223c4d83
commit a157f6f6ec

View File

@ -6,6 +6,7 @@ import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.DeploymentOptions; import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import it.tdlight.common.ConstructorDetector; import it.tdlight.common.ConstructorDetector;
@ -46,6 +47,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
public static final byte[] EMPTY = new byte[0]; public static final byte[] EMPTY = new byte[0];
private final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false); private final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
private ReplayProcessor<Flux<Update>> incomingUpdatesCo = ReplayProcessor.cacheLast(); private ReplayProcessor<Flux<Update>> incomingUpdatesCo = ReplayProcessor.cacheLast();
@ -66,6 +69,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
cluster.registerDefaultCodec(value, new TdMessageCodec(value)); cluster.registerDefaultCodec(value, new TdMessageCodec(value));
} }
} }
this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local);
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000);
} }
public static Mono<AsyncTdMiddleEventBusClient> getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException { public static Mono<AsyncTdMiddleEventBusClient> getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException {
@ -120,18 +125,18 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
logger.error("Requesting " + botAddress + ".ping"); logger.error("Requesting " + botAddress + ".ping");
cluster cluster
.getEventBus() .getEventBus()
.request(botAddress + ".ping", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local), pingMsg -> { .request(botAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> {
if (pingMsg.succeeded()) { if (pingMsg.succeeded()) {
logger.error("Received ping reply (succeeded)"); logger.error("Received ping reply (succeeded)");
logger.error("Requesting " + botAddress + ".start"); logger.error("Requesting " + botAddress + ".start");
cluster cluster
.getEventBus() .getEventBus()
.request(botAddress + ".start", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), startMsg -> { .request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
if (startMsg.succeeded()) { if (startMsg.succeeded()) {
logger.error("Requesting " + botAddress + ".isWorking"); logger.error("Requesting " + botAddress + ".isWorking");
cluster cluster
.getEventBus() .getEventBus()
.request(botAddress + ".isWorking", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), msg -> { .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) { if (msg.succeeded()) {
this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete); this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete);
} else { } else {
@ -246,7 +251,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.flatMap(_x -> Mono.<Flux<TdApi.Update>>create(sink -> { .flatMap(_x -> Mono.<Flux<TdApi.Update>>create(sink -> {
cluster.getEventBus().<TdOptionalList>request(botAddress + ".getNextUpdatesBlock", cluster.getEventBus().<TdOptionalList>request(botAddress + ".getNextUpdatesBlock",
EMPTY, EMPTY,
cluster.newDeliveryOpts().setLocalOnly(local), deliveryOptions,
msg -> { msg -> {
if (msg.failed()) { if (msg.failed()) {
//if (System.currentTimeMillis() - initTime <= 30000) { //if (System.currentTimeMillis() - initTime <= 30000) {
@ -315,7 +320,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.getEventBus() .getEventBus()
.request(botAddress + ".execute", .request(botAddress + ".execute",
req, req,
cluster.newDeliveryOpts().setLocalOnly(local), deliveryOptions,
(AsyncResult<Message<TdResultMessage>> event) -> { (AsyncResult<Message<TdResultMessage>> event) -> {
try { try {
if (event.succeeded()) { if (event.succeeded()) {