Update pom.xml, TdError.java, and 5 more files...

This commit is contained in:
Andrea Cavalli 2020-10-14 20:04:23 +02:00
parent 9ea736308a
commit 4fe963c3b7
7 changed files with 124 additions and 37 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>it.tdlight</groupId>
<artifactId>tdlib-session-container</artifactId>
<version>3.169.52</version>
<version>3.169.54</version>
<name>TDLib Session Container</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -84,7 +84,7 @@
<dependency>
<groupId>it.tdlight</groupId>
<artifactId>tdlight-java</artifactId>
<version>3.169.52</version>
<version>3.169.54</version>
</dependency>
<dependency>
<groupId>it.cavallium</groupId>

View File

@ -1,12 +1,34 @@
package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
public class TdError extends RuntimeException {
private final int code;
private final String message;
public TdError(int code, String message) {
super(code + " " + message);
this.code = code;
this.message = message;
}
public TdError(int code, String message, Throwable cause) {
super(code + " " + message, cause);
this.code = code;
this.message = message;
}
public int getTdCode() {
return code;
}
public String getTdMessage() {
return message;
}
public TdApi.Error getTdError() {
return new Error(code, message);
}
}

View File

@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
@ -219,10 +220,12 @@ public interface TdResult<T extends TdApi.Object> {
}
static <T extends TdApi.Object> TdResult<T> succeeded(@NotNull T value) {
Objects.requireNonNull(value);
return new TdResultImpl<T>(value, null);
}
static <T extends TdApi.Object> TdResult<T> failed(@NotNull TdApi.Error error) {
Objects.requireNonNull(error);
return new TdResultImpl<T>(null, error);
}

View File

@ -200,7 +200,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<String> getOptionString(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<OptionValue>handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueString.CONSTRUCTOR:
return Mono.just(((OptionValueString) value).value);
@ -219,7 +219,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<Long> getOptionInteger(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueInteger.CONSTRUCTOR:
return Mono.just(((OptionValueInteger) value).value);
@ -238,7 +238,7 @@ public class AsyncTdEasy {
* @return The value or nothing
*/
public Mono<Boolean> getOptionBoolean(String name) {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
return this.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name)).<TdApi.OptionValue>flatMap(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> {
switch (value.getConstructor()) {
case OptionValueBoolean.CONSTRUCTOR:
return Mono.just(((OptionValueBoolean) value).value);

View File

@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
private Mono<Void> pipe() {
incomingUpdatesCo.onNext(this.requestUpdatesBatchFromNetwork()
var updates = this.requestUpdatesBatchFromNetwork()
.repeatWhen(nFlux -> {
return Flux.push(emitter -> {
var dispos = Flux.combineLatest(nFlux, tdClosed, Pair::of).subscribe(val -> {
@ -210,7 +210,13 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
});
})
.log("TdMiddle", Level.FINEST).publish().autoConnect(1));
.log("TdMiddle", Level.FINEST).publish().autoConnect(1);
updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)),
incomingUpdatesCo::onError,
incomingUpdatesCo::onComplete
);
return Mono.empty();
}
@ -303,32 +309,43 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.replace(" = ", "="));
}
return Mono.from(tdClosed).single()
.filter(tdClosed -> !tdClosed)
.<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
cluster.getEventBus().request(botAddress + ".execute", req, cluster.newDeliveryOpts().setLocalOnly(local), (AsyncResult<Message<TdResultMessage>> event) -> {
if (event.succeeded()) {
if (event.result().body() == null) {
sink.error(new NullPointerException("Response is empty"));
} else {
sink.success(Objects.requireNonNull(event.result().body()).toTdResult());
}
} else {
sink.error(ResponseError.newResponseError(request, botAlias, event.cause()));
}
});
})).<TdResult<T>>handle((response, sink) -> {
return Mono.from(tdClosed).single().filter(tdClosed -> !tdClosed).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
try {
cluster
.getEventBus()
.request(botAddress + ".execute",
req,
cluster.newDeliveryOpts().setLocalOnly(local),
(AsyncResult<Message<TdResultMessage>> event) -> {
try {
if (event.succeeded()) {
if (event.result().body() == null) {
sink.error(new NullPointerException("Response is empty"));
} else {
sink.success(Objects.requireNonNull(event.result().body()).toTdResult());
}
} else {
sink.error(ResponseError.newResponseError(request, botAlias, event.cause()));
}
} catch (Throwable t) {
sink.error(t);
}
}
);
} catch (Throwable t) {
sink.error(t);
}
})).<TdResult<T>>switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.<T>failed(new TdApi.Error(500,
"Client is closed or response is empty"
))))).<TdResult<T>>handle((response, sink) -> {
try {
Objects.requireNonNull(response);
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + response.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
System.out.println(
" <- " + response.toString().replace("\n", " ").replace("\t", "").replace(" ", "").replace(" = ", "="));
}
sink.next(response);
} catch (ClassCastException | NullPointerException e) {
} catch (Exception e) {
sink.error(e);
}
}).switchIfEmpty(Mono.fromSupplier(() -> {

View File

@ -216,11 +216,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Received null response"));
}))
.subscribe(response -> {
msg.reply(new TdResultMessage(response.result(), response.cause()),
cluster.newDeliveryOpts().setLocalOnly(local)
);
}, ex -> {
.handle((response, sink) -> {
try {
msg.reply(new TdResultMessage(response.result(), response.cause()),
cluster.newDeliveryOpts().setLocalOnly(local)
);
sink.next(response);
} catch (Exception ex) {
sink.error(ex);
}
})
.subscribe(response -> {}, ex -> {
logger.error("Error when processing a request", ex);
msg.fail(500, ex.getLocalizedMessage());
});

View File

@ -5,12 +5,14 @@ import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
import reactor.core.CoreSubscriber;
@ -117,12 +119,11 @@ public class MonoUtils {
}
}
public static <R extends TdApi.Object> void orElseThrow(TdResult<R> value, SynchronousSink<R> sink) {
public static <R extends TdApi.Object> Mono<R> orElseThrow(TdResult<R> value) {
if (value.succeeded()) {
sink.next(value.result());
return Mono.just(value.result());
} else {
sink.complete();
//sink.error(new TdError(value.cause().code, value.cause().message));
return Mono.error(new TdError(value.cause().code, value.cause().message));
}
}
@ -135,4 +136,42 @@ public class MonoUtils {
}
});
}
public static <T> Mono<T> fromFuture(CompletableFuture<T> future) {
return Mono.create(sink -> {
future.whenComplete((result, error) -> {
if (error != null) {
sink.error(error);
} else if (result != null) {
sink.success(result);
} else {
sink.success();
}
});
});
}
public static <T> Mono<T> fromFuture(Supplier<CompletableFuture<T>> future) {
return Mono.create(sink -> {
CompletableFutureUtils.getCompletableFuture(future).whenComplete((result, error) -> {
if (error != null) {
sink.error(error.getCause());
} else if (result != null) {
sink.success(result);
} else {
sink.success();
}
});
});
}
public static <T extends Object> CompletableFuture<T> toFuture(Mono<T> mono) {
var cf = new CompletableFuture<T>();
mono.subscribe(value -> {
cf.complete(value);
}, ex -> {
cf.completeExceptionally(ex);
}, () -> cf.complete(null));
return cf;
}
}