Fix hangs and inconsistencies

This commit is contained in:
Andrea Cavalli 2023-05-14 22:43:01 +02:00
parent 3c3af46254
commit 11fc1a13db
5 changed files with 90 additions and 31 deletions

View File

@ -92,35 +92,65 @@ public class ClientFactory implements AutoCloseable {
TdApi.Object[] clientEvents, TdApi.Object[] clientEvents,
int arrayOffset, int arrayOffset,
int arrayLength) { int arrayLength) {
ClientEventsHandler handler = state.getClientEventsHandler(clientId); var eventsHandlingLock = state.getEventsHandlingLock();
boolean closeWriteLockAcquisitionFailed = false;
var stamp = eventsHandlingLock.readLock();
try {
ClientEventsHandler handler = state.getClientEventsHandler(clientId);
if (handler != null) { if (handler != null) {
handler.handleEvents(isClosed, clientEventIds, clientEvents, arrayOffset, arrayLength); handler.handleEvents(isClosed, clientEventIds, clientEvents, arrayOffset, arrayLength);
} else { } else {
java.util.List<DroppedEvent> droppedEvents = getEffectivelyDroppedEvents(clientEventIds, java.util.List<DroppedEvent> droppedEvents = getEffectivelyDroppedEvents(clientEventIds,
clientEvents, clientEvents,
arrayOffset, arrayOffset,
arrayLength arrayLength
); );
if (!droppedEvents.isEmpty()) { if (!droppedEvents.isEmpty()) {
logger.error("Unknown client id \"{}\"! {} events have been dropped!", clientId, droppedEvents.size()); logger.error("Unknown client id \"{}\"! {} events have been dropped!", clientId, droppedEvents.size());
for (DroppedEvent droppedEvent : droppedEvents) { for (DroppedEvent droppedEvent : droppedEvents) {
logger.error("The following event, with id \"{}\", has been dropped: {}", logger.error("The following event, with id \"{}\", has been dropped: {}",
droppedEvent.id, droppedEvent.id,
droppedEvent.event droppedEvent.event
); );
}
} }
} }
if (isClosed) {
var writeLockStamp = eventsHandlingLock.tryConvertToWriteLock(stamp);
if (writeLockStamp != 0L) {
stamp = writeLockStamp;
removeClientEventHandlers(clientId);
} else {
closeWriteLockAcquisitionFailed = true;
}
}
} finally {
eventsHandlingLock.unlock(stamp);
} }
if (isClosed) { if (closeWriteLockAcquisitionFailed) {
logger.debug("Removing Client {} from event handlers", clientId); stamp = eventsHandlingLock.writeLock();
state.removeClientEventHandlers(clientId); try {
logger.debug("Removed Client {} from event handlers", clientId); removeClientEventHandlers(clientId);
} finally {
eventsHandlingLock.unlockWrite(stamp);
}
} }
} }
/**
* Call this method only inside handleClientEvents!
* Ensure that state has the eventsHandlingLock locked in write mode
*/
private void removeClientEventHandlers(int clientId) {
logger.debug("Removing Client {} from event handlers", clientId);
state.removeClientEventHandlers(clientId);
logger.debug("Removed Client {} from event handlers", clientId);
}
/** /**
* Get only events that have been dropped, ignoring synthetic errors related to the closure of a client * Get only events that have been dropped, ignoring synthetic errors related to the closure of a client
*/ */

View File

