diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java index 9284663..8e0c6cf 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java @@ -11,7 +11,7 @@ public interface ReactorTelegramClient { Flux receive(); - Mono send(TdApi.Function query); + Mono send(TdApi.Function query); - Object execute(TdApi.Function query); + T execute(TdApi.Function query); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 7170f67..9cb9f32 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; import it.tdlight.utils.MonoUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -24,15 +25,15 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { public Flux receive() { return Flux .from(reactiveTelegramClient) - .concatMap(item -> { + .handle((item, sink) -> { if (item.isUpdate()) { - return Mono.just(item.getUpdate()); + sink.next(item.getUpdate()); } else if (item.isHandleException()) { - return Mono.error(item.getHandleException()); + sink.error(item.getHandleException()); } else if (item.isUpdateException()) { - return Mono.error(item.getUpdateException()); + sink.error(item.getUpdateException()); } else { - return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); + sink.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); } }); } @@ -45,8 +46,16 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { * @return a publisher that will emit exactly one item, or an error */ @Override - public Mono send(TdApi.Function query) { - return Mono.from(reactiveTelegramClient.send(query)).single(); + public Mono send(TdApi.Function query) { + return Flux.from(reactiveTelegramClient.send(query)).single().handle((item, sink) -> { + if (item.getConstructor() == Error.CONSTRUCTOR) { + var error = ((TdApi.Error) item); + sink.error(new TdError(error.code, error.message)); + } else { + //noinspection unchecked + sink.next((T) item); + } + }); } /** @@ -57,7 +66,8 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { * @throws NullPointerException if query is null. */ @Override - public TdApi.Object execute(TdApi.Function query) { - return reactiveTelegramClient.execute(query); + public T execute(TdApi.Function query) { + //noinspection unchecked + return (T) reactiveTelegramClient.execute(query); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index f59c87d..f591e60 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -14,7 +14,6 @@ import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Message; import it.tdlight.jni.TdApi.MessageSenderUser; import it.tdlight.jni.TdApi.MessageText; -import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Ok; import it.tdlight.jni.TdApi.SetLogTagVerbosityLevel; import it.tdlight.jni.TdApi.SetLogVerbosityLevel; @@ -110,23 +109,29 @@ public class TestClient implements ReactorTelegramClient { } @Override - public Mono send(Function query) { - return Mono.fromCallable(() -> { + public Mono send(Function query) { + return Mono.fromCallable(() -> { TdApi.Object result = executeCommon(query); if (result != null) { - return result; + if (result.getConstructor() != Error.CONSTRUCTOR) { + //noinspection unchecked + return (T) result; + } else { + Error error = (Error) result; + throw new TdError(error.code, error.message); + } } throw new TdError(500, "Unsupported"); }); } @Override - public TdApi.Object execute(Function query) { + public T execute(Function query) { TdApi.Object result = executeCommon(query); if (result != null) { - return result; + return (T) result; } - return new Error(500, "Unsupported"); + throw new TdError(500, "Unsupported"); } @Nullable