This commit is contained in:
Andrea Cavalli 2022-01-08 18:13:40 +01:00
parent 473783b501
commit 4bbb9cd762
5 changed files with 18 additions and 10 deletions

View File

@ -48,7 +48,7 @@
<dependency>
<groupId>it.tdlight</groupId>
<artifactId>tdlight-java-bom</artifactId>
<version>2.7.10.8</version>
<version>2.8.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>

View File

@ -19,6 +19,7 @@ import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@ -399,9 +400,15 @@ public class AtomixReactiveApi implements ReactiveApi {
@Override
public Mono<Long> resolveUserLiveId(long userId) {
return Mono.fromCompletionStage(() -> atomix
.getEventService()
.send(SubjectNaming.getDynamicIdResolveSubject(userId), userId, Longs::toByteArray, Longs::fromByteArray))
return Mono
.fromCompletionStage(() -> atomix
.getEventService()
.send(SubjectNaming.getDynamicIdResolveSubject(userId),
userId,
Longs::toByteArray,
Longs::fromByteArray,
Duration.ofSeconds(1)
))
.onErrorResume(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return Mono.empty();

View File

@ -65,12 +65,12 @@ public sealed interface Event permits ClientBoundEvent, ServerBoundEvent {
public static <T extends TdApi.Object> Request<T> deserialize(DataInput dataInput) {
try {
var liveId = dataInput.readLong();
@SuppressWarnings("unchecked")
TdApi.Function<T> request = (TdApi.Function<T>) TdApi.Deserializer.deserialize(dataInput);
long millis = dataInput.readLong();
var timeout = Instant.ofEpochMilli(millis);
@SuppressWarnings("unchecked")
TdApi.Function<T> request = (TdApi.Function<T>) TdApi.Deserializer.deserialize(dataInput);
return new Request<>(liveId, request, timeout);
} catch (IOException e) {
} catch (UnsupportedOperationException | IOException e) {
throw new SerializationException(e);
}
}

View File

@ -89,11 +89,12 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeLong(request.liveId());
request.request().serialize(dataOutputStream);
dataOutputStream.writeLong(request.timeout().toEpochMilli());
request.request().serialize(dataOutputStream);
dataOutputStream.flush();
return byteArrayOutputStream.toByteArray();
}
} catch (IOException ex) {
} catch (UnsupportedOperationException | IOException ex) {
throw new SerializationException(ex);
}
}

View File

@ -357,7 +357,7 @@ public abstract class ReactiveApiPublisher {
return Mono
.just(requestObj)
.filter(req -> {
if (userId != req.liveId()) {
if (liveId != req.liveId()) {
LOG.error("Received a request for another session!");
return false;
} else {