diff --git a/pom.xml b/pom.xml index 5db844a..8d777b9 100644 --- a/pom.xml +++ b/pom.xml @@ -260,4 +260,29 @@ + + + standalone + + true + + + + org.slf4j + slf4j-api + 1.7.32 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.17.1 + + + org.apache.logging.log4j + log4j-slf4j18-impl + 2.17.1 + + + + diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiClient.java new file mode 100644 index 0000000..0f0faeb --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiClient.java @@ -0,0 +1,101 @@ +package it.tdlight.reactiveapi; + +import io.atomix.cluster.messaging.ClusterEventService; +import io.atomix.cluster.messaging.Subscription; +import io.atomix.core.Atomix; +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import it.tdlight.reactiveapi.Event.OnUpdateError; +import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; +import it.tdlight.reactiveapi.Event.Request; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.SerializationException; +import reactor.core.publisher.BufferOverflowStrategy; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; +import reactor.core.publisher.Mono; + +public class AtomixReactiveApiClient implements ReactiveApiClient { + + private final ClusterEventService eventService; + private final long liveId; + private final long userId; + + public AtomixReactiveApiClient(Atomix atomix, long liveId, long userId) { + this.eventService = atomix.getEventService(); + this.liveId = liveId; + this.userId = userId; + } + + @Override + public Flux clientBoundEvents() { + return Flux.push(sink -> { + var subscriptionFuture = eventService.subscribe("session-" + liveId + "-client-bound-events", + this::deserializeEvent, + s -> { + sink.next(s); + return CompletableFuture.completedFuture(null); + }, + (a) -> null + ); + sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); + }, OverflowStrategy.ERROR).onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR); + } + + @Override + public Mono request(TdApi.Function request, Instant timeout) { + return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", + new Request<>(liveId, request, timeout), + AtomixReactiveApiClient::serializeRequest, + AtomixReactiveApiClient::deserializeResponse, + Duration.between(Instant.now(), timeout) + )); + } + + @SuppressWarnings("unchecked") + private static R deserializeResponse(byte[] bytes) { + try { + return (R) TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private static byte[] serializeRequest(Request request) { + try (var byteArrayOutputStream = new ByteArrayOutputStream()) { + try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + dataOutputStream.writeLong(request.liveId()); + request.request().serialize(dataOutputStream); + dataOutputStream.writeLong(request.timeout().toEpochMilli()); + return byteArrayOutputStream.toByteArray(); + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private ClientBoundEvent deserializeEvent(byte[] bytes) { + try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { + try (var dataInputStream = new DataInputStream(byteArrayInputStream)) { + return switch (dataInputStream.readByte()) { + case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream)); + case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream)); + case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong()); + case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF()); + default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte()); + }; + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java new file mode 100644 index 0000000..701d302 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Instant; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface ReactiveApiClient { + + Flux clientBoundEvents(); + + Mono request(TdApi.Function request, Instant timeout); +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 1d80bda..250ab81 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -98,9 +98,7 @@ public abstract class ReactiveApiPublisher { .subscribeOn(Schedulers.parallel()) // Handle signals, then return a ResultingEvent .mapNotNull(this::onSignal) - .doFinally(s -> { - LOG.trace("Finalized telegram client events"); - }) + .doFinally(s -> LOG.trace("Finalized telegram client events")) .publish(); publishedResultingEvents @@ -293,16 +291,16 @@ public abstract class ReactiveApiPublisher { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { if (clientBoundEvent instanceof OnUpdateData onUpdateData) { - dataOutputStream.write(0x1); + dataOutputStream.writeByte(0x1); onUpdateData.update().serialize(dataOutputStream); } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { - dataOutputStream.write(0x2); + dataOutputStream.writeByte(0x2); onUpdateError.error().serialize(dataOutputStream); } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { - dataOutputStream.write(0x3); + dataOutputStream.writeByte(0x3); dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber()); } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { - dataOutputStream.write(0x4); + dataOutputStream.writeByte(0x4); dataOutputStream.writeUTF(onBotLoginCodeRequested.token()); } else { throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java deleted file mode 100644 index 88045d9..0000000 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiSubscriber.java +++ /dev/null @@ -1,5 +0,0 @@ -package it.tdlight.reactiveapi; - -public class ReactiveApiSubscriber { - -}