Thread-safe client
This commit is contained in:
parent
a8800af2b6
commit
d477aeffe3
@ -1,21 +1,27 @@
|
|||||||
package it.tdlight.tdlight;
|
package it.tdlight.tdlight;
|
||||||
|
|
||||||
import it.tdlight.tdlib.NativeClient;
|
import it.tdlight.tdlib.NativeClient;
|
||||||
|
import it.tdlight.tdlib.TdApi;
|
||||||
|
import it.tdlight.tdlib.TdApi.AuthorizationStateClosed;
|
||||||
|
import it.tdlight.tdlib.TdApi.AuthorizationStateClosing;
|
||||||
|
import it.tdlight.tdlib.TdApi.AuthorizationStateWaitTdlibParameters;
|
||||||
|
import it.tdlight.tdlib.TdApi.GetOption;
|
||||||
import it.tdlight.tdlib.TdApi.Object;
|
import it.tdlight.tdlib.TdApi.Object;
|
||||||
|
import it.tdlight.tdlib.TdApi.SetOption;
|
||||||
|
import it.tdlight.tdlib.TdApi.UpdateAuthorizationState;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.StampedLock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for interaction with TDLib.
|
* Interface for interaction with TDLib.
|
||||||
*/
|
*/
|
||||||
public class Client extends NativeClient implements TelegramClient {
|
public class Client extends NativeClient implements TelegramClient {
|
||||||
private long clientId;
|
|
||||||
private final ReentrantLock receiveResponsesLock = new ReentrantLock();
|
private ClientState state = ClientState.of(false, 0, false, false, false);
|
||||||
private final ReentrantLock receiveUpdatesLock = new ReentrantLock();
|
private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
|
||||||
private final StampedLock executionLock = new StampedLock();
|
|
||||||
private volatile Long stampedLockValue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TDLib client.
|
* Creates a new TDLib client.
|
||||||
@ -27,96 +33,152 @@ public class Client extends NativeClient implements TelegramClient {
|
|||||||
throwable.printStackTrace();
|
throwable.printStackTrace();
|
||||||
System.exit(1);
|
System.exit(1);
|
||||||
}
|
}
|
||||||
this.clientId = createNativeClient();
|
this.initializeClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(Request request) {
|
public void send(Request request) {
|
||||||
if (this.executionLock.isWriteLocked()) {
|
long clientId;
|
||||||
throw new IllegalStateException("ClientActor is destroyed");
|
stateLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
requireInitialized();
|
||||||
|
requireReadyToSend(request.getFunction().getConstructor());
|
||||||
|
clientId = state.getClientId();
|
||||||
|
} finally {
|
||||||
|
stateLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
nativeClientSend(this.clientId, request.getId(), request.getFunction());
|
nativeClientSend(clientId, request.getId(), request.getFunction());
|
||||||
}
|
}
|
||||||
|
|
||||||
private final long[][] eventIds = new long[8][];
|
|
||||||
private final Object[][] events = new Object[8][];
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Response> receive(double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates) {
|
public List<Response> receive(double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates) {
|
||||||
return Arrays.asList(this.receive(timeout, eventsSize, receiveResponses, receiveUpdates, false));
|
long clientId;
|
||||||
|
stateLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if (!state.isInitialized()) {
|
||||||
|
sleep(timeout);
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
requireInitialized();
|
||||||
|
if (!state.isReadyToReceive()) {
|
||||||
|
sleep(timeout);
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
requireReadyToReceive();
|
||||||
|
clientId = state.getClientId();
|
||||||
|
} finally {
|
||||||
|
stateLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Arrays.asList(this.internalReceive(clientId, timeout, eventsSize, receiveResponses, receiveUpdates));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response[] receive(double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates, boolean singleResponse) {
|
private void sleep(double timeout) {
|
||||||
if (this.executionLock.isWriteLocked()) {
|
long nanos = (long) (timeout * 1000000000d);
|
||||||
throw new IllegalStateException("ClientActor is destroyed");
|
int nanosPart = (int) (nanos % 1000000L);
|
||||||
}
|
long millis = Duration.ofNanos(nanos - nanosPart).toMillis();
|
||||||
int group = (singleResponse ? 0b100 : 0b000) | (receiveResponses ? 0b010 : 0b000) | (receiveUpdates ? 0b001 : 0b000);
|
|
||||||
|
|
||||||
if (eventIds[group] == null) {
|
|
||||||
eventIds[group] = new long[eventsSize];
|
|
||||||
} else {
|
|
||||||
Arrays.fill(eventIds[group], 0);
|
|
||||||
}
|
|
||||||
if (events[group] == null) {
|
|
||||||
events[group] = new Object[eventsSize];
|
|
||||||
} else {
|
|
||||||
Arrays.fill(events[group], null);
|
|
||||||
}
|
|
||||||
if (eventIds[group].length != eventsSize || events[group].length != eventsSize) {
|
|
||||||
throw new IllegalArgumentException("EventSize can't change over time!"
|
|
||||||
+ " Previous: " + eventIds[group].length + " Current: " + eventsSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (receiveResponses && this.receiveResponsesLock.isLocked()) {
|
|
||||||
throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming responses but shouldn't be called simultaneously from two different threads!");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (receiveUpdates && this.receiveUpdatesLock.isLocked()) {
|
|
||||||
throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming updates but shouldn't be called simultaneously from two different threads!");
|
|
||||||
}
|
|
||||||
|
|
||||||
int resultSize;
|
|
||||||
if (receiveResponses) this.receiveResponsesLock.lock();
|
|
||||||
try {
|
try {
|
||||||
if (receiveUpdates) this.receiveUpdatesLock.lock();
|
Thread.sleep(millis, nanosPart);
|
||||||
try {
|
} catch (InterruptedException e) {
|
||||||
resultSize = nativeClientReceive(this.clientId, eventIds[group], events[group], timeout, receiveResponses, receiveUpdates);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
|
||||||
if (receiveUpdates) this.receiveUpdatesLock.unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (receiveResponses) this.receiveResponsesLock.unlock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Response[] responses = new Response[resultSize];
|
|
||||||
|
|
||||||
for (int i = 0; i < resultSize; i++) {
|
|
||||||
responses[i] = new Response(eventIds[group][i], events[group][i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
return responses;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Response receive(double timeout, boolean receiveResponses, boolean receiveUpdates) {
|
public Response receive(double timeout, boolean receiveResponses, boolean receiveUpdates) {
|
||||||
if (this.executionLock.isWriteLocked()) {
|
long clientId;
|
||||||
throw new IllegalStateException("ClientActor is destroyed");
|
stateLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if (!state.isInitialized()) {
|
||||||
|
sleep(timeout);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
requireInitialized();
|
||||||
|
if (!state.isReadyToReceive()) {
|
||||||
|
sleep(timeout);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
requireReadyToReceive();
|
||||||
|
clientId = state.getClientId();
|
||||||
|
} finally {
|
||||||
|
stateLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
Response[] responses = receive(timeout, 1, receiveResponses, receiveUpdates, true);
|
Response[] responses = this.internalReceive(clientId, timeout, 1, receiveResponses, receiveUpdates);
|
||||||
|
|
||||||
if (responses.length != 0) {
|
if (responses.length > 0) {
|
||||||
return responses[0];
|
return responses[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Response[] internalReceive(long clientId, double timeout, int eventsSize, boolean receiveResponses, boolean receiveUpdates) {
|
||||||
|
long[] eventIds = new long[eventsSize];
|
||||||
|
TdApi.Object[] events = new TdApi.Object[eventsSize];
|
||||||
|
|
||||||
|
int resultSize = nativeClientReceive(clientId, eventIds, events, timeout, receiveResponses, receiveUpdates);
|
||||||
|
|
||||||
|
Response[] responses = new Response[resultSize];
|
||||||
|
|
||||||
|
for (int i = 0; i < resultSize; i++) {
|
||||||
|
responses[i] = new Response(eventIds[i], events[i]);
|
||||||
|
if (eventIds[i] == 0) {
|
||||||
|
handleStateEvent(events[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleStateEvent(Object event) {
|
||||||
|
if (event == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.getConstructor() != UpdateAuthorizationState.CONSTRUCTOR) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
UpdateAuthorizationState updateAuthorizationState = (UpdateAuthorizationState) event;
|
||||||
|
|
||||||
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
|
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
|
||||||
|
stateLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
state.setReadyToSend(true);
|
||||||
|
} finally {
|
||||||
|
stateLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case AuthorizationStateClosing.CONSTRUCTOR:
|
||||||
|
stateLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
state.setReadyToSend(false);
|
||||||
|
} finally {
|
||||||
|
stateLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case AuthorizationStateClosed.CONSTRUCTOR:
|
||||||
|
stateLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
state.setReadyToSend(false).setReadyToReceive(false);
|
||||||
|
} finally {
|
||||||
|
stateLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Response execute(Request request) {
|
public Response execute(Request request) {
|
||||||
if (this.executionLock.isWriteLocked()) {
|
stateLock.readLock().lock();
|
||||||
throw new IllegalStateException("ClientActor is destroyed");
|
try {
|
||||||
|
requireInitialized();
|
||||||
|
requireReadyToSend(request.getFunction().getConstructor());
|
||||||
|
} finally {
|
||||||
|
stateLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
Object object = nativeClientExecute(request.getFunction());
|
Object object = nativeClientExecute(request.getFunction());
|
||||||
@ -125,14 +187,54 @@ public class Client extends NativeClient implements TelegramClient {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroyClient() {
|
public void destroyClient() {
|
||||||
stampedLockValue = this.executionLock.tryWriteLock();
|
stateLock.writeLock().lock();
|
||||||
destroyNativeClient(this.clientId);
|
try {
|
||||||
|
if (state.isInitialized() && state.hasClientId()) {
|
||||||
|
if (state.isReadyToSend() || state.isReadyToReceive()) {
|
||||||
|
throw new IllegalStateException("You need to close the Client before destroying it!");
|
||||||
|
}
|
||||||
|
destroyNativeClient(this.state.getClientId());
|
||||||
|
state = ClientState.of(false, 0, false, false, false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stateLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initializeClient() {
|
public void initializeClient() {
|
||||||
this.executionLock.tryUnlockWrite();
|
stateLock.writeLock().lock();
|
||||||
stampedLockValue = null;
|
try {
|
||||||
this.clientId = createNativeClient();
|
if (!state.isInitialized() && !state.hasClientId()) {
|
||||||
|
long clientId = createNativeClient();
|
||||||
|
state = ClientState.of(true, clientId, true, true, false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stateLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void requireInitialized() {
|
||||||
|
if (!state.isInitialized() || !state.hasClientId()) {
|
||||||
|
throw new IllegalStateException("Client not initialized");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void requireReadyToSend(int constructor) {
|
||||||
|
if (!state.isReadyToSend()) {
|
||||||
|
switch (constructor) {
|
||||||
|
case SetOption.CONSTRUCTOR:
|
||||||
|
case GetOption.CONSTRUCTOR:
|
||||||
|
case TdApi.SetTdlibParameters.CONSTRUCTOR:
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("Client not ready to send");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void requireReadyToReceive() {
|
||||||
|
if (!state.isReadyToReceive()) {
|
||||||
|
throw new IllegalStateException("Client not ready to receive");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
116
src/main/java/it/tdlight/tdlight/ClientState.java
Normal file
116
src/main/java/it/tdlight/tdlight/ClientState.java
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
package it.tdlight.tdlight;
|
||||||
|
|
||||||
|
import java.util.StringJoiner;
|
||||||
|
|
||||||
|
public class ClientState {
|
||||||
|
|
||||||
|
private boolean hasClientId;
|
||||||
|
private long clientId;
|
||||||
|
private boolean initialized;
|
||||||
|
private boolean readyToReceive;
|
||||||
|
private boolean readyToSend;
|
||||||
|
|
||||||
|
private ClientState(boolean hasClientId, long clientId, boolean initialized, boolean readyToReceive, boolean readyToSend) {
|
||||||
|
this.hasClientId = hasClientId;
|
||||||
|
this.clientId = clientId;
|
||||||
|
this.initialized = initialized;
|
||||||
|
this.readyToReceive = readyToReceive;
|
||||||
|
this.readyToSend = readyToSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClientState of(boolean hasClientId, long clientId, boolean initialized, boolean readyToReceive, boolean readyToSend) {
|
||||||
|
return new ClientState(hasClientId, clientId, initialized, readyToReceive, readyToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasClientId() {
|
||||||
|
return hasClientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getClientId() {
|
||||||
|
return clientId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInitialized() {
|
||||||
|
return initialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReadyToReceive() {
|
||||||
|
return readyToReceive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReadyToSend() {
|
||||||
|
return readyToSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientState setHasClientId(boolean hasClientId) {
|
||||||
|
this.hasClientId = hasClientId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientState setClientId(long clientId) {
|
||||||
|
this.clientId = clientId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientState setInitialized(boolean initialized) {
|
||||||
|
this.initialized = initialized;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientState setReadyToReceive(boolean readyToReceive) {
|
||||||
|
this.readyToReceive = readyToReceive;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientState setReadyToSend(boolean readyToSend) {
|
||||||
|
this.readyToSend = readyToSend;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientState that = (ClientState) o;
|
||||||
|
|
||||||
|
if (hasClientId != that.hasClientId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (clientId != that.clientId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (initialized != that.initialized) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (readyToReceive != that.readyToReceive) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return readyToSend == that.readyToSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = (hasClientId ? 1 : 0);
|
||||||
|
result = 31 * result + (int) (clientId ^ (clientId >>> 32));
|
||||||
|
result = 31 * result + (initialized ? 1 : 0);
|
||||||
|
result = 31 * result + (readyToReceive ? 1 : 0);
|
||||||
|
result = 31 * result + (readyToSend ? 1 : 0);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new StringJoiner(", ", ClientState.class.getSimpleName() + "[", "]")
|
||||||
|
.add("hasClientId=" + hasClientId)
|
||||||
|
.add("clientId=" + clientId)
|
||||||
|
.add("initialized=" + initialized)
|
||||||
|
.add("readyToReceive=" + readyToReceive)
|
||||||
|
.add("readyToSend=" + readyToSend)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user