From 9b6812982f903cee9d691f9542c2f079b36ab04d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 13 Jan 2021 22:05:34 +0100 Subject: [PATCH] Bugfix --- .../it/tdlight/tdlibsession/EventBusFlux.java | 28 +++++++++++-------- .../td/direct/AsyncTdDirectImpl.java | 8 +++--- .../client/AsyncTdMiddleEventBusClient.java | 8 ++---- .../server/AsyncTdMiddleEventBusServer.java | 18 +++++------- 4 files changed, 29 insertions(+), 33 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index b1c9383..f005afe 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -63,7 +63,7 @@ public class EventBusFlux { subscriptionReady.unregister(subscriptionReadyUnregistered -> { if (subscriptionReadyUnregistered.succeeded()) { var subscription = flux.subscribe(item -> { - var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { + eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> { if (msg2.failed()) { logger.error("Failed to send onNext signal", msg2.cause()); } @@ -83,15 +83,11 @@ public class EventBusFlux { }); cancel.handler(msg3 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); - } + subscription.dispose(); msg3.reply(EMPTY, deliveryOptions); }); dispose.handler(msg2 -> { - if (!subscription.isDisposed()) { - subscription.dispose(); - } + subscription.dispose(); cancel.unregister(v -> { if (v.failed()) { logger.error("Failed to unregister cancel", v.cause()); @@ -181,20 +177,28 @@ public class EventBusFlux { msg2.reply(EMPTY); }); signalConsumer.completionHandler(h -> { - if (h.failed()) { - emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); - } else { + if (h.succeeded()) { eventBus.request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { if (msg2.failed()) { logger.error("Failed to tell that the subscription is ready"); } }); + } else { + emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); } }); - emitter.onDispose(() -> eventBus.send(subscriptionAddress + ".dispose", EMPTY, deliveryOptions)); + emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { + if (msg.failed()) { + logger.error("Failed to tell that the subscription is disposed"); + } + })); - emitter.onCancel(() -> eventBus.send(subscriptionAddress + ".cancel", EMPTY, deliveryOptions)); + emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { + if (msg.failed()) { + logger.error("Failed to tell that the subscription is cancelled"); + } + })); } else { emitter.error(new IllegalStateException("Subscription failed", msg.cause())); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 0b199e8..63f3a4c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -45,7 +45,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } throw new IllegalStateException("TDLib client is destroyed"); } - }).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler)); + }).publishOn(Schedulers.boundedElastic()).single()); } else { return td.asMono().flatMap(td -> Mono.>create(sink -> { if (td != null) { @@ -60,7 +60,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } sink.error(new IllegalStateException("TDLib client is destroyed")); } - })).single().subscribeOn(tdScheduler); + })).single(); } } @@ -89,8 +89,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(), result -> logger.trace("Close result: {}", result), ex -> logger.trace("Error when disposing td client", ex) - ))).subscribeOn(tdScheduler).subscribe(); + ))).subscribe(); }); - }).subscribeOn(tdScheduler); + }); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index bd7c3b2..73f5858 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -39,8 +39,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { @@ -49,7 +47,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy public static final boolean OUTPUT_REQUESTS = false; public static final byte[] EMPTY = new byte[0]; - private final Scheduler tdMiddleScheduler = Schedulers.single(); private final Many tdClosed = Sinks.many().replay().latestOrDefault(false); private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; @@ -149,7 +146,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy if (msg.succeeded()) { this.listen() .timeout(Duration.ofSeconds(30)) - .subscribeOn(tdMiddleScheduler) .subscribe(v -> {}, future::fail, future::complete); } else { future.fail(msg.cause()); @@ -244,7 +240,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy tdClosed.tryEmitNext(true); } } - })).subscribeOn(tdMiddleScheduler); + })); } @Override @@ -300,6 +296,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } }).switchIfEmpty(Mono.fromSupplier(() -> { return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); - })).subscribeOn(tdMiddleScheduler); + })); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 9f2b1fe..a57119a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -33,8 +33,6 @@ import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusServer { @@ -54,7 +52,6 @@ public class AsyncTdMiddleEventBusServer { private boolean local; protected AsyncTdDirectImpl td; - private final Scheduler tdSrvPoll; /** * Value is not important, emits when a request is received */ @@ -68,7 +65,6 @@ public class AsyncTdMiddleEventBusServer { public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { this.cluster = clusterManager; this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000); - this.tdSrvPoll = Schedulers.single(); if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); @@ -99,7 +95,7 @@ public class AsyncTdMiddleEventBusServer { workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); }); this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - })).subscribeOn(this.tdSrvPoll) + })) .subscribe(v -> {}, ex -> { logger.info(botAddress + " server deployed and started. succeeded: false"); logger.error(ex.getLocalizedMessage(), ex); @@ -123,7 +119,7 @@ public class AsyncTdMiddleEventBusServer { sink.error(h.cause()); } }); - }).subscribeOn(tdSrvPoll); + }); } public void onBeforeStop(Consumer> r) { @@ -180,7 +176,7 @@ public class AsyncTdMiddleEventBusServer { } catch (Exception ex) { sink.error(ex); } - }).subscribeOn(this.tdSrvPoll) + }) .subscribe(response -> {}, ex -> { logger.error("Error when processing a request", ex); msg.fail(500, ex.getLocalizedMessage()); @@ -191,7 +187,7 @@ public class AsyncTdMiddleEventBusServer { } }); executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - }).subscribeOn(tdSrvPoll); + }); } private void undeploy(Runnable whenUndeployed) { @@ -227,7 +223,7 @@ public class AsyncTdMiddleEventBusServer { whenUndeployed.run(); }); - }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { + }).subscribe(v -> {}, ex -> { logger.error("Error when stopping", ex); }, () -> {}); }); @@ -251,7 +247,7 @@ public class AsyncTdMiddleEventBusServer { System.out.println("<=: end (3)"); } this.undeploy(() -> {}); - }).subscribeOn(tdSrvPoll); + }); var fluxCodec = new TdResultListMessageCodec(); return EventBusFlux.serve(updatesFlux, cluster.getEventBus(), @@ -259,6 +255,6 @@ public class AsyncTdMiddleEventBusServer { cluster.newDeliveryOpts().setLocalOnly(local), fluxCodec, Duration.ofSeconds(30) - ).subscribeOn(tdSrvPoll); + ); } }