Remove the deprecated ThreadDeathWatcher (#11574)
Motivation: The deprecated ThreadDeathWatcher produces more garbage and can delay resource release, when compared to manual resource management. Modification: Remove the ThreadDeathWatcher and other deprecated APIs that rely on it. Result: Less deprecated code.
This commit is contained in:
parent
85b0eb2d3e
commit
3e2e36eac5
@ -392,7 +392,7 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
|
|||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the ThreadDeathWatcher to have destroyed all thread caches
|
// Wait for all thread caches to be destroyed.
|
||||||
while (allocator.metric().numThreadLocalCaches() > 0) {
|
while (allocator.metric().numThreadLocalCaches() > 0) {
|
||||||
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
|
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
|
||||||
System.gc();
|
System.gc();
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
package io.netty.util;
|
package io.netty.util;
|
||||||
|
|
||||||
import io.netty.util.internal.ObjectUtil;
|
import io.netty.util.internal.ObjectUtil;
|
||||||
import io.netty.util.internal.StringUtil;
|
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
@ -137,34 +136,6 @@ public final class ReferenceCountUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
|
|
||||||
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
|
|
||||||
* intended use case.
|
|
||||||
*
|
|
||||||
* @deprecated this may introduce a lot of memory usage so it is generally preferable to manually release objects.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static <T> T releaseLater(T msg) {
|
|
||||||
return releaseLater(msg, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
|
|
||||||
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
|
|
||||||
* intended use case.
|
|
||||||
*
|
|
||||||
* @deprecated this may introduce a lot of memory usage so it is generally preferable to manually release objects.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static <T> T releaseLater(T msg, int decrement) {
|
|
||||||
ObjectUtil.checkPositive(decrement, "decrement");
|
|
||||||
if (msg instanceof ReferenceCounted) {
|
|
||||||
ThreadDeathWatcher.watch(Thread.currentThread(), new ReleasingTask((ReferenceCounted) msg, decrement));
|
|
||||||
}
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns reference count of a {@link ReferenceCounted} object. If object is not type of
|
* Returns reference count of a {@link ReferenceCounted} object. If object is not type of
|
||||||
* {@link ReferenceCounted}, {@code -1} is returned.
|
* {@link ReferenceCounted}, {@code -1} is returned.
|
||||||
@ -173,37 +144,5 @@ public final class ReferenceCountUtil {
|
|||||||
return msg instanceof ReferenceCounted ? ((ReferenceCounted) msg).refCnt() : -1;
|
return msg instanceof ReferenceCounted ? ((ReferenceCounted) msg).refCnt() : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Releases the objects when the thread that called {@link #releaseLater(Object)} has been terminated.
|
|
||||||
*/
|
|
||||||
private static final class ReleasingTask implements Runnable {
|
|
||||||
|
|
||||||
private final ReferenceCounted obj;
|
|
||||||
private final int decrement;
|
|
||||||
|
|
||||||
ReleasingTask(ReferenceCounted obj, int decrement) {
|
|
||||||
this.obj = obj;
|
|
||||||
this.decrement = decrement;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
if (!obj.release(decrement)) {
|
|
||||||
logger.warn("Non-zero refCnt: {}", this);
|
|
||||||
} else {
|
|
||||||
logger.debug("Released: {}", this);
|
|
||||||
}
|
|
||||||
} catch (Exception ex) {
|
|
||||||
logger.warn("Failed to release an object: {}", obj, ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ReferenceCountUtil() { }
|
private ReferenceCountUtil() { }
|
||||||
}
|
}
|
||||||
|
@ -1,258 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2014 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.netty.util;
|
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
|
||||||
|
|
||||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
|
||||||
import io.netty.util.internal.SystemPropertyUtil;
|
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
||||||
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if a thread is alive periodically and runs a task when a thread dies.
|
|
||||||
* <p>
|
|
||||||
* This thread starts a daemon thread to check the state of the threads being watched and to invoke their
|
|
||||||
* associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
|
|
||||||
* will terminate itself, and a new daemon thread will be started again when a new watch is added.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @deprecated will be removed in the next major release
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public final class ThreadDeathWatcher {
|
|
||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
|
|
||||||
// visible for testing
|
|
||||||
static final ThreadFactory threadFactory;
|
|
||||||
|
|
||||||
// Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
|
|
||||||
// concurrently depending on the implementation of it in a MPSC queue.
|
|
||||||
private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<>();
|
|
||||||
private static final Watcher watcher = new Watcher();
|
|
||||||
private static final AtomicBoolean started = new AtomicBoolean();
|
|
||||||
private static volatile Thread watcherThread;
|
|
||||||
|
|
||||||
static {
|
|
||||||
String poolName = "threadDeathWatcher";
|
|
||||||
String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
|
|
||||||
if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
|
|
||||||
poolName = serviceThreadPrefix + poolName;
|
|
||||||
}
|
|
||||||
// because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
|
|
||||||
// this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
|
|
||||||
// must not be sticky about its thread group
|
|
||||||
threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Schedules the specified {@code task} to run when the specified {@code thread} dies.
|
|
||||||
*
|
|
||||||
* @param thread the {@link Thread} to watch
|
|
||||||
* @param task the {@link Runnable} to run when the {@code thread} dies
|
|
||||||
*
|
|
||||||
* @throws IllegalArgumentException if the specified {@code thread} is not alive
|
|
||||||
*/
|
|
||||||
public static void watch(Thread thread, Runnable task) {
|
|
||||||
requireNonNull(thread, "thread");
|
|
||||||
requireNonNull(task, "task");
|
|
||||||
if (!thread.isAlive()) {
|
|
||||||
throw new IllegalArgumentException("thread must be alive.");
|
|
||||||
}
|
|
||||||
|
|
||||||
schedule(thread, task, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
|
|
||||||
*/
|
|
||||||
public static void unwatch(Thread thread, Runnable task) {
|
|
||||||
requireNonNull(thread, "thread");
|
|
||||||
requireNonNull(task, "taks");
|
|
||||||
|
|
||||||
schedule(thread, task, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void schedule(Thread thread, Runnable task, boolean isWatch) {
|
|
||||||
pendingEntries.add(new Entry(thread, task, isWatch));
|
|
||||||
|
|
||||||
if (started.compareAndSet(false, true)) {
|
|
||||||
final Thread watcherThread = threadFactory.newThread(watcher);
|
|
||||||
// Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
|
|
||||||
// classloader.
|
|
||||||
// See:
|
|
||||||
// - https://github.com/netty/netty/issues/7290
|
|
||||||
// - https://bugs.openjdk.java.net/browse/JDK-7008595
|
|
||||||
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
|
|
||||||
watcherThread.setContextClassLoader(null);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
watcherThread.start();
|
|
||||||
ThreadDeathWatcher.watcherThread = watcherThread;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits until the thread of this watcher has no threads to watch and terminates itself.
|
|
||||||
* Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
|
|
||||||
* this operation is only useful when you want to ensure that the watcher thread is terminated
|
|
||||||
* <strong>after</strong> your application is shut down and there's no chance of calling
|
|
||||||
* {@link #watch(Thread, Runnable)} afterwards.
|
|
||||||
*
|
|
||||||
* @return {@code true} if and only if the watcher thread has been terminated
|
|
||||||
*/
|
|
||||||
public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
|
|
||||||
requireNonNull(unit, "unit");
|
|
||||||
|
|
||||||
Thread watcherThread = ThreadDeathWatcher.watcherThread;
|
|
||||||
if (watcherThread != null) {
|
|
||||||
watcherThread.join(unit.toMillis(timeout));
|
|
||||||
return !watcherThread.isAlive();
|
|
||||||
} else {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ThreadDeathWatcher() { }
|
|
||||||
|
|
||||||
private static final class Watcher implements Runnable {
|
|
||||||
|
|
||||||
private final List<Entry> watchees = new ArrayList<>();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (;;) {
|
|
||||||
fetchWatchees();
|
|
||||||
notifyWatchees();
|
|
||||||
|
|
||||||
// Try once again just in case notifyWatchees() triggered watch() or unwatch().
|
|
||||||
fetchWatchees();
|
|
||||||
notifyWatchees();
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException ignore) {
|
|
||||||
// Ignore the interrupt; do not terminate until all tasks are run.
|
|
||||||
}
|
|
||||||
|
|
||||||
if (watchees.isEmpty() && pendingEntries.isEmpty()) {
|
|
||||||
|
|
||||||
// Mark the current worker thread as stopped.
|
|
||||||
// The following CAS must always success and must be uncontended,
|
|
||||||
// because only one watcher thread should be running at the same time.
|
|
||||||
boolean stopped = started.compareAndSet(true, false);
|
|
||||||
assert stopped;
|
|
||||||
|
|
||||||
// Check if there are pending entries added by watch() while we do CAS above.
|
|
||||||
if (pendingEntries.isEmpty()) {
|
|
||||||
// A) watch() was not invoked and thus there's nothing to handle
|
|
||||||
// -> safe to terminate because there's nothing left to do
|
|
||||||
// B) a new watcher thread started and handled them all
|
|
||||||
// -> safe to terminate the new watcher thread will take care the rest
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// There are pending entries again, added by watch()
|
|
||||||
if (!started.compareAndSet(false, true)) {
|
|
||||||
// watch() started a new watcher thread and set 'started' to true.
|
|
||||||
// -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// watch() added an entry, but this worker was faster to set 'started' to true.
|
|
||||||
// i.e. a new watcher thread was not started
|
|
||||||
// -> keep this thread alive to handle the newly added entries.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fetchWatchees() {
|
|
||||||
for (;;) {
|
|
||||||
Entry e = pendingEntries.poll();
|
|
||||||
if (e == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (e.isWatch) {
|
|
||||||
watchees.add(e);
|
|
||||||
} else {
|
|
||||||
watchees.remove(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void notifyWatchees() {
|
|
||||||
List<Entry> watchees = this.watchees;
|
|
||||||
for (int i = 0; i < watchees.size();) {
|
|
||||||
Entry e = watchees.get(i);
|
|
||||||
if (!e.thread.isAlive()) {
|
|
||||||
watchees.remove(i);
|
|
||||||
try {
|
|
||||||
e.task.run();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.warn("Thread death watcher task raised an exception:", t);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
i ++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class Entry {
|
|
||||||
final Thread thread;
|
|
||||||
final Runnable task;
|
|
||||||
final boolean isWatch;
|
|
||||||
|
|
||||||
Entry(Thread thread, Runnable task, boolean isWatch) {
|
|
||||||
this.thread = thread;
|
|
||||||
this.task = task;
|
|
||||||
this.isWatch = isWatch;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return thread.hashCode() ^ task.hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (obj == this) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(obj instanceof Entry)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Entry that = (Entry) obj;
|
|
||||||
return thread == that.thread && task == that.task;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,130 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2014 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.netty.util;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.junit.jupiter.api.Timeout;
|
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
|
||||||
|
|
||||||
public class ThreadDeathWatcherTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
|
|
||||||
public void testWatch() throws Exception {
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
final Thread t = new Thread() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (;;) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException ignore) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final Runnable task = () -> {
|
|
||||||
if (!t.isAlive()) {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
try {
|
|
||||||
ThreadDeathWatcher.watch(t, task);
|
|
||||||
fail("must reject to watch a non-alive thread.");
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
// expected
|
|
||||||
}
|
|
||||||
|
|
||||||
t.start();
|
|
||||||
ThreadDeathWatcher.watch(t, task);
|
|
||||||
|
|
||||||
// As long as the thread is alive, the task should not run.
|
|
||||||
assertThat(latch.await(750, TimeUnit.MILLISECONDS), is(false));
|
|
||||||
|
|
||||||
// Interrupt the thread to terminate it.
|
|
||||||
t.interrupt();
|
|
||||||
|
|
||||||
// The task must be run on termination.
|
|
||||||
latch.await();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
|
|
||||||
public void testUnwatch() throws Exception {
|
|
||||||
final AtomicBoolean run = new AtomicBoolean();
|
|
||||||
final Thread t = new Thread() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (;;) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException ignore) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final Runnable task = () -> run.set(true);
|
|
||||||
|
|
||||||
t.start();
|
|
||||||
|
|
||||||
// Watch and then unwatch.
|
|
||||||
ThreadDeathWatcher.watch(t, task);
|
|
||||||
ThreadDeathWatcher.unwatch(t, task);
|
|
||||||
|
|
||||||
// Interrupt the thread to terminate it.
|
|
||||||
t.interrupt();
|
|
||||||
|
|
||||||
// Wait until the thread dies.
|
|
||||||
t.join();
|
|
||||||
|
|
||||||
// Wait until the watcher thread terminates itself.
|
|
||||||
assertThat(ThreadDeathWatcher.awaitInactivity(Long.MAX_VALUE, TimeUnit.SECONDS), is(true));
|
|
||||||
|
|
||||||
// And the task should not run.
|
|
||||||
assertThat(run.get(), is(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS)
|
|
||||||
public void testThreadGroup() throws InterruptedException {
|
|
||||||
final ThreadGroup group = new ThreadGroup("group");
|
|
||||||
final AtomicReference<ThreadGroup> capturedGroup = new AtomicReference<>();
|
|
||||||
final Thread thread = new Thread(group, () -> {
|
|
||||||
final Thread t = ThreadDeathWatcher.threadFactory.newThread(() -> {
|
|
||||||
});
|
|
||||||
capturedGroup.set(t.getThreadGroup());
|
|
||||||
});
|
|
||||||
thread.start();
|
|
||||||
thread.join();
|
|
||||||
|
|
||||||
assertEquals(group, capturedGroup.get());
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user