Introduce ObjectCleaner and use it in FastThreadLocal to ensure FastThreadLocal.onRemoval(...) is called
Motivation: There is no guarantee that FastThreadLocal.onRemoval(...) is called if the FastThreadLocal is used by "non" FastThreacLocalThreads. This can lead to all sort of problems, like for example memory leaks as direct memory is not correctly cleaned up etc. Beside this we use ThreadDeathWatcher to check if we need to release buffers back to the pool when thread local caches are collected. In the past ThreadDeathWatcher was used which will need to "wakeup" every second to check if the registered Threads are still alive. If we can ensure FastThreadLocal.onRemoval(...) is called we do not need this anymore. Modifications: - Introduce ObjectCleaner and use it to ensure FastThreadLocal.onRemoval(...) is always called when a Thread is collected. - Deprecate ThreadDeathWatcher - Add unit tests. Result: Consistent way of cleanup FastThreadLocals when a Thread is collected.
This commit is contained in:
parent
942b993f2b
commit
e329ca1cf3
@ -20,7 +20,6 @@ package io.netty.buffer;
|
||||
import io.netty.buffer.PoolArena.SizeClass;
|
||||
import io.netty.util.Recycler;
|
||||
import io.netty.util.Recycler.Handle;
|
||||
import io.netty.util.ThreadDeathWatcher;
|
||||
import io.netty.util.internal.MathUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -56,9 +55,6 @@ final class PoolThreadCache {
|
||||
private final int numShiftsNormalHeap;
|
||||
private final int freeSweepAllocationThreshold;
|
||||
|
||||
private final Thread deathWatchThread;
|
||||
private final Runnable freeTask;
|
||||
|
||||
private int allocations;
|
||||
|
||||
// TODO: Test if adding padding helps under contention
|
||||
@ -66,8 +62,7 @@ final class PoolThreadCache {
|
||||
|
||||
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
|
||||
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
|
||||
int maxCachedBufferCapacity, int freeSweepAllocationThreshold,
|
||||
boolean useThreadDeathWatcher) {
|
||||
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
|
||||
if (maxCachedBufferCapacity < 0) {
|
||||
throw new IllegalArgumentException("maxCachedBufferCapacity: "
|
||||
+ maxCachedBufferCapacity + " (expected: >= 0)");
|
||||
@ -120,25 +115,6 @@ final class PoolThreadCache {
|
||||
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
|
||||
+ freeSweepAllocationThreshold + " (expected: > 0)");
|
||||
}
|
||||
|
||||
if (useThreadDeathWatcher) {
|
||||
|
||||
freeTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
free0();
|
||||
}
|
||||
};
|
||||
|
||||
deathWatchThread = Thread.currentThread();
|
||||
|
||||
// The thread-local cache will keep a list of pooled buffers which must be returned to
|
||||
// the pool when the thread is not alive anymore.
|
||||
ThreadDeathWatcher.watch(deathWatchThread, freeTask);
|
||||
} else {
|
||||
freeTask = null;
|
||||
deathWatchThread = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
|
||||
@ -247,14 +223,6 @@ final class PoolThreadCache {
|
||||
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
|
||||
*/
|
||||
void free() {
|
||||
if (freeTask != null) {
|
||||
assert deathWatchThread != null;
|
||||
ThreadDeathWatcher.unwatch(deathWatchThread, freeTask);
|
||||
}
|
||||
free0();
|
||||
}
|
||||
|
||||
private void free0() {
|
||||
int numFreed = free(tinySubPageDirectCaches) +
|
||||
free(smallSubPageDirectCaches) +
|
||||
free(normalDirectCaches) +
|
||||
|
@ -436,18 +436,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
|
||||
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
|
||||
|
||||
Thread current = Thread.currentThread();
|
||||
boolean fastThread = current instanceof FastThreadLocalThread;
|
||||
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
|
||||
// If our FastThreadLocalThread will call FastThreadLocal.removeAll() we not need to use
|
||||
// the ThreadDeathWatcher to release memory from the PoolThreadCache once the Thread dies.
|
||||
boolean useTheadWatcher = fastThread ?
|
||||
!((FastThreadLocalThread) current).willCleanupFastThreadLocals() : true;
|
||||
return new PoolThreadCache(
|
||||
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
|
||||
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL, useTheadWatcher);
|
||||
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
|
||||
}
|
||||
// No caching for non FastThreadLocalThreads.
|
||||
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0, false);
|
||||
// No caching so just use 0 as sizes.
|
||||
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -241,43 +241,38 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
|
||||
}
|
||||
|
||||
@Test (timeout = 4000)
|
||||
public void testThreadCacheDestroyedByThreadDeathWatcher() throws InterruptedException {
|
||||
testThreadCacheDestroyedByThreadDeathWatcher(false);
|
||||
public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException {
|
||||
testThreadCacheDestroyed(false);
|
||||
}
|
||||
|
||||
@Test (timeout = 4000)
|
||||
public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException {
|
||||
testThreadCacheDestroyedByThreadDeathWatcher(true);
|
||||
testThreadCacheDestroyed(true);
|
||||
}
|
||||
|
||||
private static void testThreadCacheDestroyedByThreadDeathWatcher(boolean useRunnable) throws InterruptedException {
|
||||
private static void testThreadCacheDestroyed(boolean useRunnable) throws InterruptedException {
|
||||
int numArenas = 11;
|
||||
final PooledByteBufAllocator allocator =
|
||||
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
|
||||
|
||||
final AtomicBoolean threadCachesCreated = new AtomicBoolean(true);
|
||||
final CountDownLatch latch = new CountDownLatch(numArenas);
|
||||
|
||||
final Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
|
||||
for (int i = 0; i < buf.capacity(); i++) {
|
||||
buf.writeByte(0);
|
||||
}
|
||||
|
||||
// Make sure that thread caches are actually created,
|
||||
// so that down below we are not testing for zero
|
||||
// thread caches without any of them ever having been initialized.
|
||||
if (allocator.metric().numThreadLocalCaches() == 0) {
|
||||
threadCachesCreated.set(false);
|
||||
}
|
||||
|
||||
buf.release();
|
||||
} finally {
|
||||
latch.countDown();
|
||||
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
|
||||
for (int i = 0; i < buf.capacity(); i++) {
|
||||
buf.writeByte(0);
|
||||
}
|
||||
|
||||
// Make sure that thread caches are actually created,
|
||||
// so that down below we are not testing for zero
|
||||
// thread caches without any of them ever having been initialized.
|
||||
if (allocator.metric().numThreadLocalCaches() == 0) {
|
||||
threadCachesCreated.set(false);
|
||||
}
|
||||
|
||||
buf.release();
|
||||
}
|
||||
};
|
||||
|
||||
@ -296,12 +291,14 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
|
||||
assertFalse(thread.willCleanupFastThreadLocals());
|
||||
}
|
||||
thread.start();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
latch.await();
|
||||
|
||||
// Wait for the ThreadDeathWatcher to have destroyed all thread caches
|
||||
while (allocator.metric().numThreadLocalCaches() > 0) {
|
||||
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
|
||||
}
|
||||
|
||||
|
@ -285,7 +285,7 @@ public abstract class Recycler<T> {
|
||||
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
|
||||
// We allocated a Link so reserve the space
|
||||
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
|
||||
? WeakOrderQueue.newQueue(stack, thread) : null;
|
||||
? newQueue(stack, thread) : null;
|
||||
}
|
||||
|
||||
private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
|
||||
|
@ -37,7 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
* associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
|
||||
* will terminate itself, and a new daemon thread will be started again when a new watch is added.
|
||||
* </p>
|
||||
*
|
||||
* @deprecated will be removed in the next major release
|
||||
*/
|
||||
@Deprecated
|
||||
public final class ThreadDeathWatcher {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
|
||||
|
@ -17,6 +17,7 @@ package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.InternalThreadLocalMap;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.ObjectCleaner;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
@ -131,8 +132,30 @@ public class FastThreadLocal<V> {
|
||||
/**
|
||||
* Returns the current value for the current thread
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final V get() {
|
||||
return get(InternalThreadLocalMap.get());
|
||||
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
|
||||
Object v = threadLocalMap.indexedVariable(index);
|
||||
if (v != InternalThreadLocalMap.UNSET) {
|
||||
return (V) v;
|
||||
}
|
||||
|
||||
V value = initialize(threadLocalMap);
|
||||
Thread current = Thread.currentThread();
|
||||
if (!FastThreadLocalThread.willCleanupFastThreadLocals(current)) {
|
||||
// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
|
||||
// and FastThreadLocal.onRemoval(...) will be called.
|
||||
ObjectCleaner.register(current, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
remove(threadLocalMap);
|
||||
|
||||
// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
|
||||
// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
|
||||
}
|
||||
});
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,4 +89,13 @@ public class FastThreadLocalThread extends Thread {
|
||||
public boolean willCleanupFastThreadLocals() {
|
||||
return cleanupFastThreadLocals;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if {@link FastThreadLocal#removeAll()} will be called once {@link Thread#run()} completes.
|
||||
*/
|
||||
@UnstableApi
|
||||
public static boolean willCleanupFastThreadLocals(Thread thread) {
|
||||
return thread instanceof FastThreadLocalThread &&
|
||||
((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
|
||||
}
|
||||
}
|
||||
|
134
common/src/main/java/io/netty/util/internal/ObjectCleaner.java
Normal file
134
common/src/main/java/io/netty/util/internal/ObjectCleaner.java
Normal file
@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.internal;
|
||||
|
||||
import io.netty.util.concurrent.FastThreadLocalThread;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}
|
||||
* anymore.
|
||||
*/
|
||||
public final class ObjectCleaner {
|
||||
|
||||
// This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()
|
||||
private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
|
||||
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
|
||||
private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);
|
||||
private static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
|
||||
private static final Runnable CLEANER_TASK = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean interrupted = false;
|
||||
for (;;) {
|
||||
// Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
|
||||
// See if we can let this thread complete.
|
||||
while (!LIVE_SET.isEmpty()) {
|
||||
try {
|
||||
AutomaticCleanerReference reference =
|
||||
(AutomaticCleanerReference) REFERENCE_QUEUE.remove();
|
||||
try {
|
||||
reference.cleanup();
|
||||
} finally {
|
||||
LIVE_SET.remove(reference);
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
// Just consume and move on
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
CLEANER_RUNNING.set(false);
|
||||
|
||||
// Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
|
||||
// behavior in multi-threaded environments.
|
||||
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
|
||||
// There was nothing added after we set STARTED to false or some other cleanup Thread
|
||||
// was started already so its safe to let this Thread complete now.
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (interrupted) {
|
||||
// As we caught the InterruptedException above we should mark the Thread as interrupted.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
|
||||
* to the object anymore.
|
||||
*
|
||||
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
|
||||
* anymore because it is not a cheap way to handle the cleanup.
|
||||
*/
|
||||
public static void register(Object object, Runnable cleanupTask) {
|
||||
AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
|
||||
ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
|
||||
// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
|
||||
// behavior in multi-threaded environments.
|
||||
LIVE_SET.add(reference);
|
||||
|
||||
// Check if there is already a cleaner running.
|
||||
if (CLEANER_RUNNING.compareAndSet(false, true)) {
|
||||
Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
|
||||
cleanupThread.setPriority(Thread.MIN_PRIORITY);
|
||||
// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
|
||||
// classloader.
|
||||
// See:
|
||||
// - https://github.com/netty/netty/issues/7290
|
||||
// - https://bugs.openjdk.java.net/browse/JDK-7008595
|
||||
cleanupThread.setContextClassLoader(null);
|
||||
cleanupThread.setName(CLEANER_THREAD_NAME);
|
||||
|
||||
// This Thread is not a daemon as it will die once all references to the registered Objects will go away
|
||||
// and its important to always invoke all cleanup tasks as these may free up direct memory etc.
|
||||
cleanupThread.setDaemon(false);
|
||||
cleanupThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectCleaner() {
|
||||
// Only contains a static method.
|
||||
}
|
||||
|
||||
private static final class AutomaticCleanerReference extends WeakReference<Object> {
|
||||
private final Runnable cleanupTask;
|
||||
|
||||
AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
|
||||
super(referent, REFERENCE_QUEUE);
|
||||
this.cleanupTask = cleanupTask;
|
||||
}
|
||||
|
||||
void cleanup() {
|
||||
cleanupTask.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread get() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
LIVE_SET.remove(this);
|
||||
super.clear();
|
||||
}
|
||||
}
|
||||
}
|
@ -75,4 +75,61 @@ public class FastThreadLocalTest {
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 4000)
|
||||
public void testOnRemoveCalledForFastThreadLocal() throws Exception {
|
||||
testOnRemoveCalled(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 4000)
|
||||
public void testOnRemoveCalledForNonFastThreadLocal() throws Exception {
|
||||
testOnRemoveCalled(false);
|
||||
}
|
||||
|
||||
private static void testOnRemoveCalled(boolean fastThreadLocal) throws Exception {
|
||||
|
||||
final TestFastThreadLocal threadLocal = new TestFastThreadLocal();
|
||||
final TestFastThreadLocal threadLocal2 = new TestFastThreadLocal();
|
||||
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertEquals(Thread.currentThread().getName(), threadLocal.get());
|
||||
assertEquals(Thread.currentThread().getName(), threadLocal2.get());
|
||||
}
|
||||
};
|
||||
Thread thread = fastThreadLocal ? new FastThreadLocalThread(runnable) : new Thread(runnable);
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
String threadName = thread.getName();
|
||||
|
||||
// Null this out so it can be collected
|
||||
thread = null;
|
||||
|
||||
// Loop until onRemoval(...) was called. This will fail the test if this not works due a timeout.
|
||||
while (threadLocal.onRemovalCalled.get() == null || threadLocal2.onRemovalCalled.get() == null) {
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
||||
assertEquals(threadName, threadLocal.onRemovalCalled.get());
|
||||
assertEquals(threadName, threadLocal2.onRemovalCalled.get());
|
||||
}
|
||||
|
||||
private static final class TestFastThreadLocal extends FastThreadLocal<String> {
|
||||
|
||||
final AtomicReference<String> onRemovalCalled = new AtomicReference<String>();
|
||||
|
||||
@Override
|
||||
protected String initialValue() throws Exception {
|
||||
return Thread.currentThread().getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(String value) throws Exception {
|
||||
onRemovalCalled.set(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.internal;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ObjectCleanerTest {
|
||||
|
||||
private Thread temporaryThread;
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testCleanup() throws Exception {
|
||||
final AtomicBoolean freeCalled = new AtomicBoolean();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
temporaryThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException ignore) {
|
||||
// just ignore
|
||||
}
|
||||
}
|
||||
});
|
||||
temporaryThread.start();
|
||||
ObjectCleaner.register(temporaryThread, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
freeCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
latch.countDown();
|
||||
temporaryThread.join();
|
||||
Assert.assertFalse(freeCalled.get());
|
||||
|
||||
// Null out the temporary object to ensure it is enqueued for GC.
|
||||
temporaryThread = null;
|
||||
|
||||
while (!freeCalled.get()) {
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user