tdlib-session-container/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java

214 lines
7.5 KiB
Java

package it.tdlight.tdlibsession;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class EventBusFlux {
private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class);
private static final byte[] EMPTY = new byte[0];
public static <T> void registerFluxCodec(EventBus eventBus, MessageCodec<T, T> itemsCodec) {
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
try {
eventBus.registerCodec(signalsCodec);
} catch (IllegalStateException ex) {
if (!ex.getMessage().contains("Already a codec registered with name")) {
throw ex;
}
}
}
public static <T> Mono<Void> serve(Flux<T> flux,
EventBus eventBus,
String fluxAddress,
DeliveryOptions baseDeliveryOptions,
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
var signalDeliveryOptions = new DeliveryOptions(deliveryOptions)
.setCodecName(signalsCodec.name());
AtomicInteger subscriptionsCount = new AtomicInteger();
return Mono.create(sink -> {
MessageConsumer<byte[]> subscribe = eventBus.consumer(fluxAddress + ".subscribe");
subscribe.handler(msg -> {
subscribe.unregister(subscribeUnregistered -> {
if (subscribeUnregistered.succeeded()) {
if (subscriptionsCount.incrementAndGet() > 1) {
subscriptionsCount.decrementAndGet();
logger.error("Another client tried to connect to the same flux. Rejecting the request.");
msg.fail(500, "This flux is already in use!");
return;
}
long subscriptionId = 0;
var subscriptionAddress = fluxAddress + "." + subscriptionId;
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady");
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
subscriptionReady.<Long>handler(subscriptionReadyMsg -> {
subscriptionReady.unregister(subscriptionReadyUnregistered -> {
if (subscriptionReadyUnregistered.succeeded()) {
var subscription = flux.subscribe(item -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, error -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions, msg2 -> {
logger.info("Errored flux \"" + fluxAddress + "\"");
if (msg2.failed()) {
logger.error("Failed to send onError signal", msg2.cause());
}
});
}, () -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
logger.info("Completed flux \"" + fluxAddress + "\"");
if (msg2.failed()) {
logger.error("Failed to send onComplete signal", msg2.cause());
}
});
});
cancel.handler(msg3 -> {
logger.warn("Cancelling flux \"" + fluxAddress + "\"");
subscription.dispose();
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
logger.warn("Disposing flux \"" + fluxAddress + "\"");
subscription.dispose();
cancel.unregister(v -> {
if (v.failed()) {
logger.error("Failed to unregister cancel", v.cause());
}
dispose.unregister(v2 -> {
if (v.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
}
msg2.reply(EMPTY);
});
});
});
cancel.completionHandler(h -> {
if (h.succeeded()) {
dispose.completionHandler(h2 -> {
if (h2.succeeded()) {
subscriptionReadyMsg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register dispose", h.cause());
subscriptionReadyMsg.fail(500, "Failed to register dispose");
}
});
} else {
logger.error("Failed to register cancel", h.cause());
subscriptionReadyMsg.fail(500, "Failed to register cancel");
}
});
} else {
logger.error("Failed to unregister \"subscription ready\"");
}
});
});
subscriptionReady.completionHandler(srh -> {
if (srh.succeeded()) {
msg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register \"subscription ready\"", srh.cause());
msg.fail(500, "Failed to register \"subscription ready\"");
}
});
} else {
logger.error("Failed to unregister subscribe consumer");
}
});
});
subscribe.completionHandler(h -> {
if (h.failed()) {
sink.error(h.cause());
} else {
sink.success();
}
});
});
}
public static <T> Flux<T> connect(EventBus eventBus,
String fluxAddress,
DeliveryOptions baseDeliveryOptions,
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
return Flux.<T>create(emitter -> {
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
eventBus.<Long>request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> {
if (msg.succeeded()) {
long subscriptionId = msg.result().body();
var subscriptionAddress = fluxAddress + "." + subscriptionId;
var signalConsumer = eventBus.<SignalMessage<T>>consumer(subscriptionAddress + ".signal");
signalConsumer.handler(msg2 -> {
var signal = msg2.body();
switch (signal.getSignalType()) {
case ITEM:
emitter.next(signal.getItem());
break;
case ERROR:
emitter.error(new Exception(signal.getErrorMessage()));
break;
case COMPLETE:
emitter.complete();
break;
}
msg2.reply(EMPTY);
});
signalConsumer.completionHandler(h -> {
if (h.succeeded()) {
eventBus.<Long>request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to tell that the subscription is ready");
}
});
} else {
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
}
});
emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is disposed");
}
}));
emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is cancelled");
}
}));
} else {
emitter.error(new IllegalStateException("Subscription failed", msg.cause()));
}
});
});
}
}