First incomplete draft of porting over the pooling allocator

This commit is contained in:
Chris Vest 2021-05-11 11:24:06 +02:00
parent e6a238b14d
commit ae2abdd2aa
22 changed files with 3452 additions and 0 deletions

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api;
import java.lang.ref.Cleaner;
public interface MemoryManager {
boolean isNative();
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateConstChild(Buffer readOnlyConstParent);
Drop<Buffer> drop();

View File

@ -34,6 +34,11 @@ public class ByteBufferMemoryManager implements MemoryManager {
this.direct = direct;
}
@Override
public boolean isNative() {
return direct;
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
int capacity = Math.toIntExact(size);

View File

@ -24,4 +24,9 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
protected MemorySegment createSegment(long size, Cleaner cleaner) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
@Override
public boolean isNative() {
return false;
}
}

View File

@ -52,6 +52,11 @@ public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
}
}
@Override
public boolean isNative() {
return true;
}
@Override
protected MemorySegment createSegment(long size, Cleaner cleaner) {
final ResourceScope scope = cleaner == null ? newSharedScope() : newSharedScope(cleaner);

View File

@ -0,0 +1,10 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.BufferAllocator;
public interface ByteBufAllocatorMetric {
/**
* Returns the number of bytes of heap memory used by a {@link BufferAllocator} or {@code -1} if unknown.
*/
long usedMemory();
}

View File

@ -0,0 +1,11 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.BufferAllocator;
public interface ByteBufAllocatorMetricProvider {
/**
* Returns a {@link ByteBufAllocatorMetric} for a {@link BufferAllocator}.
*/
ByteBufAllocatorMetric metric();
}

View File

@ -0,0 +1,114 @@
package io.netty.buffer.api.pool;
/**
* Internal primitive map implementation that is specifically optimised for the runs availability map use case in
* {@link PoolChunk}.
*/
final class LongLongHashMap {
private static final int MASK_TEMPLATE = ~1;
private int mask;
private long[] array;
private int maxProbe;
private long zeroVal;
private final long emptyVal;
LongLongHashMap(long emptyVal) {
this.emptyVal = emptyVal;
zeroVal = emptyVal;
int initialSize = 32;
array = new long[initialSize];
mask = initialSize - 1;
computeMaskAndProbe();
}
public long put(long key, long value) {
if (key == 0) {
long prev = zeroVal;
zeroVal = value;
return prev;
}
for (;;) {
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key || existing == 0) {
long prev = existing == 0? emptyVal : array[index + 1];
array[index] = key;
array[index + 1] = value;
for (; i < maxProbe; i++) { // Nerf any existing misplaced entries.
index = index + 2 & mask;
if (array[index] == key) {
array[index] = 0;
prev = array[index + 1];
break;
}
}
return prev;
}
index = index + 2 & mask;
}
expand(); // Grow array and re-hash.
}
}
public void remove(long key) {
if (key == 0) {
zeroVal = emptyVal;
return;
}
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key) {
array[index] = 0;
break;
}
index = index + 2 & mask;
}
}
public long get(long key) {
if (key == 0) {
return zeroVal;
}
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key) {
return array[index + 1];
}
index = index + 2 & mask;
}
return emptyVal;
}
private int index(long key) {
// Hash with murmur64, and mask.
key ^= key >>> 33;
key *= 0xff51afd7ed558ccdL;
key ^= key >>> 33;
key *= 0xc4ceb9fe1a85ec53L;
key ^= key >>> 33;
return (int) key & mask;
}
private void expand() {
long[] prev = array;
array = new long[prev.length * 2];
computeMaskAndProbe();
for (int i = 0; i < prev.length; i += 2) {
long key = prev[i];
if (key != 0) {
long val = prev[i + 1];
put(key, val);
}
}
}
private void computeMaskAndProbe() {
int length = array.length;
mask = length - 1 & MASK_TEMPLATE;
maxProbe = (int) Math.log(length);
}
}

View File

@ -0,0 +1,92 @@
package io.netty.buffer.api.pool;
import java.util.Arrays;
/**
* Internal primitive priority queue, used by {@link PoolChunk}.
* The implementation is based on the binary heap, as described in Algorithms by Sedgewick and Wayne.
*/
final class LongPriorityQueue {
public static final int NO_VALUE = -1;
private long[] array = new long[9];
private int size;
public void offer(long handle) {
if (handle == NO_VALUE) {
throw new IllegalArgumentException("The NO_VALUE (" + NO_VALUE + ") cannot be added to the queue.");
}
size++;
if (size == array.length) {
// Grow queue capacity.
array = Arrays.copyOf(array, 1 + (array.length - 1) * 2);
}
array[size] = handle;
lift(size);
}
public void remove(long value) {
for (int i = 1; i <= size; i++) {
if (array[i] == value) {
array[i] = array[size--];
lift(i);
sink(i);
return;
}
}
}
public long peek() {
if (size == 0) {
return NO_VALUE;
}
return array[1];
}
public long poll() {
if (size == 0) {
return NO_VALUE;
}
long val = array[1];
array[1] = array[size];
array[size] = 0;
size--;
sink(1);
return val;
}
public boolean isEmpty() {
return size == 0;
}
private void lift(int index) {
int parentIndex;
while (index > 1 && subord(parentIndex = index >> 1, index)) {
swap(index, parentIndex);
index = parentIndex;
}
}
private void sink(int index) {
int child;
while ((child = index << 1) <= size) {
if (child < size && subord(child, child + 1)) {
child++;
}
if (!subord(index, child)) {
break;
}
swap(index, child);
index = child;
}
}
private boolean subord(int a, int b) {
return array[a] > array[b];
}
private void swap(int a, int b) {
long value = array[a];
array[a] = array[b];
array[b] = value;
}
}

View File

