From c7c2b03ef18ff1c8e7d899a5b26faf66844ee4f3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 13 Sep 2022 22:37:12 +0200 Subject: [PATCH] Improve performance --- .../internal/InternalReactiveClient.java | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java index ab686a7..13dec7f 100644 --- a/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java +++ b/src/main/java/it/tdlight/common/internal/InternalReactiveClient.java @@ -44,7 +44,9 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private final InternalClientManager clientManager; private final AtomicBoolean alreadyReceivedClosed = new AtomicBoolean(); - private final AtomicReference signalListener = new AtomicReference<>(new ReplayStartupUpdatesListener()); + // This field is not volatile, but it's not problematic, because ReplayStartupUpdatesListener is able to forward + // updates to the right listener + private SignalListener signalListener = new ReplayStartupUpdatesListener(); public InternalReactiveClient(InternalClientManager clientManager) { this.clientManager = clientManager; @@ -98,7 +100,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti } catch (InterruptedException e) { logger.debug(TG_MARKER, "Interrupted", e); } - SignalListener signalListener = this.signalListener.get(); + SignalListener signalListener = this.signalListener; // Close the signal listener if it still exists if (signalListener != null) { signalListener.onSignal(Signal.ofClosed()); @@ -260,24 +262,17 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti public void setListener(SignalListener listener) { logger.debug(TG_MARKER, "Setting handler of client {}", clientId); - // Keep in mind that this lambda could be called multiple times - SignalListener resultListener = this.signalListener.updateAndGet(previousListener -> { - if (previousListener instanceof ReplayStartupUpdatesListener) { - ReplayStartupUpdatesListener replayListener = (ReplayStartupUpdatesListener) previousListener; - replayListener.setNewListener(listener); - return replayListener; - } else if (previousListener != null) { - throw new IllegalStateException("Already subscribed"); - } else { - return listener; - } - }); - - // Drain startup queue - if (resultListener instanceof ReplayStartupUpdatesListener) { - ReplayStartupUpdatesListener replayStartupUpdatesListener = (ReplayStartupUpdatesListener) resultListener; - replayStartupUpdatesListener.drain(); + var prevSignalListener = this.signalListener; + if (!(prevSignalListener instanceof ReplayStartupUpdatesListener)) { + throw new IllegalStateException("Already subscribed"); } + var replayStartupUpdatesListener = (ReplayStartupUpdatesListener) prevSignalListener; + // Set the new listener into the startup listener, then drain its startup queue + replayStartupUpdatesListener.setNewListener(listener); + replayStartupUpdatesListener.drain(); + + // Set the new listener + this.signalListener = listener; TdApi.GetAuthorizationState query = new TdApi.GetAuthorizationState(); long queryId = clientManager.getNextQueryId(); @@ -346,7 +341,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private void onDefaultException(Throwable updateEx) { Signal item = Signal.ofUpdateException(updateEx); - SignalListener signalListener = this.signalListener.get(); + SignalListener signalListener = this.signalListener; if (signalListener != null) { signalListener.onSignal(item); } else { @@ -356,7 +351,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private void onUpdateException(Throwable updateEx) { Signal item = Signal.ofUpdateException(updateEx); - SignalListener signalListener = this.signalListener.get(); + SignalListener signalListener = this.signalListener; if (signalListener != null) { signalListener.onSignal(item); } else { @@ -366,7 +361,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti private void onUpdateFromHandler(TdApi.Object updateItem) { Signal item = Signal.ofUpdate(updateItem); - SignalListener signalListener = this.signalListener.get(); + SignalListener signalListener = this.signalListener; if (signalListener != null) { signalListener.onSignal(item); } else { @@ -387,7 +382,7 @@ public final class InternalReactiveClient implements ClientEventsHandler, Reacti assert queue.isEmpty(); listener.onSignal(signal); // Replace itself with the child signal listener, to reduce overhead permanently - InternalReactiveClient.this.signalListener.set(listener); + InternalReactiveClient.this.signalListener = listener; } else { queue.add(signal); }