[#2370] Periodically check for not alive Threads and free up their ThreadPoolCache
Motivation: At the moment we create new ThreadPoolCache whenever a Thread tries either allocate or release something on the PooledByteBufAllocator. When something is released we put it then in its ThreadPoolCache. The problem is we never check if a Thread is not alive anymore and so we may end up with memory that is never freed again if a user create many short living Threads that use the PooledByteBufAllocator. Modifications: Periodically check if the Thread is still alive that has a ThreadPoolCache assinged and if not free it. Result: Memory is freed up correctly even for short living Threads.
This commit is contained in:
parent
88481131be
commit
ceffa82d0d
@ -16,12 +16,18 @@
|
||||
|
||||
package io.netty.buffer;
|
||||
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import io.netty.util.concurrent.ScheduledFuture;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
@ -38,6 +44,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
private static final int DEFAULT_NORMAL_CACHE_SIZE;
|
||||
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
||||
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
|
||||
private static final long DEFAULT_CACHE_CLEANUP_INTERVAL;
|
||||
|
||||
private static final int MIN_PAGE_SIZE = 4096;
|
||||
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
|
||||
@ -94,6 +101,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
|
||||
"io.netty.allocator.cacheTrimInterval", 8192);
|
||||
|
||||
// the default interval at which we check for caches that are assigned to Threads that are not alive anymore
|
||||
DEFAULT_CACHE_CLEANUP_INTERVAL = SystemPropertyUtil.getLong(
|
||||
"io.netty.allocator.cacheCleanupInterval", 5000);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
|
||||
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
|
||||
@ -114,6 +124,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
|
||||
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}",
|
||||
DEFAULT_CACHE_TRIM_INTERVAL);
|
||||
logger.debug("-Dio.netty.allocator.cacheCleanupInterval: {} ms",
|
||||
DEFAULT_CACHE_CLEANUP_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,7 +138,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
private final int smallCacheSize;
|
||||
private final int normalCacheSize;
|
||||
|
||||
final PoolThreadLocalCache threadCache = new PoolThreadLocalCache();
|
||||
final PoolThreadLocalCache threadCache;
|
||||
|
||||
public PooledByteBufAllocator() {
|
||||
this(false);
|
||||
@ -147,7 +159,15 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
|
||||
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
|
||||
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
|
||||
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
|
||||
normalCacheSize, DEFAULT_CACHE_CLEANUP_INTERVAL);
|
||||
}
|
||||
|
||||
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
|
||||
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
|
||||
long cacheThreadAliveCheckInterval) {
|
||||
super(preferDirect);
|
||||
threadCache = new PoolThreadLocalCache(cacheThreadAliveCheckInterval);
|
||||
this.tinyCacheSize = tinyCacheSize;
|
||||
this.smallCacheSize = smallCacheSize;
|
||||
this.normalCacheSize = normalCacheSize;
|
||||
@ -259,6 +279,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
|
||||
* buffers.
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean hasThreadLocalCache() {
|
||||
return threadCache.exists();
|
||||
}
|
||||
@ -266,13 +287,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
/**
|
||||
* Free all cached buffers for the calling {@link Thread}.
|
||||
*/
|
||||
@Deprecated
|
||||
public void freeThreadLocalCache() {
|
||||
threadCache.free();
|
||||
}
|
||||
|
||||
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
|
||||
|
||||
private final Map<Thread, PoolThreadCache> caches = new IdentityHashMap<Thread, PoolThreadCache>();
|
||||
private final ReleaseCacheTask task = new ReleaseCacheTask();
|
||||
private final AtomicInteger index = new AtomicInteger();
|
||||
private final long cacheThreadAliveCheckInterval;
|
||||
|
||||
PoolThreadLocalCache(long cacheThreadAliveCheckInterval) {
|
||||
this.cacheThreadAliveCheckInterval = cacheThreadAliveCheckInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PoolThreadCache get() {
|
||||
@ -303,10 +331,37 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
return cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(PoolThreadCache value) {
|
||||
Thread current = Thread.currentThread();
|
||||
synchronized (caches) {
|
||||
caches.put(current, value);
|
||||
if (task.releaseTaskFuture == null) {
|
||||
task.releaseTaskFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task,
|
||||
cacheThreadAliveCheckInterval, cacheThreadAliveCheckInterval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
super.set(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
super.remove();
|
||||
PoolThreadCache cache;
|
||||
Thread current = Thread.currentThread();
|
||||
synchronized (caches) {
|
||||
cache = caches.remove(current);
|
||||
}
|
||||
if (cache != null) {
|
||||
cache.free();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
|
||||
* buffers.
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean exists() {
|
||||
return super.get() != null;
|
||||
}
|
||||
@ -314,12 +369,41 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||
/**
|
||||
* Free all cached buffers for the calling {@link Thread}.
|
||||
*/
|
||||
@Deprecated
|
||||
public void free() {
|
||||
PoolThreadCache cache = super.get();
|
||||
if (cache != null) {
|
||||
cache.free();
|
||||
}
|
||||
}
|
||||
|
||||
private final class ReleaseCacheTask implements Runnable {
|
||||
private ScheduledFuture<?> releaseTaskFuture;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (caches) {
|
||||
for (Iterator<Map.Entry<Thread, PoolThreadCache>> i = caches.entrySet().iterator();
|
||||
i.hasNext();) {
|
||||
Map.Entry<Thread, PoolThreadCache> cache = i.next();
|
||||
if (cache.getKey().isAlive()) {
|
||||
// Thread is still alive...
|
||||
continue;
|
||||
}
|
||||
cache.getValue().free();
|
||||
i.remove();
|
||||
}
|
||||
if (caches.isEmpty()) {
|
||||
// Nothing in the caches anymore so no need to continue to check if something needs to be
|
||||
// released periodically. The task will be rescheduled if there is any need later.
|
||||
if (releaseTaskFuture != null) {
|
||||
releaseTaskFuture.cancel(true);
|
||||
releaseTaskFuture = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Too noisy at the moment.
|
||||
|
Loading…
Reference in New Issue
Block a user