Add a reactive telegram client interface
This commit is contained in:
parent
e45c88da2a
commit
b9709313bd
@ -12,7 +12,16 @@ public abstract class CommonClientManager {
|
||||
return create(client);
|
||||
}
|
||||
|
||||
protected synchronized static ReactiveTelegramClient createReactive(String implementationName) {
|
||||
InternalReactiveClient reactiveClient = new InternalReactiveClient(getClientManager(implementationName));
|
||||
return createReactive(reactiveClient);
|
||||
}
|
||||
|
||||
private static TelegramClient create(InternalClient internalClient) {
|
||||
return internalClient;
|
||||
}
|
||||
|
||||
private static ReactiveTelegramClient createReactive(InternalReactiveClient internalReactiveClient) {
|
||||
return internalReactiveClient;
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public class InternalClientManager implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
public void registerClient(int clientId, InternalClient internalClient) {
|
||||
public void registerClient(int clientId, ClientEventsHandler internalClient) {
|
||||
responseReceiver.registerClient(clientId);
|
||||
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
|
||||
if (replaced) {
|
||||
|
279
src/main/java/it/tdlight/common/InternalReactiveClient.java
Normal file
279
src/main/java/it/tdlight/common/InternalReactiveClient.java
Normal file
@ -0,0 +1,279 @@
|
||||
package it.tdlight.common;
|
||||
|
||||
import it.tdlight.jni.TdApi;
|
||||
import it.tdlight.jni.TdApi.Error;
|
||||
import it.tdlight.jni.TdApi.Function;
|
||||
import it.tdlight.jni.TdApi.Object;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class InternalReactiveClient implements ClientEventsHandler, ReactiveTelegramClient {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(TelegramClient.class);
|
||||
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
|
||||
private final ConcurrentLinkedQueue<ReactiveItem> backpressureQueue = new ConcurrentLinkedQueue<>();
|
||||
private final ExceptionHandler defaultExceptionHandler;
|
||||
private final Handler updateHandler;
|
||||
|
||||
private volatile Integer clientId = null;
|
||||
private final InternalClientManager clientManager;
|
||||
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean();
|
||||
private final AtomicBoolean updatesAlreadySubscribed = new AtomicBoolean(false);
|
||||
private final AtomicLong requested = new AtomicLong(0);
|
||||
private volatile Subscriber<? super ReactiveItem> subscriber;
|
||||
|
||||
public InternalReactiveClient(InternalClientManager clientManager) {
|
||||
this.clientManager = clientManager;
|
||||
this.updateHandler = new Handler(
|
||||
updateItem -> {
|
||||
var item = ReactiveItem.ofUpdate(updateItem);
|
||||
if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) {
|
||||
subscriber.onNext(item);
|
||||
} else {
|
||||
backpressureQueue.add(item);
|
||||
}
|
||||
},
|
||||
updateEx -> {
|
||||
var item = ReactiveItem.ofUpdateException(updateEx);
|
||||
if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) {
|
||||
subscriber.onNext(item);
|
||||
} else {
|
||||
backpressureQueue.add(item);
|
||||
}
|
||||
}
|
||||
);
|
||||
this.defaultExceptionHandler = ex -> backpressureQueue.add(ReactiveItem.ofHandleException(ex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getClientId() {
|
||||
return Objects.requireNonNull(clientId, "Can't obtain the client id before initialization");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleEvents(boolean isClosed, long[] eventIds, Object[] events) {
|
||||
for (int i = 0; i < eventIds.length; i++) {
|
||||
handleEvent(eventIds[i], events[i]);
|
||||
}
|
||||
|
||||
if (isClosed) {
|
||||
if (this.isClosed.compareAndSet(false, true)) {
|
||||
handleClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClose() {
|
||||
handlers.forEach((eventId, handler) -> {
|
||||
handleResponse(eventId, new Error(500, "Instance closed"), handler);
|
||||
});
|
||||
handlers.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles only a response (not an update!)
|
||||
*/
|
||||
private void handleResponse(long eventId, Object event, Handler handler) {
|
||||
if (handler != null) {
|
||||
try {
|
||||
handler.getResultHandler().onResult(event);
|
||||
} catch (Throwable cause) {
|
||||
handleException(handler.getExceptionHandler(), cause);
|
||||
}
|
||||
} else {
|
||||
logger.error("Unknown event id \"{}\", the event has been dropped! {}", eventId, event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a response or an update
|
||||
*/
|
||||
private void handleEvent(long eventId, Object event) {
|
||||
Handler handler = eventId == 0 ? updateHandler : handlers.remove(eventId);
|
||||
handleResponse(eventId, event, handler);
|
||||
}
|
||||
|
||||
private void handleException(ExceptionHandler exceptionHandler, Throwable cause) {
|
||||
if (exceptionHandler == null) {
|
||||
exceptionHandler = defaultExceptionHandler;
|
||||
}
|
||||
if (exceptionHandler != null) {
|
||||
try {
|
||||
exceptionHandler.onException(cause);
|
||||
} catch (Throwable ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<? super ReactiveItem> subscriber) {
|
||||
AtomicBoolean alreadyCompleted = new AtomicBoolean();
|
||||
if (updatesAlreadySubscribed.compareAndSet(false, true)) {
|
||||
var subscription = new Subscription() {
|
||||
|
||||
@Override
|
||||
public void request(long n) {
|
||||
if (!backpressureQueue.isEmpty()) {
|
||||
while (!backpressureQueue.isEmpty() && n > 0) {
|
||||
var item = backpressureQueue.poll();
|
||||
subscriber.onNext(item);
|
||||
n--;
|
||||
}
|
||||
}
|
||||
if (n > 0) {
|
||||
requested.addAndGet(n);
|
||||
}
|
||||
if (isClosed.get()) {
|
||||
if (alreadyCompleted.compareAndSet(false, true)) {
|
||||
subscriber.onComplete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
if (!isClosed.get()) {
|
||||
send(new TdApi.Close()).subscribe(new Subscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Object item) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
this.subscriber = subscriber;
|
||||
|
||||
createAndRegisterClient();
|
||||
subscriber.onSubscribe(subscription);
|
||||
} else {
|
||||
throw new IllegalStateException("Already subscribed");
|
||||
}
|
||||
}
|
||||
|
||||
private void createAndRegisterClient() {
|
||||
if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!");
|
||||
clientId = NativeClientAccess.create();
|
||||
clientManager.registerClient(clientId, this);
|
||||
logger.info("Registered new client {}", clientId);
|
||||
|
||||
// Send a dummy request because @levlam is too lazy to fix race conditions in a better way
|
||||
this.send(new TdApi.GetAuthorizationState()).subscribe(new Subscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Object item) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<TdApi.Object> send(Function query) {
|
||||
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
|
||||
AtomicBoolean cancelled = new AtomicBoolean(false);
|
||||
return subscriber -> {
|
||||
if (alreadySubscribed.compareAndSet(false, true)) {
|
||||
AtomicBoolean alreadyRequested = new AtomicBoolean(false);
|
||||
var subscription = new Subscription() {
|
||||
|
||||
@Override
|
||||
public void request(long n) {
|
||||
if (alreadyRequested.compareAndSet(false, true) && !cancelled.get()) {
|
||||
if (isClosedAndMaybeThrow(query)) {
|
||||
subscriber.onNext(new TdApi.Ok());
|
||||
subscriber.onComplete();
|
||||
} else if (clientId == null) {
|
||||
subscriber.onError(
|
||||
new IllegalStateException("Can't send a request to TDLib before calling \"initialize\" function!")
|
||||
);
|
||||
} else {
|
||||
long queryId = clientManager.getNextQueryId();
|
||||
handlers.put(queryId, new Handler(result -> {
|
||||
if (!cancelled.get()) {
|
||||
subscriber.onNext(result);
|
||||
subscriber.onComplete();
|
||||
}
|
||||
}, throwable -> {
|
||||
if (!cancelled.get()) {
|
||||
subscriber.onError(throwable);
|
||||
}
|
||||
}));
|
||||
NativeClientAccess.send(clientId, queryId, query);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
}
|
||||
};
|
||||
subscriber.onSubscribe(subscription);
|
||||
} else {
|
||||
throw new IllegalStateException("Already subscribed");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object execute(Function query) {
|
||||
if (isClosedAndMaybeThrow(query)) {
|
||||
return new TdApi.Ok();
|
||||
}
|
||||
return NativeClientAccess.execute(query);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param function function used to check if the check will be enforced or not. Can be null
|
||||
* @return true if closed
|
||||
*/
|
||||
private boolean isClosedAndMaybeThrow(Function function) {
|
||||
boolean closed = isClosed.get();
|
||||
if (closed) {
|
||||
if (function != null && function.getConstructor() == TdApi.Close.CONSTRUCTOR) {
|
||||
return true;
|
||||
} else {
|
||||
throw new IllegalStateException("The client is closed!");
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
95
src/main/java/it/tdlight/common/ReactiveItem.java
Normal file
95
src/main/java/it/tdlight/common/ReactiveItem.java
Normal file
@ -0,0 +1,95 @@
|
||||
package it.tdlight.common;
|
||||
|
||||
import it.tdlight.jni.TdApi;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ReactiveItem {
|
||||
|
||||
private final TdApi.Object item;
|
||||
private final Throwable ex;
|
||||
private final boolean updateException;
|
||||
|
||||
private ReactiveItem(Throwable ex, boolean updateException) {
|
||||
this.item = null;
|
||||
this.ex = Objects.requireNonNull(ex);
|
||||
this.updateException = updateException;
|
||||
}
|
||||
|
||||
private ReactiveItem(TdApi.Object item) {
|
||||
this.item = Objects.requireNonNull(item);
|
||||
this.ex = null;
|
||||
this.updateException = false;
|
||||
}
|
||||
|
||||
public static ReactiveItem ofUpdateException(Throwable ex) {
|
||||
return new ReactiveItem(ex, true);
|
||||
}
|
||||
|
||||
public static ReactiveItem ofHandleException(Throwable ex) {
|
||||
return new ReactiveItem(ex, false);
|
||||
}
|
||||
|
||||
public static ReactiveItem ofUpdate(TdApi.Object item) {
|
||||
return new ReactiveItem(item);
|
||||
}
|
||||
|
||||
public boolean isUpdateException() {
|
||||
return ex != null && updateException;
|
||||
}
|
||||
|
||||
public boolean isHandleException() {
|
||||
return ex != null && !updateException;
|
||||
}
|
||||
|
||||
public boolean isUpdate() {
|
||||
return ex == null;
|
||||
}
|
||||
|
||||
public TdApi.Object getUpdate() {
|
||||
return Objects.requireNonNull(item, "This is not an update");
|
||||
}
|
||||
|
||||
public Throwable getUpdateException() {
|
||||
if (!updateException) {
|
||||
throw new IllegalStateException("This is not an update exception");
|
||||
}
|
||||
return Objects.requireNonNull(ex, "This is not an update exception");
|
||||
}
|
||||
|
||||
public Throwable getHandleException() {
|
||||
if (updateException) {
|
||||
throw new IllegalStateException("This is not an handle exception");
|
||||
}
|
||||
return Objects.requireNonNull(ex, "This is not an handle exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
ReactiveItem that = (ReactiveItem) o;
|
||||
return updateException == that.updateException && Objects.equals(item, that.item) && Objects.equals(ex, that.ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(item, ex, updateException);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (ex != null) {
|
||||
if (updateException) {
|
||||
return "UpdateException(" + ex + ")";
|
||||
} else {
|
||||
return "HandleException(" + ex + ")";
|
||||
}
|
||||
} else {
|
||||
return "" + item;
|
||||
}
|
||||
}
|
||||
}
|
26
src/main/java/it/tdlight/common/ReactiveTelegramClient.java
Normal file
26
src/main/java/it/tdlight/common/ReactiveTelegramClient.java
Normal file
@ -0,0 +1,26 @@
|
||||
package it.tdlight.common;
|
||||
|
||||
import it.tdlight.jni.TdApi;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
||||
public interface ReactiveTelegramClient extends Publisher<ReactiveItem> {
|
||||
|
||||
/**
|
||||
* Sends a request to the TDLib.
|
||||
*
|
||||
* @param query Object representing a query to the TDLib.
|
||||
* @throws NullPointerException if query is null.
|
||||
* @return a publisher that will emit exactly one item, or an error
|
||||
*/
|
||||
Publisher<TdApi.Object> send(TdApi.Function query);
|
||||
|
||||
/**
|
||||
* Synchronously executes a TDLib request. Only a few marked accordingly requests can be executed synchronously.
|
||||
*
|
||||
* @param query Object representing a query to the TDLib.
|
||||
* @return request result or {@link TdApi.Error}.
|
||||
* @throws NullPointerException if query is null.
|
||||
*/
|
||||
TdApi.Object execute(TdApi.Function query);
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package it.tdlight.tdlib;
|
||||
|
||||
import it.tdlight.common.CommonClientManager;
|
||||
import it.tdlight.common.ReactiveTelegramClient;
|
||||
import it.tdlight.common.TelegramClient;
|
||||
|
||||
/**
|
||||
@ -13,4 +14,8 @@ public class ClientManager extends CommonClientManager {
|
||||
public static TelegramClient create() {
|
||||
return CommonClientManager.create(implementationName);
|
||||
}
|
||||
|
||||
public static ReactiveTelegramClient createReactive() {
|
||||
return CommonClientManager.createReactive(implementationName);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package it.tdlight.tdlight;
|
||||
|
||||
import it.tdlight.common.CommonClientManager;
|
||||
import it.tdlight.common.ReactiveTelegramClient;
|
||||
import it.tdlight.common.TelegramClient;
|
||||
|
||||
/**
|
||||
@ -13,4 +14,8 @@ public class ClientManager extends CommonClientManager {
|
||||
public static TelegramClient create() {
|
||||
return CommonClientManager.create(implementationName);
|
||||
}
|
||||
|
||||
public static ReactiveTelegramClient createReactive() {
|
||||
return CommonClientManager.createReactive(implementationName);
|
||||
}
|
||||
}
|
||||
|
@ -108,6 +108,11 @@
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.30</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.reactivestreams</groupId>
|
||||
<artifactId>reactive-streams</artifactId>
|
||||
<version>1.0.3</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<profile>
|
||||
|
@ -108,6 +108,11 @@
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.30</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.reactivestreams</groupId>
|
||||
<artifactId>reactive-streams</artifactId>
|
||||
<version>1.0.3</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<profile>
|
||||
|
Loading…
Reference in New Issue
Block a user