Review PooledByteBufAllocator in respect of jemalloc 4.x changes and update allocate algorithm.(#10267)

Motivation:

For size from 512 bytes to chunkSize, we use a buddy algorithm. The
drawback is that it has a large internal fragmentation.

Modifications:

1. add SizeClassesMetric and SizeClasses
2. remove tiny size, now we have small, normal and huge size
3. rewrite the structure of PoolChunk
4. rewrite pooled allocate algorithm in PoolChunk
5. when allocate subpage, using lowest common multiple of pageSize and
   elemSize instead of pageSize.
   6. add more tests in PooledByteBufAllocatorTest and PoolArenaTest

Result:
Reduce internal fragmentation and closes #3910
This commit is contained in:
Ruwei 2020-07-16 03:33:27 +08:00 committed by Norman Maurer
parent 7bf1ffb2d4
commit 0d701d7c3c
13 changed files with 1306 additions and 646 deletions

View File

@ -26,31 +26,22 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static io.netty.buffer.PoolChunk.isSubpage;
import static java.lang.Math.max;
abstract class PoolArena<T> implements PoolArenaMetric {
abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
@ -64,13 +55,12 @@ abstract class PoolArena<T> implements PoolArenaMetric {
// Metrics for allocations and deallocations
private long allocationsNormal;
// We need to use the LongAdder here as this is not guarded via synchronized block.
private final LongAdder allocationsTiny = new LongAdder();
private final LongAdder allocationsSmall = new LongAdder();
private final LongAdder allocationsHuge = new LongAdder();
private final LongAdder activeBytesHuge = new LongAdder();
private long deallocationsTiny;
private long deallocationsSmall;
private long deallocationsNormal;
@ -84,24 +74,16 @@ abstract class PoolArena<T> implements PoolArenaMetric {
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
int pageShifts, int chunkSize, int cacheAlignment) {
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
smallSubpagePools[i] = newSubpagePoolHead();
}
q100 = new PoolChunkList<>(this, null, 100, Integer.MAX_VALUE, chunkSize);
@ -128,8 +110,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
chunkListMetrics = Collections.unmodifiableList(metrics);
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<>(pageSize);
private PoolSubpage<T> newSubpagePoolHead() {
PoolSubpage<T> head = new PoolSubpage<>();
head.prev = head;
head.next = head;
return head;
@ -148,114 +130,83 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return buf;
}
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
// capacity < pageSize
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}
// normCapacity < 512
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final int sizeIdx = size2SizeIdx(reqCapacity);
final PoolSubpage<T> head = table[tableIdx];
if (sizeIdx <= smallMaxSizeIdx) {
tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
} else if (sizeIdx < nSizes) {
tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
} else {
int normCapacity = directMemoryCacheAlignment > 0
? normalizeSize(reqCapacity) : reqCapacity;
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, normCapacity);
}
}
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity, cache);
}
private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
final int sizeIdx) {
incTinySmallAllocation(tiny);
if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
// was able to allocate out of the cache so move on
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
incSmallAllocation();
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity, cache);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
synchronized (this) {
allocateNormal(buf, reqCapacity, sizeIdx, cache);
}
incSmallAllocation();
}
private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
final int sizeIdx) {
if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, sizeIdx, cache);
++allocationsNormal;
}
}
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q025.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q000.allocate(buf, reqCapacity, normCapacity, threadCache) ||
qInit.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q075.allocate(buf, reqCapacity, normCapacity, threadCache)) {
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
return;
}
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity, threadCache);
PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
assert success;
qInit.add(c);
}
private void incTinySmallAllocation(boolean tiny) {
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
private void incSmallAllocation() {
allocationsSmall.increment();
}
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
@ -272,24 +223,22 @@ abstract class PoolArena<T> implements PoolArenaMetric {
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
SizeClass sizeClass = sizeClass(handle);
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass, nioBuffer, false);
freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
}
}
private SizeClass sizeClass(int normCapacity) {
if (!isTinyOrSmall(normCapacity)) {
return SizeClass.Normal;
}
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
private SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
boolean finalizer) {
final boolean destroyChunk;
synchronized (this) {
// We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
@ -302,14 +251,11 @@ abstract class PoolArena<T> implements PoolArenaMetric {
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
}
destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
@ -317,62 +263,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
}
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(elemSize)) { // < 512
tableIdx = tinyIdx(elemSize);
table = tinySubpagePools;
} else {
tableIdx = smallIdx(elemSize);
table = smallSubpagePools;
}
return table[tableIdx];
}
int normalizeCapacity(int reqCapacity) {
checkPositiveOrZero(reqCapacity, "reqCapacity");
if (reqCapacity >= chunkSize) {
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
int alignCapacity(int reqCapacity) {
int delta = reqCapacity & directMemoryCacheAlignmentMask;
return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
PoolSubpage<T> findSubpagePoolHead(int sizeIdx) {
return smallSubpagePools[sizeIdx];
}
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
@ -412,7 +304,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
@Override
public int numTinySubpages() {
return tinySubpagePools.length;
return 0;
}
@Override
@ -427,7 +319,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
@Override
public List<PoolSubpageMetric> tinySubpages() {
return subPageMetricList(tinySubpagePools);
return Collections.emptyList();
}
@Override
@ -461,12 +353,13 @@ abstract class PoolArena<T> implements PoolArenaMetric {
synchronized (this) {
allocsNormal = allocationsNormal;
}
return allocationsTiny.longValue() + allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue();
return allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue();
}
@Override
public long numTinyAllocations() {
return allocationsTiny.longValue();
return 0;
}
@Override
@ -483,14 +376,14 @@ abstract class PoolArena<T> implements PoolArenaMetric {
public long numDeallocations() {
final long deallocs;
synchronized (this) {
deallocs = deallocationsTiny + deallocationsSmall + deallocationsNormal;
deallocs = deallocationsSmall + deallocationsNormal;
}
return deallocs + deallocationsHuge.longValue();
}
@Override
public synchronized long numTinyDeallocations() {
return deallocationsTiny;
public long numTinyDeallocations() {
return 0;
}
@Override
@ -515,17 +408,18 @@ abstract class PoolArena<T> implements PoolArenaMetric {
@Override
public long numActiveAllocations() {
long val = allocationsTiny.longValue() + allocationsSmall.longValue() + allocationsHuge.longValue()
long val = allocationsSmall.longValue() + allocationsHuge.longValue()
- deallocationsHuge.longValue();
synchronized (this) {
val += allocationsNormal - (deallocationsTiny + deallocationsSmall + deallocationsNormal);
val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
}
return max(val, 0);
}
@Override
public long numActiveTinyAllocations() {
return max(numTinyAllocations() - numTinyDeallocations(), 0);
return 0;
}
@Override
@ -560,7 +454,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return max(0, val);
}
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf<T> dst, int length);
@ -593,10 +487,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
.append(StringUtil.NEWLINE)
.append(q100)
.append(StringUtil.NEWLINE)
.append("tiny subpages:");
appendPoolSubPages(buf, tinySubpagePools);
buf.append(StringUtil.NEWLINE)
.append("small subpages:");
.append("small subpages:");
appendPoolSubPages(buf, smallSubpagePools);
buf.append(StringUtil.NEWLINE);
@ -627,7 +518,6 @@ abstract class PoolArena<T> implements PoolArenaMetric {
super.finalize();
} finally {
destroyPoolSubPages(smallSubpagePools);
destroyPoolSubPages(tinySubpagePools);
destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
}
}
@ -646,10 +536,10 @@ abstract class PoolArena<T> implements PoolArenaMetric {
static final class HeapArena extends PoolArena<byte[]> {
HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
private static byte[] newByteArray(int size) {
@ -662,8 +552,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
return new PoolChunk<>(this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
}
@Override
@ -694,10 +584,10 @@ abstract class PoolArena<T> implements PoolArenaMetric {
static final class DirectArena extends PoolArena<ByteBuffer> {
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
@Override
@ -718,17 +608,17 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
int pageShifts, int chunkSize) {
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<>(this,
allocateDirect(chunkSize), pageSize, maxOrder,
pageShifts, chunkSize, 0);
allocateDirect(chunkSize), pageSize, pageShifts,
chunkSize, maxPageIdx, 0);
}
final ByteBuffer memory = allocateDirect(chunkSize
+ directMemoryCacheAlignment);
return new PoolChunk<>(this, memory, pageSize,
maxOrder, pageShifts, chunkSize,
pageShifts, chunkSize, maxPageIdx,
offsetCacheLine(memory));
}

View File

@ -21,7 +21,7 @@ import java.util.List;
/**
* Expose metrics for an arena.
*/
public interface PoolArenaMetric {
public interface PoolArenaMetric extends SizeClassesMetric {
/**
* Returns the number of thread caches backed by this arena.
@ -30,7 +30,10 @@ public interface PoolArenaMetric {
/**
* Returns the number of tiny sub-pages for the arena.
*
* @deprecated Tiny sub-pages have been merged into small sub-pages.
*/
@Deprecated
int numTinySubpages();
/**
@ -45,7 +48,10 @@ public interface PoolArenaMetric {
/**
* Returns an unmodifiable {@link List} which holds {@link PoolSubpageMetric}s for tiny sub-pages.
*
* @deprecated Tiny sub-pages have been merged into small sub-pages.
*/
@Deprecated
List<PoolSubpageMetric> tinySubpages();
/**
@ -65,7 +71,10 @@ public interface PoolArenaMetric {
/**
* Return the number of tiny allocations done via the arena.
*
* @deprecated Tiny allocations have been merged into small allocations.
*/
@Deprecated
long numTinyAllocations();
/**
@ -90,7 +99,10 @@ public interface PoolArenaMetric {
/**
* Return the number of tiny deallocations done via the arena.
*
* @deprecated Tiny deallocations have been merged into small deallocations.
*/
@Deprecated
long numTinyDeallocations();
/**
@ -115,7 +127,10 @@ public interface PoolArenaMetric {
/**
* Return the number of currently active tiny allocations.
*
* @deprecated Tiny allocations have been merged into small allocations.
*/
@Deprecated
long numActiveTinyAllocations();
/**

View File

@ -13,20 +13,24 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > chunk - a chunk is a collection of pages
* > in this code chunkSize = 2^{maxOrder} * pageSize
* > run - a run is a collection of pages
* > chunk - a chunk is a collection of runs
* > in this code chunkSize = maxPages * pageSize
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
@ -34,96 +38,136 @@ import java.util.Deque;
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
* This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
* equals the next nearest power of 2
* For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
* This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
* equals the next nearest size in {@link SizeClasses}.
*
* To search for the first offset in chunk that has at least requested size available we construct a
* complete balanced binary tree and store it in an array (just like heaps) - memoryMap
*
* The tree looks like this (the size of each node being mentioned in the parenthesis)
* A chunk has the following layout:
*
* depth=0 1 node (chunkSize)
* depth=1 2 nodes (chunkSize/2)
* ..
* ..
* depth=d 2^d nodes (chunkSize/2^d)
* ..
* depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)
* /-----------------\
* | run |
* | |
* | |
* |-----------------|
* | run |
* | |
* |-----------------|
* | unalloctated |
* | (freed) |
* | |
* |-----------------|
* | subpage |
* |-----------------|
* | unallocated |
* | (freed) |
* | ... |
* | ... |
* | ... |
* | |
* | |
* | |
* \-----------------/
*
* depth=maxOrder is the last level and the leafs consist of pages
*
* With this tree available searching in chunkArray translates like this:
* To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
* which is unused
* handle:
* -------
* a handle is a long number, the bit layout of a run looks like:
*
* oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
*
* o: runOffset (page offset in the chunk), 15bit
* s: size (number of pages) of this run, 15bit
* u: isUsed?, 1bit
* e: isSubpage?, 1bit
* b: bitmapIdx of subpage, zero if it's not subpage, 32bit
*
* runsAvailMap:
* ------
* a map which manages all runs (used and not in used).
* For each run, the first runOffset and last runOffset are stored in runsAvailMap.
* key: runOffset
* value: handle
*
* runsAvail:
* ----------
* an array of {@link PriorityQueue}.
* Each queue manages same size of runs.
* Runs are sorted by offset, so that we always allocate runs with smaller offset.
*
*
* Algorithm:
* ----------
* Encode the tree in memoryMap with the notation
* memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
* is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
*
* As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
* As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
*
* Initialization -
* In the beginning we construct the memoryMap array by storing the depth of a node at each node
* i.e., memoryMap[id] = depth_of_id
* In the beginning we store the initial run which is the whole chunk.
* The initial run:
* runOffset = 0
* size = chunkSize
* isUsed = no
* isSubpage = no
* bitmapIdx = 0
*
* Observations:
* -------------
* 1) memoryMap[id] = depth_of_id => it is free / unallocated
* 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but
* some of its children can still be allocated based on their availability
* 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
* is thus marked as unusable
*
* Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
* ----------
* 1) start at root (i.e., depth = 0 or id = 1)
* 2) if memoryMap[1] > d => cannot be allocated from this chunk
* 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
* 4) else try in right subtree
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) Compute d = log_2(chunkSize/size)
* 2) Return allocateNode(d)
* 1) find the first avail run using in runsAvails according to size
* 2) if pages of run is larger than request pages then split it, and save the tailing run
* for later using
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
* 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
* note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
* 1) find a not full subpage according to size.
* if it already exists just return, otherwise allocate a new PoolSubpage and call init()
* note that this subpage object is added to subpagesPool in the PoolArena when we init() it
* 2) call subpage.allocate()
*
* Note:
* -----
* In the implementation for improving cache coherence,
* we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
* Algorithm: [free(handle, length, nioBuffer)]
* ----------
* 1) if it is a subpage, return the slab back into this subpage
* 2) if the subpage is not used or it is a run, then start free this run
* 3) merge continuous avail runs
* 4) save the merged run
*
* memoryMap[id]= depth_of_id is defined above
* depthMap[id]= x indicates that the first node which is free to be allocated is at depth x (from root)
*/
final class PoolChunk<T> implements PoolChunkMetric {
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
private static final int OFFSET_BIT_LENGTH = 15;
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
final PoolArena<T> arena;
final T memory;
final boolean unpooled;
final int offset;
private final byte[] memoryMap;
private final byte[] depthMap;
/**
* store the first page and last page of each avail run
*/
private final IntObjectMap<Long> runsAvailMap;
/**
* manage all avail runs
*/
private final PriorityQueue<Long>[] runsAvail;
/**
* manage all subpages in this chunk
*/
private final PoolSubpage<T>[] subpages;
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
private final int pageSize;
private final int pageShifts;
private final int maxOrder;
private final int chunkSize;
private final int log2ChunkSize;
private final int maxSubpageAllocs;
/** Used to mark memory as unusable */
private final byte unusable;
// Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
// around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
@ -141,38 +185,26 @@ final class PoolChunk<T> implements PoolChunkMetric {
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
@SuppressWarnings("unchecked")
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder;
runsAvail = newRunsAvailqueueArray(maxPageIdx);
runsAvailMap = new IntObjectHashMap<Long>();
subpages = new PoolSubpage[chunkSize >> pageShifts];
// Generate the memory map.
memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
//insert initial run, offset = 0, pages = chunkSize / pageSize
int pages = chunkSize >> pageShifts;
long initHandle = (long) pages << SIZE_SHIFT;
insertAvailRun(0, pages, initHandle);
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<>(8);
}
@ -182,23 +214,67 @@ final class PoolChunk<T> implements PoolChunkMetric {
this.arena = arena;
this.memory = memory;
this.offset = offset;
memoryMap = null;
depthMap = null;
subpages = null;
subpageOverflowMask = 0;
pageSize = 0;
pageShifts = 0;
maxOrder = 0;
unusable = (byte) (maxOrder + 1);
runsAvailMap = null;
runsAvail = null;
subpages = null;
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
cachedNioBuffers = null;
}
@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpageArray(int size) {
return new PoolSubpage[size];
private static PriorityQueue<Long>[] newRunsAvailqueueArray(int size) {
PriorityQueue<Long>[] queueArray = new PriorityQueue[size];
for (int i = 0; i < queueArray.length; i++) {
queueArray[i] = new PriorityQueue<Long>();
}
return queueArray;
}
private void insertAvailRun(int runOffset, int pages, Long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
queue.offer(handle);
//insert first page of run
insertAvailRun0(runOffset, handle);
if (pages > 1) {
//insert last page of run
insertAvailRun0(lastPage(runOffset, pages), handle);
}
}
private void insertAvailRun0(int runOffset, Long handle) {
Long pre = runsAvailMap.put(runOffset, handle);
assert pre == null;
}
private void removeAvailRun(long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
removeAvailRun(queue, handle);
}
private void removeAvailRun(PriorityQueue<Long> queue, long handle) {
queue.remove(handle);
int runOffset = runOffset(handle);
int pages = runPages(handle);
//remove first page of run
runsAvailMap.remove(runOffset);
if (pages > 1) {
//remove last page of run
runsAvailMap.remove(lastPage(runOffset, pages));
}
}
private static int lastPage(int runOffset, int pages) {
return runOffset + pages - 1;
}
private Long getAvailRunByOffset(int runOffset) {
return runsAvailMap.get(runOffset);
}
@Override
@ -222,256 +298,281 @@ final class PoolChunk<T> implements PoolChunkMetric {
return 100 - freePercentage;
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
handle = allocateRun(normCapacity);
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
handle = allocateSubpage(sizeIdx);
if (handle < 0) {
return false;
}
assert isSubpage(handle);
} else {
handle = allocateSubpage(normCapacity);
// normal
// runSize must be multiple of pageSize
int runSize = arena.sizeIdx2size(sizeIdx);
handle = allocateRun(runSize);
if (handle < 0) {
return false;
}
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity, cache);
return true;
}
/**
* Update method used by allocate
* This is triggered only when a successor is allocated and all its predecessors
* need to update their state
* The minimal depth at which subtree rooted at id has some free space
*
* @param id id
*/
private void updateParentsAlloc(int id) {
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}
private long allocateRun(int runSize) {
int pages = runSize >> pageShifts;
int pageIdx = arena.pages2pageIdx(pages);
/**
* Update method used by free
* This needs to handle the special case when both children are completely free
* in which case parent be directly allocated on request of size = child-size * 2
*
* @param id id
*/
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
} else {
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
synchronized (runsAvail) {
//find first queue which has at least one big enough run
int queueIdx = runFirstBestFit(pageIdx);
if (queueIdx == -1) {
return -1;
}
id = parentId;
//get run with min offset in this queue
PriorityQueue<Long> queue = runsAvail[queueIdx];
long handle = queue.poll();
assert !isUsed(handle);
removeAvailRun(queue, handle);
if (handle != -1) {
handle = splitLargeRun(handle, pages);
}
freeBytes -= runSize(pageShifts, handle);
return handle;
}
}
/**
* Algorithm to allocate an index in memoryMap when we query for a free node
* at depth d
*
* @param d depth
* @return index in memoryMap
*/
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
private int calculateRunSize(int sizeIdx) {
int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
int runSize = 0;
int nElements;
final int elemSize = arena.sizeIdx2size(sizeIdx);
//find lowest common multiple of pageSize and elemSize
do {
runSize += pageSize;
nElements = runSize / elemSize;
} while (nElements < maxElements && runSize != nElements * elemSize);
while (nElements > maxElements) {
runSize -= pageSize;
nElements = runSize / elemSize;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
assert nElements > 0;
assert runSize <= chunkSize;
assert runSize >= elemSize;
return runSize;
}
private int runFirstBestFit(int pageIdx) {
if (freeBytes == chunkSize) {
return arena.nPSizes - 1;
}
for (int i = pageIdx; i < arena.nPSizes; i++) {
PriorityQueue<Long> queue = runsAvail[i];
if (queue != null && !queue.isEmpty()) {
return i;
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable
updateParentsAlloc(id);
return id;
return -1;
}
/**
* Allocate a run of pages (>=1)
*
* @param normCapacity normalized capacity
* @return index in memoryMap
*/
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);
int id = allocateNode(d);
if (id < 0) {
return id;
private long splitLargeRun(long handle, int needPages) {
assert needPages > 0;
int totalPages = runPages(handle);
assert needPages <= totalPages;
int remPages = totalPages - needPages;
if (remPages > 0) {
int runOffset = runOffset(handle);
// keep track of trailing unused pages for later use
int availOffset = runOffset + needPages;
long availRun = toRunHandle(availOffset, remPages, 0);
insertAvailRun(availOffset, remPages, availRun);
// not avail
return toRunHandle(runOffset, needPages, 1);
}
freeBytes -= runLength(id);
return id;
//mark it as used
handle |= 1L << IS_USED_SHIFT;
return handle;
}
/**
* Create / initialize a new PoolSubpage of normCapacity
* Any PoolSubpage created / initialized here is added to subpage pool in the PoolArena that owns this PoolChunk
* Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
* subpage pool in the PoolArena that owns this PoolChunk
*
* @param sizeIdx sizeIdx of normalized size
*
* @param normCapacity normalized capacity
* @return index in memoryMap
*/
private long allocateSubpage(int normCapacity) {
private long allocateSubpage(int sizeIdx) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
synchronized (head) {
int id = allocateNode(d);
if (id < 0) {
return id;
//allocate a new run
int runSize = calculateRunSize(sizeIdx);
//runSize must be multiples of pageSize
long runHandle = allocateRun(runSize);
if (runHandle < 0) {
return -1;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
int runOffset = runOffset(runHandle);
int elemSize = arena.sizeIdx2size(sizeIdx);
freeBytes -= pageSize;
PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
runSize(pageShifts, runHandle), elemSize);
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
subpages[runOffset] = subpage;
return subpage.allocate();
}
}
/**
* Free a subpage or a run of pages
* When a subpage is freed from PoolSubpage, it might be added back to subpage pool of the owning PoolArena
* If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize, we can
* completely free the owning Page so it is available for subsequent allocations
* Free a subpage or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage pool
* of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize,
* we can completely free the owning Page so it is available for subsequent allocations
*
* @param handle handle to free
*/
void free(long handle, ByteBuffer nioBuffer) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
PoolSubpage<T> subpage = subpages[runOffset(handle)];
assert subpage != null && subpage.doNotDestroy;
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
if (subpage.free(head, bitmapIdx(handle))) {
//the subpage is still used, do not free it
return;
}
}
}
freeBytes += runLength(memoryMapIdx);
setValue(memoryMapIdx, depth(memoryMapIdx));
updateParentsFree(memoryMapIdx);
//start free run
int pages = runPages(handle);
synchronized (runsAvail) {
// collapse continuous runs, successfully collapsed runs
// will be removed from runsAvail and runsAvailMap
long finalRun = collapseRuns(handle);
//set run as not used
finalRun &= ~(1L << IS_USED_SHIFT);
//if it is a subpage, set it to run
finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
freeBytes += pages << pageShifts;
}
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
private long collapseRuns(long handle) {
return collapseNext(collapsePast(handle));
}
private long collapsePast(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
Long pastRun = getAvailRunByOffset(runOffset - 1);
if (pastRun == null) {
return handle;
}
int pastOffset = runOffset(pastRun);
int pastPages = runPages(pastRun);
//is continuous
if (pastRun != handle && pastOffset + pastPages == runOffset) {
//remove past run
removeAvailRun(pastRun);
handle = toRunHandle(pastOffset, pastPages + runPages, 0);
} else {
return handle;
}
}
}
private long collapseNext(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
Long nextRun = getAvailRunByOffset(runOffset + runPages);
if (nextRun == null) {
return handle;
}
int nextOffset = runOffset(nextRun);
int nextPages = runPages(nextRun);
//is continuous
if (nextRun != handle && runOffset + runPages == nextOffset) {
//remove next run
removeAvailRun(nextRun);
handle = toRunHandle(runOffset, runPages + nextPages, 0);
} else {
return handle;
}
}
}
private static long toRunHandle(int runOffset, int runPages, int inUsed) {
return (long) runOffset << RUN_OFFSET_SHIFT
| (long) runPages << SIZE_SHIFT
| (long) inUsed << IS_USED_SHIFT;
}
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
PoolThreadCache threadCache) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), threadCache);
if (isRun(handle)) {
buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
reqCapacity, runSize(pageShifts, handle), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity, threadCache);
initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
PoolThreadCache threadCache) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity, threadCache);
}
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity, PoolThreadCache threadCache) {
assert bitmapIdx != 0;
PoolSubpage<T> s = subpages[runOffset];
assert s.doNotDestroy;
assert reqCapacity <= s.elemSize;
int memoryMapIdx = memoryMapIdx(handle);
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, threadCache);
}
private byte value(int id) {
return memoryMap[id];
}
private void setValue(int id, byte val) {
memoryMap[id] = val;
}
private byte depth(int id) {
return depthMap[id];
}
private static int log2(int val) {
// compute the (0-based, with lsb = 0) position of highest set bit i.e, log2
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
return 1 << log2ChunkSize - depth(id);
}
private int runOffset(int id) {
// represents the 0-based offset in #bytes from start of the byte-array chunk
int shift = id ^ 1 << depth(id);
return shift * runLength(id);
}
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}
private static int memoryMapIdx(long handle) {
return (int) handle;
}
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
buf.init(this, nioBuffer, handle,
(runOffset << pageShifts) + bitmapIdx * s.elemSize + offset,
reqCapacity, s.elemSize, threadCache);
}
@Override
@ -509,4 +610,32 @@ final class PoolChunk<T> implements PoolChunkMetric {
void destroy() {
arena.destroyChunk(this);
}
static int runOffset(long handle) {
return (int) (handle >> RUN_OFFSET_SHIFT);
}
static int runSize(int pageShifts, long handle) {
return runPages(handle) << pageShifts;
}
static int runPages(long handle) {
return (int) (handle >> SIZE_SHIFT & 0x7fff);
}
static boolean isUsed(long handle) {
return (handle >> IS_USED_SHIFT & 1) == 1L;
}
static boolean isRun(long handle) {
return !isSubpage(handle);
}
static boolean isSubpage(long handle) {
return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
}
static int bitmapIdx(long handle) {
return (int) handle;
}
}

