From 85bac8670d201922ac695f3f086eb0c6cd526e8f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 19 Oct 2020 17:25:10 +0200 Subject: [PATCH] Redesigned subscribers --- .../td/direct/AsyncTdDirectImpl.java | 42 ++++++------------- .../server/AsyncTdMiddleEventBusServer.java | 1 - 2 files changed, 13 insertions(+), 30 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 0e0f072..971da76 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -28,10 +28,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private final AtomicReference td = new AtomicReference<>(); private final Scheduler tdScheduler = Schedulers.newSingle("TdMain"); private final Scheduler tdPollScheduler = Schedulers.newSingle("TdPoll"); - private final Scheduler tdUpdatesScheduler = Schedulers.newSingle("TdUpdate"); private final Scheduler tdResponsesScheduler = Schedulers.newSingle("TdResponse"); + private final Scheduler tdResponsesOutputScheduler = Schedulers.boundedElastic(); - private final EmitterProcessor>> updatesProcessor = EmitterProcessor.create(); + private Flux>> updatesProcessor; private final String botAlias; public AsyncTdDirectImpl(String botAlias) { @@ -41,7 +41,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Mono> execute(Function request, boolean synchronous) { if (synchronous) { - return Mono.just(TdResult.of(this.td.get().execute(request))); + return Mono + .fromCallable(() -> TdResult.of(this.td.get().execute(request))) + .subscribeOn(tdResponsesScheduler) + .publishOn(Schedulers.single()); } else { return Mono.>create(sink -> { try { @@ -51,21 +54,13 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } catch (Throwable t) { sink.error(t); } - }).subscribeOn(tdResponsesScheduler); + }).subscribeOn(tdResponsesScheduler).publishOn(tdResponsesOutputScheduler); } } @Override public Flux>> getUpdates(Duration receiveDuration, int eventsSize) { - return Flux.from(updatesProcessor.subscribeOn(tdUpdatesScheduler)); - } - - public Scheduler getTdUpdatesScheduler() { - return tdUpdatesScheduler; - } - - public Scheduler getTdResponsesScheduler() { - return tdResponsesScheduler; + return updatesProcessor; } @Override @@ -93,25 +88,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { }).subscribeOn(tdPollScheduler).publish(); // Complete initialization when receiving first update - updatesConnectableFlux.subscribeOn(tdPollScheduler).take(1).single().subscribe(next -> { - sink.success(true); - }, error -> { - sink.error(error); - }, () -> { - sink.success(true); - }); + updatesConnectableFlux.take(1).single() + .doOnSuccess(_v -> sink.success(true)).doOnError(sink::error).subscribe(); // Pass updates to UpdatesProcessor - updatesConnectableFlux.subscribeOn(tdPollScheduler).subscribe(next -> { - updatesProcessor.onNext(next); - }, error -> { - updatesProcessor.onError(error); - }, () -> { - updatesProcessor.onComplete(); - }); + updatesProcessor = updatesConnectableFlux.publish().refCount(); updatesConnectableFlux.connect(); - }).single().then().subscribeOn(tdScheduler); + }).single().then().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler); } @Override @@ -119,6 +103,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { return Mono.fromCallable(() -> { // do nothing return (Void) null; - }).single().subscribeOn(tdScheduler); + }).single().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 2246081..deedc89 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -140,7 +140,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .from(tdClosed) .single() .filter(tdClosedVal -> !tdClosedVal) - .subscribeOn(td.getTdUpdatesScheduler()) .map(_v -> { ArrayList>> updatesBatch = new ArrayList<>(); while (!queue.isEmpty() && updatesBatch.size() < 1000) {