diff --git a/pom.xml b/pom.xml index 51acf16..af9c5d7 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ it.tdlight tdlight-java - 2.7.8.11 + 2.7.8.12 diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 685c54a..7c0204f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -1,10 +1,13 @@ package it.tdlight.tdlibsession.td; -import it.tdlight.common.ReactiveItem; import it.tdlight.common.ReactiveTelegramClient; +import it.tdlight.common.Signal; +import it.tdlight.common.SignalListener; +import it.tdlight.common.UpdatesHandler; import it.tdlight.jni.TdApi; import it.tdlight.utils.MonoUtils; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -12,53 +15,50 @@ import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class WrappedReactorTelegramClient implements ReactorTelegramClient { private final ReactiveTelegramClient reactiveTelegramClient; + private final AtomicReference> multicastSignals = new AtomicReference<>(null); public WrappedReactorTelegramClient(ReactiveTelegramClient reactiveTelegramClient) { this.reactiveTelegramClient = reactiveTelegramClient; } - @SuppressWarnings("Convert2MethodRef") public Mono initialize() { return MonoUtils - .fromBlockingEmpty(() -> reactiveTelegramClient.createAndRegisterClient()); + .fromBlockingEmpty(() -> { + reactiveTelegramClient.createAndRegisterClient(); + Flux signalsFlux = Flux + .create(sink -> { + reactiveTelegramClient.setListener(sink::next); + sink.onCancel(reactiveTelegramClient::cancel); + sink.onDispose(reactiveTelegramClient::dispose); + }, OverflowStrategy.BUFFER) + .subscribeOn(Schedulers.boundedElastic()) + .takeWhile(Signal::isNotClosed); + Flux refCountedSharedSignalsFlux = signalsFlux.publish().refCount(); + multicastSignals.set(refCountedSharedSignalsFlux); + }); } @Override public Flux receive() { return Flux - .create(sink -> reactiveTelegramClient.subscribe(new CoreSubscriber<>() { - @Override - public void onSubscribe(@NotNull Subscription s) { - sink.onCancel(s::cancel); - sink.onRequest(s::request); + .defer(() -> { + Flux flux = multicastSignals.get(); + if (flux == null) { + return Flux.error(new IllegalStateException("TDLib session not started")); + } else { + return flux; } - - @Override - public void onNext(ReactiveItem reactiveItem) { - sink.next(reactiveItem); - } - - @Override - public void onError(Throwable t) { - sink.error(t); - } - - @Override - public void onComplete() { - sink.complete(); - } - }), OverflowStrategy.BUFFER) + }) .handle((item, sink) -> { if (item.isUpdate()) { sink.next(item.getUpdate()); - } else if (item.isHandleException()) { - sink.error(item.getHandleException()); - } else if (item.isUpdateException()) { - sink.error(item.getUpdateException()); + } else if (item.isException()) { + sink.error(item.getException()); } else { sink.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); }