diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index f8c58f2..0f3c905 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -120,7 +120,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { public final int botId; public final boolean local; - public OnSuccessfulStartRequestInfo(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { + public OnSuccessfulStartRequestInfo(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, + boolean local) { this.td = td; this.botAddress = botAddress; this.botAlias = botAlias; @@ -129,7 +130,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } } - private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { + private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, + boolean local) { return td .initialize() .then(this.pipe(td, botAddress, botAlias, botId, local)) @@ -178,7 +180,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { msg.reply(replyValue, replyOpts); return response; } catch (Exception ex) { - logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), request); + logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), + request); msg.fail(500, ex.getLocalizedMessage()); throw ex; } @@ -202,7 +205,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); - MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); + MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + + ".ready-to-receive"); if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) { registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer")); return; @@ -224,7 +228,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) .doOnNext(s -> logger.trace("Replying to ready-to-receive request")) .flatMapMany(tuple -> { - var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + var opts = new DeliveryOptions().setLocalOnly(local) + .setSendTimeout(Duration.ofSeconds(10).toMillis()); tuple.getT1().reply(EMPTY, opts); logger.trace("Replied to ready-to-receive"); @@ -253,7 +258,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { pingConsumer.endHandler(h -> sink.complete()); }) .concatMap(msg -> Mono.fromCallable(() -> { - var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); + var opts = new DeliveryOptions().setLocalOnly(local) + .setSendTimeout(Duration.ofSeconds(10).toMillis()); msg.reply(EMPTY, opts); return null; })) @@ -357,7 +363,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } })) .limitRate(Math.max(1, tdOptions.getEventsSize())) - .transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100), false)) + .transform(normal -> new BufferTimeOutPublisher<>(normal,Math.max(1, tdOptions.getEventsSize()), + local ? Duration.ofMillis(1) : Duration.ofMillis(100), false)) //.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) //.map(List::of) .limitRate(Math.max(1, tdOptions.getEventsSize())) @@ -411,11 +418,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (ex instanceof ReplyException) { ReplyException replyException = (ReplyException) ex; if (replyException.failureCode() == -1 && replyException.failureType() == ReplyFailure.NO_HANDLERS) { - logger.warn("Undeploying, the flux has been terminated because no more handlers are available on the event bus. {}", replyException.getMessage()); + logger.warn("Undeploying, the flux has been terminated because no more" + + " handlers are available on the event bus. {}", replyException.getMessage()); printDefaultException = false; } } else if (ex instanceof ConnectException || ex instanceof java.nio.channels.ClosedChannelException) { - logger.warn("Undeploying, the flux has been terminated because the consumer disconnected from the event bus. {}", ex.getMessage()); + logger.warn("Undeploying, the flux has been terminated because the consumer" + + " disconnected from the event bus. {}", ex.getMessage()); printDefaultException = false; } if (printDefaultException) {