From 0d701d7c3c51263a1eef56d5a549ef2075b9aa9e Mon Sep 17 00:00:00 2001 From: Ruwei <295415537@qq.com> Date: Thu, 16 Jul 2020 03:33:27 +0800 Subject: [PATCH] Review PooledByteBufAllocator in respect of jemalloc 4.x changes and update allocate algorithm.(#10267) Motivation: For size from 512 bytes to chunkSize, we use a buddy algorithm. The drawback is that it has a large internal fragmentation. Modifications: 1. add SizeClassesMetric and SizeClasses 2. remove tiny size, now we have small, normal and huge size 3. rewrite the structure of PoolChunk 4. rewrite pooled allocate algorithm in PoolChunk 5. when allocate subpage, using lowest common multiple of pageSize and elemSize instead of pageSize. 6. add more tests in PooledByteBufAllocatorTest and PoolArenaTest Result: Reduce internal fragmentation and closes #3910 --- .../main/java/io/netty/buffer/PoolArena.java | 318 +++----- .../java/io/netty/buffer/PoolArenaMetric.java | 17 +- .../main/java/io/netty/buffer/PoolChunk.java | 681 +++++++++++------- .../java/io/netty/buffer/PoolChunkList.java | 9 +- .../java/io/netty/buffer/PoolSubpage.java | 47 +- .../io/netty/buffer/PoolSubpageMetric.java | 2 +- .../java/io/netty/buffer/PoolThreadCache.java | 100 +-- .../netty/buffer/PooledByteBufAllocator.java | 63 +- .../buffer/PooledByteBufAllocatorMetric.java | 4 +- .../java/io/netty/buffer/SizeClasses.java | 407 +++++++++++ .../io/netty/buffer/SizeClassesMetric.java | 87 +++ .../java/io/netty/buffer/PoolArenaTest.java | 93 ++- .../buffer/PooledByteBufAllocatorTest.java | 124 +++- 13 files changed, 1306 insertions(+), 646 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/SizeClasses.java create mode 100644 buffer/src/main/java/io/netty/buffer/SizeClassesMetric.java diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index e025d4a9e5..427e55c8ec 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -26,31 +26,22 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static io.netty.buffer.PoolChunk.isSubpage; import static java.lang.Math.max; -abstract class PoolArena implements PoolArenaMetric { +abstract class PoolArena extends SizeClasses implements PoolArenaMetric { static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); enum SizeClass { - Tiny, Small, Normal } - static final int numTinySubpagePools = 512 >>> 4; - final PooledByteBufAllocator parent; - private final int maxOrder; - final int pageSize; - final int pageShifts; - final int chunkSize; - final int subpageOverflowMask; final int numSmallSubpagePools; final int directMemoryCacheAlignment; final int directMemoryCacheAlignmentMask; - private final PoolSubpage[] tinySubpagePools; private final PoolSubpage[] smallSubpagePools; private final PoolChunkList q050; @@ -64,13 +55,12 @@ abstract class PoolArena implements PoolArenaMetric { // Metrics for allocations and deallocations private long allocationsNormal; + // We need to use the LongAdder here as this is not guarded via synchronized block. - private final LongAdder allocationsTiny = new LongAdder(); private final LongAdder allocationsSmall = new LongAdder(); private final LongAdder allocationsHuge = new LongAdder(); private final LongAdder activeBytesHuge = new LongAdder(); - private long deallocationsTiny; private long deallocationsSmall; private long deallocationsNormal; @@ -84,24 +74,16 @@ abstract class PoolArena implements PoolArenaMetric { //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; protected PoolArena(PooledByteBufAllocator parent, int pageSize, - int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) { + int pageShifts, int chunkSize, int cacheAlignment) { + super(pageSize, pageShifts, chunkSize, cacheAlignment); this.parent = parent; - this.pageSize = pageSize; - this.maxOrder = maxOrder; - this.pageShifts = pageShifts; - this.chunkSize = chunkSize; directMemoryCacheAlignment = cacheAlignment; directMemoryCacheAlignmentMask = cacheAlignment - 1; - subpageOverflowMask = ~(pageSize - 1); - tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); - for (int i = 0; i < tinySubpagePools.length; i ++) { - tinySubpagePools[i] = newSubpagePoolHead(pageSize); - } - numSmallSubpagePools = pageShifts - 9; + numSmallSubpagePools = nSubpages; smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); for (int i = 0; i < smallSubpagePools.length; i ++) { - smallSubpagePools[i] = newSubpagePoolHead(pageSize); + smallSubpagePools[i] = newSubpagePoolHead(); } q100 = new PoolChunkList<>(this, null, 100, Integer.MAX_VALUE, chunkSize); @@ -128,8 +110,8 @@ abstract class PoolArena implements PoolArenaMetric { chunkListMetrics = Collections.unmodifiableList(metrics); } - private PoolSubpage newSubpagePoolHead(int pageSize) { - PoolSubpage head = new PoolSubpage<>(pageSize); + private PoolSubpage newSubpagePoolHead() { + PoolSubpage head = new PoolSubpage<>(); head.prev = head; head.next = head; return head; @@ -148,114 +130,83 @@ abstract class PoolArena implements PoolArenaMetric { return buf; } - static int tinyIdx(int normCapacity) { - return normCapacity >>> 4; - } - - static int smallIdx(int normCapacity) { - int tableIdx = 0; - int i = normCapacity >>> 10; - while (i != 0) { - i >>>= 1; - tableIdx ++; - } - return tableIdx; - } - - // capacity < pageSize - boolean isTinyOrSmall(int normCapacity) { - return (normCapacity & subpageOverflowMask) == 0; - } - - // normCapacity < 512 - static boolean isTiny(int normCapacity) { - return (normCapacity & 0xFFFFFE00) == 0; - } - private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { - final int normCapacity = normalizeCapacity(reqCapacity); - if (isTinyOrSmall(normCapacity)) { // capacity < pageSize - int tableIdx; - PoolSubpage[] table; - boolean tiny = isTiny(normCapacity); - if (tiny) { // < 512 - if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { - // was able to allocate out of the cache so move on - return; - } - tableIdx = tinyIdx(normCapacity); - table = tinySubpagePools; - } else { - if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { - // was able to allocate out of the cache so move on - return; - } - tableIdx = smallIdx(normCapacity); - table = smallSubpagePools; - } + final int sizeIdx = size2SizeIdx(reqCapacity); - final PoolSubpage head = table[tableIdx]; + if (sizeIdx <= smallMaxSizeIdx) { + tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); + } else if (sizeIdx < nSizes) { + tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); + } else { + int normCapacity = directMemoryCacheAlignment > 0 + ? normalizeSize(reqCapacity) : reqCapacity; + // Huge allocations are never served via the cache so just call allocateHuge + allocateHuge(buf, normCapacity); + } + } - /** - * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and - * {@link PoolChunk#free(long)} may modify the doubly linked list as well. - */ - synchronized (head) { - final PoolSubpage s = head.next; - if (s != head) { - assert s.doNotDestroy && s.elemSize == normCapacity; - long handle = s.allocate(); - assert handle >= 0; - s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); - incTinySmallAllocation(tiny); - return; - } - } - synchronized (this) { - allocateNormal(buf, reqCapacity, normCapacity, cache); - } + private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, + final int sizeIdx) { - incTinySmallAllocation(tiny); + if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { + // was able to allocate out of the cache so move on return; } - if (normCapacity <= chunkSize) { - if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { - // was able to allocate out of the cache so move on + + /** + * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and + * {@link PoolChunk#free(long)} may modify the doubly linked list as well. + */ + final PoolSubpage head = smallSubpagePools[sizeIdx]; + synchronized (head) { + final PoolSubpage s = head.next; + if (s != head) { + assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); + long handle = s.allocate(); + assert handle >= 0; + s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); + incSmallAllocation(); return; } - synchronized (this) { - allocateNormal(buf, reqCapacity, normCapacity, cache); - ++allocationsNormal; - } - } else { - // Huge allocations are never served via the cache so just call allocateHuge - allocateHuge(buf, reqCapacity); + } + + synchronized (this) { + allocateNormal(buf, reqCapacity, sizeIdx, cache); + } + incSmallAllocation(); + } + + private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, + final int sizeIdx) { + if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { + // was able to allocate out of the cache so move on + return; + } + synchronized (this) { + allocateNormal(buf, reqCapacity, sizeIdx, cache); + ++allocationsNormal; } } // Method must be called inside synchronized(this) { ... } block - private void allocateNormal(PooledByteBuf buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) { - if (q050.allocate(buf, reqCapacity, normCapacity, threadCache) || - q025.allocate(buf, reqCapacity, normCapacity, threadCache) || - q000.allocate(buf, reqCapacity, normCapacity, threadCache) || - qInit.allocate(buf, reqCapacity, normCapacity, threadCache) || - q075.allocate(buf, reqCapacity, normCapacity, threadCache)) { + private void allocateNormal(PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { + if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || + q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || + q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || + qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || + q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { return; } // Add a new chunk. - PoolChunk c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); - boolean success = c.allocate(buf, reqCapacity, normCapacity, threadCache); + PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); + boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); assert success; qInit.add(c); } - private void incTinySmallAllocation(boolean tiny) { - if (tiny) { - allocationsTiny.increment(); - } else { - allocationsSmall.increment(); - } + private void incSmallAllocation() { + allocationsSmall.increment(); } private void allocateHuge(PooledByteBuf buf, int reqCapacity) { @@ -272,24 +223,22 @@ abstract class PoolArena implements PoolArenaMetric { activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { - SizeClass sizeClass = sizeClass(normCapacity); + SizeClass sizeClass = sizeClass(handle); if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { // cached so not free it. return; } - freeChunk(chunk, handle, sizeClass, nioBuffer, false); + freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); } } - private SizeClass sizeClass(int normCapacity) { - if (!isTinyOrSmall(normCapacity)) { - return SizeClass.Normal; - } - return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small; + private SizeClass sizeClass(long handle) { + return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal; } - void freeChunk(PoolChunk chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) { + void freeChunk(PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer, + boolean finalizer) { final boolean destroyChunk; synchronized (this) { // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this @@ -302,14 +251,11 @@ abstract class PoolArena implements PoolArenaMetric { case Small: ++deallocationsSmall; break; - case Tiny: - ++deallocationsTiny; - break; default: throw new Error(); } } - destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer); + destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); } if (destroyChunk) { // destroyChunk not need to be called while holding the synchronized lock. @@ -317,62 +263,8 @@ abstract class PoolArena implements PoolArenaMetric { } } - PoolSubpage findSubpagePoolHead(int elemSize) { - int tableIdx; - PoolSubpage[] table; - if (isTiny(elemSize)) { // < 512 - tableIdx = tinyIdx(elemSize); - table = tinySubpagePools; - } else { - tableIdx = smallIdx(elemSize); - table = smallSubpagePools; - } - - return table[tableIdx]; - } - - int normalizeCapacity(int reqCapacity) { - checkPositiveOrZero(reqCapacity, "reqCapacity"); - - if (reqCapacity >= chunkSize) { - return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity); - } - - if (!isTiny(reqCapacity)) { // >= 512 - // Doubled - - int normalizedCapacity = reqCapacity; - normalizedCapacity --; - normalizedCapacity |= normalizedCapacity >>> 1; - normalizedCapacity |= normalizedCapacity >>> 2; - normalizedCapacity |= normalizedCapacity >>> 4; - normalizedCapacity |= normalizedCapacity >>> 8; - normalizedCapacity |= normalizedCapacity >>> 16; - normalizedCapacity ++; - - if (normalizedCapacity < 0) { - normalizedCapacity >>>= 1; - } - assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0; - - return normalizedCapacity; - } - - if (directMemoryCacheAlignment > 0) { - return alignCapacity(reqCapacity); - } - - // Quantum-spaced - if ((reqCapacity & 15) == 0) { - return reqCapacity; - } - - return (reqCapacity & ~15) + 16; - } - - int alignCapacity(int reqCapacity) { - int delta = reqCapacity & directMemoryCacheAlignmentMask; - return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta; + PoolSubpage findSubpagePoolHead(int sizeIdx) { + return smallSubpagePools[sizeIdx]; } void reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory) { @@ -412,7 +304,7 @@ abstract class PoolArena implements PoolArenaMetric { @Override public int numTinySubpages() { - return tinySubpagePools.length; + return 0; } @Override @@ -427,7 +319,7 @@ abstract class PoolArena implements PoolArenaMetric { @Override public List tinySubpages() { - return subPageMetricList(tinySubpagePools); + return Collections.emptyList(); } @Override @@ -461,12 +353,13 @@ abstract class PoolArena implements PoolArenaMetric { synchronized (this) { allocsNormal = allocationsNormal; } - return allocationsTiny.longValue() + allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue(); + + return allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue(); } @Override public long numTinyAllocations() { - return allocationsTiny.longValue(); + return 0; } @Override @@ -483,14 +376,14 @@ abstract class PoolArena implements PoolArenaMetric { public long numDeallocations() { final long deallocs; synchronized (this) { - deallocs = deallocationsTiny + deallocationsSmall + deallocationsNormal; + deallocs = deallocationsSmall + deallocationsNormal; } return deallocs + deallocationsHuge.longValue(); } @Override - public synchronized long numTinyDeallocations() { - return deallocationsTiny; + public long numTinyDeallocations() { + return 0; } @Override @@ -515,17 +408,18 @@ abstract class PoolArena implements PoolArenaMetric { @Override public long numActiveAllocations() { - long val = allocationsTiny.longValue() + allocationsSmall.longValue() + allocationsHuge.longValue() + + long val = allocationsSmall.longValue() + allocationsHuge.longValue() - deallocationsHuge.longValue(); synchronized (this) { - val += allocationsNormal - (deallocationsTiny + deallocationsSmall + deallocationsNormal); + val += allocationsNormal - (deallocationsSmall + deallocationsNormal); } return max(val, 0); } @Override public long numActiveTinyAllocations() { - return max(numTinyAllocations() - numTinyDeallocations(), 0); + return 0; } @Override @@ -560,7 +454,7 @@ abstract class PoolArena implements PoolArenaMetric { return max(0, val); } - protected abstract PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize); + protected abstract PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize); protected abstract PoolChunk newUnpooledChunk(int capacity); protected abstract PooledByteBuf newByteBuf(int maxCapacity); protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf dst, int length); @@ -593,10 +487,7 @@ abstract class PoolArena implements PoolArenaMetric { .append(StringUtil.NEWLINE) .append(q100) .append(StringUtil.NEWLINE) - .append("tiny subpages:"); - appendPoolSubPages(buf, tinySubpagePools); - buf.append(StringUtil.NEWLINE) - .append("small subpages:"); + .append("small subpages:"); appendPoolSubPages(buf, smallSubpagePools); buf.append(StringUtil.NEWLINE); @@ -627,7 +518,6 @@ abstract class PoolArena implements PoolArenaMetric { super.finalize(); } finally { destroyPoolSubPages(smallSubpagePools); - destroyPoolSubPages(tinySubpagePools); destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100); } } @@ -646,10 +536,10 @@ abstract class PoolArena implements PoolArenaMetric { static final class HeapArena extends PoolArena { - HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, - int pageShifts, int chunkSize, int directMemoryCacheAlignment) { - super(parent, pageSize, maxOrder, pageShifts, chunkSize, - directMemoryCacheAlignment); + HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, + int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, pageShifts, chunkSize, + directMemoryCacheAlignment); } private static byte[] newByteArray(int size) { @@ -662,8 +552,8 @@ abstract class PoolArena implements PoolArenaMetric { } @Override - protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { - return new PoolChunk<>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0); + protected PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { + return new PoolChunk<>(this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); } @Override @@ -694,10 +584,10 @@ abstract class PoolArena implements PoolArenaMetric { static final class DirectArena extends PoolArena { - DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, - int pageShifts, int chunkSize, int directMemoryCacheAlignment) { - super(parent, pageSize, maxOrder, pageShifts, chunkSize, - directMemoryCacheAlignment); + DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, + int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, pageShifts, chunkSize, + directMemoryCacheAlignment); } @Override @@ -718,17 +608,17 @@ abstract class PoolArena implements PoolArenaMetric { } @Override - protected PoolChunk newChunk(int pageSize, int maxOrder, - int pageShifts, int chunkSize) { + protected PoolChunk newChunk(int pageSize, int maxPageIdx, + int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0) { return new PoolChunk<>(this, - allocateDirect(chunkSize), pageSize, maxOrder, - pageShifts, chunkSize, 0); + allocateDirect(chunkSize), pageSize, pageShifts, + chunkSize, maxPageIdx, 0); } final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); return new PoolChunk<>(this, memory, pageSize, - maxOrder, pageShifts, chunkSize, + pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); } diff --git a/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java b/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java index d3281f3bee..c8ac0bdea7 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java @@ -21,7 +21,7 @@ import java.util.List; /** * Expose metrics for an arena. */ -public interface PoolArenaMetric { +public interface PoolArenaMetric extends SizeClassesMetric { /** * Returns the number of thread caches backed by this arena. @@ -30,7 +30,10 @@ public interface PoolArenaMetric { /** * Returns the number of tiny sub-pages for the arena. + * + * @deprecated Tiny sub-pages have been merged into small sub-pages. */ + @Deprecated int numTinySubpages(); /** @@ -45,7 +48,10 @@ public interface PoolArenaMetric { /** * Returns an unmodifiable {@link List} which holds {@link PoolSubpageMetric}s for tiny sub-pages. + * + * @deprecated Tiny sub-pages have been merged into small sub-pages. */ + @Deprecated List tinySubpages(); /** @@ -65,7 +71,10 @@ public interface PoolArenaMetric { /** * Return the number of tiny allocations done via the arena. + * + * @deprecated Tiny allocations have been merged into small allocations. */ + @Deprecated long numTinyAllocations(); /** @@ -90,7 +99,10 @@ public interface PoolArenaMetric { /** * Return the number of tiny deallocations done via the arena. + * + * @deprecated Tiny deallocations have been merged into small deallocations. */ + @Deprecated long numTinyDeallocations(); /** @@ -115,7 +127,10 @@ public interface PoolArenaMetric { /** * Return the number of currently active tiny allocations. + * + * @deprecated Tiny allocations have been merged into small allocations. */ + @Deprecated long numActiveTinyAllocations(); /** diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java index 9836918726..5acd74da7e 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunk.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java @@ -13,20 +13,24 @@ * License for the specific language governing permissions and limitations * under the License. */ - package io.netty.buffer; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; + import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; +import java.util.PriorityQueue; /** * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk * * Notation: The following terms are important to understand the code * > page - a page is the smallest unit of memory chunk that can be allocated - * > chunk - a chunk is a collection of pages - * > in this code chunkSize = 2^{maxOrder} * pageSize + * > run - a run is a collection of pages + * > chunk - a chunk is a collection of runs + * > in this code chunkSize = maxPages * pageSize * * To begin we allocate a byte array of size = chunkSize * Whenever a ByteBuf of given size needs to be created we search for the first position @@ -34,96 +38,136 @@ import java.util.Deque; * return a (long) handle that encodes this offset information, (this memory segment is then * marked as reserved so it is always used by exactly one ByteBuf and no more) * - * For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method - * This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity - * equals the next nearest power of 2 + * For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method. + * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity + * equals the next nearest size in {@link SizeClasses}. * - * To search for the first offset in chunk that has at least requested size available we construct a - * complete balanced binary tree and store it in an array (just like heaps) - memoryMap * - * The tree looks like this (the size of each node being mentioned in the parenthesis) + * A chunk has the following layout: * - * depth=0 1 node (chunkSize) - * depth=1 2 nodes (chunkSize/2) - * .. - * .. - * depth=d 2^d nodes (chunkSize/2^d) - * .. - * depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize) + * /-----------------\ + * | run | + * | | + * | | + * |-----------------| + * | run | + * | | + * |-----------------| + * | unalloctated | + * | (freed) | + * | | + * |-----------------| + * | subpage | + * |-----------------| + * | unallocated | + * | (freed) | + * | ... | + * | ... | + * | ... | + * | | + * | | + * | | + * \-----------------/ * - * depth=maxOrder is the last level and the leafs consist of pages * - * With this tree available searching in chunkArray translates like this: - * To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k - * which is unused + * handle: + * ------- + * a handle is a long number, the bit layout of a run looks like: + * + * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb + * + * o: runOffset (page offset in the chunk), 15bit + * s: size (number of pages) of this run, 15bit + * u: isUsed?, 1bit + * e: isSubpage?, 1bit + * b: bitmapIdx of subpage, zero if it's not subpage, 32bit + * + * runsAvailMap: + * ------ + * a map which manages all runs (used and not in used). + * For each run, the first runOffset and last runOffset are stored in runsAvailMap. + * key: runOffset + * value: handle + * + * runsAvail: + * ---------- + * an array of {@link PriorityQueue}. + * Each queue manages same size of runs. + * Runs are sorted by offset, so that we always allocate runs with smaller offset. + * * * Algorithm: * ---------- - * Encode the tree in memoryMap with the notation - * memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated - * is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free * - * As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained + * As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained. * * Initialization - - * In the beginning we construct the memoryMap array by storing the depth of a node at each node - * i.e., memoryMap[id] = depth_of_id + * In the beginning we store the initial run which is the whole chunk. + * The initial run: + * runOffset = 0 + * size = chunkSize + * isUsed = no + * isSubpage = no + * bitmapIdx = 0 * - * Observations: - * ------------- - * 1) memoryMap[id] = depth_of_id => it is free / unallocated - * 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but - * some of its children can still be allocated based on their availability - * 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it - * is thus marked as unusable - * - * Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated] - * ---------- - * 1) start at root (i.e., depth = 0 or id = 1) - * 2) if memoryMap[1] > d => cannot be allocated from this chunk - * 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found - * 4) else try in right subtree * * Algorithm: [allocateRun(size)] * ---------- - * 1) Compute d = log_2(chunkSize/size) - * 2) Return allocateNode(d) + * 1) find the first avail run using in runsAvails according to size + * 2) if pages of run is larger than request pages then split it, and save the tailing run + * for later using * * Algorithm: [allocateSubpage(size)] * ---------- - * 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page) - * 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity) - * note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it + * 1) find a not full subpage according to size. + * if it already exists just return, otherwise allocate a new PoolSubpage and call init() + * note that this subpage object is added to subpagesPool in the PoolArena when we init() it + * 2) call subpage.allocate() * - * Note: - * ----- - * In the implementation for improving cache coherence, - * we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively + * Algorithm: [free(handle, length, nioBuffer)] + * ---------- + * 1) if it is a subpage, return the slab back into this subpage + * 2) if the subpage is not used or it is a run, then start free this run + * 3) merge continuous avail runs + * 4) save the merged run * - * memoryMap[id]= depth_of_id is defined above - * depthMap[id]= x indicates that the first node which is free to be allocated is at depth x (from root) */ final class PoolChunk implements PoolChunkMetric { - private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; + private static final int OFFSET_BIT_LENGTH = 15; + private static final int SIZE_BIT_LENGTH = 15; + private static final int INUSED_BIT_LENGTH = 1; + private static final int SUBPAGE_BIT_LENGTH = 1; + private static final int BITMAP_IDX_BIT_LENGTH = 32; + + static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH; + static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT; + static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT; + static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT; final PoolArena arena; final T memory; final boolean unpooled; final int offset; - private final byte[] memoryMap; - private final byte[] depthMap; + + /** + * store the first page and last page of each avail run + */ + private final IntObjectMap runsAvailMap; + + /** + * manage all avail runs + */ + private final PriorityQueue[] runsAvail; + + /** + * manage all subpages in this chunk + */ private final PoolSubpage[] subpages; - /** Used to determine if the requested capacity is equal to or greater than pageSize. */ - private final int subpageOverflowMask; + private final int pageSize; private final int pageShifts; - private final int maxOrder; private final int chunkSize; - private final int log2ChunkSize; - private final int maxSubpageAllocs; - /** Used to mark memory as unusable */ - private final byte unusable; // Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container // around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so @@ -141,38 +185,26 @@ final class PoolChunk implements PoolChunkMetric { // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - PoolChunk(PoolArena arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) { + @SuppressWarnings("unchecked") + PoolChunk(PoolArena arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) { unpooled = false; this.arena = arena; this.memory = memory; this.pageSize = pageSize; this.pageShifts = pageShifts; - this.maxOrder = maxOrder; this.chunkSize = chunkSize; this.offset = offset; - unusable = (byte) (maxOrder + 1); - log2ChunkSize = log2(chunkSize); - subpageOverflowMask = ~(pageSize - 1); freeBytes = chunkSize; - assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder; - maxSubpageAllocs = 1 << maxOrder; + runsAvail = newRunsAvailqueueArray(maxPageIdx); + runsAvailMap = new IntObjectHashMap(); + subpages = new PoolSubpage[chunkSize >> pageShifts]; - // Generate the memory map. - memoryMap = new byte[maxSubpageAllocs << 1]; - depthMap = new byte[memoryMap.length]; - int memoryMapIndex = 1; - for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time - int depth = 1 << d; - for (int p = 0; p < depth; ++ p) { - // in each level traverse left to right and set value to the depth of subtree - memoryMap[memoryMapIndex] = (byte) d; - depthMap[memoryMapIndex] = (byte) d; - memoryMapIndex ++; - } - } + //insert initial run, offset = 0, pages = chunkSize / pageSize + int pages = chunkSize >> pageShifts; + long initHandle = (long) pages << SIZE_SHIFT; + insertAvailRun(0, pages, initHandle); - subpages = newSubpageArray(maxSubpageAllocs); cachedNioBuffers = new ArrayDeque<>(8); } @@ -182,23 +214,67 @@ final class PoolChunk implements PoolChunkMetric { this.arena = arena; this.memory = memory; this.offset = offset; - memoryMap = null; - depthMap = null; - subpages = null; - subpageOverflowMask = 0; pageSize = 0; pageShifts = 0; - maxOrder = 0; - unusable = (byte) (maxOrder + 1); + runsAvailMap = null; + runsAvail = null; + subpages = null; chunkSize = size; - log2ChunkSize = log2(chunkSize); - maxSubpageAllocs = 0; cachedNioBuffers = null; } @SuppressWarnings("unchecked") - private PoolSubpage[] newSubpageArray(int size) { - return new PoolSubpage[size]; + private static PriorityQueue[] newRunsAvailqueueArray(int size) { + PriorityQueue[] queueArray = new PriorityQueue[size]; + for (int i = 0; i < queueArray.length; i++) { + queueArray[i] = new PriorityQueue(); + } + return queueArray; + } + + private void insertAvailRun(int runOffset, int pages, Long handle) { + int pageIdxFloor = arena.pages2pageIdxFloor(pages); + PriorityQueue queue = runsAvail[pageIdxFloor]; + queue.offer(handle); + + //insert first page of run + insertAvailRun0(runOffset, handle); + if (pages > 1) { + //insert last page of run + insertAvailRun0(lastPage(runOffset, pages), handle); + } + } + + private void insertAvailRun0(int runOffset, Long handle) { + Long pre = runsAvailMap.put(runOffset, handle); + assert pre == null; + } + + private void removeAvailRun(long handle) { + int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle)); + PriorityQueue queue = runsAvail[pageIdxFloor]; + removeAvailRun(queue, handle); + } + + private void removeAvailRun(PriorityQueue queue, long handle) { + queue.remove(handle); + + int runOffset = runOffset(handle); + int pages = runPages(handle); + //remove first page of run + runsAvailMap.remove(runOffset); + if (pages > 1) { + //remove last page of run + runsAvailMap.remove(lastPage(runOffset, pages)); + } + } + + private static int lastPage(int runOffset, int pages) { + return runOffset + pages - 1; + } + + private Long getAvailRunByOffset(int runOffset) { + return runsAvailMap.get(runOffset); } @Override @@ -222,256 +298,281 @@ final class PoolChunk implements PoolChunkMetric { return 100 - freePercentage; } - boolean allocate(PooledByteBuf buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) { + boolean allocate(PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) { final long handle; - if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize - handle = allocateRun(normCapacity); + if (sizeIdx <= arena.smallMaxSizeIdx) { + // small + handle = allocateSubpage(sizeIdx); + if (handle < 0) { + return false; + } + assert isSubpage(handle); } else { - handle = allocateSubpage(normCapacity); + // normal + // runSize must be multiple of pageSize + int runSize = arena.sizeIdx2size(sizeIdx); + handle = allocateRun(runSize); + if (handle < 0) { + return false; + } } - if (handle < 0) { - return false; - } - ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null; - initBuf(buf, nioBuffer, handle, reqCapacity, threadCache); + ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null; + initBuf(buf, nioBuffer, handle, reqCapacity, cache); return true; } - /** - * Update method used by allocate - * This is triggered only when a successor is allocated and all its predecessors - * need to update their state - * The minimal depth at which subtree rooted at id has some free space - * - * @param id id - */ - private void updateParentsAlloc(int id) { - while (id > 1) { - int parentId = id >>> 1; - byte val1 = value(id); - byte val2 = value(id ^ 1); - byte val = val1 < val2 ? val1 : val2; - setValue(parentId, val); - id = parentId; - } - } + private long allocateRun(int runSize) { + int pages = runSize >> pageShifts; + int pageIdx = arena.pages2pageIdx(pages); - /** - * Update method used by free - * This needs to handle the special case when both children are completely free - * in which case parent be directly allocated on request of size = child-size * 2 - * - * @param id id - */ - private void updateParentsFree(int id) { - int logChild = depth(id) + 1; - while (id > 1) { - int parentId = id >>> 1; - byte val1 = value(id); - byte val2 = value(id ^ 1); - logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up - - if (val1 == logChild && val2 == logChild) { - setValue(parentId, (byte) (logChild - 1)); - } else { - byte val = val1 < val2 ? val1 : val2; - setValue(parentId, val); + synchronized (runsAvail) { + //find first queue which has at least one big enough run + int queueIdx = runFirstBestFit(pageIdx); + if (queueIdx == -1) { + return -1; } - id = parentId; + //get run with min offset in this queue + PriorityQueue queue = runsAvail[queueIdx]; + long handle = queue.poll(); + + assert !isUsed(handle); + + removeAvailRun(queue, handle); + + if (handle != -1) { + handle = splitLargeRun(handle, pages); + } + + freeBytes -= runSize(pageShifts, handle); + return handle; } } - /** - * Algorithm to allocate an index in memoryMap when we query for a free node - * at depth d - * - * @param d depth - * @return index in memoryMap - */ - private int allocateNode(int d) { - int id = 1; - int initial = - (1 << d); // has last d bits = 0 and rest all = 1 - byte val = value(id); - if (val > d) { // unusable - return -1; + private int calculateRunSize(int sizeIdx) { + int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM; + int runSize = 0; + int nElements; + + final int elemSize = arena.sizeIdx2size(sizeIdx); + + //find lowest common multiple of pageSize and elemSize + do { + runSize += pageSize; + nElements = runSize / elemSize; + } while (nElements < maxElements && runSize != nElements * elemSize); + + while (nElements > maxElements) { + runSize -= pageSize; + nElements = runSize / elemSize; } - while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0 - id <<= 1; - val = value(id); - if (val > d) { - id ^= 1; - val = value(id); + + assert nElements > 0; + assert runSize <= chunkSize; + assert runSize >= elemSize; + + return runSize; + } + + private int runFirstBestFit(int pageIdx) { + if (freeBytes == chunkSize) { + return arena.nPSizes - 1; + } + for (int i = pageIdx; i < arena.nPSizes; i++) { + PriorityQueue queue = runsAvail[i]; + if (queue != null && !queue.isEmpty()) { + return i; } } - byte value = value(id); - assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", - value, id & initial, d); - setValue(id, unusable); // mark as unusable - updateParentsAlloc(id); - return id; + return -1; } - /** - * Allocate a run of pages (>=1) - * - * @param normCapacity normalized capacity - * @return index in memoryMap - */ - private long allocateRun(int normCapacity) { - int d = maxOrder - (log2(normCapacity) - pageShifts); - int id = allocateNode(d); - if (id < 0) { - return id; + private long splitLargeRun(long handle, int needPages) { + assert needPages > 0; + + int totalPages = runPages(handle); + assert needPages <= totalPages; + + int remPages = totalPages - needPages; + + if (remPages > 0) { + int runOffset = runOffset(handle); + + // keep track of trailing unused pages for later use + int availOffset = runOffset + needPages; + long availRun = toRunHandle(availOffset, remPages, 0); + insertAvailRun(availOffset, remPages, availRun); + + // not avail + return toRunHandle(runOffset, needPages, 1); } - freeBytes -= runLength(id); - return id; + + //mark it as used + handle |= 1L << IS_USED_SHIFT; + return handle; } /** - * Create / initialize a new PoolSubpage of normCapacity - * Any PoolSubpage created / initialized here is added to subpage pool in the PoolArena that owns this PoolChunk + * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to + * subpage pool in the PoolArena that owns this PoolChunk + * + * @param sizeIdx sizeIdx of normalized size * - * @param normCapacity normalized capacity * @return index in memoryMap */ - private long allocateSubpage(int normCapacity) { + private long allocateSubpage(int sizeIdx) { // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it. // This is need as we may add it back and so alter the linked-list structure. - PoolSubpage head = arena.findSubpagePoolHead(normCapacity); - int d = maxOrder; // subpages are only be allocated from pages i.e., leaves + PoolSubpage head = arena.findSubpagePoolHead(sizeIdx); synchronized (head) { - int id = allocateNode(d); - if (id < 0) { - return id; + //allocate a new run + int runSize = calculateRunSize(sizeIdx); + //runSize must be multiples of pageSize + long runHandle = allocateRun(runSize); + if (runHandle < 0) { + return -1; } - final PoolSubpage[] subpages = this.subpages; - final int pageSize = this.pageSize; + int runOffset = runOffset(runHandle); + int elemSize = arena.sizeIdx2size(sizeIdx); - freeBytes -= pageSize; + PoolSubpage subpage = new PoolSubpage(head, this, pageShifts, runOffset, + runSize(pageShifts, runHandle), elemSize); - int subpageIdx = subpageIdx(id); - PoolSubpage subpage = subpages[subpageIdx]; - if (subpage == null) { - subpage = new PoolSubpage<>(head, this, id, runOffset(id), pageSize, normCapacity); - subpages[subpageIdx] = subpage; - } else { - subpage.init(head, normCapacity); - } + subpages[runOffset] = subpage; return subpage.allocate(); } } /** - * Free a subpage or a run of pages - * When a subpage is freed from PoolSubpage, it might be added back to subpage pool of the owning PoolArena - * If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize, we can - * completely free the owning Page so it is available for subsequent allocations + * Free a subpage or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage pool + * of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize, + * we can completely free the owning Page so it is available for subsequent allocations * * @param handle handle to free */ - void free(long handle, ByteBuffer nioBuffer) { - int memoryMapIdx = memoryMapIdx(handle); - int bitmapIdx = bitmapIdx(handle); + void free(long handle, int normCapacity, ByteBuffer nioBuffer) { + if (isSubpage(handle)) { + int sizeIdx = arena.size2SizeIdx(normCapacity); + PoolSubpage head = arena.findSubpagePoolHead(sizeIdx); - if (bitmapIdx != 0) { // free a subpage - PoolSubpage subpage = subpages[subpageIdx(memoryMapIdx)]; + PoolSubpage subpage = subpages[runOffset(handle)]; assert subpage != null && subpage.doNotDestroy; // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it. // This is need as we may add it back and so alter the linked-list structure. - PoolSubpage head = arena.findSubpagePoolHead(subpage.elemSize); synchronized (head) { - if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) { + if (subpage.free(head, bitmapIdx(handle))) { + //the subpage is still used, do not free it return; } } } - freeBytes += runLength(memoryMapIdx); - setValue(memoryMapIdx, depth(memoryMapIdx)); - updateParentsFree(memoryMapIdx); + + //start free run + int pages = runPages(handle); + + synchronized (runsAvail) { + // collapse continuous runs, successfully collapsed runs + // will be removed from runsAvail and runsAvailMap + long finalRun = collapseRuns(handle); + + //set run as not used + finalRun &= ~(1L << IS_USED_SHIFT); + //if it is a subpage, set it to run + finalRun &= ~(1L << IS_SUBPAGE_SHIFT); + + insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun); + freeBytes += pages << pageShifts; + } if (nioBuffer != null && cachedNioBuffers != null && - cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) { + cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) { cachedNioBuffers.offer(nioBuffer); } } + private long collapseRuns(long handle) { + return collapseNext(collapsePast(handle)); + } + + private long collapsePast(long handle) { + for (;;) { + int runOffset = runOffset(handle); + int runPages = runPages(handle); + + Long pastRun = getAvailRunByOffset(runOffset - 1); + if (pastRun == null) { + return handle; + } + + int pastOffset = runOffset(pastRun); + int pastPages = runPages(pastRun); + + //is continuous + if (pastRun != handle && pastOffset + pastPages == runOffset) { + //remove past run + removeAvailRun(pastRun); + handle = toRunHandle(pastOffset, pastPages + runPages, 0); + } else { + return handle; + } + } + } + + private long collapseNext(long handle) { + for (;;) { + int runOffset = runOffset(handle); + int runPages = runPages(handle); + + Long nextRun = getAvailRunByOffset(runOffset + runPages); + if (nextRun == null) { + return handle; + } + + int nextOffset = runOffset(nextRun); + int nextPages = runPages(nextRun); + + //is continuous + if (nextRun != handle && runOffset + runPages == nextOffset) { + //remove next run + removeAvailRun(nextRun); + handle = toRunHandle(runOffset, runPages + nextPages, 0); + } else { + return handle; + } + } + } + + private static long toRunHandle(int runOffset, int runPages, int inUsed) { + return (long) runOffset << RUN_OFFSET_SHIFT + | (long) runPages << SIZE_SHIFT + | (long) inUsed << IS_USED_SHIFT; + } + void initBuf(PooledByteBuf buf, ByteBuffer nioBuffer, long handle, int reqCapacity, PoolThreadCache threadCache) { - int memoryMapIdx = memoryMapIdx(handle); - int bitmapIdx = bitmapIdx(handle); - if (bitmapIdx == 0) { - byte val = value(memoryMapIdx); - assert val == unusable : String.valueOf(val); - buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset, - reqCapacity, runLength(memoryMapIdx), threadCache); + if (isRun(handle)) { + buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts, + reqCapacity, runSize(pageShifts, handle), arena.parent.threadCache()); } else { - initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity, threadCache); + initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache); } } void initBufWithSubpage(PooledByteBuf buf, ByteBuffer nioBuffer, long handle, int reqCapacity, PoolThreadCache threadCache) { - initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity, threadCache); - } + int runOffset = runOffset(handle); + int bitmapIdx = bitmapIdx(handle); - private void initBufWithSubpage(PooledByteBuf buf, ByteBuffer nioBuffer, - long handle, int bitmapIdx, int reqCapacity, PoolThreadCache threadCache) { - assert bitmapIdx != 0; + PoolSubpage s = subpages[runOffset]; + assert s.doNotDestroy; + assert reqCapacity <= s.elemSize; - int memoryMapIdx = memoryMapIdx(handle); - - PoolSubpage subpage = subpages[subpageIdx(memoryMapIdx)]; - assert subpage.doNotDestroy; - assert reqCapacity <= subpage.elemSize; - - buf.init( - this, nioBuffer, handle, - runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset, - reqCapacity, subpage.elemSize, threadCache); - } - - private byte value(int id) { - return memoryMap[id]; - } - - private void setValue(int id, byte val) { - memoryMap[id] = val; - } - - private byte depth(int id) { - return depthMap[id]; - } - - private static int log2(int val) { - // compute the (0-based, with lsb = 0) position of highest set bit i.e, log2 - return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val); - } - - private int runLength(int id) { - // represents the size in #bytes supported by node 'id' in the tree - return 1 << log2ChunkSize - depth(id); - } - - private int runOffset(int id) { - // represents the 0-based offset in #bytes from start of the byte-array chunk - int shift = id ^ 1 << depth(id); - return shift * runLength(id); - } - - private int subpageIdx(int memoryMapIdx) { - return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset - } - - private static int memoryMapIdx(long handle) { - return (int) handle; - } - - private static int bitmapIdx(long handle) { - return (int) (handle >>> Integer.SIZE); + buf.init(this, nioBuffer, handle, + (runOffset << pageShifts) + bitmapIdx * s.elemSize + offset, + reqCapacity, s.elemSize, threadCache); } @Override @@ -509,4 +610,32 @@ final class PoolChunk implements PoolChunkMetric { void destroy() { arena.destroyChunk(this); } + + static int runOffset(long handle) { + return (int) (handle >> RUN_OFFSET_SHIFT); + } + + static int runSize(int pageShifts, long handle) { + return runPages(handle) << pageShifts; + } + + static int runPages(long handle) { + return (int) (handle >> SIZE_SHIFT & 0x7fff); + } + + static boolean isUsed(long handle) { + return (handle >> IS_USED_SHIFT & 1) == 1L; + } + + static boolean isRun(long handle) { + return !isSubpage(handle); + } + + static boolean isSubpage(long handle) { + return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L; + } + + static int bitmapIdx(long handle) { + return (int) handle; + } } diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunkList.java b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java index 08b77aeee3..5cb2854ffd 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunkList.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java @@ -96,7 +96,8 @@ final class PoolChunkList implements PoolChunkListMetric { this.prevList = prevList; } - boolean allocate(PooledByteBuf buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) { + boolean allocate(PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { + int normCapacity = arena.sizeIdx2size(sizeIdx); if (normCapacity > maxCapacity) { // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can // be handled by the PoolChunks that are contained in this PoolChunkList. @@ -104,7 +105,7 @@ final class PoolChunkList implements PoolChunkListMetric { } for (PoolChunk cur = head; cur != null; cur = cur.next) { - if (cur.allocate(buf, reqCapacity, normCapacity, threadCache)) { + if (cur.allocate(buf, reqCapacity, sizeIdx, threadCache)) { if (cur.freeBytes <= freeMinThreshold) { remove(cur); nextList.add(cur); @@ -115,8 +116,8 @@ final class PoolChunkList implements PoolChunkListMetric { return false; } - boolean free(PoolChunk chunk, long handle, ByteBuffer nioBuffer) { - chunk.free(handle, nioBuffer); + boolean free(PoolChunk chunk, long handle, int normCapacity, ByteBuffer nioBuffer) { + chunk.free(handle, normCapacity, nioBuffer); if (chunk.freeBytes > freeMaxThreshold) { remove(chunk); // Move the PoolChunk down the PoolChunkList linked-list. diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java index 328aae646d..4a5c875b4c 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java +++ b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java @@ -16,12 +16,18 @@ package io.netty.buffer; +import static io.netty.buffer.PoolChunk.RUN_OFFSET_SHIFT; +import static io.netty.buffer.PoolChunk.SIZE_SHIFT; +import static io.netty.buffer.PoolChunk.IS_USED_SHIFT; +import static io.netty.buffer.PoolChunk.IS_SUBPAGE_SHIFT; +import static io.netty.buffer.SizeClasses.LOG2_QUANTUM; + final class PoolSubpage implements PoolSubpageMetric { final PoolChunk chunk; - private final int memoryMapIdx; + private final int pageShifts; private final int runOffset; - private final int pageSize; + private final int runSize; private final long[] bitmap; PoolSubpage prev; @@ -38,29 +44,29 @@ final class PoolSubpage implements PoolSubpageMetric { //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; /** Special constructor that creates a linked list head */ - PoolSubpage(int pageSize) { + PoolSubpage() { chunk = null; - memoryMapIdx = -1; + pageShifts = -1; runOffset = -1; elemSize = -1; - this.pageSize = pageSize; + runSize = -1; bitmap = null; } - PoolSubpage(PoolSubpage head, PoolChunk chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { + PoolSubpage(PoolSubpage head, PoolChunk chunk, int pageShifts, int runOffset, int runSize, int elemSize) { this.chunk = chunk; - this.memoryMapIdx = memoryMapIdx; + this.pageShifts = pageShifts; this.runOffset = runOffset; - this.pageSize = pageSize; - bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64 + this.runSize = runSize; + this.elemSize = elemSize; + bitmap = new long[runSize >>> 6 + LOG2_QUANTUM]; // runSize / 64 / QUANTUM init(head, elemSize); } void init(PoolSubpage head, int elemSize) { doNotDestroy = true; - this.elemSize = elemSize; if (elemSize != 0) { - maxNumElems = numAvail = pageSize / elemSize; + maxNumElems = numAvail = runSize / elemSize; nextAvail = 0; bitmapLength = maxNumElems >>> 6; if ((maxNumElems & 63) != 0) { @@ -78,10 +84,6 @@ final class PoolSubpage implements PoolSubpageMetric { * Returns the bitmap index of the subpage allocation. */ long allocate() { - if (elemSize == 0) { - return toHandle(0); - } - if (numAvail == 0 || !doNotDestroy) { return -1; } @@ -195,7 +197,12 @@ final class PoolSubpage implements PoolSubpageMetric { } private long toHandle(int bitmapIdx) { - return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; + int pages = runSize >> pageShifts; + return (long) runOffset << RUN_OFFSET_SHIFT + | (long) pages << SIZE_SHIFT + | 1L << IS_USED_SHIFT + | 1L << IS_SUBPAGE_SHIFT + | bitmapIdx; } @Override @@ -226,11 +233,11 @@ final class PoolSubpage implements PoolSubpageMetric { } if (!doNotDestroy) { - return "(" + memoryMapIdx + ": not in use)"; + return "(" + runOffset + ": not in use)"; } - return "(" + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems + - ", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')'; + return "(" + runOffset + ": " + (maxNumElems - numAvail) + '/' + maxNumElems + + ", offset: " + runOffset + ", length: " + runSize + ", elemSize: " + elemSize + ')'; } @Override @@ -271,7 +278,7 @@ final class PoolSubpage implements PoolSubpageMetric { @Override public int pageSize() { - return pageSize; + return 1 << pageShifts; } void destroy() { diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java b/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java index d674767462..ec0e4d6310 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java +++ b/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java @@ -36,7 +36,7 @@ public interface PoolSubpageMetric { int elementSize(); /** - * Return the size (in bytes) of this page. + * Return the page size (in bytes) of this page. */ int pageSize(); } diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index 5cf224051a..cd1806f9f9 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -47,9 +47,7 @@ final class PoolThreadCache { final PoolArena directArena; // Hold the caches for the different size classes, which are tiny, small and normal. - private final MemoryRegionCache[] tinySubPageHeapCaches; private final MemoryRegionCache[] smallSubPageHeapCaches; - private final MemoryRegionCache[] tinySubPageDirectCaches; private final MemoryRegionCache[] smallSubPageDirectCaches; private final MemoryRegionCache[] normalHeapCaches; private final MemoryRegionCache[] normalDirectCaches; @@ -66,17 +64,15 @@ final class PoolThreadCache { //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; PoolThreadCache(PoolArena heapArena, PoolArena directArena, - int tinyCacheSize, int smallCacheSize, int normalCacheSize, - int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { + int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, + int freeSweepAllocationThreshold) { checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity"); this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { - tinySubPageDirectCaches = createSubPageCaches( - tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( - smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); + smallCacheSize, directArena.numSmallSubpagePools); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( @@ -85,17 +81,14 @@ final class PoolThreadCache { directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is configured so just null out all caches - tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations - tinySubPageHeapCaches = createSubPageCaches( - tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageHeapCaches = createSubPageCaches( - smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); + smallCacheSize, heapArena.numSmallSubpagePools); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( @@ -104,15 +97,14 @@ final class PoolThreadCache { heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches - tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // Only check if there are caches in use. - if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null - || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) + if ((smallSubPageDirectCaches != null || normalDirectCaches != null + || smallSubPageHeapCaches != null || normalHeapCaches != null) && freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); @@ -120,13 +112,13 @@ final class PoolThreadCache { } private static MemoryRegionCache[] createSubPageCaches( - int cacheSize, int numCaches, SizeClass sizeClass) { + int cacheSize, int numCaches) { if (cacheSize > 0 && numCaches > 0) { @SuppressWarnings("unchecked") MemoryRegionCache[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length - cache[i] = new SubPageMemoryRegionCache<>(cacheSize, sizeClass); + cache[i] = new SubPageMemoryRegionCache<>(cacheSize); } return cache; } else { @@ -152,22 +144,15 @@ final class PoolThreadCache { } // val > 0 - private static int log2(int val) { + static int log2(int val) { return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val); } - /** - * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise - */ - boolean allocateTiny(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { - return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); - } - /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - boolean allocateSmall(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { - return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); + boolean allocateSmall(PoolArena area, PooledByteBuf buf, int reqCapacity, int sizeIdx) { + return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity); } /** @@ -198,21 +183,20 @@ final class PoolThreadCache { @SuppressWarnings({ "unchecked", "rawtypes" }) boolean add(PoolArena area, PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity, SizeClass sizeClass) { - MemoryRegionCache cache = cache(area, normCapacity, sizeClass); + int sizeIdx = area.size2SizeIdx(normCapacity); + MemoryRegionCache cache = cache(area, sizeIdx, sizeClass); if (cache == null) { return false; } - return cache.add(chunk, nioBuffer, handle); + return cache.add(chunk, nioBuffer, handle, normCapacity); } - private MemoryRegionCache cache(PoolArena area, int normCapacity, SizeClass sizeClass) { + private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) { switch (sizeClass) { case Normal: - return cacheForNormal(area, normCapacity); + return cacheForNormal(area, sizeIdx); case Small: - return cacheForSmall(area, normCapacity); - case Tiny: - return cacheForTiny(area, normCapacity); + return cacheForSmall(area, sizeIdx); default: throw new Error(); } @@ -235,10 +219,8 @@ final class PoolThreadCache { // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure // we only call this one time. if (freed.compareAndSet(false, true)) { - int numFreed = free(tinySubPageDirectCaches, finalizer) + - free(smallSubPageDirectCaches, finalizer) + + int numFreed = free(smallSubPageDirectCaches, finalizer) + free(normalDirectCaches, finalizer) + - free(tinySubPageHeapCaches, finalizer) + free(smallSubPageHeapCaches, finalizer) + free(normalHeapCaches, finalizer); @@ -277,10 +259,8 @@ final class PoolThreadCache { } void trim() { - trim(tinySubPageDirectCaches); trim(smallSubPageDirectCaches); trim(normalDirectCaches); - trim(tinySubPageHeapCaches); trim(smallSubPageHeapCaches); trim(normalHeapCaches); } @@ -301,45 +281,33 @@ final class PoolThreadCache { cache.trim(); } - private MemoryRegionCache cacheForTiny(PoolArena area, int normCapacity) { - int idx = PoolArena.tinyIdx(normCapacity); + private MemoryRegionCache cacheForSmall(PoolArena area, int sizeIdx) { if (area.isDirect()) { - return cache(tinySubPageDirectCaches, idx); + return cache(smallSubPageDirectCaches, sizeIdx); } - return cache(tinySubPageHeapCaches, idx); + return cache(smallSubPageHeapCaches, sizeIdx); } - private MemoryRegionCache cacheForSmall(PoolArena area, int normCapacity) { - int idx = PoolArena.smallIdx(normCapacity); + private MemoryRegionCache cacheForNormal(PoolArena area, int sizeIdx) { if (area.isDirect()) { - return cache(smallSubPageDirectCaches, idx); + return cache(normalDirectCaches, sizeIdx); } - return cache(smallSubPageHeapCaches, idx); + return cache(normalHeapCaches, sizeIdx); } - private MemoryRegionCache cacheForNormal(PoolArena area, int normCapacity) { - if (area.isDirect()) { - // sizeClass == Normal => normCapacity >= pageSize => the shifted value > 0 - int idx = log2(normCapacity >> numShiftsNormalDirect); - return cache(normalDirectCaches, idx); - } - int idx = log2(normCapacity >> numShiftsNormalHeap); - return cache(normalHeapCaches, idx); - } - - private static MemoryRegionCache cache(MemoryRegionCache[] cache, int idx) { - if (cache == null || idx > cache.length - 1) { + private static MemoryRegionCache cache(MemoryRegionCache[] cache, int sizeIdx) { + if (cache == null || sizeIdx > cache.length - 1) { return null; } - return cache[idx]; + return cache[sizeIdx]; } /** * Cache used for buffers which are backed by TINY or SMALL size. */ private static final class SubPageMemoryRegionCache extends MemoryRegionCache { - SubPageMemoryRegionCache(int size, SizeClass sizeClass) { - super(size, sizeClass); + SubPageMemoryRegionCache(int size) { + super(size, SizeClass.Small); } @Override @@ -388,8 +356,8 @@ final class PoolThreadCache { * Add to cache if not already full. */ @SuppressWarnings("unchecked") - public final boolean add(PoolChunk chunk, ByteBuffer nioBuffer, long handle) { - Entry entry = newEntry(chunk, nioBuffer, handle); + public final boolean add(PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity) { + Entry entry = newEntry(chunk, nioBuffer, handle, normCapacity); boolean queued = queue.offer(entry); if (!queued) { // If it was not possible to cache the chunk, immediately recycle the entry @@ -461,7 +429,7 @@ final class PoolThreadCache { entry.recycle(); } - chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer); + chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass, nioBuffer, finalizer); } static final class Entry { @@ -469,6 +437,7 @@ final class PoolThreadCache { PoolChunk chunk; ByteBuffer nioBuffer; long handle = -1; + int normCapacity; Entry(Handle> recyclerHandle) { this.recyclerHandle = recyclerHandle; @@ -483,11 +452,12 @@ final class PoolThreadCache { } @SuppressWarnings("rawtypes") - private static Entry newEntry(PoolChunk chunk, ByteBuffer nioBuffer, long handle) { + private static Entry newEntry(PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity) { Entry entry = RECYCLER.get(); entry.chunk = chunk; entry.nioBuffer = nioBuffer; entry.handle = handle; + entry.normCapacity = normCapacity; return entry; } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 31c0dce2ed..e8ac36626e 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -43,7 +43,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements private static final int DEFAULT_PAGE_SIZE; private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk - private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; @@ -106,7 +105,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); // cache sizes - DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); @@ -147,7 +145,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); } logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); - logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE); logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); @@ -164,7 +161,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements private final PoolArena[] heapArenas; private final PoolArena[] directArenas; - private final int tinyCacheSize; private final int smallCacheSize; private final int normalCacheSize; private final List heapArenaMetrics; @@ -189,40 +185,66 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements /** * @deprecated use - * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)} + * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)} */ @Deprecated public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, - DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); + 0, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); } /** * @deprecated use - * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)} + * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)} */ @Deprecated public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { - this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, - normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, smallCacheSize, + normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); } + /** + * @deprecated use + * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)} + */ + @Deprecated public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, - tinyCacheSize, smallCacheSize, normalCacheSize, - useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + smallCacheSize, normalCacheSize, + useCacheForAllThreads); } + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, + int nDirectArena, int pageSize, int maxOrder, + int smallCacheSize, int normalCacheSize, + boolean useCacheForAllThreads) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + smallCacheSize, normalCacheSize, + useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + } + + /** + * @deprecated use + * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean, int)} + */ + @Deprecated public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + smallCacheSize, normalCacheSize, + useCacheForAllThreads, directMemoryCacheAlignment); + } + + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int smallCacheSize, int normalCacheSize, + boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super(preferDirect); threadCache = new PoolThreadLocalCache(useCacheForAllThreads); - this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); @@ -247,7 +269,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements List metrics = new ArrayList<>(heapArenas.length); for (int i = 0; i < heapArenas.length; i ++) { PoolArena.HeapArena arena = new PoolArena.HeapArena(this, - pageSize, maxOrder, pageShifts, chunkSize, + pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); heapArenas[i] = arena; metrics.add(arena); @@ -263,7 +285,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements List metrics = new ArrayList<>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( - this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); + this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; metrics.add(arena); } @@ -387,10 +409,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements } /** - * Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512 + * Default tiny cache size - default 0 + * + * @deprecated Tiny caches have been merged into small caches. */ + @Deprecated public static int defaultTinyCacheSize() { - return DEFAULT_TINY_CACHE_SIZE; + return 0; } /** @@ -451,7 +476,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements final Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { final PoolThreadCache cache = new PoolThreadCache( - heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, + heapArena, directArena, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) { @@ -464,7 +489,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements return cache; } // No caching so just use 0 as sizes. - return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); + return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0); } @Override @@ -561,7 +586,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements */ @Deprecated public int tinyCacheSize() { - return tinyCacheSize; + return 0; } /** diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocatorMetric.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocatorMetric.java index f9391f6edb..5ac7b6b92d 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocatorMetric.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocatorMetric.java @@ -68,7 +68,10 @@ public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetri /** * Return the size of the tiny cache. + * + * @deprecated Tiny caches have been merged into small caches. */ + @Deprecated public int tinyCacheSize() { return allocator.tinyCacheSize(); } @@ -112,7 +115,6 @@ public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetri .append("; usedDirectMemory: ").append(usedDirectMemory()) .append("; numHeapArenas: ").append(numHeapArenas()) .append("; numDirectArenas: ").append(numDirectArenas()) - .append("; tinyCacheSize: ").append(tinyCacheSize()) .append("; smallCacheSize: ").append(smallCacheSize()) .append("; normalCacheSize: ").append(normalCacheSize()) .append("; numThreadLocalCaches: ").append(numThreadLocalCaches()) diff --git a/buffer/src/main/java/io/netty/buffer/SizeClasses.java b/buffer/src/main/java/io/netty/buffer/SizeClasses.java new file mode 100644 index 0000000000..2b426eff56 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/SizeClasses.java @@ -0,0 +1,407 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import static io.netty.buffer.PoolThreadCache.*; + +/** + * SizeClasses requires {@code pageShifts} to be defined prior to inclusion, + * and it in turn defines: + *

+ * LOG2_SIZE_CLASS_GROUP: Log of size class count for each size doubling. + * LOG2_MAX_LOOKUP_SIZE: Log of max size class in the lookup table. + * sizeClasses: Complete table of [index, log2Group, log2Delta, nDelta, isMultiPageSize, + * isSubPage, log2DeltaLookup] tuples. + * index: Size class index. + * log2Group: Log of group base size (no deltas added). + * log2Delta: Log of delta to previous size class. + * nDelta: Delta multiplier. + * isMultiPageSize: 'yes' if a multiple of the page size, 'no' otherwise. + * isSubPage: 'yes' if a subpage size class, 'no' otherwise. + * log2DeltaLookup: Same as log2Delta if a lookup table size class, 'no' + * otherwise. + *

+ * nSubpages: Number of subpages size classes. + * nSizes: Number of size classes. + * nPSizes: Number of size classes that are multiples of pageSize. + * + * smallMaxSizeIdx: Maximum small size class index. + * + * lookupMaxclass: Maximum size class included in lookup table. + * log2NormalMinClass: Log of minimum normal size class. + *

+ * The first size class and spacing are 1 << LOG2_QUANTUM. + * Each group has 1 << LOG2_SIZE_CLASS_GROUP of size classes. + * + * size = 1 << log2Group + nDelta * (1 << log2Delta) + * + * The first size class has an unusual encoding, because the size has to be + * split between group and delta*nDelta. + * + * If pageShift = 13, sizeClasses looks like this: + * + * (index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup) + *

+ * ( 0, 4, 4, 0, no, yes, 4) + * ( 1, 4, 4, 1, no, yes, 4) + * ( 2, 4, 4, 2, no, yes, 4) + * ( 3, 4, 4, 3, no, yes, 4) + *

+ * ( 4, 6, 4, 1, no, yes, 4) + * ( 5, 6, 4, 2, no, yes, 4) + * ( 6, 6, 4, 3, no, yes, 4) + * ( 7, 6, 4, 4, no, yes, 4) + *

+ * ( 8, 7, 5, 1, no, yes, 5) + * ( 9, 7, 5, 2, no, yes, 5) + * ( 10, 7, 5, 3, no, yes, 5) + * ( 11, 7, 5, 4, no, yes, 5) + * ... + * ... + * ( 72, 23, 21, 1, yes, no, no) + * ( 73, 23, 21, 2, yes, no, no) + * ( 74, 23, 21, 3, yes, no, no) + * ( 75, 23, 21, 4, yes, no, no) + *

+ * ( 76, 24, 22, 1, yes, no, no) + */ +abstract class SizeClasses implements SizeClassesMetric { + + static final int LOG2_QUANTUM = 4; + + private static final int LOG2_SIZE_CLASS_GROUP = 2; + private static final int LOG2_MAX_LOOKUP_SIZE = 12; + + private static final int INDEX_IDX = 0; + private static final int LOG2GROUP_IDX = 1; + private static final int LOG2DELTA_IDX = 2; + private static final int NDELTA_IDX = 3; + private static final int PAGESIZE_IDX = 4; + private static final int SUBPAGE_IDX = 5; + private static final int LOG2_DELTA_LOOKUP_IDX = 6; + + private static final byte no = 0, yes = 1; + + protected SizeClasses(int pageSize, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { + this.pageSize = pageSize; + this.pageShifts = pageShifts; + this.chunkSize = chunkSize; + this.directMemoryCacheAlignment = directMemoryCacheAlignment; + + int group = log2(chunkSize) + 1 - LOG2_QUANTUM; + + //generate size classes + //[index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup] + sizeClasses = new short[group << LOG2_SIZE_CLASS_GROUP][7]; + nSizes = sizeClasses(); + + //generate lookup table + sizeIdx2sizeTab = new int[nSizes]; + pageIdx2sizeTab = new int[nPSizes]; + idx2SizeTab(sizeIdx2sizeTab, pageIdx2sizeTab); + + size2idxTab = new int[lookupMaxSize >> LOG2_QUANTUM]; + size2idxTab(size2idxTab); + } + + protected final int pageSize; + protected final int pageShifts; + protected final int chunkSize; + protected final int directMemoryCacheAlignment; + + final int nSizes; + int nSubpages; + int nPSizes; + + int smallMaxSizeIdx; + + private int lookupMaxSize; + + private final short[][] sizeClasses; + + private final int[] pageIdx2sizeTab; + + // lookup table for sizeIdx <= smallMaxSizeIdx + private final int[] sizeIdx2sizeTab; + + // lookup table used for size <= lookupMaxclass + // spacing is 1 << LOG2_QUANTUM, so the size of array is lookupMaxclass >> LOG2_QUANTUM + private final int[] size2idxTab; + + private int sizeClasses() { + int normalMaxSize = -1; + + int index = 0; + int size = 0; + + int log2Group = LOG2_QUANTUM; + int log2Delta = LOG2_QUANTUM; + int ndeltaLimit = 1 << LOG2_SIZE_CLASS_GROUP; + + //First small group, nDelta start at 0. + //first size class is 1 << LOG2_QUANTUM + int nDelta = 0; + while (nDelta < ndeltaLimit) { + size = sizeClass(index++, log2Group, log2Delta, nDelta++); + } + log2Group += LOG2_SIZE_CLASS_GROUP; + + //All remaining groups, nDelta start at 1. + while (size < chunkSize) { + nDelta = 1; + + while (nDelta <= ndeltaLimit && size < chunkSize) { + size = sizeClass(index++, log2Group, log2Delta, nDelta++); + normalMaxSize = size; + } + + log2Group++; + log2Delta++; + } + + //chunkSize must be normalMaxSize + assert chunkSize == normalMaxSize; + + //return number of size index + return index; + } + + //calculate size class + private int sizeClass(int index, int log2Group, int log2Delta, int nDelta) { + short isMultiPageSize; + if (log2Delta >= pageShifts) { + isMultiPageSize = yes; + } else { + int pageSize = 1 << pageShifts; + int size = (1 << log2Group) + (1 << log2Delta) * nDelta; + + isMultiPageSize = size == size / pageSize * pageSize? yes : no; + } + + int log2Ndelta = nDelta == 0? 0 : log2(nDelta); + + byte remove = 1 << log2Ndelta < nDelta? yes : no; + + int log2Size = log2Delta + log2Ndelta == log2Group? log2Group + 1 : log2Group; + if (log2Size == log2Group) { + remove = yes; + } + + short isSubpage = log2Size < pageShifts + LOG2_SIZE_CLASS_GROUP? yes : no; + + int log2DeltaLookup = log2Size < LOG2_MAX_LOOKUP_SIZE || + log2Size == LOG2_MAX_LOOKUP_SIZE && remove == no + ? log2Delta : no; + + short[] sz = { + (short) index, (short) log2Group, (short) log2Delta, + (short) nDelta, isMultiPageSize, isSubpage, (short) log2DeltaLookup + }; + + sizeClasses[index] = sz; + int size = (1 << log2Group) + (nDelta << log2Delta); + + if (sz[PAGESIZE_IDX] == yes) { + nPSizes++; + } + if (sz[SUBPAGE_IDX] == yes) { + nSubpages++; + smallMaxSizeIdx = index; + } + if (sz[LOG2_DELTA_LOOKUP_IDX] != no) { + lookupMaxSize = size; + } + return size; + } + + private void idx2SizeTab(int[] sizeIdx2sizeTab, int[] pageIdx2sizeTab) { + int pageIdx = 0; + + for (int i = 0; i < nSizes; i++) { + short[] sizeClass = sizeClasses[i]; + int log2Group = sizeClass[LOG2GROUP_IDX]; + int log2Delta = sizeClass[LOG2DELTA_IDX]; + int nDelta = sizeClass[NDELTA_IDX]; + + int size = (1 << log2Group) + (nDelta << log2Delta); + sizeIdx2sizeTab[i] = size; + + if (sizeClass[PAGESIZE_IDX] == yes) { + pageIdx2sizeTab[pageIdx++] = size; + } + } + } + + private void size2idxTab(int[] size2idxTab) { + int idx = 0; + int size = 0; + + for (int i = 0; size <= lookupMaxSize; i++) { + int log2Delta = sizeClasses[i][LOG2DELTA_IDX]; + int times = 1 << log2Delta - LOG2_QUANTUM; + + while (size <= lookupMaxSize && times-- > 0) { + size2idxTab[idx++] = i; + size = idx + 1 << LOG2_QUANTUM; + } + } + } + + @Override + public int sizeIdx2size(int sizeIdx) { + return sizeIdx2sizeTab[sizeIdx]; + } + + @Override + public int sizeIdx2sizeCompute(int sizeIdx) { + int group = sizeIdx >> LOG2_SIZE_CLASS_GROUP; + int mod = sizeIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1; + + int groupSize = group == 0? 0 : + 1 << LOG2_QUANTUM + LOG2_SIZE_CLASS_GROUP - 1 << group; + + int shift = group == 0? 1 : group; + int lgDelta = shift + LOG2_QUANTUM - 1; + int modSize = mod + 1 << lgDelta; + + return groupSize + modSize; + } + + @Override + public long pageIdx2size(int pageIdx) { + return pageIdx2sizeTab[pageIdx]; + } + + @Override + public long pageIdx2sizeCompute(int pageIdx) { + int group = pageIdx >> LOG2_SIZE_CLASS_GROUP; + int mod = pageIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1; + + long groupSize = group == 0? 0 : + 1L << pageShifts + LOG2_SIZE_CLASS_GROUP - 1 << group; + + int shift = group == 0? 1 : group; + int log2Delta = shift + pageShifts - 1; + int modSize = mod + 1 << log2Delta; + + return groupSize + modSize; + } + + @Override + public int size2SizeIdx(int size) { + if (size == 0) { + return 0; + } + if (size > chunkSize) { + return nSizes; + } + + if (directMemoryCacheAlignment > 0) { + size = alignSize(size); + } + + if (size <= lookupMaxSize) { + //size-1 / MIN_TINY + return size2idxTab[size - 1 >> LOG2_QUANTUM]; + } + + int x = log2((size << 1) - 1); + int shift = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1 + ? 0 : x - (LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM); + + int group = shift << LOG2_SIZE_CLASS_GROUP; + + int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1 + ? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1; + + int deltaInverseMask = -1 << log2Delta; + int mod = (size - 1 & deltaInverseMask) >> log2Delta & + (1 << LOG2_SIZE_CLASS_GROUP) - 1; + + return group + mod; + } + + @Override + public int pages2pageIdx(int pages) { + return pages2pageIdxCompute(pages, false); + } + + @Override + public int pages2pageIdxFloor(int pages) { + return pages2pageIdxCompute(pages, true); + } + + private int pages2pageIdxCompute(int pages, boolean floor) { + int pageSize = pages << pageShifts; + if (pageSize > chunkSize) { + return nPSizes; + } + + int x = log2((pageSize << 1) - 1); + + int shift = x < LOG2_SIZE_CLASS_GROUP + pageShifts + ? 0 : x - (LOG2_SIZE_CLASS_GROUP + pageShifts); + + int group = shift << LOG2_SIZE_CLASS_GROUP; + + int log2Delta = x < LOG2_SIZE_CLASS_GROUP + pageShifts + 1? + pageShifts : x - LOG2_SIZE_CLASS_GROUP - 1; + + int deltaInverseMask = -1 << log2Delta; + int mod = (pageSize - 1 & deltaInverseMask) >> log2Delta & + (1 << LOG2_SIZE_CLASS_GROUP) - 1; + + int pageIdx = group + mod; + + if (floor && pageIdx2sizeTab[pageIdx] > pages << pageShifts) { + pageIdx--; + } + + return pageIdx; + } + + // Round size up to the nearest multiple of alignment. + private int alignSize(int size) { + int delta = size & directMemoryCacheAlignment - 1; + return delta == 0? size : size + directMemoryCacheAlignment - delta; + } + + @Override + public int normalizeSize(int size) { + if (size == 0) { + return sizeIdx2sizeTab[0]; + } + if (directMemoryCacheAlignment > 0) { + size = alignSize(size); + } + + if (size <= lookupMaxSize) { + int ret = sizeIdx2sizeTab[size2idxTab[size - 1 >> LOG2_QUANTUM]]; + assert ret == normalizeSizeCompute(size); + return ret; + } + return normalizeSizeCompute(size); + } + + private static int normalizeSizeCompute(int size) { + int x = log2((size << 1) - 1); + int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1 + ? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1; + int delta = 1 << log2Delta; + int delta_mask = delta - 1; + return size + delta_mask & ~delta_mask; + } +} diff --git a/buffer/src/main/java/io/netty/buffer/SizeClassesMetric.java b/buffer/src/main/java/io/netty/buffer/SizeClassesMetric.java new file mode 100644 index 0000000000..d066b11c44 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/SizeClassesMetric.java @@ -0,0 +1,87 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * Expose metrics for an SizeClasses. + */ +public interface SizeClassesMetric { + + /** + * Computes size from lookup table according to sizeIdx. + * + * @return size + */ + int sizeIdx2size(int sizeIdx); + + /** + * Computes size according to sizeIdx. + * + * @return size + */ + int sizeIdx2sizeCompute(int sizeIdx); + + /** + * Computes size from lookup table according to pageIdx. + * + * @return size which is multiples of pageSize. + */ + long pageIdx2size(int pageIdx); + + /** + * Computes size according to pageIdx. + * + * @return size which is multiples of pageSize + */ + long pageIdx2sizeCompute(int pageIdx); + + /** + * Normalizes request size up to the nearest size class. + * + * @param size request size + * + * @return sizeIdx of the size class + */ + int size2SizeIdx(int size); + + /** + * Normalizes request size up to the nearest pageSize class. + * + * @param pages multiples of pageSizes + * + * @return pageIdx of the pageSize class + */ + int pages2pageIdx(int pages); + + /** + * Normalizes request size down to the nearest pageSize class. + * + * @param pages multiples of pageSizes + * + * @return pageIdx of the pageSize class + */ + int pages2pageIdxFloor(int pages); + + /** + * Normalizes usable size that would result from allocating an object with the + * specified size and alignment. + * + * @param size request size + * + * @return normalized size + */ + int normalizeSize(int size); +} diff --git a/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java b/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java index 3ad331ddb3..37e607f169 100644 --- a/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java +++ b/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java @@ -20,29 +20,86 @@ import io.netty.util.internal.PlatformDependent; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assume.assumeTrue; - import java.nio.ByteBuffer; +import static org.junit.Assert.*; +import static org.junit.Assume.*; + public class PoolArenaTest { + private static final int PAGE_SIZE = 8192; + private static final int PAGE_SHIFTS = 11; + //chunkSize = pageSize * (2 ^ pageShifts) + private static final int CHUNK_SIZE = 16777216; + @Test - public void testNormalizeCapacity() throws Exception { - PoolArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 0); + public void testNormalizeCapacity() { + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0); int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025}; - int[] expectedResult = {0, 16, 512, 1024, 1024, 2048}; + int[] expectedResult = {16, 16, 512, 1024, 1024, 1280}; for (int i = 0; i < reqCapacities.length; i ++) { - Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i])); + Assert.assertEquals(expectedResult[i], arena.sizeIdx2size(arena.size2SizeIdx(reqCapacities[i]))); } } @Test - public void testNormalizeAlignedCapacity() throws Exception { - PoolArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 64); + public void testNormalizeAlignedCapacity() { + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 64); int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025}; - int[] expectedResult = {0, 64, 512, 1024, 1024, 2048}; + int[] expectedResult = {16, 64, 512, 1024, 1024, 1280}; for (int i = 0; i < reqCapacities.length; i ++) { - Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i])); + Assert.assertEquals(expectedResult[i], arena.sizeIdx2size(arena.size2SizeIdx(reqCapacities[i]))); + } + } + + @Test + public void testSize2SizeIdx() { + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0); + + for (int sz = 0; sz <= CHUNK_SIZE; sz++) { + int sizeIdx = arena.size2SizeIdx(sz); + Assert.assertTrue(sz <= arena.sizeIdx2size(sizeIdx)); + if (sizeIdx > 0) { + Assert.assertTrue(sz > arena.sizeIdx2size(sizeIdx - 1)); + } + } + } + + @Test + public void testPages2PageIdx() { + int pageShifts = PAGE_SHIFTS; + + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0); + + int maxPages = CHUNK_SIZE >> pageShifts; + for (int pages = 1; pages <= maxPages; pages++) { + int pageIdxFloor = arena.pages2pageIdxFloor(pages); + Assert.assertTrue(pages << pageShifts >= arena.pageIdx2size(pageIdxFloor)); + if (pageIdxFloor > 0 && pages < maxPages) { + Assert.assertTrue(pages << pageShifts < arena.pageIdx2size(pageIdxFloor + 1)); + } + + int pageIdxCeiling = arena.pages2pageIdx(pages); + Assert.assertTrue(pages << pageShifts <= arena.pageIdx2size(pageIdxCeiling)); + if (pageIdxCeiling > 0) { + Assert.assertTrue(pages << pageShifts > arena.pageIdx2size(pageIdxCeiling - 1)); + } + } + } + + @Test + public void testSizeIdx2size() { + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0); + for (int i = 0; i < arena.nSizes; i++) { + assertEquals(arena.sizeIdx2sizeCompute(i), arena.sizeIdx2size(i)); + } + } + + @Test + public void testPageIdx2size() { + PoolArena arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0); + for (int i = 0; i < arena.nPSizes; i++) { + assertEquals(arena.pageIdx2sizeCompute(i), arena.pageIdx2size(i)); } } @@ -57,7 +114,7 @@ public class PoolArenaTest { ? PlatformDependent.allocateDirectNoCleaner(capacity + alignment) : ByteBuffer.allocateDirect(capacity + alignment); - PoolArena.DirectArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 9, alignment); + PoolArena.DirectArena arena = new PoolArena.DirectArena(null, 512, 9, 512, alignment); int offset = arena.offsetCacheLine(bb); long address = PlatformDependent.directBufferAddress(bb); @@ -80,31 +137,25 @@ public class PoolArenaTest { true // useCacheForAllThreads ); - // create tiny buffer - final ByteBuf b1 = allocator.directBuffer(24); // create small buffer - final ByteBuf b2 = allocator.directBuffer(800); + final ByteBuf b1 = allocator.directBuffer(800); // create normal buffer - final ByteBuf b3 = allocator.directBuffer(8192 * 2); + final ByteBuf b2 = allocator.directBuffer(8192 * 5); Assert.assertNotNull(b1); Assert.assertNotNull(b2); - Assert.assertNotNull(b3); // then release buffer to deallocated memory while threadlocal cache has been disabled // allocations counter value must equals deallocations counter value Assert.assertTrue(b1.release()); Assert.assertTrue(b2.release()); - Assert.assertTrue(b3.release()); Assert.assertTrue(allocator.directArenas().size() >= 1); final PoolArenaMetric metric = allocator.directArenas().get(0); - Assert.assertEquals(3, metric.numDeallocations()); - Assert.assertEquals(3, metric.numAllocations()); + Assert.assertEquals(2, metric.numDeallocations()); + Assert.assertEquals(2, metric.numAllocations()); - Assert.assertEquals(1, metric.numTinyDeallocations()); - Assert.assertEquals(1, metric.numTinyAllocations()); Assert.assertEquals(1, metric.numSmallDeallocations()); Assert.assertEquals(1, metric.numSmallAllocations()); Assert.assertEquals(1, metric.numNormalDeallocations()); diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index 3de9c0da4f..27eda87733 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -23,6 +23,7 @@ import io.netty.util.internal.SystemPropertyUtil; import org.junit.Assume; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import static io.netty.buffer.PoolChunk.runOffset; +import static io.netty.buffer.PoolChunk.runPages; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -212,19 +215,6 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest b = unwrapIfNeeded(b4); + + //b2 and b3 are collapsed, b4 should start at offset 4 + assertEquals(4, runOffset(b.handle)); + assertEquals(10, runPages(b.handle)); + + b1.release(); + b4.release(); + + //all ByteBuf are collapsed, b5 should start at offset 0 + ByteBuf b5 = allocator.buffer(pageSize * 20); + b = unwrapIfNeeded(b5); + + assertEquals(0, runOffset(b.handle)); + assertEquals(20, runPages(b.handle)); + + b5.release(); + } + + @Test + public void testAllocateSmallOffset() { + int pageSize = 8192; + ByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0); + + int size = pageSize * 5; + + ByteBuf[] bufs = new ByteBuf[10]; + for (int i = 0; i < 10; i++) { + bufs[i] = allocator.buffer(size); + } + + for (int i = 0; i < 5; i++) { + bufs[i].release(); + } + + //make sure we always allocate runs with small offset + for (int i = 0; i < 5; i++) { + ByteBuf buf = allocator.buffer(size); + PooledByteBuf unwrapedBuf = unwrapIfNeeded(buf); + assertEquals(runOffset(unwrapedBuf.handle), i * 5); + bufs[i] = buf; + } + + //release at reverse order + for (int i = 10 - 1; i >= 5; i--) { + bufs[i].release(); + } + + for (int i = 5; i < 10; i++) { + ByteBuf buf = allocator.buffer(size); + PooledByteBuf unwrapedBuf = unwrapIfNeeded(buf); + assertEquals(runOffset(unwrapedBuf.handle), i * 5); + bufs[i] = buf; + } + + for (int i = 0; i < 10; i++) { + bufs[i].release(); + } + } + @Test (timeout = 4000) public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException { testThreadCacheDestroyed(false); @@ -501,26 +565,33 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest unwrapIfNeeded(ByteBuf buf) { + return (PooledByteBuf) (buf instanceof PooledByteBuf ? buf : buf.unwrap()); + } }