diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index a4cb574b99..3503748c0d 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -20,7 +20,6 @@ package io.netty.buffer; import io.netty.buffer.PoolArena.SizeClass; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; -import io.netty.util.ThreadDeathWatcher; import io.netty.util.internal.MathUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; @@ -56,9 +55,6 @@ final class PoolThreadCache { private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; - private final Thread deathWatchThread; - private final Runnable freeTask; - private int allocations; // TODO: Test if adding padding helps under contention @@ -66,8 +62,7 @@ final class PoolThreadCache { PoolThreadCache(PoolArena heapArena, PoolArena directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, - int maxCachedBufferCapacity, int freeSweepAllocationThreshold, - boolean useThreadDeathWatcher) { + int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); @@ -120,25 +115,6 @@ final class PoolThreadCache { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); } - - if (useThreadDeathWatcher) { - - freeTask = new Runnable() { - @Override - public void run() { - free0(); - } - }; - - deathWatchThread = Thread.currentThread(); - - // The thread-local cache will keep a list of pooled buffers which must be returned to - // the pool when the thread is not alive anymore. - ThreadDeathWatcher.watch(deathWatchThread, freeTask); - } else { - freeTask = null; - deathWatchThread = null; - } } private static MemoryRegionCache[] createSubPageCaches( @@ -247,14 +223,6 @@ final class PoolThreadCache { * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache */ void free() { - if (freeTask != null) { - assert deathWatchThread != null; - ThreadDeathWatcher.unwatch(deathWatchThread, freeTask); - } - free0(); - } - - private void free0() { int numFreed = free(tinySubPageDirectCaches) + free(smallSubPageDirectCaches) + free(normalDirectCaches) + diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index cef3d4f4f8..b613fea9e1 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -436,18 +436,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements final PoolArena directArena = leastUsedArena(directArenas); Thread current = Thread.currentThread(); - boolean fastThread = current instanceof FastThreadLocalThread; if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { - // If our FastThreadLocalThread will call FastThreadLocal.removeAll() we not need to use - // the ThreadDeathWatcher to release memory from the PoolThreadCache once the Thread dies. - boolean useTheadWatcher = fastThread ? - !((FastThreadLocalThread) current).willCleanupFastThreadLocals() : true; return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, - DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL, useTheadWatcher); + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } - // No caching for non FastThreadLocalThreads. - return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0, false); + // No caching so just use 0 as sizes. + return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } @Override diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index d0676ad264..495bb76540 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -241,43 +241,38 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest 0) { + // Signal we want to have a GC run to ensure we can process our ThreadCleanerReference + System.gc(); + System.runFinalization(); LockSupport.parkNanos(MILLISECONDS.toNanos(100)); } diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 8bedc37fb6..a47bd1aabc 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -285,7 +285,7 @@ public abstract class Recycler { static WeakOrderQueue allocate(Stack stack, Thread thread) { // We allocated a Link so reserve the space return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY) - ? WeakOrderQueue.newQueue(stack, thread) : null; + ? newQueue(stack, thread) : null; } private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) { diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index 6d487533c9..10e38143b9 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -37,7 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean; * associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread * will terminate itself, and a new daemon thread will be started again when a new watch is added. *

+ * + * @deprecated will be removed in the next major release */ +@Deprecated public final class ThreadDeathWatcher { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class); diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java index 554c129ca6..2881a356f9 100644 --- a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java @@ -17,6 +17,7 @@ package io.netty.util.concurrent; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ObjectCleaner; import java.util.Collections; import java.util.IdentityHashMap; @@ -131,8 +132,30 @@ public class FastThreadLocal { /** * Returns the current value for the current thread */ + @SuppressWarnings("unchecked") public final V get() { - return get(InternalThreadLocalMap.get()); + final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + Object v = threadLocalMap.indexedVariable(index); + if (v != InternalThreadLocalMap.UNSET) { + return (V) v; + } + + V value = initialize(threadLocalMap); + Thread current = Thread.currentThread(); + if (!FastThreadLocalThread.willCleanupFastThreadLocals(current)) { + // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released + // and FastThreadLocal.onRemoval(...) will be called. + ObjectCleaner.register(current, new Runnable() { + @Override + public void run() { + remove(threadLocalMap); + + // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once + // the Thread is collected by GC. In this case the ThreadLocal will be gone away already. + } + }); + } + return value; } /** diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java index 35fef3fe12..704509f5dd 100644 --- a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java @@ -89,4 +89,13 @@ public class FastThreadLocalThread extends Thread { public boolean willCleanupFastThreadLocals() { return cleanupFastThreadLocals; } + + /** + * Returns {@code true} if {@link FastThreadLocal#removeAll()} will be called once {@link Thread#run()} completes. + */ + @UnstableApi + public static boolean willCleanupFastThreadLocals(Thread thread) { + return thread instanceof FastThreadLocalThread && + ((FastThreadLocalThread) thread).willCleanupFastThreadLocals(); + } } diff --git a/common/src/main/java/io/netty/util/internal/ObjectCleaner.java b/common/src/main/java/io/netty/util/internal/ObjectCleaner.java new file mode 100644 index 0000000000..49be65434b --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/ObjectCleaner.java @@ -0,0 +1,134 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.FastThreadLocalThread; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object} + * anymore. + */ +public final class ObjectCleaner { + + // This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup() + private static final Set LIVE_SET = new ConcurrentSet(); + private static final ReferenceQueue REFERENCE_QUEUE = new ReferenceQueue(); + private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false); + private static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread"; + private static final Runnable CLEANER_TASK = new Runnable() { + @Override + public void run() { + boolean interrupted = false; + for (;;) { + // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty + // See if we can let this thread complete. + while (!LIVE_SET.isEmpty()) { + try { + AutomaticCleanerReference reference = + (AutomaticCleanerReference) REFERENCE_QUEUE.remove(); + try { + reference.cleanup(); + } finally { + LIVE_SET.remove(reference); + } + } catch (InterruptedException ex) { + // Just consume and move on + interrupted = true; + } + } + CLEANER_RUNNING.set(false); + + // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct + // behavior in multi-threaded environments. + if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) { + // There was nothing added after we set STARTED to false or some other cleanup Thread + // was started already so its safe to let this Thread complete now. + break; + } + } + if (interrupted) { + // As we caught the InterruptedException above we should mark the Thread as interrupted. + Thread.currentThread().interrupt(); + } + } + }; + + /** + * Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references + * to the object anymore. + * + * This should only be used if there are no other ways to execute some cleanup once the Object is not reachable + * anymore because it is not a cheap way to handle the cleanup. + */ + public static void register(Object object, Runnable cleanupTask) { + AutomaticCleanerReference reference = new AutomaticCleanerReference(object, + ObjectUtil.checkNotNull(cleanupTask, "cleanupTask")); + // Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct + // behavior in multi-threaded environments. + LIVE_SET.add(reference); + + // Check if there is already a cleaner running. + if (CLEANER_RUNNING.compareAndSet(false, true)) { + Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK); + cleanupThread.setPriority(Thread.MIN_PRIORITY); + // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited + // classloader. + // See: + // - https://github.com/netty/netty/issues/7290 + // - https://bugs.openjdk.java.net/browse/JDK-7008595 + cleanupThread.setContextClassLoader(null); + cleanupThread.setName(CLEANER_THREAD_NAME); + + // This Thread is not a daemon as it will die once all references to the registered Objects will go away + // and its important to always invoke all cleanup tasks as these may free up direct memory etc. + cleanupThread.setDaemon(false); + cleanupThread.start(); + } + } + + private ObjectCleaner() { + // Only contains a static method. + } + + private static final class AutomaticCleanerReference extends WeakReference { + private final Runnable cleanupTask; + + AutomaticCleanerReference(Object referent, Runnable cleanupTask) { + super(referent, REFERENCE_QUEUE); + this.cleanupTask = cleanupTask; + } + + void cleanup() { + cleanupTask.run(); + } + + @Override + public Thread get() { + return null; + } + + @Override + public void clear() { + LIVE_SET.remove(this); + super.clear(); + } + } +} diff --git a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java index 2a758781f0..b7ac6bdc34 100644 --- a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java @@ -75,4 +75,61 @@ public class FastThreadLocalTest { throw t; } } + + @Test(timeout = 4000) + public void testOnRemoveCalledForFastThreadLocal() throws Exception { + testOnRemoveCalled(true); + } + + @Test(timeout = 4000) + public void testOnRemoveCalledForNonFastThreadLocal() throws Exception { + testOnRemoveCalled(false); + } + + private static void testOnRemoveCalled(boolean fastThreadLocal) throws Exception { + + final TestFastThreadLocal threadLocal = new TestFastThreadLocal(); + final TestFastThreadLocal threadLocal2 = new TestFastThreadLocal(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + assertEquals(Thread.currentThread().getName(), threadLocal.get()); + assertEquals(Thread.currentThread().getName(), threadLocal2.get()); + } + }; + Thread thread = fastThreadLocal ? new FastThreadLocalThread(runnable) : new Thread(runnable); + thread.start(); + thread.join(); + + String threadName = thread.getName(); + + // Null this out so it can be collected + thread = null; + + // Loop until onRemoval(...) was called. This will fail the test if this not works due a timeout. + while (threadLocal.onRemovalCalled.get() == null || threadLocal2.onRemovalCalled.get() == null) { + System.gc(); + System.runFinalization(); + Thread.sleep(50); + } + + assertEquals(threadName, threadLocal.onRemovalCalled.get()); + assertEquals(threadName, threadLocal2.onRemovalCalled.get()); + } + + private static final class TestFastThreadLocal extends FastThreadLocal { + + final AtomicReference onRemovalCalled = new AtomicReference(); + + @Override + protected String initialValue() throws Exception { + return Thread.currentThread().getName(); + } + + @Override + protected void onRemoval(String value) throws Exception { + onRemovalCalled.set(value); + } + } } diff --git a/common/src/test/java/io/netty/util/internal/ObjectCleanerTest.java b/common/src/test/java/io/netty/util/internal/ObjectCleanerTest.java new file mode 100644 index 0000000000..e57d2c6f25 --- /dev/null +++ b/common/src/test/java/io/netty/util/internal/ObjectCleanerTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ObjectCleanerTest { + + private Thread temporaryThread; + + @Test(timeout = 5000) + public void testCleanup() throws Exception { + final AtomicBoolean freeCalled = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + temporaryThread = new Thread(new Runnable() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException ignore) { + // just ignore + } + } + }); + temporaryThread.start(); + ObjectCleaner.register(temporaryThread, new Runnable() { + @Override + public void run() { + freeCalled.set(true); + } + }); + + latch.countDown(); + temporaryThread.join(); + Assert.assertFalse(freeCalled.get()); + + // Null out the temporary object to ensure it is enqueued for GC. + temporaryThread = null; + + while (!freeCalled.get()) { + System.gc(); + System.runFinalization(); + Thread.sleep(100); + } + } +}