diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index 225c11eb7b..167072b4b7 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -192,29 +192,32 @@ final class PoolThreadCache { /** * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache */ - void free() { - free(tinySubPageDirectCaches); - free(smallSubPageDirectCaches); - free(normalDirectCaches); - free(tinySubPageHeapCaches); - free(smallSubPageHeapCaches); - free(normalHeapCaches); + int free() { + return free(tinySubPageDirectCaches) + + free(smallSubPageDirectCaches) + + free(normalDirectCaches) + + free(tinySubPageHeapCaches) + + free(smallSubPageHeapCaches) + + free(normalHeapCaches); } - private static void free(MemoryRegionCache[] caches) { + private static int free(MemoryRegionCache[] caches) { if (caches == null) { - return; + return 0; } + + int numFreed = 0; for (int i = 0; i < caches.length; i++) { - free(caches[i]); + numFreed += free(caches[i]); } + return numFreed; } - private static void free(MemoryRegionCache cache) { + private static int free(MemoryRegionCache cache) { if (cache == null) { - return; + return 0; } - cache.free(); + return cache.free(); } void trim() { @@ -384,13 +387,16 @@ final class PoolThreadCache { /** * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. */ - public void free() { + public int free() { + int numFreed = 0; entriesInUse = 0; maxEntriesInUse = 0; for (int i = head;; i = nextIdx(i)) { - if (!freeEntry(entries[i])) { + if (freeEntry(entries[i])) { + numFreed++; + } else { // all cleared - return; + return numFreed; } } } @@ -440,7 +446,7 @@ final class PoolThreadCache { private int nextIdx(int index) { // use bitwise operation as this is faster as using modulo. - return (index + 1) & entries.length - 1; + return index + 1 & entries.length - 1; } private static final class Entry { diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index b65fc8d44e..a7a76cb454 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -16,18 +16,13 @@ package io.netty.buffer; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.ThreadDeathWatcher; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class PooledByteBufAllocator extends AbstractByteBufAllocator { @@ -44,7 +39,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_CACHE_TRIM_INTERVAL; - private static final long DEFAULT_CACHE_CLEANUP_INTERVAL; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); @@ -101,9 +95,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( "io.netty.allocator.cacheTrimInterval", 8192); - // the default interval at which we check for caches that are assigned to Threads that are not alive anymore - DEFAULT_CACHE_CLEANUP_INTERVAL = SystemPropertyUtil.getLong( - "io.netty.allocator.cacheCleanupInterval", 5000); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); @@ -122,10 +113,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); - logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", - DEFAULT_CACHE_TRIM_INTERVAL); - logger.debug("-Dio.netty.allocator.cacheCleanupInterval: {} ms", - DEFAULT_CACHE_CLEANUP_INTERVAL); + logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); } } @@ -159,15 +147,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { - this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, - normalCacheSize, DEFAULT_CACHE_CLEANUP_INTERVAL); - } - - public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, - int tinyCacheSize, int smallCacheSize, int normalCacheSize, - long cacheThreadAliveCheckInterval) { super(preferDirect); - threadCache = new PoolThreadLocalCache(cacheThreadAliveCheckInterval); + threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; @@ -276,97 +257,45 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } final class PoolThreadLocalCache extends ThreadLocal { - private final Map caches = new IdentityHashMap(); - private final ReleaseCacheTask task = new ReleaseCacheTask(); private final AtomicInteger index = new AtomicInteger(); - private final long cacheThreadAliveCheckInterval; - - PoolThreadLocalCache(long cacheThreadAliveCheckInterval) { - this.cacheThreadAliveCheckInterval = cacheThreadAliveCheckInterval; - } @Override - public PoolThreadCache get() { - PoolThreadCache cache = super.get(); - if (cache == null) { - final int idx = index.getAndIncrement(); - final PoolArena heapArena; - final PoolArena directArena; + protected PoolThreadCache initialValue() { + final int idx = index.getAndIncrement(); + final PoolArena heapArena; + final PoolArena directArena; - if (heapArenas != null) { - heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; - } else { - heapArena = null; - } - - if (directArenas != null) { - directArena = directArenas[Math.abs(idx % directArenas.length)]; - } else { - directArena = null; - } - // If the current Thread is assigned to an EventExecutor we can - // easily free the cached stuff again once the EventExecutor completes later. - cache = new PoolThreadCache( - heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, - DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); - set(cache); + if (heapArenas != null) { + heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; + } else { + heapArena = null; } + + if (directArenas != null) { + directArena = directArenas[Math.abs(idx % directArenas.length)]; + } else { + directArena = null; + } + + final PoolThreadCache cache = new PoolThreadCache( + heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + + // The thread-local cache will keep a list of pooled buffers which must be returned to + // the pool when the thread is not alive anymore. + final Thread thread = Thread.currentThread(); + ThreadDeathWatcher.watch(thread, new Runnable() { + @Override + public void run() { + int numFreed = cache.free(); + if (logger.isDebugEnabled()) { + logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName()); + } + } + }); + return cache; } - - @Override - public void set(PoolThreadCache value) { - Thread current = Thread.currentThread(); - synchronized (caches) { - caches.put(current, value); - if (task.releaseTaskFuture == null) { - task.releaseTaskFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task, - cacheThreadAliveCheckInterval, cacheThreadAliveCheckInterval, TimeUnit.MILLISECONDS); - } - } - super.set(value); - } - - @Override - public void remove() { - super.remove(); - PoolThreadCache cache; - Thread current = Thread.currentThread(); - synchronized (caches) { - cache = caches.remove(current); - } - if (cache != null) { - cache.free(); - } - } - - private final class ReleaseCacheTask implements Runnable { - private ScheduledFuture releaseTaskFuture; - - @Override - public void run() { - synchronized (caches) { - for (Iterator> i = caches.entrySet().iterator(); - i.hasNext();) { - Map.Entry cache = i.next(); - if (cache.getKey().isAlive()) { - // Thread is still alive... - continue; - } - cache.getValue().free(); - i.remove(); - } - if (caches.isEmpty()) { - // Nothing in the caches anymore so no need to continue to check if something needs to be - // released periodically. The task will be rescheduled if there is any need later. - if (releaseTaskFuture != null) { - releaseTaskFuture.cancel(true); - releaseTaskFuture = null; - } - } - } - } - } } // Too noisy at the moment. diff --git a/common/src/main/java/io/netty/util/ReferenceCountUtil.java b/common/src/main/java/io/netty/util/ReferenceCountUtil.java index fc95c1b2e8..c22a056717 100644 --- a/common/src/main/java/io/netty/util/ReferenceCountUtil.java +++ b/common/src/main/java/io/netty/util/ReferenceCountUtil.java @@ -15,19 +15,10 @@ */ package io.netty.util; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.ArrayList; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Collection of method to handle objects that may implement {@link ReferenceCounted}. */ @@ -106,8 +97,6 @@ public final class ReferenceCountUtil { return false; } - private static final Map> pendingReleases = new IdentityHashMap>(); - /** * Schedules the specified object to be released when the caller thread terminates. Note that this operation is * intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the @@ -124,82 +113,40 @@ public final class ReferenceCountUtil { */ public static T releaseLater(T msg, int decrement) { if (msg instanceof ReferenceCounted) { - synchronized (pendingReleases) { - Thread thread = Thread.currentThread(); - List entries = pendingReleases.get(thread); - if (entries == null) { - // Start the periodic releasing task (if not started yet.) - if (pendingReleases.isEmpty()) { - ReleasingTask task = new ReleasingTask(); - task.future = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS); - } - - // Create a new entry. - entries = new ArrayList(); - pendingReleases.put(thread, entries); - } - - entries.add(new Entry((ReferenceCounted) msg, decrement)); - } + ThreadDeathWatcher.watch(Thread.currentThread(), new ReleasingTask((ReferenceCounted) msg, decrement)); } return msg; } - private static final class Entry { - final ReferenceCounted obj; - final int decrement; - - Entry(ReferenceCounted obj, int decrement) { - this.obj = obj; - this.decrement = decrement; - } - - public String toString() { - return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt(); - } - } - /** * Releases the objects when the thread that called {@link #releaseLater(Object)} has been terminated. */ private static final class ReleasingTask implements Runnable { - volatile ScheduledFuture future; + + private final ReferenceCounted obj; + private final int decrement; + + ReleasingTask(ReferenceCounted obj, int decrement) { + this.obj = obj; + this.decrement = decrement; + } @Override public void run() { - synchronized (pendingReleases) { - for (Iterator>> i = pendingReleases.entrySet().iterator(); - i.hasNext();) { - - Map.Entry> e = i.next(); - if (e.getKey().isAlive()) { - continue; - } - - releaseAll(e.getValue()); - - // Remove from the map since the thread is not alive anymore. - i.remove(); - } - - if (pendingReleases.isEmpty()) { - future.cancel(false); + try { + if (!obj.release(decrement)) { + logger.warn("Non-zero refCnt: {}", this); + } else { + logger.debug("Released: {}", this); } + } catch (Exception ex) { + logger.warn("Failed to release an object: {}", obj, ex); } } - private static void releaseAll(Iterable entries) { - for (Entry e: entries) { - try { - if (!e.obj.release(e.decrement)) { - logger.warn("Non-zero refCnt: {}", e); - } else { - logger.warn("Released: {}", e); - } - } catch (Exception ex) { - logger.warn("Failed to release an object: {}", e.obj, ex); - } - } + @Override + public String toString() { + return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt(); } } diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java new file mode 100644 index 0000000000..02cc6fefea --- /dev/null +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -0,0 +1,168 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util; + +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.internal.MpscLinkedQueue; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Checks if a thread is alive periodically and runs a task when a thread dies. + *

+ * This thread starts a daemon thread to check the state of the threads being watched and to invoke their + * associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread + * will terminate itself, and a new daemon thread will be started again when a new watch is added. + *

+ */ +public final class ThreadDeathWatcher { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class); + private static final ThreadFactory threadFactory = + new DefaultThreadFactory(ThreadDeathWatcher.class, true, Thread.MIN_PRIORITY); + + private static final Queue pendingEntries = PlatformDependent.newMpscQueue(); + private static final Watcher watcher = new Watcher(); + private static final AtomicBoolean started = new AtomicBoolean(); + + /** + * Schedules the specified {@code task} to run when the specified {@code thread} dies. + * + * @param thread the {@link Thread} to watch + * @param task the {@link Runnable} to run when the {@code thread} dies + * + * @throws IllegalArgumentException if the specified {@code thread} is not alive + */ + public static void watch(Thread thread, Runnable task) { + if (thread == null) { + throw new NullPointerException("thread"); + } + if (task == null) { + throw new NullPointerException("task"); + } + if (!thread.isAlive()) { + throw new IllegalArgumentException("thread must be alive."); + } + + pendingEntries.add(new Entry(thread, task)); + + if (started.compareAndSet(false, true)) { + Thread watcherThread = threadFactory.newThread(watcher); + watcherThread.start(); + } + } + + private ThreadDeathWatcher() { } + + private static final class Watcher implements Runnable { + + private final List watchees = new ArrayList(); + + @Override + public void run() { + for (;;) { + fetchWatchees(); + notifyWatchees(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + // Ignore the interrupt; do not terminate until all tasks are run. + } + + if (watchees.isEmpty() && pendingEntries.isEmpty()) { + + // Mark the current worker thread as stopped. + // The following CAS must always success and must be uncontended, + // because only one watcher thread should be running at the same time. + boolean stopped = started.compareAndSet(true, false); + assert stopped; + + // Check if there are pending entries added by watch() while we do CAS above. + if (pendingEntries.isEmpty()) { + // A) watch() was not invoked and thus there's nothing to handle + // -> safe to terminate because there's nothing left to do + // B) a new watcher thread started and handled them all + // -> safe to terminate the new watcher thread will take care the rest + break; + } + + // There are pending entries again, added by watch() + if (!started.compareAndSet(false, true)) { + // watch() started a new watcher thread and set 'started' to true. + // -> terminate this thread so that the new watcher reads from pendingEntries exclusively. + break; + } + + // watch() added an entry, but this worker was faster to set 'started' to true. + // i.e. a new watcher thread was not started + // -> keep this thread alive to handle the newly added entries. + } + } + } + + private void fetchWatchees() { + for (;;) { + Entry e = pendingEntries.poll(); + if (e == null) { + break; + } + + watchees.add(e); + } + } + + private void notifyWatchees() { + List watchees = this.watchees; + for (int i = 0; i < watchees.size();) { + Entry e = watchees.get(i); + if (!e.thread.isAlive()) { + watchees.remove(i); + try { + e.task.run(); + } catch (Throwable t) { + logger.warn("Thread death watcher task raised an exception:", t); + } + } else { + i ++; + } + } + } + } + + private static final class Entry extends MpscLinkedQueue.Node { + final Thread thread; + final Runnable task; + + Entry(Thread thread, Runnable task) { + this.thread = thread; + this.task = task; + } + + @Override + public Entry value() { + return this; + } + } +} diff --git a/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java new file mode 100644 index 0000000000..fbb042e565 --- /dev/null +++ b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +public class ThreadDeathWatcherTest { + + @Test(timeout = 10000) + public void testSimple() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final Thread t = new Thread() { + @Override + public void run() { + for (;;) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + break; + } + } + } + }; + + final Runnable task = new Runnable() { + @Override + public void run() { + if (!t.isAlive()) { + latch.countDown(); + } + } + }; + + try { + ThreadDeathWatcher.watch(t, task); + fail("must reject to watch a non-alive thread."); + } catch (IllegalArgumentException e) { + // expected + } + + t.start(); + ThreadDeathWatcher.watch(t, task); + + // As long as the thread is alive, the task should not run. + assertThat(latch.await(750, TimeUnit.MILLISECONDS), is(false)); + + // Interrupt the thread to terminate it. + t.interrupt(); + + // The task must be run on termination. + latch.await(); + } +} \ No newline at end of file