View File

@ -96,7 +96,8 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
this.prevList = prevList;
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
@ -104,7 +105,7 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
}
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
if (cur.allocate(buf, reqCapacity, normCapacity, threadCache)) {
if (cur.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
nextList.add(cur);
@ -115,8 +116,8 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
return false;
}
boolean free(PoolChunk<T> chunk, long handle, ByteBuffer nioBuffer) {
chunk.free(handle, nioBuffer);
boolean free(PoolChunk<T> chunk, long handle, int normCapacity, ByteBuffer nioBuffer) {
chunk.free(handle, normCapacity, nioBuffer);
if (chunk.freeBytes > freeMaxThreshold) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.

View File

@ -16,12 +16,18 @@
package io.netty.buffer;
import static io.netty.buffer.PoolChunk.RUN_OFFSET_SHIFT;
import static io.netty.buffer.PoolChunk.SIZE_SHIFT;
import static io.netty.buffer.PoolChunk.IS_USED_SHIFT;
import static io.netty.buffer.PoolChunk.IS_SUBPAGE_SHIFT;
import static io.netty.buffer.SizeClasses.LOG2_QUANTUM;
final class PoolSubpage<T> implements PoolSubpageMetric {
final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int pageShifts;
private final int runOffset;
private final int pageSize;
private final int runSize;
private final long[] bitmap;
PoolSubpage<T> prev;
@ -38,29 +44,29 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/** Special constructor that creates a linked list head */
PoolSubpage(int pageSize) {
PoolSubpage() {
chunk = null;
memoryMapIdx = -1;
pageShifts = -1;
runOffset = -1;
elemSize = -1;
this.pageSize = pageSize;
runSize = -1;
bitmap = null;
}
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int pageShifts, int runOffset, int runSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.pageShifts = pageShifts;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
this.runSize = runSize;
this.elemSize = elemSize;
bitmap = new long[runSize >>> 6 + LOG2_QUANTUM]; // runSize / 64 / QUANTUM
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize;
maxNumElems = numAvail = runSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
@ -78,10 +84,6 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
@ -195,7 +197,12 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
}
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
int pages = runSize >> pageShifts;
return (long) runOffset << RUN_OFFSET_SHIFT
| (long) pages << SIZE_SHIFT
| 1L << IS_USED_SHIFT
| 1L << IS_SUBPAGE_SHIFT
| bitmapIdx;
}
@Override
@ -226,11 +233,11 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
}
if (!doNotDestroy) {
return "(" + memoryMapIdx + ": not in use)";
return "(" + runOffset + ": not in use)";
}
return "(" + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems +
", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')';
return "(" + runOffset + ": " + (maxNumElems - numAvail) + '/' + maxNumElems +
", offset: " + runOffset + ", length: " + runSize + ", elemSize: " + elemSize + ')';
}
@Override
@ -271,7 +278,7 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
@Override
public int pageSize() {
return pageSize;
return 1 << pageShifts;
}
void destroy() {

View File

@ -36,7 +36,7 @@ public interface PoolSubpageMetric {
int elementSize();
/**
* Return the size (in bytes) of this page.
* Return the page size (in bytes) of this page.
*/
int pageSize();
}

View File

@ -47,9 +47,7 @@ final class PoolThreadCache {
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
@ -66,17 +64,15 @@ final class PoolThreadCache {
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
smallCacheSize, directArena.numSmallSubpagePools);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
@ -85,17 +81,14 @@ final class PoolThreadCache {
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
smallCacheSize, heapArena.numSmallSubpagePools);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
@ -104,15 +97,14 @@ final class PoolThreadCache {
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
if ((smallSubPageDirectCaches != null || normalDirectCaches != null
|| smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
@ -120,13 +112,13 @@ final class PoolThreadCache {
}
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
int cacheSize, int numCaches) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<>(cacheSize, sizeClass);
cache[i] = new SubPageMemoryRegionCache<>(cacheSize);
}
return cache;
} else {
@ -152,22 +144,15 @@ final class PoolThreadCache {
}
// val > 0
private static int log2(int val) {
static int log2(int val) {
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
}
/**
@ -198,21 +183,20 @@ final class PoolThreadCache {
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
int sizeIdx = area.size2SizeIdx(normCapacity);
MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, nioBuffer, handle);
return cache.add(chunk, nioBuffer, handle, normCapacity);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
return cacheForNormal(area, sizeIdx);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
return cacheForSmall(area, sizeIdx);
default:
throw new Error();
}
@ -235,10 +219,8 @@ final class PoolThreadCache {
// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
// we only call this one time.
if (freed.compareAndSet(false, true)) {
int numFreed = free(tinySubPageDirectCaches, finalizer) +
free(smallSubPageDirectCaches, finalizer) +
int numFreed = free(smallSubPageDirectCaches, finalizer) +
free(normalDirectCaches, finalizer) +
free(tinySubPageHeapCaches, finalizer) +
free(smallSubPageHeapCaches, finalizer) +
free(normalHeapCaches, finalizer);
@ -277,10 +259,8 @@ final class PoolThreadCache {
}
void trim() {
trim(tinySubPageDirectCaches);
trim(smallSubPageDirectCaches);
trim(normalDirectCaches);
trim(tinySubPageHeapCaches);
trim(smallSubPageHeapCaches);
trim(normalHeapCaches);
}
@ -301,45 +281,33 @@ final class PoolThreadCache {
cache.trim();
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
return cache(smallSubPageDirectCaches, sizeIdx);
}
return cache(tinySubPageHeapCaches, idx);
return cache(smallSubPageHeapCaches, sizeIdx);
}
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.smallIdx(normCapacity);
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
return cache(normalDirectCaches, sizeIdx);
}
return cache(smallSubPageHeapCaches, idx);
return cache(normalHeapCaches, sizeIdx);
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
// sizeClass == Normal => normCapacity >= pageSize => the shifted value > 0
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
if (cache == null || sizeIdx > cache.length - 1) {
return null;
}
return cache[idx];
return cache[sizeIdx];
}
/**
* Cache used for buffers which are backed by TINY or SMALL size.
*/
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
SubPageMemoryRegionCache(int size) {
super(size, SizeClass.Small);
}
@Override
@ -388,8 +356,8 @@ final class PoolThreadCache {
* Add to cache if not already full.
*/
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
Entry<T> entry = newEntry(chunk, nioBuffer, handle);
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
@ -461,7 +429,7 @@ final class PoolThreadCache {
entry.recycle();
}
chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass, nioBuffer, finalizer);
}
static final class Entry<T> {
@ -469,6 +437,7 @@ final class PoolThreadCache {
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
int normCapacity;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
@ -483,11 +452,12 @@ final class PoolThreadCache {
}
@SuppressWarnings("rawtypes")
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.nioBuffer = nioBuffer;
entry.handle = handle;
entry.normCapacity = normCapacity;
return entry;
}

View File

@ -43,7 +43,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
@ -106,7 +105,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
// cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
@ -147,7 +145,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
}
logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
@ -164,7 +161,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
private final List<PoolArenaMetric> heapArenaMetrics;
@ -189,40 +185,66 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
/**
* @deprecated use
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)}
*/
@Deprecated
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
0, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}
/**
* @deprecated use
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)}
*/
@Deprecated
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, smallCacheSize,
normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
/**
* @deprecated use
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean)}
*/
@Deprecated
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
smallCacheSize, normalCacheSize,
useCacheForAllThreads);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
/**
* @deprecated use
* {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, boolean, int)}
*/
@Deprecated
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
smallCacheSize, normalCacheSize,
useCacheForAllThreads, directMemoryCacheAlignment);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
@ -247,7 +269,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
List<PoolArenaMetric> metrics = new ArrayList<>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
@ -263,7 +285,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
List<PoolArenaMetric> metrics = new ArrayList<>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
@ -387,10 +409,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
}
/**
* Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512
* Default tiny cache size - default 0
*
* @deprecated Tiny caches have been merged into small caches.
*/
@Deprecated
public static int defaultTinyCacheSize() {
return DEFAULT_TINY_CACHE_SIZE;
return 0;
}
/**
@ -451,7 +476,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
heapArena, directArena, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
@ -464,7 +489,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
return cache;
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}
@Override
@ -561,7 +586,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
*/
@Deprecated
public int tinyCacheSize() {
return tinyCacheSize;
return 0;
}
/**

View File

@ -68,7 +68,10 @@ public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetri
/**
* Return the size of the tiny cache.
*
* @deprecated Tiny caches have been merged into small caches.
*/
@Deprecated
public int tinyCacheSize() {
return allocator.tinyCacheSize();
}
@ -112,7 +115,6 @@ public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetri
.append("; usedDirectMemory: ").append(usedDirectMemory())
.append("; numHeapArenas: ").append(numHeapArenas())
.append("; numDirectArenas: ").append(numDirectArenas())
.append("; tinyCacheSize: ").append(tinyCacheSize())
.append("; smallCacheSize: ").append(smallCacheSize())
.append("; normalCacheSize: ").append(normalCacheSize())
.append("; numThreadLocalCaches: ").append(numThreadLocalCaches())

