Implement client

This commit is contained in:
Andrea Cavalli 2022-01-07 12:21:41 +01:00
parent ede105a6ea
commit af96cfb7dc
5 changed files with 145 additions and 12 deletions

25
pom.xml
View File

@ -260,4 +260,29 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>standalone</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j18-impl</artifactId>
<version>2.17.1</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,101 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.core.Atomix;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
import it.tdlight.reactiveapi.Event.OnUpdateData;
import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import it.tdlight.reactiveapi.Event.Request;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
public class AtomixReactiveApiClient implements ReactiveApiClient {
private final ClusterEventService eventService;
private final long liveId;
private final long userId;
public AtomixReactiveApiClient(Atomix atomix, long liveId, long userId) {
this.eventService = atomix.getEventService();
this.liveId = liveId;
this.userId = userId;
}
@Override
public Flux<ClientBoundEvent> clientBoundEvents() {
return Flux.<ClientBoundEvent>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-" + liveId + "-client-bound-events",
this::deserializeEvent,
s -> {
sink.next(s);
return CompletableFuture.completedFuture(null);
},
(a) -> null
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR).onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR);
}
@Override
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),
AtomixReactiveApiClient::serializeRequest,
AtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
));
}
@SuppressWarnings("unchecked")
private static <R extends TdApi.Object> R deserializeResponse(byte[] bytes) {
try {
return (R) TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
private static byte[] serializeRequest(Request<?> request) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeLong(request.liveId());
request.request().serialize(dataOutputStream);
dataOutputStream.writeLong(request.timeout().toEpochMilli());
return byteArrayOutputStream.toByteArray();
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
private ClientBoundEvent deserializeEvent(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var dataInputStream = new DataInputStream(byteArrayInputStream)) {
return switch (dataInputStream.readByte()) {
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream));
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream));
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong());
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF());
default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte());
};
}
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
}

View File

@ -0,0 +1,14 @@
package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import java.time.Instant;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ReactiveApiClient {
Flux<ClientBoundEvent> clientBoundEvents();
<T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout);
}

View File

@ -98,9 +98,7 @@ public abstract class ReactiveApiPublisher {
.subscribeOn(Schedulers.parallel())
// Handle signals, then return a ResultingEvent
.mapNotNull(this::onSignal)
.doFinally(s -> {
LOG.trace("Finalized telegram client events");
})
.doFinally(s -> LOG.trace("Finalized telegram client events"))
.publish();
publishedResultingEvents
@ -293,16 +291,16 @@ public abstract class ReactiveApiPublisher {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
if (clientBoundEvent instanceof OnUpdateData onUpdateData) {
dataOutputStream.write(0x1);
dataOutputStream.writeByte(0x1);
onUpdateData.update().serialize(dataOutputStream);
} else if (clientBoundEvent instanceof OnUpdateError onUpdateError) {
dataOutputStream.write(0x2);
dataOutputStream.writeByte(0x2);
onUpdateError.error().serialize(dataOutputStream);
} else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) {
dataOutputStream.write(0x3);
dataOutputStream.writeByte(0x3);
dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber());
} else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) {
dataOutputStream.write(0x4);
dataOutputStream.writeByte(0x4);
dataOutputStream.writeUTF(onBotLoginCodeRequested.token());
} else {
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);

View File

@ -1,5 +0,0 @@
package it.tdlight.reactiveapi;
public class ReactiveApiSubscriber {
}