Introduce ObjectCleaner and use it in FastThreadLocal to ensure FastThreadLocal.onRemoval(...) is called

Motivation:

There is no guarantee that FastThreadLocal.onRemoval(...) is called if the FastThreadLocal is used by "non" FastThreacLocalThreads. This can lead to all sort of problems, like for example memory leaks as direct memory is not correctly cleaned up etc.

Beside this we use ThreadDeathWatcher to check if we need to release buffers back to the pool when thread local caches are collected. In the past ThreadDeathWatcher was used which will need to "wakeup" every second to check if the registered Threads are still alive. If we can ensure FastThreadLocal.onRemoval(...) is called we do not need this anymore.

Modifications:

- Introduce ObjectCleaner and use it to ensure FastThreadLocal.onRemoval(...) is always called when a Thread is collected.
- Deprecate ThreadDeathWatcher
- Add unit tests.

Result:

Consistent way of cleanup FastThreadLocals when a Thread is collected.
This commit is contained in:
Norman Maurer 2017-12-01 16:37:30 +01:00
parent 942b993f2b
commit e329ca1cf3
10 changed files with 315 additions and 66 deletions

View File

@ -20,7 +20,6 @@ package io.netty.buffer;
import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
@ -56,9 +55,6 @@ final class PoolThreadCache {
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;
private final Thread deathWatchThread;
private final Runnable freeTask;
private int allocations;
// TODO: Test if adding padding helps under contention
@ -66,8 +62,7 @@ final class PoolThreadCache {
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold,
boolean useThreadDeathWatcher) {
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
if (maxCachedBufferCapacity < 0) {
throw new IllegalArgumentException("maxCachedBufferCapacity: "
+ maxCachedBufferCapacity + " (expected: >= 0)");
@ -120,25 +115,6 @@ final class PoolThreadCache {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
if (useThreadDeathWatcher) {
freeTask = new Runnable() {
@Override
public void run() {
free0();
}
};
deathWatchThread = Thread.currentThread();
// The thread-local cache will keep a list of pooled buffers which must be returned to
// the pool when the thread is not alive anymore.
ThreadDeathWatcher.watch(deathWatchThread, freeTask);
} else {
freeTask = null;
deathWatchThread = null;
}
}
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
@ -247,14 +223,6 @@ final class PoolThreadCache {
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/
void free() {
if (freeTask != null) {
assert deathWatchThread != null;
ThreadDeathWatcher.unwatch(deathWatchThread, freeTask);
}
free0();
}
private void free0() {
int numFreed = free(tinySubPageDirectCaches) +
free(smallSubPageDirectCaches) +
free(normalDirectCaches) +

View File

@ -436,18 +436,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
boolean fastThread = current instanceof FastThreadLocalThread;
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
// If our FastThreadLocalThread will call FastThreadLocal.removeAll() we not need to use
// the ThreadDeathWatcher to release memory from the PoolThreadCache once the Thread dies.
boolean useTheadWatcher = fastThread ?
!((FastThreadLocalThread) current).willCleanupFastThreadLocals() : true;
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL, useTheadWatcher);
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching for non FastThreadLocalThreads.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0, false);
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
@Override

View File

@ -241,43 +241,38 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
}
@Test (timeout = 4000)
public void testThreadCacheDestroyedByThreadDeathWatcher() throws InterruptedException {
testThreadCacheDestroyedByThreadDeathWatcher(false);
public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException {
testThreadCacheDestroyed(false);
}
@Test (timeout = 4000)
public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException {
testThreadCacheDestroyedByThreadDeathWatcher(true);
testThreadCacheDestroyed(true);
}
private static void testThreadCacheDestroyedByThreadDeathWatcher(boolean useRunnable) throws InterruptedException {
private static void testThreadCacheDestroyed(boolean useRunnable) throws InterruptedException {
int numArenas = 11;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
final AtomicBoolean threadCachesCreated = new AtomicBoolean(true);
final CountDownLatch latch = new CountDownLatch(numArenas);
final Runnable task = new Runnable() {
@Override
public void run() {
try {
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
for (int i = 0; i < buf.capacity(); i++) {
buf.writeByte(0);
}
// Make sure that thread caches are actually created,
// so that down below we are not testing for zero
// thread caches without any of them ever having been initialized.
if (allocator.metric().numThreadLocalCaches() == 0) {
threadCachesCreated.set(false);
}
buf.release();
} finally {
latch.countDown();
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
for (int i = 0; i < buf.capacity(); i++) {
buf.writeByte(0);
}
// Make sure that thread caches are actually created,
// so that down below we are not testing for zero
// thread caches without any of them ever having been initialized.
if (allocator.metric().numThreadLocalCaches() == 0) {
threadCachesCreated.set(false);
}
buf.release();
}
};
@ -296,12 +291,14 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
assertFalse(thread.willCleanupFastThreadLocals());
}
thread.start();
thread.join();
}
latch.await();
// Wait for the ThreadDeathWatcher to have destroyed all thread caches
while (allocator.metric().numThreadLocalCaches() > 0) {
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
System.gc();
System.runFinalization();
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
}

View File

@ -285,7 +285,7 @@ public abstract class Recycler<T> {
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? WeakOrderQueue.newQueue(stack, thread) : null;
? newQueue(stack, thread) : null;
}
private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {

View File

@ -37,7 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
* associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
* will terminate itself, and a new daemon thread will be started again when a new watch is added.
* </p>
*
* @deprecated will be removed in the next major release
*/
@Deprecated
public final class ThreadDeathWatcher {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);

View File

@ -17,6 +17,7 @@ package io.netty.util.concurrent;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ObjectCleaner;
import java.util.Collections;
import java.util.IdentityHashMap;
@ -131,8 +132,30 @@ public class FastThreadLocal<V> {
/**
* Returns the current value for the current thread
*/
@SuppressWarnings("unchecked")
public final V get() {
return get(InternalThreadLocalMap.get());
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
V value = initialize(threadLocalMap);
Thread current = Thread.currentThread();
if (!FastThreadLocalThread.willCleanupFastThreadLocals(current)) {
// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
// and FastThreadLocal.onRemoval(...) will be called.
ObjectCleaner.register(current, new Runnable() {
@Override
public void run() {
remove(threadLocalMap);
// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
}
});
}
return value;
}
/**

View File

@ -89,4 +89,13 @@ public class FastThreadLocalThread extends Thread {
public boolean willCleanupFastThreadLocals() {
return cleanupFastThreadLocals;
}
/**
* Returns {@code true} if {@link FastThreadLocal#removeAll()} will be called once {@link Thread#run()} completes.
*/
@UnstableApi
public static boolean willCleanupFastThreadLocals(Thread thread) {
return thread instanceof FastThreadLocalThread &&
((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}
* anymore.
*/
public final class ObjectCleaner {
// This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()
private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);
private static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
private static final Runnable CLEANER_TASK = new Runnable() {
@Override
public void run() {
boolean interrupted = false;
for (;;) {
// Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
// See if we can let this thread complete.
while (!LIVE_SET.isEmpty()) {
try {
AutomaticCleanerReference reference =
(AutomaticCleanerReference) REFERENCE_QUEUE.remove();
try {
reference.cleanup();
} finally {
LIVE_SET.remove(reference);
}
} catch (InterruptedException ex) {
// Just consume and move on
interrupted = true;
}
}
CLEANER_RUNNING.set(false);
// Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
// There was nothing added after we set STARTED to false or some other cleanup Thread
// was started already so its safe to let this Thread complete now.
break;
}
}
if (interrupted) {
// As we caught the InterruptedException above we should mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
};
/**
* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
* to the object anymore.
*
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
* anymore because it is not a cheap way to handle the cleanup.
*/
public static void register(Object object, Runnable cleanupTask) {
AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
LIVE_SET.add(reference);
// Check if there is already a cleaner running.
if (CLEANER_RUNNING.compareAndSet(false, true)) {
Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
cleanupThread.setPriority(Thread.MIN_PRIORITY);
// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
// classloader.
// See:
// - https://github.com/netty/netty/issues/7290
// - https://bugs.openjdk.java.net/browse/JDK-7008595
cleanupThread.setContextClassLoader(null);
cleanupThread.setName(CLEANER_THREAD_NAME);
// This Thread is not a daemon as it will die once all references to the registered Objects will go away
// and its important to always invoke all cleanup tasks as these may free up direct memory etc.
cleanupThread.setDaemon(false);
cleanupThread.start();
}
}
private ObjectCleaner() {
// Only contains a static method.
}
private static final class AutomaticCleanerReference extends WeakReference<Object> {
private final Runnable cleanupTask;
AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
super(referent, REFERENCE_QUEUE);
this.cleanupTask = cleanupTask;
}
void cleanup() {
cleanupTask.run();
}
@Override
public Thread get() {
return null;
}
@Override
public void clear() {
LIVE_SET.remove(this);
super.clear();
}
}
}

View File

@ -75,4 +75,61 @@ public class FastThreadLocalTest {
throw t;
}
}
@Test(timeout = 4000)
public void testOnRemoveCalledForFastThreadLocal() throws Exception {
testOnRemoveCalled(true);
}
@Test(timeout = 4000)
public void testOnRemoveCalledForNonFastThreadLocal() throws Exception {
testOnRemoveCalled(false);
}
private static void testOnRemoveCalled(boolean fastThreadLocal) throws Exception {
final TestFastThreadLocal threadLocal = new TestFastThreadLocal();
final TestFastThreadLocal threadLocal2 = new TestFastThreadLocal();
Runnable runnable = new Runnable() {
@Override
public void run() {
assertEquals(Thread.currentThread().getName(), threadLocal.get());
assertEquals(Thread.currentThread().getName(), threadLocal2.get());
}
};
Thread thread = fastThreadLocal ? new FastThreadLocalThread(runnable) : new Thread(runnable);
thread.start();
thread.join();
String threadName = thread.getName();
// Null this out so it can be collected
thread = null;
// Loop until onRemoval(...) was called. This will fail the test if this not works due a timeout.
while (threadLocal.onRemovalCalled.get() == null || threadLocal2.onRemovalCalled.get() == null) {
System.gc();
System.runFinalization();
Thread.sleep(50);
}
assertEquals(threadName, threadLocal.onRemovalCalled.get());
assertEquals(threadName, threadLocal2.onRemovalCalled.get());
}
private static final class TestFastThreadLocal extends FastThreadLocal<String> {
final AtomicReference<String> onRemovalCalled = new AtomicReference<String>();
@Override
protected String initialValue() throws Exception {
return Thread.currentThread().getName();
}
@Override
protected void onRemoval(String value) throws Exception {
onRemovalCalled.set(value);
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class ObjectCleanerTest {
private Thread temporaryThread;
@Test(timeout = 5000)
public void testCleanup() throws Exception {
final AtomicBoolean freeCalled = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
temporaryThread = new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException ignore) {
// just ignore
}
}
});
temporaryThread.start();
ObjectCleaner.register(temporaryThread, new Runnable() {
@Override
public void run() {
freeCalled.set(true);
}
});
latch.countDown();
temporaryThread.join();
Assert.assertFalse(freeCalled.get());
// Null out the temporary object to ensure it is enqueued for GC.
temporaryThread = null;
while (!freeCalled.get()) {
System.gc();
System.runFinalization();
Thread.sleep(100);
}
}
}