/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.net5.buffer;

import io.net5.util.internal.PlatformDependent;
import io.net5.util.internal.StringUtil;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import static io.net5.buffer.PoolChunk.isSubpage;
import static java.lang.Math.max;

abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    enum SizeClass {
        Small,
        Normal
    }

    final PooledByteBufAllocator parent;

    final int numSmallSubpagePools;
    final int directMemoryCacheAlignment;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    private final List<PoolChunkListMetric> chunkListMetrics;

    // Metrics for allocations and deallocations
    private long allocationsNormal;
    // We need to use the LongAdder here as this is not guarded via synchronized block.
    private final LongAdder allocationsSmall = new LongAdder();
    private final LongAdder allocationsHuge = new LongAdder();
    private final LongAdder activeBytesHuge = new LongAdder();

    private long deallocationsSmall;
    private long deallocationsNormal;

    // We need to use the LongAdder here as this is not guarded via synchronized block.
    private final LongAdder deallocationsHuge = new LongAdder();

    // Number of thread caches backed by this arena.
    final AtomicInteger numThreadCaches = new AtomicInteger();

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int pageShifts, int chunkSize, int cacheAlignment) {
        super(pageSize, pageShifts, chunkSize, cacheAlignment);
        this.parent = parent;
        directMemoryCacheAlignment = cacheAlignment;

        numSmallSubpagePools = nSubpages;
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead();
        }

        q100 = new PoolChunkList<>(this, null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<>(this, q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

        List<PoolChunkListMetric> metrics = new ArrayList<>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }
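
    // For reference, the chunk lists created above cover the following usage ranges
    // (minUsage%..maxUsage%, as passed to the PoolChunkList constructor):
    //
    //   qInit: MIN_VALUE..25    q000: 1..50     q025: 25..75
    //   q050:  50..100          q075: 75..100   q100: 100..MAX_VALUE
    //
    // Chunks migrate between neighbouring lists as their usage rises and falls (see PoolChunkList).
    // Note that qInit is its own predecessor, so a freshly added chunk keeps being pooled even if its
    // usage drops back to zero, whereas q000 has no predecessor and chunks that empty out there can
    // be released.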

    private PoolSubpage<T> newSubpagePoolHead() {
        PoolSubpage<T> head = new PoolSubpage<>();
        head.prev = head;
        head.next = head;
        return head;
    }

    @SuppressWarnings("unchecked")
    private PoolSubpage<T>[] newSubpagePoolArray(int size) {
        return new PoolSubpage[size];
    }

    abstract boolean isDirect();

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int sizeIdx = size2SizeIdx(reqCapacity);

        if (sizeIdx <= smallMaxSizeIdx) {
            tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
        } else if (sizeIdx < nSizes) {
            tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
        } else {
            int normCapacity = directMemoryCacheAlignment > 0
                    ? normalizeSize(reqCapacity) : reqCapacity;
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, normCapacity);
        }
    }

    private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
                                     final int sizeIdx) {

        if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
            // was able to allocate out of the cache so move on
            return;
        }

        /*
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
        final boolean needsNormalAllocation;
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            needsNormalAllocation = s == head;
            if (!needsNormalAllocation) {
                assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
            }
        }

        if (needsNormalAllocation) {
            synchronized (this) {
                allocateNormal(buf, reqCapacity, sizeIdx, cache);
            }
        }

        incSmallAllocation();
    }

    private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
                                      final int sizeIdx) {
        if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, sizeIdx, cache);
            ++allocationsNormal;
        }
    }
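
    // A note on the probe order in allocateNormal() below: q050 is tried first and q075 last.
    // The intuition (not spelled out in this file) is that preferring moderately used chunks pushes
    // them towards full utilisation and keeps the total number of live chunks small, while
    // nearly-full chunks (q075) are the least likely to satisfy a request and are therefore tried
    // last; only if every list fails is a fresh chunk allocated and added to qInit.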

    // Method must be called inside synchronized(this) { ... } block
    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
        if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
            q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
            return;
        }

        // Add a new chunk.
        PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
        boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
        assert success;
        qInit.add(c);
    }

    private void incSmallAllocation() {
        allocationsSmall.increment();
    }

    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
        activeBytesHuge.add(chunk.chunkSize());
        buf.initUnpooled(chunk, reqCapacity);
        allocationsHuge.increment();
    }

    void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            SizeClass sizeClass = sizeClass(handle);
            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                // cached, so do not free it.
                return;
            }

            freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
        }
    }

    private static SizeClass sizeClass(long handle) {
        return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
    }

    void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
                   boolean finalizer) {
        final boolean destroyChunk;
        synchronized (this) {
            // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise
            // this may fail due to lazy class-loading in, for example, Tomcat.
            if (!finalizer) {
                switch (sizeClass) {
                    case Normal:
                        ++deallocationsNormal;
                        break;
                    case Small:
                        ++deallocationsSmall;
                        break;
                    default:
                        throw new Error();
                }
            }
            destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
        }
        if (destroyChunk) {
            // destroyChunk does not need to be called while holding the synchronized lock.
            destroyChunk(chunk);
        }
    }

    PoolSubpage<T> findSubpagePoolHead(int sizeIdx) {
        return smallSubpagePools[sizeIdx];
    }

    void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
        assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();

        int oldCapacity = buf.length;
        if (oldCapacity == newCapacity) {
            return;
        }

        PoolChunk<T> oldChunk = buf.chunk;
        ByteBuffer oldNioBuffer = buf.tmpNioBuf;
        long oldHandle = buf.handle;
        T oldMemory = buf.memory;
        int oldOffset = buf.offset;
        int oldMaxLength = buf.maxLength;

        // This does not touch buf's reader/writer indices
        allocate(parent.threadCache(), buf, newCapacity);
        int bytesToCopy;
        if (newCapacity > oldCapacity) {
            bytesToCopy = oldCapacity;
        } else {
            buf.trimIndicesToCapacity(newCapacity);
            bytesToCopy = newCapacity;
        }
        memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
        if (freeOldMemory) {
            free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
        }
    }

    @Override
    public int numThreadCaches() {
        return numThreadCaches.get();
    }

    @Override
    public int numTinySubpages() {
        return 0;
    }

    @Override
    public int numSmallSubpages() {
        return smallSubpagePools.length;
    }

    @Override
    public int numChunkLists() {
        return chunkListMetrics.size();
    }

    @Override
    public List<PoolSubpageMetric> tinySubpages() {
        return Collections.emptyList();
    }

    @Override
    public List<PoolSubpageMetric> smallSubpages() {
        return subPageMetricList(smallSubpagePools);
    }

    @Override
    public List<PoolChunkListMetric> chunkLists() {
        return chunkListMetrics;
    }

    private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
        List<PoolSubpageMetric> metrics = new ArrayList<>();
        for (PoolSubpage<?> head : pages) {
            if (head.next == head) {
                continue;
            }
            PoolSubpage<?> s = head.next;
            do {
                metrics.add(s);
                s = s.next;
            } while (s != head);
        }
        return metrics;
    }

    @Override
    public long numAllocations() {
        final long allocsNormal;
        synchronized (this) {
            allocsNormal = allocationsNormal;
        }
        return allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue();
    }

    @Override
    public long numTinyAllocations() {
        return 0;
    }

    @Override
    public long numSmallAllocations() {
        return allocationsSmall.longValue();
    }

    @Override
    public synchronized long numNormalAllocations() {
        return allocationsNormal;
    }

    @Override
    public long numDeallocations() {
        final long deallocs;
        synchronized (this) {
            deallocs = deallocationsSmall + deallocationsNormal;
        }
        return deallocs + deallocationsHuge.longValue();
    }

    @Override
    public long numTinyDeallocations() {
        return 0;
    }

    @Override
    public synchronized long numSmallDeallocations() {
        return deallocationsSmall;
    }

    @Override
    public synchronized long numNormalDeallocations() {
        return deallocationsNormal;
    }

    @Override
    public long numHugeAllocations() {
        return allocationsHuge.longValue();
    }

    @Override
    public long numHugeDeallocations() {
        return deallocationsHuge.longValue();
    }

    @Override
    public long numActiveAllocations() {
        long val = allocationsSmall.longValue() + allocationsHuge.longValue()
                - deallocationsHuge.longValue();
        synchronized (this) {
            val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
        }
        return max(val, 0);
    }

    @Override
    public long numActiveTinyAllocations() {
        return 0;
    }

    @Override
    public long numActiveSmallAllocations() {
        return max(numSmallAllocations() - numSmallDeallocations(), 0);
    }

    @Override
    public long numActiveNormalAllocations() {
        final long val;
        synchronized (this) {
            val = allocationsNormal - deallocationsNormal;
        }
        return max(val, 0);
    }

    @Override
    public long numActiveHugeAllocations() {
        return max(numHugeAllocations() - numHugeDeallocations(), 0);
    }
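
    // numActiveBytes() below counts every pooled chunk at its full chunkSize once it exists, plus the
    // exact-sized huge allocations; numPinnedBytes() instead only counts the bytes inside those chunks
    // that are currently handed out to buffers. The former is therefore an upper bound on what the
    // arena holds, the latter tracks memory that cannot yet be reused.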

    @Override
    public long numActiveBytes() {
        long val = activeBytesHuge.longValue();
        synchronized (this) {
            for (int i = 0; i < chunkListMetrics.size(); i++) {
                for (PoolChunkMetric m: chunkListMetrics.get(i)) {
                    val += m.chunkSize();
                }
            }
        }
        return max(0, val);
    }

    /**
     * Return the number of bytes that are currently pinned to buffer instances, by the arena. The pinned memory is
     * not accessible for use by any other allocation, until the buffers using it have all been released.
     */
    public long numPinnedBytes() {
        long val = activeBytesHuge.longValue(); // Huge chunks are exact-sized for the buffers they were allocated to.
        synchronized (this) {
            for (int i = 0; i < chunkListMetrics.size(); i++) {
                for (PoolChunkMetric m: chunkListMetrics.get(i)) {
                    val += ((PoolChunk<?>) m).pinnedBytes();
                }
            }
        }
        return max(0, val);
    }

    protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
    protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
    protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
    protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf<T> dst, int length);
    protected abstract void destroyChunk(PoolChunk<T> chunk);

    @Override
    public synchronized String toString() {
        StringBuilder buf = new StringBuilder()
            .append("Chunk(s) at 0~25%:")
            .append(StringUtil.NEWLINE)
            .append(qInit)
            .append(StringUtil.NEWLINE)
            .append("Chunk(s) at 0~50%:")
            .append(StringUtil.NEWLINE)
            .append(q000)
            .append(StringUtil.NEWLINE)
            .append("Chunk(s) at 25~75%:")
            .append(StringUtil.NEWLINE)
            .append(q025)
            .append(StringUtil.NEWLINE)
            .append("Chunk(s) at 50~100%:")
            .append(StringUtil.NEWLINE)
            .append(q050)
            .append(StringUtil.NEWLINE)
            .append("Chunk(s) at 75~100%:")
            .append(StringUtil.NEWLINE)
            .append(q075)
            .append(StringUtil.NEWLINE)
            .append("Chunk(s) at 100%:")
            .append(StringUtil.NEWLINE)
            .append(q100)
            .append(StringUtil.NEWLINE)
            .append("small subpages:");
        appendPoolSubPages(buf, smallSubpagePools);
        buf.append(StringUtil.NEWLINE);
        return buf.toString();
    }

    private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
        for (int i = 0; i < subpages.length; i ++) {
            PoolSubpage<?> head = subpages[i];
            if (head.next == head) {
                continue;
            }

            buf.append(StringUtil.NEWLINE)
                    .append(i)
                    .append(": ");
            PoolSubpage<?> s = head.next;
            do {
                buf.append(s);
                s = s.next;
            } while (s != head);
        }
    }

    @Override
    protected final void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            destroyPoolSubPages(smallSubpagePools);
            destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
        }
    }

    private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
        for (PoolSubpage<?> page : pages) {
            page.destroy();
        }
    }

    private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
        for (PoolChunkList<T> chunkList: chunkLists) {
            chunkList.destroy(this);
        }
    }

    static final class HeapArena extends PoolArena<byte[]> {

        HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
                  int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
        }

        private static byte[] newByteArray(int size) {
            return PlatformDependent.allocateUninitializedArray(size);
        }

        @Override
        boolean isDirect() {
            return false;
        }

        @Override
        protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
            return new PoolChunk<>(
                    this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
        }

        @Override
        protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
            return new PoolChunk<>(this, null, newByteArray(capacity), capacity);
        }

        @Override
        protected void destroyChunk(PoolChunk<byte[]> chunk) {
            // Rely on GC.
        }

        @Override
        protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
            return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
                    : PooledHeapByteBuf.newInstance(maxCapacity);
        }

        @Override
        protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf<byte[]> dst, int length) {
            if (length == 0) {
                return;
            }

            System.arraycopy(src, srcOffset, dst.memory, dst.offset, length);
        }
    }

    static final class DirectArena extends PoolArena<ByteBuffer> {

        DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
                    int chunkSize, int directMemoryCacheAlignment) {
            super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
        }

        @Override
        boolean isDirect() {
            return true;
        }

        @Override
        protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
            int pageShifts, int chunkSize) {
            if (directMemoryCacheAlignment == 0) {
                ByteBuffer memory = allocateDirect(chunkSize);
                return new PoolChunk<>(this, memory, memory, pageSize, pageShifts,
                        chunkSize, maxPageIdx);
            }

            final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
            final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
            return new PoolChunk<>(this, base, memory, pageSize,
                    pageShifts, chunkSize, maxPageIdx);
        }

        @Override
        protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
            if (directMemoryCacheAlignment == 0) {
                ByteBuffer memory = allocateDirect(capacity);
                return new PoolChunk<>(this, memory, memory, capacity);
            }

            final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
            final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
            return new PoolChunk<>(this, base, memory, capacity);
        }

        private static ByteBuffer allocateDirect(int capacity) {
            return PlatformDependent.useDirectBufferNoCleaner() ?
                    PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
        }

        @Override
        protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
            if (PlatformDependent.useDirectBufferNoCleaner()) {
                PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
            } else {
                PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
            }
        }

        @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

        @Override
        protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
            if (length == 0) {
                return;
            }

            if (HAS_UNSAFE) {
                PlatformDependent.copyMemory(
                        PlatformDependent.directBufferAddress(src) + srcOffset,
                        PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
            } else {
                // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
                src = src.duplicate();
                ByteBuffer dst = dstBuf.internalNioBuffer();
                src.position(srcOffset).limit(srcOffset + length);
                dst.position(dstBuf.offset);
                dst.put(src);
            }
        }
    }
}
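
/*
 * Usage sketch (illustrative only, not part of the pooled allocator itself). PoolArena is internal:
 * application code never calls it directly but reaches it through a PooledByteBufAllocator, which
 * picks an arena for the calling thread and serves requests from the thread-local PoolThreadCache,
 * a pooled chunk, or an unpooled "huge" chunk. Assuming the allocator's usual public API
 * (PooledByteBufAllocator.DEFAULT, ByteBufAllocator#directBuffer and ByteBuf#release), a typical
 * round trip through a DirectArena looks roughly like this:
 *
 *     ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256); // ends up in allocate(...) above
 *     try {
 *         buf.writeLong(42L);
 *     } finally {
 *         buf.release(); // eventually returns the memory via free(...), possibly via the thread cache
 *     }
 */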