Fix concurrency errors

This commit is contained in:
Andrea Cavalli 2021-02-25 23:36:49 +01:00
parent a2bf050742
commit 2339ae7284
4 changed files with 5 additions and 3 deletions

View File

@ -76,6 +76,7 @@ public class InternalClient implements ClientEventsHandler, TelegramClient {
handleResponse(eventId, new Error(500, "Instance closed"), handler); handleResponse(eventId, new Error(500, "Instance closed"), handler);
}); });
handlers.clear(); handlers.clear();
logger.info("Client closed {}", clientId);
} }
/** /**

View File

@ -51,11 +51,11 @@ public class InternalClientManager implements AutoCloseable {
} }
public void registerClient(int clientId, ClientEventsHandler internalClient) { public void registerClient(int clientId, ClientEventsHandler internalClient) {
responseReceiver.registerClient(clientId);
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null; boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
if (replaced) { if (replaced) {
throw new IllegalStateException("Client " + clientId + " already registered"); throw new IllegalStateException("Client " + clientId + " already registered");
} }
responseReceiver.registerClient(clientId);
} }
public String getImplementationName() { public String getImplementationName() {

View File

@ -134,6 +134,7 @@ public class InternalReactiveClient implements ClientEventsHandler, ReactiveTele
if (isClosed.get()) { if (isClosed.get()) {
if (alreadyCompleted.compareAndSet(false, true)) { if (alreadyCompleted.compareAndSet(false, true)) {
subscriber.onComplete(); subscriber.onComplete();
logger.info("Client closed {}", clientId);
} }
} }
} }

View File

@ -8,7 +8,7 @@ import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -32,7 +32,7 @@ public class ResponseReceiver extends Thread implements AutoCloseable {
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS]; private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
private final CountDownLatch closeWait = new CountDownLatch(1); private final CountDownLatch closeWait = new CountDownLatch(1);
private final Set<Integer> registeredClients = new ConcurrentSkipListSet<>(); private final Set<Integer> registeredClients = new ConcurrentHashMap<Integer, java.lang.Object>().keySet(new java.lang.Object());
private volatile boolean closeRequested = false; private volatile boolean closeRequested = false;