diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ea3b1fc --- /dev/null +++ b/pom.xml @@ -0,0 +1,207 @@ + + 4.0.0 + it.tdlight + tdlib-session-container + 3.169.50 + TDLib Session Container + + UTF-8 + + 11 + 11 + + + + protoarch + protoarch + http://home.apache.org/~aajisaka/repository + + + mchv-release + MCHV Release Apache Maven Packages + https://mvn.mchv.eu/repository/mchv + + + mchv-snapshot + MCHV Snapshot Apache Maven Packages + https://mvn.mchv.eu/repository/mchv-snapshot + + + + + io.vertx + vertx-core + 3.9.3 + + + io.vertx + vertx-hazelcast + 3.9.3 + + + io.vertx + vertx-reactive-streams + 3.9.3 + + + io.vertx + vertx-circuit-breaker + 3.9.3 + + + io.projectreactor + reactor-core + 3.3.10.RELEASE + + + io.projectreactor + reactor-tools + 3.3.10.RELEASE + + + org.slf4j + slf4j-api + 1.7.30 + + + org.apache.logging.log4j + log4j-core + 2.12.1 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.12.1 + + + org.warp + common-utils + 1.0.8 + + + + it.tdlight + tdlight-java + 3.169.50 + + + it.cavallium + concurrent-locks + 1.0.5 + + + javax.annotation + javax.annotation-api + 1.2 + + + + it.unimi.dsi + fastutil + 8.3.0 + + + org.junit.jupiter + junit-jupiter-api + RELEASE + test + + + org.hamcrest + hamcrest-core + + + + + com.google.code.gson + gson + 2.8.6 + + + + src/test/java + + + ../src/main/libs + + **/*.jar + + + + src/main/resources + + **/*.jar + + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + org.apache.maven.plugins + maven-install-plugin + 3.0.0-M1 + + + maven-dependency-plugin + + + compile + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + 11 + + + + org.codehaus.mojo + templating-maven-plugin + 1.0.0 + + + filtering-java-templates + + filter-sources + + + + + + maven-surefire-plugin + 2.22.1 + + + org.junit.platform + junit-platform-surefire-provider + 1.2.0 + + + org.junit.jupiter + junit-jupiter-engine + 5.7.0 + + + + + + diff --git a/src/main/java-templates/org/tdlibsessioncontainer/utils/generated/LibraryVersion.java b/src/main/java-templates/org/tdlibsessioncontainer/utils/generated/LibraryVersion.java new file mode 100644 index 0000000..7bf0e51 --- /dev/null +++ b/src/main/java-templates/org/tdlibsessioncontainer/utils/generated/LibraryVersion.java @@ -0,0 +1,4 @@ +package org.tdlibsessioncontainer.utils.generated; +public final class LibraryVersion { + public static final String VERSION = "${project.version}"; +} \ No newline at end of file diff --git a/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java b/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java new file mode 100644 index 0000000..5c65983 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/FatalErrorType.java @@ -0,0 +1,5 @@ +package it.tdlight.tdlibsession; + +public enum FatalErrorType { + ACCESS_TOKEN_INVALID, PHONE_NUMBER_INVALID, CONNECTION_KILLED +} diff --git a/src/main/java/it/tdlight/tdlibsession/TdGson.java b/src/main/java/it/tdlight/tdlibsession/TdGson.java new file mode 100644 index 0000000..f5b6dd5 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/TdGson.java @@ -0,0 +1,65 @@ +package it.tdlight.tdlibsession; + +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import it.tdlight.jni.TdApi; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.ArrayList; + +public class TdGson { + + private static final TdApiGenericSerializer tdApiGenericSerializerInstance = new TdApiGenericSerializer<>(); + private static final ArrayList> abstractClassesSerializers = new ArrayList<>(); + + static { + for (Class declaredClass : TdApi.class.getDeclaredClasses()) { + var modifiers = declaredClass.getModifiers(); + if (Modifier.isAbstract(modifiers) && Modifier.isPublic(modifiers) && Modifier + .isStatic(modifiers)) { + abstractClassesSerializers.add(declaredClass); + } + } + } + + public static GsonBuilder registerAdapters(GsonBuilder gsonBuilder) { + for (Class abstractClassesSerializer : abstractClassesSerializers) { + gsonBuilder.registerTypeAdapter(abstractClassesSerializer, tdApiGenericSerializerInstance); + } + return gsonBuilder; + } + + public static class TdApiGenericSerializer implements JsonSerializer, JsonDeserializer { + + @Override + public JsonElement serialize(T src, Type typeOfSrc, JsonSerializationContext context) { + JsonObject result = new JsonObject(); + result.add("type", new JsonPrimitive(src.getClass().getSimpleName())); + result.add("properties", context.serialize(src, src.getClass())); + + return result; + } + + @Override + public T deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + JsonObject jsonObject = json.getAsJsonObject(); + String type = jsonObject.get("type").getAsString().replaceAll("[^a-zA-Z0-9]", ""); + JsonElement element = jsonObject.get("properties"); + + try { + return context + .deserialize(element, Class.forName(TdApi.class.getCanonicalName() + "$" + type)); + } catch (ClassNotFoundException cnfe) { + throw new JsonParseException("Unknown element type: " + type, cnfe); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java b/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java new file mode 100644 index 0000000..1d510c1 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/VariableWrapper.java @@ -0,0 +1,10 @@ +package it.tdlight.tdlibsession; + +public class VariableWrapper { + + public volatile T var; + + public VariableWrapper(T value) { + this.var = value; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java new file mode 100644 index 0000000..225a197 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java @@ -0,0 +1,74 @@ +package it.tdlight.tdlibsession.remoteclient; + +import io.vertx.core.file.FileSystemException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.StringJoiner; + +public class SecurityInfo { + + private final Path keyStorePath; + private final Path keyStorePasswordPath; + private final Path trustStorePath; + private final Path trustStorePasswordPath; + + public SecurityInfo(Path keyStorePath, Path keyStorePasswordPath, Path trustStorePath, Path trustStorePasswordPath) { + this.keyStorePath = keyStorePath; + this.keyStorePasswordPath = keyStorePasswordPath; + this.trustStorePath = trustStorePath; + this.trustStorePasswordPath = trustStorePasswordPath; + } + + public Path getKeyStorePath() { + return keyStorePath; + } + + public Path getKeyStorePasswordPath() { + return keyStorePasswordPath; + } + + public String getKeyStorePassword() { + try { + if (Files.isReadable(keyStorePasswordPath) && Files.size(keyStorePasswordPath) >= 6) { + return Files.readString(keyStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0]; + } else { + throw new NoSuchElementException("No keystore password is set on '" + keyStorePasswordPath.toString() + "'"); + } + } catch (IOException ex) { + throw new FileSystemException(ex); + } + } + + public Path getTrustStorePath() { + return trustStorePath; + } + + public Path getTrustStorePasswordPath() { + return trustStorePasswordPath; + } + + public String getTrustStorePassword() { + try { + if (Files.isReadable(trustStorePasswordPath) && Files.size(trustStorePasswordPath) >= 6) { + return Files.readString(trustStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0]; + } else { + throw new NoSuchElementException("No truststore password is set on '" + trustStorePasswordPath.toString() + "'"); + } + } catch (IOException ex) { + throw new FileSystemException(ex); + } + } + + @Override + public String toString() { + return new StringJoiner(", ", SecurityInfo.class.getSimpleName() + "[", "]") + .add("keyStorePath=" + keyStorePath) + .add("keyStorePasswordPath=" + keyStorePasswordPath) + .add("trustStorePath=" + trustStorePath) + .add("trustStorePasswordPath=" + trustStorePasswordPath) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java new file mode 100644 index 0000000..28f9bf6 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -0,0 +1,155 @@ +package it.tdlight.tdlibsession.remoteclient; + +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.JksOptions; +import it.tdlight.common.Init; +import it.tdlight.common.utils.CantLoadLibrary; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; +import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import it.tdlight.utils.MonoUtils; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; + +public class TDLibRemoteClient implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(TDLibRemoteClient.class); + + private final SecurityInfo securityInfo; + private final String masterHostname; + private final String netInterface; + private final int port; + private final Set membersAddresses; + private final LinkedHashSet botIds; + private final ReplayProcessor clusterManager = ReplayProcessor.cacheLast(); + + public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses, Set botIds) { + this.securityInfo = securityInfo; + this.masterHostname = masterHostname; + this.netInterface = netInterface; + this.port = port; + this.membersAddresses = membersAddresses; + this.botIds = new LinkedHashSet<>(botIds); + + try { + Init.start(); + } catch (CantLoadLibrary ex) { + throw new RuntimeException(ex); + } + } + + public static void main(String[] args) throws URISyntaxException { + if (args.length < 1) { + return; + } + + String masterHostname = args[0]; + + String[] interfaceAndPort = args[1].split(":", 2); + + String netInterface = interfaceAndPort[0]; + + int port = Integer.parseInt(interfaceAndPort[1]); + + Set membersAddresses = Set.of(args[2].split(",")); + + Set botIds = Set.of(args[3].split(",")); + + Path keyStorePath = Paths.get(args[4]); + Path keyStorePasswordPath = Paths.get(args[5]); + Path trustStorePath = Paths.get(args[6]); + Path trustStorePasswordPath = Paths.get(args[7]); + + var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); + loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI()); + + var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); + + new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, botIds).run(x -> {}); + } + + public void start(Handler startedEventHandler) throws IllegalStateException { + run(startedEventHandler); + } + + public void run(Handler startedEventHandler) { + try { + // Set verbosity level here, before creating the bots + if (Files.notExists(Paths.get("logs"))) { + try { + Files.createDirectory(Paths.get("logs")); + } catch (FileAlreadyExistsException ignored) { + } + } + + logger.info("TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname); + + var keyStoreOptions = new JksOptions() + .setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString()) + .setPassword(securityInfo.getKeyStorePassword()); + + var trustStoreOptions = new JksOptions() + .setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString()) + .setPassword(securityInfo.getTrustStorePassword()); + + Mono flux; + if (!botIds.isEmpty()) { + flux = TdClusterManager.ofNodes(keyStoreOptions, + trustStoreOptions, + false, + masterHostname, + netInterface, + port, + membersAddresses + ); + } else { + flux = Mono.empty(); + } + + flux + .doOnNext(clusterManager::onNext) + .doOnTerminate(clusterManager::onComplete) + .doOnError(clusterManager::onError) + .flatMapIterable(clusterManager -> botIds + .stream() + .map(id -> Map.entry(clusterManager, id)) + .collect(Collectors.toList())) + .flatMap(entry -> Mono.create(sink -> { + entry + .getKey() + .getVertx() + .deployVerticle(new AsyncTdMiddleEventBusServer(entry.getKey()), + new DeploymentOptions().setConfig(new JsonObject() + .put("botAddress", entry.getValue()) + .put("botAlias", entry.getValue()) + .put("local", false)), + MonoUtils.toHandler(sink) + ); + })) + .doOnError(ex -> { + logger.error(ex.getLocalizedMessage(), ex); + }).subscribe(i -> {}, e -> {}, () -> startedEventHandler.handle(null)); + } catch (IOException ex) { + logger.error("Remote client error", ex); + } + } + + @Override + public void close() { + clusterManager.blockFirst(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java b/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java new file mode 100644 index 0000000..b9792ed --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java @@ -0,0 +1,126 @@ +package it.tdlight.tdlibsession.td; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class ResponseError extends IOException { + + @NotNull + private final String botName; + @NotNull + private final String tag; + private final int code; + @NotNull + private final String message; + private final Throwable cause; + + public ResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error cause) { + super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + cause.code + " " + cause.message); + this.botName = botName; + this.tag = functionToInlineString(function); + this.code = cause.code; + this.message = cause.message; + this.cause = null; + } + + public ResponseError(@NotNull String tag, @NotNull String botName, @NotNull TdApi.Error cause) { + super("Bot '" + botName + "' failed the request '" + tag + "': " + cause.code + " " + cause.message); + this.botName = botName; + this.tag = tag; + this.code = cause.code; + this.message = cause.message; + this.cause = null; + } + + public ResponseError(@NotNull Function function, @NotNull String botName, @NotNull Throwable cause) { + super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + cause.getMessage()); + this.botName = botName; + this.tag = functionToInlineString(function); + this.code = 500; + this.message = cause.getMessage(); + this.cause = cause; + } + + public ResponseError(@NotNull String tag, @NotNull String botName, @NotNull Throwable cause) { + super("Bot '" + botName + "' failed the request '" + tag + "': " + cause.getMessage()); + this.botName = botName; + this.tag = tag; + this.code = 500; + this.message = cause.getMessage(); + this.cause = cause; + } + + public static ResponseError newResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error cause) { + return new ResponseError(function, botName, cause); + } + + public static ResponseError newResponseError(@NotNull String tag, @NotNull String botName, @NotNull TdApi.Error cause) { + return new ResponseError(tag, botName, cause); + } + + public static ResponseError newResponseError(@NotNull Function function, @NotNull String botName, @NotNull Throwable cause) { + return new ResponseError(function, botName, cause); + } + + public static ResponseError newResponseError(@NotNull String tag, @NotNull String botName, @NotNull Throwable cause) { + return new ResponseError(tag, botName, cause); + } + + @Nullable + public static T get(@NotNull Function function, @NotNull String botName, CompletableFuture action) throws ResponseError { + try { + return action.get(); + } catch (InterruptedException e) { + throw ResponseError.newResponseError(function, botName, e); + } catch (ExecutionException executionException) { + if (executionException.getCause() instanceof ResponseError) { + throw (ResponseError) executionException.getCause(); + } else { + throw ResponseError.newResponseError(function, botName, executionException); + } + } + } + + @Nullable + public static T get(@NotNull String tag, @NotNull String botName, CompletableFuture action) throws ResponseError { + try { + return action.get(); + } catch (InterruptedException e) { + throw ResponseError.newResponseError(tag, botName, e); + } catch (ExecutionException executionException) { + if (executionException.getCause() instanceof ResponseError) { + throw (ResponseError) executionException.getCause(); + } else { + throw ResponseError.newResponseError(tag, botName, executionException); + } + } + } + + @NotNull + public String getBotName() { + return botName; + } + + public int getErrorCode() { + return code; + } + + @NotNull + public String getErrorMessage() { + return message; + } + + private static String functionToInlineString(Function function) { + return function + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "="); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdError.java b/src/main/java/it/tdlight/tdlibsession/td/TdError.java new file mode 100644 index 0000000..2eb9a1c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/TdError.java @@ -0,0 +1,12 @@ +package it.tdlight.tdlibsession.td; + +public class TdError extends RuntimeException { + + public TdError(int code, String message) { + super(code + " " + message); + } + + public TdError(int code, String message, Throwable cause) { + super(code + " " + message, cause); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdResult.java b/src/main/java/it/tdlight/tdlibsession/td/TdResult.java new file mode 100644 index 0000000..05c06cf --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/TdResult.java @@ -0,0 +1,277 @@ +package it.tdlight.tdlibsession.td; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; +import java.util.concurrent.CompletionException; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; + +/** + * Encapsulates the result of an asynchronous operation. + *

