Reimplement reactive client using simple listeners
This commit is contained in:
parent
606335512f
commit
fceec2a6ec
|
@ -1,95 +0,0 @@
|
||||||
package it.tdlight.common;
|
|
||||||
|
|
||||||
import it.tdlight.jni.TdApi;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
public final class ReactiveItem {
|
|
||||||
|
|
||||||
private final TdApi.Object item;
|
|
||||||
private final Throwable ex;
|
|
||||||
private final boolean updateException;
|
|
||||||
|
|
||||||
private ReactiveItem(Throwable ex, boolean updateException) {
|
|
||||||
this.item = null;
|
|
||||||
this.ex = Objects.requireNonNull(ex);
|
|
||||||
this.updateException = updateException;
|
|
||||||
}
|
|
||||||
|
|
||||||
private ReactiveItem(TdApi.Object item) {
|
|
||||||
this.item = Objects.requireNonNull(item);
|
|
||||||
this.ex = null;
|
|
||||||
this.updateException = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ReactiveItem ofUpdateException(Throwable ex) {
|
|
||||||
return new ReactiveItem(ex, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ReactiveItem ofHandleException(Throwable ex) {
|
|
||||||
return new ReactiveItem(ex, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ReactiveItem ofUpdate(TdApi.Object item) {
|
|
||||||
return new ReactiveItem(item);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isUpdateException() {
|
|
||||||
return ex != null && updateException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isHandleException() {
|
|
||||||
return ex != null && !updateException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isUpdate() {
|
|
||||||
return ex == null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TdApi.Object getUpdate() {
|
|
||||||
return Objects.requireNonNull(item, "This is not an update");
|
|
||||||
}
|
|
||||||
|
|
||||||
public Throwable getUpdateException() {
|
|
||||||
if (!updateException) {
|
|
||||||
throw new IllegalStateException("This is not an update exception");
|
|
||||||
}
|
|
||||||
return Objects.requireNonNull(ex, "This is not an update exception");
|
|
||||||
}
|
|
||||||
|
|
||||||
public Throwable getHandleException() {
|
|
||||||
if (updateException) {
|
|
||||||
throw new IllegalStateException("This is not an handle exception");
|
|
||||||
}
|
|
||||||
return Objects.requireNonNull(ex, "This is not an handle exception");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
ReactiveItem that = (ReactiveItem) o;
|
|
||||||
return updateException == that.updateException && Objects.equals(item, that.item) && Objects.equals(ex, that.ex);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(item, ex, updateException);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
if (ex != null) {
|
|
||||||
if (updateException) {
|
|
||||||
return "UpdateException(" + ex + ")";
|
|
||||||
} else {
|
|
||||||
return "HandleException(" + ex + ")";
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return "" + item;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -5,7 +5,7 @@ import java.time.Duration;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
|
||||||
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
||||||
public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
|
public interface ReactiveTelegramClient {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates and registers the client
|
* Creates and registers the client
|
||||||
|
@ -30,4 +30,16 @@ public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
|
||||||
* @throws NullPointerException if query is null.
|
* @throws NullPointerException if query is null.
|
||||||
*/
|
*/
|
||||||
TdApi.Object execute(TdApi.Function query);
|
TdApi.Object execute(TdApi.Function query);
|
||||||
|
|
||||||
|
void setListener(SignalListener listener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send close signal but don't remove the listener
|
||||||
|
*/
|
||||||
|
void cancel();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the listener
|
||||||
|
*/
|
||||||
|
void dispose();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
package it.tdlight.common;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.StringJoiner;
|
||||||
|
|
||||||
|
public final class Signal {
|
||||||
|
|
||||||
|
private final TdApi.Object item;
|
||||||
|
private final Throwable ex;
|
||||||
|
private final SignalType signalType;
|
||||||
|
|
||||||
|
private Signal(SignalType signalType, TdApi.Object item, Throwable ex) {
|
||||||
|
this.signalType = signalType;
|
||||||
|
this.item = item;
|
||||||
|
this.ex = ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Signal ofUpdateException(Throwable ex) {
|
||||||
|
return new Signal(SignalType.EXCEPTION, null, Objects.requireNonNull(ex, "Exception is null"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Signal ofUpdate(TdApi.Object item) {
|
||||||
|
return new Signal(SignalType.UPDATE, Objects.requireNonNull(item, "Update is null"), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Signal ofClosed() {
|
||||||
|
return new Signal(SignalType.CLOSE, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isUpdate() {
|
||||||
|
return signalType == SignalType.UPDATE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isException() {
|
||||||
|
return signalType == SignalType.EXCEPTION;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isClosed() {
|
||||||
|
return signalType == SignalType.CLOSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isNotClosed() {
|
||||||
|
return signalType != SignalType.CLOSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TdApi.Object getUpdate() {
|
||||||
|
return Objects.requireNonNull(item, "This is not an update");
|
||||||
|
}
|
||||||
|
|
||||||
|
public Throwable getException() {
|
||||||
|
return Objects.requireNonNull(ex, "This is not an exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void getClosed() {
|
||||||
|
if (signalType != SignalType.CLOSE) {
|
||||||
|
throw new IllegalStateException("Expected signal type closed, but the type is " + signalType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Signal signal = (Signal) o;
|
||||||
|
return Objects.equals(item, signal.item) && Objects.equals(ex, signal.ex) && signalType == signal.signalType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(item, ex, signalType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new StringJoiner(", ", Signal.class.getSimpleName() + "[", "]")
|
||||||
|
.add("item=" + item)
|
||||||
|
.add("ex=" + ex)
|
||||||
|
.add("signalType=" + signalType)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package it.tdlight.common;
|
||||||
|
|
||||||
|
public interface SignalListener {
|
||||||
|
|
||||||
|
void onSignal(Signal signal);
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
package it.tdlight.common;
|
||||||
|
|
||||||
|
public enum SignalType {
|
||||||
|
UPDATE,
|
||||||
|
EXCEPTION,
|
||||||
|
CLOSE
|
||||||
|
}
|
|
@ -2,8 +2,9 @@ package it.tdlight.common.internal;
|
||||||
|
|
||||||
import it.tdlight.common.ClientEventsHandler;
|
import it.tdlight.common.ClientEventsHandler;
|
||||||
import it.tdlight.common.ExceptionHandler;
|
import it.tdlight.common.ExceptionHandler;
|
||||||
import it.tdlight.common.ReactiveItem;
|
import it.tdlight.common.Signal;
|
||||||
import it.tdlight.common.ReactiveTelegramClient;
|
import it.tdlight.common.ReactiveTelegramClient;
|
||||||
|
import it.tdlight.common.SignalListener;
|
||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.jni.TdApi.Error;
|
import it.tdlight.jni.TdApi.Error;
|
||||||
import it.tdlight.jni.TdApi.Function;
|
import it.tdlight.jni.TdApi.Function;
|
||||||
|
@ -12,15 +13,13 @@ import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.reactivestreams.Subscriber;
|
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -31,42 +30,24 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
|
|
||||||
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
|
private static final Marker TG_MARKER = MarkerFactory.getMarker("TG");
|
||||||
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class);
|
||||||
|
private static final Handler EMPTY_HANDLER = new Handler(r -> {}, ex -> {});
|
||||||
|
|
||||||
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<>();
|
||||||
private final Set<Long> timedOutHandlers = new ConcurrentHashMap<Long, Object>().keySet(new Object());
|
private final Set<Long> timedOutHandlers = new ConcurrentHashMap<Long, Object>().keySet(new Object());
|
||||||
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
|
||||||
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
|
|
||||||
private final ExceptionHandler defaultExceptionHandler;
|
private final ExceptionHandler defaultExceptionHandler;
|
||||||
private final Handler updateHandler;
|
private final Handler updateHandler;
|
||||||
|
|
||||||
private volatile Integer clientId = null;
|
private volatile Integer clientId = null;
|
||||||
private final InternalClientManager clientManager;
|
private final InternalClientManager clientManager;
|
||||||
|
|
||||||
private final AtomicBoolean isClosed = new AtomicBoolean();
|
private final AtomicBoolean alreadyReceivedClosed = new AtomicBoolean();
|
||||||
private final AtomicBoolean updatesAlreadySubscribed = new AtomicBoolean(false);
|
private final AtomicReference<SignalListener> signalListener = new AtomicReference<>(new ReplayStartupUpdatesListener());
|
||||||
private final AtomicLong requested = new AtomicLong(0);
|
|
||||||
private volatile Subscriber<? super ReactiveItem> subscriber;
|
|
||||||
|
|
||||||
public InternalReactiveClient(InternalClientManager clientManager) {
|
public InternalReactiveClient(InternalClientManager clientManager) {
|
||||||
this.clientManager = clientManager;
|
this.clientManager = clientManager;
|
||||||
this.updateHandler = new Handler(
|
this.updateHandler = new Handler(this::onUpdateFromHandler, this::onUpdateException);
|
||||||
updateItem -> {
|
this.defaultExceptionHandler = this::onDefaultException;
|
||||||
ReactiveItem item = ReactiveItem.ofUpdate(updateItem);
|
|
||||||
if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) {
|
|
||||||
subscriber.onNext(item);
|
|
||||||
} else {
|
|
||||||
backpressureQueue.add(item);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
updateEx -> {
|
|
||||||
ReactiveItem item = ReactiveItem.ofUpdateException(updateEx);
|
|
||||||
if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) {
|
|
||||||
subscriber.onNext(item);
|
|
||||||
} else {
|
|
||||||
backpressureQueue.add(item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
this.defaultExceptionHandler = ex -> backpressureQueue.add(ReactiveItem.ofHandleException(ex));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -81,15 +62,34 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isClosed) {
|
if (isClosed) {
|
||||||
if (this.isClosed.compareAndSet(false, true)) {
|
if (this.alreadyReceivedClosed.compareAndSet(false, true)) {
|
||||||
handleClose();
|
handleClose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method will be called exactly once
|
||||||
|
*/
|
||||||
private void handleClose() {
|
private void handleClose() {
|
||||||
handlers.forEach((eventId, handler) -> handleResponse(eventId, new Error(500, "Instance closed"), handler));
|
TdApi.Error instanceClosedError = new Error(500, "Instance closed");
|
||||||
|
handlers.forEach((eventId, handler) -> this.handleResponse(eventId, instanceClosedError, handler));
|
||||||
handlers.clear();
|
handlers.clear();
|
||||||
|
this.timedOutHandlers.clear();
|
||||||
|
timers.shutdown();
|
||||||
|
try {
|
||||||
|
boolean terminated = timers.awaitTermination(1, TimeUnit.MINUTES);
|
||||||
|
if (!terminated) {
|
||||||
|
timers.shutdownNow();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.debug(TG_MARKER, "Interrupted", e);
|
||||||
|
}
|
||||||
|
SignalListener signalListener = this.signalListener.get();
|
||||||
|
// Close the signal listener if it still exists
|
||||||
|
if (signalListener != null) {
|
||||||
|
signalListener.onSignal(Signal.ofClosed());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -135,111 +135,21 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
if (exceptionHandler == null) {
|
if (exceptionHandler == null) {
|
||||||
exceptionHandler = defaultExceptionHandler;
|
exceptionHandler = defaultExceptionHandler;
|
||||||
}
|
}
|
||||||
if (exceptionHandler != null) {
|
try {
|
||||||
try {
|
exceptionHandler.onException(cause);
|
||||||
exceptionHandler.onException(cause);
|
} catch (Throwable ignored) {
|
||||||
} catch (Throwable ignored) {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@SuppressWarnings({"ReactiveStreamsSubscriberImplementation", "Convert2Diamond"})
|
||||||
public void subscribe(Subscriber<? super ReactiveItem> subscriber) {
|
|
||||||
AtomicBoolean alreadyCompleted = new AtomicBoolean();
|
|
||||||
if (updatesAlreadySubscribed.compareAndSet(false, true)) {
|
|
||||||
Subscription subscription = new Subscription() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void request(long n) {
|
|
||||||
ReactiveItem item;
|
|
||||||
while (n > 0 && (item = backpressureQueue.poll()) != null) {
|
|
||||||
subscriber.onNext(item);
|
|
||||||
n--;
|
|
||||||
}
|
|
||||||
if (n > 0) {
|
|
||||||
requested.addAndGet(n);
|
|
||||||
}
|
|
||||||
if (isClosed.get()) {
|
|
||||||
if (alreadyCompleted.compareAndSet(false, true)) {
|
|
||||||
subscriber.onComplete();
|
|
||||||
logger.info(TG_MARKER, "Client closed {}", clientId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
|
|
||||||
@Override
|
|
||||||
public void cancel() {
|
|
||||||
if (!isClosed.get()) {
|
|
||||||
send(new TdApi.Close(), Duration.ofDays(1)).subscribe(new Subscriber<TdApi.Object>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription subscription) {
|
|
||||||
subscription.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(TdApi.Object o) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable throwable) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
this.subscriber = subscriber;
|
|
||||||
|
|
||||||
subscriber.onSubscribe(subscription);
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException("Already subscribed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
|
|
||||||
public void createAndRegisterClient() {
|
public void createAndRegisterClient() {
|
||||||
if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!");
|
if (clientId != null) {
|
||||||
|
throw new UnsupportedOperationException("Can't initialize the same client twice!");
|
||||||
|
}
|
||||||
logger.debug(TG_MARKER, "Creating new client");
|
logger.debug(TG_MARKER, "Creating new client");
|
||||||
clientId = NativeClientAccess.create();
|
clientId = NativeClientAccess.create();
|
||||||
logger.debug(TG_MARKER, "Registering new client {}", clientId);
|
logger.debug(TG_MARKER, "Registering new client {}", clientId);
|
||||||
clientManager.registerClient(clientId, this);
|
clientManager.registerClient(clientId, this);
|
||||||
|
|
||||||
CountDownLatch registeredClient = new CountDownLatch(1);
|
|
||||||
|
|
||||||
// Send a dummy request because @levlam is too lazy to fix race conditions in a better way
|
|
||||||
this.send(new TdApi.GetAuthorizationState(), Duration.ofDays(1)).subscribe(new Subscriber<TdApi.Object>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription subscription) {
|
|
||||||
subscription.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(TdApi.Object item) {
|
|
||||||
registeredClient.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable throwable) {
|
|
||||||
registeredClient.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
registeredClient.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
registeredClient.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new IllegalStateException(e);
|
|
||||||
}
|
|
||||||
logger.debug(TG_MARKER, "Registered new client {}", clientId);
|
logger.debug(TG_MARKER, "Registered new client {}", clientId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,10 +169,11 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
subscriber.onNext(new TdApi.Ok());
|
subscriber.onNext(new TdApi.Ok());
|
||||||
subscriber.onComplete();
|
subscriber.onComplete();
|
||||||
} else if (clientId == null) {
|
} else if (clientId == null) {
|
||||||
logger.trace(TG_MARKER, "Can't send a request to TDLib before calling \"createAndRegisterClient\" function!");
|
logger.trace(TG_MARKER,
|
||||||
subscriber.onError(
|
"Can't send a request to TDLib before calling \"createAndRegisterClient\" function!"
|
||||||
new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!")
|
|
||||||
);
|
);
|
||||||
|
subscriber.onError(new IllegalStateException(
|
||||||
|
"Can't send a request to TDLib before calling \"createAndRegisterClient\" function!"));
|
||||||
} else {
|
} else {
|
||||||
long queryId = clientManager.getNextQueryId();
|
long queryId = clientManager.getNextQueryId();
|
||||||
|
|
||||||
|
@ -282,8 +193,13 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
}, responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
|
}, responseTimeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
handlers.put(queryId, new Handler(result -> {
|
handlers.put(queryId, new Handler(result -> {
|
||||||
logger.trace(TG_MARKER, "Client {} is replying the query id {}: request: {} result: {}", clientId,
|
logger.trace(TG_MARKER,
|
||||||
queryId, query, result);
|
"Client {} is replying the query id {}: request: {} result: {}",
|
||||||
|
clientId,
|
||||||
|
queryId,
|
||||||
|
query,
|
||||||
|
result
|
||||||
|
);
|
||||||
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
||||||
if (!cancelled && timeoutCancelled) {
|
if (!cancelled && timeoutCancelled) {
|
||||||
subscriber.onNext(result);
|
subscriber.onNext(result);
|
||||||
|
@ -292,8 +208,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
subscriber.onComplete();
|
subscriber.onComplete();
|
||||||
}
|
}
|
||||||
}, t -> {
|
}, t -> {
|
||||||
logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId,
|
logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId, queryId, query);
|
||||||
queryId, query);
|
|
||||||
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
boolean timeoutCancelled = timeoutFuture.cancel(false);
|
||||||
if (!cancelled && timeoutCancelled) {
|
if (!cancelled && timeoutCancelled) {
|
||||||
subscriber.onError(t);
|
subscriber.onError(t);
|
||||||
|
@ -325,13 +240,71 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
return NativeClientAccess.execute(query);
|
return NativeClientAccess.execute(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setListener(SignalListener listener) {
|
||||||
|
logger.debug(TG_MARKER, "Setting handler of client {}", clientId);
|
||||||
|
SignalListener resultListener = this.signalListener.updateAndGet(previousListener -> {
|
||||||
|
if (previousListener instanceof ReplayStartupUpdatesListener) {
|
||||||
|
ReplayStartupUpdatesListener replayListener = (ReplayStartupUpdatesListener) previousListener;
|
||||||
|
replayListener.setNewListener(listener);
|
||||||
|
return replayListener;
|
||||||
|
} else if (previousListener != null) {
|
||||||
|
throw new IllegalStateException("Already subscribed");
|
||||||
|
} else {
|
||||||
|
return listener;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Drain startup queue
|
||||||
|
if (resultListener instanceof ReplayStartupUpdatesListener) {
|
||||||
|
ReplayStartupUpdatesListener replayStartupUpdatesListener = (ReplayStartupUpdatesListener) resultListener;
|
||||||
|
replayStartupUpdatesListener.drain();
|
||||||
|
}
|
||||||
|
|
||||||
|
TdApi.GetAuthorizationState query = new TdApi.GetAuthorizationState();
|
||||||
|
long queryId = clientManager.getNextQueryId();
|
||||||
|
|
||||||
|
// Send a dummy request to effectively start the TDLib session
|
||||||
|
{
|
||||||
|
handlers.put(queryId, EMPTY_HANDLER);
|
||||||
|
logger.trace(TG_MARKER, "Client {} is requesting with query id {}: {}", clientId, queryId, query);
|
||||||
|
NativeClientAccess.send(clientId, queryId, query);
|
||||||
|
logger.trace(TG_MARKER, "Client {} requested with query id {}: {}", clientId, queryId, query);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(TG_MARKER, "Set handler of client {}", clientId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancel() {
|
||||||
|
logger.debug(TG_MARKER, "Client {} is being cancelled", clientId);
|
||||||
|
this.sendCloseAndIgnoreResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispose() {
|
||||||
|
logger.debug(TG_MARKER, "Client {} is being disposed", clientId);
|
||||||
|
this.sendCloseAndIgnoreResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendCloseAndIgnoreResponse() {
|
||||||
|
if (!alreadyReceivedClosed.get()) {
|
||||||
|
TdApi.Close query = new TdApi.Close();
|
||||||
|
long queryId = clientManager.getNextQueryId();
|
||||||
|
|
||||||
|
handlers.put(queryId, EMPTY_HANDLER);
|
||||||
|
logger.trace(TG_MARKER, "Client {} is requesting with query id {}: {}", clientId, queryId, query);
|
||||||
|
NativeClientAccess.send(clientId, queryId, query);
|
||||||
|
logger.trace(TG_MARKER, "Client {} requested with query id {}: {}", clientId, queryId, query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @param function function used to check if the check will be enforced or not. Can be null
|
* @param function function used to check if the check will be enforced or not. Can be null
|
||||||
* @return true if closed
|
* @return true if closed
|
||||||
*/
|
*/
|
||||||
private boolean isClosedAndMaybeThrow(Function function) {
|
private boolean isClosedAndMaybeThrow(Function function) {
|
||||||
boolean closed = isClosed.get();
|
boolean closed = alreadyReceivedClosed.get();
|
||||||
if (closed) {
|
if (closed) {
|
||||||
if (function != null && function.getConstructor() == TdApi.Close.CONSTRUCTOR) {
|
if (function != null && function.getConstructor() == TdApi.Close.CONSTRUCTOR) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -341,4 +314,71 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void onDefaultException(Throwable updateEx) {
|
||||||
|
Signal item = Signal.ofUpdateException(updateEx);
|
||||||
|
SignalListener signalListener = this.signalListener.get();
|
||||||
|
if (signalListener != null) {
|
||||||
|
signalListener.onSignal(item);
|
||||||
|
} else {
|
||||||
|
logger.error(TG_MARKER, "No signal listener set. Dropped default error {}", (Object) updateEx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onUpdateException(Throwable updateEx) {
|
||||||
|
Signal item = Signal.ofUpdateException(updateEx);
|
||||||
|
SignalListener signalListener = this.signalListener.get();
|
||||||
|
if (signalListener != null) {
|
||||||
|
signalListener.onSignal(item);
|
||||||
|
} else {
|
||||||
|
logger.error(TG_MARKER, "No signal listener set. Dropped update error {}", (Object) updateEx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onUpdateFromHandler(TdApi.Object updateItem) {
|
||||||
|
Signal item = Signal.ofUpdate(updateItem);
|
||||||
|
SignalListener signalListener = this.signalListener.get();
|
||||||
|
if (signalListener != null) {
|
||||||
|
signalListener.onSignal(item);
|
||||||
|
} else {
|
||||||
|
logger.error(TG_MARKER, "No signal listener set. Dropped update {}", updateItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ReplayStartupUpdatesListener implements SignalListener {
|
||||||
|
|
||||||
|
private final ConcurrentLinkedQueue<Signal> queue = new ConcurrentLinkedQueue<>();
|
||||||
|
private final AtomicReference<SignalListener> listener = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSignal(Signal signal) {
|
||||||
|
SignalListener listener;
|
||||||
|
if ((listener = this.listener.get()) != null) {
|
||||||
|
drainQueue(listener);
|
||||||
|
assert queue.isEmpty();
|
||||||
|
listener.onSignal(signal);
|
||||||
|
} else {
|
||||||
|
queue.add(signal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNewListener(SignalListener listener) {
|
||||||
|
this.listener.set(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void drain() {
|
||||||
|
SignalListener listener;
|
||||||
|
if ((listener = this.listener.get()) != null) {
|
||||||
|
drainQueue(listener);
|
||||||
|
assert queue.isEmpty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void drainQueue(SignalListener listener) {
|
||||||
|
Signal elem;
|
||||||
|
while ((elem = queue.poll()) != null) {
|
||||||
|
listener.onSignal(elem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue