Fully working session initialization

This commit is contained in:
Andrea Cavalli 2021-11-14 13:07:24 +01:00
parent 0f3b446413
commit 139971f459
15 changed files with 314 additions and 132 deletions

View File

@ -96,6 +96,10 @@
<groupId>it.tdlight</groupId>
<artifactId>tdlight-natives-linux-amd64</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -1,17 +0,0 @@
package io.volvox.td;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
@ApplicationScoped
public class ExecutorProducer {
@Produces
@Singleton
public Executor produceExecutor() {
return Executors.newCachedThreadPool();
}
}

View File

@ -0,0 +1,72 @@
package io.volvox.td;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.jni.TdApi;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Modifier;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JacksonTdObjectJsonSerializer implements TdObjectJsonSerializer {
private final ObjectMapper objectMapper;
static {
try {
Init.start();
} catch (CantLoadLibrary e) {
throw new RuntimeException(e);
}
}
public JacksonTdObjectJsonSerializer() {
var objectMapper = new ObjectMapper();
var validator = BasicPolymorphicTypeValidator.builder();
// Iterate TdApi inner classes
for (Class<?> declaredClass : TdApi.class.getDeclaredClasses()) {
// Register only TDLib objects
if (TdApi.Object.class.isAssignableFrom(declaredClass)) {
if (Modifier.isAbstract(declaredClass.getModifiers())) {
// Register abstract base type
objectMapper.addMixIn(declaredClass, AbstractTypeMixIn.class);
validator.allowIfBaseType(declaredClass);
} else {
// Register named subtype
validator.allowIfSubType(declaredClass);
objectMapper.registerSubtypes(new NamedType(declaredClass, declaredClass.getSimpleName()));
}
}
}
this.objectMapper = objectMapper;
}
@Override
public TdApi.Object deserialize(InputStream json) {
try {
return objectMapper.readValue(json, TdApi.Object.class);
} catch (IOException e) {
throw new UnsupportedOperationException(e);
}
}
@Override
public String serialize(TdApi.Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new UnsupportedOperationException(e);
}
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
public abstract static class AbstractTypeMixIn {}
}

View File

@ -1,7 +1,9 @@
package io.volvox.td;
import java.util.UUID;
import javax.enterprise.context.Dependent;
@Dependent
public class RandomUUID {
final String uuid;
@ -10,4 +12,8 @@ public class RandomUUID {
this.uuid = UUID.randomUUID().toString();
}
@Override
public String toString() {
return uuid;
}
}

View File

@ -0,0 +1,25 @@
package io.volvox.td;
import it.tdlight.common.Init;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.tdlight.ClientManager;
import javax.enterprise.inject.Produces;
import org.jboss.logging.Logger;
class ReactiveTelegramClientProducer {
private static final Logger LOGGER = Logger.getLogger(ReactiveTelegramClient.class);
@Produces
static ReactiveTelegramClient produceNativeClient() {
LOGGER.debug("Producing native client");
try {
Init.start();
} catch (CantLoadLibrary e) {
LOGGER.error("Failed to start native library", e);
throw new RuntimeException(e);
}
return ClientManager.createReactive();
}
}

View File

@ -1,38 +0,0 @@
package io.volvox.td;
import io.quarkus.vertx.runtime.VertxProducer;
import it.tdlight.common.Init;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.tdlight.ClientManager;
import java.time.Duration;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
@ApplicationScoped
class TdClientProducers {
private static final Logger LOGGER = Logger.getLogger(VertxProducer.class);
@ConfigProperty(name = "td.requests.timeout")
Duration requestTimeout;
@Produces
ReactiveTelegramClient produceReactiveTelegramClient() {
try {
Init.start();
} catch (CantLoadLibrary e) {
LOGGER.error(e);
}
return ClientManager.createReactive();
}
@Produces
@Singleton
public Duration produceRequestTimeout() {
return requestTimeout;
}
}

View File

@ -5,45 +5,47 @@ import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.Cancellable;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.Update;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@Dependent
public class TdSession implements TdClient {
public class TdEventBusClient implements TdClient {
@Inject
TdNativeClient client;
private static final ExecutorService EXECUTOR_SERVICE
= Executors.newCachedThreadPool(new BasicThreadFactory.Builder().namingPattern("TdSession").build());
private static final DeliveryOptions UPDATES_OPTS
= new DeliveryOptions().setCodecName("TdObjectCodec");
private static final DeliveryOptions SEND_OPTS
= new DeliveryOptions().setCodecName("TdObjectCodec");
@Inject
RandomUUID uuid;
@Inject
Executor executor;
@Inject
TdNativeClient client;
@Inject
EventBus bus;
private final AtomicReference<Cancellable> updatesPublisher = new AtomicReference<>();
@Override public Multi<Update> updates() {
return client.updates();
@PostConstruct
void init() {
this.publishUpdates();
}
public void publishUpdates() {
var newPublisher = this.updates()
.runSubscriptionOn(executor)
.runSubscriptionOn(EXECUTOR_SERVICE)
.subscribe()
.with(item -> bus.publish("td.update", item, UPDATES_OPTS));
var prev = this.updatesPublisher.getAndSet(newPublisher);
@ -52,29 +54,35 @@ public class TdSession implements TdClient {
}
}
@ConsumeEvent(value = "td.send", codec = TdObjectCodec.class)
void onSendRequest(Message<TdObject> msg) {
this.send(msg.body().getObject()).subscribe()
.with(message -> msg.reply(message, SEND_OPTS),
ex -> {
if (ex instanceof TdException tdException) {
msg.fail(tdException.getCode(), tdException.getMessage());
} else {
msg.fail(500, ex.toString());
}
}
);
@Override
public Multi<Update> updates() {
return client.updates();
}
@Override public <T extends Object> Uni<T> send(Function<T> function) {
@Override
public <T extends TdApi.Object> Uni<T> send(Function<T> function) {
return client.send(function);
}
@SuppressWarnings("unchecked") @Override public <T extends Object> Uni<T> execute(Function<T> function) {
return null;
@Override
public <T extends TdApi.Object> Uni<T> execute(Function<T> function) {
return client.execute(function);
}
@Override public void dispose() {
@ConsumeEvent(value = "td.send", codec = TdObjectCodec.class)
public void onSendRequest(Message<TdObject> msg) {
this.send(msg.body().getObject()).subscribe().with(message -> msg.reply(message, SEND_OPTS), ex -> {
if (ex instanceof TdException tdException) {
msg.fail(tdException.getCode(), tdException.getMessage());
} else {
msg.fail(500, ex.toString());
}
});
}
@Override
@PreDestroy
public void dispose() {
var updatesPublisher = this.updatesPublisher.get();
if (updatesPublisher != null) {
updatesPublisher.cancel();

View File

@ -9,9 +9,10 @@ import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Update;
import java.time.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@Dependent
public class TdNativeClient implements TdClient {
@ -19,7 +20,7 @@ public class TdNativeClient implements TdClient {
@Inject
ReactiveTelegramClient client;
@Inject
@ConfigProperty(name = "td.requests.timeout")
Duration requestTimeout;
private Multi<Update> updates;
@ -44,7 +45,8 @@ public class TdNativeClient implements TdClient {
}).broadcast().toAllSubscribers();
}
@Override public Multi<TdApi.Update> updates() {
@Override
public Multi<TdApi.Update> updates() {
return updates;
}
@ -80,7 +82,9 @@ public class TdNativeClient implements TdClient {
});
}
@Override public void dispose() {
@Override
@PreDestroy
public void dispose() {
this.client.dispose();
}
}

View File

@ -2,6 +2,7 @@ package io.volvox.td;
import it.tdlight.jni.TdApi;
@SuppressWarnings("CdiInjectionPointsInspection")
public class TdObject {
private final TdApi.Object object;

View File

@ -0,0 +1,11 @@
package io.volvox.td;
import it.tdlight.jni.TdApi;
import java.io.InputStream;
public interface TdObjectJsonSerializer {
TdApi.Object deserialize(InputStream json);
String serialize(TdApi.Object object);
}

View File

@ -9,8 +9,6 @@ import javax.ws.rs.core.MediaType;
@Path("/api/td")
public class TdResource {
@Inject TdSessionRegistry tdSessionRegistry;
@Inject TdService tdService;
@Path("/list")
@ -18,8 +16,8 @@ public class TdResource {
@Produces(MediaType.TEXT_PLAIN)
public String listSessions() {
StringBuilder sb = new StringBuilder();
for (var session : tdSessionRegistry.getSessions()) {
sb.append(session).append(System.lineSeparator());
for (var session : tdService.getSessions()) {
sb.append(session.getKey()).append(System.lineSeparator());
}
return sb.toString();
}
@ -28,6 +26,6 @@ public class TdResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String createSession() {
return tdService.startSession(null);
return tdService.startSession();
}
}

View File

@ -3,6 +3,12 @@ package io.volvox.td;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import it.tdlight.tdnative.NativeClient;
import it.tdlight.tdnative.NativeLog;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -20,15 +26,19 @@ public class TdService {
EventBus bus;
@Inject
Instance<TdSession> sessionInstances;
Instance<TdEventBusClient> sessionInstances;
@ConsumeEvent(value = "td.start-session")
public String startSession(Void param) {
private void onStartSession(Message<String> msg) {
var sessionId = this.startSession();
msg.reply(sessionId);
}
public String startSession() {
String uuid = generateRandomUUID();
var client = sessionInstances.get();
clients.put(uuid, client);
client.publishUpdates();
return uuid;
}
@ -40,4 +50,18 @@ public class TdService {
void shutdown(@Observes ShutdownEvent event) {
clients.forEach((uuid, client) -> client.dispose());
}
public Optional<TdClient> get(String uuid) {
if (uuid == null) return Optional.empty();
return Optional.ofNullable(clients.get(uuid));
}
public Optional<String> get(TdClient client) {
if (client == null) return Optional.empty();
return clients.entrySet().stream().filter(e -> e.getValue() == client).map(Entry::getKey).findAny();
}
public Set<Entry<String, TdClient>> getSessions() {
return clients.entrySet();
}
}

View File

@ -1,28 +0,0 @@
package io.volvox.td;
import io.vertx.core.impl.ConcurrentHashSet;
import java.util.Set;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Disposes;
import javax.enterprise.inject.Produces;
@ApplicationScoped
public class TdSessionRegistry {
private final Set<String> clients = new ConcurrentHashSet<>();
@Produces
public RandomUUID produceUUID() {
var randomUUID = new RandomUUID();
clients.add(randomUUID.uuid);
return randomUUID;
}
public void cleanUUID(@Disposes RandomUUID toClean) {
clients.remove(toClean.uuid);
}
public Set<String> getSessions() {
return clients;
}
}

View File

@ -0,0 +1,59 @@
package io.volvox.td;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/api/td/session/{sessionId}")
public class TdSessionResource {
@PathParam("sessionId")
String sessionId;
@Inject
TdService tdService;
@Inject
TdObjectJsonSerializer tdObjectJsonSerializer;
private TdClient client() {
return tdService.get(sessionId)
.orElseThrow(() -> new NoSuchElementException("Session not found: " + sessionId));
}
@SuppressWarnings("unchecked")
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/send")
public <T extends Object> Uni<String> sendRest(InputStream functionJson) {
TdApi.Function<T> requestFunction = (Function<T>) tdObjectJsonSerializer.deserialize(functionJson);
return client().send(requestFunction).map(response -> tdObjectJsonSerializer.serialize(response));
}
@SuppressWarnings("unchecked")
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/execute")
public <T extends Object> Uni<String> executeRest(InputStream functionJson) {
TdApi.Function<T> requestFunction = (Function<T>) tdObjectJsonSerializer.deserialize(functionJson);
return client().execute(requestFunction).map(response -> tdObjectJsonSerializer.serialize(response));
}
}

View File

@ -1,10 +1,21 @@
package io.volvox.td;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.specification.Argument;
import java.util.List;
import java.util.Set;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.IsNot;
import org.hamcrest.text.IsEmptyString;
import org.hamcrest.text.MatchesPattern;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.text.IsEmptyString.emptyOrNullString;
@QuarkusTest
public class TdResourceTest {
@ -12,10 +23,52 @@ public class TdResourceTest {
@Test
public void testEmptyList() {
given()
.when().get("/api/td/list")
.then()
.statusCode(200)
.body(is(""));
.when().get("/api/td/list")
.then()
.statusCode(200)
.body(is(""));
}
@Test
public void testCreateSession() {
given()
.when().get("/api/td/create-session")
.then()
.statusCode(200)
.body(not(emptyOrNullString()));
}
@Test
public void testCreateMultipleSessions() {
var sessionId1 = given()
.when().get("/api/td/create-session")
.then()
.statusCode(200)
.body(not(emptyOrNullString()))
.extract()
.body()
.asString();
var sessionId2 = given()
.when().get("/api/td/create-session")
.then()
.statusCode(200)
.body(not(emptyOrNullString()))
.extract()
.body()
.asString();
var expectedBodyElems = Set.of(sessionId1, sessionId2);
var bodyElems = Set.of(given()
.when().get("/api/td/list")
.then()
.statusCode(200)
.extract()
.body()
.asString()
.split("\n"));
Assertions.assertEquals(expectedBodyElems, bodyElems);
}
}