View File

@ -0,0 +1,407 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import static io.netty.buffer.PoolThreadCache.*;
/**
* SizeClasses requires {@code pageShifts} to be defined prior to inclusion,
* and it in turn defines:
* <p>
* LOG2_SIZE_CLASS_GROUP: Log of size class count for each size doubling.
* LOG2_MAX_LOOKUP_SIZE: Log of max size class in the lookup table.
* sizeClasses: Complete table of [index, log2Group, log2Delta, nDelta, isMultiPageSize,
* isSubPage, log2DeltaLookup] tuples.
* index: Size class index.
* log2Group: Log of group base size (no deltas added).
* log2Delta: Log of delta to previous size class.
* nDelta: Delta multiplier.
* isMultiPageSize: 'yes' if a multiple of the page size, 'no' otherwise.
* isSubPage: 'yes' if a subpage size class, 'no' otherwise.
* log2DeltaLookup: Same as log2Delta if a lookup table size class, 'no'
* otherwise.
* <p>
* nSubpages: Number of subpages size classes.
* nSizes: Number of size classes.
* nPSizes: Number of size classes that are multiples of pageSize.
*
* smallMaxSizeIdx: Maximum small size class index.
*
* lookupMaxclass: Maximum size class included in lookup table.
* log2NormalMinClass: Log of minimum normal size class.
* <p>
* The first size class and spacing are 1 << LOG2_QUANTUM.
* Each group has 1 << LOG2_SIZE_CLASS_GROUP of size classes.
*
* size = 1 << log2Group + nDelta * (1 << log2Delta)
*
* The first size class has an unusual encoding, because the size has to be
* split between group and delta*nDelta.
*
* If pageShift = 13, sizeClasses looks like this:
*
* (index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup)
* <p>
* ( 0, 4, 4, 0, no, yes, 4)
* ( 1, 4, 4, 1, no, yes, 4)
* ( 2, 4, 4, 2, no, yes, 4)
* ( 3, 4, 4, 3, no, yes, 4)
* <p>
* ( 4, 6, 4, 1, no, yes, 4)
* ( 5, 6, 4, 2, no, yes, 4)
* ( 6, 6, 4, 3, no, yes, 4)
* ( 7, 6, 4, 4, no, yes, 4)
* <p>
* ( 8, 7, 5, 1, no, yes, 5)
* ( 9, 7, 5, 2, no, yes, 5)
* ( 10, 7, 5, 3, no, yes, 5)
* ( 11, 7, 5, 4, no, yes, 5)
* ...
* ...
* ( 72, 23, 21, 1, yes, no, no)
* ( 73, 23, 21, 2, yes, no, no)
* ( 74, 23, 21, 3, yes, no, no)
* ( 75, 23, 21, 4, yes, no, no)
* <p>
* ( 76, 24, 22, 1, yes, no, no)
*/
abstract class SizeClasses implements SizeClassesMetric {
static final int LOG2_QUANTUM = 4;
private static final int LOG2_SIZE_CLASS_GROUP = 2;
private static final int LOG2_MAX_LOOKUP_SIZE = 12;
private static final int INDEX_IDX = 0;
private static final int LOG2GROUP_IDX = 1;
private static final int LOG2DELTA_IDX = 2;
private static final int NDELTA_IDX = 3;
private static final int PAGESIZE_IDX = 4;
private static final int SUBPAGE_IDX = 5;
private static final int LOG2_DELTA_LOOKUP_IDX = 6;
private static final byte no = 0, yes = 1;
protected SizeClasses(int pageSize, int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
this.directMemoryCacheAlignment = directMemoryCacheAlignment;
int group = log2(chunkSize) + 1 - LOG2_QUANTUM;
//generate size classes
//[index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup]
sizeClasses = new short[group << LOG2_SIZE_CLASS_GROUP][7];
nSizes = sizeClasses();
//generate lookup table
sizeIdx2sizeTab = new int[nSizes];
pageIdx2sizeTab = new int[nPSizes];
idx2SizeTab(sizeIdx2sizeTab, pageIdx2sizeTab);
size2idxTab = new int[lookupMaxSize >> LOG2_QUANTUM];
size2idxTab(size2idxTab);
}
protected final int pageSize;
protected final int pageShifts;
protected final int chunkSize;
protected final int directMemoryCacheAlignment;
final int nSizes;
int nSubpages;
int nPSizes;
int smallMaxSizeIdx;
private int lookupMaxSize;
private final short[][] sizeClasses;
private final int[] pageIdx2sizeTab;
// lookup table for sizeIdx <= smallMaxSizeIdx
private final int[] sizeIdx2sizeTab;
// lookup table used for size <= lookupMaxclass
// spacing is 1 << LOG2_QUANTUM, so the size of array is lookupMaxclass >> LOG2_QUANTUM
private final int[] size2idxTab;
private int sizeClasses() {
int normalMaxSize = -1;
int index = 0;
int size = 0;
int log2Group = LOG2_QUANTUM;
int log2Delta = LOG2_QUANTUM;
int ndeltaLimit = 1 << LOG2_SIZE_CLASS_GROUP;
//First small group, nDelta start at 0.
//first size class is 1 << LOG2_QUANTUM
int nDelta = 0;
while (nDelta < ndeltaLimit) {
size = sizeClass(index++, log2Group, log2Delta, nDelta++);
}
log2Group += LOG2_SIZE_CLASS_GROUP;
//All remaining groups, nDelta start at 1.
while (size < chunkSize) {
nDelta = 1;
while (nDelta <= ndeltaLimit && size < chunkSize) {
size = sizeClass(index++, log2Group, log2Delta, nDelta++);
normalMaxSize = size;
}
log2Group++;
log2Delta++;
}
//chunkSize must be normalMaxSize
assert chunkSize == normalMaxSize;
//return number of size index
return index;
}
//calculate size class
private int sizeClass(int index, int log2Group, int log2Delta, int nDelta) {
short isMultiPageSize;
if (log2Delta >= pageShifts) {
isMultiPageSize = yes;
} else {
int pageSize = 1 << pageShifts;
int size = (1 << log2Group) + (1 << log2Delta) * nDelta;
isMultiPageSize = size == size / pageSize * pageSize? yes : no;
}
int log2Ndelta = nDelta == 0? 0 : log2(nDelta);
byte remove = 1 << log2Ndelta < nDelta? yes : no;
int log2Size = log2Delta + log2Ndelta == log2Group? log2Group + 1 : log2Group;
if (log2Size == log2Group) {
remove = yes;
}
short isSubpage = log2Size < pageShifts + LOG2_SIZE_CLASS_GROUP? yes : no;
int log2DeltaLookup = log2Size < LOG2_MAX_LOOKUP_SIZE ||
log2Size == LOG2_MAX_LOOKUP_SIZE && remove == no
? log2Delta : no;
short[] sz = {
(short) index, (short) log2Group, (short) log2Delta,
(short) nDelta, isMultiPageSize, isSubpage, (short) log2DeltaLookup
};
sizeClasses[index] = sz;
int size = (1 << log2Group) + (nDelta << log2Delta);
if (sz[PAGESIZE_IDX] == yes) {
nPSizes++;
}
if (sz[SUBPAGE_IDX] == yes) {
nSubpages++;
smallMaxSizeIdx = index;
}
if (sz[LOG2_DELTA_LOOKUP_IDX] != no) {
lookupMaxSize = size;
}
return size;
}
private void idx2SizeTab(int[] sizeIdx2sizeTab, int[] pageIdx2sizeTab) {
int pageIdx = 0;
for (int i = 0; i < nSizes; i++) {
short[] sizeClass = sizeClasses[i];
int log2Group = sizeClass[LOG2GROUP_IDX];
int log2Delta = sizeClass[LOG2DELTA_IDX];
int nDelta = sizeClass[NDELTA_IDX];
int size = (1 << log2Group) + (nDelta << log2Delta);
sizeIdx2sizeTab[i] = size;
if (sizeClass[PAGESIZE_IDX] == yes) {
pageIdx2sizeTab[pageIdx++] = size;
}
}
}
private void size2idxTab(int[] size2idxTab) {
int idx = 0;
int size = 0;
for (int i = 0; size <= lookupMaxSize; i++) {
int log2Delta = sizeClasses[i][LOG2DELTA_IDX];
int times = 1 << log2Delta - LOG2_QUANTUM;
while (size <= lookupMaxSize && times-- > 0) {
size2idxTab[idx++] = i;
size = idx + 1 << LOG2_QUANTUM;
}
}
}
@Override
public int sizeIdx2size(int sizeIdx) {
return sizeIdx2sizeTab[sizeIdx];
}
@Override
public int sizeIdx2sizeCompute(int sizeIdx) {
int group = sizeIdx >> LOG2_SIZE_CLASS_GROUP;
int mod = sizeIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1;
int groupSize = group == 0? 0 :
1 << LOG2_QUANTUM + LOG2_SIZE_CLASS_GROUP - 1 << group;
int shift = group == 0? 1 : group;
int lgDelta = shift + LOG2_QUANTUM - 1;
int modSize = mod + 1 << lgDelta;
return groupSize + modSize;
}
@Override
public long pageIdx2size(int pageIdx) {
return pageIdx2sizeTab[pageIdx];
}
@Override
public long pageIdx2sizeCompute(int pageIdx) {
int group = pageIdx >> LOG2_SIZE_CLASS_GROUP;
int mod = pageIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1;
long groupSize = group == 0? 0 :
1L << pageShifts + LOG2_SIZE_CLASS_GROUP - 1 << group;
int shift = group == 0? 1 : group;
int log2Delta = shift + pageShifts - 1;
int modSize = mod + 1 << log2Delta;
return groupSize + modSize;
}
@Override
public int size2SizeIdx(int size) {
if (size == 0) {
return 0;
}
if (size > chunkSize) {
return nSizes;
}
if (directMemoryCacheAlignment > 0) {
size = alignSize(size);
}
if (size <= lookupMaxSize) {
//size-1 / MIN_TINY
return size2idxTab[size - 1 >> LOG2_QUANTUM];
}
int x = log2((size << 1) - 1);
int shift = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? 0 : x - (LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM);
int group = shift << LOG2_SIZE_CLASS_GROUP;
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1;
int deltaInverseMask = -1 << log2Delta;
int mod = (size - 1 & deltaInverseMask) >> log2Delta &
(1 << LOG2_SIZE_CLASS_GROUP) - 1;
return group + mod;
}
@Override
public int pages2pageIdx(int pages) {
return pages2pageIdxCompute(pages, false);
}
@Override
public int pages2pageIdxFloor(int pages) {
return pages2pageIdxCompute(pages, true);
}
private int pages2pageIdxCompute(int pages, boolean floor) {
int pageSize = pages << pageShifts;
if (pageSize > chunkSize) {
return nPSizes;
}
int x = log2((pageSize << 1) - 1);
int shift = x < LOG2_SIZE_CLASS_GROUP + pageShifts
? 0 : x - (LOG2_SIZE_CLASS_GROUP + pageShifts);
int group = shift << LOG2_SIZE_CLASS_GROUP;
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + pageShifts + 1?
pageShifts : x - LOG2_SIZE_CLASS_GROUP - 1;
int deltaInverseMask = -1 << log2Delta;
int mod = (pageSize - 1 & deltaInverseMask) >> log2Delta &
(1 << LOG2_SIZE_CLASS_GROUP) - 1;
int pageIdx = group + mod;
if (floor && pageIdx2sizeTab[pageIdx] > pages << pageShifts) {
pageIdx--;
}
return pageIdx;
}
// Round size up to the nearest multiple of alignment.
private int alignSize(int size) {
int delta = size & directMemoryCacheAlignment - 1;
return delta == 0? size : size + directMemoryCacheAlignment - delta;
}
@Override
public int normalizeSize(int size) {
if (size == 0) {
return sizeIdx2sizeTab[0];
}
if (directMemoryCacheAlignment > 0) {
size = alignSize(size);
}
if (size <= lookupMaxSize) {
int ret = sizeIdx2sizeTab[size2idxTab[size - 1 >> LOG2_QUANTUM]];
assert ret == normalizeSizeCompute(size);
return ret;
}
return normalizeSizeCompute(size);
}
private static int normalizeSizeCompute(int size) {
int x = log2((size << 1) - 1);
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1;
int delta = 1 << log2Delta;
int delta_mask = delta - 1;
return size + delta_mask & ~delta_mask;
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
/**
* Expose metrics for an SizeClasses.
*/
public interface SizeClassesMetric {
/**
* Computes size from lookup table according to sizeIdx.
*
* @return size
*/
int sizeIdx2size(int sizeIdx);
/**
* Computes size according to sizeIdx.
*
* @return size
*/
int sizeIdx2sizeCompute(int sizeIdx);
/**
* Computes size from lookup table according to pageIdx.
*
* @return size which is multiples of pageSize.
*/
long pageIdx2size(int pageIdx);
/**
* Computes size according to pageIdx.
*
* @return size which is multiples of pageSize
*/
long pageIdx2sizeCompute(int pageIdx);
/**
* Normalizes request size up to the nearest size class.
*
* @param size request size
*
* @return sizeIdx of the size class
*/
int size2SizeIdx(int size);
/**
* Normalizes request size up to the nearest pageSize class.
*
* @param pages multiples of pageSizes
*
* @return pageIdx of the pageSize class
*/
int pages2pageIdx(int pages);
/**
* Normalizes request size down to the nearest pageSize class.
*
* @param pages multiples of pageSizes
*
* @return pageIdx of the pageSize class
*/
int pages2pageIdxFloor(int pages);
/**
* Normalizes usable size that would result from allocating an object with the
* specified size and alignment.
*
* @param size request size
*
* @return normalized size
*/
int normalizeSize(int size);
}

View File

@ -20,29 +20,86 @@ import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assume.assumeTrue;
import java.nio.ByteBuffer;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
public class PoolArenaTest {
private static final int PAGE_SIZE = 8192;
private static final int PAGE_SHIFTS = 11;
//chunkSize = pageSize * (2 ^ pageShifts)
private static final int CHUNK_SIZE = 16777216;
@Test
public void testNormalizeCapacity() throws Exception {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 0);
public void testNormalizeCapacity() {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0);
int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025};
int[] expectedResult = {0, 16, 512, 1024, 1024, 2048};
int[] expectedResult = {16, 16, 512, 1024, 1024, 1280};
for (int i = 0; i < reqCapacities.length; i ++) {
Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i]));
Assert.assertEquals(expectedResult[i], arena.sizeIdx2size(arena.size2SizeIdx(reqCapacities[i])));
}
}
@Test
public void testNormalizeAlignedCapacity() throws Exception {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 64);
public void testNormalizeAlignedCapacity() {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 64);
int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025};
int[] expectedResult = {0, 64, 512, 1024, 1024, 2048};
int[] expectedResult = {16, 64, 512, 1024, 1024, 1280};
for (int i = 0; i < reqCapacities.length; i ++) {
Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i]));
Assert.assertEquals(expectedResult[i], arena.sizeIdx2size(arena.size2SizeIdx(reqCapacities[i])));
}
}
@Test
public void testSize2SizeIdx() {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0);
for (int sz = 0; sz <= CHUNK_SIZE; sz++) {
int sizeIdx = arena.size2SizeIdx(sz);
Assert.assertTrue(sz <= arena.sizeIdx2size(sizeIdx));
if (sizeIdx > 0) {
Assert.assertTrue(sz > arena.sizeIdx2size(sizeIdx - 1));
}
}
}
@Test
public void testPages2PageIdx() {
int pageShifts = PAGE_SHIFTS;
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0);
int maxPages = CHUNK_SIZE >> pageShifts;
for (int pages = 1; pages <= maxPages; pages++) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
Assert.assertTrue(pages << pageShifts >= arena.pageIdx2size(pageIdxFloor));
if (pageIdxFloor > 0 && pages < maxPages) {
Assert.assertTrue(pages << pageShifts < arena.pageIdx2size(pageIdxFloor + 1));
}
int pageIdxCeiling = arena.pages2pageIdx(pages);
Assert.assertTrue(pages << pageShifts <= arena.pageIdx2size(pageIdxCeiling));
if (pageIdxCeiling > 0) {
Assert.assertTrue(pages << pageShifts > arena.pageIdx2size(pageIdxCeiling - 1));
}
}
}
@Test
public void testSizeIdx2size() {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0);
for (int i = 0; i < arena.nSizes; i++) {
assertEquals(arena.sizeIdx2sizeCompute(i), arena.sizeIdx2size(i));
}
}
@Test
public void testPageIdx2size() {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, PAGE_SIZE, PAGE_SHIFTS, CHUNK_SIZE, 0);
for (int i = 0; i < arena.nPSizes; i++) {
assertEquals(arena.pageIdx2sizeCompute(i), arena.pageIdx2size(i));
}
}
@ -57,7 +114,7 @@ public class PoolArenaTest {
? PlatformDependent.allocateDirectNoCleaner(capacity + alignment)
: ByteBuffer.allocateDirect(capacity + alignment);
PoolArena.DirectArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 9, alignment);
PoolArena.DirectArena arena = new PoolArena.DirectArena(null, 512, 9, 512, alignment);
int offset = arena.offsetCacheLine(bb);
long address = PlatformDependent.directBufferAddress(bb);
@ -80,31 +137,25 @@ public class PoolArenaTest {
true // useCacheForAllThreads
);
// create tiny buffer
final ByteBuf b1 = allocator.directBuffer(24);
// create small buffer
final ByteBuf b2 = allocator.directBuffer(800);
final ByteBuf b1 = allocator.directBuffer(800);
// create normal buffer
final ByteBuf b3 = allocator.directBuffer(8192 * 2);
final ByteBuf b2 = allocator.directBuffer(8192 * 5);
Assert.assertNotNull(b1);
Assert.assertNotNull(b2);
Assert.assertNotNull(b3);
// then release buffer to deallocated memory while threadlocal cache has been disabled
// allocations counter value must equals deallocations counter value
Assert.assertTrue(b1.release());
Assert.assertTrue(b2.release());
Assert.assertTrue(b3.release());
Assert.assertTrue(allocator.directArenas().size() >= 1);
final PoolArenaMetric metric = allocator.directArenas().get(0);
Assert.assertEquals(3, metric.numDeallocations());
Assert.assertEquals(3, metric.numAllocations());
Assert.assertEquals(2, metric.numDeallocations());
Assert.assertEquals(2, metric.numAllocations());
Assert.assertEquals(1, metric.numTinyDeallocations());
Assert.assertEquals(1, metric.numTinyAllocations());
Assert.assertEquals(1, metric.numSmallDeallocations());
Assert.assertEquals(1, metric.numSmallAllocations());
Assert.assertEquals(1, metric.numNormalDeallocations());

