Fix race condition

This commit is contained in:
Andrea Cavalli 2020-10-13 18:33:06 +02:00
parent 2647bd0d70
commit fe8bfd17e0
3 changed files with 90 additions and 46 deletions

View File

@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class InternalClient implements ClientEventsHandler, TelegramClient {
private static final java.lang.Object CLIENT_CREATION_LOCK = new java.lang.Object();
static final java.lang.Object CLIENT_CREATION_LOCK = new java.lang.Object();
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
private final int clientId;
@ -30,9 +30,8 @@ public class InternalClient implements ClientEventsHandler, TelegramClient {
this.updatesHandler = null;
this.defaultExceptionHandler = defaultExceptionHandler;
this.clientManager = clientManager;
clientManager.preregisterClient(this);
this.clientId = NativeClientAccess.create();
clientManager.registerClient(clientId, this);
}
}
@ -46,9 +45,8 @@ public class InternalClient implements ClientEventsHandler, TelegramClient {
this.updatesHandler = new MultiHandler(updatesHandler, updateExceptionHandler);
this.clientManager = clientManager;
this.defaultExceptionHandler = defaultExceptionHandler;
clientManager.preregisterClient(this);
this.clientId = NativeClientAccess.create();
clientManager.registerClient(clientId, this);
}
}

View File

@ -12,7 +12,6 @@ public class InternalClientManager implements AutoCloseable {
private final String implementationName;
private final ResponseReceiver responseReceiver = new ResponseReceiver(this::handleClientEvents);
private final ConcurrentHashMap<Integer, ClientEventsHandler> registeredClientEventHandlers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ClientEventsHandler, java.lang.Object> unregisteredClientEventHandlers = new ConcurrentHashMap<>();
private final AtomicLong currentQueryId = new AtomicLong();
private InternalClientManager(String implementationName) {
@ -32,15 +31,6 @@ public class InternalClientManager implements AutoCloseable {
private void handleClientEvents(int clientId, boolean isClosed, long[] clientEventIds, Object[] clientEvents) {
ClientEventsHandler handler = registeredClientEventHandlers.get(clientId);
if (handler == null) {
handler = unregisteredClientEventHandlers
.keySet()
.stream()
.filter(item -> item.getClientId() == clientId)
.findAny()
.orElse(null);
}
if (handler != null) {
handler.handleEvents(isClosed, clientEventIds, clientEvents);
} else {
@ -48,17 +38,17 @@ public class InternalClientManager implements AutoCloseable {
}
if (isClosed) {
System.err.println("Unregister client " + clientId);
registeredClientEventHandlers.remove(clientId);
}
}
public void preregisterClient(ClientEventsHandler client) {
this.unregisteredClientEventHandlers.put(client, new java.lang.Object());
}
public void registerClient(int clientId, InternalClient internalClient) {
registeredClientEventHandlers.put(clientId, internalClient);
unregisteredClientEventHandlers.remove(internalClient);
System.err.println("Register client " + clientId + ", " + internalClient);
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
if (replaced) {
throw new IllegalStateException("Client " + clientId + " already registered");
}
}
public String getImplementationName() {

View File

@ -1,11 +1,22 @@
package it.tdlight.common;
import static it.tdlight.common.InternalClient.CLIENT_CREATION_LOCK;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
public class ResponseReceiver extends Thread implements AutoCloseable {
private static final boolean USE_OPTIMIZED_DISPATCHER = Boolean.parseBoolean(System.getProperty(
"tdlight.dispatcher.use_optimized_dispatcher",
"true"
));
private static final int MAX_EVENTS = 1000;
private static final int[] originalSortingSource = new int[MAX_EVENTS];
static {
@ -37,11 +48,15 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
int[] sortIndex;
try {
while(true) {
int resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/);
int resultsCount;
synchronized (CLIENT_CREATION_LOCK) {
resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/);
}
if (resultsCount <= 0)
continue;
if (USE_OPTIMIZED_DISPATCHER) {
// Generate a list of indices sorted by client id, from 0 to resultsCount
sortIndex = generateSortIndex(0, resultsCount, clientIds);
@ -81,6 +96,47 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
lastClientIdEventsCount++;
}
}
} else {
class Event {
public final int clientId;
public final long eventId;
public final Object event;
public Event(int clientId, long eventId, Object event) {
this.clientId = clientId;
this.eventId = eventId;
this.event = event;
}
}
List<Event> eventsList = new ArrayList<>(resultsCount);
for (int i = 0; i < resultsCount; i++) {
eventsList.add(new Event(clientIds[i], eventIds[i], events[i]));
}
Set<Integer> clientIds = eventsList.stream().map(e -> e.clientId).collect(Collectors.toSet());
for (int clientId : clientIds) {
List<Event> clientEventsList = eventsList.stream().filter(e -> e.clientId == clientId).collect(Collectors.toList());
long[] clientEventIds = new long[clientEventsList.size()];
Object[] clientEvents = new Object[clientEventsList.size()];
boolean closed = false;
for (int i = 0; i < clientEventsList.size(); i++) {
Event e = clientEventsList.get(i);
clientEventIds[i] = e.eventId;
clientEvents[i] = e.event;
if (e.eventId == 0 && e.event instanceof TdApi.UpdateAuthorizationState) {
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState;
if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
closed = true;
}
}
}
eventsHandler.handleClientEvents(clientId, closed, clientEventIds, clientEvents);
}
}
Arrays.fill(clientIds, 0);
Arrays.fill(eventIds, 0);
Arrays.fill(events, null);
}
} finally {