diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 3da7140..813563b 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -8,6 +8,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.concurrent.CompletableFuture; import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; @@ -26,9 +27,9 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut this.eventService = api.getAtomix().getEventService(); clientBoundEvents = Flux - .push(sink -> { + .>push(sink -> { var subscriptionFuture = eventService.subscribe("session-client-bound-events", - LiveAtomixReactiveApiClient::deserializeEvent, + LiveAtomixReactiveApiClient::deserializeEvents, s -> { sink.next(s); return CompletableFuture.completedFuture(null); @@ -38,6 +39,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) + .flatMapIterable(list -> list) .takeUntil(s -> closed) .share(); } diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index d981a8d..029034c 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -8,6 +8,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import reactor.core.Disposable; @@ -38,9 +39,9 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl this.userId = userId; clientBoundEvents = Flux - .push(sink -> { + .>push(sink -> { var subscriptionFuture = eventService.subscribe("session-client-bound-events", - LiveAtomixReactiveApiClient::deserializeEvent, + LiveAtomixReactiveApiClient::deserializeEvents, s -> { sink.next(s); return CompletableFuture.completedFuture(null); @@ -49,9 +50,10 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl ); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) + .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) + .flatMapIterable(list -> list) .filter(e -> e.userId() == userId) .doOnNext(e -> liveId.set(e.liveId())) - .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .share(); liveIdChange = this.clientBoundEvents() diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 2fc1d4e..cb30f8c 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -19,6 +19,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.SerializationException; import reactor.core.publisher.BufferOverflowStrategy; @@ -39,15 +41,20 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { this.liveId = liveId; this.userId = userId; this.clientBoundEvents = Flux - .push(sink -> { - var subscriptionFuture = eventService.subscribe("session-client-bound-events", LiveAtomixReactiveApiClient::deserializeEvent, s -> { - sink.next(s); - return CompletableFuture.completedFuture(null); - }, (a) -> null); + .>push(sink -> { + var subscriptionFuture = eventService.subscribe("session-client-bound-events", + LiveAtomixReactiveApiClient::deserializeEvents, + s -> { + sink.next(s); + return CompletableFuture.completedFuture(null); + }, + (a) -> null + ); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) - .filter(e -> e.userId() == userId && e.liveId() == liveId) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) + .flatMapIterable(list -> list) + .filter(e -> e.userId() == userId && e.liveId() == liveId) .share(); } @@ -103,20 +110,39 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { static ClientBoundEvent deserializeEvent(byte[] bytes) { try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { try (var is = new DataInputStream(byteArrayInputStream)) { - var liveId = is.readLong(); - var userId = is.readLong(); - return switch (is.readByte()) { - case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is)); - case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is)); - case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong()); - case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF()); - case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF()); - case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF()); - default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); - }; + return deserializeEvent(is); } } catch (IOException ex) { throw new SerializationException(ex); } } + + static List deserializeEvents(byte[] bytes) { + try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { + try (var is = new DataInputStream(byteArrayInputStream)) { + var len = is.readInt(); + var result = new ArrayList(len); + for (int i = 0; i < len; i++) { + result.add(deserializeEvent(is)); + } + return result; + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException { + var liveId = is.readLong(); + var userId = is.readLong(); + return switch (is.readByte()) { + case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is)); + case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is)); + case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong()); + case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF()); + case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF()); + case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF()); + default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); + }; + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 1f08d8a..5fa3caa 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -177,10 +177,13 @@ public abstract class ReactiveApiPublisher { .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) + .limitRate(1) + .bufferTimeout(64, Duration.ofMillis(10)) + // Send events to the client .subscribeOn(Schedulers.parallel()) .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", - clientBoundEvent, ReactiveApiPublisher::serializeEvent)); + clientBoundEvent, ReactiveApiPublisher::serializeEvents)); publishedResultingEvents // Obtain only cluster-bound events @@ -352,33 +355,12 @@ public abstract class ReactiveApiPublisher { return List.of(); } - private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { + private static byte[] serializeEvents(List clientBoundEvents) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { - dataOutputStream.writeLong(clientBoundEvent.liveId()); - dataOutputStream.writeLong(clientBoundEvent.userId()); - if (clientBoundEvent instanceof OnUpdateData onUpdateData) { - dataOutputStream.writeByte(0x1); - onUpdateData.update().serialize(dataOutputStream); - } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { - dataOutputStream.writeByte(0x2); - onUpdateError.error().serialize(dataOutputStream); - } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { - dataOutputStream.writeByte(0x3); - dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber()); - } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { - dataOutputStream.writeByte(0x4); - dataOutputStream.writeUTF(onBotLoginCodeRequested.token()); - } else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) { - dataOutputStream.writeByte(0x5); - dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link()); - } else if (clientBoundEvent instanceof OnPasswordRequested onPasswordRequested) { - dataOutputStream.writeByte(0x6); - dataOutputStream.writeUTF(onPasswordRequested.passwordHint()); - dataOutputStream.writeBoolean(onPasswordRequested.hasRecoveryEmail()); - dataOutputStream.writeUTF(onPasswordRequested.recoveryEmailPattern()); - } else { - throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); + dataOutputStream.writeInt(clientBoundEvents.size()); + for (ClientBoundEvent clientBoundEvent : clientBoundEvents) { + writeClientBoundEvent(clientBoundEvent, dataOutputStream); } return byteArrayOutputStream.toByteArray(); } @@ -387,6 +369,46 @@ public abstract class ReactiveApiPublisher { } } + private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { + try (var byteArrayOutputStream = new ByteArrayOutputStream()) { + try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + writeClientBoundEvent(clientBoundEvent, dataOutputStream); + return byteArrayOutputStream.toByteArray(); + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutputStream dataOutputStream) + throws IOException { + dataOutputStream.writeLong(clientBoundEvent.liveId()); + dataOutputStream.writeLong(clientBoundEvent.userId()); + if (clientBoundEvent instanceof OnUpdateData onUpdateData) { + dataOutputStream.writeByte(0x1); + onUpdateData.update().serialize(dataOutputStream); + } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { + dataOutputStream.writeByte(0x2); + onUpdateError.error().serialize(dataOutputStream); + } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { + dataOutputStream.writeByte(0x3); + dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber()); + } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { + dataOutputStream.writeByte(0x4); + dataOutputStream.writeUTF(onBotLoginCodeRequested.token()); + } else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) { + dataOutputStream.writeByte(0x5); + dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link()); + } else if (clientBoundEvent instanceof OnPasswordRequested onPasswordRequested) { + dataOutputStream.writeByte(0x6); + dataOutputStream.writeUTF(onPasswordRequested.passwordHint()); + dataOutputStream.writeBoolean(onPasswordRequested.hasRecoveryEmail()); + dataOutputStream.writeUTF(onPasswordRequested.recoveryEmailPattern()); + } else { + throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); + } + } + private CompletableFuture registerTopics() { // Start receiving requests eventService.subscribe("session-" + liveId + "-requests",