From fe8bfd17e0e4014062d90984b26c22d6a688f719 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 13 Oct 2020 18:33:06 +0200 Subject: [PATCH] Fix race condition --- .../it/tdlight/common/InternalClient.java | 8 +- .../tdlight/common/InternalClientManager.java | 22 +--- .../it/tdlight/common/ResponseReceiver.java | 106 +++++++++++++----- 3 files changed, 90 insertions(+), 46 deletions(-) diff --git a/src/main/java/it/tdlight/common/InternalClient.java b/src/main/java/it/tdlight/common/InternalClient.java index f9ff205..94cd91f 100644 --- a/src/main/java/it/tdlight/common/InternalClient.java +++ b/src/main/java/it/tdlight/common/InternalClient.java @@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap; public class InternalClient implements ClientEventsHandler, TelegramClient { - private static final java.lang.Object CLIENT_CREATION_LOCK = new java.lang.Object(); + static final java.lang.Object CLIENT_CREATION_LOCK = new java.lang.Object(); private final ConcurrentHashMap handlers = new ConcurrentHashMap(); private final int clientId; @@ -30,9 +30,8 @@ public class InternalClient implements ClientEventsHandler, TelegramClient { this.updatesHandler = null; this.defaultExceptionHandler = defaultExceptionHandler; this.clientManager = clientManager; - - clientManager.preregisterClient(this); this.clientId = NativeClientAccess.create(); + clientManager.registerClient(clientId, this); } } @@ -46,9 +45,8 @@ public class InternalClient implements ClientEventsHandler, TelegramClient { this.updatesHandler = new MultiHandler(updatesHandler, updateExceptionHandler); this.clientManager = clientManager; this.defaultExceptionHandler = defaultExceptionHandler; - - clientManager.preregisterClient(this); this.clientId = NativeClientAccess.create(); + clientManager.registerClient(clientId, this); } } diff --git a/src/main/java/it/tdlight/common/InternalClientManager.java b/src/main/java/it/tdlight/common/InternalClientManager.java index a3e76d0..02eefd1 100644 --- a/src/main/java/it/tdlight/common/InternalClientManager.java +++ b/src/main/java/it/tdlight/common/InternalClientManager.java @@ -12,7 +12,6 @@ public class InternalClientManager implements AutoCloseable { private final String implementationName; private final ResponseReceiver responseReceiver = new ResponseReceiver(this::handleClientEvents); private final ConcurrentHashMap registeredClientEventHandlers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap unregisteredClientEventHandlers = new ConcurrentHashMap<>(); private final AtomicLong currentQueryId = new AtomicLong(); private InternalClientManager(String implementationName) { @@ -32,15 +31,6 @@ public class InternalClientManager implements AutoCloseable { private void handleClientEvents(int clientId, boolean isClosed, long[] clientEventIds, Object[] clientEvents) { ClientEventsHandler handler = registeredClientEventHandlers.get(clientId); - if (handler == null) { - handler = unregisteredClientEventHandlers - .keySet() - .stream() - .filter(item -> item.getClientId() == clientId) - .findAny() - .orElse(null); - } - if (handler != null) { handler.handleEvents(isClosed, clientEventIds, clientEvents); } else { @@ -48,17 +38,17 @@ public class InternalClientManager implements AutoCloseable { } if (isClosed) { + System.err.println("Unregister client " + clientId); registeredClientEventHandlers.remove(clientId); } } - public void preregisterClient(ClientEventsHandler client) { - this.unregisteredClientEventHandlers.put(client, new java.lang.Object()); - } - public void registerClient(int clientId, InternalClient internalClient) { - registeredClientEventHandlers.put(clientId, internalClient); - unregisteredClientEventHandlers.remove(internalClient); + System.err.println("Register client " + clientId + ", " + internalClient); + boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null; + if (replaced) { + throw new IllegalStateException("Client " + clientId + " already registered"); + } } public String getImplementationName() { diff --git a/src/main/java/it/tdlight/common/ResponseReceiver.java b/src/main/java/it/tdlight/common/ResponseReceiver.java index 04598c5..bfeca2e 100644 --- a/src/main/java/it/tdlight/common/ResponseReceiver.java +++ b/src/main/java/it/tdlight/common/ResponseReceiver.java @@ -1,11 +1,22 @@ package it.tdlight.common; +import static it.tdlight.common.InternalClient.CLIENT_CREATION_LOCK; + import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; public class ResponseReceiver extends Thread implements AutoCloseable { + private static final boolean USE_OPTIMIZED_DISPATCHER = Boolean.parseBoolean(System.getProperty( + "tdlight.dispatcher.use_optimized_dispatcher", + "true" + )); private static final int MAX_EVENTS = 1000; private static final int[] originalSortingSource = new int[MAX_EVENTS]; static { @@ -37,50 +48,95 @@ public class ResponseReceiver extends Thread implements AutoCloseable { int[] sortIndex; try { while(true) { - int resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/); + int resultsCount; + synchronized (CLIENT_CREATION_LOCK) { + resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/); + } if (resultsCount <= 0) continue; - // Generate a list of indices sorted by client id, from 0 to resultsCount - sortIndex = generateSortIndex(0, resultsCount, clientIds); + if (USE_OPTIMIZED_DISPATCHER) { + // Generate a list of indices sorted by client id, from 0 to resultsCount + sortIndex = generateSortIndex(0, resultsCount, clientIds); - int lastClientId = clientIds[sortIndex[0]]; - int lastClientIdEventsCount = 0; - boolean lastClientClosed = false; + int lastClientId = clientIds[sortIndex[0]]; + int lastClientIdEventsCount = 0; + boolean lastClientClosed = false; - for (int i = 0; i <= resultsCount; i++) { - if (i == resultsCount || (i != 0 && clientIds[sortIndex[i]] != lastClientId)) { - if (lastClientIdEventsCount > 0) { - int clientId = lastClientId; - long[] clientEventIds = new long[lastClientIdEventsCount]; - TdApi.Object[] clientEvents = new TdApi.Object[lastClientIdEventsCount]; - for (int j = 0; j < lastClientIdEventsCount; j++) { - clientEventIds[j] = eventIds[sortIndex[i - lastClientIdEventsCount + j]]; - clientEvents[j] = events[sortIndex[i - lastClientIdEventsCount + j]]; + for (int i = 0; i <= resultsCount; i++) { + if (i == resultsCount || (i != 0 && clientIds[sortIndex[i]] != lastClientId)) { + if (lastClientIdEventsCount > 0) { + int clientId = lastClientId; + long[] clientEventIds = new long[lastClientIdEventsCount]; + TdApi.Object[] clientEvents = new TdApi.Object[lastClientIdEventsCount]; + for (int j = 0; j < lastClientIdEventsCount; j++) { + clientEventIds[j] = eventIds[sortIndex[i - lastClientIdEventsCount + j]]; + clientEvents[j] = events[sortIndex[i - lastClientIdEventsCount + j]]; - if (clientEventIds[j] == 0 && clientEvents[j] instanceof TdApi.UpdateAuthorizationState) { - TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState; - if (authorizationState instanceof TdApi.AuthorizationStateClosed) { - lastClientClosed = true; + if (clientEventIds[j] == 0 && clientEvents[j] instanceof TdApi.UpdateAuthorizationState) { + TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState; + if (authorizationState instanceof TdApi.AuthorizationStateClosed) { + lastClientClosed = true; + } } } + + eventsHandler.handleClientEvents(clientId, lastClientClosed, clientEventIds, clientEvents); } - eventsHandler.handleClientEvents(clientId, lastClientClosed, clientEventIds, clientEvents); + if (i < resultsCount) { + lastClientId = clientIds[sortIndex[i]]; + lastClientIdEventsCount = 0; + lastClientClosed = false; + } } if (i < resultsCount) { - lastClientId = clientIds[sortIndex[i]]; - lastClientIdEventsCount = 0; - lastClientClosed = false; + lastClientIdEventsCount++; + } + } + } else { + class Event { + + public final int clientId; + public final long eventId; + public final Object event; + + public Event(int clientId, long eventId, Object event) { + this.clientId = clientId; + this.eventId = eventId; + this.event = event; } } - if (i < resultsCount) { - lastClientIdEventsCount++; + List eventsList = new ArrayList<>(resultsCount); + for (int i = 0; i < resultsCount; i++) { + eventsList.add(new Event(clientIds[i], eventIds[i], events[i])); + } + Set clientIds = eventsList.stream().map(e -> e.clientId).collect(Collectors.toSet()); + for (int clientId : clientIds) { + List clientEventsList = eventsList.stream().filter(e -> e.clientId == clientId).collect(Collectors.toList()); + long[] clientEventIds = new long[clientEventsList.size()]; + Object[] clientEvents = new Object[clientEventsList.size()]; + boolean closed = false; + for (int i = 0; i < clientEventsList.size(); i++) { + Event e = clientEventsList.get(i); + clientEventIds[i] = e.eventId; + clientEvents[i] = e.event; + + if (e.eventId == 0 && e.event instanceof TdApi.UpdateAuthorizationState) { + TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState; + if (authorizationState instanceof TdApi.AuthorizationStateClosed) { + closed = true; + } + } + } + eventsHandler.handleClientEvents(clientId, closed, clientEventIds, clientEvents); } } + Arrays.fill(clientIds, 0); + Arrays.fill(eventIds, 0); Arrays.fill(events, null); } } finally {