View File

@ -23,6 +23,7 @@ import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Assume;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import static io.netty.buffer.PoolChunk.runOffset;
import static io.netty.buffer.PoolChunk.runPages;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -212,19 +215,6 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
}
}
@Test
public void testTinySubpageMetric() {
PooledByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
ByteBuf buffer = allocator.heapBuffer(1);
try {
PoolArenaMetric metric = allocator.metric().heapArenas().get(0);
PoolSubpageMetric subpageMetric = metric.tinySubpages().get(0);
assertEquals(1, subpageMetric.maxNumElements() - subpageMetric.numAvailable());
} finally {
buffer.release();
}
}
@Test
public void testAllocNotNull() {
PooledByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
@ -234,7 +224,6 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
testAllocNotNull(allocator, 1024);
// Small allocation
testAllocNotNull(allocator, 512);
// Tiny allocation
testAllocNotNull(allocator, 1);
}
@ -274,6 +263,81 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
assertFalse(lists.get(5).iterator().hasNext());
}
@Test
public void testCollapse() {
int pageSize = 8192;
//no cache
ByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
ByteBuf b1 = allocator.buffer(pageSize * 4);
ByteBuf b2 = allocator.buffer(pageSize * 5);
ByteBuf b3 = allocator.buffer(pageSize * 6);
b2.release();
b3.release();
ByteBuf b4 = allocator.buffer(pageSize * 10);
PooledByteBuf<ByteBuffer> b = unwrapIfNeeded(b4);
//b2 and b3 are collapsed, b4 should start at offset 4
assertEquals(4, runOffset(b.handle));
assertEquals(10, runPages(b.handle));
b1.release();
b4.release();
//all ByteBuf are collapsed, b5 should start at offset 0
ByteBuf b5 = allocator.buffer(pageSize * 20);
b = unwrapIfNeeded(b5);
assertEquals(0, runOffset(b.handle));
assertEquals(20, runPages(b.handle));
b5.release();
}
@Test
public void testAllocateSmallOffset() {
int pageSize = 8192;
ByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
int size = pageSize * 5;
ByteBuf[] bufs = new ByteBuf[10];
for (int i = 0; i < 10; i++) {
bufs[i] = allocator.buffer(size);
}
for (int i = 0; i < 5; i++) {
bufs[i].release();
}
//make sure we always allocate runs with small offset
for (int i = 0; i < 5; i++) {
ByteBuf buf = allocator.buffer(size);
PooledByteBuf<ByteBuffer> unwrapedBuf = unwrapIfNeeded(buf);
assertEquals(runOffset(unwrapedBuf.handle), i * 5);
bufs[i] = buf;
}
//release at reverse order
for (int i = 10 - 1; i >= 5; i--) {
bufs[i].release();
}
for (int i = 5; i < 10; i++) {
ByteBuf buf = allocator.buffer(size);
PooledByteBuf<ByteBuffer> unwrapedBuf = unwrapIfNeeded(buf);
assertEquals(runOffset(unwrapedBuf.handle), i * 5);
bufs[i] = buf;
}
for (int i = 0; i < 10; i++) {
bufs[i].release();
}
}
@Test (timeout = 4000)
public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException {
testThreadCacheDestroyed(false);
@ -501,26 +565,33 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
int idx = 0;
while (finish.get() == null) {
for (int i = 0; i < 10; i++) {
buffers.add(allocator.directBuffer(
ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)],
Integer.MAX_VALUE));
int len = ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)];
ByteBuf buf = allocator.directBuffer(len, Integer.MAX_VALUE);
assertEquals(len, buf.writableBytes());
while (buf.isWritable()) {
buf.writeByte(i);
}
buffers.offer(buf);
}
releaseBuffers();
releaseBuffersAndCheckContent();
}
} catch (Throwable cause) {
finish.set(cause);
} finally {
releaseBuffers();
releaseBuffersAndCheckContent();
}
}
private void releaseBuffers() {
for (;;) {
private void releaseBuffersAndCheckContent() {
int i = 0;
while (!buffers.isEmpty()) {
ByteBuf buf = buffers.poll();
if (buf == null) {
break;
while (buf.isReadable()) {
assertEquals(i, buf.readByte());
}
buf.release();
i++;
}
}
@ -537,7 +608,7 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
// Mark as finish if not already done but ensure we not override the previous set error.
join();
} finally {
releaseBuffers();
releaseBuffersAndCheckContent();
}
checkForError();
}
@ -549,4 +620,9 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
}
}
}
@SuppressWarnings("unchecked")
private static PooledByteBuf<ByteBuffer> unwrapIfNeeded(ByteBuf buf) {
return (PooledByteBuf<ByteBuffer>) (buf instanceof PooledByteBuf ? buf : buf.unwrap());
}
}