diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index fbbddfc..ade34c7 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -3,6 +3,7 @@ package it.tdlight.reactiveapi; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; import com.google.common.primitives.Longs; import io.atomix.cluster.messaging.ClusterEventService; @@ -65,7 +66,7 @@ public abstract class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); - private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(1); + private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(3); private final KafkaProducer kafkaProducer; private final ClusterEventService eventService; @@ -459,7 +460,7 @@ public abstract class ReactiveApiPublisher { // Start receiving request return eventService.subscribe(dynamicIdResolveSubject, b -> null, - r -> CompletableFuture.completedFuture(liveId), + r -> completedFuture(liveId), Longs::toByteArray ); } @@ -480,35 +481,31 @@ public abstract class ReactiveApiPublisher { } private CompletableFuture handleRequest(Request requestObj) { - return Mono - .just(requestObj) - .filter(req -> { - if (liveId != req.liveId()) { - LOG.error("Received a request for another session!"); - return false; - } else { - return true; - } - }) - .map(req -> new RequestWithTimeoutInstant<>(req.request(), req.timeout())) - .flatMap(requestWithTimeoutInstant -> { - var state = this.state.get(); - if (state.authPhase() == LOGGED_IN) { - var request = requestWithTimeoutInstant.request(); - var timeoutDuration = Duration.between(Instant.now(), requestWithTimeoutInstant.timeout()); - if (timeoutDuration.isZero() || timeoutDuration.isNegative()) { - LOG.error("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout()); - } + if (liveId != requestObj.liveId()) { + LOG.error("Received a request for another session!"); + return completedFuture(new Response(liveId, + new TdApi.Error(400, "The request live id is different than the current live id") + )); + } else { + var requestWithTimeoutInstant = new RequestWithTimeoutInstant<>(requestObj.request(), requestObj.timeout()); + var state = this.state.get(); + if (state.authPhase() == LOGGED_IN) { + var request = requestWithTimeoutInstant.request(); + var timeoutDuration = Duration.between(Instant.now(), requestWithTimeoutInstant.timeout()); + if (timeoutDuration.isZero() || timeoutDuration.isNegative()) { + LOG.error("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout()); + } - return Mono.from(rawTelegramClient.send(request, timeoutDuration)); - } else { - LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj); - return Mono.empty(); - } - }) - .map(responseObj -> new Response(liveId, responseObj)) - .publishOn(Schedulers.boundedElastic()) - .toFuture(); + return Mono + .from(rawTelegramClient.send(request, timeoutDuration)) + .map(responseObj -> new Response(liveId, responseObj)) + .publishOn(Schedulers.boundedElastic()) + .toFuture(); + } else { + LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj); + return completedFuture(new Response(liveId, new TdApi.Error(503, "Service Unavailable: " + state))); + } + } } private static Request deserializeRequest(byte[] bytes) {