2021-02-13 17:42:22 +01:00
|
|
|
package it.tdlight.tdlibsession.td;
|
|
|
|
|
2021-08-05 17:33:21 +02:00
|
|
|
import it.tdlight.common.ReactiveItem;
|
2021-02-13 17:42:22 +01:00
|
|
|
import it.tdlight.common.ReactiveTelegramClient;
|
|
|
|
import it.tdlight.jni.TdApi;
|
2021-02-25 11:21:03 +01:00
|
|
|
import it.tdlight.utils.MonoUtils;
|
2021-08-05 17:33:21 +02:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.reactivestreams.Subscriber;
|
|
|
|
import org.reactivestreams.Subscription;
|
|
|
|
import reactor.core.CoreSubscriber;
|
2021-02-13 17:42:22 +01:00
|
|
|
import reactor.core.publisher.Flux;
|
2021-08-05 17:33:21 +02:00
|
|
|
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
2021-02-13 17:42:22 +01:00
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
public class WrappedReactorTelegramClient implements ReactorTelegramClient {
|
|
|
|
|
|
|
|
private final ReactiveTelegramClient reactiveTelegramClient;
|
|
|
|
|
|
|
|
public WrappedReactorTelegramClient(ReactiveTelegramClient reactiveTelegramClient) {
|
|
|
|
this.reactiveTelegramClient = reactiveTelegramClient;
|
|
|
|
}
|
|
|
|
|
2021-02-25 11:21:03 +01:00
|
|
|
@SuppressWarnings("Convert2MethodRef")
|
|
|
|
public Mono<Void> initialize() {
|
|
|
|
return MonoUtils
|
|
|
|
.fromBlockingEmpty(() -> reactiveTelegramClient.createAndRegisterClient());
|
|
|
|
}
|
|
|
|
|
2021-02-13 17:42:22 +01:00
|
|
|
@Override
|
2021-02-25 11:21:03 +01:00
|
|
|
public Flux<TdApi.Object> receive() {
|
|
|
|
return Flux
|
2021-08-05 17:33:21 +02:00
|
|
|
.<ReactiveItem>create(sink -> reactiveTelegramClient.subscribe(new CoreSubscriber<>() {
|
|
|
|
@Override
|
|
|
|
public void onSubscribe(@NotNull Subscription s) {
|
|
|
|
sink.onCancel(s::cancel);
|
|
|
|
sink.onRequest(s::request);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onNext(ReactiveItem reactiveItem) {
|
|
|
|
sink.next(reactiveItem);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onError(Throwable t) {
|
|
|
|
sink.error(t);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onComplete() {
|
|
|
|
sink.complete();
|
|
|
|
}
|
|
|
|
}), OverflowStrategy.BUFFER)
|
2021-04-11 14:57:05 +02:00
|
|
|
.handle((item, sink) -> {
|
2021-02-25 11:21:03 +01:00
|
|
|
if (item.isUpdate()) {
|
2021-04-11 14:57:05 +02:00
|
|
|
sink.next(item.getUpdate());
|
2021-02-25 11:21:03 +01:00
|
|
|
} else if (item.isHandleException()) {
|
2021-04-11 14:57:05 +02:00
|
|
|
sink.error(item.getHandleException());
|
2021-02-25 11:21:03 +01:00
|
|
|
} else if (item.isUpdateException()) {
|
2021-04-11 14:57:05 +02:00
|
|
|
sink.error(item.getUpdateException());
|
2021-02-25 11:21:03 +01:00
|
|
|
} else {
|
2021-04-11 14:57:05 +02:00
|
|
|
sink.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type"));
|
2021-02-25 11:21:03 +01:00
|
|
|
}
|
|
|
|
});
|
2021-02-13 17:42:22 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Sends a request to the TDLib.
|
|
|
|
*
|
|
|
|
* @param query Object representing a query to the TDLib.
|
|
|
|
* @throws NullPointerException if query is null.
|
|
|
|
* @return a publisher that will emit exactly one item, or an error
|
|
|
|
*/
|
|
|
|
@Override
|
2021-04-13 21:48:36 +02:00
|
|
|
public Mono<TdApi.Object> send(TdApi.Function query) {
|
2021-04-18 23:29:10 +02:00
|
|
|
return Mono.from(reactiveTelegramClient.send(query)).single();
|
2021-02-13 17:42:22 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously.
|
|
|
|
*
|
|
|
|
* @param query Object representing a query to the TDLib.
|
|
|
|
* @return request result or {@link TdApi.Error}.
|
|
|
|
* @throws NullPointerException if query is null.
|
|
|
|
*/
|
|
|
|
@Override
|
2021-04-13 21:48:36 +02:00
|
|
|
public TdApi.Object execute(TdApi.Function query) {
|
|
|
|
return reactiveTelegramClient.execute(query);
|
2021-02-13 17:42:22 +01:00
|
|
|
}
|
|
|
|
}
|