[#2370] Periodically check for not alive Threads and free up their ThreadPoolCache

Motivation:
At the moment we create new ThreadPoolCache whenever a Thread tries either allocate or release something on the PooledByteBufAllocator. When something is released we put it then in its ThreadPoolCache. The problem is we never check if a Thread is not alive anymore and so we may end up with memory that is never freed again if a user create many short living Threads that use the PooledByteBufAllocator.

Modifications:
Periodically check if the Thread is still alive that has a ThreadPoolCache assinged and if not free it.

Result:
Memory is freed up correctly even for short living Threads.
This commit is contained in:
Norman Maurer 2014-04-09 11:07:14 +02:00
parent 012166803a
commit 9c934116f2

View File

@ -16,12 +16,18 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class PooledByteBufAllocator extends AbstractByteBufAllocator { public class PooledByteBufAllocator extends AbstractByteBufAllocator {
@ -38,6 +44,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_CLEANUP_INTERVAL;
private static final int MIN_PAGE_SIZE = 4096; private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
@ -94,6 +101,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192); "io.netty.allocator.cacheTrimInterval", 8192);
// the default interval at which we check for caches that are assigned to Threads that are not alive anymore
DEFAULT_CACHE_CLEANUP_INTERVAL = SystemPropertyUtil.getLong(
"io.netty.allocator.cacheCleanupInterval", 5000);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
@ -114,6 +124,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}",
DEFAULT_CACHE_TRIM_INTERVAL); DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.cacheCleanupInterval: {} ms",
DEFAULT_CACHE_CLEANUP_INTERVAL);
} }
} }
@ -126,7 +138,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final int smallCacheSize; private final int smallCacheSize;
private final int normalCacheSize; private final int normalCacheSize;
final PoolThreadLocalCache threadCache = new PoolThreadLocalCache(); final PoolThreadLocalCache threadCache;
public PooledByteBufAllocator() { public PooledByteBufAllocator() {
this(false); this(false);
@ -147,7 +159,15 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) { int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
normalCacheSize, DEFAULT_CACHE_CLEANUP_INTERVAL);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
long cacheThreadAliveCheckInterval) {
super(preferDirect); super(preferDirect);
threadCache = new PoolThreadLocalCache(cacheThreadAliveCheckInterval);
this.tinyCacheSize = tinyCacheSize; this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize; this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize; this.normalCacheSize = normalCacheSize;
@ -259,6 +279,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
* buffers. * buffers.
*/ */
@Deprecated
public boolean hasThreadLocalCache() { public boolean hasThreadLocalCache() {
return threadCache.exists(); return threadCache.exists();
} }
@ -266,13 +287,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
/** /**
* Free all cached buffers for the calling {@link Thread}. * Free all cached buffers for the calling {@link Thread}.
*/ */
@Deprecated
public void freeThreadLocalCache() { public void freeThreadLocalCache() {
threadCache.free(); threadCache.free();
} }
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> { final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
private final Map<Thread, PoolThreadCache> caches = new IdentityHashMap<Thread, PoolThreadCache>();
private final ReleaseCacheTask task = new ReleaseCacheTask();
private final AtomicInteger index = new AtomicInteger(); private final AtomicInteger index = new AtomicInteger();
private final long cacheThreadAliveCheckInterval;
PoolThreadLocalCache(long cacheThreadAliveCheckInterval) {
this.cacheThreadAliveCheckInterval = cacheThreadAliveCheckInterval;
}
@Override @Override
public PoolThreadCache get() { public PoolThreadCache get() {
@ -303,10 +331,37 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
return cache; return cache;
} }
@Override
public void set(PoolThreadCache value) {
Thread current = Thread.currentThread();
synchronized (caches) {
caches.put(current, value);
if (task.releaseTaskFuture == null) {
task.releaseTaskFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task,
cacheThreadAliveCheckInterval, cacheThreadAliveCheckInterval, TimeUnit.MILLISECONDS);
}
}
super.set(value);
}
@Override
public void remove() {
super.remove();
PoolThreadCache cache;
Thread current = Thread.currentThread();
synchronized (caches) {
cache = caches.remove(current);
}
if (cache != null) {
cache.free();
}
}
/** /**
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
* buffers. * buffers.
*/ */
@Deprecated
public boolean exists() { public boolean exists() {
return super.get() != null; return super.get() != null;
} }
@ -314,12 +369,41 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
/** /**
* Free all cached buffers for the calling {@link Thread}. * Free all cached buffers for the calling {@link Thread}.
*/ */
@Deprecated
public void free() { public void free() {
PoolThreadCache cache = super.get(); PoolThreadCache cache = super.get();
if (cache != null) { if (cache != null) {
cache.free(); cache.free();
} }
} }
private final class ReleaseCacheTask implements Runnable {
private ScheduledFuture<?> releaseTaskFuture;
@Override
public void run() {
synchronized (caches) {
for (Iterator<Map.Entry<Thread, PoolThreadCache>> i = caches.entrySet().iterator();
i.hasNext();) {
Map.Entry<Thread, PoolThreadCache> cache = i.next();
if (cache.getKey().isAlive()) {
// Thread is still alive...
continue;
}
cache.getValue().free();
i.remove();
}
if (caches.isEmpty()) {
// Nothing in the caches anymore so no need to continue to check if something needs to be
// released periodically. The task will be rescheduled if there is any need later.
if (releaseTaskFuture != null) {
releaseTaskFuture.cancel(true);
releaseTaskFuture = null;
}
}
}
}
}
} }
// Too noisy at the moment. // Too noisy at the moment.