@ -0,0 +1,457 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.internal.Statics;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import static io.netty.buffer.api.pool.PoolChunk.isSubpage;
import static java.lang.Math.max;
class PoolArena extends SizeClasses implements PoolArenaMetric {
enum SizeClass {
Small,
Normal
}
final PooledByteBufAllocator parent;
final MemoryManager manager;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
private final PoolSubpage[] smallSubpagePools;
private final PoolChunkList q050;
private final PoolChunkList q025;
private final PoolChunkList q000;
private final PoolChunkList qInit;
private final PoolChunkList q075;
private final PoolChunkList q100;
private final List<PoolChunkListMetric> chunkListMetrics;
// Metrics for allocations and deallocations
private long allocationsNormal;
// We need to use the LongAdder here as this is not guarded via synchronized block.
private final LongAdder allocationsSmall = new LongAdder();
private final LongAdder allocationsHuge = new LongAdder();
private final LongAdder activeBytesHuge = new LongAdder();
private long deallocationsSmall;
private long deallocationsNormal;
// We need to use the LongAdder here as this is not guarded via synchronized block.
private final LongAdder deallocationsHuge = new LongAdder();
// Number of thread caches backed by this arena.
final AtomicInteger numThreadCaches = new AtomicInteger();
protected PoolArena(PooledByteBufAllocator parent, MemoryManager manager, int pageSize,
int pageShifts, int chunkSize, int cacheAlignment) {
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
this.manager = manager;
directMemoryCacheAlignment = cacheAlignment;
numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead();
}
q100 = new PoolChunkList(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
chunkListMetrics = List.of(qInit, q000, q025, q050, q075, q100);
}
private static PoolSubpage newSubpagePoolHead() {
PoolSubpage head = new PoolSubpage();
head.prev = head;
head.next = head;
return head;
}
private static PoolSubpage[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}
boolean isDirect() {
return manager.isNative();
}
Buffer allocate(PoolThreadCache cache, int size) {
final int sizeIdx = size2SizeIdx(size);
if (sizeIdx <= smallMaxSizeIdx) {
return tcacheAllocateSmall(cache, size, sizeIdx);
} else if (sizeIdx < nSizes) {
return tcacheAllocateNormal(cache, size, sizeIdx);
} else {
int normCapacity = directMemoryCacheAlignment > 0
? normalizeSize(size) : size;
// Huge allocations are never served via the cache so just call allocateHuge
return allocateHuge(normCapacity);
}
}
private Buffer tcacheAllocateSmall(PoolThreadCache cache, final int size,
final int sizeIdx) {
Buffer buffer = cache.allocateSmall(size, sizeIdx);
if (buffer != null) {
// was able to allocate out of the cache so move on
return buffer;
}
/*
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
synchronized (head) {
final PoolSubpage s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
long handle = s.allocate();
assert handle >= 0;
buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache);
}
}
if (needsNormalAllocation) {
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache);
}
}
incSmallAllocation();
return buffer;
}
private Buffer tcacheAllocateNormal(PoolThreadCache cache, int size, int sizeIdx) {
Buffer buffer = cache.allocateNormal(this, size, sizeIdx);
if (buffer != null) {
// was able to allocate out of the cache so move on
return buffer;
}
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache);
allocationsNormal++;
}
return buffer;
}
// Method must be called inside synchronized(this) { ... } block
private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache) {
Buffer buffer = q050.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
return buffer;
}
buffer = q025.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
return buffer;
}
buffer = q000.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
return buffer;
}
buffer = qInit.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
return buffer;
}
buffer = q075.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
return buffer;
}
// Add a new chunk.
PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
buffer = c.allocate(size, sizeIdx, threadCache);
assert buffer != null;
qInit.add(c);
return buffer;
}
private void incSmallAllocation() {
allocationsSmall.increment();
}
private Buffer allocateHuge(int size) {
activeBytesHuge.add(size);
allocationsHuge.increment();
BufferAllocator allocator = isDirect()? BufferAllocator.direct() : BufferAllocator.heap();
return allocator.allocate(size);
}
void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
chunk.destroy();
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(handle);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, normCapacity, sizeClass);
}
}
private static SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}
void freeChunk(PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
default:
throw new Error();
}
destroyChunk = !chunk.parent.free(chunk, handle, normCapacity);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
chunk.destroy();
}
}
PoolSubpage findSubpagePoolHead(int sizeIdx) {
return smallSubpagePools[sizeIdx];
}
@Override
public int numThreadCaches() {
return numThreadCaches.get();
}
@Override
public int numSmallSubpages() {
return smallSubpagePools.length;
}
@Override
public int numChunkLists() {
return chunkListMetrics.size();
}
@Override
public List<PoolSubpageMetric> smallSubpages() {
return subPageMetricList(smallSubpagePools);
}
@Override
public List<PoolChunkListMetric> chunkLists() {
return chunkListMetrics;
}
private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage[] pages) {
List<PoolSubpageMetric> metrics = new ArrayList<>();
for (PoolSubpage head : pages) {
if (head.next == head) {
continue;
}
PoolSubpage s = head.next;
do {
metrics.add(s);
s = s.next;
} while (s != head);
}
return metrics;
}
@Override
public long numAllocations() {
final long allocsNormal;
synchronized (this) {
allocsNormal = allocationsNormal;
}
return allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue();
}
@Override
public long numTinyAllocations() {
return 0;
}
@Override
public long numSmallAllocations() {
return allocationsSmall.longValue();
}
@Override
public synchronized long numNormalAllocations() {
return allocationsNormal;
}
@Override
public long numDeallocations() {
final long deallocs;
synchronized (this) {
deallocs = deallocationsSmall + deallocationsNormal;
}
return deallocs + deallocationsHuge.longValue();
}
@Override
public long numTinyDeallocations() {
return 0;
}
@Override
public synchronized long numSmallDeallocations() {
return deallocationsSmall;
}
@Override
public synchronized long numNormalDeallocations() {
return deallocationsNormal;
}
@Override
public long numHugeAllocations() {
return allocationsHuge.longValue();
}
@Override
public long numHugeDeallocations() {
return deallocationsHuge.longValue();
}
@Override
public long numActiveAllocations() {
long val = allocationsSmall.longValue() + allocationsHuge.longValue()
- deallocationsHuge.longValue();
synchronized (this) {
val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
}
return max(val, 0);
}
@Override
public long numActiveTinyAllocations() {
return 0;
}
@Override
public long numActiveSmallAllocations() {
return max(numSmallAllocations() - numSmallDeallocations(), 0);
}
@Override
public long numActiveNormalAllocations() {
final long val;
synchronized (this) {
val = allocationsNormal - deallocationsNormal;
}
return max(val, 0);
}
@Override
public long numActiveHugeAllocations() {
return max(numHugeAllocations() - numHugeDeallocations(), 0);
}
@Override
public long numActiveBytes() {
long val = activeBytesHuge.longValue();
synchronized (this) {
for (int i = 0; i < chunkListMetrics.size(); i++) {
for (PoolChunkMetric m: chunkListMetrics.get(i)) {
val += m.chunkSize();
}
}
}
return max(0, val);
}
protected final PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
Buffer base = manager.allocateShared(parent, chunkSize, manager.drop(), Statics.CLEANER);
Object memory = manager.unwrapRecoverableMemory(base);
return new PoolChunk(
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
@Override
public synchronized String toString() {
StringBuilder buf = new StringBuilder()
.append("Chunk(s) at 0~25%:")
.append(StringUtil.NEWLINE)
.append(qInit)
.append(StringUtil.NEWLINE)
.append("Chunk(s) at 0~50%:")
.append(StringUtil.NEWLINE)
.append(q000)
.append(StringUtil.NEWLINE)
.append("Chunk(s) at 25~75%:")
.append(StringUtil.NEWLINE)
.append(q025)
.append(StringUtil.NEWLINE)
.append("Chunk(s) at 50~100%:")
.append(StringUtil.NEWLINE)
.append(q050)
.append(StringUtil.NEWLINE)
.append("Chunk(s) at 75~100%:")
.append(StringUtil.NEWLINE)
.append(q075)
.append(StringUtil.NEWLINE)
.append("Chunk(s) at 100%:")
.append(StringUtil.NEWLINE)
.append(q100)
.append(StringUtil.NEWLINE)
.append("small subpages:");
appendPoolSubPages(buf, smallSubpagePools);
buf.append(StringUtil.NEWLINE);
return buf.toString();
}
private static void appendPoolSubPages(StringBuilder buf, PoolSubpage[] subpages) {
for (int i = 0; i < subpages.length; i ++) {
PoolSubpage head = subpages[i];
if (head.next == head) {
continue;
}
buf.append(StringUtil.NEWLINE)
.append(i)
.append(": ");
PoolSubpage s = head.next;
do {
buf.append(s);
s = s.next;
} while (s != head);
}
}
}

View File

@ -0,0 +1,123 @@
package io.netty.buffer.api.pool;
import java.util.List;
/**
* Expose metrics for an arena.
*/
public interface PoolArenaMetric extends SizeClassesMetric {
/**
* Returns the number of thread caches backed by this arena.
*/
int numThreadCaches();
/**
* Returns the number of small sub-pages for the arena.
*/
int numSmallSubpages();
/**
* Returns the number of chunk lists for the arena.
*/
int numChunkLists();
/**
* Returns an unmodifiable {@link List} which holds {@link PoolSubpageMetric}s for small sub-pages.
*/
List<PoolSubpageMetric> smallSubpages();
/**
* Returns an unmodifiable {@link List} which holds {@link PoolChunkListMetric}s.
*/
List<PoolChunkListMetric> chunkLists();
/**
* Return the number of allocations done via the arena. This includes all sizes.
*/
long numAllocations();
/**
* Return the number of tiny allocations done via the arena.
*
* @deprecated Tiny allocations have been merged into small allocations.
*/
@Deprecated
long numTinyAllocations();
/**
* Return the number of small allocations done via the arena.
*/
long numSmallAllocations();
/**
* Return the number of normal allocations done via the arena.
*/
long numNormalAllocations();
/**
* Return the number of huge allocations done via the arena.
*/
long numHugeAllocations();
/**
* Return the number of deallocations done via the arena. This includes all sizes.
*/
long numDeallocations();
/**
* Return the number of tiny deallocations done via the arena.
*
* @deprecated Tiny deallocations have been merged into small deallocations.
*/
@Deprecated
long numTinyDeallocations();
/**
* Return the number of small deallocations done via the arena.
*/
long numSmallDeallocations();
/**
* Return the number of normal deallocations done via the arena.
*/
long numNormalDeallocations();
/**
* Return the number of huge deallocations done via the arena.
*/
long numHugeDeallocations();
/**
* Return the number of currently active allocations.
*/
long numActiveAllocations();
/**
* Return the number of currently active tiny allocations.
*
* @deprecated Tiny allocations have been merged into small allocations.
*/
@Deprecated
long numActiveTinyAllocations();
/**
* Return the number of currently active small allocations.
*/
long numActiveSmallAllocations();
/**
* Return the number of currently active normal allocations.
*/
long numActiveNormalAllocations();
/**
* Return the number of currently active huge allocations.
*/
long numActiveHugeAllocations();
/**
* Return the number of active bytes that are currently allocated by the arena.
*/
long numActiveBytes();
}

View File

@ -0,0 +1,613 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import java.util.PriorityQueue;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > run - a run is a collection of pages
* > chunk - a chunk is a collection of runs
* > in this code chunkSize = maxPages * pageSize
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved, so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
* This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
* equals the next nearest size in {@link SizeClasses}.
*
*
* A chunk has the following layout:
*
* /-----------------\
* | run |
* | |
* | |
* |-----------------|
* | run |
* | |
* |-----------------|
* | unalloctated |
* | (freed) |
* | |
* |-----------------|
* | subpage |
* |-----------------|
* | unallocated |
* | (freed) |
* | ... |
* | ... |
* | ... |
* | |
* | |
* | |
* \-----------------/
*
*
* handle:
* -------
* a handle is a long number, the bit layout of a run looks like:
*
* oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
*
* o: runOffset (page offset in the chunk), 15bit
* s: size (number of pages) of this run, 15bit
* u: isUsed?, 1bit
* e: isSubpage?, 1bit
* b: bitmapIdx of subpage, zero if it's not subpage, 32bit
*
* runsAvailMap:
* ------
* a map which manages all runs (used and not in used).
* For each run, the first runOffset and last runOffset are stored in runsAvailMap.
* key: runOffset
* value: handle
*
* runsAvail:
* ----------
* an array of {@link PriorityQueue}.
* Each queue manages same size of runs.
* Runs are sorted by offset, so that we always allocate runs with smaller offset.
*
*
* Algorithm:
* ----------
*
* As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
*
* Initialization -
* In the beginning we store the initial run which is the whole chunk.
* The initial run:
* runOffset = 0
* size = chunkSize
* isUsed = no
* isSubpage = no
* bitmapIdx = 0
*
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) find the first avail run using in runsAvails according to size
* 2) if pages of run is larger than request pages then split it, and save the tailing run
* for later using
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) find a not full subpage according to size.
* if it already exists just return, otherwise allocate a new PoolSubpage and call init()
* note that this subpage object is added to subpagesPool in the PoolArena when we init() it
* 2) call subpage.allocate()
*
* Algorithm: [free(handle, length, nioBuffer)]
* ----------
* 1) if it is a subpage, return the slab back into this subpage
* 2) if the subpage is not used, or it is a run, then start free this run
* 3) merge continuous avail runs
* 4) save the merged run
*
*/
final class PoolChunk implements PoolChunkMetric {
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
final PoolArena arena;
final Buffer base; // The buffer that is the source of the memory. Closing it will free the memory.
final Object memory;
final boolean unpooled;
/**
* store the first page and last page of each avail run
*/
private final LongLongHashMap runsAvailMap;
/**
* manage all avail runs
*/
private final LongPriorityQueue[] runsAvail;
/**
* manage all subpages in this chunk
*/
private final PoolSubpage[] subpages;
private final int pageSize;
private final int pageShifts;
private final int chunkSize;
int freeBytes;
PoolChunkList parent;
PoolChunk prev;
PoolChunk next;
PoolChunk(PoolArena arena, Buffer base, Object memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
unpooled = false;
this.arena = arena;
this.base = base;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
freeBytes = chunkSize;
runsAvail = newRunsAvailqueueArray(maxPageIdx);
runsAvailMap = new LongLongHashMap(-1);
subpages = new PoolSubpage[chunkSize >> pageShifts];
//insert initial run, offset = 0, pages = chunkSize / pageSize
int pages = chunkSize >> pageShifts;
long initHandle = (long) pages << SIZE_SHIFT;
insertAvailRun(0, pages, initHandle);
}
/** Creates a special chunk that is not pooled. */
PoolChunk(PoolArena arena, Buffer base, Object memory, int size) {
unpooled = true;
this.arena = arena;
this.base = base;
this.memory = memory;
pageSize = 0;
pageShifts = 0;
runsAvailMap = null;
runsAvail = null;
subpages = null;
chunkSize = size;
}
private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
for (int i = 0; i < queueArray.length; i++) {
queueArray[i] = new LongPriorityQueue();
}
return queueArray;
}
private void insertAvailRun(int runOffset, int pages, long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
LongPriorityQueue queue = runsAvail[pageIdxFloor];
queue.offer(handle);
//insert first page of run
insertAvailRun0(runOffset, handle);
if (pages > 1) {
//insert last page of run
insertAvailRun0(lastPage(runOffset, pages), handle);
}
}
private void insertAvailRun0(int runOffset, long handle) {
long pre = runsAvailMap.put(runOffset, handle);
assert pre == -1;
}
private void removeAvailRun(long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
LongPriorityQueue queue = runsAvail[pageIdxFloor];
removeAvailRun(queue, handle);
}
private void removeAvailRun(LongPriorityQueue queue, long handle) {
queue.remove(handle);
int runOffset = runOffset(handle);
int pages = runPages(handle);
//remove first page of run
runsAvailMap.remove(runOffset);
if (pages > 1) {
//remove last page of run
runsAvailMap.remove(lastPage(runOffset, pages));
}
}
private static int lastPage(int runOffset, int pages) {
return runOffset + pages - 1;
}
private long getAvailRunByOffset(int runOffset) {
return runsAvailMap.get(runOffset);
}
@Override
public int usage() {
final int freeBytes;
synchronized (arena) {
freeBytes = this.freeBytes;
}
return usage(freeBytes);
}
private int usage(int freeBytes) {
if (freeBytes == 0) {
return 100;
}
int freePercentage = (int) (freeBytes * 100L / chunkSize);
if (freePercentage == 0) {
return 99;
}
return 100 - freePercentage;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache cache) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
handle = allocateSubpage(sizeIdx);
if (handle < 0) {
return null;
}
assert isSubpage(handle);
} else {
// normal
// runSize must be multiple of pageSize
int runSize = arena.sizeIdx2size(sizeIdx);
handle = allocateRun(runSize);
if (handle < 0) {
return null;
}
}
return allocateBuffer(handle, size, cache);
}
private long allocateRun(int runSize) {
int pages = runSize >> pageShifts;
int pageIdx = arena.pages2pageIdx(pages);
synchronized (runsAvail) {
//find first queue which has at least one big enough run
int queueIdx = runFirstBestFit(pageIdx);
if (queueIdx == -1) {
return -1;
}
//get run with min offset in this queue
LongPriorityQueue queue = runsAvail[queueIdx];
long handle = queue.poll();
assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
removeAvailRun(queue, handle);
if (handle != -1) {
handle = splitLargeRun(handle, pages);
}
freeBytes -= runSize(pageShifts, handle);
return handle;
}
}
private int calculateRunSize(int sizeIdx) {
int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
int runSize = 0;
int nElements;
final int elemSize = arena.sizeIdx2size(sizeIdx);
// Find the lowest common multiple of pageSize and elemSize
do {
runSize += pageSize;
nElements = runSize / elemSize;
} while (nElements < maxElements && runSize != nElements * elemSize);
while (nElements > maxElements) {
runSize -= pageSize;
nElements = runSize / elemSize;
}
assert nElements > 0;
assert runSize <= chunkSize;
assert runSize >= elemSize;
return runSize;
}
private int runFirstBestFit(int pageIdx) {
if (freeBytes == chunkSize) {
return arena.nPSizes - 1;
}
for (int i = pageIdx; i < arena.nPSizes; i++) {
LongPriorityQueue queue = runsAvail[i];
if (queue != null && !queue.isEmpty()) {
return i;
}
}
return -1;
}
private long splitLargeRun(long handle, int needPages) {
assert needPages > 0;
int totalPages = runPages(handle);
assert needPages <= totalPages;
int remPages = totalPages - needPages;
if (remPages > 0) {
int runOffset = runOffset(handle);
// keep track of trailing unused pages for later use
int availOffset = runOffset + needPages;
long availRun = toRunHandle(availOffset, remPages, 0);
insertAvailRun(availOffset, remPages, availRun);
// not avail
return toRunHandle(runOffset, needPages, 1);
}
//mark it as used
handle |= 1L << IS_USED_SHIFT;
return handle;
}
/**
* Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
* subpage pool in the PoolArena that owns this PoolChunk
*
* @param sizeIdx sizeIdx of normalized size
*
* @return index in memoryMap
*/
private long allocateSubpage(int sizeIdx) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
synchronized (head) {
//allocate a new run
int runSize = calculateRunSize(sizeIdx);
//runSize must be multiples of pageSize
long runHandle = allocateRun(runSize);
if (runHandle < 0) {
return -1;
}
int runOffset = runOffset(runHandle);
assert subpages[runOffset] == null;
int elemSize = arena.sizeIdx2size(sizeIdx);
PoolSubpage subpage = new PoolSubpage(head, this, pageShifts, runOffset,
runSize(pageShifts, runHandle), elemSize);
subpages[runOffset] = subpage;
return subpage.allocate();
}
}
/**
* Free a subpage, or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage
* pool of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given
* elemSize, we can completely free the owning Page, so it is available for subsequent allocations.
*
* @param handle handle to free
*/
void free(long handle, int normCapacity) {
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
int sIdx = runOffset(handle);
PoolSubpage subpage = subpages[sIdx];
assert subpage != null && subpage.doNotDestroy;
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
synchronized (head) {
if (subpage.free(head, bitmapIdx(handle))) {
//the subpage is still used, do not free it
return;
}
assert !subpage.doNotDestroy;
// Null out slot in the array as it was freed, and we should not use it anymore.
subpages[sIdx] = null;
}
}
//start free run
int pages = runPages(handle);
synchronized (runsAvail) {
// collapse continuous runs, successfully collapsed runs
// will be removed from runsAvail and runsAvailMap
long finalRun = collapseRuns(handle);
//set run as not used
finalRun &= ~(1L << IS_USED_SHIFT);
//if it is a subpage, set it to run
finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
freeBytes += pages << pageShifts;
}
}
private long collapseRuns(long handle) {
return collapseNext(collapsePast(handle));
}
private long collapsePast(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
long pastRun = getAvailRunByOffset(runOffset - 1);
if (pastRun == -1) {
return handle;
}
int pastOffset = runOffset(pastRun);
int pastPages = runPages(pastRun);
//is continuous
if (pastRun != handle && pastOffset + pastPages == runOffset) {
//remove past run
removeAvailRun(pastRun);
handle = toRunHandle(pastOffset, pastPages + runPages, 0);
} else {
return handle;
}
}
}
private long collapseNext(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
long nextRun = getAvailRunByOffset(runOffset + runPages);
if (nextRun == -1) {
return handle;
}
int nextOffset = runOffset(nextRun);
int nextPages = runPages(nextRun);
//is continuous
if (nextRun != handle && runOffset + runPages == nextOffset) {
//remove next run
removeAvailRun(nextRun);
handle = toRunHandle(runOffset, runPages + nextPages, 0);
} else {
return handle;
}
}
}
private static long toRunHandle(int runOffset, int runPages, int inUsed) {
return (long) runOffset << RUN_OFFSET_SHIFT
| (long) runPages << SIZE_SHIFT
| (long) inUsed << IS_USED_SHIFT;
}
Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache) {
if (isRun(handle)) {
int offset = runOffset(handle) << pageShifts;
int maxLength = runSize(pageShifts, handle);
PoolThreadCache poolThreadCache = arena.parent.threadCache();
return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop<Buffer>() {
@Override
public void drop(Buffer obj) {
arena.free(PoolChunk.this, handle, maxLength, poolThreadCache);
}
});
} else {
return allocateBufferWithSubpage(handle, size, threadCache);
}
}
Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache) {
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);
PoolSubpage s = subpages[runOffset];
assert s.doNotDestroy;
assert size <= s.elemSize;
int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop<Buffer>() {
@Override
public void drop(Buffer obj) {
arena.free(PoolChunk.this, handle, s.elemSize, threadCache);
}
});
}
@Override
public int chunkSize() {
return chunkSize;
}
@Override
public int freeBytes() {
synchronized (arena) {
return freeBytes;
}
}
@Override
public String toString() {
final int freeBytes;
synchronized (arena) {
freeBytes = this.freeBytes;
}
return new StringBuilder()
.append("Chunk(")
.append(Integer.toHexString(System.identityHashCode(this)))
.append(": ")
.append(usage(freeBytes))
.append("%, ")
.append(chunkSize - freeBytes)
.append('/')
.append(chunkSize)
.append(')')
.toString();
}
void destroy() {
base.close();
}
static int runOffset(long handle) {
return (int) (handle >> RUN_OFFSET_SHIFT);
}
static int runSize(int pageShifts, long handle) {
return runPages(handle) << pageShifts;
}
static int runPages(long handle) {
return (int) (handle >> SIZE_SHIFT & 0x7fff);
}
static boolean isUsed(long handle) {
return (handle >> IS_USED_SHIFT & 1) == 1L;
}
static boolean isRun(long handle) {
return !isSubpage(handle);
}
static boolean isSubpage(long handle) {
return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
}
static int bitmapIdx(long handle) {
return (int) handle;
}
}

