Use usingWhen when needed

This commit is contained in:
Andrea Cavalli 2021-03-31 12:02:49 +02:00
parent 24f83b5190
commit 49336ad910
3 changed files with 101 additions and 116 deletions

View File

@ -30,7 +30,6 @@ import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One; import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.tools.agent.ReactorDebugAgent; import reactor.tools.agent.ReactorDebugAgent;
import reactor.util.function.Tuple2;
public class TDLibRemoteClient implements AutoCloseable { public class TDLibRemoteClient implements AutoCloseable {
@ -168,21 +167,23 @@ public class TDLibRemoteClient implements AutoCloseable {
}) })
.single() .single()
.flatMap(clusterManager -> { .flatMap(clusterManager -> {
MessageConsumer<StartSessionMessage> startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot"); MessageConsumer<StartSessionMessage> startBotConsumer
return MonoUtils = clusterManager.getEventBus().consumer("bots.start-bot");
.fromReplyableResolvedMessageConsumer(startBotConsumer)
.flatMap(tuple -> this.listenForStartBotsCommand(clusterManager, tuple.getT1(), tuple.getT2())); return this.listenForStartBotsCommand(
clusterManager,
MonoUtils.fromReplyableMessageConsumer(Mono.empty(), startBotConsumer)
);
}) })
.then(); .then();
} }
private Mono<Void> listenForStartBotsCommand(TdClusterManager clusterManager, private Mono<Void> listenForStartBotsCommand(TdClusterManager clusterManager,
Mono<Void> completion, Flux<Message<StartSessionMessage>> messages) {
Flux<Tuple2<Message<?>, StartSessionMessage>> messages) {
return MonoUtils return MonoUtils
.fromBlockingEmpty(() -> messages .fromBlockingEmpty(() -> messages
.flatMapSequential(msg -> { .flatMapSequential(msg -> {
StartSessionMessage req = msg.getT2(); StartSessionMessage req = msg.body();
DeploymentOptions deploymentOptions = clusterManager DeploymentOptions deploymentOptions = clusterManager
.newDeploymentOpts() .newDeploymentOpts()
.setConfig(new JsonObject() .setConfig(new JsonObject()
@ -201,9 +202,9 @@ public class TDLibRemoteClient implements AutoCloseable {
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate()) .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath)) .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono)) .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
.then(MonoUtils.fromBlockingEmpty(() -> msg.getT1().reply(new byte[0]))) .then(MonoUtils.fromBlockingEmpty(() -> msg.reply(new byte[0])))
.onErrorResume(ex -> { .onErrorResume(ex -> {
msg.getT1().fail(500, "Failed to deploy bot verticle: " + ex.getMessage()); msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
logger.error("Failed to deploy bot verticle", ex); logger.error("Failed to deploy bot verticle", ex);
return Mono.empty(); return Mono.empty();
}); });
@ -213,8 +214,7 @@ public class TDLibRemoteClient implements AutoCloseable {
v -> {}, v -> {},
ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex) ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex)
) )
) );
.then(completion);
} }
public static Path getSessionDirectory(long botId) { public static Path getSessionDirectory(long botId) {

View File

@ -226,68 +226,64 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then(updates.asMono()) .then(updates.asMono())
.publishOn(Schedulers.parallel()) .publishOn(Schedulers.parallel())
.timeout(Duration.ofSeconds(30)) .timeout(Duration.ofSeconds(30))
.flatMapMany(updatesMessageConsumer -> MonoUtils .doOnSuccess(s -> logger.trace("Registering updates flux"))
.fromMessageConsumer(updatesMessageConsumer) .flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono
.flatMapMany(registration -> Mono .empty()
.fromRunnable(() -> logger.trace("Registering updates flux")) .doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.then(registration.getT1()) .then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive",
.doOnSuccess(s -> logger.trace("Registered updates flux")) EMPTY,
.doOnSuccess(s -> logger.trace("Sending ready-to-receive")) deliveryOptionsWithTimeout
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", ).as(MonoUtils::toMono))
EMPTY, .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
deliveryOptionsWithTimeout .doOnSuccess(s -> logger.trace("About to read updates flux"))
).as(MonoUtils::toMono)) .then(), updatesMessageConsumer)
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) )
.doOnSuccess(s -> logger.trace("About to read updates flux")) .takeUntilOther(Flux
.thenMany(registration.getT2()) .merge(
crash.asMono()
.onErrorResume(ex -> {
logger.error("TDLib crashed", ex);
return Mono.empty();
}),
pingFail.asMono()
.then(Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to ping");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
.takeUntilOther(Mono
.firstWithSignal(crash.asMono(), authStateClosing.asMono())
.onErrorResume(e -> Mono.empty())
)
) )
.takeUntilOther(Flux .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end"))
.merge( )
crash.asMono() .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> {
.onErrorResume(ex -> { if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
logger.error("TDLib crashed", ex); return ((UpdateAuthorizationState) item).authorizationState.getConstructor()
return Mono.empty(); == AuthorizationStateClosed.CONSTRUCTOR;
}), }
pingFail.asMono() return false;
.then(Mono.fromCallable(() -> { }))
var ex = new ConnectException("Server did not respond to ping"); .flatMapSequential(updates -> {
ex.setStackTrace(new StackTraceElement[0]); if (updates.succeeded()) {
throw ex; return Flux.fromIterable(updates.value());
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) } else {
.takeUntilOther(Mono return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
.firstWithSignal(crash.asMono(), authStateClosing.asMono()) }
.onErrorResume(e -> Mono.empty()) })
) .concatMap(update -> interceptUpdate(update))
) // Redirect errors to crash sink
.doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) .doOnError(error -> crash.tryEmitError(error))
) .onErrorResume(ex -> {
.takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { return Mono.empty();
return ((UpdateAuthorizationState) item).authorizationState.getConstructor() })
== AuthorizationStateClosed.CONSTRUCTOR;
}
return false;
}))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
} else {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.flatMapSequential(update -> interceptUpdate(updatesMessageConsumer, update))
// Redirect errors to crash sink
.doOnError(error -> crash.tryEmitError(error))
.onErrorResume(ex -> {
logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
return Mono.empty();
})
.doOnTerminate(updatesStreamEnd::tryEmitEmpty) .doOnTerminate(updatesStreamEnd::tryEmitEmpty);
);
} }
private Mono<TdApi.Object> interceptUpdate(MessageConsumer<TdResultList> updatesMessageConsumer, Object update) { private Mono<TdApi.Object> interceptUpdate(Object update) {
logger.trace("Received update {}", update.getClass().getSimpleName()); logger.trace("Received update {}", update.getClass().getSimpleName());
switch (update.getConstructor()) { switch (update.getConstructor()) {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR: case TdApi.UpdateAuthorizationState.CONSTRUCTOR:
@ -298,7 +294,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
break; break;
case TdApi.AuthorizationStateClosed.CONSTRUCTOR: case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
.then(updatesMessageConsumer.rxUnregister().as(MonoUtils::toMono))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))

View File

@ -29,11 +29,12 @@ import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
import reactor.core.CoreSubscriber; import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink; import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
@ -47,8 +48,6 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues; import reactor.util.concurrent.Queues;
import reactor.util.context.Context; import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class MonoUtils { public class MonoUtils {
@ -375,53 +374,44 @@ public class MonoUtils {
); );
} }
public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromMessageConsumer(MessageConsumer<T> messageConsumer) { public static <T> Flux<T> fromMessageConsumer(Mono<Void> onRegistered, MessageConsumer<T> messageConsumer) {
return fromReplyableMessageConsumer(messageConsumer) return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body);
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono
.fromCallable(msg::body)
.subscribeOn(Schedulers.parallel())))
);
} }
public static <T> Mono<Tuple2<Mono<Void>, Flux<Tuple2<Message<?>, T>>>> fromReplyableResolvedMessageConsumer(MessageConsumer<T> messageConsumer) { public static <T> Flux<Message<T>> fromReplyableMessageConsumer(Mono<Void> onRegistered,
return fromReplyableMessageConsumer(messageConsumer) MessageConsumer<T> messageConsumer) {
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono Mono<Void> endMono = Mono.create(sink -> {
.fromCallable(() -> Tuples.<Message<?>, T>of(msg, msg.body())) AtomicBoolean alreadyRequested = new AtomicBoolean();
.subscribeOn(Schedulers.parallel()))) sink.onRequest(n -> {
); if (n > 0 && alreadyRequested.compareAndSet(false, true)) {
} messageConsumer.endHandler(e -> sink.success());
}
public static <T> Mono<Tuple2<Mono<Void>, Flux<Message<T>>>> fromReplyableMessageConsumer(MessageConsumer<T> messageConsumer) {
return Mono.<Tuple2<Mono<Void>, Flux<Message<T>>>>fromCallable(() -> {
Many<Message<T>> messages = Sinks.many().unicast().onBackpressureError();
Empty<Void> registrationRequested = Sinks.empty();
Empty<Void> registrationCompletion = Sinks.empty();
messageConsumer.endHandler(e -> {
messages.tryEmitComplete();
registrationCompletion.tryEmitEmpty();
}); });
messageConsumer.<Message<T>>handler(messages::tryEmitNext); });
Flux<Message<T>> dataFlux = Flux Mono<MessageConsumer<T>> registrationCompletionMono = Mono
.concatDelayError( .fromRunnable(() -> logger.trace("Waiting for consumer registration completion..."))
messages.asFlux(), .<Void>then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))
messageConsumer .doOnSuccess(s -> logger.trace("Consumer registered"))
.rxUnregister() .then(onRegistered)
.as(MonoUtils::<Message<T>>toMono) .thenReturn(messageConsumer);
.doOnSuccess(s -> logger.trace("Unregistered message consumer"))
)
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty());
Mono<Void> registrationCompletionMono = Mono.empty() messageConsumer.handler(s -> {
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty()) throw new IllegalStateException("Subscriber still didn't request any value!");
.then(registrationRequested.asMono()) });
.doOnSuccess(s -> logger.trace("Subscribed to registration completion mono"))
.doOnSuccess(s -> logger.trace("Waiting for consumer registration completion...")) Flux<Message<T>> dataFlux = Flux
.<Void>then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) .push(sink -> sink.onRequest(n -> messageConsumer.handler(sink::next)), OverflowStrategy.ERROR);
.doOnSuccess(s -> logger.trace("Consumer registered"))
.share(); Mono<Void> disposeMono = messageConsumer
return Tuples.of(registrationCompletionMono, dataFlux); .rxUnregister()
}).subscribeOn(Schedulers.boundedElastic()); .as(MonoUtils::<Message<T>>toMono)
.doOnSuccess(s -> logger.trace("Unregistered message consumer"))
.then();
return Flux
.usingWhen(registrationCompletionMono, msgCons -> dataFlux, msgCons -> disposeMono)
.takeUntilOther(endMono);
} }
public static Scheduler newBoundedSingle(String name) { public static Scheduler newBoundedSingle(String name) {