Update tdlight-java
This commit is contained in:
parent
33d8347da2
commit
23023db235
2
pom.xml
2
pom.xml
|
@ -66,7 +66,7 @@
|
|||
<dependency>
|
||||
<groupId>it.tdlight</groupId>
|
||||
<artifactId>tdlight-java</artifactId>
|
||||
<version>2.7.8.11</version>
|
||||
<version>2.7.8.12</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package it.tdlight.tdlibsession.td;
|
||||
|
||||
import it.tdlight.common.ReactiveItem;
|
||||
import it.tdlight.common.ReactiveTelegramClient;
|
||||
import it.tdlight.common.Signal;
|
||||
import it.tdlight.common.SignalListener;
|
||||
import it.tdlight.common.UpdatesHandler;
|
||||
import it.tdlight.jni.TdApi;
|
||||
import it.tdlight.utils.MonoUtils;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
|
@ -12,53 +15,50 @@ import reactor.core.CoreSubscriber;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
public class WrappedReactorTelegramClient implements ReactorTelegramClient {
|
||||
|
||||
private final ReactiveTelegramClient reactiveTelegramClient;
|
||||
private final AtomicReference<Flux<Signal>> multicastSignals = new AtomicReference<>(null);
|
||||
|
||||
public WrappedReactorTelegramClient(ReactiveTelegramClient reactiveTelegramClient) {
|
||||
this.reactiveTelegramClient = reactiveTelegramClient;
|
||||
}
|
||||
|
||||
@SuppressWarnings("Convert2MethodRef")
|
||||
public Mono<Void> initialize() {
|
||||
return MonoUtils
|
||||
.fromBlockingEmpty(() -> reactiveTelegramClient.createAndRegisterClient());
|
||||
.fromBlockingEmpty(() -> {
|
||||
reactiveTelegramClient.createAndRegisterClient();
|
||||
Flux<Signal> signalsFlux = Flux
|
||||
.<Signal>create(sink -> {
|
||||
reactiveTelegramClient.setListener(sink::next);
|
||||
sink.onCancel(reactiveTelegramClient::cancel);
|
||||
sink.onDispose(reactiveTelegramClient::dispose);
|
||||
}, OverflowStrategy.BUFFER)
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.takeWhile(Signal::isNotClosed);
|
||||
Flux<Signal> refCountedSharedSignalsFlux = signalsFlux.publish().refCount();
|
||||
multicastSignals.set(refCountedSharedSignalsFlux);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<TdApi.Object> receive() {
|
||||
return Flux
|
||||
.<ReactiveItem>create(sink -> reactiveTelegramClient.subscribe(new CoreSubscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(@NotNull Subscription s) {
|
||||
sink.onCancel(s::cancel);
|
||||
sink.onRequest(s::request);
|
||||
.defer(() -> {
|
||||
Flux<Signal> flux = multicastSignals.get();
|
||||
if (flux == null) {
|
||||
return Flux.error(new IllegalStateException("TDLib session not started"));
|
||||
} else {
|
||||
return flux;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ReactiveItem reactiveItem) {
|
||||
sink.next(reactiveItem);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
sink.error(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
sink.complete();
|
||||
}
|
||||
}), OverflowStrategy.BUFFER)
|
||||
})
|
||||
.handle((item, sink) -> {
|
||||
if (item.isUpdate()) {
|
||||
sink.next(item.getUpdate());
|
||||
} else if (item.isHandleException()) {
|
||||
sink.error(item.getHandleException());
|
||||
} else if (item.isUpdateException()) {
|
||||
sink.error(item.getUpdateException());
|
||||
} else if (item.isException()) {
|
||||
sink.error(item.getException());
|
||||
} else {
|
||||
sink.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type"));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue