This commit is contained in:
Andrea Cavalli 2022-01-14 00:58:35 +01:00
parent 3dd6241e2c
commit 4c4b7a3677
4 changed files with 13 additions and 0 deletions

View File

@ -7,6 +7,9 @@ public class ClientBoundEventDeserializer implements Deserializer<ClientBoundEve
@Override @Override
public ClientBoundEvent deserialize(String topic, byte[] data) { public ClientBoundEvent deserialize(String topic, byte[] data) {
if (data == null || data.length == 0) {
return null;
}
return LiveAtomixReactiveApiClient.deserializeEvent(data); return LiveAtomixReactiveApiClient.deserializeEvent(data);
} }
} }

View File

@ -7,6 +7,9 @@ public class ClientBoundEventSerializer implements Serializer<ClientBoundEvent>
@Override @Override
public byte[] serialize(String topic, ClientBoundEvent data) { public byte[] serialize(String topic, ClientBoundEvent data) {
if (data == null) {
return null;
}
return ReactiveApiPublisher.serializeEvent(data); return ReactiveApiPublisher.serializeEvent(data);
} }
} }

View File

@ -33,6 +33,8 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
private final Flux<Long> liveIdChange; private final Flux<Long> liveIdChange;
private final Mono<Long> liveIdResolution; private final Mono<Long> liveIdResolution;
private volatile boolean closed;
DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) { DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) {
this.api = api; this.api = api;
this.eventService = api.getAtomix().getEventService(); this.eventService = api.getAtomix().getEventService();
@ -40,6 +42,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId) clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId)
.doOnNext(e -> liveId.set(e.liveId())) .doOnNext(e -> liveId.set(e.liveId()))
.takeWhile(n -> !closed)
.share(); .share();
liveIdChange = this.clientBoundEvents() liveIdChange = this.clientBoundEvents()
@ -117,6 +120,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
} }
public void close() { public void close() {
this.closed = true;
liveIdSubscription.dispose(); liveIdSubscription.dispose();
} }
} }

View File

@ -77,6 +77,9 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
static TdApi.Object deserializeResponse(byte[] bytes) { static TdApi.Object deserializeResponse(byte[] bytes) {
try { try {
if (bytes == null || bytes.length == 0) {
return null;
}
return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
} catch (IOException ex) { } catch (IOException ex) {
throw new SerializationException(ex); throw new SerializationException(ex);