diff --git a/tdlight-java/src/main/java/it/tdlight/ClientFactory.java b/tdlight-java/src/main/java/it/tdlight/ClientFactory.java index fe19593..5d91152 100644 --- a/tdlight-java/src/main/java/it/tdlight/ClientFactory.java +++ b/tdlight-java/src/main/java/it/tdlight/ClientFactory.java @@ -92,35 +92,65 @@ public class ClientFactory implements AutoCloseable { TdApi.Object[] clientEvents, int arrayOffset, int arrayLength) { - ClientEventsHandler handler = state.getClientEventsHandler(clientId); + var eventsHandlingLock = state.getEventsHandlingLock(); + boolean closeWriteLockAcquisitionFailed = false; + var stamp = eventsHandlingLock.readLock(); + try { + ClientEventsHandler handler = state.getClientEventsHandler(clientId); - if (handler != null) { - handler.handleEvents(isClosed, clientEventIds, clientEvents, arrayOffset, arrayLength); - } else { - java.util.List droppedEvents = getEffectivelyDroppedEvents(clientEventIds, - clientEvents, - arrayOffset, - arrayLength - ); + if (handler != null) { + handler.handleEvents(isClosed, clientEventIds, clientEvents, arrayOffset, arrayLength); + } else { + java.util.List droppedEvents = getEffectivelyDroppedEvents(clientEventIds, + clientEvents, + arrayOffset, + arrayLength + ); - if (!droppedEvents.isEmpty()) { - logger.error("Unknown client id \"{}\"! {} events have been dropped!", clientId, droppedEvents.size()); - for (DroppedEvent droppedEvent : droppedEvents) { - logger.error("The following event, with id \"{}\", has been dropped: {}", - droppedEvent.id, - droppedEvent.event - ); + if (!droppedEvents.isEmpty()) { + logger.error("Unknown client id \"{}\"! {} events have been dropped!", clientId, droppedEvents.size()); + for (DroppedEvent droppedEvent : droppedEvents) { + logger.error("The following event, with id \"{}\", has been dropped: {}", + droppedEvent.id, + droppedEvent.event + ); + } } } + + if (isClosed) { + var writeLockStamp = eventsHandlingLock.tryConvertToWriteLock(stamp); + if (writeLockStamp != 0L) { + stamp = writeLockStamp; + removeClientEventHandlers(clientId); + } else { + closeWriteLockAcquisitionFailed = true; + } + } + } finally { + eventsHandlingLock.unlock(stamp); } - if (isClosed) { - logger.debug("Removing Client {} from event handlers", clientId); - state.removeClientEventHandlers(clientId); - logger.debug("Removed Client {} from event handlers", clientId); + if (closeWriteLockAcquisitionFailed) { + stamp = eventsHandlingLock.writeLock(); + try { + removeClientEventHandlers(clientId); + } finally { + eventsHandlingLock.unlockWrite(stamp); + } } } + /** + * Call this method only inside handleClientEvents! + * Ensure that state has the eventsHandlingLock locked in write mode + */ + private void removeClientEventHandlers(int clientId) { + logger.debug("Removing Client {} from event handlers", clientId); + state.removeClientEventHandlers(clientId); + logger.debug("Removed Client {} from event handlers", clientId); + } + /** * Get only events that have been dropped, ignoring synthetic errors related to the closure of a client */ diff --git a/tdlight-java/src/main/java/it/tdlight/InternalClient.java b/tdlight-java/src/main/java/it/tdlight/InternalClient.java index a9a2acd..8a43a6a 100644 --- a/tdlight-java/src/main/java/it/tdlight/InternalClient.java +++ b/tdlight-java/src/main/java/it/tdlight/InternalClient.java @@ -10,6 +10,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.StampedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -149,21 +150,25 @@ final class InternalClient implements ClientEventsHandler, TelegramClient { } private void createAndRegisterClient() { - synchronized (this) { + InternalClientsState clientManagerState = this.clientManagerState; + final StampedLock eventsHandlingLock = clientManagerState.getEventsHandlingLock(); + var stamp = eventsHandlingLock.writeLock(); + try { if (clientId != null) { throw new UnsupportedOperationException("Can't initialize the same client twice!"); } - int clientId = NativeClientAccess.create(); - InternalClientsState clientManagerState = this.clientManagerState; - this.clientId = clientId; + this.clientId = NativeClientAccess.create(); if (clientRegistrationEventHandler != null) { clientRegistrationEventHandler.onClientRegistered(clientId, clientManagerState::getNextQueryId); // Remove the event handler clientRegistrationEventHandler = null; } + logger.debug(TG_MARKER, "Registering new client {}", clientId); + clientManagerState.registerClient(clientId, this); + logger.info(TG_MARKER, "Registered new client {}", clientId); + } finally { + eventsHandlingLock.unlockWrite(stamp); } - clientManagerState.registerClient(clientId, this); - logger.info(TG_MARKER, "Registered new client {}", clientId); // Send a dummy request to start TDLib logger.debug(TG_MARKER, "Sending dummy startup request as client {}", clientId); diff --git a/tdlight-java/src/main/java/it/tdlight/InternalClientsState.java b/tdlight-java/src/main/java/it/tdlight/InternalClientsState.java index 2b0bbdc..270f25f 100644 --- a/tdlight-java/src/main/java/it/tdlight/InternalClientsState.java +++ b/tdlight-java/src/main/java/it/tdlight/InternalClientsState.java @@ -1,10 +1,12 @@ package it.tdlight; import io.atlassian.util.concurrent.CopyOnWriteMap; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.StampedLock; public class InternalClientsState { static final int STATE_INITIAL = 0; @@ -14,13 +16,17 @@ public class InternalClientsState { static final int STATE_STOPPED = 4; private final AtomicInteger runState = new AtomicInteger(); private final AtomicLong currentQueryId = new AtomicLong(); - private final Map registeredClientEventHandlers = new ConcurrentHashMap<>(); + private final Map registeredClientEventHandlers = new HashMap<>(); + private final StampedLock eventsHandlingLock = new StampedLock(); public long getNextQueryId() { return currentQueryId.updateAndGet(value -> (value == Long.MAX_VALUE ? 0 : value) + 1); } + /** + * Before calling this method, lock using getEventsHandlingLock().writeLock() + */ public void registerClient(int clientId, ClientEventsHandler internalClient) { boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null; if (replaced) { @@ -28,10 +34,17 @@ public class InternalClientsState { } } + /** + * Before calling this method, lock using getEventsHandlingLock().readLock() + */ public ClientEventsHandler getClientEventsHandler(int clientId) { return registeredClientEventHandlers.get(clientId); } + public StampedLock getEventsHandlingLock() { + return eventsHandlingLock; + } + public boolean shouldStartNow() { return runState.compareAndSet(STATE_INITIAL, STATE_STARTING); } @@ -46,6 +59,9 @@ public class InternalClientsState { } } + /** + * Before calling this method, lock using getEventsHandlingLock().writeLock() + */ public void removeClientEventHandlers(int clientId) { registeredClientEventHandlers.remove(clientId); } diff --git a/tdlight-java/src/main/java/it/tdlight/InternalReactiveClient.java b/tdlight-java/src/main/java/it/tdlight/InternalReactiveClient.java index bfa17f0..940592f 100644 --- a/tdlight-java/src/main/java/it/tdlight/InternalReactiveClient.java +++ b/tdlight-java/src/main/java/it/tdlight/InternalReactiveClient.java @@ -165,9 +165,15 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg } logger.debug(TG_MARKER, "Creating new client"); clientId = NativeClientAccess.create(); - logger.debug(TG_MARKER, "Registering new client {}", clientId); - clientManagerState.registerClient(clientId, this); - logger.debug(TG_MARKER, "Registered new client {}", clientId); + var eventsHandlingLock = clientManagerState.getEventsHandlingLock(); + var stamp = eventsHandlingLock.writeLock(); + try { + logger.debug(TG_MARKER, "Registering new client {}", clientId); + clientManagerState.registerClient(clientId, this); + logger.info(TG_MARKER, "Registered new client {}", clientId); + } finally { + eventsHandlingLock.unlockWrite(stamp); + } } @Override diff --git a/tdlight-java/src/main/java/it/tdlight/ResponseReceiver.java b/tdlight-java/src/main/java/it/tdlight/ResponseReceiver.java index 5883df1..fe95ea4 100644 --- a/tdlight-java/src/main/java/it/tdlight/ResponseReceiver.java +++ b/tdlight-java/src/main/java/it/tdlight/ResponseReceiver.java @@ -47,6 +47,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable { private final Object registeredClientsLock = new Object(); // Do not modify the int[] directly, this should be replaced private volatile int[] registeredClients = new int[0]; + private volatile boolean closeRequested; public ResponseReceiver(EventsHandler eventsHandler) { @@ -67,7 +68,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable { final SimpleIntQueue closedClients = new SimpleIntQueue(); try { boolean interrupted; - while (!(interrupted = Thread.interrupted()) && registeredClients.length > 0) { + while (!(interrupted = Thread.interrupted()) && (!closeRequested || registeredClients.length > 0)) { // Timeout is expressed in seconds int resultsCount = receive(clientIds, eventIds, events, 2.0); LOG.trace("Received {} events", resultsCount); @@ -272,6 +273,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable { @Override public void close() throws InterruptedException { + this.closeRequested = true; this.closeWait.await(); if (registeredClients.length == 0) { LOG.debug("Interrupting response receiver");