diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 808005b..b1c9383 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -44,94 +44,100 @@ public class EventBusFlux { MessageConsumer subscribe = eventBus.consumer(fluxAddress + ".subscribe"); subscribe.handler(msg -> { - if (subscriptionsCount.incrementAndGet() > 1) { - subscriptionsCount.decrementAndGet(); - logger.error("Another client tried to connect to the same flux. Rejecting the request."); - msg.fail(500, "This flux is already in use!"); - return; - } - long subscriptionId = 0; - var subscriptionAddress = fluxAddress + "." + subscriptionId; - - MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); - MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); - MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); - - subscriptionReady.handler(subscriptionReadyMsg -> { - var subscription = flux.subscribe(item -> { - var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }, error -> { - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }, () -> { - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }); - - cancel.handler(msg3 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); + subscribe.unregister(subscribeUnregistered -> { + if (subscribeUnregistered.succeeded()) { + if (subscriptionsCount.incrementAndGet() > 1) { + subscriptionsCount.decrementAndGet(); + logger.error("Another client tried to connect to the same flux. Rejecting the request."); + msg.fail(500, "This flux is already in use!"); + return; } - msg3.reply(EMPTY, deliveryOptions); - }); - dispose.handler(msg2 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); - } - cancel.unregister(v -> { - if (v.failed()) { - logger.error("Failed to unregister cancel", v.cause()); - } - dispose.unregister(v2 -> { - if (v.failed()) { - logger.error("Failed to unregister dispose", v2.cause()); - } - subscribe.unregister(v3 -> { - if (v2.failed()) { - logger.error("Failed to unregister subscribe", v3.cause()); - } - msg2.reply(EMPTY); - }); - }); - }); - }); + long subscriptionId = 0; + var subscriptionAddress = fluxAddress + "." + subscriptionId; - cancel.completionHandler(h -> { - if (h.succeeded()) { - dispose.completionHandler(h2 -> { - if (h2.succeeded()) { - subscriptionReadyMsg.reply((Long) subscriptionId); + MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); + MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); + MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); + + subscriptionReady.handler(subscriptionReadyMsg -> { + subscriptionReady.unregister(subscriptionReadyUnregistered -> { + if (subscriptionReadyUnregistered.succeeded()) { + var subscription = flux.subscribe(item -> { + var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); + }, error -> { + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); + }, () -> { + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); + }); + + cancel.handler(msg3 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + msg3.reply(EMPTY, deliveryOptions); + }); + dispose.handler(msg2 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + cancel.unregister(v -> { + if (v.failed()) { + logger.error("Failed to unregister cancel", v.cause()); + } + dispose.unregister(v2 -> { + if (v.failed()) { + logger.error("Failed to unregister dispose", v2.cause()); + } + msg2.reply(EMPTY); + }); + }); + }); + + cancel.completionHandler(h -> { + if (h.succeeded()) { + dispose.completionHandler(h2 -> { + if (h2.succeeded()) { + subscriptionReadyMsg.reply((Long) subscriptionId); + } else { + logger.error("Failed to register dispose", h.cause()); + subscriptionReadyMsg.fail(500, "Failed to register dispose"); + } + }); + } else { + logger.error("Failed to register cancel", h.cause()); + subscriptionReadyMsg.fail(500, "Failed to register cancel"); + } + }); } else { - logger.error("Failed to register dispose", h.cause()); - subscriptionReadyMsg.fail(500, "Failed to register dispose"); + logger.error("Failed to unregister \"subscription ready\""); } }); - } else { - logger.error("Failed to register cancel", h.cause()); - subscriptionReadyMsg.fail(500, "Failed to register cancel"); - } - }); - }); + }); - subscriptionReady.completionHandler(srh -> { - if (srh.succeeded()) { - msg.reply((Long) subscriptionId); + subscriptionReady.completionHandler(srh -> { + if (srh.succeeded()) { + msg.reply((Long) subscriptionId); + } else { + logger.error("Failed to register \"subscription ready\"", srh.cause()); + msg.fail(500, "Failed to register \"subscription ready\""); + } + }); } else { - logger.error("Failed to register \"subscription ready\"", srh.cause()); - msg.fail(500, "Failed to register \"subscription ready\""); + logger.error("Failed to unregister subscribe consumer"); } }); - }); subscribe.completionHandler(h -> { diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 84c1376..2f7fa12 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -263,7 +263,7 @@ public class TDLibRemoteClient implements AutoCloseable { }); verticle.start(botAddress, botAddress, false).doOnError(error -> { logger.error("Can't deploy bot \"" + botAddress + "\"", error); - }).subscribe(v -> {}, err -> { + }).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> { deploymentHandler.handle(Future.failedFuture(err)); }, () -> { deploymentHandler.handle(Future.succeededFuture()); diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 3d06629..0b199e8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -25,7 +25,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class); private final One td = Sinks.one(); - private final Scheduler tdScheduler = Schedulers.single(); + private final Scheduler tdScheduler = Schedulers.newSingle("TdMain"); private final String botAlias; @@ -89,7 +89,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(), result -> logger.trace("Close result: {}", result), ex -> logger.trace("Error when disposing td client", ex) - ))).subscribe(); + ))).subscribeOn(tdScheduler).subscribe(); }); }).subscribeOn(tdScheduler); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 57d3e56..9f2b1fe 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -251,7 +251,7 @@ public class AsyncTdMiddleEventBusServer { System.out.println("<=: end (3)"); } this.undeploy(() -> {}); - }); + }).subscribeOn(tdSrvPoll); var fluxCodec = new TdResultListMessageCodec(); return EventBusFlux.serve(updatesFlux, cluster.getEventBus(),