This commit is contained in:
Andrea Cavalli 2020-10-14 15:14:54 +02:00
parent cdfc68d30f
commit 9ea736308a
8 changed files with 71 additions and 30 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>it.tdlight</groupId> <groupId>it.tdlight</groupId>
<artifactId>tdlib-session-container</artifactId> <artifactId>tdlib-session-container</artifactId>
<version>3.169.50</version> <version>3.169.52</version>
<name>TDLib Session Container</name> <name>TDLib Session Container</name>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -84,7 +84,7 @@
<dependency> <dependency>
<groupId>it.tdlight</groupId> <groupId>it.tdlight</groupId>
<artifactId>tdlight-java</artifactId> <artifactId>tdlight-java</artifactId>
<version>3.169.50</version> <version>3.169.52</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>

View File

@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Error;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -273,5 +274,13 @@ public interface TdResult<T extends TdApi.Object> {
public boolean failed() { public boolean failed() {
return error != null; return error != null;
} }
@Override
public String toString() {
return new StringJoiner(", ", TdResultImpl.class.getSimpleName() + "[", "]")
.add("value=" + value)
.add("error=" + error)
.toString();
}
} }
} }

View File

@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Object;
import java.util.StringJoiner;
public class TdResultMessage { public class TdResultMessage {
public final TdApi.Object value; public final TdApi.Object value;
@ -21,4 +22,12 @@ public class TdResultMessage {
return TdResult.failed(cause); return TdResult.failed(cause);
} }
} }
@Override
public String toString() {
return new StringJoiner(", ", TdResultMessage.class.getSimpleName() + "[", "]")
.add("value=" + value)
.add("cause=" + cause)
.toString();
}
} }

View File

