From 4fe39ae7d145b4164c8c4fb4536feac2b2694915 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 19 Oct 2020 15:57:50 +0200 Subject: [PATCH] Set client initialization success once --- .../td/direct/AsyncTdDirectImpl.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 7866650..0e0f072 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -71,7 +71,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Mono initializeClient() { return Mono.create(sink -> { - Flux.>>create(emitter -> { + var updatesConnectableFlux = Flux.>>create(emitter -> { var client = ClientManager.create((Object object) -> { emitter.next(Future.succeededFuture(TdResult.of(object))); // Close the emitter if receive closed state @@ -90,16 +90,27 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { emitter.onDispose(() -> { this.td.set(null); }); - }).subscribeOn(tdPollScheduler).subscribe(next -> { - updatesProcessor.onNext(next); + }).subscribeOn(tdPollScheduler).publish(); + + // Complete initialization when receiving first update + updatesConnectableFlux.subscribeOn(tdPollScheduler).take(1).single().subscribe(next -> { sink.success(true); }, error -> { - updatesProcessor.onError(error); sink.error(error); }, () -> { - updatesProcessor.onComplete(); sink.success(true); }); + + // Pass updates to UpdatesProcessor + updatesConnectableFlux.subscribeOn(tdPollScheduler).subscribe(next -> { + updatesProcessor.onNext(next); + }, error -> { + updatesProcessor.onError(error); + }, () -> { + updatesProcessor.onComplete(); + }); + + updatesConnectableFlux.connect(); }).single().then().subscribeOn(tdScheduler); }