This commit is contained in:
Andrea Cavalli 2021-01-26 12:34:59 +01:00
parent 798f77ddbc
commit 703e9b1044
4 changed files with 106 additions and 49 deletions

View File

@ -48,6 +48,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
private final Empty<Void> crash = Sinks.one();
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> pingFail = Sinks.one();
private int botId;
private String botAddress;
@ -131,29 +133,71 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
if (local) {
return Mono.empty();
}
logger.trace("Requesting bots.start-bot");
return cluster.getEventBus()
.<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)
.publishOn(Schedulers.boundedElastic());
.doOnSuccess(s -> logger.trace("bots.start-bot returned successfully"))
.subscribeOn(Schedulers.boundedElastic());
}))
.then();
.then(setupPing());
})
.publishOn(Schedulers.single());
}
private Mono<Void> setupUpdatesListener() {
return MonoUtils
.fromBlockingMaybe(() -> {
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(
botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
private Mono<Void> setupPing() {
return Mono.<Void>fromCallable(() -> {
logger.trace("Setting up ping");
// Disable ping on local servers
if (!local) {
Mono
.defer(() -> cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ping",
EMPTY,
deliveryOptionsWithTimeout
).as(MonoUtils::toMono))
.flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
.takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> {
logger.trace("About to kill pinger because updates stream ended");
}), this.crash.asMono().onErrorResume(ex -> Mono.empty()).doOnTerminate(() -> {
logger.trace("About to kill pinger because it has seen a crash signal");
})))
.doOnNext(s -> logger.warn("REPEATING PING"))
.map(x -> 1)
.defaultIfEmpty(0)
.doOnNext(s -> logger.warn("PING"))
.then()
.doOnNext(s -> logger.warn("END PING"))
.then(MonoUtils.emitEmpty(this.pingFail))
.subscribeOn(Schedulers.single())
.subscribe();
}
logger.trace("Ping setup success");
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> setupUpdatesListener() {
return Mono
.fromRunnable(() -> logger.trace("Setting up updates listener..."))
.then(MonoUtils.<MessageConsumer<TdResultList>>fromBlockingSingle(() -> {
return MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates")
.setMaxBufferedMessages(5000)
.getDelegate());
}))
.flatMap(updateConsumer -> {
// Here the updates will be piped from the server to the client
var updateConsumerFlux = MonoUtils.fromConsumer(updateConsumer);
// Return when the registration of all the consumers has been done across the cluster
return MonoUtils
.emitValue(updates, updateConsumerFlux)
.then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono));
return Mono
.fromRunnable(() -> logger.trace("Emitting updates flux to sink"))
.then(MonoUtils.emitValue(updates, updateConsumerFlux))
.doOnSuccess(s -> logger.trace("Emitted updates flux to sink"))
.doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster"))
.then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Registered update consumer across the cluster"));
})
.doOnSuccess(s ->logger.trace("Set up updates listener"))
.then();
}
@ -163,22 +207,24 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent"))
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.then()
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.then(updates.asMono().publishOn(Schedulers.single()))
.timeout(Duration.ofSeconds(5))
.publishOn(Schedulers.single())
.flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single()))
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}))
.doOnSubscribe(s -> cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive",
EMPTY,
deliveryOptionsWithTimeout
))
.flatMapMany(updatesFlux -> updatesFlux.publishOn(Schedulers.single()))
.takeUntilOther(Flux
.merge(
crash.asMono()
.onErrorResume(ex -> Mono.empty()),
pingFail.asMono()
.then(Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to ping");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
)
)
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
@ -188,7 +234,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
})
.publishOn(Schedulers.single())
.flatMapSequential(this::interceptUpdate)
// Redirect errors to crash sink
.doOnError(crash::tryEmitError)
.onErrorResume(ex -> Mono.empty())
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.single());
}

View File

@ -88,7 +88,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
return td
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 100))
.takeUntilOther(closeRequest.asMono())
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s))
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName()))
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
.subscribeOn(Schedulers.boundedElastic())

View File

