Request timeouts

This commit is contained in:
Andrea Cavalli 2022-01-16 15:55:28 +01:00
parent 2e21f765ab
commit 788101aa0f
2 changed files with 14 additions and 1 deletions

View File

@ -7,6 +7,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
@ -65,6 +66,8 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
)).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else if (ex instanceof TimeoutException) {
return new TdError(408, "Request Timeout", ex);
} else {
return ex;
}

View File

@ -1,6 +1,7 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.core.Atomix;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
@ -20,6 +21,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -54,7 +56,15 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
))
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else if (ex instanceof TimeoutException) {
return new TdError(408, "Request Timeout", ex);
} else {
return ex;
}
})
.handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));