From 139971f4597a7701369c2cbbcc484e60826794c3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 14 Nov 2021 13:07:24 +0100 Subject: [PATCH] Fully working session initialization --- service-td/pom.xml | 4 + .../java/io/volvox/td/ExecutorProducer.java | 17 ----- .../td/JacksonTdObjectJsonSerializer.java | 72 ++++++++++++++++++ .../main/java/io/volvox/td/RandomUUID.java | 6 ++ .../td/ReactiveTelegramClientProducer.java | 25 +++++++ .../java/io/volvox/td/TdClientProducers.java | 38 ---------- .../{TdSession.java => TdEventBusClient.java} | 74 ++++++++++--------- .../java/io/volvox/td/TdNativeClient.java | 12 ++- .../src/main/java/io/volvox/td/TdObject.java | 1 + .../io/volvox/td/TdObjectJsonSerializer.java | 11 +++ .../main/java/io/volvox/td/TdResource.java | 8 +- .../src/main/java/io/volvox/td/TdService.java | 30 +++++++- .../java/io/volvox/td/TdSessionRegistry.java | 28 ------- .../java/io/volvox/td/TdSessionResource.java | 59 +++++++++++++++ .../java/io/volvox/td/TdResourceTest.java | 61 ++++++++++++++- 15 files changed, 314 insertions(+), 132 deletions(-) delete mode 100644 service-td/src/main/java/io/volvox/td/ExecutorProducer.java create mode 100644 service-td/src/main/java/io/volvox/td/JacksonTdObjectJsonSerializer.java create mode 100644 service-td/src/main/java/io/volvox/td/ReactiveTelegramClientProducer.java delete mode 100644 service-td/src/main/java/io/volvox/td/TdClientProducers.java rename service-td/src/main/java/io/volvox/td/{TdSession.java => TdEventBusClient.java} (52%) create mode 100644 service-td/src/main/java/io/volvox/td/TdObjectJsonSerializer.java delete mode 100644 service-td/src/main/java/io/volvox/td/TdSessionRegistry.java create mode 100644 service-td/src/main/java/io/volvox/td/TdSessionResource.java diff --git a/service-td/pom.xml b/service-td/pom.xml index a0e1310..3355cce 100644 --- a/service-td/pom.xml +++ b/service-td/pom.xml @@ -96,6 +96,10 @@ it.tdlight tdlight-natives-linux-amd64 + + org.apache.commons + commons-lang3 + diff --git a/service-td/src/main/java/io/volvox/td/ExecutorProducer.java b/service-td/src/main/java/io/volvox/td/ExecutorProducer.java deleted file mode 100644 index 432c0f9..0000000 --- a/service-td/src/main/java/io/volvox/td/ExecutorProducer.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.volvox.td; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Produces; -import javax.inject.Singleton; - -@ApplicationScoped -public class ExecutorProducer { - - @Produces - @Singleton - public Executor produceExecutor() { - return Executors.newCachedThreadPool(); - } -} diff --git a/service-td/src/main/java/io/volvox/td/JacksonTdObjectJsonSerializer.java b/service-td/src/main/java/io/volvox/td/JacksonTdObjectJsonSerializer.java new file mode 100644 index 0000000..0bf7f0e --- /dev/null +++ b/service-td/src/main/java/io/volvox/td/JacksonTdObjectJsonSerializer.java @@ -0,0 +1,72 @@ +package io.volvox.td; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import it.tdlight.common.Init; +import it.tdlight.common.utils.CantLoadLibrary; +import it.tdlight.jni.TdApi; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Modifier; +import javax.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class JacksonTdObjectJsonSerializer implements TdObjectJsonSerializer { + + private final ObjectMapper objectMapper; + + static { + try { + Init.start(); + } catch (CantLoadLibrary e) { + throw new RuntimeException(e); + } + } + + public JacksonTdObjectJsonSerializer() { + var objectMapper = new ObjectMapper(); + var validator = BasicPolymorphicTypeValidator.builder(); + // Iterate TdApi inner classes + for (Class declaredClass : TdApi.class.getDeclaredClasses()) { + // Register only TDLib objects + if (TdApi.Object.class.isAssignableFrom(declaredClass)) { + if (Modifier.isAbstract(declaredClass.getModifiers())) { + // Register abstract base type + + objectMapper.addMixIn(declaredClass, AbstractTypeMixIn.class); + validator.allowIfBaseType(declaredClass); + } else { + // Register named subtype + + validator.allowIfSubType(declaredClass); + objectMapper.registerSubtypes(new NamedType(declaredClass, declaredClass.getSimpleName())); + } + } + } + this.objectMapper = objectMapper; + } + + @Override + public TdApi.Object deserialize(InputStream json) { + try { + return objectMapper.readValue(json, TdApi.Object.class); + } catch (IOException e) { + throw new UnsupportedOperationException(e); + } + } + + @Override + public String serialize(TdApi.Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new UnsupportedOperationException(e); + } + } + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) + public abstract static class AbstractTypeMixIn {} +} diff --git a/service-td/src/main/java/io/volvox/td/RandomUUID.java b/service-td/src/main/java/io/volvox/td/RandomUUID.java index 2b04ece..d63f3fd 100644 --- a/service-td/src/main/java/io/volvox/td/RandomUUID.java +++ b/service-td/src/main/java/io/volvox/td/RandomUUID.java @@ -1,7 +1,9 @@ package io.volvox.td; import java.util.UUID; +import javax.enterprise.context.Dependent; +@Dependent public class RandomUUID { final String uuid; @@ -10,4 +12,8 @@ public class RandomUUID { this.uuid = UUID.randomUUID().toString(); } + @Override + public String toString() { + return uuid; + } } diff --git a/service-td/src/main/java/io/volvox/td/ReactiveTelegramClientProducer.java b/service-td/src/main/java/io/volvox/td/ReactiveTelegramClientProducer.java new file mode 100644 index 0000000..5bf9d35 --- /dev/null +++ b/service-td/src/main/java/io/volvox/td/ReactiveTelegramClientProducer.java @@ -0,0 +1,25 @@ +package io.volvox.td; + +import it.tdlight.common.Init; +import it.tdlight.common.ReactiveTelegramClient; +import it.tdlight.common.utils.CantLoadLibrary; +import it.tdlight.tdlight.ClientManager; +import javax.enterprise.inject.Produces; +import org.jboss.logging.Logger; + +class ReactiveTelegramClientProducer { + + private static final Logger LOGGER = Logger.getLogger(ReactiveTelegramClient.class); + + @Produces + static ReactiveTelegramClient produceNativeClient() { + LOGGER.debug("Producing native client"); + try { + Init.start(); + } catch (CantLoadLibrary e) { + LOGGER.error("Failed to start native library", e); + throw new RuntimeException(e); + } + return ClientManager.createReactive(); + } +} diff --git a/service-td/src/main/java/io/volvox/td/TdClientProducers.java b/service-td/src/main/java/io/volvox/td/TdClientProducers.java deleted file mode 100644 index e85607b..0000000 --- a/service-td/src/main/java/io/volvox/td/TdClientProducers.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.volvox.td; - -import io.quarkus.vertx.runtime.VertxProducer; -import it.tdlight.common.Init; -import it.tdlight.common.ReactiveTelegramClient; -import it.tdlight.common.utils.CantLoadLibrary; -import it.tdlight.tdlight.ClientManager; -import java.time.Duration; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Produces; -import javax.inject.Singleton; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.jboss.logging.Logger; - -@ApplicationScoped -class TdClientProducers { - - private static final Logger LOGGER = Logger.getLogger(VertxProducer.class); - - @ConfigProperty(name = "td.requests.timeout") - Duration requestTimeout; - - @Produces - ReactiveTelegramClient produceReactiveTelegramClient() { - try { - Init.start(); - } catch (CantLoadLibrary e) { - LOGGER.error(e); - } - return ClientManager.createReactive(); - } - - @Produces - @Singleton - public Duration produceRequestTimeout() { - return requestTimeout; - } -} diff --git a/service-td/src/main/java/io/volvox/td/TdSession.java b/service-td/src/main/java/io/volvox/td/TdEventBusClient.java similarity index 52% rename from service-td/src/main/java/io/volvox/td/TdSession.java rename to service-td/src/main/java/io/volvox/td/TdEventBusClient.java index 7e80289..30fcce7 100644 --- a/service-td/src/main/java/io/volvox/td/TdSession.java +++ b/service-td/src/main/java/io/volvox/td/TdEventBusClient.java @@ -5,45 +5,47 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.subscription.Cancellable; import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; +import io.vertx.mutiny.core.eventbus.EventBus; +import io.vertx.mutiny.core.eventbus.Message; +import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; -import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.Update; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.enterprise.context.Dependent; import javax.inject.Inject; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; @Dependent -public class TdSession implements TdClient { +public class TdEventBusClient implements TdClient { + + @Inject + TdNativeClient client; + + private static final ExecutorService EXECUTOR_SERVICE + = Executors.newCachedThreadPool(new BasicThreadFactory.Builder().namingPattern("TdSession").build()); private static final DeliveryOptions UPDATES_OPTS = new DeliveryOptions().setCodecName("TdObjectCodec"); private static final DeliveryOptions SEND_OPTS = new DeliveryOptions().setCodecName("TdObjectCodec"); - @Inject - RandomUUID uuid; - - @Inject - Executor executor; - - @Inject - TdNativeClient client; - @Inject EventBus bus; private final AtomicReference updatesPublisher = new AtomicReference<>(); - @Override public Multi updates() { - return client.updates(); + @PostConstruct + void init() { + this.publishUpdates(); } public void publishUpdates() { var newPublisher = this.updates() - .runSubscriptionOn(executor) + .runSubscriptionOn(EXECUTOR_SERVICE) .subscribe() .with(item -> bus.publish("td.update", item, UPDATES_OPTS)); var prev = this.updatesPublisher.getAndSet(newPublisher); @@ -52,29 +54,35 @@ public class TdSession implements TdClient { } } - @ConsumeEvent(value = "td.send", codec = TdObjectCodec.class) - void onSendRequest(Message msg) { - this.send(msg.body().getObject()).subscribe() - .with(message -> msg.reply(message, SEND_OPTS), - ex -> { - if (ex instanceof TdException tdException) { - msg.fail(tdException.getCode(), tdException.getMessage()); - } else { - msg.fail(500, ex.toString()); - } - } - ); + @Override + public Multi updates() { + return client.updates(); } - @Override public Uni send(Function function) { + @Override + public Uni send(Function function) { return client.send(function); } - @SuppressWarnings("unchecked") @Override public Uni execute(Function function) { - return null; + @Override + public Uni execute(Function function) { + return client.execute(function); } - @Override public void dispose() { + @ConsumeEvent(value = "td.send", codec = TdObjectCodec.class) + public void onSendRequest(Message msg) { + this.send(msg.body().getObject()).subscribe().with(message -> msg.reply(message, SEND_OPTS), ex -> { + if (ex instanceof TdException tdException) { + msg.fail(tdException.getCode(), tdException.getMessage()); + } else { + msg.fail(500, ex.toString()); + } + }); + } + + @Override + @PreDestroy + public void dispose() { var updatesPublisher = this.updatesPublisher.get(); if (updatesPublisher != null) { updatesPublisher.cancel(); diff --git a/service-td/src/main/java/io/volvox/td/TdNativeClient.java b/service-td/src/main/java/io/volvox/td/TdNativeClient.java index 501fd83..e52dc69 100644 --- a/service-td/src/main/java/io/volvox/td/TdNativeClient.java +++ b/service-td/src/main/java/io/volvox/td/TdNativeClient.java @@ -9,9 +9,10 @@ import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Update; import java.time.Duration; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.enterprise.context.Dependent; -import javax.enterprise.event.Observes; import javax.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; @Dependent public class TdNativeClient implements TdClient { @@ -19,7 +20,7 @@ public class TdNativeClient implements TdClient { @Inject ReactiveTelegramClient client; - @Inject + @ConfigProperty(name = "td.requests.timeout") Duration requestTimeout; private Multi updates; @@ -44,7 +45,8 @@ public class TdNativeClient implements TdClient { }).broadcast().toAllSubscribers(); } - @Override public Multi updates() { + @Override + public Multi updates() { return updates; } @@ -80,7 +82,9 @@ public class TdNativeClient implements TdClient { }); } - @Override public void dispose() { + @Override + @PreDestroy + public void dispose() { this.client.dispose(); } } diff --git a/service-td/src/main/java/io/volvox/td/TdObject.java b/service-td/src/main/java/io/volvox/td/TdObject.java index 0a7b280..869f969 100644 --- a/service-td/src/main/java/io/volvox/td/TdObject.java +++ b/service-td/src/main/java/io/volvox/td/TdObject.java @@ -2,6 +2,7 @@ package io.volvox.td; import it.tdlight.jni.TdApi; +@SuppressWarnings("CdiInjectionPointsInspection") public class TdObject { private final TdApi.Object object; diff --git a/service-td/src/main/java/io/volvox/td/TdObjectJsonSerializer.java b/service-td/src/main/java/io/volvox/td/TdObjectJsonSerializer.java new file mode 100644 index 0000000..6f4ce52 --- /dev/null +++ b/service-td/src/main/java/io/volvox/td/TdObjectJsonSerializer.java @@ -0,0 +1,11 @@ +package io.volvox.td; + +import it.tdlight.jni.TdApi; +import java.io.InputStream; + +public interface TdObjectJsonSerializer { + + TdApi.Object deserialize(InputStream json); + + String serialize(TdApi.Object object); +} diff --git a/service-td/src/main/java/io/volvox/td/TdResource.java b/service-td/src/main/java/io/volvox/td/TdResource.java index 79ff526..2c1aa72 100644 --- a/service-td/src/main/java/io/volvox/td/TdResource.java +++ b/service-td/src/main/java/io/volvox/td/TdResource.java @@ -9,8 +9,6 @@ import javax.ws.rs.core.MediaType; @Path("/api/td") public class TdResource { - @Inject TdSessionRegistry tdSessionRegistry; - @Inject TdService tdService; @Path("/list") @@ -18,8 +16,8 @@ public class TdResource { @Produces(MediaType.TEXT_PLAIN) public String listSessions() { StringBuilder sb = new StringBuilder(); - for (var session : tdSessionRegistry.getSessions()) { - sb.append(session).append(System.lineSeparator()); + for (var session : tdService.getSessions()) { + sb.append(session.getKey()).append(System.lineSeparator()); } return sb.toString(); } @@ -28,6 +26,6 @@ public class TdResource { @GET @Produces(MediaType.TEXT_PLAIN) public String createSession() { - return tdService.startSession(null); + return tdService.startSession(); } } diff --git a/service-td/src/main/java/io/volvox/td/TdService.java b/service-td/src/main/java/io/volvox/td/TdService.java index 76d2420..1864350 100644 --- a/service-td/src/main/java/io/volvox/td/TdService.java +++ b/service-td/src/main/java/io/volvox/td/TdService.java @@ -3,6 +3,12 @@ package io.volvox.td; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.vertx.ConsumeEvent; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import it.tdlight.tdnative.NativeClient; +import it.tdlight.tdnative.NativeLog; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -20,15 +26,19 @@ public class TdService { EventBus bus; @Inject - Instance sessionInstances; + Instance sessionInstances; @ConsumeEvent(value = "td.start-session") - public String startSession(Void param) { + private void onStartSession(Message msg) { + var sessionId = this.startSession(); + msg.reply(sessionId); + } + + public String startSession() { String uuid = generateRandomUUID(); var client = sessionInstances.get(); clients.put(uuid, client); - client.publishUpdates(); return uuid; } @@ -40,4 +50,18 @@ public class TdService { void shutdown(@Observes ShutdownEvent event) { clients.forEach((uuid, client) -> client.dispose()); } + + public Optional get(String uuid) { + if (uuid == null) return Optional.empty(); + return Optional.ofNullable(clients.get(uuid)); + } + + public Optional get(TdClient client) { + if (client == null) return Optional.empty(); + return clients.entrySet().stream().filter(e -> e.getValue() == client).map(Entry::getKey).findAny(); + } + + public Set> getSessions() { + return clients.entrySet(); + } } \ No newline at end of file diff --git a/service-td/src/main/java/io/volvox/td/TdSessionRegistry.java b/service-td/src/main/java/io/volvox/td/TdSessionRegistry.java deleted file mode 100644 index 1ed09bd..0000000 --- a/service-td/src/main/java/io/volvox/td/TdSessionRegistry.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.volvox.td; - -import io.vertx.core.impl.ConcurrentHashSet; -import java.util.Set; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Disposes; -import javax.enterprise.inject.Produces; - -@ApplicationScoped -public class TdSessionRegistry { - - private final Set clients = new ConcurrentHashSet<>(); - - @Produces - public RandomUUID produceUUID() { - var randomUUID = new RandomUUID(); - clients.add(randomUUID.uuid); - return randomUUID; - } - - public void cleanUUID(@Disposes RandomUUID toClean) { - clients.remove(toClean.uuid); - } - - public Set getSessions() { - return clients; - } -} diff --git a/service-td/src/main/java/io/volvox/td/TdSessionResource.java b/service-td/src/main/java/io/volvox/td/TdSessionResource.java new file mode 100644 index 0000000..d987b6e --- /dev/null +++ b/service-td/src/main/java/io/volvox/td/TdSessionResource.java @@ -0,0 +1,59 @@ +package io.volvox.td; + +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; +import java.io.InputStream; +import java.util.NoSuchElementException; +import java.util.Objects; +import javax.annotation.PostConstruct; +import javax.enterprise.context.Dependent; +import javax.enterprise.context.Initialized; +import javax.enterprise.event.Observes; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/api/td/session/{sessionId}") +public class TdSessionResource { + + @PathParam("sessionId") + String sessionId; + + @Inject + TdService tdService; + + @Inject + TdObjectJsonSerializer tdObjectJsonSerializer; + + private TdClient client() { + return tdService.get(sessionId) + .orElseThrow(() -> new NoSuchElementException("Session not found: " + sessionId)); + } + + @SuppressWarnings("unchecked") + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/send") + public Uni sendRest(InputStream functionJson) { + TdApi.Function requestFunction = (Function) tdObjectJsonSerializer.deserialize(functionJson); + return client().send(requestFunction).map(response -> tdObjectJsonSerializer.serialize(response)); + } + + @SuppressWarnings("unchecked") + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/execute") + public Uni executeRest(InputStream functionJson) { + TdApi.Function requestFunction = (Function) tdObjectJsonSerializer.deserialize(functionJson); + return client().execute(requestFunction).map(response -> tdObjectJsonSerializer.serialize(response)); + } +} diff --git a/service-td/src/test/java/io/volvox/td/TdResourceTest.java b/service-td/src/test/java/io/volvox/td/TdResourceTest.java index 55bf2a6..47c1d5b 100644 --- a/service-td/src/test/java/io/volvox/td/TdResourceTest.java +++ b/service-td/src/test/java/io/volvox/td/TdResourceTest.java @@ -1,10 +1,21 @@ package io.volvox.td; import io.quarkus.test.junit.QuarkusTest; +import io.restassured.specification.Argument; +import java.util.List; +import java.util.Set; +import org.hamcrest.CoreMatchers; +import org.hamcrest.core.IsNot; +import org.hamcrest.text.IsEmptyString; +import org.hamcrest.text.MatchesPattern; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.text.IsEmptyString.emptyOrNullString; @QuarkusTest public class TdResourceTest { @@ -12,10 +23,52 @@ public class TdResourceTest { @Test public void testEmptyList() { given() - .when().get("/api/td/list") - .then() - .statusCode(200) - .body(is("")); + .when().get("/api/td/list") + .then() + .statusCode(200) + .body(is("")); + } + + @Test + public void testCreateSession() { + given() + .when().get("/api/td/create-session") + .then() + .statusCode(200) + .body(not(emptyOrNullString())); + } + + @Test + public void testCreateMultipleSessions() { + var sessionId1 = given() + .when().get("/api/td/create-session") + .then() + .statusCode(200) + .body(not(emptyOrNullString())) + .extract() + .body() + .asString(); + var sessionId2 = given() + .when().get("/api/td/create-session") + .then() + .statusCode(200) + .body(not(emptyOrNullString())) + .extract() + .body() + .asString(); + + var expectedBodyElems = Set.of(sessionId1, sessionId2); + + var bodyElems = Set.of(given() + .when().get("/api/td/list") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n")); + + Assertions.assertEquals(expectedBodyElems, bodyElems); } } \ No newline at end of file