diff --git a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java index e24e184..a6faf09 100644 --- a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java +++ b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java @@ -9,6 +9,7 @@ import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Function; import java.time.Duration; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; @@ -31,6 +32,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private static final Marker TG_MARKER = MarkerFactory.getMarker("TG"); private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class); private final ConcurrentHashMap handlers = new ConcurrentHashMap<>(); + private final Set timedOutHandlers = new ConcurrentHashMap().keySet(new Object()); private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentLinkedQueue backpressureQueue = new ConcurrentLinkedQueue<>(); private final ExceptionHandler defaultExceptionHandler; @@ -112,7 +114,12 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti return; } } - logger.error(TG_MARKER, "Unknown event id \"{}\", the event has been dropped! {}", eventId, event); + if (timedOutHandlers.remove(eventId)) { + logger.trace(TG_MARKER, "Received event id \"{}\", but the event has been dropped because it" + + "timed out some time ago! {}", eventId, event); + } else { + logger.error(TG_MARKER, "Unknown event id \"{}\", the event has been dropped! {}", eventId, event); + } } } @@ -261,8 +268,11 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti // Handle timeout ScheduledFuture timeoutFuture = timers.schedule(() -> { + logger.trace(TG_MARKER, "Client {} timed out on query id {}: {}", clientId, queryId, query); if (handlers.remove(queryId) != null) { if (!cancelled) { + timedOutHandlers.add(queryId); + subscriber.onNext(new Error(408, "Request Timeout")); } if (!cancelled) { @@ -272,6 +282,8 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti }, responseTimeout.toMillis(), TimeUnit.MILLISECONDS); handlers.put(queryId, new Handler(result -> { + logger.trace(TG_MARKER, "Client {} is replying the query id {}: request: {} result: {}", clientId, + queryId, query, result); boolean timeoutCancelled = timeoutFuture.cancel(false); if (!cancelled && timeoutCancelled) { subscriber.onNext(result); @@ -280,6 +292,8 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti subscriber.onComplete(); } }, t -> { + logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId, + queryId, query); boolean timeoutCancelled = timeoutFuture.cancel(false); if (!cancelled && timeoutCancelled) { subscriber.onError(t);