Log timed out queries
This commit is contained in:
parent
2ab6c2a5e1
commit
606335512f
@ -9,6 +9,7 @@ import it.tdlight.jni.TdApi.Error;
|
|||||||
import it.tdlight.jni.TdApi.Function;
|
import it.tdlight.jni.TdApi.Function;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
@ -31,6 +32,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
|||||||
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
|
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
|
||||||
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
|
||||||
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<>();
|
||||||
|
private final Set<Long> timedOutHandlers = new ConcurrentHashMap<Long, Object>().keySet(new Object());
|
||||||
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
|
||||||
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
|
||||||
private final ExceptionHandler defaultExceptionHandler;
|
private final ExceptionHandler defaultExceptionHandler;
|
||||||
@ -112,9 +114,14 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (timedOutHandlers.remove(eventId)) {
|
||||||
|
logger.trace(TG_MARKER, "Received event id \"{}\", but the event has been dropped because it"
|
||||||
|
+ "timed out some time ago! {}", eventId, event);
|
||||||
|
} else {
|
||||||
logger.error(TG_MARKER, "Unknown event id \"{}\", the event has been dropped! {}", eventId, event);
|
logger.error(TG_MARKER, "Unknown event id \"{}\", the event has been dropped! {}", eventId, event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles a response or an update
|
* Handles a response or an update
|
||||||
@ -261,8 +268,11 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
|||||||
|
|
||||||
// Handle timeout
|
// Handle timeout
|
||||||
ScheduledFuture<?> timeoutFuture = timers.schedule(() -> {
|
ScheduledFuture<?> timeoutFuture = timers.schedule(() -> {
|
||||||
|
logger.trace(TG_MARKER, "Client {} timed out on query id {}: {}", clientId, queryId, query);
|
||||||
if (handlers.remove(queryId) != null) {
|
if (handlers.remove(queryId) != null) {
|
||||||
if (!cancelled) {
|
if (!cancelled) {
|
||||||
|
timedOutHandlers.add(queryId);
|
||||||
|
|
||||||
subscriber.onNext(new Error(408, "Request Timeout"));
|
subscriber.onNext(new Error(408, "Request Timeout"));
|
||||||
}
|
}
|
||||||
if (!cancelled) {
|
if (!cancelled) {
|
||||||
@ -272,6 +282,8 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
|||||||
}, responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
|
}, responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
handlers.put(queryId, new Handler(result -> {
|
handlers.put(queryId, new Handler(result -> {
|
||||||
|
logger.trace(TG_MARKER, "Client {} is replying the query id {}: request: {} result: {}", clientId,
|
||||||
|
queryId, query, result);
|
||||||
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
||||||
if (!cancelled && timeoutCancelled) {
|
if (!cancelled && timeoutCancelled) {
|
||||||
subscriber.onNext(result);
|
subscriber.onNext(result);
|
||||||
@ -280,6 +292,8 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
|||||||
subscriber.onComplete();
|
subscriber.onComplete();
|
||||||
}
|
}
|
||||||
}, t -> {
|
}, t -> {
|
||||||
|
logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId,
|
||||||
|
queryId, query);
|
||||||
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
||||||
if (!cancelled && timeoutCancelled) {
|
if (!cancelled && timeoutCancelled) {
|
||||||
subscriber.onError(t);
|
subscriber.onError(t);
|
||||||
|
Loading…
Reference in New Issue
Block a user