diff --git a/pom.xml b/pom.xml index 1500473..d06be31 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 it.tdlight tdlib-session-container - 3.169.52 + 3.169.54 TDLib Session Container UTF-8 @@ -84,7 +84,7 @@ it.tdlight tdlight-java - 3.169.52 + 3.169.54 it.cavallium diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdError.java b/src/main/java/it/tdlight/tdlibsession/td/TdError.java index 2eb9a1c..1a14e08 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/TdError.java +++ b/src/main/java/it/tdlight/tdlibsession/td/TdError.java @@ -1,12 +1,34 @@ package it.tdlight.tdlibsession.td; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; + public class TdError extends RuntimeException { + private final int code; + private final String message; + public TdError(int code, String message) { super(code + " " + message); + this.code = code; + this.message = message; } public TdError(int code, String message, Throwable cause) { super(code + " " + message, cause); + this.code = code; + this.message = message; + } + + public int getTdCode() { + return code; + } + + public String getTdMessage() { + return message; + } + + public TdApi.Error getTdError() { + return new Error(code, message); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdResult.java b/src/main/java/it/tdlight/tdlibsession/td/TdResult.java index 47d2596..74f6e87 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/TdResult.java +++ b/src/main/java/it/tdlight/tdlibsession/td/TdResult.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; +import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.CompletionException; import java.util.function.Function; @@ -219,10 +220,12 @@ public interface TdResult { } static TdResult succeeded(@NotNull T value) { + Objects.requireNonNull(value); return new TdResultImpl(value, null); } static TdResult failed(@NotNull TdApi.Error error) { + Objects.requireNonNull(error); return new TdResultImpl(null, error); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 84d6a43..0dc6180 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -200,7 +200,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionString(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueString.CONSTRUCTOR: return Mono.just(((OptionValueString) value).value); @@ -219,7 +219,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionInteger(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueInteger.CONSTRUCTOR: return Mono.just(((OptionValueInteger) value).value); @@ -238,7 +238,7 @@ public class AsyncTdEasy { * @return The value or nothing */ public Mono getOptionBoolean(String name) { - return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + return this.sendDirectly(new TdApi.GetOption(name)).flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { switch (value.getConstructor()) { case OptionValueBoolean.CONSTRUCTOR: return Mono.just(((OptionValueBoolean) value).value); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index d4662e5..01b9b5c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } private Mono pipe() { - incomingUpdatesCo.onNext(this.requestUpdatesBatchFromNetwork() + var updates = this.requestUpdatesBatchFromNetwork() .repeatWhen(nFlux -> { return Flux.push(emitter -> { var dispos = Flux.combineLatest(nFlux, tdClosed, Pair::of).subscribe(val -> { @@ -210,7 +210,13 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } }); }) - .log("TdMiddle", Level.FINEST).publish().autoConnect(1)); + .log("TdMiddle", Level.FINEST).publish().autoConnect(1); + + updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)), + incomingUpdatesCo::onError, + incomingUpdatesCo::onComplete + ); + return Mono.empty(); } @@ -303,32 +309,43 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .replace(" = ", "=")); } - return Mono.from(tdClosed).single() - .filter(tdClosed -> !tdClosed) - .>flatMap((_x) -> Mono.create(sink -> { - cluster.getEventBus().request(botAddress + ".execute", req, cluster.newDeliveryOpts().setLocalOnly(local), (AsyncResult> event) -> { - if (event.succeeded()) { - if (event.result().body() == null) { - sink.error(new NullPointerException("Response is empty")); - } else { - sink.success(Objects.requireNonNull(event.result().body()).toTdResult()); - } - } else { - sink.error(ResponseError.newResponseError(request, botAlias, event.cause())); - } - }); - })).>handle((response, sink) -> { + return Mono.from(tdClosed).single().filter(tdClosed -> !tdClosed).>flatMap((_x) -> Mono.create(sink -> { + try { + cluster + .getEventBus() + .request(botAddress + ".execute", + req, + cluster.newDeliveryOpts().setLocalOnly(local), + (AsyncResult> event) -> { + try { + if (event.succeeded()) { + if (event.result().body() == null) { + sink.error(new NullPointerException("Response is empty")); + } else { + sink.success(Objects.requireNonNull(event.result().body()).toTdResult()); + } + } else { + sink.error(ResponseError.newResponseError(request, botAlias, event.cause())); + } + } catch (Throwable t) { + sink.error(t); + } + } + ); + } catch (Throwable t) { + sink.error(t); + } + })).>switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.failed(new TdApi.Error(500, + "Client is closed or response is empty" + ))))).>handle((response, sink) -> { try { Objects.requireNonNull(response); if (OUTPUT_REQUESTS) { - System.out.println(" <- " + response.toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); + System.out.println( + " <- " + response.toString().replace("\n", " ").replace("\t", "").replace(" ", "").replace(" = ", "=")); } sink.next(response); - } catch (ClassCastException | NullPointerException e) { + } catch (Exception e) { sink.error(e); } }).switchIfEmpty(Mono.fromSupplier(() -> { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index a4cd8d9..2246081 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -216,11 +216,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .switchIfEmpty(Mono.fromSupplier(() -> { return TdResult.failed(new TdApi.Error(500, "Received null response")); })) - .subscribe(response -> { - msg.reply(new TdResultMessage(response.result(), response.cause()), - cluster.newDeliveryOpts().setLocalOnly(local) - ); - }, ex -> { + .handle((response, sink) -> { + try { + msg.reply(new TdResultMessage(response.result(), response.cause()), + cluster.newDeliveryOpts().setLocalOnly(local) + ); + sink.next(response); + } catch (Exception ex) { + sink.error(ex); + } + }) + .subscribe(response -> {}, ex -> { logger.error("Error when processing a request", ex); msg.fail(500, ex.getLocalizedMessage()); }); diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 48e16f6..f27b57c 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -5,12 +5,14 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import org.reactivestreams.Subscription; import org.warp.commonutils.concurrency.future.CompletableFutureUtils; import reactor.core.CoreSubscriber; @@ -117,12 +119,11 @@ public class MonoUtils { } } - public static void orElseThrow(TdResult value, SynchronousSink sink) { + public static Mono orElseThrow(TdResult value) { if (value.succeeded()) { - sink.next(value.result()); + return Mono.just(value.result()); } else { - sink.complete(); - //sink.error(new TdError(value.cause().code, value.cause().message)); + return Mono.error(new TdError(value.cause().code, value.cause().message)); } } @@ -135,4 +136,42 @@ public class MonoUtils { } }); } + + public static Mono fromFuture(CompletableFuture future) { + return Mono.create(sink -> { + future.whenComplete((result, error) -> { + if (error != null) { + sink.error(error); + } else if (result != null) { + sink.success(result); + } else { + sink.success(); + } + }); + }); + } + + public static Mono fromFuture(Supplier> future) { + return Mono.create(sink -> { + CompletableFutureUtils.getCompletableFuture(future).whenComplete((result, error) -> { + if (error != null) { + sink.error(error.getCause()); + } else if (result != null) { + sink.success(result); + } else { + sink.success(); + } + }); + }); + } + + public static CompletableFuture toFuture(Mono mono) { + var cf = new CompletableFuture(); + mono.subscribe(value -> { + cf.complete(value); + }, ex -> { + cf.completeExceptionally(ex); + }, () -> cf.complete(null)); + return cf; + } }