Publish ready-to-receive asynchronously

This commit is contained in:
Andrea Cavalli 2021-01-25 20:45:29 +01:00
parent c7cdfd49d0
commit cb3609586b
5 changed files with 37 additions and 27 deletions

View File

@ -74,7 +74,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
return telegramClientFactory.create(implementationDetails)
.flatMapMany(client -> Flux
.<TdApi.Object>create(updatesSink -> {
Schedulers.boundedElastic().schedule(() -> client.initialize((TdApi.Object object) -> {
client.initialize((TdApi.Object object) -> {
updatesSink.next(object);
// Close the emitter if receive closed state
if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
@ -84,7 +84,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
closedFromTd.tryEmitValue(true);
updatesSink.complete();
}
}, updatesSink::error, updatesSink::error));
}, updatesSink::error, updatesSink::error);
if (td.tryEmitValue(client).isFailure()) {
updatesSink.error(new TdError(500, "Failed to emit td client"));
@ -105,7 +105,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}).publishOn(Schedulers.single()).subscribe();
});
})
.publishOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single())
);
}
}

View File

@ -58,7 +58,7 @@ public class AsyncTdEasy {
private final Logger logger;
private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy");
private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", true);
private final Many<AuthorizationState> authState = Sinks.many().replay().latest();
private final Many<Boolean> requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false);
private final Many<TdEasySettings> settings = Sinks.many().replay().latest();

View File

@ -166,17 +166,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then()
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.then(updates.asMono())
.then(updates.asMono().publishOn(Schedulers.single()))
.timeout(Duration.ofSeconds(5))
.flatMapMany(Flux::hide)
.publishOn(Schedulers.single())
.flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single()))
.startWith(MonoUtils
.castVoid(Mono.<Void>fromRunnable(() -> {
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
}).subscribeOn(Schedulers.boundedElastic()))
)
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}))
.doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> {
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
}))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
@ -184,6 +187,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.publishOn(Schedulers.single())
.flatMapSequential(this::interceptUpdate)
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)

View File

@ -109,7 +109,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local);
})
.flatMap(Mono::hide)
.flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single()))
.doOnSuccess(s -> logger.trace("Stated verticle"))
.publishOn(Schedulers.single())
);
@ -128,7 +128,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
return Mono.<Void>create(registrationSink -> Schedulers.boundedElastic().schedule(() -> {
return Mono.<Void>create(registrationSink -> {
logger.trace("Preparing listeners");
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
@ -248,7 +248,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.doOnSuccess(s -> logger.trace("Finished preparing listeners"))
.publishOn(Schedulers.single())
.subscribe(v -> {}, registrationSink::error, registrationSink::success);
}));
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single());
}
/**
@ -281,14 +283,16 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.then(readBinlogConsumer
.asMono()
.timeout(Duration.ofSeconds(10), Mono.empty())
.doOnNext(ec -> Schedulers.boundedElastic().schedule(() -> Mono
// ReadBinLog will live for another 30 minutes.
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono))
.publishOn(Schedulers.single())
.subscribe())
)
.flatMap(ec -> Mono.fromCallable(() -> {
Mono
// ReadBinLog will live for another 30 minutes.
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono))
.publishOn(Schedulers.single())
.subscribe();
return null;
}).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single()))
)
.then(readyToReceiveConsumer
.asMono()

View File

@ -100,7 +100,7 @@ public class MonoUtils {
}
public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
return Mono.fromCallable(callable).publishOn(Schedulers.boundedElastic());
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single());
}
public static <T> Mono<T> fromBlockingSingle(Callable<T> callable) {
@ -343,12 +343,13 @@ public class MonoUtils {
public static <T> Flux<T> fromConsumer(MessageConsumer<T> messageConsumer) {
return Flux.<Message<T>>create(sink -> {
Schedulers.boundedElastic().schedule(() -> {
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
});
}).flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic()));
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single())
.flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic()));
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {