// netty5/buffer/src/main/java/io/netty/buffer/PoolArena.java

/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
abstract class PoolArena<T> {
// Number of tiny sub-page pool heads: tiny capacities are below 512 (see isTiny) and
// tinyIdx() shifts the capacity right by 4, so 512 >>> 4 = 32 slots are needed.
static final int numTinySubpagePools = 512 >>> 4;
// Owning allocator; its threadCache is consulted by free() and reallocate().
final PooledByteBufAllocator parent;
// Handed to newChunk() whenever a new pooled chunk has to be created.
private final int maxOrder;
// Capacities below this value are served from the tiny/small sub-page pools (see isTinyOrSmall).
final int pageSize;
// Determines the number of small sub-page pools (pageShifts - 9) and is handed to newChunk().
final int pageShifts;
// Requests larger than this are served by allocateHuge() with a dedicated unpooled chunk.
final int chunkSize;
// ~(pageSize - 1): isTinyOrSmall() treats a capacity with none of these bits set as sub-page sized.
// NOTE(review): this only equals "capacity < pageSize" if pageSize is a power of two - confirm with the caller.
final int subpageOverflowMask;
// Length of smallSubpagePools; computed as pageShifts - 9 in the constructor.
final int numSmallSubpagePools;
// List heads for sub-pages of tiny element sizes, indexed by tinyIdx().
private final PoolSubpage<T>[] tinySubpagePools;
// List heads for sub-pages of small element sizes, indexed by smallIdx().
private final PoolSubpage<T>[] smallSubpagePools;
// Chunk lists, declared in the same order in which allocateNormal() searches them.
// toString() labels them by usage range: qInit 0~25%, q000 0~50%, q025 25~75%,
// q050 50~100%, q075 75~100% and q100 100%.
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(this, q100, 75, 100);
q050 = new PoolChunkList<T>(this, q075, 50, 100);
q025 = new PoolChunkList<T>(this, q050, 25, 75);
q000 = new PoolChunkList<T>(this, q025, 1, 50);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
q100.prevList = q075;
q075.prevList = q050;
q050.prevList = q025;
q025.prevList = q000;
q000.prevList = null;
qInit.prevList = qInit;
}
/**
 * Builds the head of an empty circular sub-page list: a node whose {@code prev} and
 * {@code next} both refer to itself.
 *
 * @param pageSize passed to the {@code PoolSubpage} constructor
 * @return the self-linked head node
 */
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
    final PoolSubpage<T> sentinel = new PoolSubpage<T>(pageSize);
    sentinel.next = sentinel;
    sentinel.prev = sentinel;
    return sentinel;
}
/**
 * Allocates an (initially null-filled) array of sub-page list heads.
 * Generic arrays cannot be created directly, hence the raw array and the suppressed warning.
 *
 * @param size the number of slots
 * @return a new array of the requested length
 */
@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
    final PoolSubpage<T>[] pools = new PoolSubpage[size];
    return pools;
}
/**
 * Tells which kind of memory this arena manages: {@code DirectArena} returns {@code true},
 * {@code HeapArena} returns {@code false}.
 */
abstract boolean isDirect();
/**
 * Obtains a buffer object from {@link #newByteBuf(int)} and backs it with memory from this arena.
 *
 * @param cache       the thread cache that is tried before the arena itself
 * @param reqCapacity the capacity requested by the caller
 * @param maxCapacity the maximum capacity of the returned buffer
 * @return the initialized buffer
 */
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    final PooledByteBuf<T> pooled = newByteBuf(maxCapacity);
    allocate(cache, pooled, reqCapacity);
    return pooled;
}
/**
 * Maps a tiny normalized capacity to its slot in the tiny sub-page pool table.
 *
 * @param normCapacity the normalized capacity
 * @return {@code normCapacity} shifted right (unsigned) by four bits, i.e. divided by 16
 */
static int tinyIdx(int normCapacity) {
    final int quantumShift = 4; // log2(16)
    return normCapacity >>> quantumShift;
}
/**
 * Maps a small normalized capacity to its slot in the small sub-page pool table.
 * The slot is the number of significant bits in {@code normCapacity >>> 10}:
 * 512 -> 0, 1024 -> 1, 2048 -> 2, and so on.
 *
 * @param normCapacity the normalized capacity
 * @return the table index
 */
static int smallIdx(int normCapacity) {
    final int kibiUnits = normCapacity >>> 10;
    // Bit length of kibiUnits; numberOfLeadingZeros(0) is 32, which yields index 0.
    return Integer.SIZE - Integer.numberOfLeadingZeros(kibiUnits);
}
/**
 * Checks whether a capacity is served from the sub-page pools (capacity &lt; pageSize).
 *
 * @param normCapacity the normalized capacity
 * @return {@code true} if none of the bits in {@code subpageOverflowMask} are set
 */
boolean isTinyOrSmall(int normCapacity) {
    final int pageSizedOrLargerBits = normCapacity & subpageOverflowMask;
    return pageSizedOrLargerBits == 0;
}
/**
 * Checks whether a capacity is tiny, i.e. in the range {@code [0, 512)}.
 *
 * @param normCapacity the capacity to test
 * @return {@code true} if no bit at position 9 or above is set (negative values are never tiny)
 */
static boolean isTiny(int normCapacity) {
    return (normCapacity >>> 9) == 0;
}
/**
 * Backs {@code buf} with memory for {@code reqCapacity} bytes.
 * <ul>
 *   <li>Capacities above {@code chunkSize} get a dedicated unpooled chunk.</li>
 *   <li>Other capacities of at least one page try the thread cache, then the chunk lists.</li>
 *   <li>Sub-page capacities try the thread cache, then an already linked sub-page of the
 *       matching element size, and finally fall back to the chunk lists.</li>
 * </ul>
 *
 * @param cache       the thread cache consulted first
 * @param buf         the buffer to initialize
 * @param reqCapacity the capacity requested by the caller
 */
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);

    if (!isTinyOrSmall(normCapacity)) {
        if (normCapacity > chunkSize) {
            // Huge allocations never go through the cache.
            allocateHuge(buf, reqCapacity);
            return;
        }
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // Cache hit, nothing left to do.
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }

    // Sub-page sized request: capacity < pageSize.
    final boolean tiny = isTiny(normCapacity); // < 512
    final boolean servedByCache = tiny
            ? cache.allocateTiny(this, buf, reqCapacity, normCapacity)
            : cache.allocateSmall(this, buf, reqCapacity, normCapacity);
    if (servedByCache) {
        return;
    }

    final PoolSubpage<T>[] pools = tiny ? tinySubpagePools : smallSubpagePools;
    final int slot = tiny ? tinyIdx(normCapacity) : smallIdx(normCapacity);

    synchronized (this) {
        final PoolSubpage<T> head = pools[slot];
        final PoolSubpage<T> candidate = head.next;
        if (candidate != head) {
            // A sub-page with this element size is already linked; take an element from it.
            assert candidate.doNotDestroy && candidate.elemSize == normCapacity;
            final long handle = candidate.allocate();
            assert handle >= 0;
            candidate.chunk.initBufWithSubpage(buf, handle, reqCapacity);
            return;
        }
    }

    // No linked sub-page available: allocate through the chunk lists.
    allocateNormal(buf, reqCapacity, normCapacity);
}
/**
 * Allocates from the chunk lists, trying them in the order q050, q025, q000, qInit, q075, q100.
 * If none of them can satisfy the request a new chunk is created, used for the allocation
 * and added to {@code qInit}.
 *
 * @param buf          the buffer to initialize
 * @param reqCapacity  the capacity requested by the caller
 * @param normCapacity the normalized capacity
 */
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (q050.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    if (q025.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    if (q000.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    if (qInit.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    if (q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    if (q100.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // No existing chunk had room: add a new chunk.
    final PoolChunk<T> fresh = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    final long handle = fresh.allocate(normCapacity);
    assert handle > 0;
    fresh.initBuf(buf, handle, reqCapacity);
    qInit.add(fresh);
}
/**
 * Serves a request that is larger than a pooled chunk by creating a dedicated unpooled
 * chunk of exactly {@code reqCapacity} bytes.
 *
 * @param buf         the buffer to initialize
 * @param reqCapacity the capacity requested by the caller
 */
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    final PoolChunk<T> dedicated = newUnpooledChunk(reqCapacity);
    buf.initUnpooled(dedicated, reqCapacity);
}
void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
if (chunk.unpooled) {
destroyChunk(chunk);
} else {
if (sameThreads) {
PoolThreadCache cache = parent.threadCache.get();
if (cache.add(this, chunk, handle, normCapacity)) {
// cached so not free it.
return;
}
}
2013-12-02 08:23:57 +01:00
synchronized (this) {
chunk.parent.free(chunk, handle);
}
}
}
/**
 * Looks up the list head of the sub-page pool responsible for the given element size.
 *
 * @param elemSize the (normalized) element size
 * @return the head from the tiny table for sizes below 512, otherwise from the small table
 */
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
    if (isTiny(elemSize)) { // < 512
        return tinySubpagePools[tinyIdx(elemSize)];
    }
    return smallSubpagePools[smallIdx(elemSize)];
}
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return reqCapacity;
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
Fix error that causes (up to) double memory usage Motivation: PoolArena's 'normalizeCapacity' function was micro-optimized some time ago to remove a while loop. However, there was a change of behavior in the function as a result. Capacities passed into it that are already powers of 2 (and >= 512) are doubled in size. So if I ask for a buffer with a capacity of 1024, I will get back one that actually uses 2048 bytes (stored in maxLength). Aligning to powers of two for book keeping ease is reasonable, and if someone tries to expand a buffer, you might as well use some of the previously wasted space. However, since this distinction between 'easily expanded' and 'costly to expand' space is not supported at all by the APIs, I cannot imagine this change to doubling is desirable or intentional. This is especially costly when using composite buffers. They frequently allocate components with a capacity that is a power of 2, and they never attempt to expand components themselves. The end result is that heavy use of pool-backed composite buffers wastes almost half of the memory pool (the smaller / initial components are <512 and so are not affected by the off-by-one bug). Modifications: Although I find it difficult to believe that such an optimization is really helpful, I left it in and fixed the off-by-one issue by decrementing the value at the start. I also added a simple test to both attempt to verify that the decrement fixes the issue without introducing any other change, and to make it easy for a reviewer to test the existing behavior. PoolArena does not seem to have much testing or testability support though so the test is kind of a hack and will break for unrelated changes. I suggest either removing it or factoring out the single non-static portion of normalizeCapacity so that the fragile dummy PoolArena is not required. Result: Pooled allocators will allocate less resources to the highly inefficient and undocumented buffer section between length and maxLength. 
Composite buffers of non-trivial size that are backed by pooled allocators will use about half as much memory.
2014-04-14 22:57:24 +02:00
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
return normalizedCapacity;
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
/**
 * Moves {@code buf} onto newly allocated memory of {@code newCapacity} bytes.
 * When growing, the first {@code oldCapacity} bytes are copied. When shrinking, only the
 * bytes between the reader and the (possibly clamped) writer index are copied; if the
 * reader index no longer fits, both indices are set to {@code newCapacity}.
 *
 * @param buf           the buffer to move
 * @param newCapacity   the new capacity, between 0 and {@code buf.maxCapacity()}
 * @param freeOldMemory whether the previous memory is released afterwards
 * @throws IllegalArgumentException if {@code newCapacity} is out of range
 */
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    final int oldCapacity = buf.length;
    if (newCapacity == oldCapacity) {
        return;
    }

    // Snapshot the current backing memory before allocate() overwrites the buffer's state.
    final PoolChunk<T> prevChunk = buf.chunk;
    final long prevHandle = buf.handle;
    final T prevMemory = buf.memory;
    final int prevOffset = buf.offset;
    final int prevMaxLength = buf.maxLength;
    int readerIdx = buf.readerIndex();
    int writerIdx = buf.writerIndex();

    allocate(parent.threadCache.get(), buf, newCapacity);

    if (newCapacity > oldCapacity) {
        // Growing: carry over everything the old buffer could hold.
        memoryCopy(prevMemory, prevOffset, buf.memory, buf.offset, oldCapacity);
    } else if (readerIdx >= newCapacity) {
        // Shrinking below the reader index: nothing readable survives.
        readerIdx = writerIdx = newCapacity;
    } else {
        // Shrinking: keep the readable bytes that still fit.
        writerIdx = Math.min(writerIdx, newCapacity);
        memoryCopy(
                prevMemory, prevOffset + readerIdx,
                buf.memory, buf.offset + readerIdx, writerIdx - readerIdx);
    }

    buf.setIndex(readerIdx, writerIdx);

    if (freeOldMemory) {
        free(prevChunk, prevHandle, prevMaxLength, buf.initThread == Thread.currentThread());
    }
}
/** Creates a new pooled chunk; called by allocateNormal() when no existing chunk can serve a request. */
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
/** Creates a dedicated chunk of exactly {@code capacity} bytes; called by allocateHuge(). */
protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
/** Supplies the buffer object that allocate() subsequently initializes with memory from this arena. */
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
/** Copies {@code length} units from {@code src} at {@code srcOffset} to {@code dst} at {@code dstOffset}; used by reallocate(). */
protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
/** Releases the memory of an unpooled chunk; called by free(). */
protected abstract void destroyChunk(PoolChunk<T> chunk);
/**
 * Renders the six chunk lists followed by every non-empty tiny and small sub-page pool.
 * Runs under the arena lock so the lists are not modified while they are being printed.
 *
 * @return a multi-line description of this arena
 */
@Override
public synchronized String toString() {
    StringBuilder buf = new StringBuilder()
        .append("Chunk(s) at 0~25%:")
        .append(StringUtil.NEWLINE)
        .append(qInit)
        .append(StringUtil.NEWLINE)
        .append("Chunk(s) at 0~50%:")
        .append(StringUtil.NEWLINE)
        .append(q000)
        .append(StringUtil.NEWLINE)
        .append("Chunk(s) at 25~75%:")
        .append(StringUtil.NEWLINE)
        .append(q025)
        .append(StringUtil.NEWLINE)
        .append("Chunk(s) at 50~100%:")
        .append(StringUtil.NEWLINE)
        .append(q050)
        .append(StringUtil.NEWLINE)
        .append("Chunk(s) at 75~100%:")
        .append(StringUtil.NEWLINE)
        .append(q075)
        .append(StringUtil.NEWLINE)
        .append("Chunk(s) at 100%:")
        .append(StringUtil.NEWLINE)
        .append(q100)
        .append(StringUtil.NEWLINE)
        .append("tiny subpages:");
    appendSubpagePools(buf, tinySubpagePools);
    buf.append(StringUtil.NEWLINE)
        .append("small subpages:");
    appendSubpagePools(buf, smallSubpagePools);
    buf.append(StringUtil.NEWLINE);
    return buf.toString();
}

/**
 * Appends one line per non-empty pool in the form {@code <index>: <subpage><subpage>...}.
 * Iteration starts at index 0: smallIdx(512) is 0, so skipping the first slot would hide
 * every 512-byte sub-page. Empty pools are skipped, so slot 0 only adds output when a
 * sub-page is actually linked there.
 */
private void appendSubpagePools(StringBuilder buf, PoolSubpage<T>[] pools) {
    for (int i = 0; i < pools.length; i ++) {
        PoolSubpage<T> head = pools[i];
        if (head.next == head) {
            // Only the self-linked head is present: the pool is empty.
            continue;
        }
        buf.append(StringUtil.NEWLINE)
            .append(i)
            .append(": ");
        for (PoolSubpage<T> s = head.next; s != head; s = s.next) {
            buf.append(s);
        }
    }
}
/**
 * Arena whose chunks are backed by {@code byte[]} arrays on the Java heap.
 */
static final class HeapArena extends PoolArena<byte[]> {

    HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        super(parent, pageSize, maxOrder, pageShifts, chunkSize);
    }

    @Override
    boolean isDirect() {
        return false;
    }

    /** Creates a pooled chunk on top of a new array of {@code chunkSize} bytes. */
    @Override
    protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        final byte[] backing = new byte[chunkSize];
        return new PoolChunk<byte[]>(this, backing, pageSize, maxOrder, pageShifts, chunkSize);
    }

    /** Creates a dedicated chunk on top of a new array of exactly {@code capacity} bytes. */
    @Override
    protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
        final byte[] backing = new byte[capacity];
        return new PoolChunk<byte[]>(this, backing, capacity);
    }

    @Override
    protected void destroyChunk(PoolChunk<byte[]> chunk) {
        // Nothing to release explicitly; the array is left to the garbage collector.
    }

    @Override
    protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
        return PooledHeapByteBuf.newInstance(maxCapacity);
    }

    /** Copies {@code length} bytes between the two arrays; a zero length is a no-op. */
    @Override
    protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
        if (length != 0) {
            System.arraycopy(src, srcOffset, dst, dstOffset, length);
        }
    }
}
static final class DirectArena extends PoolArena<ByteBuffer> {
private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
2013-02-21 23:17:04 +01:00
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
boolean isDirect() {
return true;
}
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
return new PoolChunk<ByteBuffer>(this, ByteBuffer.allocateDirect(capacity), capacity);
}
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
if (length == 0) {
return;
}
if (HAS_UNSAFE) {
PlatformDependent.copyMemory(
PlatformDependent.directBufferAddress(src) + srcOffset,
PlatformDependent.directBufferAddress(dst) + dstOffset, length);
} else {
// We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
src = src.duplicate();
dst = dst.duplicate();
src.position(srcOffset).limit(srcOffset + length);
dst.position(dstOffset);
dst.put(src);
}
}
}
}