Return the new buffer's capacity is same with the requested capacity
- Rename capacity variables to reqCapacity or normCapacity to distinguish if its the request capacity or the normalized capacity - Do not reallocate on ByteBuf.capacity(int) if reallocation is unnecessary; just update the index range. - Revert the workaround in DefaultChannelHandlerContext
This commit is contained in:
parent
b6e83dff4f
commit
0e017db89a
@ -83,23 +83,23 @@ abstract class PoolArena<T> {
|
||||
return new Deque[size];
|
||||
}
|
||||
|
||||
PooledByteBuf<T> allocate(PoolThreadCache cache, int minCapacity, int maxCapacity) {
|
||||
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
|
||||
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
|
||||
allocate(cache, buf, minCapacity);
|
||||
allocate(cache, buf, reqCapacity);
|
||||
return buf;
|
||||
}
|
||||
|
||||
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, int minCapacity) {
|
||||
final int capacity = normalizeCapacity(minCapacity);
|
||||
if ((capacity & subpageOverflowMask) == 0) { // capacity < pageSize
|
||||
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
|
||||
final int normCapacity = normalizeCapacity(reqCapacity);
|
||||
if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize
|
||||
int tableIdx;
|
||||
Deque<PoolSubpage<T>>[] table;
|
||||
if ((capacity & 0xFFFFFE00) == 0) { // < 512
|
||||
tableIdx = capacity >>> 4;
|
||||
if ((normCapacity & 0xFFFFFE00) == 0) { // < 512
|
||||
tableIdx = normCapacity >>> 4;
|
||||
table = tinySubpagePools;
|
||||
} else {
|
||||
tableIdx = 0;
|
||||
int i = capacity >>> 10;
|
||||
int i = normCapacity >>> 10;
|
||||
while (i != 0) {
|
||||
i >>>= 1;
|
||||
tableIdx ++;
|
||||
@ -115,7 +115,7 @@ abstract class PoolArena<T> {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!s.doNotDestroy || s.elemSize != capacity) {
|
||||
if (!s.doNotDestroy || s.elemSize != normCapacity) {
|
||||
// The subpage has been destroyed or being used for different element size.
|
||||
subpages.removeFirst();
|
||||
continue;
|
||||
@ -125,28 +125,28 @@ abstract class PoolArena<T> {
|
||||
if (handle < 0) {
|
||||
subpages.removeFirst();
|
||||
} else {
|
||||
s.chunk.initBufWithSubpage(buf, handle);
|
||||
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
allocateNormal(buf, capacity);
|
||||
allocateNormal(buf, reqCapacity, normCapacity);
|
||||
}
|
||||
|
||||
private synchronized void allocateNormal(PooledByteBuf<T> buf, int capacity) {
|
||||
if (q050.allocate(buf, capacity) || q025.allocate(buf, capacity) ||
|
||||
q000.allocate(buf, capacity) || qInit.allocate(buf, capacity) ||
|
||||
q075.allocate(buf, capacity)) {
|
||||
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
|
||||
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
|
||||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
|
||||
q075.allocate(buf, reqCapacity, normCapacity)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add a new chunk.
|
||||
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
|
||||
long handle = c.allocate(capacity);
|
||||
long handle = c.allocate(normCapacity);
|
||||
assert handle > 0;
|
||||
c.initBuf(buf, handle);
|
||||
c.initBuf(buf, handle, reqCapacity);
|
||||
qInit.add(c);
|
||||
}
|
||||
|
||||
@ -174,26 +174,26 @@ abstract class PoolArena<T> {
|
||||
table[tableIdx].addFirst(subpage);
|
||||
}
|
||||
|
||||
private int normalizeCapacity(int capacity) {
|
||||
if (capacity < 0 || capacity > chunkSize) {
|
||||
throw new IllegalArgumentException("capacity: " + capacity + " (expected: 0-" + chunkSize + ')');
|
||||
private int normalizeCapacity(int reqCapacity) {
|
||||
if (reqCapacity < 0 || reqCapacity > chunkSize) {
|
||||
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0-" + chunkSize + ')');
|
||||
}
|
||||
|
||||
if ((capacity & 0xFFFFFE00) != 0) { // >= 512
|
||||
if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
|
||||
// Doubled
|
||||
int normalizedCapacity = 512;
|
||||
while (normalizedCapacity < capacity) {
|
||||
while (normalizedCapacity < reqCapacity) {
|
||||
normalizedCapacity <<= 1;
|
||||
}
|
||||
return normalizedCapacity;
|
||||
}
|
||||
|
||||
// Quantum-spaced
|
||||
if ((capacity & 15) == 0) {
|
||||
return capacity;
|
||||
if ((reqCapacity & 15) == 0) {
|
||||
return reqCapacity;
|
||||
}
|
||||
|
||||
return (capacity & ~15) + 16;
|
||||
return (reqCapacity & ~15) + 16;
|
||||
}
|
||||
|
||||
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
|
||||
|
@ -93,16 +93,16 @@ final class PoolChunk<T> {
|
||||
return 100 - freePercentage;
|
||||
}
|
||||
|
||||
long allocate(int capacity) {
|
||||
long allocate(int normCapacity) {
|
||||
int firstVal = memoryMap[1];
|
||||
if ((capacity & subpageOverflowMask) != 0) { // >= pageSize
|
||||
return allocateRun(capacity, 1, firstVal);
|
||||
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
|
||||
return allocateRun(normCapacity, 1, firstVal);
|
||||
} else {
|
||||
return allocateSubpage(capacity, 1, firstVal);
|
||||
return allocateSubpage(normCapacity, 1, firstVal);
|
||||
}
|
||||
}
|
||||
|
||||
private long allocateRun(int capacity, int curIdx, int val) {
|
||||
private long allocateRun(int normCapacity, int curIdx, int val) {
|
||||
for (;;) {
|
||||
if ((val & ST_ALLOCATED) != 0) { // state == ST_ALLOCATED || state == ST_ALLOCATED_SUBPAGE
|
||||
return -1;
|
||||
@ -110,7 +110,7 @@ final class PoolChunk<T> {
|
||||
|
||||
if ((val & ST_BRANCH) != 0) { // state == ST_BRANCH
|
||||
int nextIdx = curIdx << 1 ^ nextRandom();
|
||||
long res = allocateRun(capacity, nextIdx, memoryMap[nextIdx]);
|
||||
long res = allocateRun(normCapacity, nextIdx, memoryMap[nextIdx]);
|
||||
if (res > 0) {
|
||||
return res;
|
||||
}
|
||||
@ -121,18 +121,18 @@ final class PoolChunk<T> {
|
||||
}
|
||||
|
||||
// state == ST_UNUSED
|
||||
return allocateRunSimple(capacity, curIdx, val);
|
||||
return allocateRunSimple(normCapacity, curIdx, val);
|
||||
}
|
||||
}
|
||||
|
||||
private long allocateRunSimple(int capacity, int curIdx, int val) {
|
||||
private long allocateRunSimple(int normCapacity, int curIdx, int val) {
|
||||
int runLength = runLength(val);
|
||||
if (capacity > runLength) {
|
||||
if (normCapacity > runLength) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (capacity == runLength) {
|
||||
if (normCapacity == runLength) {
|
||||
// Found the run that fits.
|
||||
// Note that capacity has been normalized already, so we don't need to deal with
|
||||
// the values that are not power of 2.
|
||||
@ -154,26 +154,26 @@ final class PoolChunk<T> {
|
||||
}
|
||||
}
|
||||
|
||||
private long allocateSubpage(int capacity, int curIdx, int val) {
|
||||
private long allocateSubpage(int normCapacity, int curIdx, int val) {
|
||||
int state = val & 3;
|
||||
if (state == ST_BRANCH) {
|
||||
int nextIdx = curIdx << 1 ^ nextRandom();
|
||||
long res = branchSubpage(capacity, nextIdx);
|
||||
long res = branchSubpage(normCapacity, nextIdx);
|
||||
if (res > 0) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return branchSubpage(capacity, nextIdx ^ 1);
|
||||
return branchSubpage(normCapacity, nextIdx ^ 1);
|
||||
}
|
||||
|
||||
if (state == ST_UNUSED) {
|
||||
return allocateSubpageSimple(capacity, curIdx, val);
|
||||
return allocateSubpageSimple(normCapacity, curIdx, val);
|
||||
}
|
||||
|
||||
if (state == ST_ALLOCATED_SUBPAGE) {
|
||||
PoolSubpage<T> subpage = subpages[subpageIdx(curIdx)];
|
||||
int elemSize = subpage.elemSize;
|
||||
if (capacity != elemSize) {
|
||||
if (normCapacity != elemSize) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -183,7 +183,7 @@ final class PoolChunk<T> {
|
||||
return -1;
|
||||
}
|
||||
|
||||
private long allocateSubpageSimple(int capacity, int curIdx, int val) {
|
||||
private long allocateSubpageSimple(int normCapacity, int curIdx, int val) {
|
||||
int runLength = runLength(val);
|
||||
for (;;) {
|
||||
if (runLength == pageSize) {
|
||||
@ -193,10 +193,10 @@ final class PoolChunk<T> {
|
||||
int subpageIdx = subpageIdx(curIdx);
|
||||
PoolSubpage<T> subpage = subpages[subpageIdx];
|
||||
if (subpage == null) {
|
||||
subpage = new PoolSubpage<T>(this, curIdx, runOffset(val), pageSize, capacity);
|
||||
subpage = new PoolSubpage<T>(this, curIdx, runOffset(val), pageSize, normCapacity);
|
||||
subpages[subpageIdx] = subpage;
|
||||
} else {
|
||||
subpage.init(capacity);
|
||||
subpage.init(normCapacity);
|
||||
}
|
||||
arena.addSubpage(subpage);
|
||||
return subpage.allocate();
|
||||
@ -215,10 +215,10 @@ final class PoolChunk<T> {
|
||||
}
|
||||
}
|
||||
|
||||
private long branchSubpage(int capacity, int nextIdx) {
|
||||
private long branchSubpage(int normCapacity, int nextIdx) {
|
||||
int nextVal = memoryMap[nextIdx];
|
||||
if ((nextVal & 3) != ST_ALLOCATED) {
|
||||
return allocateSubpage(capacity, nextIdx, nextVal);
|
||||
return allocateSubpage(normCapacity, nextIdx, nextVal);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
@ -260,23 +260,23 @@ final class PoolChunk<T> {
|
||||
}
|
||||
}
|
||||
|
||||
void initBuf(PooledByteBuf<T> buf, long handle) {
|
||||
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
|
||||
int memoryMapIdx = (int) handle;
|
||||
int bitmapIdx = (int) (handle >>> 32);
|
||||
if (bitmapIdx == 0) {
|
||||
int val = memoryMap[memoryMapIdx];
|
||||
assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
|
||||
buf.init(this, handle, memory, runOffset(val), runLength(val));
|
||||
buf.init(this, handle, memory, runOffset(val), reqCapacity, runLength(val));
|
||||
} else {
|
||||
initBufWithSubpage(buf, handle, bitmapIdx);
|
||||
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
|
||||
}
|
||||
}
|
||||
|
||||
void initBufWithSubpage(PooledByteBuf<T> buf, long handle) {
|
||||
initBufWithSubpage(buf, handle, (int) (handle >>> 32));
|
||||
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
|
||||
initBufWithSubpage(buf, handle, (int) (handle >>> 32), reqCapacity);
|
||||
}
|
||||
|
||||
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx) {
|
||||
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
|
||||
assert bitmapIdx != 0;
|
||||
|
||||
int memoryMapIdx = (int) handle;
|
||||
@ -285,10 +285,11 @@ final class PoolChunk<T> {
|
||||
|
||||
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
|
||||
assert subpage.doNotDestroy;
|
||||
assert reqCapacity <= subpage.elemSize;
|
||||
|
||||
buf.init(
|
||||
this, handle, memory,
|
||||
runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, subpage.elemSize);
|
||||
runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
|
||||
}
|
||||
|
||||
private static int parentIdx(int memoryMapIdx) {
|
||||
|
@ -38,20 +38,20 @@ final class PoolChunkList<T> {
|
||||
this.maxUsage = maxUsage;
|
||||
}
|
||||
|
||||
boolean allocate(PooledByteBuf<T> buf, int capacity) {
|
||||
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
|
||||
if (head == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (PoolChunk<T> cur = head;;) {
|
||||
long handle = cur.allocate(capacity);
|
||||
long handle = cur.allocate(normCapacity);
|
||||
if (handle < 0) {
|
||||
cur = cur.next;
|
||||
if (cur == null) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
cur.initBuf(buf, handle);
|
||||
cur.initBuf(buf, handle, reqCapacity);
|
||||
if (cur.usage() >= maxUsage) {
|
||||
remove(cur);
|
||||
nextList.add(cur);
|
||||
|
@ -28,6 +28,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
||||
protected T memory;
|
||||
protected int offset;
|
||||
protected int length;
|
||||
private int maxLength;
|
||||
|
||||
private ByteBuffer tmpNioBuf;
|
||||
private Queue<Allocation<T>> suspendedDeallocations;
|
||||
@ -36,7 +37,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
||||
super(maxCapacity);
|
||||
}
|
||||
|
||||
final void init(PoolChunk<T> chunk, long handle, T memory, int offset, int length) {
|
||||
final void init(PoolChunk<T> chunk, long handle, T memory, int offset, int length, int maxLength) {
|
||||
assert handle >= 0;
|
||||
assert memory != null;
|
||||
|
||||
@ -45,6 +46,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
||||
this.memory = memory;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.maxLength = maxLength;
|
||||
setIndex(0, 0);
|
||||
tmpNioBuf = null;
|
||||
}
|
||||
@ -57,6 +59,32 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
|
||||
@Override
|
||||
public final ByteBuf capacity(int newCapacity) {
|
||||
checkUnfreed();
|
||||
|
||||
// If the request capacity does not require reallocation, just update the length of the memory.
|
||||
if (newCapacity > length) {
|
||||
if (newCapacity <= maxLength) {
|
||||
length = newCapacity;
|
||||
return this;
|
||||
}
|
||||
} else if (newCapacity < length) {
|
||||
if (newCapacity > maxLength >>> 1) {
|
||||
if (maxLength <= 512) {
|
||||
if (newCapacity > maxLength - 16) {
|
||||
length = newCapacity;
|
||||
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
|
||||
return this;
|
||||
}
|
||||
} else { // > 512 (i.e. >= 1024)
|
||||
length = newCapacity;
|
||||
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return this;
|
||||
}
|
||||
|
||||
// Reallocation required.
|
||||
if (suspendedDeallocations == null) {
|
||||
chunk.arena.reallocate(this, newCapacity, true);
|
||||
} else {
|
||||
|
@ -1241,7 +1241,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
data = ctx.alloc().buffer(dataLen, dataLen);
|
||||
}
|
||||
|
||||
byteBuf.readBytes(data, dataLen).discardSomeReadBytes();
|
||||
byteBuf.readBytes(data).discardSomeReadBytes();
|
||||
|
||||
exchangeBuf.add(data);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user