@ -10,6 +10,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@ -149,21 +150,25 @@ final class InternalClient implements ClientEventsHandler, TelegramClient {
} }
private void createAndRegisterClient() { private void createAndRegisterClient() {
synchronized (this) { InternalClientsState clientManagerState = this.clientManagerState;
final StampedLock eventsHandlingLock = clientManagerState.getEventsHandlingLock();
var stamp = eventsHandlingLock.writeLock();
try {
if (clientId != null) { if (clientId != null) {
throw new UnsupportedOperationException("Can't initialize the same client twice!"); throw new UnsupportedOperationException("Can't initialize the same client twice!");
} }
int clientId = NativeClientAccess.create(); this.clientId = NativeClientAccess.create();
InternalClientsState clientManagerState = this.clientManagerState;
this.clientId = clientId;
if (clientRegistrationEventHandler != null) { if (clientRegistrationEventHandler != null) {
clientRegistrationEventHandler.onClientRegistered(clientId, clientManagerState::getNextQueryId); clientRegistrationEventHandler.onClientRegistered(clientId, clientManagerState::getNextQueryId);
// Remove the event handler // Remove the event handler
clientRegistrationEventHandler = null; clientRegistrationEventHandler = null;
} }
logger.debug(TG_MARKER, "Registering new client {}", clientId);
clientManagerState.registerClient(clientId, this);
logger.info(TG_MARKER, "Registered new client {}", clientId);
} finally {
eventsHandlingLock.unlockWrite(stamp);
} }
clientManagerState.registerClient(clientId, this);
logger.info(TG_MARKER, "Registered new client {}", clientId);
// Send a dummy request to start TDLib // Send a dummy request to start TDLib
logger.debug(TG_MARKER, "Sending dummy startup request as client {}", clientId); logger.debug(TG_MARKER, "Sending dummy startup request as client {}", clientId);

View File

@ -1,10 +1,12 @@
package it.tdlight; package it.tdlight;
import io.atlassian.util.concurrent.CopyOnWriteMap; import io.atlassian.util.concurrent.CopyOnWriteMap;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
public class InternalClientsState { public class InternalClientsState {
static final int STATE_INITIAL = 0; static final int STATE_INITIAL = 0;
@ -14,13 +16,17 @@ public class InternalClientsState {
static final int STATE_STOPPED = 4; static final int STATE_STOPPED = 4;
private final AtomicInteger runState = new AtomicInteger(); private final AtomicInteger runState = new AtomicInteger();
private final AtomicLong currentQueryId = new AtomicLong(); private final AtomicLong currentQueryId = new AtomicLong();
private final Map<Integer, ClientEventsHandler> registeredClientEventHandlers = new ConcurrentHashMap<>(); private final Map<Integer, ClientEventsHandler> registeredClientEventHandlers = new HashMap<>();
private final StampedLock eventsHandlingLock = new StampedLock();
public long getNextQueryId() { public long getNextQueryId() {
return currentQueryId.updateAndGet(value -> (value == Long.MAX_VALUE ? 0 : value) + 1); return currentQueryId.updateAndGet(value -> (value == Long.MAX_VALUE ? 0 : value) + 1);
} }
/**
* Before calling this method, lock using getEventsHandlingLock().writeLock()
*/
public void registerClient(int clientId, ClientEventsHandler internalClient) { public void registerClient(int clientId, ClientEventsHandler internalClient) {
boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null; boolean replaced = registeredClientEventHandlers.put(clientId, internalClient) != null;
if (replaced) { if (replaced) {
@ -28,10 +34,17 @@ public class InternalClientsState {
} }
} }
/**
* Before calling this method, lock using getEventsHandlingLock().readLock()
*/
public ClientEventsHandler getClientEventsHandler(int clientId) { public ClientEventsHandler getClientEventsHandler(int clientId) {
return registeredClientEventHandlers.get(clientId); return registeredClientEventHandlers.get(clientId);
} }
public StampedLock getEventsHandlingLock() {
return eventsHandlingLock;
}
public boolean shouldStartNow() { public boolean shouldStartNow() {
return runState.compareAndSet(STATE_INITIAL, STATE_STARTING); return runState.compareAndSet(STATE_INITIAL, STATE_STARTING);
} }
@ -46,6 +59,9 @@ public class InternalClientsState {
} }
} }
/**
* Before calling this method, lock using getEventsHandlingLock().writeLock()
*/
public void removeClientEventHandlers(int clientId) { public void removeClientEventHandlers(int clientId) {
registeredClientEventHandlers.remove(clientId); registeredClientEventHandlers.remove(clientId);
} }

View File

@ -165,9 +165,15 @@ final class InternalReactiveClient implements ClientEventsHandler, ReactiveTeleg
} }
logger.debug(TG_MARKER, "Creating new client"); logger.debug(TG_MARKER, "Creating new client");
clientId = NativeClientAccess.create(); clientId = NativeClientAccess.create();
logger.debug(TG_MARKER, "Registering new client {}", clientId); var eventsHandlingLock = clientManagerState.getEventsHandlingLock();
clientManagerState.registerClient(clientId, this); var stamp = eventsHandlingLock.writeLock();
logger.debug(TG_MARKER, "Registered new client {}", clientId); try {
logger.debug(TG_MARKER, "Registering new client {}", clientId);
clientManagerState.registerClient(clientId, this);
logger.info(TG_MARKER, "Registered new client {}", clientId);
} finally {
eventsHandlingLock.unlockWrite(stamp);
}
} }
@Override @Override

View File

@ -47,6 +47,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
private final Object registeredClientsLock = new Object(); private final Object registeredClientsLock = new Object();
// Do not modify the int[] directly, this should be replaced // Do not modify the int[] directly, this should be replaced
private volatile int[] registeredClients = new int[0]; private volatile int[] registeredClients = new int[0];
private volatile boolean closeRequested;
public ResponseReceiver(EventsHandler eventsHandler) { public ResponseReceiver(EventsHandler eventsHandler) {
@ -67,7 +68,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
final SimpleIntQueue closedClients = new SimpleIntQueue(); final SimpleIntQueue closedClients = new SimpleIntQueue();
try { try {
boolean interrupted; boolean interrupted;
while (!(interrupted = Thread.interrupted()) && registeredClients.length > 0) { while (!(interrupted = Thread.interrupted()) && (!closeRequested || registeredClients.length > 0)) {
// Timeout is expressed in seconds // Timeout is expressed in seconds
int resultsCount = receive(clientIds, eventIds, events, 2.0); int resultsCount = receive(clientIds, eventIds, events, 2.0);
LOG.trace("Received {} events", resultsCount); LOG.trace("Received {} events", resultsCount);
@ -272,6 +273,7 @@ abstract class ResponseReceiver extends Thread implements AutoCloseable {
@Override @Override
public void close() throws InterruptedException { public void close() throws InterruptedException {
this.closeRequested = true;
this.closeWait.await(); this.closeWait.await();
if (registeredClients.length == 0) { if (registeredClients.length == 0) {
LOG.debug("Interrupting response receiver"); LOG.debug("Interrupting response receiver");