Add more trace logs, fix some edge cases

This commit is contained in:
Andrea Cavalli 2021-03-31 04:33:39 +02:00
parent 74acca9606
commit d666363b41
2 changed files with 59 additions and 48 deletions

View File

@ -56,7 +56,9 @@ public class InternalClientManager implements AutoCloseable {
} }
if (isClosed) { if (isClosed) {
logger.trace("Removing Client {} from event handlers", clientId);
registeredClientEventHandlers.remove(clientId); registeredClientEventHandlers.remove(clientId);
logger.trace("Removed Client {} from event handlers", clientId);
} }
} }

View File

@ -7,6 +7,7 @@ import it.tdlight.jni.TdApi.Object;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -17,7 +18,8 @@ import org.slf4j.LoggerFactory;
public class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient { public class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient {
private static final Logger logger = LoggerFactory.getLogger(TelegramClient.class); private static boolean ENABLE_BACKPRESSURE_QUEUE = false;
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>(); private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
private final ExceptionHandler defaultExceptionHandler; private final ExceptionHandler defaultExceptionHandler;
@ -85,6 +87,11 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
private void handleResponse(long eventId, Object event, Handler handler) { private void handleResponse(long eventId, Object event, Handler handler) {
if (handler != null) { if (handler != null) {
try { try {
if (eventId == 0) {
logger.trace("Client {} received an event: {}", clientId, event);
} else {
logger.trace("Client {} received a response for query id {}: {}", clientId, eventId, event);
}
handler.getResultHandler().onResult(event); handler.getResultHandler().onResult(event);
} catch (Throwable cause) { } catch (Throwable cause) {
handleException(handler.getExceptionHandler(), cause); handleException(handler.getExceptionHandler(), cause);
@ -121,12 +128,10 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
@Override @Override
public void request(long n) { public void request(long n) {
if (!backpressureQueue.isEmpty()) { ReactiveItem item;
while (!backpressureQueue.isEmpty() && n > 0) { while (n > 0 && (item = backpressureQueue.poll()) != null) {
var item = backpressureQueue.poll(); subscriber.onNext(item);
subscriber.onNext(item); n--;
n--;
}
} }
if (n > 0) { if (n > 0) {
requested.addAndGet(n); requested.addAndGet(n);
@ -178,9 +183,12 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
@SuppressWarnings("ReactiveStreamsSubscriberImplementation") @SuppressWarnings("ReactiveStreamsSubscriberImplementation")
public void createAndRegisterClient() { public void createAndRegisterClient() {
if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!"); if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!");
logger.debug("Creating new client");
clientId = NativeClientAccess.create(); clientId = NativeClientAccess.create();
logger.debug("Registering new client {}", clientId);
clientManager.registerClient(clientId, this); clientManager.registerClient(clientId, this);
logger.info("Registered new client {}", clientId);
CountDownLatch registeredClient = new CountDownLatch(1);
// Send a dummy request because @levlam is too lazy to fix race conditions in a better way // Send a dummy request because @levlam is too lazy to fix race conditions in a better way
this.send(new TdApi.GetAuthorizationState()).subscribe(new Subscriber<>() { this.send(new TdApi.GetAuthorizationState()).subscribe(new Subscriber<>() {
@ -191,66 +199,67 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
@Override @Override
public void onNext(Object item) { public void onNext(Object item) {
registeredClient.countDown();
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
registeredClient.countDown();
} }
@Override @Override
public void onComplete() { public void onComplete() {
registeredClient.countDown();
} }
}); });
try {
registeredClient.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
logger.debug("Registered new client {}", clientId);
} }
@Override @Override
public Publisher<TdApi.Object> send(Function query) { public Publisher<TdApi.Object> send(Function query) {
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
AtomicBoolean cancelled = new AtomicBoolean(false);
return subscriber -> { return subscriber -> {
if (alreadySubscribed.compareAndSet(false, true)) { var subscription = new Subscription() {
AtomicBoolean alreadyRequested = new AtomicBoolean(false);
var subscription = new Subscription() {
@Override private final AtomicBoolean alreadyRequested = new AtomicBoolean(false);
public void request(long n) {
if (alreadyRequested.compareAndSet(false, true) && !cancelled.get()) { @Override
if (isClosedAndMaybeThrow(query)) { public void request(long n) {
subscriber.onNext(new TdApi.Ok()); if (alreadyRequested.compareAndSet(false, true)) {
if (isClosedAndMaybeThrow(query)) {
logger.trace("Client {} is already closed, sending \"Ok\" to: {}", clientId, query);
subscriber.onNext(new TdApi.Ok());
subscriber.onComplete();
} else if (clientId == null) {
logger.trace("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!");
subscriber.onError(
new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!")
);
} else {
long queryId = clientManager.getNextQueryId();
handlers.put(queryId, new Handler(result -> {
subscriber.onNext(result);
subscriber.onComplete(); subscriber.onComplete();
} else if (clientId == null) { }, subscriber::onError));
subscriber.onError( logger.trace("Client {} is requesting with query id {}: {}", clientId, queryId, query);
new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!") NativeClientAccess.send(clientId, queryId, query);
); logger.trace("Client {} requested with query id {}: {}", clientId, queryId, query);
} else {
long queryId = clientManager.getNextQueryId();
handlers.put(queryId, new Handler(result -> {
if (!cancelled.get()) {
subscriber.onNext(result);
subscriber.onComplete();
}
}, throwable -> {
if (!cancelled.get()) {
subscriber.onError(throwable);
}
}));
NativeClientAccess.send(clientId, queryId, query);
}
} }
} else {
logger.debug("Client {} tried to request again the same request, ignored: {}", clientId, query);
} }
}
@Override @Override
public void cancel() { public void cancel() {
cancelled.set(true); }
} };
}; subscriber.onSubscribe(subscription);
subscriber.onSubscribe(subscription);
} else {
throw new IllegalStateException("Already subscribed");
}
}; };
} }