Add timeouts in reactive telegram clients

This commit is contained in:
Andrea Cavalli 2021-10-02 15:02:48 +02:00
parent e36ca566da
commit 19d3a845ef
2 changed files with 33 additions and 12 deletions

View File

@ -1,6 +1,7 @@
package it.tdlight.common; package it.tdlight.common;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import java.time.Duration;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@SuppressWarnings("ReactiveStreamsPublisherImplementation") @SuppressWarnings("ReactiveStreamsPublisherImplementation")
@ -14,11 +15,12 @@ public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
/** /**
* Sends a request to the TDLib. * Sends a request to the TDLib.
* *
* @param query Object representing a query to the TDLib. * @param query Object representing a query to the TDLib.
* @param timeout Response timeout
* @return a publisher that will emit exactly one item, or an error * @return a publisher that will emit exactly one item, or an error
* @throws NullPointerException if query is null. * @throws NullPointerException if query is null.
*/ */
Publisher<TdApi.Object> send(TdApi.Function query); Publisher<TdApi.Object> send(TdApi.Function query, Duration timeout);
/** /**
* Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously. * Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously.

View File

@ -7,10 +7,15 @@ import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Function;
import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -22,7 +27,8 @@ import org.slf4j.LoggerFactory;
public final class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient { public final class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient {
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class); private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>(); private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<>();
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
private final ExceptionHandler defaultExceptionHandler; private final ExceptionHandler defaultExceptionHandler;
private final Handler updateHandler; private final Handler updateHandler;
@ -77,9 +83,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
} }
private void handleClose() { private void handleClose() {
handlers.forEach((eventId, handler) -> { handlers.forEach((eventId, handler) -> handleResponse(eventId, new Error(500, "Instance closed"), handler));
handleResponse(eventId, new Error(500, "Instance closed"), handler);
});
handlers.clear(); handlers.clear();
} }
@ -150,7 +154,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
@Override @Override
public void cancel() { public void cancel() {
if (!isClosed.get()) { if (!isClosed.get()) {
send(new TdApi.Close()).subscribe(new Subscriber<TdApi.Object>() { send(new TdApi.Close(), Duration.ofDays(1)).subscribe(new Subscriber<TdApi.Object>() {
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
subscription.request(1); subscription.request(1);
@ -193,7 +197,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
CountDownLatch registeredClient = new CountDownLatch(1); CountDownLatch registeredClient = new CountDownLatch(1);
// Send a dummy request because @levlam is too lazy to fix race conditions in a better way // Send a dummy request because @levlam is too lazy to fix race conditions in a better way
this.send(new TdApi.GetAuthorizationState()).subscribe(new Subscriber<TdApi.Object>() { this.send(new TdApi.GetAuthorizationState(), Duration.ofDays(1)).subscribe(new Subscriber<TdApi.Object>() {
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
subscription.request(1); subscription.request(1);
@ -224,7 +228,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
} }
@Override @Override
public Publisher<TdApi.Object> send(Function query) { public Publisher<TdApi.Object> send(Function query, Duration responseTimeout) {
return subscriber -> { return subscriber -> {
Subscription subscription = new Subscription() { Subscription subscription = new Subscription() {
@ -245,15 +249,30 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
); );
} else { } else {
long queryId = clientManager.getNextQueryId(); long queryId = clientManager.getNextQueryId();
// Handle timeout
ScheduledFuture<?> timeoutFuture = timers.schedule(() -> {
if (handlers.remove(queryId) != null) {
if (!cancelled) {
subscriber.onNext(new Error(408, "Request Timeout"));
}
if (!cancelled) {
subscriber.onComplete();
}
}
}, responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
handlers.put(queryId, new Handler(result -> { handlers.put(queryId, new Handler(result -> {
if (!cancelled) { boolean timeoutCancelled = timeoutFuture.cancel(false);
if (!cancelled && timeoutCancelled) {
subscriber.onNext(result); subscriber.onNext(result);
} }
if (!cancelled) { if (!cancelled && timeoutCancelled) {
subscriber.onComplete(); subscriber.onComplete();
} }
}, t -> { }, t -> {
if (!cancelled) { boolean timeoutCancelled = timeoutFuture.cancel(false);
if (!cancelled && timeoutCancelled) {
subscriber.onError(t); subscriber.onError(t);
} }
})); }));