Update InternalClientManager.java and ResponseReceiver.java

This commit is contained in:
Andrea Cavalli 2020-10-14 19:16:21 +02:00
parent 3f53b5c4ee
commit bed2064694
2 changed files with 19 additions and 1 deletions

View File

@ -43,6 +43,7 @@ public class InternalClientManager implements AutoCloseable {
} }
public void registerClient(int clientId, InternalClient internalClient) { public void registerClient(int clientId, InternalClient internalClient) {
responseReceiver.registerClient();
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null; boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
if (replaced) { if (replaced) {
throw new IllegalStateException("Client " + clientId + " already registered"); throw new IllegalStateException("Client " + clientId + " already registered");

View File

@ -10,6 +10,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class ResponseReceiver extends Thread implements AutoCloseable { public class ResponseReceiver extends Thread implements AutoCloseable {
@ -32,6 +33,8 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS]; private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
private final CountDownLatch closeWait = new CountDownLatch(1); private final CountDownLatch closeWait = new CountDownLatch(1);
private AtomicInteger registeredClients = new AtomicInteger(0);
private volatile boolean closeRequested = false;
public ResponseReceiver(EventsHandler eventsHandler) { public ResponseReceiver(EventsHandler eventsHandler) {
@ -48,7 +51,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
public void run() { public void run() {
int[] sortIndex; int[] sortIndex;
try { try {
while(true) { while(!closeRequested || registeredClients.get() > 0) {
int resultsCount; int resultsCount;
clientInitializationLock.readLock().lock(); clientInitializationLock.readLock().lock();
try { try {
@ -60,6 +63,8 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
if (resultsCount <= 0) if (resultsCount <= 0)
continue; continue;
int closedClients = 0;
if (USE_OPTIMIZED_DISPATCHER) { if (USE_OPTIMIZED_DISPATCHER) {
// Generate a list of indices sorted by client id, from 0 to resultsCount // Generate a list of indices sorted by client id, from 0 to resultsCount
sortIndex = generateSortIndex(0, resultsCount, clientIds); sortIndex = generateSortIndex(0, resultsCount, clientIds);
@ -82,6 +87,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState; TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) clientEvents[j]).authorizationState;
if (authorizationState instanceof TdApi.AuthorizationStateClosed) { if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
lastClientClosed = true; lastClientClosed = true;
closedClients++;
} }
} }
} }
@ -133,15 +139,21 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState; TdApi.AuthorizationState authorizationState = ((TdApi.UpdateAuthorizationState) e.event).authorizationState;
if (authorizationState instanceof TdApi.AuthorizationStateClosed) { if (authorizationState instanceof TdApi.AuthorizationStateClosed) {
closed = true; closed = true;
closedClients++;
} }
} }
} }
eventsHandler.handleClientEvents(clientId, closed, clientEventIds, clientEvents); eventsHandler.handleClientEvents(clientId, closed, clientEventIds, clientEvents);
} }
} }
Arrays.fill(clientIds, 0); Arrays.fill(clientIds, 0);
Arrays.fill(eventIds, 0); Arrays.fill(eventIds, 0);
Arrays.fill(events, null); Arrays.fill(events, null);
if (closedClients > 0) {
this.registeredClients.addAndGet(-closedClients);
}
} }
} finally { } finally {
this.closeWait.countDown(); this.closeWait.countDown();
@ -157,8 +169,13 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
return sortedIndices; return sortedIndices;
} }
public void registerClient() {
registeredClients.incrementAndGet();
}
@Override @Override
public void close() throws InterruptedException { public void close() throws InterruptedException {
this.closeRequested = true;
this.closeWait.await(); this.closeWait.await();
} }
} }