View File

@ -0,0 +1,234 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.Buffer;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static java.lang.Math.*;
final class PoolChunkList implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.emptyIterator();
private final PoolArena arena;
private final PoolChunkList nextList;
private final int minUsage;
private final int maxUsage;
private final int maxCapacity;
private PoolChunk head;
private final int freeMinThreshold;
private final int freeMaxThreshold;
// This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
private PoolChunkList prevList;
PoolChunkList(PoolArena arena, PoolChunkList nextList, int minUsage, int maxUsage, int chunkSize) {
assert minUsage <= maxUsage;
this.arena = arena;
this.nextList = nextList;
this.minUsage = minUsage;
this.maxUsage = maxUsage;
maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
// the thresholds are aligned with PoolChunk.usage() logic:
// 1) basic logic: usage() = 100 - freeBytes * 100L / chunkSize
// so, for example: (usage() >= maxUsage) condition can be transformed in the following way:
// 100 - freeBytes * 100L / chunkSize >= maxUsage
// freeBytes <= chunkSize * (100 - maxUsage) / 100
// let freeMinThreshold = chunkSize * (100 - maxUsage) / 100, then freeBytes <= freeMinThreshold
//
// 2) usage() returns an int value and has a floor rounding during a calculation,
// to be aligned absolute thresholds should be shifted for "the rounding step":
// freeBytes * 100 / chunkSize < 1
// the condition can be converted to: freeBytes < 1 * chunkSize / 100
// this is why we have + 0.99999999 shifts. A example why just +1 shift cannot be used:
// freeBytes = 16777216 == freeMaxThreshold: 16777216, usage = 0 < minUsage: 1, chunkSize: 16777216
// At the same time we want to have zero thresholds in case of (maxUsage == 100) and (minUsage == 100).
//
freeMinThreshold = maxUsage == 100 ? 0 : (int) (chunkSize * (100.0 - maxUsage + 0.99999999) / 100L);
freeMaxThreshold = minUsage == 100 ? 0 : (int) (chunkSize * (100.0 - minUsage + 0.99999999) / 100L);
}
/**
* Calculates the maximum capacity of a buffer that will ever be possible to allocate out of the {@link PoolChunk}s
* that belong to the {@link PoolChunkList} with the given {@code minUsage} and {@code maxUsage} settings.
*/
private static int calculateMaxCapacity(int minUsage, int chunkSize) {
minUsage = minUsage0(minUsage);
if (minUsage == 100) {
// If the minUsage is 100 we can not allocate anything out of this list.
return 0;
}
// Calculate the maximum amount of bytes that can be allocated from a PoolChunk in this PoolChunkList.
//
// As an example:
// - If a PoolChunkList has minUsage == 25 we are allowed to allocate at most 75% of the chunkSize because
// this is the maximum amount available in any PoolChunk in this PoolChunkList.
return (int) (chunkSize * (100L - minUsage) / 100L);
}
void prevList(PoolChunkList prevList) {
assert this.prevList == null;
this.prevList = prevList;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return null;
}
for (PoolChunk cur = head; cur != null; cur = cur.next) {
Buffer buffer = cur.allocate(size, sizeIdx, threadCache);
if (buffer != null) {
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
nextList.add(cur);
}
return buffer;
}
}
return null;
}
boolean free(PoolChunk chunk, long handle, int normCapacity) {
chunk.free(handle, normCapacity);
if (chunk.freeBytes > freeMaxThreshold) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
return true;
}
private boolean move(PoolChunk chunk) {
if (chunk.freeBytes > freeMaxThreshold) {
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
// PoolChunk fits into this PoolChunkList, adding it here.
add0(chunk);
return true;
}
/**
* Moves the {@link PoolChunk} down the {@link PoolChunkList} linked-list, so it will end up in the right
* {@link PoolChunkList} that has the correct minUsage / maxUsage in respect to {@link PoolChunk#usage()}.
*/
private boolean move0(PoolChunk chunk) {
if (prevList == null) {
// There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
// all memory associated with the PoolChunk will be released.
return false;
}
return prevList.move(chunk);
}
void add(PoolChunk chunk) {
if (chunk.freeBytes <= freeMinThreshold) {
nextList.add(chunk);
return;
}
add0(chunk);
}
/**
* Adds the {@link PoolChunk} to this {@link PoolChunkList}.
*/
void add0(PoolChunk chunk) {
chunk.parent = this;
if (head == null) {
head = chunk;
chunk.prev = null;
chunk.next = null;
} else {
chunk.prev = null;
chunk.next = head;
head.prev = chunk;
head = chunk;
}
}
private void remove(PoolChunk cur) {
if (cur == head) {
head = cur.next;
if (head != null) {
head.prev = null;
}
} else {
PoolChunk next = cur.next;
cur.prev.next = next;
if (next != null) {
next.prev = cur.prev;
}
}
}
@Override
public int minUsage() {
return minUsage0(minUsage);
}
@Override
public int maxUsage() {
return min(maxUsage, 100);
}
private static int minUsage0(int value) {
return max(1, value);
}
@Override
public Iterator<PoolChunkMetric> iterator() {
synchronized (arena) {
if (head == null) {
return EMPTY_METRICS;
}
List<PoolChunkMetric> metrics = new ArrayList<>();
for (PoolChunk cur = head;;) {
metrics.add(cur);
cur = cur.next;
if (cur == null) {
break;
}
}
return metrics.iterator();
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
synchronized (arena) {
if (head == null) {
return "none";
}
for (PoolChunk cur = head;;) {
buf.append(cur);
cur = cur.next;
if (cur == null) {
break;
}
buf.append(StringUtil.NEWLINE);
}
}
return buf.toString();
}
void destroy(PoolArena arena) {
PoolChunk chunk = head;
while (chunk != null) {
arena.destroyChunk(chunk);
chunk = chunk.next;
}
head = null;
}
}

View File

@ -0,0 +1,17 @@
package io.netty.buffer.api.pool;
/**
* Metrics for a list of chunks.
*/
public interface PoolChunkListMetric extends Iterable<PoolChunkMetric> {
/**
* Return the minimum usage of the chunk list before which chunks are promoted to the previous list.
*/
int minUsage();
/**
* Return the maximum usage of the chunk list after which chunks are promoted to the next list.
*/
int maxUsage();
}

View File

@ -0,0 +1,22 @@
package io.netty.buffer.api.pool;
/**
* Metrics for a chunk.
*/
public interface PoolChunkMetric {
/**
* Return the percentage of the current usage of the chunk.
*/
int usage();
/**
* Return the size of the chunk in bytes, this is the maximum of bytes that can be served out of the chunk.
*/
int chunkSize();
/**
* Return the number of free bytes in the chunk.
*/
int freeBytes();
}

View File

@ -0,0 +1,272 @@
package io.netty.buffer.api.pool;
import static io.netty.buffer.api.pool.PoolChunk.RUN_OFFSET_SHIFT;
import static io.netty.buffer.api.pool.PoolChunk.SIZE_SHIFT;
import static io.netty.buffer.api.pool.PoolChunk.IS_USED_SHIFT;
import static io.netty.buffer.api.pool.PoolChunk.IS_SUBPAGE_SHIFT;
import static io.netty.buffer.api.pool.SizeClasses.LOG2_QUANTUM;
final class PoolSubpage implements PoolSubpageMetric {
final PoolChunk chunk;
private final int pageShifts;
private final int runOffset;
private final int runSize;
private final long[] bitmap;
PoolSubpage prev;
PoolSubpage next;
boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
/** Special constructor that creates a linked list head */
PoolSubpage() {
chunk = null;
pageShifts = -1;
runOffset = -1;
elemSize = -1;
runSize = -1;
bitmap = null;
}
PoolSubpage(PoolSubpage head, PoolChunk chunk, int pageShifts, int runOffset, int runSize, int elemSize) {
this.chunk = chunk;
this.pageShifts = pageShifts;
this.runOffset = runOffset;
this.runSize = runSize;
this.elemSize = elemSize;
bitmap = new long[runSize >>> 6 + LOG2_QUANTUM]; // runSize / 64 / QUANTUM
doNotDestroy = true;
if (elemSize != 0) {
maxNumElems = numAvail = runSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
addToPool(head);
}
/**
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
final int bitmapIdx = getNextAvail();
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
/**
* @return {@code true} if this subpage is in use.
* {@code false} if this subpage is not used by its chunk and thus it's OK to be released.
*/
boolean free(PoolSubpage head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
setNextAvail(bitmapIdx);
if (numAvail ++ == 0) {
addToPool(head);
// When maxNumElems == 1, the maximum numAvail is also 1.
// Each of these PoolSubpages will go in here when they do free operation.
// If they return true directly from here, then the rest of the code will be unreachable,
// and they will not actually be recycled. So return true only on maxNumElems > 1.
if (maxNumElems > 1) {
return true;
}
}
if (numAvail != maxNumElems) {
return true;
} else {
// Subpage not in use (numAvail == maxNumElems)
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
return true;
}
// Remove this subpage from the pool if there are other subpages left in the pool.
doNotDestroy = false;
removeFromPool();
return false;
}
}
private void addToPool(PoolSubpage head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}
private void removeFromPool() {
assert prev != null && next != null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
}
private void setNextAvail(int bitmapIdx) {
nextAvail = bitmapIdx;
}
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
if (~bits != 0) {
return findNextAvail0(i, bits);
}
}
return -1;
}
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6;
for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) {
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1;
}
return -1;
}
private long toHandle(int bitmapIdx) {
int pages = runSize >> pageShifts;
return (long) runOffset << RUN_OFFSET_SHIFT
| (long) pages << SIZE_SHIFT
| 1L << IS_USED_SHIFT
| 1L << IS_SUBPAGE_SHIFT
| bitmapIdx;
}
@Override
public String toString() {
final boolean doNotDestroy;
final int maxNumElems;
final int numAvail;
final int elemSize;
if (chunk == null) {
// This is the head so there is no need to synchronize at all as these never change.
doNotDestroy = true;
maxNumElems = 0;
numAvail = 0;
elemSize = -1;
} else {
synchronized (chunk.arena) {
if (!this.doNotDestroy) {
doNotDestroy = false;
// Not used for creating the String.
maxNumElems = numAvail = elemSize = -1;
} else {
doNotDestroy = true;
maxNumElems = this.maxNumElems;
numAvail = this.numAvail;
elemSize = this.elemSize;
}
}
}
if (!doNotDestroy) {
return "(" + runOffset + ": not in use)";
}
return "(" + runOffset + ": " + (maxNumElems - numAvail) + '/' + maxNumElems +
", offset: " + runOffset + ", length: " + runSize + ", elemSize: " + elemSize + ')';
}
@Override
public int maxNumElements() {
if (chunk == null) {
// It's the head.
return 0;
}
synchronized (chunk.arena) {
return maxNumElems;
}
}
@Override
public int numAvailable() {
if (chunk == null) {
// It's the head.
return 0;
}
synchronized (chunk.arena) {
return numAvail;
}
}
@Override
public int elementSize() {
if (chunk == null) {
// It's the head.
return -1;
}
synchronized (chunk.arena) {
return elemSize;
}
}
@Override
public int pageSize() {
return 1 << pageShifts;
}
void destroy() {
if (chunk != null) {
chunk.destroy();
}
}
}

