Close tdlib receiver on jvm shutdown

This commit is contained in:
Andrea Cavalli 2021-10-05 12:25:28 +02:00
parent e24c614025
commit e1465d4cb1
2 changed files with 81 additions and 11 deletions

View File

@ -6,10 +6,9 @@ import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
@ -20,8 +19,11 @@ public final class InternalClientManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(InternalClientManager.class);
private static final AtomicReference<InternalClientManager> INSTANCE = new AtomicReference<>(null);
private final AtomicBoolean startCalled = new AtomicBoolean();
private final AtomicBoolean closeCalled = new AtomicBoolean();
private final String implementationName;
private final ResponseReceiver responseReceiver = new ResponseReceiver(this::handleClientEvents);
private final ResponseReceiver responseReceiver;
private final ConcurrentHashMap<Integer, ClientEventsHandler> registeredClientEventHandlers = new ConcurrentHashMap<>();
private final AtomicLong currentQueryId = new AtomicLong();
@ -33,10 +35,42 @@ public final class InternalClientManager implements AutoCloseable {
System.exit(1);
}
this.implementationName = implementationName;
responseReceiver = new ResponseReceiver(this::handleClientEvents);
}
/**
*
* @return true if started as a result of this call
*/
public boolean startIfNeeded() {
if (closeCalled.get()) {
return false;
}
if (startCalled.compareAndSet(false, true)) {
responseReceiver.start();
return true;
} else {
return false;
}
}
public static InternalClientManager get(String implementationName) {
return INSTANCE.updateAndGet(val -> val == null ? new InternalClientManager(implementationName) : val);
InternalClientManager clientManager = INSTANCE.updateAndGet(val -> {
if (val == null) {
return new InternalClientManager(implementationName);
}
return val;
});
if (clientManager.startIfNeeded()) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
clientManager.onJVMShutdown();
} catch (InterruptedException ex) {
logger.error("Failed to close", ex);
}
}));
}
return clientManager;
}
private void handleClientEvents(int clientId, boolean isClosed, long[] clientEventIds, TdApi.Object[] clientEvents) {
@ -104,7 +138,17 @@ public final class InternalClientManager implements AutoCloseable {
@Override
public void close() throws InterruptedException {
responseReceiver.close();
if (startCalled.get()) {
if (closeCalled.compareAndSet(false, true)) {
responseReceiver.close();
}
} else {
throw new IllegalStateException("Start not called");
}
}
private void onJVMShutdown() throws InterruptedException {
responseReceiver.onJVMShutdown();
}
private static final class DroppedEvent {

View File

@ -12,6 +12,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public final class ResponseReceiver extends Thread implements AutoCloseable {
@ -28,6 +29,10 @@ public final class ResponseReceiver extends Thread implements AutoCloseable {
}
}
private final AtomicBoolean startCalled = new AtomicBoolean();
private final AtomicBoolean closeCalled = new AtomicBoolean();
private final AtomicBoolean jvmShutdown = new AtomicBoolean();
private final EventsHandler eventsHandler;
private final int[] clientIds = new int[MAX_EVENTS];
private final long[] eventIds = new long[MAX_EVENTS];
@ -35,16 +40,24 @@ public final class ResponseReceiver extends Thread implements AutoCloseable {
private final CountDownLatch closeWait = new CountDownLatch(1);
private final Set<Integer> registeredClients = new ConcurrentHashMap<Integer, java.lang.Object>().keySet(new java.lang.Object());
private volatile boolean closeRequested = false;
public ResponseReceiver(EventsHandler eventsHandler) {
super("TDLib thread");
this.eventsHandler = eventsHandler;
this.setDaemon(true);
}
this.start();
@Override
public synchronized void start() {
if (closeCalled.get()) {
throw new IllegalStateException("Closed");
}
if (startCalled.compareAndSet(false, true)) {
super.start();
} else {
throw new IllegalStateException("Start already called");
}
}
@SuppressWarnings({"UnnecessaryLocalVariable"})
@ -52,7 +65,7 @@ public final class ResponseReceiver extends Thread implements AutoCloseable {
public void run() {
int[] sortIndex;
try {
while (!closeRequested || !registeredClients.isEmpty()) {
while ((!Thread.interrupted() && !closeCalled.get() && !jvmShutdown.get()) || !registeredClients.isEmpty()) {
int resultsCount = NativeClientAccess.receive(clientIds, eventIds, events, 2.0 /*seconds*/);
if (resultsCount <= 0) {
@ -172,7 +185,20 @@ public final class ResponseReceiver extends Thread implements AutoCloseable {
@Override
public void close() throws InterruptedException {
this.closeRequested = true;
this.closeWait.await();
if (startCalled.get()) {
if (closeCalled.compareAndSet(false, true)) {
this.closeWait.await();
}
} else {
throw new IllegalStateException("Start not called");
}
}
public void onJVMShutdown() throws InterruptedException {
if (startCalled.get()) {
if (this.jvmShutdown.compareAndSet(false, true)) {
this.closeWait.await();
}
}
}
}