This commit is contained in:
Andrea Cavalli 2021-04-18 23:27:23 +02:00
parent 0f1c2ed0fa
commit a855b4fdba
2 changed files with 16 additions and 6 deletions

View File

@ -43,7 +43,7 @@ public class InternalClientManager implements AutoCloseable {
ClientEventsHandler handler = registeredClientEventHandlers.get(clientId); ClientEventsHandler handler = registeredClientEventHandlers.get(clientId);
if (handler != null) { if (handler != null) {
handler.handleEvents(isClosed, clientEventIds, clientEvents); handler.handleEvents(isClosed, clientEventIds, clientEvents);
} else { } else {
java.util.List<Entry<Long, TdApi.Object>> droppedEvents = getEffectivelyDroppedEvents(clientEventIds, clientEvents); java.util.List<Entry<Long, TdApi.Object>> droppedEvents = getEffectivelyDroppedEvents(clientEventIds, clientEvents);

View File

@ -76,7 +76,7 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
handleClose(); handleClose();
} }
} }
} }
private void handleClose() { private void handleClose() {
handlers.forEach((eventId, handler) -> { handlers.forEach((eventId, handler) -> {
@ -231,10 +231,11 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
var subscription = new Subscription() { var subscription = new Subscription() {
private final AtomicBoolean alreadyRequested = new AtomicBoolean(false); private final AtomicBoolean alreadyRequested = new AtomicBoolean(false);
private volatile boolean cancelled = false;
@Override @Override
public void request(long n) { public void request(long n) {
if (alreadyRequested.compareAndSet(false, true)) { if (n > 0 && alreadyRequested.compareAndSet(false, true)) {
if (isClosedAndMaybeThrow(query)) { if (isClosedAndMaybeThrow(query)) {
logger.trace("Client {} is already closed, sending \"Ok\" to: {}", clientId, query); logger.trace("Client {} is already closed, sending \"Ok\" to: {}", clientId, query);
subscriber.onNext(new TdApi.Ok()); subscriber.onNext(new TdApi.Ok());
@ -247,9 +248,17 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
} else { } else {
long queryId = clientManager.getNextQueryId(); long queryId = clientManager.getNextQueryId();
handlers.put(queryId, new Handler(result -> { handlers.put(queryId, new Handler(result -> {
subscriber.onNext(result); if (!cancelled) {
subscriber.onComplete(); subscriber.onNext(result);
}, subscriber::onError)); }
if (!cancelled) {
subscriber.onComplete();
}
}, t -> {
if (!cancelled) {
subscriber.onError(t);
}
}));
logger.trace("Client {} is requesting with query id {}: {}", clientId, queryId, query); logger.trace("Client {} is requesting with query id {}: {}", clientId, queryId, query);
NativeClientAccess.send(clientId, queryId, query); NativeClientAccess.send(clientId, queryId, query);
logger.trace("Client {} requested with query id {}: {}", clientId, queryId, query); logger.trace("Client {} requested with query id {}: {}", clientId, queryId, query);
@ -261,6 +270,7 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
@Override @Override
public void cancel() { public void cancel() {
cancelled = true;
} }
}; };
subscriber.onSubscribe(subscription); subscriber.onSubscribe(subscription);