From d477aeffe39b026f7aecd757270a822451737a8a Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 5 Sep 2020 14:12:21 +0200 Subject: [PATCH] Thread-safe client --- src/main/java/it/tdlight/tdlight/Client.java | 248 ++++++++++++------ .../java/it/tdlight/tdlight/ClientState.java | 116 ++++++++ 2 files changed, 291 insertions(+), 73 deletions(-) create mode 100644 src/main/java/it/tdlight/tdlight/ClientState.java diff --git a/src/main/java/it/tdlight/tdlight/Client.java b/src/main/java/it/tdlight/tdlight/Client.java index ff33a63..1424765 100644 --- a/src/main/java/it/tdlight/tdlight/Client.java +++ b/src/main/java/it/tdlight/tdlight/Client.java @@ -1,21 +1,27 @@ package it.tdlight.tdlight; import it.tdlight.tdlib.NativeClient; +import it.tdlight.tdlib.TdApi; +import it.tdlight.tdlib.TdApi.AuthorizationStateClosed; +import it.tdlight.tdlib.TdApi.AuthorizationStateClosing; +import it.tdlight.tdlib.TdApi.AuthorizationStateWaitTdlibParameters; +import it.tdlight.tdlib.TdApi.GetOption; import it.tdlight.tdlib.TdApi.Object; +import it.tdlight.tdlib.TdApi.SetOption; +import it.tdlight.tdlib.TdApi.UpdateAuthorizationState; +import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.StampedLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Interface for interaction with TDLib. */ public class Client extends NativeClient implements TelegramClient { - private long clientId; - private final ReentrantLock receiveResponsesLock = new ReentrantLock(); - private final ReentrantLock receiveUpdatesLock = new ReentrantLock(); - private final StampedLock executionLock = new StampedLock(); - private volatile Long stampedLockValue; + + private ClientState state = ClientState.of(false, 0, false, false, false); + private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock(); /** * Creates a new TDLib client. @@ -27,96 +33,152 @@ public class Client extends NativeClient implements TelegramClient { throwable.printStackTrace(); System.exit(1); } - this.clientId = createNativeClient(); + this.initializeClient(); } @Override public void send(Request request) { - if (this.executionLock.isWriteLocked()) { - throw new IllegalStateException("ClientActor is destroyed"); + long clientId; + stateLock.readLock().lock(); + try { + requireInitialized(); + requireReadyToSend(request.getFunction().getConstructor()); + clientId = state.getClientId(); + } finally { + stateLock.readLock().unlock(); } - nativeClientSend(this.clientId, request.getId(), request.getFunction()); + nativeClientSend(clientId, request.getId(), request.getFunction()); } - private final long[][] eventIds = new long[8][]; - private final Object[][] events = new Object[8][]; - @Override public List receive(double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates) { - return Arrays.asList(this.receive(timeout, eventsSize, receiveResponses, receiveUpdates, false)); + long clientId; + stateLock.readLock().lock(); + try { + if (!state.isInitialized()) { + sleep(timeout); + return Collections.emptyList(); + } + requireInitialized(); + if (!state.isReadyToReceive()) { + sleep(timeout); + return Collections.emptyList(); + } + requireReadyToReceive(); + clientId = state.getClientId(); + } finally { + stateLock.readLock().unlock(); + } + + return Arrays.asList(this.internalReceive(clientId, timeout, eventsSize, receiveResponses, receiveUpdates)); } - private Response[] receive(double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates, boolean singleResponse) { - if (this.executionLock.isWriteLocked()) { - throw new IllegalStateException("ClientActor is destroyed"); - } - int group = (singleResponse ? 0b100 : 0b000) | (receiveResponses ? 0b010 : 0b000) | (receiveUpdates ? 0b001 : 0b000); - - if (eventIds[group] == null) { - eventIds[group] = new long[eventsSize]; - } else { - Arrays.fill(eventIds[group], 0); - } - if (events[group] == null) { - events[group] = new Object[eventsSize]; - } else { - Arrays.fill(events[group], null); - } - if (eventIds[group].length != eventsSize || events[group].length != eventsSize) { - throw new IllegalArgumentException("EventSize can't change over time!" - + " Previous: " + eventIds[group].length + " Current: " + eventsSize); - } - - if (receiveResponses && this.receiveResponsesLock.isLocked()) { - throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming responses but shouldn't be called simultaneously from two different threads!"); - } - - if (receiveUpdates && this.receiveUpdatesLock.isLocked()) { - throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming updates but shouldn't be called simultaneously from two different threads!"); - } - - int resultSize; - if (receiveResponses) this.receiveResponsesLock.lock(); + private void sleep(double timeout) { + long nanos = (long) (timeout * 1000000000d); + int nanosPart = (int) (nanos % 1000000L); + long millis = Duration.ofNanos(nanos - nanosPart).toMillis(); try { - if (receiveUpdates) this.receiveUpdatesLock.lock(); - try { - resultSize = nativeClientReceive(this.clientId, eventIds[group], events[group], timeout, receiveResponses, receiveUpdates); - } finally { - if (receiveUpdates) this.receiveUpdatesLock.unlock(); - } - } finally { - if (receiveResponses) this.receiveResponsesLock.unlock(); + Thread.sleep(millis, nanosPart); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - - Response[] responses = new Response[resultSize]; - - for (int i = 0; i < resultSize; i++) { - responses[i] = new Response(eventIds[group][i], events[group][i]); - } - - return responses; } @Override public Response receive(double timeout, boolean receiveResponses, boolean receiveUpdates) { - if (this.executionLock.isWriteLocked()) { - throw new IllegalStateException("ClientActor is destroyed"); + long clientId; + stateLock.readLock().lock(); + try { + if (!state.isInitialized()) { + sleep(timeout); + return null; + } + requireInitialized(); + if (!state.isReadyToReceive()) { + sleep(timeout); + return null; + } + requireReadyToReceive(); + clientId = state.getClientId(); + } finally { + stateLock.readLock().unlock(); } - Response[] responses = receive(timeout, 1, receiveResponses, receiveUpdates, true); + Response[] responses = this.internalReceive(clientId, timeout, 1, receiveResponses, receiveUpdates); - if (responses.length != 0) { + if (responses.length > 0) { return responses[0]; } return null; } + private Response[] internalReceive(long clientId, double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates) { + long[] eventIds = new long[eventsSize]; + TdApi.Object[] events = new TdApi.Object[eventsSize]; + + int resultSize = nativeClientReceive(clientId, eventIds, events, timeout, receiveResponses, receiveUpdates); + + Response[] responses = new Response[resultSize]; + + for (int i = 0; i < resultSize; i++) { + responses[i] = new Response(eventIds[i], events[i]); + if (eventIds[i] == 0) { + handleStateEvent(events[i]); + } + } + + return responses; + } + + private void handleStateEvent(Object event) { + if (event == null) { + return; + } + + if (event.getConstructor() != UpdateAuthorizationState.CONSTRUCTOR) { + return; + } + + UpdateAuthorizationState updateAuthorizationState = (UpdateAuthorizationState) event; + + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR: + stateLock.writeLock().lock(); + try { + state.setReadyToSend(true); + } finally { + stateLock.writeLock().unlock(); + } + break; + case AuthorizationStateClosing.CONSTRUCTOR: + stateLock.writeLock().lock(); + try { + state.setReadyToSend(false); + } finally { + stateLock.writeLock().unlock(); + } + break; + case AuthorizationStateClosed.CONSTRUCTOR: + stateLock.writeLock().lock(); + try { + state.setReadyToSend(false).setReadyToReceive(false); + } finally { + stateLock.writeLock().unlock(); + } + break; + } + } + @Override public Response execute(Request request) { - if (this.executionLock.isWriteLocked()) { - throw new IllegalStateException("ClientActor is destroyed"); + stateLock.readLock().lock(); + try { + requireInitialized(); + requireReadyToSend(request.getFunction().getConstructor()); + } finally { + stateLock.readLock().unlock(); } Object object = nativeClientExecute(request.getFunction()); @@ -125,14 +187,54 @@ public class Client extends NativeClient implements TelegramClient { @Override public void destroyClient() { - stampedLockValue = this.executionLock.tryWriteLock(); - destroyNativeClient(this.clientId); + stateLock.writeLock().lock(); + try { + if (state.isInitialized() && state.hasClientId()) { + if (state.isReadyToSend() || state.isReadyToReceive()) { + throw new IllegalStateException("You need to close the Client before destroying it!"); + } + destroyNativeClient(this.state.getClientId()); + state = ClientState.of(false, 0, false, false, false); + } + } finally { + stateLock.writeLock().unlock(); + } } @Override public void initializeClient() { - this.executionLock.tryUnlockWrite(); - stampedLockValue = null; - this.clientId = createNativeClient(); + stateLock.writeLock().lock(); + try { + if (!state.isInitialized() && !state.hasClientId()) { + long clientId = createNativeClient(); + state = ClientState.of(true, clientId, true, true, false); + } + } finally { + stateLock.writeLock().unlock(); + } + } + + private void requireInitialized() { + if (!state.isInitialized() || !state.hasClientId()) { + throw new IllegalStateException("Client not initialized"); + } + } + + private void requireReadyToSend(int constructor) { + if (!state.isReadyToSend()) { + switch (constructor) { + case SetOption.CONSTRUCTOR: + case GetOption.CONSTRUCTOR: + case TdApi.SetTdlibParameters.CONSTRUCTOR: + return; + } + throw new IllegalStateException("Client not ready to send"); + } + } + + private void requireReadyToReceive() { + if (!state.isReadyToReceive()) { + throw new IllegalStateException("Client not ready to receive"); + } } } diff --git a/src/main/java/it/tdlight/tdlight/ClientState.java b/src/main/java/it/tdlight/tdlight/ClientState.java new file mode 100644 index 0000000..0446f63 --- /dev/null +++ b/src/main/java/it/tdlight/tdlight/ClientState.java @@ -0,0 +1,116 @@ +package it.tdlight.tdlight; + +import java.util.StringJoiner; + +public class ClientState { + + private boolean hasClientId; + private long clientId; + private boolean initialized; + private boolean readyToReceive; + private boolean readyToSend; + + private ClientState(boolean hasClientId, long clientId, boolean initialized, boolean readyToReceive, boolean readyToSend) { + this.hasClientId = hasClientId; + this.clientId = clientId; + this.initialized = initialized; + this.readyToReceive = readyToReceive; + this.readyToSend = readyToSend; + } + + public static ClientState of(boolean hasClientId, long clientId, boolean initialized, boolean readyToReceive, boolean readyToSend) { + return new ClientState(hasClientId, clientId, initialized, readyToReceive, readyToSend); + } + + public boolean hasClientId() { + return hasClientId; + } + + public long getClientId() { + return clientId; + } + + public boolean isInitialized() { + return initialized; + } + + public boolean isReadyToReceive() { + return readyToReceive; + } + + public boolean isReadyToSend() { + return readyToSend; + } + + public ClientState setHasClientId(boolean hasClientId) { + this.hasClientId = hasClientId; + return this; + } + + public ClientState setClientId(long clientId) { + this.clientId = clientId; + return this; + } + + public ClientState setInitialized(boolean initialized) { + this.initialized = initialized; + return this; + } + + public ClientState setReadyToReceive(boolean readyToReceive) { + this.readyToReceive = readyToReceive; + return this; + } + + public ClientState setReadyToSend(boolean readyToSend) { + this.readyToSend = readyToSend; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ClientState that = (ClientState) o; + + if (hasClientId != that.hasClientId) { + return false; + } + if (clientId != that.clientId) { + return false; + } + if (initialized != that.initialized) { + return false; + } + if (readyToReceive != that.readyToReceive) { + return false; + } + return readyToSend == that.readyToSend; + } + + @Override + public int hashCode() { + int result = (hasClientId ? 1 : 0); + result = 31 * result + (int) (clientId ^ (clientId >>> 32)); + result = 31 * result + (initialized ? 1 : 0); + result = 31 * result + (readyToReceive ? 1 : 0); + result = 31 * result + (readyToSend ? 1 : 0); + return result; + } + + @Override + public String toString() { + return new StringJoiner(", ", ClientState.class.getSimpleName() + "[", "]") + .add("hasClientId=" + hasClientId) + .add("clientId=" + clientId) + .add("initialized=" + initialized) + .add("readyToReceive=" + readyToReceive) + .add("readyToSend=" + readyToSend) + .toString(); + } +}