First commit
This commit is contained in:
commit
527ce25741
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
/target/
|
||||
/.idea/
|
||||
.directory
|
2
README.md
Normal file
2
README.md
Normal file
@ -0,0 +1,2 @@
|
||||
# Vertx RPC services
|
||||
A simple implementation of an RPC made with Vert.x event-bus
|
110
pom.xml
Normal file
110
pom.xml
Normal file
@ -0,0 +1,110 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>it.cavallium</groupId>
|
||||
<artifactId>vertx-rpc-services</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
|
||||
<maven-compiler-plugin.version>3.12.1</maven-compiler-plugin.version>
|
||||
<maven-surefire-plugin.version>3.0.0-M7</maven-surefire-plugin.version>
|
||||
|
||||
<vertx.version>4.5.8</vertx.version>
|
||||
<junit-jupiter.version>5.10.2</junit-jupiter.version>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>sonatype-snapshot</id>
|
||||
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>central</id>
|
||||
<url>https://repo.maven.apache.org/maven2/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-stack-depchain</artifactId>
|
||||
<version>${vertx.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-rx-java3</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.jetbrains</groupId>
|
||||
<artifactId>annotations</artifactId>
|
||||
<version>24.1.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit-jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>${junit-jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<release>21</release>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${maven-surefire-plugin.version}</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>3.0.0-M3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<configuration>
|
||||
<rules>
|
||||
<dependencyConvergence/>
|
||||
</rules>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>enforce</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
</project>
|
43
src/main/java/it/cavallium/vertx/rpcservice/DataCodec.java
Normal file
43
src/main/java/it/cavallium/vertx/rpcservice/DataCodec.java
Normal file
@ -0,0 +1,43 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import io.vertx.core.json.Json;
|
||||
|
||||
public record DataCodec<T>(MessageCodec<T, T> codec) {
|
||||
|
||||
public static final class DataMessageCodec implements MessageCodec<Object, Object> {
|
||||
private int pos2;
|
||||
|
||||
@Override
|
||||
public void encodeToWire(Buffer buffer, Object o) {
|
||||
Json.encodeToBuffer(o).writeToBuffer(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object decodeFromWire(int pos, Buffer buffer) {
|
||||
Buffer buf = Buffer.buffer();
|
||||
this.pos2 = buffer.readFromBuffer(pos, buf);
|
||||
return Json.decodeValue(buf);
|
||||
}
|
||||
|
||||
public int getPos2() {
|
||||
return pos2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object transform(Object o) {
|
||||
return o;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "JsonObjectCodec";
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte systemCodecID() {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
24
src/main/java/it/cavallium/vertx/rpcservice/RxCloseable.java
Normal file
24
src/main/java/it/cavallium/vertx/rpcservice/RxCloseable.java
Normal file
@ -0,0 +1,24 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.vertx.core.Closeable;
|
||||
import io.vertx.core.Promise;
|
||||
|
||||
public interface RxCloseable extends Closeable, AutoCloseable {
|
||||
|
||||
@Override
|
||||
default void close(Promise<Void> completion) {
|
||||
rxClose().subscribe(completion::complete, completion::fail);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use {@link #close(Promise)}
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
default void close() {
|
||||
rxClose().blockingAwait();
|
||||
}
|
||||
|
||||
Completable rxClose();
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface ServiceClass {}
|
142
src/main/java/it/cavallium/vertx/rpcservice/ServiceClient.java
Normal file
142
src/main/java/it/cavallium/vertx/rpcservice/ServiceClient.java
Normal file
@ -0,0 +1,142 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Maybe;
|
||||
import io.reactivex.rxjava3.core.Single;
|
||||
import io.vertx.rxjava3.core.Vertx;
|
||||
import it.cavallium.vertx.rpcservice.ServiceMethodRequest.ServiceMethodRequestMessageCodec;
|
||||
import it.cavallium.vertx.rpcservice.ServiceMethodReturnValue.ServiceMethodReturnValueMessageCodec;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public class ServiceClient<T> {
|
||||
|
||||
private final Vertx vertx;
|
||||
private final T instance;
|
||||
|
||||
enum ReturnArity {
|
||||
COMPLETABLE,
|
||||
MAYBE,
|
||||
SINGLE
|
||||
}
|
||||
|
||||
private record MethodData(String address, Type returnType, ReturnArity arity) {}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public ServiceClient(Vertx vertx, Class<T> serviceClass) {
|
||||
this.vertx = vertx;
|
||||
ServiceUtils.tryRegisterDefaultCodec(vertx, ServiceMethodRequest.class, ServiceMethodRequestMessageCodec.INSTANCE);
|
||||
ServiceUtils.tryRegisterDefaultCodec(vertx, ServiceMethodReturnValue.class, ServiceMethodReturnValueMessageCodec.INSTANCE);
|
||||
|
||||
if (!serviceClass.isInterface() && serviceClass.isAnnotationPresent(ServiceClass.class)) {
|
||||
throw new UnsupportedOperationException("Only interfaces are allowed");
|
||||
}
|
||||
|
||||
Map<Method, MethodData> methodData = processMethods(serviceClass, serviceClass.getDeclaredMethods());
|
||||
this.instance = (T) Proxy.newProxyInstance(this.getClass().getClassLoader(),
|
||||
new Class[]{serviceClass},
|
||||
new DynamicInvocationHandler(serviceClass, methodData)
|
||||
);
|
||||
}
|
||||
|
||||
private Map<Method, MethodData> processMethods(Class<T> serviceClass, Method[] declaredMethods) {
|
||||
return Arrays
|
||||
.stream(declaredMethods)
|
||||
.filter(method -> method.isAnnotationPresent(ServiceMethod.class))
|
||||
.filter(method -> !method.isDefault())
|
||||
.collect(Collectors.toMap(Function.identity(), method -> {
|
||||
String address = ServiceUtils.getMethodEventBusAddress(serviceClass, method);
|
||||
final ReturnArity arity = getReturnArity(serviceClass, method);
|
||||
if (arity == ReturnArity.COMPLETABLE) {
|
||||
return new MethodData(address, null, ReturnArity.COMPLETABLE);
|
||||
} else {
|
||||
Type returnType = method.getGenericReturnType();
|
||||
if (returnType instanceof ParameterizedType parameterizedType) {
|
||||
Type[] typeArguments = parameterizedType.getActualTypeArguments();
|
||||
if (typeArguments.length != 1) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Method return type is not valid for service \"" + serviceClass + "\", method \"" + method
|
||||
+ "\", it should be Single<?> or Maybe<?> with a single type parameter");
|
||||
}
|
||||
var returnTypeInner = typeArguments[0];
|
||||
return new MethodData(address, returnTypeInner, arity);
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Method return type is not valid for service \"" + serviceClass + "\", method \"" + method
|
||||
+ "\", it should be Single<?> or Maybe<?> with a valid type parameter");
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
static <T> @NotNull ReturnArity getReturnArity(Class<T> serviceClass, Method method) {
|
||||
Class<?> returnTypeClass = method.getReturnType();
|
||||
ReturnArity arity;
|
||||
if (returnTypeClass.equals(Completable.class)) {
|
||||
arity = ReturnArity.COMPLETABLE;
|
||||
} else if (returnTypeClass.equals(Maybe.class)) {
|
||||
arity = ReturnArity.MAYBE;
|
||||
} else if (returnTypeClass.equals(Single.class)) {
|
||||
arity = ReturnArity.SINGLE;
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Method return type is not valid for service \"" + serviceClass + "\", method \"" + method
|
||||
+ "\", it should be Single<?>, Maybe<?>, or Completable");
|
||||
}
|
||||
return arity;
|
||||
}
|
||||
|
||||
private class DynamicInvocationHandler implements InvocationHandler {
|
||||
|
||||
private final Class<T> serviceClass;
|
||||
private final Map<Method, ServiceClient.MethodData> methodDataMap;
|
||||
private final Object object;
|
||||
|
||||
|
||||
public DynamicInvocationHandler(Class<T> serviceClass, Map<Method, MethodData> methodDataMap) {
|
||||
this.serviceClass = serviceClass;
|
||||
this.methodDataMap = methodDataMap;
|
||||
this.object = new Object();
|
||||
}
|
||||
|
||||
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
if (!method.isAnnotationPresent(ServiceMethod.class)) {
|
||||
if (method.getDeclaringClass() == Object.class) {
|
||||
return method.invoke(object, args);
|
||||
} else if (method.isDefault()) {
|
||||
try {
|
||||
return InvocationHandler.invokeDefault(proxy, method, args);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
throw new UnsupportedOperationException("Method \"" + method + "\" is not annotated with @ServiceMethod!");
|
||||
}
|
||||
}
|
||||
var methodData = methodDataMap.get(method);
|
||||
var address = methodData.address;
|
||||
var request = new ServiceMethodRequest(args);
|
||||
var requestSingle = vertx.eventBus().<ServiceMethodReturnValue<?>>request(address, request);
|
||||
return switch (methodData.arity) {
|
||||
case COMPLETABLE -> requestSingle.ignoreElement();
|
||||
case MAYBE -> requestSingle.mapOptional(msg -> Optional.ofNullable(msg.body().value()));
|
||||
case SINGLE -> requestSingle.map(msg -> msg.body().value());
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public T getInstance() {
|
||||
return instance;
|
||||
}
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface ServiceMethod {}
|
@ -0,0 +1,54 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import java.util.ArrayList;
|
||||
import it.cavallium.vertx.rpcservice.DataCodec.DataMessageCodec;
|
||||
|
||||
record ServiceMethodRequest(Object[] arguments) {
|
||||
|
||||
static class ServiceMethodRequestMessageCodec implements
|
||||
MessageCodec<ServiceMethodRequest, ServiceMethodRequest> {
|
||||
|
||||
public static final ServiceMethodRequestMessageCodec INSTANCE
|
||||
= new ServiceMethodRequestMessageCodec();
|
||||
private final DataMessageCodec dataCodec;
|
||||
|
||||
private ServiceMethodRequestMessageCodec() {
|
||||
this.dataCodec = new DataMessageCodec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeToWire(Buffer buffer, ServiceMethodRequest request) {
|
||||
for (int i = 0; i < request.arguments.length; i++) {
|
||||
var argument = request.arguments[i];
|
||||
dataCodec.encodeToWire(buffer, argument);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMethodRequest decodeFromWire(int pos, Buffer buffer) {
|
||||
var resultArgs = new ArrayList<>();
|
||||
while (pos < buffer.length()) {
|
||||
resultArgs.add(dataCodec.decodeFromWire(pos, buffer));
|
||||
pos = dataCodec.getPos2();
|
||||
}
|
||||
return new ServiceMethodRequest(resultArgs.toArray(Object[]::new));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMethodRequest transform(ServiceMethodRequest request) {
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "ServiceMethodRequestCodec";
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte systemCodecID() {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import it.cavallium.vertx.rpcservice.DataCodec.DataMessageCodec;
|
||||
|
||||
record ServiceMethodReturnValue<T>(T value) {
|
||||
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
static class ServiceMethodReturnValueMessageCodec implements
|
||||
MessageCodec<ServiceMethodReturnValue, ServiceMethodReturnValue> {
|
||||
|
||||
public static final ServiceMethodReturnValueMessageCodec INSTANCE
|
||||
= new ServiceMethodReturnValueMessageCodec();
|
||||
private final DataMessageCodec dataCodec;
|
||||
|
||||
private ServiceMethodReturnValueMessageCodec() {
|
||||
this.dataCodec = new DataMessageCodec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeToWire(Buffer buffer, ServiceMethodReturnValue request) {
|
||||
dataCodec.encodeToWire(buffer, request.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMethodReturnValue<?> decodeFromWire(int pos, Buffer buffer) {
|
||||
return new ServiceMethodReturnValue<>(dataCodec.decodeFromWire(pos, buffer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceMethodReturnValue<?> transform(ServiceMethodReturnValue request) {
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "ServiceMethodReturnValueCodec";
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte systemCodecID() {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
103
src/main/java/it/cavallium/vertx/rpcservice/ServiceServer.java
Normal file
103
src/main/java/it/cavallium/vertx/rpcservice/ServiceServer.java
Normal file
@ -0,0 +1,103 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import static it.cavallium.vertx.rpcservice.ServiceClient.getReturnArity;
|
||||
import static it.cavallium.vertx.rpcservice.ServiceUtils.getMethodEventBusAddress;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Flowable;
|
||||
import io.reactivex.rxjava3.core.Maybe;
|
||||
import io.reactivex.rxjava3.core.Single;
|
||||
import io.reactivex.rxjava3.functions.Action;
|
||||
import io.reactivex.rxjava3.functions.Consumer;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.rxjava3.core.Vertx;
|
||||
import io.vertx.rxjava3.core.eventbus.Message;
|
||||
import io.vertx.rxjava3.core.eventbus.MessageConsumer;
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import it.cavallium.vertx.rpcservice.ServiceMethodReturnValue.ServiceMethodReturnValueMessageCodec;
|
||||
import it.cavallium.vertx.rpcservice.ServiceMethodRequest.ServiceMethodRequestMessageCodec;
|
||||
|
||||
public class ServiceServer<T> implements RxCloseable {
|
||||
|
||||
private final Class<? super T> serviceClass;
|
||||
private final List<MessageConsumer<ServiceMethodRequest>> consumers;
|
||||
private static final ServiceMethodReturnValue<?> EMPTY_RESULT = new ServiceMethodReturnValue<>(null);
|
||||
|
||||
public ServiceServer(Vertx vertx, T service, Class<? super T> serviceClass) {
|
||||
this.serviceClass = serviceClass;
|
||||
ServiceUtils.tryRegisterDefaultCodec(vertx, ServiceMethodRequest.class, ServiceMethodRequestMessageCodec.INSTANCE);
|
||||
ServiceUtils.tryRegisterDefaultCodec(vertx, ServiceMethodReturnValue.class, ServiceMethodReturnValueMessageCodec.INSTANCE);
|
||||
|
||||
if (!serviceClass.isInterface() && serviceClass.isAnnotationPresent(ServiceClass.class)) {
|
||||
throw new UnsupportedOperationException("Only interfaces are allowed");
|
||||
}
|
||||
|
||||
record ServiceMethodDefinition(Method method, String address, Handler<Message<ServiceMethodRequest>> handler) {}
|
||||
|
||||
this.consumers = Arrays.stream(serviceClass.getDeclaredMethods())
|
||||
.filter(method -> method.isAnnotationPresent(ServiceMethod.class))
|
||||
.map(method -> {
|
||||
var address = getMethodEventBusAddress(serviceClass, method);
|
||||
var handler = this.createRequestHandler(service, method);
|
||||
return new ServiceMethodDefinition(method, address, handler);
|
||||
})
|
||||
.map(definition -> vertx.eventBus().consumer(definition.address, definition.handler))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private Handler<Message<ServiceMethodRequest>> createRequestHandler(T service, Method declaredMethod) {
|
||||
var lookup = MethodHandles.publicLookup();
|
||||
MethodHandle mh;
|
||||
int paramsCount;
|
||||
try {
|
||||
mh = lookup.unreflect(declaredMethod).bindTo(service);
|
||||
paramsCount = declaredMethod.getParameterCount();
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
var arity = getReturnArity(serviceClass, declaredMethod);
|
||||
return msg -> {
|
||||
try {
|
||||
var req = msg.body();
|
||||
|
||||
if (req.arguments() == null && paramsCount > 0) {
|
||||
msg.fail(500, "Arguments array is null, expected " + paramsCount + " arguments");
|
||||
}
|
||||
|
||||
switch (arity) {
|
||||
case COMPLETABLE -> ((Completable) mh.invokeWithArguments(req.arguments()))
|
||||
.subscribe(getEmptyReplyHandler(msg), getErrorHandler(msg));
|
||||
case MAYBE -> ((Maybe<?>) mh.invokeWithArguments(req.arguments()))
|
||||
.subscribe(getReplyHandler(msg), getErrorHandler(msg), getEmptyReplyHandler(msg));
|
||||
case SINGLE -> ((Single<?>) mh.invokeWithArguments(req.arguments()))
|
||||
.subscribe(getReplyHandler(msg), getErrorHandler(msg));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
msg.fail(500, e.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static @NotNull Consumer<Object> getReplyHandler(Message<ServiceMethodRequest> msg) {
|
||||
return ok -> msg.reply(new ServiceMethodReturnValue<>(ok));
|
||||
}
|
||||
|
||||
private static @NotNull Consumer<Throwable> getErrorHandler(Message<ServiceMethodRequest> msg) {
|
||||
return err -> msg.fail(500, err.toString());
|
||||
}
|
||||
|
||||
private static @NotNull Action getEmptyReplyHandler(Message<ServiceMethodRequest> msg) {
|
||||
return () -> msg.reply(EMPTY_RESULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Completable rxClose() {
|
||||
return Flowable.fromIterable(consumers)
|
||||
.flatMapCompletable(MessageConsumer::unregister);
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package it.cavallium.vertx.rpcservice;
|
||||
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import io.vertx.rxjava3.core.Vertx;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Objects;
|
||||
|
||||
class ServiceUtils {
|
||||
|
||||
static String getMethodEventBusAddress(Class<?> serviceClass, Method method) {
|
||||
return getMethodEventBusAddress(getMethodEventBusAddressPrefix(serviceClass), method);
|
||||
}
|
||||
|
||||
static String getMethodEventBusAddress(String prefix, Method method) {
|
||||
return prefix + method.getName();
|
||||
}
|
||||
|
||||
static String getMethodEventBusAddressPrefix(Class<?> serviceClass) {
|
||||
return "t_service_" + serviceClass.getSimpleName() + "#";
|
||||
}
|
||||
|
||||
@SuppressWarnings("StatementWithEmptyBody")
|
||||
public static <T> void tryRegisterDefaultCodec(Vertx vertx,
|
||||
Class<T> serviceMethodRequestClass,
|
||||
MessageCodec<T, ?> codec) {
|
||||
try {
|
||||
vertx.eventBus().getDelegate().registerDefaultCodec(serviceMethodRequestClass, codec);
|
||||
} catch (IllegalStateException ex) {
|
||||
if (!Objects.requireNonNullElse(ex.getMessage(), "").startsWith("Already a default codec registered for class")) {
|
||||
throw ex;
|
||||
} else {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
7
src/main/java/module-info.java
Normal file
7
src/main/java/module-info.java
Normal file
@ -0,0 +1,7 @@
|
||||
module vertx.rpc.services {
|
||||
requires io.reactivex.rxjava3;
|
||||
requires io.vertx.core;
|
||||
requires org.jetbrains.annotations;
|
||||
requires vertx.rx.java3;
|
||||
exports it.cavallium.vertx.rpcservice;
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
package it.cavallium.vertx.rpcservice.service;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Maybe;
|
||||
import io.reactivex.rxjava3.core.Single;
|
||||
import it.cavallium.vertx.rpcservice.ServiceClass;
|
||||
import it.cavallium.vertx.rpcservice.ServiceMethod;
|
||||
import java.util.List;
|
||||
|
||||
@ServiceClass
|
||||
public interface MathService {
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean> calculateNot(boolean a);
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean> calculateAnd(boolean a, boolean b);
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean> calculateOr(boolean a, boolean b);
|
||||
|
||||
@ServiceMethod
|
||||
Completable calculateCompletable();
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean[]> calculateMergeToArray(boolean a, boolean b);
|
||||
|
||||
@ServiceMethod
|
||||
Single<List<Boolean>> calculateMergeToList(boolean a, boolean b);
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean> calculateListOr(List<Boolean> input);
|
||||
|
||||
@ServiceMethod
|
||||
Single<Boolean> calculateArrayOr(Boolean[] input);
|
||||
|
||||
@ServiceMethod
|
||||
Maybe<Boolean> calculateMaybe(boolean shouldReturn);
|
||||
|
||||
@ServiceMethod
|
||||
Single<ComputedBooleanOperation> calculateCustomRecordOr(BooleanOperation op);
|
||||
|
||||
record BooleanOperation(boolean a, Boolean b) {}
|
||||
|
||||
record ComputedBooleanOperation(BooleanOperation input, boolean result) {}
|
||||
|
||||
default String test() {
|
||||
return "true";
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package it.cavallium.vertx.rpcservice.service;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Maybe;
|
||||
import io.reactivex.rxjava3.core.Single;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
class MathServiceImpl implements MathService {
|
||||
|
||||
@Override
|
||||
public Single<Boolean> calculateNot(boolean a) {
|
||||
return Single.just(!a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<Boolean> calculateAnd(boolean a, boolean b) {
|
||||
return Single.just(a & b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<Boolean> calculateOr(boolean a, boolean b) {
|
||||
return Single.just(a | b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Completable calculateCompletable() {
|
||||
return Completable.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<Boolean[]> calculateMergeToArray(boolean a, boolean b) {
|
||||
return Single.just(new Boolean[]{a, b});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<List<Boolean>> calculateMergeToList(boolean a, boolean b) {
|
||||
return Single.just(List.of(a, b));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<Boolean> calculateListOr(List<Boolean> input) {
|
||||
return Single.just(input.stream().reduce(false, (a, b) -> a | b));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<Boolean> calculateArrayOr(Boolean[] input) {
|
||||
return Single.just(Arrays.stream(input).reduce(false, (a, b) -> a | b));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Single<ComputedBooleanOperation> calculateCustomRecordOr(BooleanOperation op) {
|
||||
return Single.just(new ComputedBooleanOperation(op, op.a() | op.b()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Maybe<Boolean> calculateMaybe(boolean shouldReturn) {
|
||||
return shouldReturn ? Maybe.just(true) : Maybe.empty();
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package it.cavallium.vertx.rpcservice.service;
|
||||
|
||||
import io.vertx.rxjava3.core.Vertx;
|
||||
import it.cavallium.vertx.rpcservice.ServiceClient;
|
||||
import it.cavallium.vertx.rpcservice.ServiceServer;
|
||||
import it.cavallium.vertx.rpcservice.service.MathService.BooleanOperation;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestService {
|
||||
|
||||
@Test
|
||||
public void testService() {
|
||||
var v = Vertx.vertx();
|
||||
|
||||
var svcImpl = new MathServiceImpl();
|
||||
|
||||
try (var server = new ServiceServer<>(v, svcImpl, MathService.class)) {
|
||||
var client = new ServiceClient<>(v, MathService.class);
|
||||
var clientInstance = client.getInstance();
|
||||
Assertions.assertDoesNotThrow(clientInstance::hashCode);
|
||||
Assertions.assertDoesNotThrow(clientInstance::toString);
|
||||
Assertions.assertEquals("true", clientInstance.test());
|
||||
Assertions.assertFalse(clientInstance.calculateAnd(true, false).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateAnd(true, true).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateOr(true, false).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateOr(true, true).blockingGet());
|
||||
Assertions.assertFalse(clientInstance.calculateOr(false, false).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateNot(false).blockingGet());
|
||||
Assertions.assertFalse(clientInstance.calculateNot(true).blockingGet());
|
||||
Assertions.assertDoesNotThrow(() -> clientInstance.calculateCompletable().blockingAwait());
|
||||
Assertions.assertArrayEquals(new Boolean[] {false, false}, clientInstance.calculateMergeToArray(false, false).blockingGet());
|
||||
Assertions.assertEquals(new ArrayList<>(List.of(false, true)), new ArrayList<>(clientInstance.calculateMergeToList(false, true).blockingGet()));
|
||||
Assertions.assertTrue(clientInstance.calculateListOr(List.of(false, true)).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateArrayOr(new Boolean[] {false, true}).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateCustomRecordOr(new BooleanOperation(false, true)).blockingGet().result());
|
||||
Assertions.assertNull(clientInstance.calculateMaybe(false).blockingGet());
|
||||
Assertions.assertTrue(clientInstance.calculateMaybe(true).blockingGet(false));
|
||||
}
|
||||
}
|
||||
}
|
7
src/test/java/module-info.java
Normal file
7
src/test/java/module-info.java
Normal file
@ -0,0 +1,7 @@
|
||||
module vertx.rpc.services.test {
|
||||
requires vertx.rpc.services;
|
||||
requires org.junit.jupiter.api;
|
||||
requires vertx.rx.java3;
|
||||
requires io.reactivex.rxjava3;
|
||||
exports it.cavallium.vertx.rpcservice.service;
|
||||
}
|
Loading…
Reference in New Issue
Block a user