diff --git a/src/main/java/it/tdlight/common/InternalClientManager.java b/src/main/java/it/tdlight/common/InternalClientManager.java index ce3dc46..6b6d409 100644 --- a/src/main/java/it/tdlight/common/InternalClientManager.java +++ b/src/main/java/it/tdlight/common/InternalClientManager.java @@ -56,7 +56,9 @@ public class InternalClientManager implements AutoCloseable { } if (isClosed) { + logger.trace("Removing Client {} from event handlers", clientId); registeredClientEventHandlers.remove(clientId); + logger.trace("Removed Client {} from event handlers", clientId); } } diff --git a/src/main/java/it/tdlight/common/InternalReactiveClient.java b/src/main/java/it/tdlight/common/InternalReactiveClient.java index c14ae03..ac008f3 100644 --- a/src/main/java/it/tdlight/common/InternalReactiveClient.java +++ b/src/main/java/it/tdlight/common/InternalReactiveClient.java @@ -7,6 +7,7 @@ import it.tdlight.jni.TdApi.Object; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Publisher; @@ -17,7 +18,8 @@ import org.slf4j.LoggerFactory; public class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient { - private static final Logger logger = LoggerFactory.getLogger(TelegramClient.class); + private static boolean ENABLE_BACKPRESSURE_QUEUE = false; + private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class); private final ConcurrentHashMap handlers = new ConcurrentHashMap(); private final ConcurrentLinkedQueue backpressureQueue = new ConcurrentLinkedQueue<>(); private final ExceptionHandler defaultExceptionHandler; @@ -85,6 +87,11 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele private void handleResponse(long eventId, Object event, Handler handler) { if (handler != null) { try { + if (eventId == 0) { + logger.trace("Client {} received an event: {}", clientId, event); + } else { + logger.trace("Client {} received a response for query id {}: {}", clientId, eventId, event); + } handler.getResultHandler().onResult(event); } catch (Throwable cause) { handleException(handler.getExceptionHandler(), cause); @@ -121,12 +128,10 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele @Override public void request(long n) { - if (!backpressureQueue.isEmpty()) { - while (!backpressureQueue.isEmpty() && n > 0) { - var item = backpressureQueue.poll(); - subscriber.onNext(item); - n--; - } + ReactiveItem item; + while (n > 0 && (item = backpressureQueue.poll()) != null) { + subscriber.onNext(item); + n--; } if (n > 0) { requested.addAndGet(n); @@ -178,9 +183,12 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele @SuppressWarnings("ReactiveStreamsSubscriberImplementation") public void createAndRegisterClient() { if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!"); + logger.debug("Creating new client"); clientId = NativeClientAccess.create(); + logger.debug("Registering new client {}", clientId); clientManager.registerClient(clientId, this); - logger.info("Registered new client {}", clientId); + + CountDownLatch registeredClient = new CountDownLatch(1); // Send a dummy request because @levlam is too lazy to fix race conditions in a better way this.send(new TdApi.GetAuthorizationState()).subscribe(new Subscriber<>() { @@ -191,66 +199,67 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele @Override public void onNext(Object item) { - + registeredClient.countDown(); } @Override public void onError(Throwable throwable) { - + registeredClient.countDown(); } @Override public void onComplete() { - + registeredClient.countDown(); } }); + + try { + registeredClient.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + logger.debug("Registered new client {}", clientId); } @Override public Publisher send(Function query) { - AtomicBoolean alreadySubscribed = new AtomicBoolean(false); - AtomicBoolean cancelled = new AtomicBoolean(false); return subscriber -> { - if (alreadySubscribed.compareAndSet(false, true)) { - AtomicBoolean alreadyRequested = new AtomicBoolean(false); - var subscription = new Subscription() { + var subscription = new Subscription() { - @Override - public void request(long n) { - if (alreadyRequested.compareAndSet(false, true) && !cancelled.get()) { - if (isClosedAndMaybeThrow(query)) { - subscriber.onNext(new TdApi.Ok()); + private final AtomicBoolean alreadyRequested = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (alreadyRequested.compareAndSet(false, true)) { + if (isClosedAndMaybeThrow(query)) { + logger.trace("Client {} is already closed, sending \"Ok\" to: {}", clientId, query); + subscriber.onNext(new TdApi.Ok()); + subscriber.onComplete(); + } else if (clientId == null) { + logger.trace("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!"); + subscriber.onError( + new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!") + ); + } else { + long queryId = clientManager.getNextQueryId(); + handlers.put(queryId, new Handler(result -> { + subscriber.onNext(result); subscriber.onComplete(); - } else if (clientId == null) { - subscriber.onError( - new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!") - ); - } else { - long queryId = clientManager.getNextQueryId(); - handlers.put(queryId, new Handler(result -> { - if (!cancelled.get()) { - subscriber.onNext(result); - subscriber.onComplete(); - } - }, throwable -> { - if (!cancelled.get()) { - subscriber.onError(throwable); - } - })); - NativeClientAccess.send(clientId, queryId, query); - } + }, subscriber::onError)); + logger.trace("Client {} is requesting with query id {}: {}", clientId, queryId, query); + NativeClientAccess.send(clientId, queryId, query); + logger.trace("Client {} requested with query id {}: {}", clientId, queryId, query); } + } else { + logger.debug("Client {} tried to request again the same request, ignored: {}", clientId, query); } + } - @Override - public void cancel() { - cancelled.set(true); - } - }; - subscriber.onSubscribe(subscription); - } else { - throw new IllegalStateException("Already subscribed"); - } + @Override + public void cancel() { + } + }; + subscriber.onSubscribe(subscription); }; }