diff --git a/pom.xml b/pom.xml index 8d777b9..8c37e74 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ it.tdlight tdlight-java-bom - 2.7.10.8 + 2.8.0.1 pom import diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index b726e05..77f05d5 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -19,6 +19,7 @@ import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -399,9 +400,15 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public Mono resolveUserLiveId(long userId) { - return Mono.fromCompletionStage(() -> atomix - .getEventService() - .send(SubjectNaming.getDynamicIdResolveSubject(userId), userId, Longs::toByteArray, Longs::fromByteArray)) + return Mono + .fromCompletionStage(() -> atomix + .getEventService() + .send(SubjectNaming.getDynamicIdResolveSubject(userId), + userId, + Longs::toByteArray, + Longs::fromByteArray, + Duration.ofSeconds(1) + )) .onErrorResume(ex -> { if (ex instanceof MessagingException.NoRemoteHandler) { return Mono.empty(); diff --git a/src/main/java/it/tdlight/reactiveapi/Event.java b/src/main/java/it/tdlight/reactiveapi/Event.java index 56d4cd2..659df71 100644 --- a/src/main/java/it/tdlight/reactiveapi/Event.java +++ b/src/main/java/it/tdlight/reactiveapi/Event.java @@ -65,12 +65,12 @@ public sealed interface Event permits ClientBoundEvent, ServerBoundEvent { public static Request deserialize(DataInput dataInput) { try { var liveId = dataInput.readLong(); - @SuppressWarnings("unchecked") - TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dataInput); long millis = dataInput.readLong(); var timeout = Instant.ofEpochMilli(millis); + @SuppressWarnings("unchecked") + TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dataInput); return new Request<>(liveId, request, timeout); - } catch (IOException e) { + } catch (UnsupportedOperationException | IOException e) { throw new SerializationException(e); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index d86a263..5a9638f 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -89,11 +89,12 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { dataOutputStream.writeLong(request.liveId()); - request.request().serialize(dataOutputStream); dataOutputStream.writeLong(request.timeout().toEpochMilli()); + request.request().serialize(dataOutputStream); + dataOutputStream.flush(); return byteArrayOutputStream.toByteArray(); } - } catch (IOException ex) { + } catch (UnsupportedOperationException | IOException ex) { throw new SerializationException(ex); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 156d97a..418dd41 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -357,7 +357,7 @@ public abstract class ReactiveApiPublisher { return Mono .just(requestObj) .filter(req -> { - if (userId != req.liveId()) { + if (liveId != req.liveId()) { LOG.error("Received a request for another session!"); return false; } else {