Set client initialization success once

This commit is contained in:
Andrea Cavalli 2020-10-19 15:57:50 +02:00
parent 8829aa998d
commit 4fe39ae7d1
1 changed files with 16 additions and 5 deletions

View File

@ -71,7 +71,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override @Override
public Mono<Void> initializeClient() { public Mono<Void> initializeClient() {
return Mono.<Boolean>create(sink -> { return Mono.<Boolean>create(sink -> {
Flux.<AsyncResult<TdResult<Update>>>create(emitter -> { var updatesConnectableFlux = Flux.<AsyncResult<TdResult<Update>>>create(emitter -> {
var client = ClientManager.create((Object object) -> { var client = ClientManager.create((Object object) -> {
emitter.next(Future.succeededFuture(TdResult.of(object))); emitter.next(Future.succeededFuture(TdResult.of(object)));
// Close the emitter if receive closed state // Close the emitter if receive closed state
@ -90,16 +90,27 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
emitter.onDispose(() -> { emitter.onDispose(() -> {
this.td.set(null); this.td.set(null);
}); });
}).subscribeOn(tdPollScheduler).subscribe(next -> { }).subscribeOn(tdPollScheduler).publish();
updatesProcessor.onNext(next);
// Complete initialization when receiving first update
updatesConnectableFlux.subscribeOn(tdPollScheduler).take(1).single().subscribe(next -> {
sink.success(true); sink.success(true);
}, error -> { }, error -> {
updatesProcessor.onError(error);
sink.error(error); sink.error(error);
}, () -> { }, () -> {
updatesProcessor.onComplete();
sink.success(true); sink.success(true);
}); });
// Pass updates to UpdatesProcessor
updatesConnectableFlux.subscribeOn(tdPollScheduler).subscribe(next -> {
updatesProcessor.onNext(next);
}, error -> {
updatesProcessor.onError(error);
}, () -> {
updatesProcessor.onComplete();
});
updatesConnectableFlux.connect();
}).single().then().subscribeOn(tdScheduler); }).single().then().subscribeOn(tdScheduler);
} }