diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 2beff46a89..8b24c3711b 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -23,14 +23,16 @@ import java.nio.ByteBuffer; abstract class PoolArena { + static final int numTinySubpagePools = 512 >>> 4; + final PooledByteBufAllocator parent; - private final int pageSize; private final int maxOrder; - private final int pageShifts; - private final int chunkSize; - private final int subpageOverflowMask; - + final int pageSize; + final int pageShifts; + final int chunkSize; + final int subpageOverflowMask; + final int numSmallSubpagePools; private final PoolSubpage[] tinySubpagePools; private final PoolSubpage[] smallSubpagePools; @@ -51,13 +53,13 @@ abstract class PoolArena { this.pageShifts = pageShifts; this.chunkSize = chunkSize; subpageOverflowMask = ~(pageSize - 1); - - tinySubpagePools = newSubpagePoolArray(512 >>> 4); + tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); } - smallSubpagePools = newSubpagePoolArray(pageShifts - 9); + numSmallSubpagePools = pageShifts - 9; + smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); for (int i = 0; i < smallSubpagePools.length; i ++) { smallSubpagePools[i] = newSubpagePoolHead(pageSize); } @@ -89,27 +91,56 @@ abstract class PoolArena { return new PoolSubpage[size]; } + abstract boolean isDirect(); + PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf buf = newByteBuf(maxCapacity); allocate(cache, buf, reqCapacity); return buf; } + static int tinyIdx(int normCapacity) { + return normCapacity >>> 4; + } + + static int smallIdx(int normCapacity) { + int tableIdx = 0; + int i = normCapacity >>> 10; + while (i != 0) { + i >>>= 1; + tableIdx ++; + } + return tableIdx; + } + + // capacity < pageSize + boolean isTinyOrSmall(int normCapacity) { + return (normCapacity & subpageOverflowMask) == 0; + } + + // normCapacity < 512 + static boolean isTiny(int normCapacity) { + return (normCapacity & 0xFFFFFE00) == 0; + } + private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); - if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize + if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage[] table; - if ((normCapacity & 0xFFFFFE00) == 0) { // < 512 - tableIdx = normCapacity >>> 4; + if (isTiny(normCapacity)) { // < 512 + if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { - tableIdx = 0; - int i = normCapacity >>> 10; - while (i != 0) { - i >>>= 1; - tableIdx ++; + if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; } + tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } @@ -124,11 +155,16 @@ abstract class PoolArena { return; } } - } else if (normCapacity > chunkSize) { + } else if (normCapacity <= chunkSize) { + if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + } else { + // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); return; } - allocateNormal(buf, reqCapacity, normCapacity); } @@ -151,10 +187,15 @@ abstract class PoolArena { buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); } - void free(PoolChunk chunk, long handle) { + void free(PoolChunk chunk, long handle, int normCapacity) { if (chunk.unpooled) { destroyChunk(chunk); } else { + PoolThreadCache cache = parent.threadCache.get(); + if (cache.add(this, chunk, handle, normCapacity)) { + // cached so not free it. + return; + } synchronized (this) { chunk.parent.free(chunk, handle); } @@ -164,7 +205,7 @@ abstract class PoolArena { PoolSubpage findSubpagePoolHead(int elemSize) { int tableIdx; PoolSubpage[] table; - if ((elemSize & 0xFFFFFE00) == 0) { // < 512 + if (isTiny(elemSize)) { // < 512 tableIdx = elemSize >>> 4; table = tinySubpagePools; } else { @@ -180,7 +221,7 @@ abstract class PoolArena { return table[tableIdx]; } - private int normalizeCapacity(int reqCapacity) { + int normalizeCapacity(int reqCapacity) { if (reqCapacity < 0) { throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); } @@ -188,7 +229,7 @@ abstract class PoolArena { return reqCapacity; } - if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512 + if (!isTiny(reqCapacity)) { // >= 512 // Doubled int normalizedCapacity = reqCapacity; @@ -228,7 +269,7 @@ abstract class PoolArena { long oldHandle = buf.handle; T oldMemory = buf.memory; int oldOffset = buf.offset; - + int oldMaxLength = buf.maxLength; int readerIndex = buf.readerIndex(); int writerIndex = buf.writerIndex(); @@ -253,7 +294,7 @@ abstract class PoolArena { buf.setIndex(readerIndex, writerIndex); if (freeOldMemory) { - free(oldChunk, oldHandle); + free(oldChunk, oldHandle, oldMaxLength); } } @@ -339,6 +380,11 @@ abstract class PoolArena { super(parent, pageSize, maxOrder, pageShifts, chunkSize); } + @Override + boolean isDirect() { + return false; + } + @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize); @@ -377,6 +423,11 @@ abstract class PoolArena { super(parent, pageSize, maxOrder, pageShifts, chunkSize); } + @Override + boolean isDirect() { + return true; + } + @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk( diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index 09ac499b04..225c11eb7b 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -16,18 +16,436 @@ package io.netty.buffer; + import java.nio.ByteBuffer; +/** + * Acts a Thread cache for allocations. This implementation is moduled after + * jemalloc and the descripted + * technics of Scalable memory allocation using jemalloc. + */ final class PoolThreadCache { - final PoolArena heapArena; final PoolArena directArena; + // Hold the caches for the different size classes, which are tiny, small and normal. + private final MemoryRegionCache[] tinySubPageHeapCaches; + private final MemoryRegionCache[] smallSubPageHeapCaches; + private final MemoryRegionCache[] tinySubPageDirectCaches; + private final MemoryRegionCache[] smallSubPageDirectCaches; + private final MemoryRegionCache[] normalHeapCaches; + private final MemoryRegionCache[] normalDirectCaches; + + // Used for bitshifting when calculate the index of normal caches later + private final int numShiftsNormalDirect; + private final int numShiftsNormalHeap; + private final int freeSweepAllocationThreshold; + + private int allocations; + // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - PoolThreadCache(PoolArena heapArena, PoolArena directArena) { + PoolThreadCache(PoolArena heapArena, PoolArena directArena, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { + if (maxCachedBufferCapacity < 0) { + throw new IllegalArgumentException("maxCachedBufferCapacity: " + + maxCachedBufferCapacity + " (expected: >= 0)"); + } + if (freeSweepAllocationThreshold < 1) { + throw new IllegalArgumentException("freeSweepAllocationThreshold: " + + maxCachedBufferCapacity + " (expected: > 0)"); + } + this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; + if (directArena != null) { + tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); + smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools); + + numShiftsNormalDirect = log2(directArena.pageSize); + normalDirectCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, directArena); + } else { + // No directArea is configured so just null out all caches + tinySubPageDirectCaches = null; + smallSubPageDirectCaches = null; + normalDirectCaches = null; + numShiftsNormalDirect = -1; + } + if (heapArena != null) { + // Create the caches for the heap allocations + tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); + smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools); + + numShiftsNormalHeap = log2(heapArena.pageSize); + normalHeapCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, heapArena); + } else { + // No heapArea is configured so just null out all caches + tinySubPageHeapCaches = null; + smallSubPageHeapCaches = null; + normalHeapCaches = null; + numShiftsNormalHeap = -1; + } + } + + private static SubPageMemoryRegionCache[] createSubPageCaches(int cacheSize, int numCaches) { + if (cacheSize > 0) { + @SuppressWarnings("unchecked") + SubPageMemoryRegionCache[] cache = new SubPageMemoryRegionCache[numCaches]; + for (int i = 0; i < cache.length; i++) { + // TODO: maybe use cacheSize / cache.length + cache[i] = new SubPageMemoryRegionCache(cacheSize); + } + return cache; + } else { + return null; + } + } + + private static NormalMemoryRegionCache[] createNormalCaches( + int cacheSize, int maxCachedBufferCapacity, PoolArena area) { + if (cacheSize > 0) { + int max = Math.min(area.chunkSize, maxCachedBufferCapacity); + int arraySize = Math.max(1, max / area.pageSize); + + @SuppressWarnings("unchecked") + NormalMemoryRegionCache[] cache = new NormalMemoryRegionCache[arraySize]; + for (int i = 0; i < cache.length; i++) { + cache[i] = new NormalMemoryRegionCache(cacheSize); + } + return cache; + } else { + return null; + } + } + + private static int log2(int val) { + int res = 0; + while (val > 1) { + val >>= 1; + res++; + } + return res; + } + + /** + * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateTiny(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateSmall(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateNormal(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private boolean allocate(MemoryRegionCache cache, PooledByteBuf buf, int reqCapacity) { + if (cache == null) { + // no cache found so just return false here + return false; + } + boolean allocated = cache.allocate(buf, reqCapacity); + if (++ allocations >= freeSweepAllocationThreshold) { + allocations = 0; + trim(); + } + return allocated; + } + + /** + * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room. + * Returns {@code true} if it fit into the cache {@code false} otherwise. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + boolean add(PoolArena area, PoolChunk chunk, long handle, int normCapacity) { + MemoryRegionCache cache; + if (area.isTinyOrSmall(normCapacity)) { + if (PoolArena.isTiny(normCapacity)) { + cache = cacheForTiny(area, normCapacity); + } else { + cache = cacheForSmall(area, normCapacity); + } + } else { + cache = cacheForNormal(area, normCapacity); + } + if (cache == null) { + return false; + } + return cache.add(chunk, handle); + } + + /** + * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache + */ + void free() { + free(tinySubPageDirectCaches); + free(smallSubPageDirectCaches); + free(normalDirectCaches); + free(tinySubPageHeapCaches); + free(smallSubPageHeapCaches); + free(normalHeapCaches); + } + + private static void free(MemoryRegionCache[] caches) { + if (caches == null) { + return; + } + for (int i = 0; i < caches.length; i++) { + free(caches[i]); + } + } + + private static void free(MemoryRegionCache cache) { + if (cache == null) { + return; + } + cache.free(); + } + + void trim() { + trim(tinySubPageDirectCaches); + trim(smallSubPageDirectCaches); + trim(normalDirectCaches); + trim(tinySubPageHeapCaches); + trim(smallSubPageHeapCaches); + trim(normalHeapCaches); + } + + private static void trim(MemoryRegionCache[] caches) { + if (caches == null) { + return; + } + for (int i = 0; i < caches.length; i++) { + trim(caches[i]); + } + } + + private static void trim(MemoryRegionCache cache) { + if (cache == null) { + return; + } + cache.trim(); + } + + private MemoryRegionCache cacheForTiny(PoolArena area, int normCapacity) { + int idx = PoolArena.tinyIdx(normCapacity); + if (area.isDirect()) { + return cache(tinySubPageDirectCaches, idx); + } + return cache(tinySubPageHeapCaches, idx); + } + + private MemoryRegionCache cacheForSmall(PoolArena area, int normCapacity) { + int idx = PoolArena.smallIdx(normCapacity); + if (area.isDirect()) { + return cache(smallSubPageDirectCaches, idx); + } + return cache(smallSubPageHeapCaches, idx); + } + + private MemoryRegionCache cacheForNormal(PoolArena area, int normCapacity) { + if (area.isDirect()) { + int idx = log2(normCapacity >> numShiftsNormalDirect); + return cache(normalDirectCaches, idx); + } + int idx = log2(normCapacity >> numShiftsNormalHeap); + return cache(normalHeapCaches, idx); + } + + private static MemoryRegionCache cache(MemoryRegionCache[] cache, int idx) { + if (cache == null || idx > cache.length - 1) { + return null; + } + return cache[idx]; + } + + /** + * Cache used for buffers which are backed by TINY or SMALL size. + */ + private static final class SubPageMemoryRegionCache extends MemoryRegionCache { + SubPageMemoryRegionCache(int size) { + super(size); + } + + @Override + protected void initBuf( + PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { + chunk.initBufWithSubpage(buf, handle, reqCapacity); + } + } + + /** + * Cache used for buffers which are backed by NORMAL size. + */ + private static final class NormalMemoryRegionCache extends MemoryRegionCache { + NormalMemoryRegionCache(int size) { + super(size); + } + + @Override + protected void initBuf( + PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { + chunk.initBuf(buf, handle, reqCapacity); + } + } + + /** + * Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all. + */ + private abstract static class MemoryRegionCache { + private final Entry[] entries; + private final int maxUnusedCached; + private int head; + private int tail; + private int maxEntriesInUse; + private int entriesInUse; + + @SuppressWarnings("unchecked") + MemoryRegionCache(int size) { + entries = new Entry[powerOfTwo(size)]; + for (int i = 0; i < entries.length; i++) { + entries[i] = new Entry(); + } + maxUnusedCached = size / 2; + } + + private static int powerOfTwo(int res) { + if (res <= 2) { + return 2; + } + res--; + res |= res >> 1; + res |= res >> 2; + res |= res >> 4; + res |= res >> 8; + res |= res >> 16; + res++; + return res; + } + + /** + * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions. + */ + protected abstract void initBuf(PoolChunk chunk, long handle, + PooledByteBuf buf, int reqCapacity); + + /** + * Add to cache if not already full. + */ + public boolean add(PoolChunk chunk, long handle) { + Entry entry = entries[tail]; + if (entry.chunk != null) { + // cache is full + return false; + } + entriesInUse --; + + entry.chunk = chunk; + entry.handle = handle; + tail = nextIdx(tail); + return true; + } + + /** + * Allocate something out of the cache if possible and remove the entry from the cache. + */ + public boolean allocate(PooledByteBuf buf, int reqCapacity) { + Entry entry = entries[head]; + if (entry.chunk == null) { + return false; + } + + entriesInUse ++; + if (maxEntriesInUse < entriesInUse) { + maxEntriesInUse = entriesInUse; + } + initBuf(entry.chunk, entry.handle, buf, reqCapacity); + // only null out the chunk as we only use the chunk to check if the buffer is full or not. + entry.chunk = null; + head = nextIdx(head); + return true; + } + + /** + * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. + */ + public void free() { + entriesInUse = 0; + maxEntriesInUse = 0; + for (int i = head;; i = nextIdx(i)) { + if (!freeEntry(entries[i])) { + // all cleared + return; + } + } + } + + /** + * Free up cached {@link PoolChunk}s if not allocated frequently enough. + */ + private void trim() { + int free = size() - maxEntriesInUse; + entriesInUse = 0; + maxEntriesInUse = 0; + + if (free <= maxUnusedCached) { + return; + } + + int i = head; + for (; free > 0; free--) { + if (!freeEntry(entries[i])) { + // all freed + return; + } + i = nextIdx(i); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static boolean freeEntry(Entry entry) { + PoolChunk chunk = entry.chunk; + if (chunk == null) { + return false; + } + // need to synchronize on the area from which it was allocated before. + synchronized (chunk.arena) { + chunk.parent.free(chunk, entry.handle); + } + entry.chunk = null; + return true; + } + + /** + * Return the number of cached entries. + */ + private int size() { + return tail - head & entries.length - 1; + } + + private int nextIdx(int index) { + // use bitwise operation as this is faster as using modulo. + return (index + 1) & entries.length - 1; + } + + private static final class Entry { + PoolChunk chunk; + long handle; + } } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java index 64ca85480e..2a993ff6ac 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -31,7 +31,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { protected T memory; protected int offset; protected int length; - private int maxLength; + int maxLength; private ByteBuffer tmpNioBuf; @@ -142,7 +142,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { final long handle = this.handle; this.handle = -1; memory = null; - chunk.arena.free(chunk, handle); + chunk.arena.free(chunk, handle, maxLength); recycle(); } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index fd141fd52b..649fd7b1a3 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -33,6 +33,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_PAGE_SIZE; private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk + private static final int DEFAULT_TINY_CACHE_SIZE; + private static final int DEFAULT_SMALL_CACHE_SIZE; + private static final int DEFAULT_NORMAL_CACHE_SIZE; + private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; + private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); @@ -75,6 +80,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { runtime.availableProcessors(), PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); + // cache sizes + DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); + DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); + DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); + + // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in + // 'Scalable memory allocation using jemalloc' + DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt( + "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024); + + // the number of threshold of allocations when cached entries will be freed up if not frequently used + DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( + "io.netty.allocator.cacheTrimInterval", 8192); + if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); @@ -89,6 +108,12 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); } logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); + logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); + logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", + DEFAULT_CACHE_TRIM_INTERVAL); } } @@ -97,30 +122,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private final PoolArena[] heapArenas; private final PoolArena[] directArenas; + private final int tinyCacheSize; + private final int smallCacheSize; + private final int normalCacheSize; - final ThreadLocal threadCache = new ThreadLocal() { - private final AtomicInteger index = new AtomicInteger(); - @Override - protected PoolThreadCache initialValue() { - final int idx = index.getAndIncrement(); - final PoolArena heapArena; - final PoolArena directArena; - - if (heapArenas != null) { - heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; - } else { - heapArena = null; - } - - if (directArenas != null) { - directArena = directArenas[Math.abs(idx % directArenas.length)]; - } else { - directArena = null; - } - - return new PoolThreadCache(heapArena, directArena); - } - }; + final PoolThreadLocalCache threadCache = new PoolThreadLocalCache(); public PooledByteBufAllocator() { this(false); @@ -135,8 +141,16 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { - super(preferDirect); + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); + } + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize) { + super(preferDirect); + this.tinyCacheSize = tinyCacheSize; + this.smallCacheSize = smallCacheSize; + this.normalCacheSize = normalCacheSize; final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); if (nHeapArena < 0) { @@ -241,6 +255,73 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { return directArenas != null; } + /** + * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated + * buffers. + */ + public boolean hasThreadLocalCache() { + return threadCache.exists(); + } + + /** + * Free all cached buffers for the calling {@link Thread}. + */ + public void freeThreadLocalCache() { + threadCache.free(); + } + + final class PoolThreadLocalCache extends ThreadLocal { + + private final AtomicInteger index = new AtomicInteger(); + + @Override + public PoolThreadCache get() { + PoolThreadCache cache = super.get(); + if (cache == null) { + final int idx = index.getAndIncrement(); + final PoolArena heapArena; + final PoolArena directArena; + + if (heapArenas != null) { + heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; + } else { + heapArena = null; + } + + if (directArenas != null) { + directArena = directArenas[Math.abs(idx % directArenas.length)]; + } else { + directArena = null; + } + // If the current Thread is assigned to an EventExecutor we can + // easily free the cached stuff again once the EventExecutor completes later. + cache = new PoolThreadCache( + heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + set(cache); + } + return cache; + } + + /** + * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated + * buffers. + */ + public boolean exists() { + return super.get() != null; + } + + /** + * Free all cached buffers for the calling {@link Thread}. + */ + public void free() { + PoolThreadCache cache = super.get(); + if (cache != null) { + cache.free(); + } + } + } + // Too noisy at the moment. // // public String toString() {