+ * Many operations in Vert.x APIs provide results back by passing an instance of this in a {@link io.vertx.core.Handler}. + *

+ * The result can either have failed or succeeded. + *

+ * If it failed then the cause of the failure is available with {@link #cause}. + *

+ * If it succeeded then the actual result is available with {@link #result} + * + * @author Tim Fox + */ +public interface TdResult { + + /** + * The result of the operation. This will be null if the operation failed. + * + * @return the result or null if the operation failed. + */ + T result(); + + /** + * The result of the operation. This will throw CompletionException if the operation failed. + * + * @return the result. + */ + T orElseThrow() throws CompletionException; + + /** + * A TdApi.Error describing failure. This will be null if the operation succeeded. + * + * @return the cause or null if the operation succeeded. + */ + TdApi.Error cause(); + + /** + * Did it succeed? + * + * @return true if it succeded or false otherwise + */ + boolean succeeded(); + + /** + * Did it fail? + * + * @return true if it failed or false otherwise + */ + boolean failed(); + + /** + * Apply a {@code mapper} function on this async result.

+ * + * The {@code mapper} is called with the completed value and this mapper returns a value. This value will complete the result returned by this method call.

+ * + * When this async result is failed, the failure will be propagated to the returned async result and the {@code mapper} will not be called. + * + * @param mapper the mapper function + * @return the mapped async result + */ + default TdResult map(Function mapper) { + if (mapper == null) { + throw new NullPointerException(); + } + return new TdResult() { + @Override + public U result() { + if (succeeded()) { + return mapper.apply(TdResult.this.result()); + } else { + return null; + } + } + + @Override + public U orElseThrow() throws CompletionException { + if (succeeded()) { + return mapper.apply(TdResult.this.orElseThrow()); + } else { + return null; + } + } + + @Override + public TdApi.Error cause() { + return TdResult.this.cause(); + } + + @Override + public boolean succeeded() { + return TdResult.this.succeeded(); + } + + @Override + public boolean failed() { + return TdResult.this.failed(); + } + }; + } + + /** + * Map the result of this async result to a specific {@code value}.

+ * + * When this async result succeeds, this {@code value} will succeeed the async result returned by this method call.

+ * + * When this async result fails, the failure will be propagated to the returned async result. + * + * @param value the value that eventually completes the mapped async result + * @return the mapped async result + */ + default TdResult map(V value) { + return map((Function) t -> value); + } + + /** + * Map the result of this async result to {@code null}.

+ * + * This is a convenience for {@code TdResult.map((T) null)} or {@code TdResult.map((Void) null)}.

+ * + * When this async result succeeds, {@code null} will succeeed the async result returned by this method call.

+ * + * When this async result fails, the failure will be propagated to the returned async result. + * + * @return the mapped async result + */ + default TdResult mapEmpty() { + return map((V)null); + } + + /** + * Apply a {@code mapper} function on this async result.

+ * + * The {@code mapper} is called with the failure and this mapper returns a value. This value will complete the result returned by this method call.

+ * + * When this async result is succeeded, the value will be propagated to the returned async result and the {@code mapper} will not be called. + * + * @param mapper the mapper function + * @return the mapped async result + */ + default TdResult otherwise(Function mapper) { + if (mapper == null) { + throw new NullPointerException(); + } + return new TdResult() { + @Override + public T result() { + if (TdResult.this.succeeded()) { + return TdResult.this.result(); + } else if (TdResult.this.failed()) { + return mapper.apply(TdResult.this.cause()); + } else { + return null; + } + } + @Override + public T orElseThrow() { + if (TdResult.this.succeeded()) { + return TdResult.this.orElseThrow(); + } else if (TdResult.this.failed()) { + return mapper.apply(TdResult.this.cause()); + } else { + return null; + } + } + + @Override + public TdApi.Error cause() { + return null; + } + + @Override + public boolean succeeded() { + return TdResult.this.succeeded() || TdResult.this.failed(); + } + + @Override + public boolean failed() { + return false; + } + }; + } + + /** + * Map the failure of this async result to a specific {@code value}.

+ * + * When this async result fails, this {@code value} will succeeed the async result returned by this method call.

+ * + * When this async succeeds, the result will be propagated to the returned async result. + * + * @param value the value that eventually completes the mapped async result + * @return the mapped async result + */ + default TdResult otherwise(T value) { + return otherwise(err -> value); + } + + /** + * Map the failure of this async result to {@code null}.

+ * + * This is a convenience for {@code TdResult.otherwise((T) null)}.

+ * + * When this async result fails, the {@code null} will succeeed the async result returned by this method call.

+ * + * When this async succeeds, the result will be propagated to the returned async result. + * + * @return the mapped async result + */ + default TdResult otherwiseEmpty() { + return otherwise(err -> null); + } + + static TdResult succeeded(@NotNull T value) { + return new TdResultImpl(value, null); + } + + static TdResult failed(@NotNull TdApi.Error error) { + return new TdResultImpl(null, error); + } + + static TdResult of(@NotNull TdApi.Object resultOrError) { + if (resultOrError.getConstructor() == TdApi.Error.CONSTRUCTOR) { + return failed((TdApi.Error) resultOrError); + } else { + //noinspection unchecked + return succeeded((T) resultOrError); + } + } + + class TdResultImpl implements TdResult { + + private final U value; + private final Error error; + + public TdResultImpl(U value, Error error) { + this.value = value; + this.error = error; + + assert (value == null) != (error == null); + } + + @Override + public U result() { + return value; + } + + @Override + public U orElseThrow() { + if (error != null) { + throw new TdError(error.code, error.message); + } + return value; + } + + @Override + public Error cause() { + return error; + } + + @Override + public boolean succeeded() { + return value != null; + } + + @Override + public boolean failed() { + return error != null; + } + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java b/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java new file mode 100644 index 0000000..84fadd1 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java @@ -0,0 +1,24 @@ +package it.tdlight.tdlibsession.td; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Object; + +public class TdResultMessage { + public final TdApi.Object value; + public final TdApi.Error cause; + + public TdResultMessage(Object value, Error cause) { + this.value = value; + this.cause = cause; + } + + public TdResult toTdResult() { + if (value != null) { + //noinspection unchecked + return TdResult.succeeded((T) value); + } else { + return TdResult.failed(cause); + } + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java new file mode 100644 index 0000000..fe9c0f4 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -0,0 +1,43 @@ +package it.tdlight.tdlibsession.td.direct; + +import io.vertx.core.AsyncResult; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.tdlibsession.td.TdResult; +import java.time.Duration; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface AsyncTdDirect { + + /** + * Receives incoming updates and request responses from TDLib. May be called from any thread, but + * shouldn't be called simultaneously from two different threads. + * + * @param receiveDuration Maximum number of seconds allowed for this function to wait for new records. Default: 1 sec + * @param eventsSize Maximum number of events allowed in list. Default: 350 events + * @return An incoming update or request response list. The object returned in the response may be + * an empty list if the timeout expires. + */ + Flux>> getUpdates(Duration receiveDuration, int eventsSize); + + /** + * Sends request to TDLib. May be called from any thread. + * + * @param request Request to TDLib. + * @param synchronous Execute synchronously. + * @return The request response or {@link it.tdlight.jni.TdApi.Error}. + */ + Mono> execute(Function request, boolean synchronous); + + /** + * Initializes the client and TDLib instance. + */ + Mono initializeClient(); + + /** + * Destroys the client and TDLib instance. + */ + Mono destroyClient(); +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java new file mode 100644 index 0000000..7866650 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -0,0 +1,113 @@ +package it.tdlight.tdlibsession.td.direct; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import it.tdlight.common.TelegramClient; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlight.ClientManager; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class AsyncTdDirectImpl implements AsyncTdDirect { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class); + + private final AtomicReference td = new AtomicReference<>(); + private final Scheduler tdScheduler = Schedulers.newSingle("TdMain"); + private final Scheduler tdPollScheduler = Schedulers.newSingle("TdPoll"); + private final Scheduler tdUpdatesScheduler = Schedulers.newSingle("TdUpdate"); + private final Scheduler tdResponsesScheduler = Schedulers.newSingle("TdResponse"); + + private final EmitterProcessor>> updatesProcessor = EmitterProcessor.create(); + private final String botAlias; + + public AsyncTdDirectImpl(String botAlias) { + this.botAlias = botAlias; + } + + @Override + public Mono> execute(Function request, boolean synchronous) { + if (synchronous) { + return Mono.just(TdResult.of(this.td.get().execute(request))); + } else { + return Mono.>create(sink -> { + try { + this.td.get().send(request, v -> { + sink.success(TdResult.of(v)); + }, sink::error); + } catch (Throwable t) { + sink.error(t); + } + }).subscribeOn(tdResponsesScheduler); + } + } + + @Override + public Flux>> getUpdates(Duration receiveDuration, int eventsSize) { + return Flux.from(updatesProcessor.subscribeOn(tdUpdatesScheduler)); + } + + public Scheduler getTdUpdatesScheduler() { + return tdUpdatesScheduler; + } + + public Scheduler getTdResponsesScheduler() { + return tdResponsesScheduler; + } + + @Override + public Mono initializeClient() { + return Mono.create(sink -> { + Flux.>>create(emitter -> { + var client = ClientManager.create((Object object) -> { + emitter.next(Future.succeededFuture(TdResult.of(object))); + // Close the emitter if receive closed state + if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR + && ((UpdateAuthorizationState) object).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR) { + emitter.complete(); + } + }, updateError -> { + emitter.next(Future.failedFuture(updateError)); + }, error -> { + emitter.next(Future.failedFuture(error)); + }); + this.td.set(client); + + emitter.onDispose(() -> { + this.td.set(null); + }); + }).subscribeOn(tdPollScheduler).subscribe(next -> { + updatesProcessor.onNext(next); + sink.success(true); + }, error -> { + updatesProcessor.onError(error); + sink.error(error); + }, () -> { + updatesProcessor.onComplete(); + sink.success(true); + }); + }).single().then().subscribeOn(tdScheduler); + } + + @Override + public Mono destroyClient() { + return Mono.fromCallable(() -> { + // do nothing + return (Void) null; + }).single().subscribeOn(tdScheduler); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java new file mode 100644 index 0000000..24a2280 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -0,0 +1,460 @@ +package it.tdlight.tdlibsession.td.easy; + +import it.tdlight.common.utils.ScannerUtils; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationState; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.AuthorizationStateClosing; +import it.tdlight.jni.TdApi.AuthorizationStateReady; +import it.tdlight.jni.TdApi.AuthorizationStateWaitCode; +import it.tdlight.jni.TdApi.AuthorizationStateWaitEncryptionKey; +import it.tdlight.jni.TdApi.AuthorizationStateWaitPassword; +import it.tdlight.jni.TdApi.AuthorizationStateWaitPhoneNumber; +import it.tdlight.jni.TdApi.AuthorizationStateWaitRegistration; +import it.tdlight.jni.TdApi.AuthorizationStateWaitTdlibParameters; +import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; +import it.tdlight.jni.TdApi.CheckAuthenticationPassword; +import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey; +import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.OptionValue; +import it.tdlight.jni.TdApi.OptionValueBoolean; +import it.tdlight.jni.TdApi.OptionValueEmpty; +import it.tdlight.jni.TdApi.OptionValueInteger; +import it.tdlight.jni.TdApi.OptionValueString; +import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings; +import it.tdlight.jni.TdApi.RegisterUser; +import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber; +import it.tdlight.jni.TdApi.SetTdlibParameters; +import it.tdlight.jni.TdApi.TdlibParameters; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.FatalErrorType; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.utils.MonoUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.Set; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.warp.commonutils.error.InitializationException; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; +import reactor.core.scheduler.Schedulers; + +public class AsyncTdEasy { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); + + private final ReplayProcessor authState = ReplayProcessor.cacheLastOrDefault(new AuthorizationStateClosed()); + private final ReplayProcessor requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); + private final ReplayProcessor settings = ReplayProcessor.cacheLast(); + private final EmitterProcessor globalErrors = EmitterProcessor.create(); + private final EmitterProcessor fatalErrors = EmitterProcessor.create(); + private final AsyncTdMiddle td; + private final String logName; + private final Flux incomingUpdatesCo; + + public AsyncTdEasy(AsyncTdMiddle td, String logName) { + this.td = td; + this.logName = logName; + + var sch = Schedulers.newSingle("TdEasyUpdates"); + + // todo: use Duration.ZERO instead of 10ms interval + this.incomingUpdatesCo = td.getUpdates() + .filterWhen(update -> Mono.from(requestedDefinitiveExit).map(requestedDefinitiveExit -> !requestedDefinitiveExit)) + .subscribeOn(sch) + .publishOn(sch) + .flatMap(this::preprocessUpdates) + .flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update))) + .filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR) + .map(upd -> (TdApi.Update) upd.getUpdate()) + .doOnError(ex -> { + logger.error(ex.getLocalizedMessage(), ex); + }).doOnNext(v -> { + if (logger.isDebugEnabled()) logger.debug(v.toString()); + }).doOnComplete(() -> { + authState.onNext(new AuthorizationStateClosed()); + }) + .publish().refCount(1); + } + + public Mono create(TdEasySettings settings) { + return Mono + .fromCallable(() -> { + // Create session directories + if (Files.notExists(Path.of(settings.databaseDirectory))) { + try { + Files.createDirectories(Path.of(settings.databaseDirectory)); + } catch (IOException ex) { + throw new InitializationException(ex); + } + } + return true; + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(_v -> { + this.settings.onNext(settings); + return Mono.empty(); + }); + } + + /** + * Get TDLib state + */ + public Flux getState() { + return Flux.from(authState); + } + + /** + * Get incoming updates from TDLib. + */ + public Flux getIncomingUpdates() { + return getIncomingUpdates(false); + } + + private Flux getIncomingUpdates(boolean includePreAuthUpdates) { + return Flux.from(incomingUpdatesCo); + } + + /** + * Get generic error updates from TDLib (When they are not linked to a precise request). + */ + public Flux getIncomingErrors() { + return Flux.from(globalErrors); + } + + /** + * Receives fatal errors from TDLib. + */ + public Flux getFatalErrors() { + return Flux.from(fatalErrors); + } + + /** + * Sends request to TDLib. + * @return The response or {@link TdApi.Error}. + */ + public Mono> send(TdApi.Function request) { + return td.execute(request, false); + } + + private Mono> sendDirectly(TdApi.Function obj) { + return td.execute(obj, false); + } + + /** + * Set verbosity level + * @param i level + */ + public Mono setVerbosityLevel(int i) { + return sendDirectly(new TdApi.SetLogVerbosityLevel(i)).then(); + } + + /** + * Clear option on TDLib + * @param name option name + */ + public Mono clearOption(String name) { + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty())).then(); + } + + /** + * Set option on TDLib + * @param name option name + * @param value option value + */ + public Mono setOptionString(String name, String value) { + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value))).then(); + } + + /** + * Set option on TDLib + * @param name option name + * @param value option value + */ + public Mono setOptionInteger(String name, long value) { + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value))).then(); + } + + /** + * Set option on TDLib + * @param name option name + * @param value option value + */ + public Mono setOptionBoolean(String name, boolean value) { + return sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value))).then(); + } + + /** + * Get option from TDLib + * @param name option name + * @return The value or nothing + */ + public Mono getOptionString(String name) { + return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + switch (value.getConstructor()) { + case OptionValueString.CONSTRUCTOR: + return Mono.just(((OptionValueString) value).value); + case OptionValueEmpty.CONSTRUCTOR: + return Mono.empty(); + default: + return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + + value.getClass().getSimpleName())); + } + }); + } + + /** + * Get option from TDLib + * @param name option name + * @return The value or nothing + */ + public Mono getOptionInteger(String name) { + return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + switch (value.getConstructor()) { + case OptionValueInteger.CONSTRUCTOR: + return Mono.just(((OptionValueInteger) value).value); + case OptionValueEmpty.CONSTRUCTOR: + return Mono.empty(); + default: + return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + + value.getClass().getSimpleName())); + } + }); + } + + /** + * Get option from TDLib + * @param name option name + * @return The value or nothing + */ + public Mono getOptionBoolean(String name) { + return this.sendDirectly(new TdApi.GetOption(name)).handle(MonoUtils::orElseThrow).flatMap((TdApi.OptionValue value) -> { + switch (value.getConstructor()) { + case OptionValueBoolean.CONSTRUCTOR: + return Mono.just(((OptionValueBoolean) value).value); + case OptionValueEmpty.CONSTRUCTOR: + return Mono.empty(); + default: + return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + + value.getClass().getSimpleName())); + } + }); + } + + /** + * Synchronously executes TDLib requests. Only a few requests can be executed synchronously. May + * be called from any thread. + * + * @param request Request to the TDLib. + * @return The request response. + */ + public Mono> execute(TdApi.Function request) { + return td.execute(request, true); + } + + /** + * Set if skip updates or not + */ + public Mono setSkipUpdates(boolean skipUpdates) { //todo: do this + return null; + } + + /** + * Closes the client gracefully by sending {@link TdApi.Close}. + */ + public Mono close() { + return Mono.from(getState()) + .filter(state -> { + switch (state.getConstructor()) { + case AuthorizationStateClosing.CONSTRUCTOR: + case AuthorizationStateClosed.CONSTRUCTOR: + return false; + default: + return true; + } + }) + .then(Mono.from(requestedDefinitiveExit).single()) + .filter(closeRequested -> !closeRequested) + .doOnSuccess(v -> requestedDefinitiveExit.onNext(true)) + .then(td.execute(new TdApi.Close(), false)) + .then(); + } + + /** + * + * @param timeout Timeout in seconds when reading data + */ + public void setReadTimeout(int timeout) { + //todo: do this + } + + /** + * + * @param timeout Timeout in seconds when listening methods or connecting + */ + public void setMethodTimeout(int timeout) { + //todo: do this + } + + private Mono catchErrors(Object obj) { + if (obj.getConstructor() == Error.CONSTRUCTOR) { + var error = (Error) obj; + + switch (error.message) { + case "PHONE_CODE_INVALID": + globalErrors.onNext(error); + return Mono.just(new AuthorizationStateWaitCode()); + case "PASSWORD_HASH_INVALID": + globalErrors.onNext(error); + return Mono.just(new AuthorizationStateWaitPassword()); + case "PHONE_NUMBER_INVALID": + fatalErrors.onNext(FatalErrorType.PHONE_NUMBER_INVALID); + break; + case "ACCESS_TOKEN_INVALID": + fatalErrors.onNext(FatalErrorType.ACCESS_TOKEN_INVALID); + break; + case "CONNECTION_KILLED": + fatalErrors.onNext(FatalErrorType.CONNECTION_KILLED); + break; + default: + globalErrors.onNext(error); + break; + } + return Mono.empty(); + } + return Mono.just(obj); + } + + public Mono isBot() { + return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet); + } + + private Publisher preprocessUpdates(Update updateObj) { + return Mono + .just(updateObj) + .flatMap(this::catchErrors) + .filter(obj -> obj.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) + .map(obj -> ((UpdateAuthorizationState) obj).authorizationState) + .flatMap(obj -> { + this.authState.onNext(new AuthorizationStateReady()); + switch (obj.getConstructor()) { + case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: + return Mono.from(this.settings).map(settings -> { + var parameters = new TdlibParameters(); + parameters.useTestDc = settings.useTestDc; + parameters.databaseDirectory = settings.databaseDirectory; + parameters.filesDirectory = settings.filesDirectory; + parameters.useFileDatabase = settings.useFileDatabase; + parameters.useChatInfoDatabase = settings.useChatInfoDatabase; + parameters.useMessageDatabase = settings.useMessageDatabase; + parameters.useSecretChats = false; + parameters.apiId = settings.apiId; + parameters.apiHash = settings.apiHash; + parameters.systemLanguageCode = settings.systemLanguageCode; + parameters.deviceModel = settings.deviceModel; + parameters.systemVersion = settings.systemVersion; + parameters.applicationVersion = settings.applicationVersion; + parameters.enableStorageOptimizer = settings.enableStorageOptimizer; + parameters.ignoreFileNames = settings.ignoreFileNames; + return new SetTdlibParameters(parameters); + }).flatMap(this::sendDirectly).then(); + case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR: + return sendDirectly(new CheckDatabaseEncryptionKey()).then(); + case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR: + return Mono.from(this.settings).flatMap(settings -> { + if (settings.isPhoneNumberSet()) { + return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()), + new PhoneNumberAuthenticationSettings(false, false, false) + )); + } else if (settings.isBotTokenSet()) { + return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken())); + } else { + return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot")); + } + }).then(); + case AuthorizationStateWaitRegistration.CONSTRUCTOR: + var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj; + RegisterUser registerUser = new RegisterUser(); + if (authorizationStateWaitRegistration.termsOfService != null + && authorizationStateWaitRegistration.termsOfService.text != null && !authorizationStateWaitRegistration.termsOfService.text.text.isBlank()) { + logger.info("Telegram Terms of Service:\n" + authorizationStateWaitRegistration.termsOfService.text.text); + } + + while (registerUser.firstName == null || registerUser.firstName.length() <= 0 + || registerUser.firstName.length() > 64 || registerUser.firstName.isBlank()) { + registerUser.firstName = ScannerUtils.askParameter(this.logName, "Enter First Name").trim(); + } + while (registerUser.lastName == null || registerUser.firstName.length() > 64) { + registerUser.lastName = ScannerUtils.askParameter(this.logName, "Enter Last Name").trim(); + } + + return sendDirectly(registerUser).then(); + case AuthorizationStateWaitPassword.CONSTRUCTOR: + var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj; + String passwordMessage = "Password authorization of '" + this.logName + "':"; + if (authorizationStateWaitPassword.passwordHint != null && !authorizationStateWaitPassword.passwordHint.isBlank()) { + passwordMessage += "\n\tHint: " + authorizationStateWaitPassword.passwordHint; + } + logger.info(passwordMessage); + + var password = ScannerUtils.askParameter(this.logName, "Enter your password"); + + return sendDirectly(new CheckAuthenticationPassword(password)).then(); + case AuthorizationStateReady.CONSTRUCTOR: { + return Mono.empty(); + } + case AuthorizationStateClosed.CONSTRUCTOR: + return Mono.from(requestedDefinitiveExit).doOnNext(closeRequested -> { + if (closeRequested) { + logger.info("AsyncTdEasy closed successfully"); + } else { + logger.warn("AsyncTdEasy closed unexpectedly: " + logName); + } + }).flatMap(closeRequested -> { + if (closeRequested) { + return Mono + .from(settings) + .map(settings -> settings.databaseDirectory) + .map(Path::of) + .flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"), + sessionPath.resolve("passport"), + sessionPath.resolve("profile_photos"), + sessionPath.resolve("stickers"), + sessionPath.resolve("temp"), + sessionPath.resolve("thumbnails"), + sessionPath.resolve("wallpapers") + )) + .doOnNext(directory -> { + try { + if (!Files.walk(directory) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .allMatch(File::delete)) { + throw new IOException("Can't delete a file!"); + } + } catch (IOException e) { + logger.error("Can't delete temporary session subdirectory", e); + } + }) + .then(Mono.just(closeRequested)); + } else { + return Mono.just(closeRequested); + } + }).then(); + default: + return Mono.empty(); + } + }) + .thenReturn(updateObj); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdUpdateObj.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdUpdateObj.java new file mode 100644 index 0000000..dab815e --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdUpdateObj.java @@ -0,0 +1,49 @@ +package it.tdlight.tdlibsession.td.easy; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationState; +import java.util.Objects; +import java.util.StringJoiner; + +public class AsyncTdUpdateObj { + private final AuthorizationState state; + private final TdApi.Object update; + + public AsyncTdUpdateObj(AuthorizationState state, TdApi.Object update) { + this.state = state; + this.update = update; + } + + public AuthorizationState getState() { + return state; + } + + public TdApi.Object getUpdate() { + return update; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AsyncTdUpdateObj that = (AsyncTdUpdateObj) o; + return Objects.equals(state, that.state) && Objects.equals(update, that.update); + } + + @Override + public int hashCode() { + return Objects.hash(state, update); + } + + @Override + public String toString() { + return new StringJoiner(", ", AsyncTdUpdateObj.class.getSimpleName() + "[", "]") + .add("state=" + state) + .add("update=" + update) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java b/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java new file mode 100644 index 0000000..6e3d1b1 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/TdEasySettings.java @@ -0,0 +1,262 @@ +package it.tdlight.tdlibsession.td.easy; + +import java.util.Objects; +import org.jetbrains.annotations.Nullable; + +public class TdEasySettings { + public final boolean useTestDc; + public final String databaseDirectory; + public final String filesDirectory; + public final boolean useFileDatabase; + public final boolean useChatInfoDatabase; + public final boolean useMessageDatabase; + public final int apiId; + public final String apiHash; + public final String systemLanguageCode; + public final String deviceModel; + public final String systemVersion; + public final String applicationVersion; + public final boolean enableStorageOptimizer; + public final boolean ignoreFileNames; + private final Long phoneNumber; + private final String botToken; + + public TdEasySettings(boolean useTestDc, + String databaseDirectory, + String filesDirectory, + boolean useFileDatabase, + boolean useChatInfoDatabase, + boolean useMessageDatabase, + int apiId, + String apiHash, + String systemLanguageCode, + String deviceModel, + String systemVersion, + String applicationVersion, + boolean enableStorageOptimizer, + boolean ignoreFileNames, + @Nullable Long phoneNumber, + @Nullable String botToken) { + this.useTestDc = useTestDc; + this.databaseDirectory = databaseDirectory; + this.filesDirectory = filesDirectory; + this.useFileDatabase = useFileDatabase; + this.useChatInfoDatabase = useChatInfoDatabase; + this.useMessageDatabase = useMessageDatabase; + this.apiId = apiId; + this.apiHash = apiHash; + this.systemLanguageCode = systemLanguageCode; + this.deviceModel = deviceModel; + this.systemVersion = systemVersion; + this.applicationVersion = applicationVersion; + this.enableStorageOptimizer = enableStorageOptimizer; + this.ignoreFileNames = ignoreFileNames; + this.phoneNumber = phoneNumber; + this.botToken = botToken; + if ((phoneNumber == null) == (botToken == null)) { + throw new IllegalArgumentException("You must set a phone number or a bot token"); + } + } + + public boolean isPhoneNumberSet() { + return phoneNumber != null; + } + + public long getPhoneNumber() { + return Objects.requireNonNull(phoneNumber, "You must set a phone number"); + } + + public boolean isBotTokenSet() { + return botToken != null; + } + + public String getBotToken() { + return Objects.requireNonNull(botToken, "You must set a bot token"); + } + + public static Builder newBuilder() { + return new Builder(); + } + + + public static class Builder { + private boolean useTestDc = false; + private String databaseDirectory = "jtdlib-database"; + private String filesDirectory = "jtdlib-files"; + private boolean useFileDatabase = true; + private boolean useChatInfoDatabase = true; + private boolean useMessageDatabase = true; + private int apiId = 376588; + private String apiHash = "2143fdfc2bbba3ec723228d2f81336c9"; + private String systemLanguageCode = "en"; + private String deviceModel = "JTDLib"; + private String systemVersion = "JTDLib"; + private String applicationVersion = "1.0"; + private boolean enableStorageOptimizer = false; + private boolean ignoreFileNames = false; + @Nullable + private Long phoneNumber = null; + @Nullable + private String botToken = null; + + private Builder() { + + } + + public boolean isUseTestDc() { + return useTestDc; + } + + public Builder setUseTestDc(boolean useTestDc) { + this.useTestDc = useTestDc; + return this; + } + + public String getDatabaseDirectory() { + return databaseDirectory; + } + + public Builder setDatabaseDirectory(String databaseDirectory) { + this.databaseDirectory = databaseDirectory; + return this; + } + + public String getFilesDirectory() { + return filesDirectory; + } + + public Builder setFilesDirectory(String filesDirectory) { + this.filesDirectory = filesDirectory; + return this; + } + + public boolean isUseFileDatabase() { + return useFileDatabase; + } + + public Builder setUseFileDatabase(boolean useFileDatabase) { + this.useFileDatabase = useFileDatabase; + return this; + } + + public boolean isUseChatInfoDatabase() { + return useChatInfoDatabase; + } + + public Builder setUseChatInfoDatabase(boolean useChatInfoDatabase) { + this.useChatInfoDatabase = useChatInfoDatabase; + return this; + } + + public boolean isUseMessageDatabase() { + return useMessageDatabase; + } + + public Builder setUseMessageDatabase(boolean useMessageDatabase) { + this.useMessageDatabase = useMessageDatabase; + return this; + } + + public int getApiId() { + return apiId; + } + + public Builder setApiId(int apiId) { + this.apiId = apiId; + return this; + } + + public String getApiHash() { + return apiHash; + } + + public Builder setApiHash(String apiHash) { + this.apiHash = apiHash; + return this; + } + + public String getSystemLanguageCode() { + return systemLanguageCode; + } + + public Builder setSystemLanguageCode(String systemLanguageCode) { + this.systemLanguageCode = systemLanguageCode; + return this; + } + + public String getDeviceModel() { + return deviceModel; + } + + public Builder setDeviceModel(String deviceModel) { + this.deviceModel = deviceModel; + return this; + } + + public String getSystemVersion() { + return systemVersion; + } + + public Builder setSystemVersion(String systemVersion) { + this.systemVersion = systemVersion; + return this; + } + + public String getApplicationVersion() { + return applicationVersion; + } + + public Builder setApplicationVersion(String applicationVersion) { + this.applicationVersion = applicationVersion; + return this; + } + + public boolean isEnableStorageOptimizer() { + return enableStorageOptimizer; + } + + public Builder setEnableStorageOptimizer(boolean enableStorageOptimizer) { + this.enableStorageOptimizer = enableStorageOptimizer; + return this; + } + + public boolean isIgnoreFileNames() { + return ignoreFileNames; + } + + public Builder setIgnoreFileNames(boolean ignoreFileNames) { + this.ignoreFileNames = ignoreFileNames; + return this; + } + + public Builder setPhoneNumber(long phoneNumber) { + this.phoneNumber = phoneNumber; + return this; + } + + public Builder setBotToken(String botToken) { + this.botToken = botToken; + return this; + } + + public TdEasySettings build() { + return new TdEasySettings(useTestDc, + databaseDirectory, + filesDirectory, + useFileDatabase, + useChatInfoDatabase, + useMessageDatabase, + apiId, + apiHash, + systemLanguageCode, + deviceModel, + systemVersion, + applicationVersion, + enableStorageOptimizer, + ignoreFileNames, + phoneNumber, + botToken + ); + } + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java new file mode 100644 index 0000000..82aae3c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java @@ -0,0 +1,24 @@ +package it.tdlight.tdlibsession.td.middle; + +import it.tdlight.jni.TdApi; +import it.tdlight.tdlibsession.td.TdResult; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface AsyncTdMiddle { + + /** + * Receives incoming updates from TDLib. + * + * @return Updates + */ + Flux getUpdates(); + + /** + * Sends request to TDLib. May be called from any thread. + * + * @param request Request to TDLib. + * @param executeDirectly Execute the function synchronously. + */ + Mono> execute(TdApi.Function request, boolean executeDirectly); +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddleCommon.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddleCommon.java new file mode 100644 index 0000000..8a7984a --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddleCommon.java @@ -0,0 +1,10 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.AbstractVerticle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncTdMiddleCommon extends AbstractVerticle { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleCommon.class); +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java new file mode 100644 index 0000000..9766758 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java @@ -0,0 +1,55 @@ +package it.tdlight.tdlibsession.td.middle; + +import it.tdlight.jni.TdApi; +import java.util.Objects; +import java.util.StringJoiner; + +public class ExecuteObject { + private final boolean executeDirectly; + private final TdApi.Function request; + + public ExecuteObject(boolean executeDirectly, TdApi.Function request) { + this.executeDirectly = executeDirectly; + this.request = request; + } + + public boolean isExecuteDirectly() { + return executeDirectly; + } + + public TdApi.Function getRequest() { + return request; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ExecuteObject that = (ExecuteObject) o; + + if (executeDirectly != that.executeDirectly) { + return false; + } + return Objects.equals(request, that.request); + } + + @Override + public int hashCode() { + int result = (executeDirectly ? 1 : 0); + result = 31 * result + (request != null ? request.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return new StringJoiner(", ", ExecuteObject.class.getSimpleName() + "[", "]") + .add("executeDirectly=" + executeDirectly) + .add("request=" + request) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java new file mode 100644 index 0000000..e9cb4ef --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -0,0 +1,227 @@ +package it.tdlight.tdlibsession.td.middle; + +import com.hazelcast.config.Config; +import com.hazelcast.config.EvictionPolicy; +import com.hazelcast.config.GroupConfig; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.MaxSizeConfig; +import com.hazelcast.config.MaxSizeConfig.MaxSizePolicy; +import com.hazelcast.config.MergePolicyConfig; +import com.hazelcast.config.SemaphoreConfig; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageCodec; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.http.ClientAuth; +import io.vertx.core.net.JksOptions; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; +import java.nio.channels.AlreadyBoundException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jetbrains.annotations.Nullable; +import it.tdlight.utils.MonoUtils; +import reactor.core.publisher.Mono; + +public class TdClusterManager { + + private static final AtomicBoolean definedMasterCluster = new AtomicBoolean(false); + private static final AtomicBoolean definedNodesCluster = new AtomicBoolean(false); + private final ClusterManager mgr; + private final VertxOptions vertxOptions; + private final Vertx vertx; + private final EventBus eb; + + public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx, EventBus eventBus) { + this.mgr = mgr; + this.vertxOptions = vertxOptions; + this.vertx = vertx; + this.eb = eventBus; + } + + public static Mono ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { + if (definedMasterCluster.compareAndSet(false, true)) { + var vertxOptions = new VertxOptions(); + netInterface = onlyLocal ? "127.0.0.1" : netInterface; + Config cfg; + if (!onlyLocal) { + cfg = new Config(); + cfg.setInstanceName("Master"); + } else { + cfg = null; + } + return of(cfg, + vertxOptions, + keyStoreOptions, trustStoreOptions, masterHostname, netInterface, port, nodesAddresses); + } else { + return Mono.error(new AlreadyBoundException()); + } + } + + public static Mono ofNodes(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { + if (definedNodesCluster.compareAndSet(false, true)) { + var vertxOptions = new VertxOptions(); + netInterface = onlyLocal ? "127.0.0.1" : netInterface; + Config cfg; + if (!onlyLocal) { + cfg = new Config(); + cfg.setInstanceName("Node-" + new Random().nextLong()); + } else { + cfg = null; + } + return of(cfg, vertxOptions, keyStoreOptions, trustStoreOptions, masterHostname, netInterface, port, nodesAddresses); + } else { + return Mono.error(new AlreadyBoundException()); + } + } + + public static Mono of(@Nullable Config cfg, + VertxOptions vertxOptions, + JksOptions keyStoreOptions, + JksOptions trustStoreOptions, + String masterHostname, + String netInterface, + int port, + Set nodesAddresses) { + ClusterManager mgr; + if (cfg != null) { + cfg.getNetworkConfig().setPortCount(1); + cfg.getNetworkConfig().setPort(port); + cfg.getNetworkConfig().setPortAutoIncrement(false); + cfg.getPartitionGroupConfig().setEnabled(false); + cfg.addMapConfig(new MapConfig() + .setName("__vertx.subs") + .setBackupCount(1) + .setTimeToLiveSeconds(0) + .setMaxIdleSeconds(0) + .setEvictionPolicy(EvictionPolicy.NONE) + .setMaxSizeConfig(new MaxSizeConfig().setMaxSizePolicy(MaxSizePolicy.PER_NODE).setSize(0)) + .setMergePolicyConfig(new MergePolicyConfig().setPolicy("com.hazelcast.map.merge.LatestUpdateMapMergePolicy"))); + cfg.setSemaphoreConfigs(Map.of("__vertx.*", new SemaphoreConfig().setInitialPermits(1))); + cfg.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); + cfg.getNetworkConfig().getJoin().getAwsConfig().setEnabled(false); + cfg.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true); + var addresses = new ArrayList<>(nodesAddresses); + cfg.getNetworkConfig().getJoin().getTcpIpConfig().setMembers(addresses); + cfg.getNetworkConfig().getInterfaces().clear(); + cfg.getNetworkConfig().getInterfaces().setInterfaces(Collections.singleton(netInterface)).setEnabled(true); + cfg.getNetworkConfig().setOutboundPorts(Collections.singleton(0)); + + cfg.setProperty("hazelcast.logging.type", "slf4j"); + cfg.setProperty("hazelcast.wait.seconds.before.join", "0"); + cfg.setProperty("hazelcast.tcp.join.port.try.count", "5"); + cfg.setProperty("hazelcast.socket.bind.any", "false"); + cfg.setGroupConfig(new GroupConfig().setName("dev").setPassword("HzPasswordsAreDeprecated")); + mgr = new HazelcastClusterManager(cfg); + vertxOptions.setClusterManager(mgr); + vertxOptions.getEventBusOptions().setConnectTimeout(120000); + //vertxOptions.getEventBusOptions().setIdleTimeout(60); + //vertxOptions.getEventBusOptions().setSsl(false); + + vertxOptions.getEventBusOptions().setSslHandshakeTimeout(120000).setSslHandshakeTimeoutUnit(TimeUnit.MILLISECONDS); + vertxOptions.getEventBusOptions().setKeyStoreOptions(keyStoreOptions); + vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions); + vertxOptions.getEventBusOptions().setHost(masterHostname); + vertxOptions.getEventBusOptions().setPort(port + 1); + vertxOptions.getEventBusOptions().setSsl(true).setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2")); + vertxOptions.getEventBusOptions().setClientAuth(ClientAuth.REQUIRED); + } else { + mgr = null; + vertxOptions.setClusterManager(null); + vertxOptions.getEventBusOptions().setClustered(false); + } + + return Mono + .create(sink -> { + if (mgr != null) { + Vertx.clusteredVertx(vertxOptions, MonoUtils.toHandler(sink)); + } else { + sink.success(Vertx.vertx(vertxOptions)); + } + }) + .map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx, vertx.eventBus())); + } + + public Vertx getVertx() { + return vertx; + } + + public EventBus getEventBus() { + return eb; + } + + public VertxOptions getVertxOptions() { + return vertxOptions; + } + + public DeliveryOptions newDeliveryOpts() { + return new DeliveryOptions().setSendTimeout(120000); + } + + /** + * + * @param objectClass + * @param messageCodec + * @param + * @return true if registered, false if already registered + */ + public boolean registerDefaultCodec(Class objectClass, MessageCodec messageCodec) { + try { + eb.registerDefaultCodec(objectClass, messageCodec); + return true; + } catch (IllegalStateException ex) { + if (ex.getMessage().startsWith("Already a default codec registered for class")) { + return false; + } + if (ex.getMessage().startsWith("Already a codec registered with name")) { + return false; + } + throw ex; + } + } + + /** + * Create a message consumer against the specified address. + *

+ * The returned consumer is not yet registered + * at the address, registration will be effective when {@link MessageConsumer#handler(io.vertx.core.Handler)} + * is called. + * + * @param address the address that it will register it at + * @param localOnly if you want to receive only local messages + * @return the event bus message consumer + */ + public MessageConsumer consumer(String address, boolean localOnly) { + if (localOnly) { + return eb.localConsumer(address); + } else { + return eb.consumer(address); + } + } + + /** + * Create a consumer and register it against the specified address. + * + * @param address the address that will register it at + * @param localOnly if you want to receive only local messages + * @param handler the handler that will process the received messages + * + * @return the event bus message consumer + */ + public MessageConsumer consumer(String address, boolean localOnly, Handler> handler) { + if (localOnly) { + return eb.localConsumer(address, handler); + } else { + return eb.consumer(address, handler); + } + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java new file mode 100644 index 0000000..3f9acff --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java @@ -0,0 +1,62 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class TdExecuteObjectMessageCodec implements MessageCodec { + + public TdExecuteObjectMessageCodec() { + super(); + } + + @Override + public void encodeToWire(Buffer buffer, ExecuteObject t) { + try (var bos = new FastByteArrayOutputStream()) { + try (var dos = new DataOutputStream(bos)) { + dos.writeBoolean(t.isExecuteDirectly()); + t.getRequest().serialize(dos); + } + bos.trim(); + buffer.appendBytes(bos.array); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + @Override + public ExecuteObject decodeFromWire(int pos, Buffer buffer) { + try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) { + try (var dis = new DataInputStream(fis)) { + return new ExecuteObject(dis.readBoolean(), (Function) TdApi.Deserializer.deserialize(dis)); + } + } catch (IOException ex) { + ex.printStackTrace(); + } + return null; + } + + @Override + public ExecuteObject transform(ExecuteObject t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return "ExecuteObjectCodec"; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java new file mode 100644 index 0000000..8c6e1c7 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java @@ -0,0 +1,66 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import it.tdlight.jni.TdApi; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class TdMessageCodec implements MessageCodec { + + private final Class clazz; + private final String codecName; + + public TdMessageCodec(Class clazz) { + super(); + this.clazz = clazz; + this.codecName = clazz.getSimpleName() + "TdCodec"; + } + + @Override + public void encodeToWire(Buffer buffer, T t) { + try (var bos = new FastByteArrayOutputStream()) { + try (var dos = new DataOutputStream(bos)) { + t.serialize(dos); + } + bos.trim(); + buffer.appendBytes(bos.array); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + @Override + public T decodeFromWire(int pos, Buffer buffer) { + try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) { + try (var dis = new DataInputStream(fis)) { + //noinspection unchecked + return (T) TdApi.Deserializer.deserialize(dis); + } + } catch (IOException ex) { + ex.printStackTrace(); + } + return null; + } + + @Override + public T transform(T t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return codecName; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java new file mode 100644 index 0000000..2a9d265 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java @@ -0,0 +1,90 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.tdlibsession.td.TdResult; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; + +public class TdOptListMessageCodec implements MessageCodec { + + public TdOptListMessageCodec() { + super(); + } + + @Override + public void encodeToWire(Buffer buffer, TdOptionalList ts) { + try (var bos = new FastByteArrayOutputStream()) { + try (var dos = new DataOutputStream(bos)) { + if (ts.isSet()) { + var t = ts.getValues(); + dos.writeInt(t.size()); + for (TdResult t1 : t) { + if (t1.succeeded()) { + dos.writeBoolean(true); + t1.result().serialize(dos); + } else { + dos.writeBoolean(false); + t1.cause().serialize(dos); + } + } + } else { + dos.writeInt(-1); + } + } + bos.trim(); + buffer.appendBytes(bos.array); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + @Override + public TdOptionalList decodeFromWire(int pos, Buffer buffer) { + try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) { + try (var dis = new DataInputStream(fis)) { + var size = dis.readInt(); + if (size < 0) { + return new TdOptionalList(false, Collections.emptyList()); + } else { + ArrayList> list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + if (dis.readBoolean()) { + list.add(TdResult.succeeded((Update) TdApi.Deserializer.deserialize(dis))); + } else { + list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis))); + } + } + return new TdOptionalList(true, list); + } + } + } catch (IOException | UnsupportedOperationException ex) { + ex.printStackTrace(); + return new TdOptionalList(false, Collections.emptyList()); + } + } + + @Override + public TdOptionalList transform(TdOptionalList ts) { + return ts; + } + + @Override + public String name() { + return "TdOptListCodec"; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java new file mode 100644 index 0000000..73206d3 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java @@ -0,0 +1,57 @@ +package it.tdlight.tdlibsession.td.middle; + +import it.tdlight.jni.TdApi; +import it.tdlight.tdlibsession.td.TdResult; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; + +public class TdOptionalList { + private final boolean isSet; + private final List> values; + + public TdOptionalList(boolean isSet, List> values) { + this.isSet = isSet; + this.values = values; + } + + public boolean isSet() { + return isSet; + } + + public List> getValues() { + return values; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TdOptionalList that = (TdOptionalList) o; + + if (isSet != that.isSet) { + return false; + } + return Objects.equals(values, that.values); + } + + @Override + public int hashCode() { + int result = (isSet ? 1 : 0); + result = 31 * result + (values != null ? values.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return new StringJoiner(", ", TdOptionalList.class.getSimpleName() + "[", "]") + .add("isSet=" + isSet) + .add("values=" + values) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java new file mode 100644 index 0000000..009528d --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java @@ -0,0 +1,75 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import it.tdlight.jni.TdApi; +import it.tdlight.tdlibsession.td.TdResultMessage; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +@SuppressWarnings("rawtypes") +public class TdResultMessageCodec implements MessageCodec { + + private final String codecName; + + public TdResultMessageCodec() { + super(); + this.codecName = "TdResultCodec"; + } + + @Override + public void encodeToWire(Buffer buffer, TdResultMessage t) { + try (var bos = new FastByteArrayOutputStream()) { + try (var dos = new DataOutputStream(bos)) { + if (t.value != null) { + dos.writeBoolean(true); + t.value.serialize(dos); + } else { + dos.writeBoolean(false); + t.cause.serialize(dos); + } + } + bos.trim(); + buffer.appendBytes(bos.array); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + @Override + public TdResultMessage decodeFromWire(int pos, Buffer buffer) { + try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) { + try (var dis = new DataInputStream(fis)) { + if (dis.readBoolean()) { + return new TdResultMessage(TdApi.Deserializer.deserialize(dis), null); + } else { + return new TdResultMessage(null, (TdApi.Error) TdApi.Deserializer.deserialize(dis)); + } + } + } catch (IOException ex) { + ex.printStackTrace(); + } + return null; + } + + @Override + public TdResultMessage transform(TdResultMessage t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return codecName; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java new file mode 100644 index 0000000..38d714b --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -0,0 +1,337 @@ +package it.tdlight.tdlibsession.td.middle.client; + +import io.vertx.circuitbreaker.CircuitBreaker; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import it.tdlight.common.ConstructorDetector; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.ResponseError; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.TdResultMessage; +import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.tdlibsession.td.middle.ExecuteObject; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; +import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdOptionalList; +import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; +import it.tdlight.utils.MonoUtils; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.logging.Level; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.warp.commonutils.error.InitializationException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; + +public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class); + public static final boolean OUTPUT_REQUESTS = false; + public static final byte[] EMPTY = new byte[0]; + + private final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); + + private ReplayProcessor> incomingUpdatesCo = ReplayProcessor.cacheLast(); + + private TdClusterManager cluster; + + private String botAddress; + private String botAlias; + private boolean local; + private long initTime; + + @SuppressWarnings({"unchecked", "rawtypes"}) + public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { + cluster = clusterManager; + if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) { + cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); + cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); + for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { + cluster.registerDefaultCodec(value, new TdMessageCodec(value)); + } + } + } + + public static Mono getAndDeployInstance(TdClusterManager clusterManager, String botAlias, String botAddress, boolean local) throws InitializationException { + try { + var instance = new AsyncTdMiddleEventBusClient(clusterManager); + var options = new DeploymentOptions().setConfig(new JsonObject() + .put("botAddress", botAddress) + .put("botAlias", botAlias) + .put("local", local)); + return MonoUtils.executeAsFuture(promise -> { + clusterManager.getVertx().deployVerticle(instance, options, promise); + }).doOnNext(_v -> { + logger.trace("Deployed verticle for bot address: " + botAddress); + }).thenReturn(instance); + } catch (RuntimeException e) { + throw new InitializationException(e); + } + } + + @Override + public void start(Promise startPromise) { + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + this.local = local; + this.initTime = System.currentTimeMillis(); + + CircuitBreaker startBreaker = CircuitBreaker.create("bot-" + botAddress + "-server-online-check-circuit-breaker", vertx, + new CircuitBreakerOptions().setMaxFailures(1).setMaxRetries(4).setTimeout(10000) + ) + .retryPolicy(policy -> 4000L) + .openHandler(closed -> { + logger.error("Circuit opened! " + botAddress); + }) + .closeHandler(closed -> { + logger.error("Circuit closed! " + botAddress); + }); + + startBreaker.execute(future -> { + try { + logger.error("Requesting " + botAddress + ".ping"); + cluster + .getEventBus() + .request(botAddress + ".ping", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local), pingMsg -> { + if (pingMsg.succeeded()) { + logger.error("Received ping reply (succeeded)"); + logger.error("Requesting " + botAddress + ".start"); + cluster + .getEventBus() + .request(botAddress + ".start", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), startMsg -> { + if (startMsg.succeeded()) { + logger.error("Requesting " + botAddress + ".isWorking"); + cluster + .getEventBus() + .request(botAddress + ".isWorking", EMPTY, cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(10000), msg -> { + if (msg.succeeded()) { + this.listen().then(this.pipe()).timeout(Duration.ofSeconds(10)).subscribe(v -> {}, future::fail, future::complete); + } else { + future.fail(msg.cause()); + } + }); + } else { + future.fail(startMsg.cause()); + } + }); + } else { + logger.error("Received ping reply (failed) (local=" + local + ")", pingMsg.cause()); + future.fail(pingMsg.cause()); + } + } + ); + } catch (Exception ex) { + future.fail(ex); + } + }) + .onFailure(ex -> { + logger.error("Failure when starting bot " + botAddress, ex); + startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!")); + }) + .onSuccess(v -> startPromise.complete()); + } + + @Override + public void stop(Promise stopPromise) { + tdClosed.onNext(true); + stopPromise.complete(); + } + + private Mono listen() { + // Nothing to listen for now + return Mono.empty(); + } + + private Mono pipe() { + incomingUpdatesCo.onNext(this.requestUpdatesBatchFromNetwork() + .repeatWhen(nFlux -> { + return Flux.push(emitter -> { + var dispos = Flux.combineLatest(nFlux, tdClosed, Pair::of).subscribe(val -> { + //noinspection PointlessBooleanExpression + if (val.getRight() == true) { + emitter.complete(); + } else { + if (val.getLeft() == 0) { + emitter.complete(); + } else { + emitter.next(val); + } + } + }); + emitter.onDispose(dispos); + }); + }) // Repeat when there is one batch with a flux of updates + .flatMap(batch -> batch) + .flatMap(update -> { + return Mono.create(sink -> { + if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var state = (UpdateAuthorizationState) update; + if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + tdClosed.onNext(true); + this.getVertx().undeploy(this.deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } + sink.success(update); + }); + } else { + sink.success(update); + } + } else { + sink.success(update); + } + }); + }) + .log("TdMiddle", Level.FINEST).publish().autoConnect(1)); + return Mono.empty(); + } + + private static class UpdatesBatchResult { + public final Flux updatesFlux; + public final boolean completed; + + private UpdatesBatchResult(Flux updatesFlux, boolean completed) { + this.updatesFlux = updatesFlux; + this.completed = completed; + } + + @Override + public String toString() { + return new StringJoiner(", ", UpdatesBatchResult.class.getSimpleName() + "[", "]") + .add("updatesFlux=" + updatesFlux) + .add("completed=" + completed) + .toString(); + } + } + + private Mono> requestUpdatesBatchFromNetwork() { + return Mono + .from(tdClosed) + .single() + .filter(tdClosed -> !tdClosed) + .flatMap(_x -> Mono.>create(sink -> { + cluster.getEventBus().request(botAddress + ".getNextUpdatesBlock", + EMPTY, + cluster.newDeliveryOpts().setLocalOnly(local), + msg -> { + if (msg.failed()) { + //if (System.currentTimeMillis() - initTime <= 30000) { + // // The serve has not been started + // sink.success(Flux.empty()); + //} else { + // // Timeout + sink.error(msg.cause()); + //} + } else { + var result = msg.result(); + if (result.body() == null) { + sink.success(); + } else { + var resultBody = msg.result().body(); + if (resultBody.isSet()) { + List> updates = resultBody.getValues(); + for (TdResult updateObj : updates) { + if (updateObj.succeeded()) { + if (OUTPUT_REQUESTS) { + System.out.println(" <- " + updateObj.result() + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + } else { + logger.error("Received an errored update", + ResponseError.newResponseError("incoming update", botAlias, updateObj.cause()) + ); + } + } + sink.success(Flux.fromIterable(updates).filter(TdResult::succeeded).map(TdResult::result)); + } else { + // the stream has ended + sink.success(); + } + } + } + } + ); + })); + } + + @Override + public Flux getUpdates() { + return incomingUpdatesCo.filter(Objects::nonNull).take(1).single().flatMapMany(v -> v); + } + + @Override + public Mono> execute(Function request, boolean executeDirectly) { + + var req = new ExecuteObject(executeDirectly, request); + if (OUTPUT_REQUESTS) { + System.out.println(" -> " + request.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + + return Mono.from(tdClosed).single() + .filter(tdClosed -> !tdClosed) + .flatMap(_x -> Mono.>create(sink -> { + cluster.getEventBus().request(botAddress + ".execute", req, cluster.newDeliveryOpts().setLocalOnly(local), (AsyncResult> event) -> { + if (event.succeeded()) { + if (event.result().body() == null) { + sink.success(); + } else { + sink.success(Objects.requireNonNull(event.result().body()).toTdResult()); + } + } else { + sink.error(ResponseError.newResponseError(request, botAlias, event.cause())); + } + }); + + })).flatMap(response -> { + try { + Objects.requireNonNull(response); + if (OUTPUT_REQUESTS) { + System.out.println(" <- " + response.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + return Mono.just((TdResult) response); + } catch (ClassCastException | NullPointerException e) { + return Mono.error(e); + } + }); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java new file mode 100644 index 0000000..93971d5 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -0,0 +1,117 @@ +package it.tdlight.tdlibsession.td.middle.direct; + +import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer.WAIT_DURATION; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.tdlibsession.td.ResponseError; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; +import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; +import it.tdlight.utils.MonoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.warp.commonutils.error.InitializationException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; + +public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMiddle { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleDirect.class); + + protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); + protected AsyncTdDirectImpl td; + private String botAddress; + private String botAlias; + + public AsyncTdMiddleDirect() { + } + + public static Mono getAndDeployInstance(TdClusterManager clusterManager, + String botAlias, + String botAddress) throws InitializationException { + try { + var instance = new AsyncTdMiddleDirect(); + var options = new DeploymentOptions().setConfig(new JsonObject() + .put("botAlias", botAlias) + .put("botAddress", botAddress)); + return MonoUtils.executeAsFuture(promise -> { + clusterManager.getVertx().deployVerticle(instance, options, promise); + }).doOnNext(_v -> { + logger.trace("Deployed verticle for bot " + botAlias + ", address: " + botAddress); + }).thenReturn(instance); + } catch (RuntimeException e) { + throw new InitializationException(e); + } + } + + @Override + public void start(Promise startPromise) { + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + + this.td = new AsyncTdDirectImpl(botAlias); + + td.initializeClient().doOnSuccess(v -> startPromise.complete()).subscribe(success -> { + }, (ex) -> { + logger.error("Failure when starting bot " + botAlias + ", address " + botAddress, ex); + startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!")); + }, () -> {}); + } + + @Override + public void stop(Promise stopPromise) { + tdClosed.onNext(true); + td.destroyClient().onErrorResume(ex -> { + logger.error("Can't destroy client", ex); + return Mono.empty(); + }).doOnTerminate(() -> { + logger.debug("TdMiddle verticle stopped"); + }).subscribe(MonoUtils.toSubscriber(stopPromise)); + } + + @Override + public Flux getUpdates() { + return Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> { + if (result.succeeded()) { + if (result.result().succeeded()) { + return Mono.just(result.result().result()); + } else { + logger.error("Received an errored update", + ResponseError.newResponseError("incoming update", botAlias, result.result().cause()) + ); + return Mono.empty(); + } + } else { + logger.error("Received an errored update", result.cause()); + return Mono.empty(); + } + })); + } + + @Override + public Mono> execute(Function requestFunction, boolean executeDirectly) { + return td.execute(requestFunction, executeDirectly).onErrorMap(error -> { + return ResponseError.newResponseError( + requestFunction, + botAlias, + error + ); + }); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java new file mode 100644 index 0000000..6adf053 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -0,0 +1,67 @@ +package it.tdlight.tdlibsession.td.middle.direct; + +import io.vertx.core.DeploymentOptions; +import io.vertx.core.json.JsonObject; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; +import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient; +import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer; +import java.util.Objects; +import org.warp.commonutils.error.InitializationException; +import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; +import it.tdlight.utils.MonoUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; + +public class AsyncTdMiddleLocal implements AsyncTdMiddle { + + private final AsyncTdDirectImpl td; + private final AsyncTdMiddleEventBusServer srv; + private final TdClusterManager masterClusterManager; + private ReplayProcessor cli = ReplayProcessor.cacheLast(); + private final String botAlias; + private final String botAddress; + + public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, String botAddress) throws InitializationException { + this.td = new AsyncTdDirectImpl(botAlias); + this.srv = new AsyncTdMiddleEventBusServer(masterClusterManager); + this.masterClusterManager = masterClusterManager; + this.botAlias = botAlias; + this.botAddress = botAddress; + } + + public Mono start() { + return Mono.create(sink -> { + masterClusterManager + .getVertx() + .deployVerticle(srv, + new DeploymentOptions().setConfig(new JsonObject().put("botAddress", botAddress).put("local", true)), + MonoUtils.toHandler(sink) + ); + }).onErrorMap(InitializationException::new).flatMap(_x -> { + try { + return AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true).doOnNext(cli -> { + this.cli.onNext(cli); + }).doOnError(error -> this.cli.onError(error)).doFinally(_v -> this.cli.onComplete()); + } catch (InitializationException e) { + this.cli.onError(e); + return Mono.error(e); + } + }).map(v -> this); + } + + @Override + public Flux getUpdates() { + return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::getUpdates); + } + + @Override + public Mono> execute(Function request, boolean executeDirectly) { + return cli.filter(Objects::nonNull).single().flatMap(c -> c.execute(request, executeDirectly)); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java new file mode 100644 index 0000000..17fe198 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -0,0 +1,239 @@ +package it.tdlight.tdlibsession.td.middle.server; + +import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient.OUTPUT_REQUESTS; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import it.tdlight.common.ConstructorDetector; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.Update; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.tdlibsession.td.TdResultMessage; +import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; +import it.tdlight.tdlibsession.td.middle.ExecuteObject; +import it.tdlight.tdlibsession.td.middle.TdClusterManager; +import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec; +import it.tdlight.tdlibsession.td.middle.TdOptionalList; +import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; +import it.tdlight.utils.MonoUtils; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; +import reactor.util.concurrent.Queues; + +public class AsyncTdMiddleEventBusServer extends AbstractVerticle { + + private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class); + private static final byte[] EMPTY = new byte[0]; + // todo: restore duration to 2 seconds instead of 10 millis, when the bug of tdlight double queue wait is fixed + public static final Duration WAIT_DURATION = Duration.ofSeconds(1);// Duration.ofMillis(10); + + private final TdClusterManager cluster; + + private String botAlias; + private String botAddress; + private boolean local; + + protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); + protected AsyncTdDirectImpl td; + protected final Queue>> queue = Queues.>>unbounded().get(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { + this.cluster = clusterManager; + if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) { + cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); + cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); + for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { + cluster.registerDefaultCodec(value, new TdMessageCodec(value)); + } + } + } + + @Override + public void start(Promise startPromise) { + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + this.local = local; + this.td = new AsyncTdDirectImpl(botAlias); + + cluster.getEventBus().consumer(botAddress + ".ping", (Message msg) -> { + logger.error("Received ping. Replying..."); + msg.reply(EMPTY); + logger.error("Replied."); + }); + + AtomicBoolean alreadyDeployed = new AtomicBoolean(false); + cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { + if (alreadyDeployed.compareAndSet(false, true)) { + td.initializeClient() + .then(this.listen()) + .then(this.pipe()) + .then(Mono.create(registrationSink -> { + + cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { + workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); + }).completionHandler(MonoUtils.toHandler(registrationSink)); + + })) + .subscribe(v -> {}, ex -> { + logger.info(botAddress + " server deployed and started. succeeded: false"); + logger.error(ex.getLocalizedMessage(), ex); + msg.fail(500, ex.getLocalizedMessage()); + }, () -> { + logger.info(botAddress + " server deployed and started. succeeded: true"); + msg.reply(EMPTY); + }); + } else { + msg.reply(EMPTY); + } + }).completionHandler(h -> { + logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); + if (h.succeeded()) { + startPromise.complete(h.result()); + } else { + startPromise.fail(h.cause()); + } + }); + } + + @Override + public void stop(Promise stopPromise) { + tdClosed.onNext(true); + td.destroyClient().onErrorResume(ex -> { + logger.error("Can't destroy client", ex); + return Mono.empty(); + }).doOnTerminate(() -> { + logger.debug("TdMiddle verticle stopped"); + }).subscribe(MonoUtils.toSubscriber(stopPromise)); + } + + private Mono listen() { + return Mono.create(registrationSink -> { + cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> { + Mono + .from(tdClosed) + .single() + .filter(tdClosedVal -> !tdClosedVal) + .subscribeOn(td.getTdUpdatesScheduler()) + .map(_v -> { + ArrayList>> updatesBatch = new ArrayList<>(); + while (!queue.isEmpty() && updatesBatch.size() < 1000) { + var item = queue.poll(); + if (item == null) break; + updatesBatch.add(item); + } + return updatesBatch; + }) + .flatMap(receivedList -> { + return Flux.fromIterable(receivedList).flatMap(result -> { + if (result.succeeded()) { + var received = result.result(); + if (OUTPUT_REQUESTS) { + System.out.println("<=: " + received + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + return Mono.create(sink -> { + if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var authState = (UpdateAuthorizationState) received.result(); + if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + tdClosed.onNext(true); + vertx.undeploy(deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } + sink.success(); + }); + } else { + sink.success(); + } + } else { + sink.success(); + } + }).then(Mono.>create(sink -> { + sink.success(received); + })); + } else { + logger.error("Received an error update", result.cause()); + return Mono.empty(); + } + }).collectList().map(list -> new TdOptionalList(true, list)); + }) + .defaultIfEmpty(new TdOptionalList(false, Collections.emptyList())) + .subscribe(v -> { + msg.reply(v); + }, ex -> { + logger.error("Error when processing a 'receiveUpdates' request", ex); + msg.fail(500, ex.getLocalizedMessage()); + }, () -> {}); + }).completionHandler(MonoUtils.toHandler(registrationSink)); + + }).then(Mono.create(registrationSink -> { + + cluster.getEventBus().consumer(botAddress + ".execute", (Message msg) -> { + try { + if (OUTPUT_REQUESTS) { + System.out.println(":=> " + msg + .body() + .getRequest() + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + td.execute(msg.body().getRequest(), msg.body().isExecuteDirectly()).single().subscribe(response -> { + msg.reply(new TdResultMessage(response.result(), response.cause()), cluster.newDeliveryOpts().setLocalOnly(local)); + }, ex -> { + msg.fail(500, ex.getLocalizedMessage()); + logger.error("Error when processing a request", ex); + }); + } catch (ClassCastException ex) { + msg.fail(500, ex.getMessage()); + logger.error("Error when deserializing a request", ex); + } + }).completionHandler(MonoUtils.toHandler(registrationSink)); + + })); + } + + private Mono pipe() { + return Mono.fromCallable(() -> { + td + .getUpdates(WAIT_DURATION, 1000) + .bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) + .subscribe(nextItems -> { + queue.addAll(nextItems); + }); + return (Void) null; + }); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestId.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestId.java new file mode 100644 index 0000000..33a0d34 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestId.java @@ -0,0 +1,19 @@ +package it.tdlight.tdlibsession.td.middle.server; + +import java.util.concurrent.atomic.AtomicLong; +import reactor.core.publisher.Mono; + +public class RequestId { + + public static Mono create() { + AtomicLong _requestId = new AtomicLong(1); + + return Mono.fromCallable(() -> _requestId.updateAndGet(n -> { + if (n > Long.MAX_VALUE - 100) { + return 1; + } else { + return n + 1; + } + })); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestIdToReplyAddress.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestIdToReplyAddress.java new file mode 100644 index 0000000..57abb7c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/RequestIdToReplyAddress.java @@ -0,0 +1,30 @@ +package it.tdlight.tdlibsession.td.middle.server; + +import io.vertx.core.Promise; +import it.tdlight.jni.TdApi.Object; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import org.jetbrains.annotations.Async.Execute; +import org.jetbrains.annotations.Async.Schedule; + +public class RequestIdToReplyAddress { + private final ConcurrentHashMap> reqIdToReplyAddress = new ConcurrentHashMap<>();; + + public RequestIdToReplyAddress() { + + } + + public void schedule(@Schedule Long requestId, Promise replyPromise) { + reqIdToReplyAddress.put(requestId, replyPromise); + } + + public void failed(@Execute Long requestId, Promise replyPromise) { + reqIdToReplyAddress.remove(requestId, replyPromise); + } + + public void complete(@Execute Long id, Object item) { + var replyPromise = reqIdToReplyAddress.remove(id); + Objects.requireNonNull(replyPromise, () -> "Reply promise must be not empty"); + replyPromise.complete(item); + } +} diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java new file mode 100644 index 0000000..414296e --- /dev/null +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -0,0 +1,128 @@ +package it.tdlight.utils; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import it.tdlight.jni.TdApi; +import it.tdlight.tdlibsession.td.TdError; +import it.tdlight.tdlibsession.td.TdResult; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.reactivestreams.Subscription; +import org.warp.commonutils.concurrency.future.CompletableFutureUtils; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.core.publisher.SynchronousSink; +import reactor.util.context.Context; + +public class MonoUtils { + + public static Handler> toHandler(SynchronousSink sink) { + return event -> { + if (event.succeeded()) { + if (event.result() == null) { + sink.complete(); + } else { + sink.next(Objects.requireNonNull(event.result())); + } + } else { + sink.error(event.cause()); + } + }; + } + + public static Handler> toHandler(MonoSink sink) { + return event -> { + if (event.succeeded()) { + if (event.result() == null) { + sink.success(); + } else { + sink.success(Objects.requireNonNull(event.result())); + } + } else { + sink.error(event.cause()); + } + }; + } + + public static SynchronousSink toSink(Context context, Promise promise) { + return PromiseSink.of(context, promise); + } + + public static BiConsumer> executeBlockingSink(Vertx vertx, BiConsumer> handler) { + return (value, sink) -> { + vertx.executeBlocking((Promise finished) -> { + handler.accept(value, PromiseSink.of(sink.currentContext(), finished)); + }, toHandler(sink)); + }; + } + + public static Mono executeBlocking(Vertx vertx, Consumer> action) { + return Mono.create((MonoSink sink) -> { + vertx.executeBlocking((Promise finished) -> { + action.accept(toSink(sink.currentContext(), finished)); + }, toHandler(sink)); + }); + } + + public static Mono executeAsFuture(Consumer>> action) { + return Mono.fromFuture(() -> { + return CompletableFutureUtils.getCompletableFuture(() -> { + var resultFuture = new CompletableFuture(); + action.accept(handler -> { + if (handler.failed()) { + resultFuture.completeExceptionally(handler.cause()); + } else { + resultFuture.complete(handler.result()); + } + }); + return resultFuture; + }); + }); + } + + public static CoreSubscriber toSubscriber(Promise promise) { + return new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(T t) { + promise.complete(t); + } + + @Override + public void onError(Throwable t) { + promise.fail(t); + } + + @Override + public void onComplete() { + promise.tryComplete(); + } + }; + } + + public static void orElseThrowFuture(TdResult value, SynchronousSink> sink) { + if (value.succeeded()) { + sink.next(CompletableFuture.completedFuture(value.result())); + } else { + sink.next(CompletableFuture.failedFuture(new TdError(value.cause().code, value.cause().message))); + } + } + + public static void orElseThrow(TdResult value, SynchronousSink sink) { + if (value.succeeded()) { + sink.next(value.result()); + } else { + sink.complete(); + //sink.error(new TdError(value.cause().code, value.cause().message)); + } + } +} diff --git a/src/main/java/it/tdlight/utils/PromiseSink.java b/src/main/java/it/tdlight/utils/PromiseSink.java new file mode 100644 index 0000000..4aabd7a --- /dev/null +++ b/src/main/java/it/tdlight/utils/PromiseSink.java @@ -0,0 +1,49 @@ +package it.tdlight.utils; + +import io.vertx.core.Promise; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.SynchronousSink; +import reactor.util.context.Context; + +public abstract class PromiseSink implements SynchronousSink { + + private final Promise promise; + + private PromiseSink(Promise promise) { + this.promise = promise; + } + + public static PromiseSink of(Context context, Promise promise) { + return new PromiseSinkImpl<>(promise, context); + } + + @Override + public void complete() { + promise.complete(); + } + + @Override + public void error(@NotNull Throwable error) { + promise.fail(error); + } + + @Override + public void next(@NotNull T value) { + promise.complete(value); + } + + private static class PromiseSinkImpl extends PromiseSink { + + private final Context context; + + public PromiseSinkImpl(Promise promise, Context context) { + super(promise); + this.context = context; + } + + @Override + public @NotNull Context currentContext() { + return context; + } + } +} diff --git a/src/main/resources/tdlib-session-container-log4j2.xml b/src/main/resources/tdlib-session-container-log4j2.xml new file mode 100644 index 0000000..922d77c --- /dev/null +++ b/src/main/resources/tdlib-session-container-log4j2.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file