@ -144,7 +144,7 @@ public class AsyncTdEasy {
* @return The response or {@link TdApi.Error}. * @return The response or {@link TdApi.Error}.
*/ */
public <T extends Object> Mono<TdResult<T>> send(TdApi.Function request) { public <T extends Object> Mono<TdResult<T>> send(TdApi.Function request) {
return td.execute(request, false); return td.<T>execute(request, false);
} }
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj) { private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj) {
@ -156,7 +156,7 @@ public class AsyncTdEasy {
* @param i level * @param i level
*/ */
public Mono<Void> setVerbosityLevel(int i) { public Mono<Void> setVerbosityLevel(int i) {
return sendDirectly(new TdApi.SetLogVerbosityLevel(i)).then(); return MonoUtils.thenOrError(sendDirectly(new TdApi.SetLogVerbosityLevel(i)));
} }
/** /**
@ -164,7 +164,7 @@ public class AsyncTdEasy {
* @param name option name * @param name option name
*/ */
public Mono<Void> clearOption(String name) { public Mono<Void> clearOption(String name) {
return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty())).then(); return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty())));
} }
/** /**
@ -173,7 +173,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionString(String name, String value) { public Mono<Void> setOptionString(String name, String value) {
return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value))).then(); return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value))));
} }
/** /**
@ -182,7 +182,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionInteger(String name, long value) { public Mono<Void> setOptionInteger(String name, long value) {
return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value))).then(); return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value))));
} }
/** /**
@ -191,7 +191,7 @@ public class AsyncTdEasy {
* @param value option value * @param value option value
*/ */
public Mono<Void> setOptionBoolean(String name, boolean value) { public Mono<Void> setOptionBoolean(String name, boolean value) {
return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value))).then(); return MonoUtils.thenOrError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value))));
} }
/** /**
@ -349,7 +349,7 @@ public class AsyncTdEasy {
this.authState.onNext(new AuthorizationStateReady()); this.authState.onNext(new AuthorizationStateReady());
switch (obj.getConstructor()) { switch (obj.getConstructor()) {
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
return Mono.from(this.settings).map(settings -> { return MonoUtils.thenOrError(Mono.from(this.settings).map(settings -> {
var parameters = new TdlibParameters(); var parameters = new TdlibParameters();
parameters.useTestDc = settings.useTestDc; parameters.useTestDc = settings.useTestDc;
parameters.databaseDirectory = settings.databaseDirectory; parameters.databaseDirectory = settings.databaseDirectory;
@ -367,11 +367,11 @@ public class AsyncTdEasy {
parameters.enableStorageOptimizer = settings.enableStorageOptimizer; parameters.enableStorageOptimizer = settings.enableStorageOptimizer;
parameters.ignoreFileNames = settings.ignoreFileNames; parameters.ignoreFileNames = settings.ignoreFileNames;
return new SetTdlibParameters(parameters); return new SetTdlibParameters(parameters);
}).flatMap(this::sendDirectly).then(); }).flatMap(this::sendDirectly));
case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR:
return sendDirectly(new CheckDatabaseEncryptionKey()).then(); return MonoUtils.thenOrError(sendDirectly(new CheckDatabaseEncryptionKey()));
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
return Mono.from(this.settings).flatMap(settings -> { return MonoUtils.thenOrError(Mono.from(this.settings).flatMap(settings -> {
if (settings.isPhoneNumberSet()) { if (settings.isPhoneNumberSet()) {
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
new PhoneNumberAuthenticationSettings(false, false, false) new PhoneNumberAuthenticationSettings(false, false, false)
@ -381,7 +381,7 @@ public class AsyncTdEasy {
} else { } else {
return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot"));
} }
}).then(); }));
case AuthorizationStateWaitRegistration.CONSTRUCTOR: case AuthorizationStateWaitRegistration.CONSTRUCTOR:
var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj;
RegisterUser registerUser = new RegisterUser(); RegisterUser registerUser = new RegisterUser();
@ -398,7 +398,7 @@ public class AsyncTdEasy {
registerUser.lastName = ScannerUtils.askParameter(this.logName, "Enter Last Name").trim(); registerUser.lastName = ScannerUtils.askParameter(this.logName, "Enter Last Name").trim();
} }
return sendDirectly(registerUser).then(); return MonoUtils.thenOrError(sendDirectly(registerUser));
case AuthorizationStateWaitPassword.CONSTRUCTOR: case AuthorizationStateWaitPassword.CONSTRUCTOR:
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
String passwordMessage = "Password authorization of '" + this.logName + "':"; String passwordMessage = "Password authorization of '" + this.logName + "':";
@ -409,7 +409,7 @@ public class AsyncTdEasy {
var password = ScannerUtils.askParameter(this.logName, "Enter your password"); var password = ScannerUtils.askParameter(this.logName, "Enter your password");
return sendDirectly(new CheckAuthenticationPassword(password)).then(); return MonoUtils.thenOrError(sendDirectly(new CheckAuthenticationPassword(password)));
case AuthorizationStateReady.CONSTRUCTOR: { case AuthorizationStateReady.CONSTRUCTOR: {
return Mono.empty(); return Mono.empty();
} }

View File

@ -288,7 +288,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@Override @Override
public Flux<Update> getUpdates() { public Flux<Update> getUpdates() {
return incomingUpdatesCo.filter(Objects::nonNull).take(1).single().flatMapMany(v -> v); return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v);
} }
@Override @Override
@ -305,19 +305,19 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
return Mono.from(tdClosed).single() return Mono.from(tdClosed).single()
.filter(tdClosed -> !tdClosed) .filter(tdClosed -> !tdClosed)
.<TdResult<T>>handle((_x, sink) -> { .<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
cluster.getEventBus().request(botAddress + ".execute", req, cluster.newDeliveryOpts().setLocalOnly(local), (AsyncResult<Message<TdResultMessage>> event) -> { cluster.getEventBus().request(botAddress + ".execute", req, cluster.newDeliveryOpts().setLocalOnly(local), (AsyncResult<Message<TdResultMessage>> event) -> {
if (event.succeeded()) { if (event.succeeded()) {
if (event.result().body() == null) { if (event.result().body() == null) {
sink.complete(); sink.error(new NullPointerException("Response is empty"));
} else { } else {
sink.next(Objects.requireNonNull(event.result().body()).toTdResult()); sink.success(Objects.requireNonNull(event.result().body()).toTdResult());
} }
} else { } else {
sink.error(ResponseError.newResponseError(request, botAlias, event.cause())); sink.error(ResponseError.newResponseError(request, botAlias, event.cause()));
} }
}); });
}).handle((response, sink) -> { })).<TdResult<T>>handle((response, sink) -> {
try { try {
Objects.requireNonNull(response); Objects.requireNonNull(response);
if (OUTPUT_REQUESTS) { if (OUTPUT_REQUESTS) {
@ -331,6 +331,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
} catch (ClassCastException | NullPointerException e) { } catch (ClassCastException | NullPointerException e) {
sink.error(e); sink.error(e);
} }
}); }).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
}));
} }
} }

View File

@ -40,7 +40,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
masterClusterManager masterClusterManager
.getVertx() .getVertx()
.deployVerticle(srv, .deployVerticle(srv,
new DeploymentOptions().setConfig(new JsonObject().put("botAddress", botAddress).put("local", true)), new DeploymentOptions().setConfig(new JsonObject().put("botAddress", botAddress).put("botAlias", botAlias).put("local", true)),
MonoUtils.toHandler(sink) MonoUtils.toHandler(sink)
); );
}).onErrorMap(InitializationException::new).flatMap(_x -> { }).onErrorMap(InitializationException::new).flatMap(_x -> {
@ -62,6 +62,9 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
@Override @Override
public <T extends Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) { public <T extends Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
return cli.filter(Objects::nonNull).single().flatMap(c -> c.execute(request, executeDirectly)); return cli
.filter(obj -> Objects.nonNull(obj))
.single()
.flatMap(c -> c.<T>execute(request, executeDirectly));
} }
} }

View File

@ -7,6 +7,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.Message;
import it.tdlight.common.ConstructorDetector; import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateAuthorizationState;
@ -210,15 +211,22 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.replace(" ", "") .replace(" ", "")
.replace(" = ", "=")); .replace(" = ", "="));
} }
td.execute(msg.body().getRequest(), msg.body().isExecuteDirectly()).single().subscribe(response -> { td
msg.reply(new TdResultMessage(response.result(), response.cause()), cluster.newDeliveryOpts().setLocalOnly(local)); .execute(msg.body().getRequest(), msg.body().isExecuteDirectly())
}, ex -> { .switchIfEmpty(Mono.fromSupplier(() -> {
msg.fail(500, ex.getLocalizedMessage()); return TdResult.failed(new TdApi.Error(500, "Received null response"));
logger.error("Error when processing a request", ex); }))
}); .subscribe(response -> {
msg.reply(new TdResultMessage(response.result(), response.cause()),
cluster.newDeliveryOpts().setLocalOnly(local)
);
}, ex -> {
logger.error("Error when processing a request", ex);
msg.fail(500, ex.getLocalizedMessage());
});
} catch (ClassCastException ex) { } catch (ClassCastException ex) {
msg.fail(500, ex.getMessage());
logger.error("Error when deserializing a request", ex); logger.error("Error when deserializing a request", ex);
msg.fail(500, ex.getMessage());
} }
}).completionHandler(MonoUtils.toHandler(registrationSink)); }).completionHandler(MonoUtils.toHandler(registrationSink));

View File

@ -125,4 +125,14 @@ public class MonoUtils {
//sink.error(new TdError(value.cause().code, value.cause().message)); //sink.error(new TdError(value.cause().code, value.cause().message));
} }
} }
public static <T extends TdApi.Object> Mono<Void> thenOrError(Mono<TdResult<T>> optionalMono) {
return optionalMono.handle((optional, sink) -> {
if (optional.succeeded()) {
sink.complete();
} else {
sink.error(new TdError(optional.cause().code, optional.cause().message));
}
});
}
} }