package it.tdlight.tdlibsession; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.ReplyException; import it.tdlight.utils.MonoUtils; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; public class EventBusFlux { private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class); private static final byte[] EMPTY = new byte[0]; public static void registerFluxCodec(EventBus eventBus, MessageCodec itemsCodec) { var signalsCodec = new SignalMessageCodec(itemsCodec); try { eventBus.registerCodec(signalsCodec); } catch (IllegalStateException ex) { if (!ex.getMessage().contains("Already a codec registered with name")) { throw ex; } } } /** * * @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription */ public static Tuple2, Mono> serve(Flux flux, EventBus eventBus, String fluxAddress, DeliveryOptions baseDeliveryOptions, MessageCodec itemsCodec, Duration connectionTimeout) { var signalsCodec = new SignalMessageCodec(itemsCodec); EventBusFlux.registerFluxCodec(eventBus, itemsCodec); var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) .setSendTimeout(connectionTimeout.toMillis()); var signalDeliveryOptions = new DeliveryOptions(deliveryOptions) .setCodecName(signalsCodec.name()); AtomicInteger subscriptionsCount = new AtomicInteger(); One fatalErrorSink = Sinks.one(); var servedMono = Mono.create(sink -> { MessageConsumer subscribe = eventBus.consumer(fluxAddress + ".subscribe"); subscribe.handler(msg -> { subscribe.unregister(subscribeUnregistered -> { if (subscribeUnregistered.succeeded()) { if (subscriptionsCount.incrementAndGet() > 1) { subscriptionsCount.decrementAndGet(); logger.error("Another client tried to connect to the same flux. Rejecting the request."); msg.fail(500, "This flux is already in use!"); return; } long subscriptionId = 0; var subscriptionAddress = fluxAddress + "." + subscriptionId; MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); subscriptionReady.handler(subscriptionReadyMsg -> { subscriptionReady.unregister(subscriptionReadyUnregistered -> { if (subscriptionReadyUnregistered.succeeded()) { AtomicReference atomicSubscription = new AtomicReference<>(null); var subscription = flux .onErrorResume(error -> Mono .>create(errorSink -> { var responseHandler = MonoUtils.toHandler(errorSink); eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, responseHandler); }) .then(Mono.empty()) ) .flatMap(item -> Mono.>create(itemSink -> { var responseHandler = MonoUtils.toHandler(itemSink); eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler); })).subscribe(response -> {}, error -> { if (error instanceof ReplyException) { var errorMessage = error.getMessage(); if (errorMessage != null && errorMessage.contains("NO_HANDLERS")) { logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost"); } else { logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.getLocalizedMessage()); } } else { logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error); } fatalErrorSink.tryEmitValue(error); disposeFlux(atomicSubscription.get(), fatalErrorSink, cancel, dispose, fluxAddress, () -> { logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); }); }, () -> { eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { logger.info("Completed flux \"" + fluxAddress + "\""); if (msg2.failed()) { logger.error("Failed to send onComplete signal", msg2.cause()); fatalErrorSink.tryEmitValue(msg2.cause()); } }); }); atomicSubscription.set(subscription); cancel.handler(msg3 -> { logger.trace("Cancelling flux \"" + fluxAddress + "\""); subscription.dispose(); logger.debug("Cancelled flux \"" + fluxAddress + "\""); msg3.reply(EMPTY, deliveryOptions); }); dispose.handler(msg2 -> { disposeFlux(subscription, fatalErrorSink, cancel, dispose, fluxAddress, () -> msg2.reply(EMPTY)); }); cancel.completionHandler(h -> { if (h.succeeded()) { dispose.completionHandler(h2 -> { if (h2.succeeded()) { subscriptionReadyMsg.reply((Long) subscriptionId); } else { logger.error("Failed to register dispose", h.cause()); subscriptionReadyMsg.fail(500, "Failed to register dispose"); } }); } else { logger.error("Failed to register cancel", h.cause()); subscriptionReadyMsg.fail(500, "Failed to register cancel"); } }); } else { logger.error("Failed to unregister \"subscription ready\""); } }); }); subscriptionReady.completionHandler(srh -> { if (srh.succeeded()) { msg.reply((Long) subscriptionId); } else { logger.error("Failed to register \"subscription ready\"", srh.cause()); msg.fail(500, "Failed to register \"subscription ready\""); } }); } else { logger.error("Failed to unregister subscribe consumer"); } }); }); subscribe.completionHandler(h -> { if (h.failed()) { sink.error(h.cause()); } else { sink.success(); } }); }); return Tuples.of(servedMono, fatalErrorSink.asMono()); } private static void disposeFlux(@Nullable Disposable subscription, One fatalErrorSink, MessageConsumer cancel, MessageConsumer dispose, String fluxAddress, Runnable after) { logger.trace("Disposing flux \"" + fluxAddress + "\""); fatalErrorSink.tryEmitEmpty(); if (subscription != null) { subscription.dispose(); } cancel.unregister(v -> { if (v.failed()) { logger.error("Failed to unregister cancel", v.cause()); } dispose.unregister(v2 -> { if (v.failed()) { logger.error("Failed to unregister dispose", v2.cause()); } logger.debug("Disposed flux \"" + fluxAddress + "\""); after.run(); }); }); } public static Flux connect(EventBus eventBus, String fluxAddress, DeliveryOptions baseDeliveryOptions, MessageCodec itemsCodec, Duration connectionTimeout) { EventBusFlux.registerFluxCodec(eventBus, itemsCodec); return Flux.create(emitter -> { var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) .setSendTimeout(connectionTimeout.toMillis()); eventBus.request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> { if (msg.succeeded()) { long subscriptionId = msg.result().body(); var subscriptionAddress = fluxAddress + "." + subscriptionId; var signalConsumer = eventBus.>consumer(subscriptionAddress + ".signal"); signalConsumer.handler(msg2 -> { var signal = msg2.body(); switch (signal.getSignalType()) { case ITEM: emitter.next(signal.getItem()); break; case ERROR: emitter.error(new Exception(signal.getErrorMessage())); break; case COMPLETE: emitter.complete(); break; } msg2.reply(EMPTY); }); signalConsumer.completionHandler(h -> { if (h.succeeded()) { eventBus.request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { if (msg2.failed()) { logger.error("Failed to tell that the subscription is ready"); } }); } else { emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); } }); emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { if (msg.failed()) { logger.error("Failed to tell that the subscription is disposed"); } })); emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { if (msg.failed()) { logger.error("Failed to tell that the subscription is cancelled"); } })); } else { emitter.error(new IllegalStateException("Subscription failed", msg.cause())); } }); }); } }