From 89335f9526ddb75c1734f5423ab5ca88b86858af Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 13 Jan 2021 20:30:01 +0100 Subject: [PATCH] Simplify scheduling --- .../it/tdlight/tdlibsession/EventBusFlux.java | 126 ++++++++++-------- .../remoteclient/TDLibRemoteClient.java | 5 +- .../td/direct/AsyncTdDirectImpl.java | 8 +- .../client/AsyncTdMiddleEventBusClient.java | 6 +- .../server/AsyncTdMiddleEventBusServer.java | 1 - 5 files changed, 79 insertions(+), 67 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index a8d61e2..808005b 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -34,6 +34,7 @@ public class EventBusFlux { MessageCodec itemsCodec, Duration connectionTimeout) { var signalsCodec = new SignalMessageCodec(itemsCodec); + EventBusFlux.registerFluxCodec(eventBus, itemsCodec); var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) .setSendTimeout(connectionTimeout.toMillis()); var signalDeliveryOptions = new DeliveryOptions(deliveryOptions) @@ -52,70 +53,82 @@ public class EventBusFlux { long subscriptionId = 0; var subscriptionAddress = fluxAddress + "." + subscriptionId; + MessageConsumer subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady"); MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); - var subscription = flux.subscribe(item -> { - var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }, error -> { - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }, () -> { - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to send onNext signal", msg2.cause()); - } - }); - }); - - cancel.handler(msg3 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); - } - msg3.reply(EMPTY, deliveryOptions); - }); - dispose.handler(msg2 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); - } - cancel.unregister(v -> { - if (v.failed()) { - logger.error("Failed to unregister cancel", v.cause()); - } - dispose.unregister(v2 -> { - if (v.failed()) { - logger.error("Failed to unregister dispose", v2.cause()); + subscriptionReady.handler(subscriptionReadyMsg -> { + var subscription = flux.subscribe(item -> { + var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); } - subscribe.unregister(v3 -> { - if (v2.failed()) { - logger.error("Failed to unregister subscribe", v3.cause()); + }); + }, error -> { + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); + }, () -> { + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to send onNext signal", msg2.cause()); + } + }); + }); + + cancel.handler(msg3 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + msg3.reply(EMPTY, deliveryOptions); + }); + dispose.handler(msg2 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + cancel.unregister(v -> { + if (v.failed()) { + logger.error("Failed to unregister cancel", v.cause()); + } + dispose.unregister(v2 -> { + if (v.failed()) { + logger.error("Failed to unregister dispose", v2.cause()); } - msg2.reply(EMPTY); + subscribe.unregister(v3 -> { + if (v2.failed()) { + logger.error("Failed to unregister subscribe", v3.cause()); + } + msg2.reply(EMPTY); + }); }); }); }); + + cancel.completionHandler(h -> { + if (h.succeeded()) { + dispose.completionHandler(h2 -> { + if (h2.succeeded()) { + subscriptionReadyMsg.reply((Long) subscriptionId); + } else { + logger.error("Failed to register dispose", h.cause()); + subscriptionReadyMsg.fail(500, "Failed to register dispose"); + } + }); + } else { + logger.error("Failed to register cancel", h.cause()); + subscriptionReadyMsg.fail(500, "Failed to register cancel"); + } + }); }); - cancel.completionHandler(h -> { - if (h.succeeded()) { - dispose.completionHandler(h2 -> { - if (h2.succeeded()) { - msg.reply((Long) subscriptionId); - } else { - logger.error("Failed to register dispose", h.cause()); - msg.fail(500, "Failed to register dispose"); - } - }); + subscriptionReady.completionHandler(srh -> { + if (srh.succeeded()) { + msg.reply((Long) subscriptionId); } else { - logger.error("Failed to register cancel", h.cause()); - msg.fail(500, "Failed to register cancel"); + logger.error("Failed to register \"subscription ready\"", srh.cause()); + msg.fail(500, "Failed to register \"subscription ready\""); } }); @@ -136,6 +149,7 @@ public class EventBusFlux { DeliveryOptions baseDeliveryOptions, MessageCodec itemsCodec, Duration connectionTimeout) { + EventBusFlux.registerFluxCodec(eventBus, itemsCodec); return Flux.create(emitter -> { var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) .setSendTimeout(connectionTimeout.toMillis()); @@ -163,6 +177,12 @@ public class EventBusFlux { signalConsumer.completionHandler(h -> { if (h.failed()) { emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); + } else { + eventBus.request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { + if (msg2.failed()) { + logger.error("Failed to tell that the subscription is ready"); + } + }); } }); diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index fc6015b..e422fb9 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class TDLibRemoteClient implements AutoCloseable { @@ -36,7 +34,6 @@ public class TDLibRemoteClient implements AutoCloseable { private final int port; private final Set membersAddresses; private final Many clusterManager = Sinks.many().replay().latest(); - private final Scheduler deploymentScheduler = Schedulers.single(); public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) { this.securityInfo = securityInfo; @@ -263,7 +260,7 @@ public class TDLibRemoteClient implements AutoCloseable { }); verticle.start(botAddress, botAddress, false).doOnError(error -> { logger.error("Can't deploy bot \"" + botAddress + "\"", error); - }).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> { + }).subscribe(v -> {}, err -> { deploymentHandler.handle(Future.failedFuture(err)); }, () -> { deploymentHandler.handle(Future.succeededFuture()); diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 28b131d..6f6f57b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -17,7 +17,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class AsyncTdDirectImpl implements AsyncTdDirect { @@ -25,7 +24,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class); private final One td = Sinks.one(); - private final Scheduler tdScheduler = Schedulers.newSingle("TdMain"); private final String botAlias; @@ -45,7 +43,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } throw new IllegalStateException("TDLib client is destroyed"); } - }).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler)); + }).publishOn(Schedulers.boundedElastic()).single()); } else { return td.asMono().flatMap(td -> Mono.>create(sink -> { if (td != null) { @@ -60,7 +58,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } sink.error(new IllegalStateException("TDLib client is destroyed")); } - })).single().subscribeOn(tdScheduler); + })).single(); } } @@ -91,6 +89,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { ex -> logger.trace("Error when disposing td client", ex) ))).subscribe(); }); - }).subscribeOn(tdScheduler); + }); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 343482d..271a403 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -149,7 +149,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy if (msg.succeeded()) { this.listen() .timeout(Duration.ofSeconds(30)) - .subscribeOn(tdMiddleScheduler) .subscribe(v -> {}, future::fail, future::complete); } else { future.fail(msg.cause()); @@ -208,7 +207,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @Override public Flux receive() { var fluxCodec = new TdResultListMessageCodec(); - EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec); return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).flatMapMany(_closed -> EventBusFlux .connect(cluster.getEventBus(), botAddress + ".updates", @@ -245,7 +243,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy tdClosed.tryEmitNext(true); } } - })).subscribeOn(tdMiddleScheduler); + })); } @Override @@ -301,6 +299,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } }).switchIfEmpty(Mono.fromSupplier(() -> { return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); - })).subscribeOn(tdMiddleScheduler); + })); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 05c798b..57d3e56 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -253,7 +253,6 @@ public class AsyncTdMiddleEventBusServer { this.undeploy(() -> {}); }); var fluxCodec = new TdResultListMessageCodec(); - EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec); return EventBusFlux.serve(updatesFlux, cluster.getEventBus(), botAddress + ".updates",