View File

@ -0,0 +1,27 @@
package io.netty.buffer.api.pool;
/**
* Metrics for a sub-page.
*/
public interface PoolSubpageMetric {
/**
* Return the number of maximal elements that can be allocated out of the sub-page.
*/
int maxNumElements();
/**
* Return the number of available elements to be allocated.
*/
int numAvailable();
/**
* Return the size (in bytes) of the elements that will be allocated.
*/
int elementSize();
/**
* Return the page size (in bytes) of this page.
*/
int pageSize();
}

View File

@ -0,0 +1,375 @@
package io.netty.buffer.api.pool;
import static io.netty.buffer.api.pool.PoolArena.SizeClass.*;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.pool.PoolArena.SizeClass;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ObjectPool.Handle;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
/**
* Acts a Thread cache for allocations. This implementation is modelled after
* <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
* techniques of
* <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
* Scalable memory allocation using jemalloc</a>.
*/
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
final PoolArena arena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache[] smallSubPageCaches;
private final MemoryRegionCache[] normalCaches;
private final int freeSweepAllocationThreshold;
private int allocations;
PoolThreadCache(PoolArena arena,
int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.arena = arena;
if (arena != null) {
// Create the caches for the heap allocations
smallSubPageCaches = createSubPageCaches(
smallCacheSize, arena.numSmallSubpagePools);
normalCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, arena);
arena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
smallSubPageCaches = null;
normalCaches = null;
}
// Only check if there are caches in use.
if ((smallSubPageCaches != null || normalCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
private static MemoryRegionCache[] createSubPageCaches(
int cacheSize, int numCaches) {
if (cacheSize > 0 && numCaches > 0) {
MemoryRegionCache[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache(cacheSize);
}
return cache;
} else {
return null;
}
}
private static MemoryRegionCache[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
// Create as many normal caches as we support based on how many sizeIdx we have and what the upper
// bound is that we want to cache in general.
List<MemoryRegionCache> cache = new ArrayList<>() ;
for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
cache.add(new NormalMemoryRegionCache(cacheSize));
}
return cache.toArray(MemoryRegionCache[]::new);
} else {
return null;
}
}
// val > 0
static int log2(int val) {
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateSmall(int size, int sizeIdx) {
return allocate(cacheForSmall(sizeIdx), size);
}
/**
* Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateNormal(PoolArena area, int size, int sizeIdx) {
return allocate(cacheForNormal(area, sizeIdx), size);
}
private Buffer allocate(MemoryRegionCache cache, int size) {
if (cache == null) {
// no cache found so just return false here
return null;
}
Buffer allocated = cache.allocate(size, this);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
/**
* Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
boolean add(PoolArena area, PoolChunk chunk,
long handle, int normCapacity, SizeClass sizeClass) {
int sizeIdx = area.size2SizeIdx(normCapacity);
MemoryRegionCache cache = cache(area, sizeIdx, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle, normCapacity);
}
private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, sizeIdx);
case Small:
return cacheForSmall(sizeIdx);
default:
throw new Error();
}
}
/**
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/
void free() {
int numFreed = free(smallSubPageCaches) + free(normalCaches);
if (numFreed > 0 && logger.isDebugEnabled()) {
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
Thread.currentThread().getName());
}
if (arena != null) {
arena.numThreadCaches.getAndDecrement();
}
}
private static int free(MemoryRegionCache[] caches) {
if (caches == null) {
return 0;
}
int numFreed = 0;
for (MemoryRegionCache c: caches) {
numFreed += free(c);
}
return numFreed;
}
private static int free(MemoryRegionCache cache) {
if (cache == null) {
return 0;
}
return cache.free();
}
void trim() {
trim(smallSubPageCaches);
trim(normalCaches);
}
private static void trim(MemoryRegionCache[] caches) {
if (caches == null) {
return;
}
for (MemoryRegionCache c: caches) {
trim(c);
}
}
private static void trim(MemoryRegionCache cache) {
if (cache == null) {
return;
}
cache.trim();
}
private MemoryRegionCache cacheForSmall(int sizeIdx) {
return cache(smallSubPageCaches, sizeIdx);
}
private MemoryRegionCache cacheForNormal(PoolArena area, int sizeIdx) {
// We need to substract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
int idx = sizeIdx - area.numSmallSubpagePools;
return cache(normalCaches, idx);
}
private static MemoryRegionCache cache(MemoryRegionCache[] cache, int sizeIdx) {
if (cache == null || sizeIdx > cache.length - 1) {
return null;
}
return cache[sizeIdx];
}
/**
* Cache used for buffers which are backed by SMALL size.
*/
private static final class SubPageMemoryRegionCache extends MemoryRegionCache {
SubPageMemoryRegionCache(int size) {
super(size, Small);
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
return chunk.allocateBufferWithSubpage(handle, size, threadCache);
}
}
/**
* Cache used for buffers which are backed by NORMAL size.
*/
private static final class NormalMemoryRegionCache extends MemoryRegionCache {
NormalMemoryRegionCache(int size) {
super(size, Normal);
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
return chunk.allocateBuffer(handle, size, threadCache);
}
}
private abstract static class MemoryRegionCache {
private final int size;
private final Queue<Entry> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
/**
* Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache);
/**
* Add to cache if not already full.
*/
public final boolean add(PoolChunk chunk, long handle, int normCapacity) {
Entry entry = newEntry(chunk, handle, normCapacity);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle();
}
return queued;
}
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public final Buffer allocate(int size, PoolThreadCache threadCache) {
Entry entry = queue.poll();
if (entry == null) {
return null;
}
Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache);
entry.recycle();
// allocations are not thread-safe which is fine as this is only called from the same thread all time.
allocations++;
return buffer;
}
/**
* Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
*/
public final int free() {
return free(Integer.MAX_VALUE);
}
private int free(int max) {
int numFreed = 0;
for (; numFreed < max; numFreed++) {
Entry entry = queue.poll();
if (entry != null) {
freeEntry(entry);
} else {
// all cleared
return numFreed;
}
}
return numFreed;
}
/**
* Free up cached {@link PoolChunk}s if not allocated frequently enough.
*/
public final void trim() {
int free = size - allocations;
allocations = 0;
// We not even allocated all the number that are
if (free > 0) {
free(free);
}
}
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
entry.recycle();
chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass);
}
static final class Entry {
final Handle<Entry> recyclerHandle;
PoolChunk chunk;
long handle = -1;
int normCapacity;
Entry(Handle<Entry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
private static Entry newEntry(PoolChunk chunk, long handle, int normCapacity) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.handle = handle;
entry.normCapacity = normCapacity;
return entry;
}
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(handle -> new Entry(handle));
}
}