@ -141,15 +141,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
executeConsumer.handler(sink::next);
executeConsumer.endHandler(h -> sink.complete());
})
.flatMapSequential(msg -> {
logger.trace("Received execute request {}", msg.body());
var request = overrideRequest(msg.body().getRequest(), botId);
.flatMapSequential(msg -> Mono
.fromCallable(() -> Tuples.of(msg, msg.body()))
.subscribeOn(Schedulers.boundedElastic())
)
.flatMapSequential(tuple -> {
var msg = tuple.getT1();
var body = tuple.getT2();
logger.trace("Received execute request {}", body);
var request = overrideRequest(body.getRequest(), botId);
return td
.execute(request, msg.body().isExecuteDirectly())
.execute(request, body.isExecuteDirectly())
.map(result -> Tuples.of(msg, result))
.doOnSuccess(s -> logger.trace("Executed successfully"));
})
.handle((tuple, sink) -> {
.flatMapSequential(tuple -> Mono.fromCallable(() -> {
var msg = tuple.getT1();
var response = tuple.getT2();
var replyOpts = new DeliveryOptions().setLocalOnly(local);
@ -157,13 +163,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
try {
logger.trace("Replying with success response");
msg.reply(replyValue, replyOpts);
sink.next(response);
return response;
} catch (Exception ex) {
logger.trace("Replying with error response: {}", ex.getLocalizedMessage());
logger.debug("Replying with error response: {}", ex.getLocalizedMessage());
msg.fail(500, ex.getLocalizedMessage());
sink.error(ex);
throw ex;
}
})
}).subscribeOn(Schedulers.boundedElastic()))
.then()
.publishOn(Schedulers.single())
.subscribe(v -> {},
@ -214,7 +220,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
})
.then()
.doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)"))
.publishOn(Schedulers.single())
.subscribeOn(Schedulers.boundedElastic())
// Don't handle errors here. Handle them in pipeFlux
.subscribe(v -> {});
@ -228,12 +234,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
pingConsumer.handler(sink::next);
pingConsumer.endHandler(h -> sink.complete());
})
.doOnNext(msg -> {
.flatMapSequential(msg -> Mono.fromCallable(() -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
msg.reply(EMPTY, opts);
})
return null;
}))
.then()
.publishOn(Schedulers.single())
.subscribeOn(Schedulers.boundedElastic())
.subscribe(v -> {},
ex -> logger.error("Error when processing a ping request", ex),
() -> logger.trace("Finished handling ping requests")

View File

@ -261,23 +261,23 @@ public class MonoUtils {
}
public static <T> Mono<Void> emitValue(One<T> sink, T value) {
return fromEmitResult(sink.tryEmitValue(value));
return Mono.defer(() -> fromEmitResult(sink.tryEmitValue(value)));
}
public static <T> Mono<Void> emitNext(Many<T> sink, T value) {
return fromEmitResult(sink.tryEmitNext(value));
return Mono.defer(() -> fromEmitResult(sink.tryEmitNext(value)));
}
public static <T> Mono<Void> emitComplete(Many<T> sink) {
return fromEmitResult(sink.tryEmitComplete());
return Mono.defer(() -> fromEmitResult(sink.tryEmitComplete()));
}
public static <T> Mono<Void> emitEmpty(Empty<T> sink) {
return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty()));
}
public static <T> Mono<Void> emitError(Empty<T> sink, Throwable value) {
return fromEmitResult(sink.tryEmitError(value));
}
public static <T> Future<Void> emitEmpty(Empty<T> sink) {
return fromEmitResultFuture(sink.tryEmitEmpty());
return Mono.defer(() -> fromEmitResult(sink.tryEmitError(value)));
}
public static <T> Future<Void> emitValueFuture(One<T> sink, T value) {
@ -343,12 +343,13 @@ public class MonoUtils {
public static <T> Flux<T> fromConsumer(MessageConsumer<T> messageConsumer) {
return Flux.<Message<T>>create(sink -> {
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
})
.subscribeOn(Schedulers.boundedElastic())
.flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()));
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
})
.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)))
.flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
.subscribeOn(Schedulers.boundedElastic());
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {