Remove the notion of ByteBufAllocator.bufferMaxCapacity()
- Allocate the unpooled memory if the requested capacity is greater then the chunkSize - Fixes #834
This commit is contained in:
parent
35c01660da
commit
67da6e4bf9
@ -18,28 +18,18 @@ package io.netty.buffer;
|
|||||||
|
|
||||||
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
|
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
|
||||||
|
|
||||||
private final int bufferMaxCapacity;
|
|
||||||
private final boolean directByDefault;
|
private final boolean directByDefault;
|
||||||
private final ByteBuf emptyBuf;
|
private final ByteBuf emptyBuf;
|
||||||
|
|
||||||
protected AbstractByteBufAllocator(int bufferMaxCapacity) {
|
protected AbstractByteBufAllocator() {
|
||||||
this(bufferMaxCapacity, false);
|
this(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AbstractByteBufAllocator(int bufferMaxCapacity, boolean directByDefault) {
|
protected AbstractByteBufAllocator(boolean directByDefault) {
|
||||||
if (bufferMaxCapacity <= 0) {
|
|
||||||
throw new IllegalArgumentException("bufferMaxCapacity: " + bufferMaxCapacity + " (expected: 1+)");
|
|
||||||
}
|
|
||||||
this.directByDefault = directByDefault;
|
this.directByDefault = directByDefault;
|
||||||
this.bufferMaxCapacity = bufferMaxCapacity;
|
|
||||||
emptyBuf = new UnpooledHeapByteBuf(this, 0, 0);
|
emptyBuf = new UnpooledHeapByteBuf(this, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int bufferMaxCapacity() {
|
|
||||||
return bufferMaxCapacity;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf buffer() {
|
public ByteBuf buffer() {
|
||||||
if (directByDefault) {
|
if (directByDefault) {
|
||||||
@ -66,12 +56,12 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf heapBuffer() {
|
public ByteBuf heapBuffer() {
|
||||||
return heapBuffer(256, bufferMaxCapacity());
|
return heapBuffer(256, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf heapBuffer(int initialCapacity) {
|
public ByteBuf heapBuffer(int initialCapacity) {
|
||||||
return heapBuffer(initialCapacity, bufferMaxCapacity());
|
return heapBuffer(initialCapacity, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -85,12 +75,12 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf directBuffer() {
|
public ByteBuf directBuffer() {
|
||||||
return directBuffer(256, bufferMaxCapacity());
|
return directBuffer(256, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf directBuffer(int initialCapacity) {
|
public ByteBuf directBuffer(int initialCapacity) {
|
||||||
return directBuffer(initialCapacity, bufferMaxCapacity());
|
return directBuffer(initialCapacity, Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -138,10 +128,9 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
|
|||||||
return new DefaultCompositeByteBuf(this, true, maxNumComponents);
|
return new DefaultCompositeByteBuf(this, true, maxNumComponents);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validate(int initialCapacity, int maxCapacity) {
|
private static void validate(int initialCapacity, int maxCapacity) {
|
||||||
if (maxCapacity > bufferMaxCapacity()) {
|
if (initialCapacity < 0) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
|
||||||
"maxCapacity: " + maxCapacity + " (expected: not greater than " + bufferMaxCapacity());
|
|
||||||
}
|
}
|
||||||
if (initialCapacity > maxCapacity) {
|
if (initialCapacity > maxCapacity) {
|
||||||
throw new IllegalArgumentException(String.format(
|
throw new IllegalArgumentException(String.format(
|
||||||
|
@ -34,6 +34,4 @@ public interface ByteBufAllocator {
|
|||||||
CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
|
CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
|
||||||
CompositeByteBuf compositeDirectBuffer();
|
CompositeByteBuf compositeDirectBuffer();
|
||||||
CompositeByteBuf compositeDirectBuffer(int maxNumComponents);
|
CompositeByteBuf compositeDirectBuffer(int maxNumComponents);
|
||||||
|
|
||||||
int bufferMaxCapacity();
|
|
||||||
}
|
}
|
||||||
|
@ -130,6 +130,9 @@ abstract class PoolArena<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (normCapacity > chunkSize) {
|
||||||
|
allocateHuge(buf, reqCapacity);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
allocateNormal(buf, reqCapacity, normCapacity);
|
allocateNormal(buf, reqCapacity, normCapacity);
|
||||||
@ -150,9 +153,17 @@ abstract class PoolArena<T> {
|
|||||||
qInit.add(c);
|
qInit.add(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
|
||||||
|
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized void free(PoolChunk<T> chunk, long handle) {
|
synchronized void free(PoolChunk<T> chunk, long handle) {
|
||||||
|
if (chunk.unpooled) {
|
||||||
|
destroyChunk(chunk);
|
||||||
|
} else {
|
||||||
chunk.parent.free(chunk, handle);
|
chunk.parent.free(chunk, handle);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void addSubpage(PoolSubpage<T> subpage) {
|
void addSubpage(PoolSubpage<T> subpage) {
|
||||||
int tableIdx;
|
int tableIdx;
|
||||||
@ -175,8 +186,11 @@ abstract class PoolArena<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private int normalizeCapacity(int reqCapacity) {
|
private int normalizeCapacity(int reqCapacity) {
|
||||||
if (reqCapacity < 0 || reqCapacity > chunkSize) {
|
if (reqCapacity < 0) {
|
||||||
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0-" + chunkSize + ')');
|
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
|
||||||
|
}
|
||||||
|
if (reqCapacity >= chunkSize) {
|
||||||
|
return reqCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
|
if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
|
||||||
@ -240,6 +254,7 @@ abstract class PoolArena<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
|
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
|
||||||
|
protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
|
||||||
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
|
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
|
||||||
protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
|
protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
|
||||||
protected abstract void destroyChunk(PoolChunk<T> chunk);
|
protected abstract void destroyChunk(PoolChunk<T> chunk);
|
||||||
@ -311,6 +326,11 @@ abstract class PoolArena<T> {
|
|||||||
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
|
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
|
||||||
|
return new PoolChunk<byte[]>(this, new byte[capacity], capacity);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void destroyChunk(PoolChunk<byte[]> chunk) {
|
protected void destroyChunk(PoolChunk<byte[]> chunk) {
|
||||||
// Rely on GC.
|
// Rely on GC.
|
||||||
@ -343,6 +363,11 @@ abstract class PoolArena<T> {
|
|||||||
this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
|
this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
|
||||||
|
return new PoolChunk<ByteBuffer>(this, ByteBuffer.allocateDirect(capacity), capacity);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
|
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
|
||||||
UnpooledDirectByteBuf.freeDirect(chunk.memory);
|
UnpooledDirectByteBuf.freeDirect(chunk.memory);
|
||||||
|
@ -28,6 +28,7 @@ final class PoolChunk<T> {
|
|||||||
|
|
||||||
final PoolArena<T> arena;
|
final PoolArena<T> arena;
|
||||||
final T memory;
|
final T memory;
|
||||||
|
final boolean unpooled;
|
||||||
|
|
||||||
private final int[] memoryMap;
|
private final int[] memoryMap;
|
||||||
private final PoolSubpage<T>[] subpages;
|
private final PoolSubpage<T>[] subpages;
|
||||||
@ -51,6 +52,7 @@ final class PoolChunk<T> {
|
|||||||
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
|
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
|
||||||
|
|
||||||
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
|
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
|
||||||
|
unpooled = false;
|
||||||
this.arena = arena;
|
this.arena = arena;
|
||||||
this.memory = memory;
|
this.memory = memory;
|
||||||
this.pageSize = pageSize;
|
this.pageSize = pageSize;
|
||||||
@ -76,6 +78,20 @@ final class PoolChunk<T> {
|
|||||||
subpages = newSubpageArray(maxSubpageAllocs);
|
subpages = newSubpageArray(maxSubpageAllocs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Creates a special chunk that is not pooled. */
|
||||||
|
PoolChunk(PoolArena<T> arena, T memory, int size) {
|
||||||
|
unpooled = true;
|
||||||
|
this.arena = arena;
|
||||||
|
this.memory = memory;
|
||||||
|
memoryMap = null;
|
||||||
|
subpages = null;
|
||||||
|
subpageOverflowMask = 0;
|
||||||
|
pageSize = 0;
|
||||||
|
pageShifts = 0;
|
||||||
|
chunkSize = size;
|
||||||
|
maxSubpageAllocs = 0;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private PoolSubpage<T>[] newSubpageArray(int size) {
|
private PoolSubpage<T>[] newSubpageArray(int size) {
|
||||||
return new PoolSubpage[size];
|
return new PoolSubpage[size];
|
||||||
@ -266,7 +282,7 @@ final class PoolChunk<T> {
|
|||||||
if (bitmapIdx == 0) {
|
if (bitmapIdx == 0) {
|
||||||
int val = memoryMap[memoryMapIdx];
|
int val = memoryMap[memoryMapIdx];
|
||||||
assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
|
assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
|
||||||
buf.init(this, handle, memory, runOffset(val), reqCapacity, runLength(val));
|
buf.init(this, handle, runOffset(val), reqCapacity, runLength(val));
|
||||||
} else {
|
} else {
|
||||||
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
|
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
|
||||||
}
|
}
|
||||||
@ -288,7 +304,7 @@ final class PoolChunk<T> {
|
|||||||
assert reqCapacity <= subpage.elemSize;
|
assert reqCapacity <= subpage.elemSize;
|
||||||
|
|
||||||
buf.init(
|
buf.init(
|
||||||
this, handle, memory,
|
this, handle,
|
||||||
runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
|
runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,13 +37,13 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
|||||||
super(maxCapacity);
|
super(maxCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
final void init(PoolChunk<T> chunk, long handle, T memory, int offset, int length, int maxLength) {
|
final void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength) {
|
||||||
assert handle >= 0;
|
assert handle >= 0;
|
||||||
assert memory != null;
|
assert chunk != null;
|
||||||
|
|
||||||
this.chunk = chunk;
|
this.chunk = chunk;
|
||||||
this.handle = handle;
|
this.handle = handle;
|
||||||
this.memory = memory;
|
memory = chunk.memory;
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
this.maxLength = maxLength;
|
this.maxLength = maxLength;
|
||||||
@ -51,6 +51,18 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
|||||||
tmpNioBuf = null;
|
tmpNioBuf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final void initUnpooled(PoolChunk<T> chunk, int length) {
|
||||||
|
assert chunk != null;
|
||||||
|
|
||||||
|
this.chunk = chunk;
|
||||||
|
handle = 0;
|
||||||
|
memory = chunk.memory;
|
||||||
|
offset = 0;
|
||||||
|
this.length = maxLength = length;
|
||||||
|
setIndex(0, 0);
|
||||||
|
tmpNioBuf = null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final int capacity() {
|
public final int capacity() {
|
||||||
return length;
|
return length;
|
||||||
@ -61,6 +73,11 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
|||||||
checkUnfreed();
|
checkUnfreed();
|
||||||
|
|
||||||
// If the request capacity does not require reallocation, just update the length of the memory.
|
// If the request capacity does not require reallocation, just update the length of the memory.
|
||||||
|
if (chunk.unpooled) {
|
||||||
|
if (newCapacity == length) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if (newCapacity > length) {
|
if (newCapacity > length) {
|
||||||
if (newCapacity <= maxLength) {
|
if (newCapacity <= maxLength) {
|
||||||
length = newCapacity;
|
length = newCapacity;
|
||||||
@ -83,6 +100,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
|||||||
} else {
|
} else {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reallocation required.
|
// Reallocation required.
|
||||||
if (suspendedDeallocations == null) {
|
if (suspendedDeallocations == null) {
|
||||||
@ -144,7 +162,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (Allocation<T> a: suspendedDeallocations) {
|
for (Allocation<T> a: suspendedDeallocations) {
|
||||||
chunk.arena.free(a.chunk, a.handle);
|
a.chunk.arena.free(a.chunk, a.handle);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,10 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
|
|
||||||
public PooledByteBufAllocator(
|
public PooledByteBufAllocator(
|
||||||
boolean directByDefault, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
|
boolean directByDefault, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
|
||||||
super(validateAndCalculateChunkSize(pageSize, maxOrder), directByDefault);
|
super(directByDefault);
|
||||||
|
|
||||||
|
final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
|
||||||
|
|
||||||
if (nHeapArena <= 0) {
|
if (nHeapArena <= 0) {
|
||||||
throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: 1+)");
|
throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: 1+)");
|
||||||
}
|
}
|
||||||
@ -68,7 +71,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int pageShifts = validateAndCalculatePageShifts(pageSize);
|
int pageShifts = validateAndCalculatePageShifts(pageSize);
|
||||||
int chunkSize = bufferMaxCapacity();
|
|
||||||
|
|
||||||
heapArenas = newArenaArray(nHeapArena);
|
heapArenas = newArenaArray(nHeapArena);
|
||||||
for (int i = 0; i < heapArenas.length; i ++) {
|
for (int i = 0; i < heapArenas.length; i ++) {
|
||||||
@ -147,11 +149,17 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
buf.append(heapArenas.length);
|
buf.append(heapArenas.length);
|
||||||
buf.append(" arena(s):");
|
buf.append(" heap arena(s):");
|
||||||
buf.append(StringUtil.NEWLINE);
|
buf.append(StringUtil.NEWLINE);
|
||||||
for (PoolArena<byte[]> a: heapArenas) {
|
for (PoolArena<byte[]> a: heapArenas) {
|
||||||
buf.append(a);
|
buf.append(a);
|
||||||
}
|
}
|
||||||
|
buf.append(directArenas.length);
|
||||||
|
buf.append(" direct arena(s):");
|
||||||
|
buf.append(StringUtil.NEWLINE);
|
||||||
|
for (PoolArena<ByteBuffer> a: directArenas) {
|
||||||
|
buf.append(a);
|
||||||
|
}
|
||||||
return buf.toString();
|
return buf.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
public static final UnpooledByteBufAllocator DIRECT_BY_DEFAULT = new UnpooledByteBufAllocator(true);
|
public static final UnpooledByteBufAllocator DIRECT_BY_DEFAULT = new UnpooledByteBufAllocator(true);
|
||||||
|
|
||||||
private UnpooledByteBufAllocator(boolean directByDefault) {
|
private UnpooledByteBufAllocator(boolean directByDefault) {
|
||||||
super(Integer.MAX_VALUE, directByDefault);
|
super(directByDefault);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user