Word wrap

This commit is contained in:
Andrea Cavalli 2021-09-09 20:24:55 +02:00
parent eb6d6cf22b
commit 2b4933617c

View File

@ -120,7 +120,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
public final int botId; public final int botId;
public final boolean local; public final boolean local;
public OnSuccessfulStartRequestInfo(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { public OnSuccessfulStartRequestInfo(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId,
boolean local) {
this.td = td; this.td = td;
this.botAddress = botAddress; this.botAddress = botAddress;
this.botAlias = botAlias; this.botAlias = botAlias;
@ -129,7 +130,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
} }
private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId,
boolean local) {
return td return td
.initialize() .initialize()
.then(this.pipe(td, botAddress, botAlias, botId, local)) .then(this.pipe(td, botAddress, botAlias, botId, local))
@ -178,7 +180,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
msg.reply(replyValue, replyOpts); msg.reply(replyValue, replyOpts);
return response; return response;
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), request); logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(),
request);
msg.fail(500, ex.getLocalizedMessage()); msg.fail(500, ex.getLocalizedMessage());
throw ex; throw ex;
} }
@ -202,7 +205,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex)); .subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress
+ ".ready-to-receive");
if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) { if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) {
registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer")); registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer"));
return; return;
@ -224,7 +228,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex))
.doOnNext(s -> logger.trace("Replying to ready-to-receive request")) .doOnNext(s -> logger.trace("Replying to ready-to-receive request"))
.flatMapMany(tuple -> { .flatMapMany(tuple -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); var opts = new DeliveryOptions().setLocalOnly(local)
.setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(EMPTY, opts); tuple.getT1().reply(EMPTY, opts);
logger.trace("Replied to ready-to-receive"); logger.trace("Replied to ready-to-receive");
@ -253,7 +258,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
pingConsumer.endHandler(h -> sink.complete()); pingConsumer.endHandler(h -> sink.complete());
}) })
.concatMap(msg -> Mono.fromCallable(() -> { .concatMap(msg -> Mono.fromCallable(() -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); var opts = new DeliveryOptions().setLocalOnly(local)
.setSendTimeout(Duration.ofSeconds(10).toMillis());
msg.reply(EMPTY, opts); msg.reply(EMPTY, opts);
return null; return null;
})) }))
@ -357,7 +363,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
})) }))
.limitRate(Math.max(1, tdOptions.getEventsSize())) .limitRate(Math.max(1, tdOptions.getEventsSize()))
.transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100), false)) .transform(normal -> new BufferTimeOutPublisher<>(normal,Math.max(1, tdOptions.getEventsSize()),
local ? Duration.ofMillis(1) : Duration.ofMillis(100), false))
//.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) //.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
//.map(List::of) //.map(List::of)
.limitRate(Math.max(1, tdOptions.getEventsSize())) .limitRate(Math.max(1, tdOptions.getEventsSize()))
@ -411,11 +418,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
if (ex instanceof ReplyException) { if (ex instanceof ReplyException) {
ReplyException replyException = (ReplyException) ex; ReplyException replyException = (ReplyException) ex;
if (replyException.failureCode() == -1 && replyException.failureType() == ReplyFailure.NO_HANDLERS) { if (replyException.failureCode() == -1 && replyException.failureType() == ReplyFailure.NO_HANDLERS) {
logger.warn("Undeploying, the flux has been terminated because no more handlers are available on the event bus. {}", replyException.getMessage()); logger.warn("Undeploying, the flux has been terminated because no more"
+ " handlers are available on the event bus. {}", replyException.getMessage());
printDefaultException = false; printDefaultException = false;
} }
} else if (ex instanceof ConnectException || ex instanceof java.nio.channels.ClosedChannelException) { } else if (ex instanceof ConnectException || ex instanceof java.nio.channels.ClosedChannelException) {
logger.warn("Undeploying, the flux has been terminated because the consumer disconnected from the event bus. {}", ex.getMessage()); logger.warn("Undeploying, the flux has been terminated because the consumer"
+ " disconnected from the event bus. {}", ex.getMessage());
printDefaultException = false; printDefaultException = false;
} }
if (printDefaultException) { if (printDefaultException) {