From 8429ecfcc410d22402a6b7ea2b409908190d6428 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 1 Mar 2014 15:47:03 +0100 Subject: [PATCH] Implement Thread caches for pooled buffers to minimize conditions. This fixes [#2264] and [#808]. Motivation: Remove the synchronization bottleneck in PoolArena and so speed up things Modifications: This implementation uses kind of the same technics as outlined in the jemalloc paper and jemalloc blogpost https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919. At the moment we only cache for "known" Threads (that powers EventExecutors) and not for others to keep the overhead minimal when need to free up unused buffers in the cache and free up cached buffers once the Thread completes. Here we use multi-level caches for tiny, small and normal allocations. Huge allocations are not cached at all to keep the memory usage at a sane level. All the different cache configurations can be adjusted via system properties or the constructor directly where it makes sense. Result: Less conditions as most allocations can be served by the cache itself --- .../main/java/io/netty/buffer/PoolArena.java | 99 +++- .../java/io/netty/buffer/PoolThreadCache.java | 422 +++++++++++++++++- .../java/io/netty/buffer/PooledByteBuf.java | 4 +- .../netty/buffer/PooledByteBufAllocator.java | 129 +++++- 4 files changed, 602 insertions(+), 52 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 2beff46a89..8b24c3711b 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -23,14 +23,16 @@ import java.nio.ByteBuffer; abstract class PoolArena { + static final int numTinySubpagePools = 512 >>> 4; + final PooledByteBufAllocator parent; - private final int pageSize; private final int maxOrder; - private final int pageShifts; - private final int chunkSize; - private final int subpageOverflowMask; - + final int pageSize; + final int pageShifts; + final int chunkSize; + final int subpageOverflowMask; + final int numSmallSubpagePools; private final PoolSubpage[] tinySubpagePools; private final PoolSubpage[] smallSubpagePools; @@ -51,13 +53,13 @@ abstract class PoolArena { this.pageShifts = pageShifts; this.chunkSize = chunkSize; subpageOverflowMask = ~(pageSize - 1); - - tinySubpagePools = newSubpagePoolArray(512 >>> 4); + tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); } - smallSubpagePools = newSubpagePoolArray(pageShifts - 9); + numSmallSubpagePools = pageShifts - 9; + smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); for (int i = 0; i < smallSubpagePools.length; i ++) { smallSubpagePools[i] = newSubpagePoolHead(pageSize); } @@ -89,27 +91,56 @@ abstract class PoolArena { return new PoolSubpage[size]; } + abstract boolean isDirect(); + PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf buf = newByteBuf(maxCapacity); allocate(cache, buf, reqCapacity); return buf; } + static int tinyIdx(int normCapacity) { + return normCapacity >>> 4; + } + + static int smallIdx(int normCapacity) { + int tableIdx = 0; + int i = normCapacity >>> 10; + while (i != 0) { + i >>>= 1; + tableIdx ++; + } + return tableIdx; + } + + // capacity < pageSize + boolean isTinyOrSmall(int normCapacity) { + return (normCapacity & subpageOverflowMask) == 0; + } + + // normCapacity < 512 + static boolean isTiny(int normCapacity) { + return (normCapacity & 0xFFFFFE00) == 0; + } + private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); - if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize + if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage[] table; - if ((normCapacity & 0xFFFFFE00) == 0) { // < 512 - tableIdx = normCapacity >>> 4; + if (isTiny(normCapacity)) { // < 512 + if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { - tableIdx = 0; - int i = normCapacity >>> 10; - while (i != 0) { - i >>>= 1; - tableIdx ++; + if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; } + tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } @@ -124,11 +155,16 @@ abstract class PoolArena { return; } } - } else if (normCapacity > chunkSize) { + } else if (normCapacity <= chunkSize) { + if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + } else { + // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); return; } - allocateNormal(buf, reqCapacity, normCapacity); } @@ -151,10 +187,15 @@ abstract class PoolArena { buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); } - void free(PoolChunk chunk, long handle) { + void free(PoolChunk chunk, long handle, int normCapacity) { if (chunk.unpooled) { destroyChunk(chunk); } else { + PoolThreadCache cache = parent.threadCache.get(); + if (cache.add(this, chunk, handle, normCapacity)) { + // cached so not free it. + return; + } synchronized (this) { chunk.parent.free(chunk, handle); } @@ -164,7 +205,7 @@ abstract class PoolArena { PoolSubpage findSubpagePoolHead(int elemSize) { int tableIdx; PoolSubpage[] table; - if ((elemSize & 0xFFFFFE00) == 0) { // < 512 + if (isTiny(elemSize)) { // < 512 tableIdx = elemSize >>> 4; table = tinySubpagePools; } else { @@ -180,7 +221,7 @@ abstract class PoolArena { return table[tableIdx]; } - private int normalizeCapacity(int reqCapacity) { + int normalizeCapacity(int reqCapacity) { if (reqCapacity < 0) { throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); } @@ -188,7 +229,7 @@ abstract class PoolArena { return reqCapacity; } - if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512 + if (!isTiny(reqCapacity)) { // >= 512 // Doubled int normalizedCapacity = reqCapacity; @@ -228,7 +269,7 @@ abstract class PoolArena { long oldHandle = buf.handle; T oldMemory = buf.memory; int oldOffset = buf.offset; - + int oldMaxLength = buf.maxLength; int readerIndex = buf.readerIndex(); int writerIndex = buf.writerIndex(); @@ -253,7 +294,7 @@ abstract class PoolArena { buf.setIndex(readerIndex, writerIndex); if (freeOldMemory) { - free(oldChunk, oldHandle); + free(oldChunk, oldHandle, oldMaxLength); } } @@ -339,6 +380,11 @@ abstract class PoolArena { super(parent, pageSize, maxOrder, pageShifts, chunkSize); } + @Override + boolean isDirect() { + return false; + } + @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize); @@ -377,6 +423,11 @@ abstract class PoolArena { super(parent, pageSize, maxOrder, pageShifts, chunkSize); } + @Override + boolean isDirect() { + return true; + } + @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk( diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index 09ac499b04..225c11eb7b 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -16,18 +16,436 @@ package io.netty.buffer; + import java.nio.ByteBuffer; +/** + * Acts a Thread cache for allocations. This implementation is moduled after + * jemalloc and the descripted + * technics of Scalable memory allocation using jemalloc. + */ final class PoolThreadCache { - final PoolArena heapArena; final PoolArena directArena; + // Hold the caches for the different size classes, which are tiny, small and normal. + private final MemoryRegionCache[] tinySubPageHeapCaches; + private final MemoryRegionCache[] smallSubPageHeapCaches; + private final MemoryRegionCache[] tinySubPageDirectCaches; + private final MemoryRegionCache[] smallSubPageDirectCaches; + private final MemoryRegionCache[] normalHeapCaches; + private final MemoryRegionCache[] normalDirectCaches; + + // Used for bitshifting when calculate the index of normal caches later + private final int numShiftsNormalDirect; + private final int numShiftsNormalHeap; + private final int freeSweepAllocationThreshold; + + private int allocations; + // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - PoolThreadCache(PoolArena heapArena, PoolArena directArena) { + PoolThreadCache(PoolArena heapArena, PoolArena directArena, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { + if (maxCachedBufferCapacity < 0) { + throw new IllegalArgumentException("maxCachedBufferCapacity: " + + maxCachedBufferCapacity + " (expected: >= 0)"); + } + if (freeSweepAllocationThreshold < 1) { + throw new IllegalArgumentException("freeSweepAllocationThreshold: " + + maxCachedBufferCapacity + " (expected: > 0)"); + } + this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; + if (directArena != null) { + tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); + smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools); + + numShiftsNormalDirect = log2(directArena.pageSize); + normalDirectCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, directArena); + } else { + // No directArea is configured so just null out all caches + tinySubPageDirectCaches = null; + smallSubPageDirectCaches = null; + normalDirectCaches = null; + numShiftsNormalDirect = -1; + } + if (heapArena != null) { + // Create the caches for the heap allocations + tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); + smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools); + + numShiftsNormalHeap = log2(heapArena.pageSize); + normalHeapCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, heapArena); + } else { + // No heapArea is configured so just null out all caches + tinySubPageHeapCaches = null; + smallSubPageHeapCaches = null; + normalHeapCaches = null; + numShiftsNormalHeap = -1; + } + } + + private static SubPageMemoryRegionCache[] createSubPageCaches(int cacheSize, int numCaches) { + if (cacheSize > 0) { + @SuppressWarnings("unchecked") + SubPageMemoryRegionCache[] cache = new SubPageMemoryRegionCache[numCaches]; + for (int i = 0; i < cache.length; i++) { + // TODO: maybe use cacheSize / cache.length + cache[i] = new SubPageMemoryRegionCache(cacheSize); + } + return cache; + } else { + return null; + } + } + + private static NormalMemoryRegionCache[] createNormalCaches( + int cacheSize, int maxCachedBufferCapacity, PoolArena area) { + if (cacheSize > 0) { + int max = Math.min(area.chunkSize, maxCachedBufferCapacity); + int arraySize = Math.max(1, max / area.pageSize); + + @SuppressWarnings("unchecked") + NormalMemoryRegionCache[] cache = new NormalMemoryRegionCache[arraySize]; + for (int i = 0; i < cache.length; i++) { + cache[i] = new NormalMemoryRegionCache(cacheSize); + } + return cache; + } else { + return null; + } + } + + private static int log2(int val) { + int res = 0; + while (val > 1) { + val >>= 1; + res++; + } + return res; + } + + /** + * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateTiny(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateSmall(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateNormal(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { + return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private boolean allocate(MemoryRegionCache cache, PooledByteBuf buf, int reqCapacity) { + if (cache == null) { + // no cache found so just return false here + return false; + } + boolean allocated = cache.allocate(buf, reqCapacity); + if (++ allocations >= freeSweepAllocationThreshold) { + allocations = 0; + trim(); + } + return allocated; + } + + /** + * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room. + * Returns {@code true} if it fit into the cache {@code false} otherwise. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + boolean add(PoolArena area, PoolChunk chunk, long handle, int normCapacity) { + MemoryRegionCache cache; + if (area.isTinyOrSmall(normCapacity)) { + if (PoolArena.isTiny(normCapacity)) { + cache = cacheForTiny(area, normCapacity); + } else { + cache = cacheForSmall(area, normCapacity); + } + } else { + cache = cacheForNormal(area, normCapacity); + } + if (cache == null) { + return false; + } + return cache.add(chunk, handle); + } + + /** + * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache + */ + void free() { + free(tinySubPageDirectCaches); + free(smallSubPageDirectCaches); + free(normalDirectCaches); + free(tinySubPageHeapCaches); + free(smallSubPageHeapCaches); + free(normalHeapCaches); + } + + private static void free(MemoryRegionCache[] caches) { + if (caches == null) { + return; + } + for (int i = 0; i < caches.length; i++) { + free(caches[i]); + } + } + + private static void free(MemoryRegionCache cache) { + if (cache == null) { + return; + } + cache.free(); + } + + void trim() { + trim(tinySubPageDirectCaches); + trim(smallSubPageDirectCaches); + trim(normalDirectCaches); + trim(tinySubPageHeapCaches); + trim(smallSubPageHeapCaches); + trim(normalHeapCaches); + } + + private static void trim(MemoryRegionCache[] caches) { + if (caches == null) { + return; + } + for (int i = 0; i < caches.length; i++) { + trim(caches[i]); + } + } + + private static void trim(MemoryRegionCache cache) { + if (cache == null) { + return; + } + cache.trim(); + } + + private MemoryRegionCache cacheForTiny(PoolArena area, int normCapacity) { + int idx = PoolArena.tinyIdx(normCapacity); + if (area.isDirect()) { + return cache(tinySubPageDirectCaches, idx); + } + return cache(tinySubPageHeapCaches, idx); + } + + private MemoryRegionCache cacheForSmall(PoolArena area, int normCapacity) { + int idx = PoolArena.smallIdx(normCapacity); + if (area.isDirect()) { + return cache(smallSubPageDirectCaches, idx); + } + return cache(smallSubPageHeapCaches, idx); + } + + private MemoryRegionCache cacheForNormal(PoolArena area, int normCapacity) { + if (area.isDirect()) { + int idx = log2(normCapacity >> numShiftsNormalDirect); + return cache(normalDirectCaches, idx); + } + int idx = log2(normCapacity >> numShiftsNormalHeap); + return cache(normalHeapCaches, idx); + } + + private static MemoryRegionCache cache(MemoryRegionCache[] cache, int idx) { + if (cache == null || idx > cache.length - 1) { + return null; + } + return cache[idx]; + } + + /** + * Cache used for buffers which are backed by TINY or SMALL size. + */ + private static final class SubPageMemoryRegionCache extends MemoryRegionCache { + SubPageMemoryRegionCache(int size) { + super(size); + } + + @Override + protected void initBuf( + PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { + chunk.initBufWithSubpage(buf, handle, reqCapacity); + } + } + + /** + * Cache used for buffers which are backed by NORMAL size. + */ + private static final class NormalMemoryRegionCache extends MemoryRegionCache { + NormalMemoryRegionCache(int size) { + super(size); + } + + @Override + protected void initBuf( + PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { + chunk.initBuf(buf, handle, reqCapacity); + } + } + + /** + * Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all. + */ + private abstract static class MemoryRegionCache { + private final Entry[] entries; + private final int maxUnusedCached; + private int head; + private int tail; + private int maxEntriesInUse; + private int entriesInUse; + + @SuppressWarnings("unchecked") + MemoryRegionCache(int size) { + entries = new Entry[powerOfTwo(size)]; + for (int i = 0; i < entries.length; i++) { + entries[i] = new Entry(); + } + maxUnusedCached = size / 2; + } + + private static int powerOfTwo(int res) { + if (res <= 2) { + return 2; + } + res--; + res |= res >> 1; + res |= res >> 2; + res |= res >> 4; + res |= res >> 8; + res |= res >> 16; + res++; + return res; + } + + /** + * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions. + */ + protected abstract void initBuf(PoolChunk chunk, long handle, + PooledByteBuf buf, int reqCapacity); + + /** + * Add to cache if not already full. + */ + public boolean add(PoolChunk chunk, long handle) { + Entry entry = entries[tail]; + if (entry.chunk != null) { + // cache is full + return false; + } + entriesInUse --; + + entry.chunk = chunk; + entry.handle = handle; + tail = nextIdx(tail); + return true; + } + + /** + * Allocate something out of the cache if possible and remove the entry from the cache. + */ + public boolean allocate(PooledByteBuf buf, int reqCapacity) { + Entry entry = entries[head]; + if (entry.chunk == null) { + return false; + } + + entriesInUse ++; + if (maxEntriesInUse < entriesInUse) { + maxEntriesInUse = entriesInUse; + } + initBuf(entry.chunk, entry.handle, buf, reqCapacity); + // only null out the chunk as we only use the chunk to check if the buffer is full or not. + entry.chunk = null; + head = nextIdx(head); + return true; + } + + /** + * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. + */ + public void free() { + entriesInUse = 0; + maxEntriesInUse = 0; + for (int i = head;; i = nextIdx(i)) { + if (!freeEntry(entries[i])) { + // all cleared + return; + } + } + } + + /** + * Free up cached {@link PoolChunk}s if not allocated frequently enough. + */ + private void trim() { + int free = size() - maxEntriesInUse; + entriesInUse = 0; + maxEntriesInUse = 0; + + if (free <= maxUnusedCached) { + return; + } + + int i = head; + for (; free > 0; free--) { + if (!freeEntry(entries[i])) { + // all freed + return; + } + i = nextIdx(i); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static boolean freeEntry(Entry entry) { + PoolChunk chunk = entry.chunk; + if (chunk == null) { + return false; + } + // need to synchronize on the area from which it was allocated before. + synchronized (chunk.arena) { + chunk.parent.free(chunk, entry.handle); + } + entry.chunk = null; + return true; + } + + /** + * Return the number of cached entries. + */ + private int size() { + return tail - head & entries.length - 1; + } + + private int nextIdx(int index) { + // use bitwise operation as this is faster as using modulo. + return (index + 1) & entries.length - 1; + } + + private static final class Entry { + PoolChunk chunk; + long handle; + } } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java index 64ca85480e..2a993ff6ac 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -31,7 +31,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { protected T memory; protected int offset; protected int length; - private int maxLength; + int maxLength; private ByteBuffer tmpNioBuf; @@ -142,7 +142,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { final long handle = this.handle; this.handle = -1; memory = null; - chunk.arena.free(chunk, handle); + chunk.arena.free(chunk, handle, maxLength); recycle(); } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index fd141fd52b..649fd7b1a3 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -33,6 +33,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_PAGE_SIZE; private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk + private static final int DEFAULT_TINY_CACHE_SIZE; + private static final int DEFAULT_SMALL_CACHE_SIZE; + private static final int DEFAULT_NORMAL_CACHE_SIZE; + private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; + private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); @@ -75,6 +80,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { runtime.availableProcessors(), PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); + // cache sizes + DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); + DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); + DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); + + // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in + // 'Scalable memory allocation using jemalloc' + DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt( + "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024); + + // the number of threshold of allocations when cached entries will be freed up if not frequently used + DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( + "io.netty.allocator.cacheTrimInterval", 8192); + if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); @@ -89,6 +108,12 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); } logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); + logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); + logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", + DEFAULT_CACHE_TRIM_INTERVAL); } } @@ -97,30 +122,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private final PoolArena[] heapArenas; private final PoolArena[] directArenas; + private final int tinyCacheSize; + private final int smallCacheSize; + private final int normalCacheSize; - final ThreadLocal threadCache = new ThreadLocal() { - private final AtomicInteger index = new AtomicInteger(); - @Override - protected PoolThreadCache initialValue() { - final int idx = index.getAndIncrement(); - final PoolArena heapArena; - final PoolArena directArena; - - if (heapArenas != null) { - heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; - } else { - heapArena = null; - } - - if (directArenas != null) { - directArena = directArenas[Math.abs(idx % directArenas.length)]; - } else { - directArena = null; - } - - return new PoolThreadCache(heapArena, directArena); - } - }; + final PoolThreadLocalCache threadCache = new PoolThreadLocalCache(); public PooledByteBufAllocator() { this(false); @@ -135,8 +141,16 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { - super(preferDirect); + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); + } + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize) { + super(preferDirect); + this.tinyCacheSize = tinyCacheSize; + this.smallCacheSize = smallCacheSize; + this.normalCacheSize = normalCacheSize; final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); if (nHeapArena < 0) { @@ -241,6 +255,73 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { return directArenas != null; } + /** + * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated + * buffers. + */ + public boolean hasThreadLocalCache() { + return threadCache.exists(); + } + + /** + * Free all cached buffers for the calling {@link Thread}. + */ + public void freeThreadLocalCache() { + threadCache.free(); + } + + final class PoolThreadLocalCache extends ThreadLocal { + + private final AtomicInteger index = new AtomicInteger(); + + @Override + public PoolThreadCache get() { + PoolThreadCache cache = super.get(); + if (cache == null) { + final int idx = index.getAndIncrement(); + final PoolArena heapArena; + final PoolArena directArena; + + if (heapArenas != null) { + heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; + } else { + heapArena = null; + } + + if (directArenas != null) { + directArena = directArenas[Math.abs(idx % directArenas.length)]; + } else { + directArena = null; + } + // If the current Thread is assigned to an EventExecutor we can + // easily free the cached stuff again once the EventExecutor completes later. + cache = new PoolThreadCache( + heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + set(cache); + } + return cache; + } + + /** + * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated + * buffers. + */ + public boolean exists() { + return super.get() != null; + } + + /** + * Free all cached buffers for the calling {@link Thread}. + */ + public void free() { + PoolThreadCache cache = super.get(); + if (cache != null) { + cache.free(); + } + } + } + // Too noisy at the moment. // // public String toString() {