From 4c4b7a3677b8a2e755c54e8f2e75b2428b64f5ed Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 14 Jan 2022 00:58:35 +0100 Subject: [PATCH] Bugfixes --- .../it/tdlight/reactiveapi/ClientBoundEventDeserializer.java | 3 +++ .../it/tdlight/reactiveapi/ClientBoundEventSerializer.java | 3 +++ .../tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java | 4 ++++ .../it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java | 3 +++ 4 files changed, 13 insertions(+) diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java index 95dc0ea..4d1d34e 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java @@ -7,6 +7,9 @@ public class ClientBoundEventDeserializer implements Deserializer @Override public byte[] serialize(String topic, ClientBoundEvent data) { + if (data == null) { + return null; + } return ReactiveApiPublisher.serializeEvent(data); } } diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 0d43365..6d00853 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -33,6 +33,8 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl private final Flux liveIdChange; private final Mono liveIdResolution; + private volatile boolean closed; + DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) { this.api = api; this.eventService = api.getAtomix().getEventService(); @@ -40,6 +42,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId) .doOnNext(e -> liveId.set(e.liveId())) + .takeWhile(n -> !closed) .share(); liveIdChange = this.clientBoundEvents() @@ -117,6 +120,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl } public void close() { + this.closed = true; liveIdSubscription.dispose(); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 748755d..a3a5ce9 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -77,6 +77,9 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { static TdApi.Object deserializeResponse(byte[] bytes) { try { + if (bytes == null || bytes.length == 0) { + return null; + } return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); } catch (IOException ex) { throw new SerializationException(ex);