Fix requests handling

This commit is contained in:
Andrea Cavalli 2022-01-21 19:45:46 +01:00
parent 7e166b0920
commit e1fee1f90d
1 changed files with 27 additions and 30 deletions

View File

@ -3,6 +3,7 @@ package it.tdlight.reactiveapi;
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN;
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import com.google.common.primitives.Longs;
import io.atomix.cluster.messaging.ClusterEventService;
@ -65,7 +66,7 @@ public abstract class ReactiveApiPublisher {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class);
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(1);
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(3);
private final KafkaProducer kafkaProducer;
private final ClusterEventService eventService;
@ -459,7 +460,7 @@ public abstract class ReactiveApiPublisher {
// Start receiving request
return eventService.subscribe(dynamicIdResolveSubject,
b -> null,
r -> CompletableFuture.completedFuture(liveId),
r -> completedFuture(liveId),
Longs::toByteArray
);
}
@ -480,35 +481,31 @@ public abstract class ReactiveApiPublisher {
}
private CompletableFuture<Response> handleRequest(Request<Object> requestObj) {
return Mono
.just(requestObj)
.filter(req -> {
if (liveId != req.liveId()) {
LOG.error("Received a request for another session!");
return false;
} else {
return true;
}
})
.map(req -> new RequestWithTimeoutInstant<>(req.request(), req.timeout()))
.flatMap(requestWithTimeoutInstant -> {
var state = this.state.get();
if (state.authPhase() == LOGGED_IN) {
var request = requestWithTimeoutInstant.request();
var timeoutDuration = Duration.between(Instant.now(), requestWithTimeoutInstant.timeout());
if (timeoutDuration.isZero() || timeoutDuration.isNegative()) {
LOG.error("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout());
}
if (liveId != requestObj.liveId()) {
LOG.error("Received a request for another session!");
return completedFuture(new Response(liveId,
new TdApi.Error(400, "The request live id is different than the current live id")
));
} else {
var requestWithTimeoutInstant = new RequestWithTimeoutInstant<>(requestObj.request(), requestObj.timeout());
var state = this.state.get();
if (state.authPhase() == LOGGED_IN) {
var request = requestWithTimeoutInstant.request();
var timeoutDuration = Duration.between(Instant.now(), requestWithTimeoutInstant.timeout());
if (timeoutDuration.isZero() || timeoutDuration.isNegative()) {
LOG.error("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout());
}
return Mono.from(rawTelegramClient.send(request, timeoutDuration));
} else {
LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj);
return Mono.empty();
}
})
.map(responseObj -> new Response(liveId, responseObj))
.publishOn(Schedulers.boundedElastic())
.toFuture();
return Mono
.from(rawTelegramClient.send(request, timeoutDuration))
.map(responseObj -> new Response(liveId, responseObj))
.publishOn(Schedulers.boundedElastic())
.toFuture();
} else {
LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj);
return completedFuture(new Response(liveId, new TdApi.Error(503, "Service Unavailable: " + state)));
}
}
}
private static <T extends TdApi.Object> Request<T> deserializeRequest(byte[] bytes) {