From ceffa82d0d341c08804554441601fdf253227fa9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 9 Apr 2014 11:07:14 +0200 Subject: [PATCH] [#2370] Periodically check for not alive Threads and free up their ThreadPoolCache Motivation: At the moment we create new ThreadPoolCache whenever a Thread tries either allocate or release something on the PooledByteBufAllocator. When something is released we put it then in its ThreadPoolCache. The problem is we never check if a Thread is not alive anymore and so we may end up with memory that is never freed again if a user create many short living Threads that use the PooledByteBufAllocator. Modifications: Periodically check if the Thread is still alive that has a ThreadPoolCache assinged and if not free it. Result: Memory is freed up correctly even for short living Threads. --- .../netty/buffer/PooledByteBufAllocator.java | 88 ++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 649fd7b1a3..dccd58cf9b 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -16,12 +16,18 @@ package io.netty.buffer; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class PooledByteBufAllocator extends AbstractByteBufAllocator { @@ -38,6 +44,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_CACHE_TRIM_INTERVAL; + private static final long DEFAULT_CACHE_CLEANUP_INTERVAL; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); @@ -94,6 +101,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( "io.netty.allocator.cacheTrimInterval", 8192); + // the default interval at which we check for caches that are assigned to Threads that are not alive anymore + DEFAULT_CACHE_CLEANUP_INTERVAL = SystemPropertyUtil.getLong( + "io.netty.allocator.cacheCleanupInterval", 5000); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); @@ -114,6 +124,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); + logger.debug("-Dio.netty.allocator.cacheCleanupInterval: {} ms", + DEFAULT_CACHE_CLEANUP_INTERVAL); } } @@ -126,7 +138,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private final int smallCacheSize; private final int normalCacheSize; - final PoolThreadLocalCache threadCache = new PoolThreadLocalCache(); + final PoolThreadLocalCache threadCache; public PooledByteBufAllocator() { this(false); @@ -147,7 +159,15 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, + normalCacheSize, DEFAULT_CACHE_CLEANUP_INTERVAL); + } + + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + long cacheThreadAliveCheckInterval) { super(preferDirect); + threadCache = new PoolThreadLocalCache(cacheThreadAliveCheckInterval); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; @@ -259,6 +279,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated * buffers. */ + @Deprecated public boolean hasThreadLocalCache() { return threadCache.exists(); } @@ -266,13 +287,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { /** * Free all cached buffers for the calling {@link Thread}. */ + @Deprecated public void freeThreadLocalCache() { threadCache.free(); } final class PoolThreadLocalCache extends ThreadLocal { - + private final Map caches = new IdentityHashMap(); + private final ReleaseCacheTask task = new ReleaseCacheTask(); private final AtomicInteger index = new AtomicInteger(); + private final long cacheThreadAliveCheckInterval; + + PoolThreadLocalCache(long cacheThreadAliveCheckInterval) { + this.cacheThreadAliveCheckInterval = cacheThreadAliveCheckInterval; + } @Override public PoolThreadCache get() { @@ -303,10 +331,37 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { return cache; } + @Override + public void set(PoolThreadCache value) { + Thread current = Thread.currentThread(); + synchronized (caches) { + caches.put(current, value); + if (task.releaseTaskFuture == null) { + task.releaseTaskFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task, + cacheThreadAliveCheckInterval, cacheThreadAliveCheckInterval, TimeUnit.MILLISECONDS); + } + } + super.set(value); + } + + @Override + public void remove() { + super.remove(); + PoolThreadCache cache; + Thread current = Thread.currentThread(); + synchronized (caches) { + cache = caches.remove(current); + } + if (cache != null) { + cache.free(); + } + } + /** * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated * buffers. */ + @Deprecated public boolean exists() { return super.get() != null; } @@ -314,12 +369,41 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { /** * Free all cached buffers for the calling {@link Thread}. */ + @Deprecated public void free() { PoolThreadCache cache = super.get(); if (cache != null) { cache.free(); } } + + private final class ReleaseCacheTask implements Runnable { + private ScheduledFuture releaseTaskFuture; + + @Override + public void run() { + synchronized (caches) { + for (Iterator> i = caches.entrySet().iterator(); + i.hasNext();) { + Map.Entry cache = i.next(); + if (cache.getKey().isAlive()) { + // Thread is still alive... + continue; + } + cache.getValue().free(); + i.remove(); + } + if (caches.isEmpty()) { + // Nothing in the caches anymore so no need to continue to check if something needs to be + // released periodically. The task will be rescheduled if there is any need later. + if (releaseTaskFuture != null) { + releaseTaskFuture.cancel(true); + releaseTaskFuture = null; + } + } + } + } + } } // Too noisy at the moment.