View File

@ -0,0 +1,522 @@
package io.netty.buffer.api.pool;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.MemoryManager;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocatorMetricProvider, AllocatorControl {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;
private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
private final Runnable trimTask = this::trimCurrentThreadCache;
static {
int defaultAlignment = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
try {
validateAndCalculatePageShifts(defaultPageSize, defaultAlignment);
} catch (Throwable t) {
pageSizeFallbackCause = t;
defaultPageSize = 8192;
defaultAlignment = 0;
}
DEFAULT_PAGE_SIZE = defaultPageSize;
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = defaultAlignment;
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
Throwable maxOrderFallbackCause = null;
try {
validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
} catch (Throwable t) {
maxOrderFallbackCause = t;
defaultMaxOrder = 11;
}
DEFAULT_MAX_ORDER = defaultMaxOrder;
// Determine reasonable default for nHeapArena and nDirectArena.
// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
final Runtime runtime = Runtime.getRuntime();
/*
* We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
* number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
* allocation and de-allocation needs to be synchronized on the PoolArena.
*
* See https://github.com/netty/netty/issues/3888.
*/
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
// cache sizes
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
// the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong(
"io.netty.allocator.cacheTrimIntervalMillis", 0);
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", false);
// Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
// of 1024 elements. Otherwise, we would allocate 2048 and only use 1024 which is wasteful.
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
if (pageSizeFallbackCause == null) {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
} else {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
}
if (maxOrderFallbackCause == null) {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
} else {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
}
logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
}
}
private final MemoryManager manager;
private final PoolArena[] arenas;
private final int smallCacheSize;
private final int normalCacheSize;
private final List<PoolArenaMetric> arenaMetrics;
private final PoolThreadLocalCache threadCache;
private final int chunkSize;
private final PooledByteBufAllocatorMetric metric;
public PooledByteBufAllocator(MemoryManager manager) {
this(manager, manager.isNative()? DEFAULT_NUM_DIRECT_ARENA : DEFAULT_NUM_HEAP_ARENA,
DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER, DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE, DEFAULT_USE_CACHE_FOR_ALL_THREADS,
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
public PooledByteBufAllocator(MemoryManager manager, int numArenas, int pageSize, int maxOrder) {
this(manager, numArenas, pageSize, maxOrder, DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE, DEFAULT_USE_CACHE_FOR_ALL_THREADS,
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
public PooledByteBufAllocator(MemoryManager manager, int numArenas, int pageSize, int maxOrder,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(manager, numArenas, pageSize, maxOrder,
smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
public PooledByteBufAllocator(MemoryManager manager, int numArenas, int pageSize, int maxOrder,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
this.manager = requireNonNull(manager, "MemoryManager");
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
if (directMemoryCacheAlignment != 0) {
if (!PlatformDependent.hasAlignDirectByteBuffer()) {
throw new UnsupportedOperationException("Buffer alignment is not supported. " +
"Either Unsafe or ByteBuffer.alignSlice() must be available.");
}
// Ensure page size is a whole multiple of the alignment, or bump it to the next whole multiple.
pageSize = (int) PlatformDependent.align(pageSize, directMemoryCacheAlignment);
}
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
checkPositiveOrZero(numArenas, "numArenas");
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
if (numArenas > 0) {
arenas = newArenaArray(numArenas);
List<PoolArenaMetric> metrics = new ArrayList<>(arenas.length);
for (int i = 0; i < arenas.length; i ++) {
PoolArena arena = new PoolArena(this, manager,
pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
arenas[i] = arena;
metrics.add(arena);
}
arenaMetrics = Collections.unmodifiableList(metrics);
} else {
arenas = null;
arenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
private static PoolArena[] newArenaArray(int size) {
return new PoolArena[size];
}
private static int validateAndCalculatePageShifts(int pageSize, int alignment) {
if (pageSize < MIN_PAGE_SIZE) {
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ')');
}
if ((pageSize & pageSize - 1) != 0) {
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
}
if (pageSize < alignment) {
throw new IllegalArgumentException("Alignment cannot be greater than page size. " +
"Alignment: " + alignment + ", page size: " + pageSize + '.');
}
// Logarithm base 2. At this point we know that pageSize is a power of two.
return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
}
private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
if (maxOrder > 14) {
throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
}
// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i --) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(String.format(
"pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
}
chunkSize <<= 1;
}
return chunkSize;
}
@Override
public Buffer allocate(int size) {
PoolThreadCache cache = threadCache.get();
PoolArena arena = cache.arena;
if (arena != null) {
return arena.allocate(cache, size);
}
BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap();
return unpooled.allocate(size);
}
@Override
public Object allocateUntethered(Buffer originator, int size) {
// TODO
return null;
}
@Override
public void recoverMemory(Object memory) {
// TODO
}
/**
* Default number of heap arenas - System Property: io.netty.allocator.numHeapArenas - default 2 * cores
*/
public static int defaultNumHeapArena() {
return DEFAULT_NUM_HEAP_ARENA;
}
/**
* Default number of direct arenas - System Property: io.netty.allocator.numDirectArenas - default 2 * cores
*/
public static int defaultNumDirectArena() {
return DEFAULT_NUM_DIRECT_ARENA;
}
/**
* Default buffer page size - System Property: io.netty.allocator.pageSize - default 8192
*/
public static int defaultPageSize() {
return DEFAULT_PAGE_SIZE;
}
/**
* Default maximum order - System Property: io.netty.allocator.maxOrder - default 11
*/
public static int defaultMaxOrder() {
return DEFAULT_MAX_ORDER;
}
/**
* Default thread caching behavior - System Property: io.netty.allocator.useCacheForAllThreads - default true
*/
public static boolean defaultUseCacheForAllThreads() {
return DEFAULT_USE_CACHE_FOR_ALL_THREADS;
}
/**
* Default prefer direct - System Property: io.netty.noPreferDirect - default false
*/
public static boolean defaultPreferDirect() {
return PlatformDependent.directBufferPreferred();
}
/**
* Default small cache size - System Property: io.netty.allocator.smallCacheSize - default 256
*/
public static int defaultSmallCacheSize() {
return DEFAULT_SMALL_CACHE_SIZE;
}
/**
* Default normal cache size - System Property: io.netty.allocator.normalCacheSize - default 64
*/
public static int defaultNormalCacheSize() {
return DEFAULT_NORMAL_CACHE_SIZE;
}
/**
* Return {@code true} if direct memory cache alignment is supported, {@code false} otherwise.
*/
public static boolean isDirectMemoryCacheAlignmentSupported() {
return PlatformDependent.hasUnsafe();
}
public boolean isDirectBufferPooled() {
return manager.isNative();
}
public int numArenas() {
return arenas.length;
}
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena arena = leastUsedArena(arenas);
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
final PoolThreadCache cache = new PoolThreadCache(
arena, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(arena, 0, 0, 0, 0);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private PoolArena leastUsedArena(PoolArena[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
@Override
public PooledByteBufAllocatorMetric metric() {
return metric;
}
/**
* Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
*/
List<PoolArenaMetric> arenaMetrics() {
return arenaMetrics;
}
/**
* Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
*/
int numThreadLocalCaches() {
if (arenas == null) {
return 0;
}
int total = 0;
for (PoolArena arena : arenas) {
total += arena.numThreadCaches.get();
}
return total;
}
/**
* Return the size of the small cache.
*/
int smallCacheSize() {
return smallCacheSize;
}
/**
* Return the size of the normal cache.
*/
int normalCacheSize() {
return normalCacheSize;
}
/**
* Return the chunk size for an arena.
*/
final int chunkSize() {
return chunkSize;
}
final long usedMemory() {
return usedMemory(arenas);
}
private static long usedMemory(PoolArena[] arenas) {
if (arenas == null) {
return -1;
}
long used = 0;
for (PoolArena arena : arenas) {
used += arena.numActiveBytes();
if (used < 0) {
return Long.MAX_VALUE;
}
}
return used;
}
final PoolThreadCache threadCache() {
PoolThreadCache cache = threadCache.get();
assert cache != null;
return cache;
}
/**
* Trim thread local cache for the current {@link Thread}, which will give back any cached memory that was not
* allocated frequently since the last trim operation.
*
* Returns {@code true} if a cache for the current {@link Thread} exists and so was trimmed, false otherwise.
*/
public boolean trimCurrentThreadCache() {
PoolThreadCache cache = threadCache.getIfExists();
if (cache != null) {
cache.trim();
return true;
}
return false;
}
/**
* Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
* and so should not be called too frequently.
*/
public String dumpStats() {
int heapArenasLen = arenas == null ? 0 : arenas.length;
StringBuilder buf = new StringBuilder(512)
.append(heapArenasLen)
.append(" arena(s):")
.append(StringUtil.NEWLINE);
if (heapArenasLen > 0) {
for (PoolArena a: arenas) {
buf.append(a);
}
}
return buf.toString();
}
}

View File

@ -0,0 +1,78 @@
package io.netty.buffer.api.pool;
import io.netty.util.internal.StringUtil;
import java.util.List;
/**
* Exposed metric for {@link PooledByteBufAllocator}.
*/
@SuppressWarnings("deprecation")
public final class PooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
private final PooledByteBufAllocator allocator;
PooledByteBufAllocatorMetric(PooledByteBufAllocator allocator) {
this.allocator = allocator;
}
/**
* Return the number of arenas.
*/
public int numArenas() {
return allocator.numArenas();
}
/**
* Return a {@link List} of all {@link PoolArenaMetric}s that are provided by this pool.
*/
public List<PoolArenaMetric> arenaMetrics() {
return allocator.arenaMetrics();
}
/**
* Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
*/
public int numThreadLocalCaches() {
return allocator.numThreadLocalCaches();
}
/**
* Return the size of the small cache.
*/
public int smallCacheSize() {
return allocator.smallCacheSize();
}
/**
* Return the size of the normal cache.
*/
public int normalCacheSize() {
return allocator.normalCacheSize();
}
/**
* Return the chunk size for an arena.
*/
public int chunkSize() {
return allocator.chunkSize();
}
@Override
public long usedMemory() {
return allocator.usedMemory();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(256);
sb.append(StringUtil.simpleClassName(this))
.append("(usedMemory: ").append(usedMemory())
.append("; numArenas: ").append(numArenas())
.append("; smallCacheSize: ").append(smallCacheSize())
.append("; normalCacheSize: ").append(normalCacheSize())
.append("; numThreadLocalCaches: ").append(numThreadLocalCaches())
.append("; chunkSize: ").append(chunkSize()).append(')');
return sb.toString();
}
}

View File

@ -0,0 +1,392 @@
package io.netty.buffer.api.pool;
import static io.netty.buffer.api.pool.PoolThreadCache.*;
/**
* SizeClasses requires {@code pageShifts} to be defined prior to inclusion,
* and it in turn defines:
* <p>
* LOG2_SIZE_CLASS_GROUP: Log of size class count for each size doubling.
* LOG2_MAX_LOOKUP_SIZE: Log of max size class in the lookup table.
* sizeClasses: Complete table of [index, log2Group, log2Delta, nDelta, isMultiPageSize,
* isSubPage, log2DeltaLookup] tuples.
* index: Size class index.
* log2Group: Log of group base size (no deltas added).
* log2Delta: Log of delta to previous size class.
* nDelta: Delta multiplier.
* isMultiPageSize: 'yes' if a multiple of the page size, 'no' otherwise.
* isSubPage: 'yes' if a subpage size class, 'no' otherwise.
* log2DeltaLookup: Same as log2Delta if a lookup table size class, 'no'
* otherwise.
* <p>
* nSubpages: Number of subpages size classes.
* nSizes: Number of size classes.
* nPSizes: Number of size classes that are multiples of pageSize.
*
* smallMaxSizeIdx: Maximum small size class index.
*
* lookupMaxclass: Maximum size class included in lookup table.
* log2NormalMinClass: Log of minimum normal size class.
* <p>
* The first size class and spacing are 1 << LOG2_QUANTUM.
* Each group has 1 << LOG2_SIZE_CLASS_GROUP of size classes.
*
* size = 1 << log2Group + nDelta * (1 << log2Delta)
*
* The first size class has an unusual encoding, because the size has to be
* split between group and delta*nDelta.
*
* If pageShift = 13, sizeClasses looks like this:
*
* (index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup)
* <p>
* ( 0, 4, 4, 0, no, yes, 4)
* ( 1, 4, 4, 1, no, yes, 4)
* ( 2, 4, 4, 2, no, yes, 4)
* ( 3, 4, 4, 3, no, yes, 4)
* <p>
* ( 4, 6, 4, 1, no, yes, 4)
* ( 5, 6, 4, 2, no, yes, 4)
* ( 6, 6, 4, 3, no, yes, 4)
* ( 7, 6, 4, 4, no, yes, 4)
* <p>
* ( 8, 7, 5, 1, no, yes, 5)
* ( 9, 7, 5, 2, no, yes, 5)
* ( 10, 7, 5, 3, no, yes, 5)
* ( 11, 7, 5, 4, no, yes, 5)
* ...
* ...
* ( 72, 23, 21, 1, yes, no, no)
* ( 73, 23, 21, 2, yes, no, no)
* ( 74, 23, 21, 3, yes, no, no)
* ( 75, 23, 21, 4, yes, no, no)
* <p>
* ( 76, 24, 22, 1, yes, no, no)
*/
abstract class SizeClasses implements SizeClassesMetric {
static final int LOG2_QUANTUM = 4;
private static final int LOG2_SIZE_CLASS_GROUP = 2;
private static final int LOG2_MAX_LOOKUP_SIZE = 12;
private static final int INDEX_IDX = 0;
private static final int LOG2GROUP_IDX = 1;
private static final int LOG2DELTA_IDX = 2;
private static final int NDELTA_IDX = 3;
private static final int PAGESIZE_IDX = 4;
private static final int SUBPAGE_IDX = 5;
private static final int LOG2_DELTA_LOOKUP_IDX = 6;
private static final byte no = 0, yes = 1;
protected SizeClasses(int pageSize, int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
this.directMemoryCacheAlignment = directMemoryCacheAlignment;
int group = log2(chunkSize) + 1 - LOG2_QUANTUM;
//generate size classes
//[index, log2Group, log2Delta, nDelta, isMultiPageSize, isSubPage, log2DeltaLookup]
sizeClasses = new short[group << LOG2_SIZE_CLASS_GROUP][7];
nSizes = sizeClasses();
//generate lookup table
sizeIdx2sizeTab = new int[nSizes];
pageIdx2sizeTab = new int[nPSizes];
idx2SizeTab(sizeIdx2sizeTab, pageIdx2sizeTab);
size2idxTab = new int[lookupMaxSize >> LOG2_QUANTUM];
size2idxTab(size2idxTab);
}
protected final int pageSize;
protected final int pageShifts;
protected final int chunkSize;
protected final int directMemoryCacheAlignment;
final int nSizes;
int nSubpages;
int nPSizes;
int smallMaxSizeIdx;
private int lookupMaxSize;
private final short[][] sizeClasses;
private final int[] pageIdx2sizeTab;
// lookup table for sizeIdx <= smallMaxSizeIdx
private final int[] sizeIdx2sizeTab;
// lookup table used for size <= lookupMaxclass
// spacing is 1 << LOG2_QUANTUM, so the size of array is lookupMaxclass >> LOG2_QUANTUM
private final int[] size2idxTab;
private int sizeClasses() {
int normalMaxSize = -1;
int index = 0;
int size = 0;
int log2Group = LOG2_QUANTUM;
int log2Delta = LOG2_QUANTUM;
int ndeltaLimit = 1 << LOG2_SIZE_CLASS_GROUP;
//First small group, nDelta start at 0.
//first size class is 1 << LOG2_QUANTUM
int nDelta = 0;
while (nDelta < ndeltaLimit) {
size = sizeClass(index++, log2Group, log2Delta, nDelta++);
}
log2Group += LOG2_SIZE_CLASS_GROUP;
//All remaining groups, nDelta start at 1.
while (size < chunkSize) {
nDelta = 1;
while (nDelta <= ndeltaLimit && size < chunkSize) {
size = sizeClass(index++, log2Group, log2Delta, nDelta++);
normalMaxSize = size;
}
log2Group++;
log2Delta++;
}
//chunkSize must be normalMaxSize
assert chunkSize == normalMaxSize;
//return number of size index
return index;
}
//calculate size class
private int sizeClass(int index, int log2Group, int log2Delta, int nDelta) {
short isMultiPageSize;
if (log2Delta >= pageShifts) {
isMultiPageSize = yes;
} else {
int pageSize = 1 << pageShifts;
int size = (1 << log2Group) + (1 << log2Delta) * nDelta;
isMultiPageSize = size == size / pageSize * pageSize? yes : no;
}
int log2Ndelta = nDelta == 0? 0 : log2(nDelta);
byte remove = 1 << log2Ndelta < nDelta? yes : no;
int log2Size = log2Delta + log2Ndelta == log2Group? log2Group + 1 : log2Group;
if (log2Size == log2Group) {
remove = yes;
}
short isSubpage = log2Size < pageShifts + LOG2_SIZE_CLASS_GROUP? yes : no;
int log2DeltaLookup = log2Size < LOG2_MAX_LOOKUP_SIZE ||
log2Size == LOG2_MAX_LOOKUP_SIZE && remove == no
? log2Delta : no;
short[] sz = {
(short) index, (short) log2Group, (short) log2Delta,
(short) nDelta, isMultiPageSize, isSubpage, (short) log2DeltaLookup
};
sizeClasses[index] = sz;
int size = (1 << log2Group) + (nDelta << log2Delta);
if (sz[PAGESIZE_IDX] == yes) {
nPSizes++;
}
if (sz[SUBPAGE_IDX] == yes) {
nSubpages++;
smallMaxSizeIdx = index;
}
if (sz[LOG2_DELTA_LOOKUP_IDX] != no) {
lookupMaxSize = size;
}
return size;
}
private void idx2SizeTab(int[] sizeIdx2sizeTab, int[] pageIdx2sizeTab) {
int pageIdx = 0;
for (int i = 0; i < nSizes; i++) {
short[] sizeClass = sizeClasses[i];
int log2Group = sizeClass[LOG2GROUP_IDX];
int log2Delta = sizeClass[LOG2DELTA_IDX];
int nDelta = sizeClass[NDELTA_IDX];
int size = (1 << log2Group) + (nDelta << log2Delta);
sizeIdx2sizeTab[i] = size;
if (sizeClass[PAGESIZE_IDX] == yes) {
pageIdx2sizeTab[pageIdx++] = size;
}
}
}
private void size2idxTab(int[] size2idxTab) {
int idx = 0;
int size = 0;
for (int i = 0; size <= lookupMaxSize; i++) {
int log2Delta = sizeClasses[i][LOG2DELTA_IDX];
int times = 1 << log2Delta - LOG2_QUANTUM;
while (size <= lookupMaxSize && times-- > 0) {
size2idxTab[idx++] = i;
size = idx + 1 << LOG2_QUANTUM;
}
}
}
@Override
public int sizeIdx2size(int sizeIdx) {
return sizeIdx2sizeTab[sizeIdx];
}
@Override
public int sizeIdx2sizeCompute(int sizeIdx) {
int group = sizeIdx >> LOG2_SIZE_CLASS_GROUP;
int mod = sizeIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1;
int groupSize = group == 0? 0 :
1 << LOG2_QUANTUM + LOG2_SIZE_CLASS_GROUP - 1 << group;
int shift = group == 0? 1 : group;
int lgDelta = shift + LOG2_QUANTUM - 1;
int modSize = mod + 1 << lgDelta;
return groupSize + modSize;
}
@Override
public long pageIdx2size(int pageIdx) {
return pageIdx2sizeTab[pageIdx];
}
@Override
public long pageIdx2sizeCompute(int pageIdx) {
int group = pageIdx >> LOG2_SIZE_CLASS_GROUP;
int mod = pageIdx & (1 << LOG2_SIZE_CLASS_GROUP) - 1;
long groupSize = group == 0? 0 :
1L << pageShifts + LOG2_SIZE_CLASS_GROUP - 1 << group;
int shift = group == 0? 1 : group;
int log2Delta = shift + pageShifts - 1;
int modSize = mod + 1 << log2Delta;
return groupSize + modSize;
}
@Override
public int size2SizeIdx(int size) {
if (size == 0) {
return 0;
}
if (size > chunkSize) {
return nSizes;
}
if (directMemoryCacheAlignment > 0) {
size = alignSize(size);
}
if (size <= lookupMaxSize) {
//size-1 / MIN_TINY
return size2idxTab[size - 1 >> LOG2_QUANTUM];
}
int x = log2((size << 1) - 1);
int shift = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? 0 : x - (LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM);
int group = shift << LOG2_SIZE_CLASS_GROUP;
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1;
int deltaInverseMask = -1 << log2Delta;
int mod = (size - 1 & deltaInverseMask) >> log2Delta &
(1 << LOG2_SIZE_CLASS_GROUP) - 1;
return group + mod;
}
@Override
public int pages2pageIdx(int pages) {
return pages2pageIdxCompute(pages, false);
}
@Override
public int pages2pageIdxFloor(int pages) {
return pages2pageIdxCompute(pages, true);
}
private int pages2pageIdxCompute(int pages, boolean floor) {
int pageSize = pages << pageShifts;
if (pageSize > chunkSize) {
return nPSizes;
}
int x = log2((pageSize << 1) - 1);
int shift = x < LOG2_SIZE_CLASS_GROUP + pageShifts
? 0 : x - (LOG2_SIZE_CLASS_GROUP + pageShifts);
int group = shift << LOG2_SIZE_CLASS_GROUP;
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + pageShifts + 1?
pageShifts : x - LOG2_SIZE_CLASS_GROUP - 1;
int deltaInverseMask = -1 << log2Delta;
int mod = (pageSize - 1 & deltaInverseMask) >> log2Delta &
(1 << LOG2_SIZE_CLASS_GROUP) - 1;
int pageIdx = group + mod;
if (floor && pageIdx2sizeTab[pageIdx] > pages << pageShifts) {
pageIdx--;
}
return pageIdx;
}
// Round size up to the nearest multiple of alignment.
private int alignSize(int size) {
int delta = size & directMemoryCacheAlignment - 1;
return delta == 0? size : size + directMemoryCacheAlignment - delta;
}
@Override
public int normalizeSize(int size) {
if (size == 0) {
return sizeIdx2sizeTab[0];
}
if (directMemoryCacheAlignment > 0) {
size = alignSize(size);
}
if (size <= lookupMaxSize) {
int ret = sizeIdx2sizeTab[size2idxTab[size - 1 >> LOG2_QUANTUM]];
assert ret == normalizeSizeCompute(size);
return ret;
}
return normalizeSizeCompute(size);
}
private static int normalizeSizeCompute(int size) {
int x = log2((size << 1) - 1);
int log2Delta = x < LOG2_SIZE_CLASS_GROUP + LOG2_QUANTUM + 1
? LOG2_QUANTUM : x - LOG2_SIZE_CLASS_GROUP - 1;
int delta = 1 << log2Delta;
int delta_mask = delta - 1;
return size + delta_mask & ~delta_mask;
}
}

View File

@ -0,0 +1,72 @@
package io.netty.buffer.api.pool;
/**
* Expose metrics for an SizeClasses.
*/
public interface SizeClassesMetric {
/**
* Computes size from lookup table according to sizeIdx.
*
* @return size
*/
int sizeIdx2size(int sizeIdx);
/**
* Computes size according to sizeIdx.
*
* @return size
*/
int sizeIdx2sizeCompute(int sizeIdx);
/**
* Computes size from lookup table according to pageIdx.
*
* @return size which is multiples of pageSize.
*/
long pageIdx2size(int pageIdx);
/**
* Computes size according to pageIdx.
*
* @return size which is multiples of pageSize
*/
long pageIdx2sizeCompute(int pageIdx);
/**
* Normalizes request size up to the nearest size class.
*
* @param size request size
*
* @return sizeIdx of the size class
*/
int size2SizeIdx(int size);
/**
* Normalizes request size up to the nearest pageSize class.
*
* @param pages multiples of pageSizes
*
* @return pageIdx of the pageSize class
*/
int pages2pageIdx(int pages);
/**
* Normalizes request size down to the nearest pageSize class.
*
* @param pages multiples of pageSizes
*
* @return pageIdx of the pageSize class
*/
int pages2pageIdxFloor(int pages);
/**
* Normalizes usable size that would result from allocating an object with the
* specified size and alignment.
*
* @param size request size
*
* @return normalized size
*/
int normalizeSize(int size);
}

View File

@ -33,6 +33,11 @@ public class UnsafeMemoryManager implements MemoryManager {
this.offheap = offheap;
}
@Override
public boolean isNative() {
return offheap;
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
final Object base;