diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index b197813..f17a053 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -48,6 +48,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways private final Empty crash = Sinks.one(); + // This will only result in a successful completion, never completes in other ways + private final Empty pingFail = Sinks.one(); private int botId; private String botAddress; @@ -131,29 +133,71 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { if (local) { return Mono.empty(); } + logger.trace("Requesting bots.start-bot"); return cluster.getEventBus() .rxRequest("bots.start-bot", msg).as(MonoUtils::toMono) - .publishOn(Schedulers.boundedElastic()); + .doOnSuccess(s -> logger.trace("bots.start-bot returned successfully")) + .subscribeOn(Schedulers.boundedElastic()); })) - .then(); + .then(setupPing()); }) .publishOn(Schedulers.single()); } - private Mono setupUpdatesListener() { - return MonoUtils - .fromBlockingMaybe(() -> { - MessageConsumer updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().consumer( - botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate()); + private Mono setupPing() { + return Mono.fromCallable(() -> { + logger.trace("Setting up ping"); + // Disable ping on local servers + if (!local) { + Mono + .defer(() -> cluster.getEventBus().rxRequest(botAddress + ".ping", + EMPTY, + deliveryOptionsWithTimeout + ).as(MonoUtils::toMono)) + .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) + .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) + .takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> { + logger.trace("About to kill pinger because updates stream ended"); + }), this.crash.asMono().onErrorResume(ex -> Mono.empty()).doOnTerminate(() -> { + logger.trace("About to kill pinger because it has seen a crash signal"); + }))) + .doOnNext(s -> logger.warn("REPEATING PING")) + .map(x -> 1) + .defaultIfEmpty(0) + .doOnNext(s -> logger.warn("PING")) + .then() + .doOnNext(s -> logger.warn("END PING")) + .then(MonoUtils.emitEmpty(this.pingFail)) + .subscribeOn(Schedulers.single()) + .subscribe(); + } + logger.trace("Ping setup success"); + return null; + }).subscribeOn(Schedulers.boundedElastic()); + } + private Mono setupUpdatesListener() { + return Mono + .fromRunnable(() -> logger.trace("Setting up updates listener...")) + .then(MonoUtils.>fromBlockingSingle(() -> { + return MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates") + .setMaxBufferedMessages(5000) + .getDelegate()); + })) + .flatMap(updateConsumer -> { // Here the updates will be piped from the server to the client var updateConsumerFlux = MonoUtils.fromConsumer(updateConsumer); // Return when the registration of all the consumers has been done across the cluster - return MonoUtils - .emitValue(updates, updateConsumerFlux) - .then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono)); + return Mono + .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) + .then(MonoUtils.emitValue(updates, updateConsumerFlux)) + .doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) + .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) + .then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Registered update consumer across the cluster")); }) + .doOnSuccess(s ->logger.trace("Set up updates listener")) .then(); } @@ -163,22 +207,24 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .then() .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) .doOnSuccess(s -> logger.trace("About to read updates flux")) .then(updates.asMono().publishOn(Schedulers.single())) .timeout(Duration.ofSeconds(5)) .publishOn(Schedulers.single()) - .flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single())) - .timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> { - var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)"); - ex.setStackTrace(new StackTraceElement[0]); - throw ex; - })) - .doOnSubscribe(s -> cluster.getEventBus().send(botAddress + ".ready-to-receive", - EMPTY, - deliveryOptionsWithTimeout - )) + .flatMapMany(updatesFlux -> updatesFlux.publishOn(Schedulers.single())) + .takeUntilOther(Flux + .merge( + crash.asMono() + .onErrorResume(ex -> Mono.empty()), + pingFail.asMono() + .then(Mono.fromCallable(() -> { + var ex = new ConnectException("Server did not respond to ping"); + ex.setStackTrace(new StackTraceElement[0]); + throw ex; + }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) + ) + ) .flatMapSequential(updates -> { if (updates.succeeded()) { return Flux.fromIterable(updates.value()); @@ -188,7 +234,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .publishOn(Schedulers.single()) .flatMapSequential(this::interceptUpdate) + // Redirect errors to crash sink .doOnError(crash::tryEmitError) + .onErrorResume(ex -> Mono.empty()) + .doOnTerminate(updatesStreamEnd::tryEmitEmpty) .publishOn(Schedulers.single()); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 7a0dc9e..d414882 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -88,7 +88,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd return td .receive(new AsyncTdDirectOptions(WAIT_DURATION, 100)) .takeUntilOther(closeRequest.asMono()) - .doOnNext(s -> logger.trace("Received update from tdlib: {}", s)) + .doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName())) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) .subscribeOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index d048376..3c077ee 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -141,15 +141,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { executeConsumer.handler(sink::next); executeConsumer.endHandler(h -> sink.complete()); }) - .flatMapSequential(msg -> { - logger.trace("Received execute request {}", msg.body()); - var request = overrideRequest(msg.body().getRequest(), botId); + .flatMapSequential(msg -> Mono + .fromCallable(() -> Tuples.of(msg, msg.body())) + .subscribeOn(Schedulers.boundedElastic()) + ) + .flatMapSequential(tuple -> { + var msg = tuple.getT1(); + var body = tuple.getT2(); + logger.trace("Received execute request {}", body); + var request = overrideRequest(body.getRequest(), botId); return td - .execute(request, msg.body().isExecuteDirectly()) + .execute(request, body.isExecuteDirectly()) .map(result -> Tuples.of(msg, result)) .doOnSuccess(s -> logger.trace("Executed successfully")); }) - .handle((tuple, sink) -> { + .flatMapSequential(tuple -> Mono.fromCallable(() -> { var msg = tuple.getT1(); var response = tuple.getT2(); var replyOpts = new DeliveryOptions().setLocalOnly(local); @@ -157,13 +163,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { try { logger.trace("Replying with success response"); msg.reply(replyValue, replyOpts); - sink.next(response); + return response; } catch (Exception ex) { - logger.trace("Replying with error response: {}", ex.getLocalizedMessage()); + logger.debug("Replying with error response: {}", ex.getLocalizedMessage()); msg.fail(500, ex.getLocalizedMessage()); - sink.error(ex); + throw ex; } - }) + }).subscribeOn(Schedulers.boundedElastic())) .then() .publishOn(Schedulers.single()) .subscribe(v -> {}, @@ -214,7 +220,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { }) .then() .doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)")) - .publishOn(Schedulers.single()) + .subscribeOn(Schedulers.boundedElastic()) // Don't handle errors here. Handle them in pipeFlux .subscribe(v -> {}); @@ -228,12 +234,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { pingConsumer.handler(sink::next); pingConsumer.endHandler(h -> sink.complete()); }) - .doOnNext(msg -> { + .flatMapSequential(msg -> Mono.fromCallable(() -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); msg.reply(EMPTY, opts); - }) + return null; + })) .then() - .publishOn(Schedulers.single()) + .subscribeOn(Schedulers.boundedElastic()) .subscribe(v -> {}, ex -> logger.error("Error when processing a ping request", ex), () -> logger.trace("Finished handling ping requests") diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 8492f6b..2fcc832 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -261,23 +261,23 @@ public class MonoUtils { } public static Mono emitValue(One sink, T value) { - return fromEmitResult(sink.tryEmitValue(value)); + return Mono.defer(() -> fromEmitResult(sink.tryEmitValue(value))); } public static Mono emitNext(Many sink, T value) { - return fromEmitResult(sink.tryEmitNext(value)); + return Mono.defer(() -> fromEmitResult(sink.tryEmitNext(value))); } public static Mono emitComplete(Many sink) { - return fromEmitResult(sink.tryEmitComplete()); + return Mono.defer(() -> fromEmitResult(sink.tryEmitComplete())); + } + + public static Mono emitEmpty(Empty sink) { + return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty())); } public static Mono emitError(Empty sink, Throwable value) { - return fromEmitResult(sink.tryEmitError(value)); - } - - public static Future emitEmpty(Empty sink) { - return fromEmitResultFuture(sink.tryEmitEmpty()); + return Mono.defer(() -> fromEmitResult(sink.tryEmitError(value))); } public static Future emitValueFuture(One sink, T value) { @@ -343,12 +343,13 @@ public class MonoUtils { public static Flux fromConsumer(MessageConsumer messageConsumer) { return Flux.>create(sink -> { - messageConsumer.handler(sink::next); - messageConsumer.endHandler(e -> sink.complete()); - sink.onDispose(messageConsumer::unregister); - }) - .subscribeOn(Schedulers.boundedElastic()) - .flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())); + messageConsumer.handler(sink::next); + messageConsumer.endHandler(e -> sink.complete()); + sink.onDispose(messageConsumer::unregister); + }) + .startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) + .flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) + .subscribeOn(Schedulers.boundedElastic()); } public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream {