Levlam race condition fix
This commit is contained in:
parent
ddad489bc0
commit
69e0b7d2c0
@ -7,11 +7,9 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
|
|||||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
public class InternalClient implements ClientEventsHandler, TelegramClient {
|
public class InternalClient implements ClientEventsHandler, TelegramClient {
|
||||||
|
|
||||||
static final ReentrantReadWriteLock clientInitializationLock = new ReentrantReadWriteLock(true);
|
|
||||||
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
|
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
|
||||||
|
|
||||||
private final int clientId;
|
private final int clientId;
|
||||||
@ -26,36 +24,26 @@ public class InternalClient implements ClientEventsHandler, TelegramClient {
|
|||||||
ResultHandler updateHandler,
|
ResultHandler updateHandler,
|
||||||
ExceptionHandler updateExceptionHandler,
|
ExceptionHandler updateExceptionHandler,
|
||||||
ExceptionHandler defaultExceptionHandler) {
|
ExceptionHandler defaultExceptionHandler) {
|
||||||
clientInitializationLock.writeLock().lock();
|
this.updateHandler = new Handler(updateHandler, updateExceptionHandler);
|
||||||
try {
|
this.updatesHandler = null;
|
||||||
this.updateHandler = new Handler(updateHandler, updateExceptionHandler);
|
this.defaultExceptionHandler = defaultExceptionHandler;
|
||||||
this.updatesHandler = null;
|
this.clientManager = clientManager;
|
||||||
this.defaultExceptionHandler = defaultExceptionHandler;
|
this.clientId = NativeClientAccess.create();
|
||||||
this.clientManager = clientManager;
|
|
||||||
this.clientId = NativeClientAccess.create();
|
|
||||||
|
|
||||||
clientManager.registerClient(clientId, this);
|
clientManager.registerClient(clientId, this);
|
||||||
} finally {
|
|
||||||
clientInitializationLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public InternalClient(InternalClientManager clientManager,
|
public InternalClient(InternalClientManager clientManager,
|
||||||
UpdatesHandler updatesHandler,
|
UpdatesHandler updatesHandler,
|
||||||
ExceptionHandler updateExceptionHandler,
|
ExceptionHandler updateExceptionHandler,
|
||||||
ExceptionHandler defaultExceptionHandler) {
|
ExceptionHandler defaultExceptionHandler) {
|
||||||
clientInitializationLock.writeLock().lock();
|
this.updateHandler = null;
|
||||||
try {
|
this.updatesHandler = new MultiHandler(updatesHandler, updateExceptionHandler);
|
||||||
this.updateHandler = null;
|
this.clientManager = clientManager;
|
||||||
this.updatesHandler = new MultiHandler(updatesHandler, updateExceptionHandler);
|
this.defaultExceptionHandler = defaultExceptionHandler;
|
||||||
this.clientManager = clientManager;
|
this.clientId = NativeClientAccess.create();
|
||||||
this.defaultExceptionHandler = defaultExceptionHandler;
|
|
||||||
this.clientId = NativeClientAccess.create();
|
|
||||||
|
|
||||||
clientManager.registerClient(clientId, this);
|
clientManager.registerClient(clientId, this);
|
||||||
} finally {
|
|
||||||
clientInitializationLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package it.tdlight.common;
|
package it.tdlight.common;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
@ -43,11 +44,14 @@ public class InternalClientManager implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void registerClient(int clientId, InternalClient internalClient) {
|
public void registerClient(int clientId, InternalClient internalClient) {
|
||||||
responseReceiver.registerClient();
|
responseReceiver.registerClient(clientId);
|
||||||
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
|
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
|
||||||
if (replaced) {
|
if (replaced) {
|
||||||
throw new IllegalStateException("Client " + clientId + " already registered");
|
throw new IllegalStateException("Client " + clientId + " already registered");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send a dummy request because @levlam is too lazy to fix race conditions in a better way
|
||||||
|
internalClient.send(new TdApi.GetAuthorizationState(), null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getImplementationName() {
|
public String getImplementationName() {
|
||||||
|
@ -1,16 +1,15 @@
|
|||||||
package it.tdlight.common;
|
package it.tdlight.common;
|
||||||
|
|
||||||
import static it.tdlight.common.InternalClient.clientInitializationLock;
|
|
||||||
|
|
||||||
import it.tdlight.common.utils.IntSwapper;
|
import it.tdlight.common.utils.IntSwapper;
|
||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class ResponseReceiver extends Thread implements AutoCloseable {
|
public class ResponseReceiver extends Thread implements AutoCloseable {
|
||||||
@ -33,7 +32,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
|
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
|
||||||
|
|
||||||
private final CountDownLatch closeWait = new CountDownLatch(1);
|
private final CountDownLatch closeWait = new CountDownLatch(1);
|
||||||
private AtomicInteger registeredClients = new AtomicInteger(0);
|
private final Set<Integer> registeredClients = new ConcurrentSkipListSet<>();
|
||||||
private volatile boolean closeRequested = false;
|
private volatile boolean closeRequested = false;
|
||||||
|
|
||||||
|
|
||||||
@ -46,24 +45,19 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
this.start();
|
this.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"UnnecessaryLocalVariable", "InfiniteLoopStatement"})
|
@SuppressWarnings({"UnnecessaryLocalVariable"})
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
int[] sortIndex;
|
int[] sortIndex;
|
||||||
try {
|
try {
|
||||||
while(!closeRequested || registeredClients.get() > 0) {
|
while(!closeRequested || !registeredClients.isEmpty()) {
|
||||||
int resultsCount;
|
int resultsCount;
|
||||||
clientInitializationLock.readLock().lock();
|
resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/);
|
||||||
try {
|
|
||||||
resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/);
|
|
||||||
} finally {
|
|
||||||
clientInitializationLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (resultsCount <= 0)
|
if (resultsCount <= 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
int closedClients = 0;
|
Set<Integer> closedClients = new HashSet<>();
|
||||||
|
|
||||||
if (USE_OPTIMIZED_DISPATCHER) {
|
if (USE_OPTIMIZED_DISPATCHER) {
|
||||||
// Generate a list of indices sorted by client id, from 0 to resultsCount
|
// Generate a list of indices sorted by client id, from 0 to resultsCount
|
||||||
@ -87,7 +81,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState;
|
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState;
|
||||||
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
|
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
|
||||||
lastClientClosed = true;
|
lastClientClosed = true;
|
||||||
closedClients++;
|
closedClients.add(clientId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,7 +133,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState;
|
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState;
|
||||||
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
|
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
|
||||||
closed = true;
|
closed = true;
|
||||||
closedClients++;
|
closedClients.add(clientId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -151,8 +145,8 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
Arrays.fill(eventIds, 0);
|
Arrays.fill(eventIds, 0);
|
||||||
Arrays.fill(events, null);
|
Arrays.fill(events, null);
|
||||||
|
|
||||||
if (closedClients > 0) {
|
if (!closedClients.isEmpty()) {
|
||||||
this.registeredClients.addAndGet(-closedClients);
|
this.registeredClients.addAll(closedClients);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -169,8 +163,8 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
|
|||||||
return sortedIndices;
|
return sortedIndices;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerClient() {
|
public void registerClient(int clientId) {
|
||||||
registeredClients.incrementAndGet();
|
registeredClients.add(clientId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user