Implement timeouts properly

This commit is contained in:
Andrea Cavalli 2022-06-28 00:11:34 +02:00
parent 07c22d39f2
commit d931217e81
1 changed files with 5 additions and 4 deletions

View File

@ -102,14 +102,16 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo
return Mono.defer(() -> {
var requestId = this.requestId.getAndIncrement();
var timeoutError = new TdError(408, "Request Timeout");
Mono<T> timeoutErrorMono = Mono.error(timeoutError);
var timeoutDuration = Duration.between(Instant.now(), timeout);
var requestTimestamp = Instant.now();
var timeoutDuration = Duration.between(requestTimestamp, timeout);
if (timeoutDuration.isNegative() || timeoutDuration.isZero()) {
return timeoutErrorMono;
return Mono.error(timeoutError);
}
var cf = new CompletableFuture<Timestamped<OnResponse<TdApi.Object>>>();
this.responses.put(requestId, cf);
Mono<T> response = Mono.fromFuture(cf)
.timeout(timeoutDuration, Mono.fromSupplier(() -> new Timestamped<>(requestTimestamp.toEpochMilli(),
new Response<>(clientId, requestId, userId, new TdApi.Error(408, "Request Timeout")))))
.<T>handle((responseObj, sink) -> {
if (Instant.ofEpochMilli(responseObj.timestamp()).compareTo(timeout) > 0) {
sink.error(new TdError(408, "Request Timeout"));
@ -128,7 +130,6 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo
sink.error(new UnsupportedOperationException("Unknown response type: " + responseObj.data().getClass()));
}
})
.timeout(timeoutDuration, timeoutErrorMono)
.doFinally(s -> this.responses.remove(requestId));
requests.emitNext(new Request<>(clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(TEN_MS));
return response;