diff --git a/src/main/java/it/tdlight/common/ReactiveItem.java b/src/main/java/it/tdlight/common/ReactiveItem.java deleted file mode 100644 index 09dd3bd..0000000 --- a/src/main/java/it/tdlight/common/ReactiveItem.java +++ /dev/null @@ -1,95 +0,0 @@ -package it.tdlight.common; - -import it.tdlight.jni.TdApi; -import java.util.Objects; - -public final class ReactiveItem { - - private final TdApi.Object item; - private final Throwable ex; - private final boolean updateException; - - private ReactiveItem(Throwable ex, boolean updateException) { - this.item = null; - this.ex = Objects.requireNonNull(ex); - this.updateException = updateException; - } - - private ReactiveItem(TdApi.Object item) { - this.item = Objects.requireNonNull(item); - this.ex = null; - this.updateException = false; - } - - public static ReactiveItem ofUpdateException(Throwable ex) { - return new ReactiveItem(ex, true); - } - - public static ReactiveItem ofHandleException(Throwable ex) { - return new ReactiveItem(ex, false); - } - - public static ReactiveItem ofUpdate(TdApi.Object item) { - return new ReactiveItem(item); - } - - public boolean isUpdateException() { - return ex != null && updateException; - } - - public boolean isHandleException() { - return ex != null && !updateException; - } - - public boolean isUpdate() { - return ex == null; - } - - public TdApi.Object getUpdate() { - return Objects.requireNonNull(item, "This is not an update"); - } - - public Throwable getUpdateException() { - if (!updateException) { - throw new IllegalStateException("This is not an update exception"); - } - return Objects.requireNonNull(ex, "This is not an update exception"); - } - - public Throwable getHandleException() { - if (updateException) { - throw new IllegalStateException("This is not an handle exception"); - } - return Objects.requireNonNull(ex, "This is not an handle exception"); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReactiveItem that = (ReactiveItem) o; - return updateException == that.updateException && Objects.equals(item, that.item) && Objects.equals(ex, that.ex); - } - - @Override - public int hashCode() { - return Objects.hash(item, ex, updateException); - } - - @Override - public String toString() { - if (ex != null) { - if (updateException) { - return "UpdateException(" + ex + ")"; - } else { - return "HandleException(" + ex + ")"; - } - } else { - return "" + item; - } - } -} diff --git a/src/main/java/it/tdlight/common/ReactiveTelegramClient.java b/src/main/java/it/tdlight/common/ReactiveTelegramClient.java index fb9e821..63fcf9f 100644 --- a/src/main/java/it/tdlight/common/ReactiveTelegramClient.java +++ b/src/main/java/it/tdlight/common/ReactiveTelegramClient.java @@ -5,7 +5,7 @@ import java.time.Duration; import org.reactivestreams.Publisher; @SuppressWarnings("ReactiveStreamsPublisherImplementation") -public interface ReactiveTelegramClient extends Publisher { +public interface ReactiveTelegramClient { /** * Creates and registers the client @@ -30,4 +30,16 @@ public interface ReactiveTelegramClient extends Publisher { * @throws NullPointerException if query is null. */ TdApi.Object execute(TdApi.Function query); + + void setListener(SignalListener listener); + + /** + * Send close signal but don't remove the listener + */ + void cancel(); + + /** + * Remove the listener + */ + void dispose(); } diff --git a/src/main/java/it/tdlight/common/Signal.java b/src/main/java/it/tdlight/common/Signal.java new file mode 100644 index 0000000..48780c8 --- /dev/null +++ b/src/main/java/it/tdlight/common/Signal.java @@ -0,0 +1,86 @@ +package it.tdlight.common; + +import it.tdlight.jni.TdApi; +import java.util.Objects; +import java.util.StringJoiner; + +public final class Signal { + + private final TdApi.Object item; + private final Throwable ex; + private final SignalType signalType; + + private Signal(SignalType signalType, TdApi.Object item, Throwable ex) { + this.signalType = signalType; + this.item = item; + this.ex = ex; + } + + public static Signal ofUpdateException(Throwable ex) { + return new Signal(SignalType.EXCEPTION, null, Objects.requireNonNull(ex, "Exception is null")); + } + + public static Signal ofUpdate(TdApi.Object item) { + return new Signal(SignalType.UPDATE, Objects.requireNonNull(item, "Update is null"), null); + } + + public static Signal ofClosed() { + return new Signal(SignalType.CLOSE, null, null); + } + + public boolean isUpdate() { + return signalType == SignalType.UPDATE; + } + + public boolean isException() { + return signalType == SignalType.EXCEPTION; + } + + public boolean isClosed() { + return signalType == SignalType.CLOSE; + } + + public boolean isNotClosed() { + return signalType != SignalType.CLOSE; + } + + public TdApi.Object getUpdate() { + return Objects.requireNonNull(item, "This is not an update"); + } + + public Throwable getException() { + return Objects.requireNonNull(ex, "This is not an exception"); + } + + public void getClosed() { + if (signalType != SignalType.CLOSE) { + throw new IllegalStateException("Expected signal type closed, but the type is " + signalType); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Signal signal = (Signal) o; + return Objects.equals(item, signal.item) && Objects.equals(ex, signal.ex) && signalType == signal.signalType; + } + + @Override + public int hashCode() { + return Objects.hash(item, ex, signalType); + } + + @Override + public String toString() { + return new StringJoiner(", ", Signal.class.getSimpleName() + "[", "]") + .add("item=" + item) + .add("ex=" + ex) + .add("signalType=" + signalType) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/common/SignalListener.java b/src/main/java/it/tdlight/common/SignalListener.java new file mode 100644 index 0000000..8f023de --- /dev/null +++ b/src/main/java/it/tdlight/common/SignalListener.java @@ -0,0 +1,6 @@ +package it.tdlight.common; + +public interface SignalListener { + + void onSignal(Signal signal); +} diff --git a/src/main/java/it/tdlight/common/SignalType.java b/src/main/java/it/tdlight/common/SignalType.java new file mode 100644 index 0000000..348b0c3 --- /dev/null +++ b/src/main/java/it/tdlight/common/SignalType.java @@ -0,0 +1,7 @@ +package it.tdlight.common; + +public enum SignalType { + UPDATE, + EXCEPTION, + CLOSE +} diff --git a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java index a6faf09..f930740 100644 --- a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java +++ b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java @@ -2,8 +2,9 @@ package it.tdlight.common.internal; import it.tdlight.common.ClientEventsHandler; import it.tdlight.common.ExceptionHandler; -import it.tdlight.common.ReactiveItem; +import it.tdlight.common.Signal; import it.tdlight.common.ReactiveTelegramClient; +import it.tdlight.common.SignalListener; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Function; @@ -12,15 +13,13 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,42 +30,24 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private static final Marker TG_MARKER = MarkerFactory.getMarker("TG"); private static final Logger logger = LoggerFactory.getLogger(InternalReactiveClient.class); + private static final Handler EMPTY_HANDLER = new Handler(r -> {}, ex -> {}); + private final ConcurrentHashMap handlers = new ConcurrentHashMap<>(); private final Set timedOutHandlers = new ConcurrentHashMap().keySet(new Object()); private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor(); - private final ConcurrentLinkedQueue backpressureQueue = new ConcurrentLinkedQueue<>(); private final ExceptionHandler defaultExceptionHandler; private final Handler updateHandler; private volatile Integer clientId = null; private final InternalClientManager clientManager; - private final AtomicBoolean isClosed = new AtomicBoolean(); - private final AtomicBoolean updatesAlreadySubscribed = new AtomicBoolean(false); - private final AtomicLong requested = new AtomicLong(0); - private volatile Subscriber subscriber; + private final AtomicBoolean alreadyReceivedClosed = new AtomicBoolean(); + private final AtomicReference signalListener = new AtomicReference<>(new ReplayStartupUpdatesListener()); public InternalReactiveClient(InternalClientManager clientManager) { this.clientManager = clientManager; - this.updateHandler = new Handler( - updateItem -> { - ReactiveItem item = ReactiveItem.ofUpdate(updateItem); - if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) { - subscriber.onNext(item); - } else { - backpressureQueue.add(item); - } - }, - updateEx -> { - ReactiveItem item = ReactiveItem.ofUpdateException(updateEx); - if (subscriber != null && requested.getAndUpdate(n -> n == 0 ? 0 : (n - 1)) > 0) { - subscriber.onNext(item); - } else { - backpressureQueue.add(item); - } - } - ); - this.defaultExceptionHandler = ex -> backpressureQueue.add(ReactiveItem.ofHandleException(ex)); + this.updateHandler = new Handler(this::onUpdateFromHandler, this::onUpdateException); + this.defaultExceptionHandler = this::onDefaultException; } @Override @@ -81,15 +62,34 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti } if (isClosed) { - if (this.isClosed.compareAndSet(false, true)) { + if (this.alreadyReceivedClosed.compareAndSet(false, true)) { handleClose(); } } - } + } + /** + * This method will be called exactly once + */ private void handleClose() { - handlers.forEach((eventId, handler) -> handleResponse(eventId, new Error(500, "Instance closed"), handler)); + TdApi.Error instanceClosedError = new Error(500, "Instance closed"); + handlers.forEach((eventId, handler) -> this.handleResponse(eventId, instanceClosedError, handler)); handlers.clear(); + this.timedOutHandlers.clear(); + timers.shutdown(); + try { + boolean terminated = timers.awaitTermination(1, TimeUnit.MINUTES); + if (!terminated) { + timers.shutdownNow(); + } + } catch (InterruptedException e) { + logger.debug(TG_MARKER, "Interrupted", e); + } + SignalListener signalListener = this.signalListener.get(); + // Close the signal listener if it still exists + if (signalListener != null) { + signalListener.onSignal(Signal.ofClosed()); + } } /** @@ -135,111 +135,21 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti if (exceptionHandler == null) { exceptionHandler = defaultExceptionHandler; } - if (exceptionHandler != null) { - try { - exceptionHandler.onException(cause); - } catch (Throwable ignored) {} + try { + exceptionHandler.onException(cause); + } catch (Throwable ignored) { } } - @Override - public void subscribe(Subscriber subscriber) { - AtomicBoolean alreadyCompleted = new AtomicBoolean(); - if (updatesAlreadySubscribed.compareAndSet(false, true)) { - Subscription subscription = new Subscription() { - - @Override - public void request(long n) { - ReactiveItem item; - while (n > 0 && (item = backpressureQueue.poll()) != null) { - subscriber.onNext(item); - n--; - } - if (n > 0) { - requested.addAndGet(n); - } - if (isClosed.get()) { - if (alreadyCompleted.compareAndSet(false, true)) { - subscriber.onComplete(); - logger.info(TG_MARKER, "Client closed {}", clientId); - } - } - } - - @SuppressWarnings("ReactiveStreamsSubscriberImplementation") - @Override - public void cancel() { - if (!isClosed.get()) { - send(new TdApi.Close(), Duration.ofDays(1)).subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(1); - } - - @Override - public void onNext(TdApi.Object o) { - - } - - @Override - public void onError(Throwable throwable) { - - } - - @Override - public void onComplete() { - - } - }); - } - } - }; - this.subscriber = subscriber; - - subscriber.onSubscribe(subscription); - } else { - throw new IllegalStateException("Already subscribed"); - } - } - - @SuppressWarnings("ReactiveStreamsSubscriberImplementation") + @SuppressWarnings({"ReactiveStreamsSubscriberImplementation", "Convert2Diamond"}) public void createAndRegisterClient() { - if (clientId != null) throw new UnsupportedOperationException("Can't initialize the same client twice!"); + if (clientId != null) { + throw new UnsupportedOperationException("Can't initialize the same client twice!"); + } logger.debug(TG_MARKER, "Creating new client"); clientId = NativeClientAccess.create(); logger.debug(TG_MARKER, "Registering new client {}", clientId); clientManager.registerClient(clientId, this); - - CountDownLatch registeredClient = new CountDownLatch(1); - - // Send a dummy request because @levlam is too lazy to fix race conditions in a better way - this.send(new TdApi.GetAuthorizationState(), Duration.ofDays(1)).subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(1); - } - - @Override - public void onNext(TdApi.Object item) { - registeredClient.countDown(); - } - - @Override - public void onError(Throwable throwable) { - registeredClient.countDown(); - } - - @Override - public void onComplete() { - registeredClient.countDown(); - } - }); - - try { - registeredClient.await(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } logger.debug(TG_MARKER, "Registered new client {}", clientId); } @@ -259,10 +169,11 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti subscriber.onNext(new TdApi.Ok()); subscriber.onComplete(); } else if (clientId == null) { - logger.trace(TG_MARKER, "Can't send a request to TDLib before calling \"createAndRegisterClient\" function!"); - subscriber.onError( - new IllegalStateException("Can't send a request to TDLib before calling \"createAndRegisterClient\" function!") + logger.trace(TG_MARKER, + "Can't send a request to TDLib before calling \"createAndRegisterClient\" function!" ); + subscriber.onError(new IllegalStateException( + "Can't send a request to TDLib before calling \"createAndRegisterClient\" function!")); } else { long queryId = clientManager.getNextQueryId(); @@ -282,8 +193,13 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti }, responseTimeout.toMillis(), TimeUnit.MILLISECONDS); handlers.put(queryId, new Handler(result -> { - logger.trace(TG_MARKER, "Client {} is replying the query id {}: request: {} result: {}", clientId, - queryId, query, result); + logger.trace(TG_MARKER, + "Client {} is replying the query id {}: request: {} result: {}", + clientId, + queryId, + query, + result + ); boolean timeoutCancelled = timeoutFuture.cancel(false); if (!cancelled && timeoutCancelled) { subscriber.onNext(result); @@ -292,8 +208,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti subscriber.onComplete(); } }, t -> { - logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId, - queryId, query); + logger.trace(TG_MARKER, "Client {} has failed the query id {}: {}", clientId, queryId, query); boolean timeoutCancelled = timeoutFuture.cancel(false); if (!cancelled && timeoutCancelled) { subscriber.onError(t); @@ -325,13 +240,71 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti return NativeClientAccess.execute(query); } + @Override + public void setListener(SignalListener listener) { + logger.debug(TG_MARKER, "Setting handler of client {}", clientId); + SignalListener resultListener = this.signalListener.updateAndGet(previousListener -> { + if (previousListener instanceof ReplayStartupUpdatesListener) { + ReplayStartupUpdatesListener replayListener = (ReplayStartupUpdatesListener) previousListener; + replayListener.setNewListener(listener); + return replayListener; + } else if (previousListener != null) { + throw new IllegalStateException("Already subscribed"); + } else { + return listener; + } + }); + + // Drain startup queue + if (resultListener instanceof ReplayStartupUpdatesListener) { + ReplayStartupUpdatesListener replayStartupUpdatesListener = (ReplayStartupUpdatesListener) resultListener; + replayStartupUpdatesListener.drain(); + } + + TdApi.GetAuthorizationState query = new TdApi.GetAuthorizationState(); + long queryId = clientManager.getNextQueryId(); + + // Send a dummy request to effectively start the TDLib session + { + handlers.put(queryId, EMPTY_HANDLER); + logger.trace(TG_MARKER, "Client {} is requesting with query id {}: {}", clientId, queryId, query); + NativeClientAccess.send(clientId, queryId, query); + logger.trace(TG_MARKER, "Client {} requested with query id {}: {}", clientId, queryId, query); + } + + logger.debug(TG_MARKER, "Set handler of client {}", clientId); + } + + @Override + public void cancel() { + logger.debug(TG_MARKER, "Client {} is being cancelled", clientId); + this.sendCloseAndIgnoreResponse(); + } + + @Override + public void dispose() { + logger.debug(TG_MARKER, "Client {} is being disposed", clientId); + this.sendCloseAndIgnoreResponse(); + } + + private void sendCloseAndIgnoreResponse() { + if (!alreadyReceivedClosed.get()) { + TdApi.Close query = new TdApi.Close(); + long queryId = clientManager.getNextQueryId(); + + handlers.put(queryId, EMPTY_HANDLER); + logger.trace(TG_MARKER, "Client {} is requesting with query id {}: {}", clientId, queryId, query); + NativeClientAccess.send(clientId, queryId, query); + logger.trace(TG_MARKER, "Client {} requested with query id {}: {}", clientId, queryId, query); + } + } + /** - * * @param function function used to check if the check will be enforced or not. Can be null * @return true if closed */ private boolean isClosedAndMaybeThrow(Function function) { - boolean closed = isClosed.get(); + boolean closed = alreadyReceivedClosed.get(); if (closed) { if (function != null && function.getConstructor() == TdApi.Close.CONSTRUCTOR) { return true; @@ -341,4 +314,71 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti } return false; } + + private void onDefaultException(Throwable updateEx) { + Signal item = Signal.ofUpdateException(updateEx); + SignalListener signalListener = this.signalListener.get(); + if (signalListener != null) { + signalListener.onSignal(item); + } else { + logger.error(TG_MARKER, "No signal listener set. Dropped default error {}", (Object) updateEx); + } + } + + private void onUpdateException(Throwable updateEx) { + Signal item = Signal.ofUpdateException(updateEx); + SignalListener signalListener = this.signalListener.get(); + if (signalListener != null) { + signalListener.onSignal(item); + } else { + logger.error(TG_MARKER, "No signal listener set. Dropped update error {}", (Object) updateEx); + } + } + + private void onUpdateFromHandler(TdApi.Object updateItem) { + Signal item = Signal.ofUpdate(updateItem); + SignalListener signalListener = this.signalListener.get(); + if (signalListener != null) { + signalListener.onSignal(item); + } else { + logger.error(TG_MARKER, "No signal listener set. Dropped update {}", updateItem); + } + } + + private static class ReplayStartupUpdatesListener implements SignalListener { + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final AtomicReference listener = new AtomicReference<>(null); + + @Override + public void onSignal(Signal signal) { + SignalListener listener; + if ((listener = this.listener.get()) != null) { + drainQueue(listener); + assert queue.isEmpty(); + listener.onSignal(signal); + } else { + queue.add(signal); + } + } + + public void setNewListener(SignalListener listener) { + this.listener.set(listener); + } + + public void drain() { + SignalListener listener; + if ((listener = this.listener.get()) != null) { + drainQueue(listener); + assert queue.isEmpty(); + } + } + + private void drainQueue(SignalListener listener) { + Signal elem; + while ((elem = queue.poll()) != null) { + listener.onSignal(elem); + } + } + } }