diff --git a/src/main/java/io/netty/buffer/api/pool/PoolArena.java b/src/main/java/io/netty/buffer/api/pool/PoolArena.java index 61afa4e..4a2c179 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolArena.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolArena.java @@ -1,5 +1,6 @@ package io.netty.buffer.api.pool; +import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.MemoryManager; @@ -14,7 +15,7 @@ import java.util.concurrent.atomic.LongAdder; import static io.netty.buffer.api.pool.PoolChunk.isSubpage; import static java.lang.Math.max; -class PoolArena extends SizeClasses implements PoolArenaMetric { +class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl { enum SizeClass { Small, Normal @@ -98,13 +99,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { return manager.isNative(); } - Buffer allocate(PoolThreadCache cache, int size) { + Buffer allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) { final int sizeIdx = size2SizeIdx(size); if (sizeIdx <= smallMaxSizeIdx) { - return tcacheAllocateSmall(cache, size, sizeIdx); + return tcacheAllocateSmall(control, cache, size, sizeIdx); } else if (sizeIdx < nSizes) { - return tcacheAllocateNormal(cache, size, sizeIdx); + return tcacheAllocateNormal(control, cache, size, sizeIdx); } else { int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(size) : size; @@ -113,9 +114,9 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { } } - private Buffer tcacheAllocateSmall(PoolThreadCache cache, final int size, - final int sizeIdx) { - Buffer buffer = cache.allocateSmall(size, sizeIdx); + private Buffer tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size, + final int sizeIdx) { + Buffer buffer = cache.allocateSmall(control, size, sizeIdx); if (buffer != null) { // was able to allocate out of the cache so move on return buffer; @@ -134,13 +135,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; - buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache); + buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache, control); } } if (needsNormalAllocation) { synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache); + buffer = allocateNormal(size, sizeIdx, cache, control); } } @@ -148,45 +149,45 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { return buffer; } - private Buffer tcacheAllocateNormal(PoolThreadCache cache, int size, int sizeIdx) { - Buffer buffer = cache.allocateNormal(this, size, sizeIdx); + private Buffer tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) { + Buffer buffer = cache.allocateNormal(this, control, size, sizeIdx); if (buffer != null) { // was able to allocate out of the cache so move on return buffer; } synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache); + buffer = allocateNormal(size, sizeIdx, cache, control); allocationsNormal++; } return buffer; } // Method must be called inside synchronized(this) { ... } block - private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache) { - Buffer buffer = q050.allocate(size, sizeIdx, threadCache); + private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { + Buffer buffer = q050.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { return buffer; } - buffer = q025.allocate(size, sizeIdx, threadCache); + buffer = q025.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { return buffer; } - buffer = q000.allocate(size, sizeIdx, threadCache); + buffer = q000.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { return buffer; } - buffer = qInit.allocate(size, sizeIdx, threadCache); + buffer = qInit.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { return buffer; } - buffer = q075.allocate(size, sizeIdx, threadCache); + buffer = q075.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { return buffer; } // Add a new chunk. PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); - buffer = c.allocate(size, sizeIdx, threadCache); + buffer = c.allocate(size, sizeIdx, threadCache, control); assert buffer != null; qInit.add(c); return buffer; @@ -249,6 +250,18 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { return smallSubpagePools[sizeIdx]; } + @Override + public Object allocateUntethered(Buffer originator, int size) { + throw new AssertionError("PoolChunk base buffers should never need to reallocate."); + } + + @Override + public void recoverMemory(Object memory) { + // This means we've lost all strong references to a PoolChunk. + // Probably means we don't need it anymore, so just free its memory. + manager.discardRecoverableMemory(memory); + } + @Override public int numThreadCaches() { return numThreadCaches.get(); @@ -397,7 +410,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { } protected final PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { - Buffer base = manager.allocateShared(parent, chunkSize, manager.drop(), Statics.CLEANER); + Buffer base = manager.allocateShared(this, chunkSize, manager.drop(), Statics.CLEANER); Object memory = manager.unwrapRecoverableMemory(base); return new PoolChunk( this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx); @@ -454,4 +467,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric { } while (s != head); } } + + public void close() { + for (PoolSubpage page : smallSubpagePools) { + page.destroy(); + } + for (PoolChunkList list : new PoolChunkList[] {qInit, q000, q025, q050, q100}) { + list.destroy(); + } + } } diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java index 1804a51..c9b65e9 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java @@ -175,20 +175,6 @@ final class PoolChunk implements PoolChunkMetric { insertAvailRun(0, pages, initHandle); } - /** Creates a special chunk that is not pooled. */ - PoolChunk(PoolArena arena, Buffer base, Object memory, int size) { - unpooled = true; - this.arena = arena; - this.base = base; - this.memory = memory; - pageSize = 0; - pageShifts = 0; - runsAvailMap = null; - runsAvail = null; - subpages = null; - chunkSize = size; - } - private static LongPriorityQueue[] newRunsAvailqueueArray(int size) { LongPriorityQueue[] queueArray = new LongPriorityQueue[size]; for (int i = 0; i < queueArray.length; i++) { @@ -263,7 +249,7 @@ final class PoolChunk implements PoolChunkMetric { return 100 - freePercentage; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache cache) { + Buffer allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) { final long handle; if (sizeIdx <= arena.smallMaxSizeIdx) { // small @@ -282,7 +268,7 @@ final class PoolChunk implements PoolChunkMetric { } } - return allocateBuffer(handle, size, cache); + return allocateBuffer(handle, size, cache, control); } private long allocateRun(int runSize) { @@ -514,23 +500,25 @@ final class PoolChunk implements PoolChunkMetric { | (long) inUsed << IS_USED_SHIFT; } - Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache) { + Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { if (isRun(handle)) { int offset = runOffset(handle) << pageShifts; int maxLength = runSize(pageShifts, handle); PoolThreadCache poolThreadCache = arena.parent.threadCache(); - return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop() { + initAllocatorControl(control, poolThreadCache, handle, maxLength, offset, size); + return arena.manager.recoverMemory(control, memory, offset, size, new Drop() { @Override public void drop(Buffer obj) { arena.free(PoolChunk.this, handle, maxLength, poolThreadCache); } }); } else { - return allocateBufferWithSubpage(handle, size, threadCache); + return allocateBufferWithSubpage(handle, size, threadCache, control); } } - Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache) { + Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control) { int runOffset = runOffset(handle); int bitmapIdx = bitmapIdx(handle); @@ -539,7 +527,8 @@ final class PoolChunk implements PoolChunkMetric { assert size <= s.elemSize; int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize; - return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop() { + initAllocatorControl(control, threadCache, handle, s.elemSize, offset, size); + return arena.manager.recoverMemory(control, memory, offset, size, new Drop() { @Override public void drop(Buffer obj) { arena.free(PoolChunk.this, handle, s.elemSize, threadCache); @@ -547,6 +536,18 @@ final class PoolChunk implements PoolChunkMetric { }); } + private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle, + int normSize, int offset, int size) { + control.arena = arena; + control.chunk = this; + control.threadCache = threadCache; + control.handle = handle; + control.normSize = normSize; + control.memory = memory; + control.offset = offset; + control.size = size; + } + @Override public int chunkSize() { return chunkSize; diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java index 4c18f95..9211939 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java @@ -76,7 +76,7 @@ final class PoolChunkList implements PoolChunkListMetric { this.prevList = prevList; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache) { + Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { int normCapacity = arena.sizeIdx2size(sizeIdx); if (normCapacity > maxCapacity) { // Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can @@ -85,7 +85,7 @@ final class PoolChunkList implements PoolChunkListMetric { } for (PoolChunk cur = head; cur != null; cur = cur.next) { - Buffer buffer = cur.allocate(size, sizeIdx, threadCache); + Buffer buffer = cur.allocate(size, sizeIdx, threadCache, control); if (buffer != null) { if (cur.freeBytes <= freeMinThreshold) { remove(cur); @@ -223,10 +223,10 @@ final class PoolChunkList implements PoolChunkListMetric { return buf.toString(); } - void destroy(PoolArena arena) { + void destroy() { PoolChunk chunk = head; while (chunk != null) { - arena.destroyChunk(chunk); + chunk.destroy(); chunk = chunk.next; } head = null; diff --git a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java index 5e986b1..8fb2e93 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java @@ -106,24 +106,24 @@ final class PoolThreadCache { /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateSmall(int size, int sizeIdx) { - return allocate(cacheForSmall(sizeIdx), size); + Buffer allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) { + return allocate(cacheForSmall(sizeIdx), control, size); } /** * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateNormal(PoolArena area, int size, int sizeIdx) { - return allocate(cacheForNormal(area, sizeIdx), size); + Buffer allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) { + return allocate(cacheForNormal(area, sizeIdx), control, size); } - private Buffer allocate(MemoryRegionCache cache, int size) { + private Buffer allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) { if (cache == null) { // no cache found so just return false here return null; } - Buffer allocated = cache.allocate(size, this); - if (++ allocations >= freeSweepAllocationThreshold) { + Buffer allocated = cache.allocate(size, this, control); + if (++allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } @@ -237,8 +237,9 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) { - return chunk.allocateBufferWithSubpage(handle, size, threadCache); + protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control) { + return chunk.allocateBufferWithSubpage(handle, size, threadCache, control); } } @@ -251,8 +252,9 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) { - return chunk.allocateBuffer(handle, size, threadCache); + protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control) { + return chunk.allocateBuffer(handle, size, threadCache, control); } } @@ -271,7 +273,8 @@ final class PoolThreadCache { /** * Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions. */ - protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache); + protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control); /** * Add to cache if not already full. @@ -290,12 +293,12 @@ final class PoolThreadCache { /** * Allocate something out of the cache if possible and remove the entry from the cache. */ - public final Buffer allocate(int size, PoolThreadCache threadCache) { + public final Buffer allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) { Entry entry = queue.poll(); if (entry == null) { return null; } - Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache); + Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control); entry.recycle(); // allocations are not thread-safe which is fine as this is only called from the same thread all time. diff --git a/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java new file mode 100644 index 0000000..1a33990 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java @@ -0,0 +1,26 @@ +package io.netty.buffer.api.pool; + +import io.netty.buffer.api.AllocatorControl; +import io.netty.buffer.api.Buffer; + +class PooledAllocatorControl implements AllocatorControl { + public PoolArena arena; + public PoolChunk chunk; + public PoolThreadCache threadCache; + public long handle; + public int normSize; + public Object memory; + public int offset; + public int size; + + @Override + public Object allocateUntethered(Buffer originator, int size) { + Buffer allocate = arena.parent.allocate(this, size); + return arena.manager.unwrapRecoverableMemory(allocate); + } + + @Override + public void recoverMemory(Object memory) { + arena.free(chunk, handle, normSize, threadCache); + } +} diff --git a/src/main/java/io/netty/buffer/api/pool/PooledByteBufAllocator.java b/src/main/java/io/netty/buffer/api/pool/PooledByteBufAllocator.java index b9120d3..f5a7aad 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledByteBufAllocator.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledByteBufAllocator.java @@ -23,7 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocatorMetricProvider, AllocatorControl { +public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocatorMetricProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class); private static final int DEFAULT_NUM_HEAP_ARENA; @@ -257,7 +257,7 @@ public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocator // Ensure the resulting chunkSize does not overflow. int chunkSize = pageSize; - for (int i = maxOrder; i > 0; i --) { + for (int i = maxOrder; i > 0; i--) { if (chunkSize > MAX_CHUNK_SIZE / 2) { throw new IllegalArgumentException(String.format( "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE)); @@ -269,25 +269,25 @@ public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocator @Override public Buffer allocate(int size) { + return allocate(new PooledAllocatorControl(), size); + } + + Buffer allocate(PooledAllocatorControl control, int size) { PoolThreadCache cache = threadCache.get(); PoolArena arena = cache.arena; if (arena != null) { - return arena.allocate(cache, size); + return arena.allocate(control, cache, size); } BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap(); return unpooled.allocate(size); } @Override - public Object allocateUntethered(Buffer originator, int size) { - // TODO - return null; - } - - @Override - public void recoverMemory(Object memory) { - // TODO + public void close() { + for (PoolArena arena : arenas) { + arena.close(); + } } /**