netty-incubator-buffer-api/src/main/java/io/netty/buffer/api/pool/PoolChunk.java
Chris Vest 0105e5231d Remove the SizeClassedMemoryPool implementation
And fix the remaining test failures for the PooledBufferAllocator.
The PooledBufferAllocator now also keeps its chunks alive, even after closing the pool, as long as there are allocated buffers that refer to the memory.
The pool now clears all of its relevant internal references when closed, allowing the GC to reclaim all of the pooled memory, assuming no allocated buffers remain.
2021-05-18 18:20:32 +02:00

663 lines
22 KiB
Java

/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.internal.ArcDrop;
import io.netty.buffer.api.internal.CleanerDrop;
import io.netty.buffer.api.internal.Statics;
import java.util.PriorityQueue;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > run - a run is a collection of pages
* > chunk - a chunk is a collection of runs
* > in this code chunkSize = maxPages * pageSize
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved, so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
* This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
* equals the next nearest size in {@link SizeClasses}.
*
*
* A chunk has the following layout:
*
* /-----------------\
* | run |
* | |
* | |
* |-----------------|
* | run |
* | |
* |-----------------|
* | unallocated |
* | (freed) |
* | |
* |-----------------|
* | subpage |
* |-----------------|
* | unallocated |
* | (freed) |
* | ... |
* | ... |
* | ... |
* | |
* | |
* | |
* \-----------------/
*
*
* handle:
* -------
* a handle is a long number, the bit layout of a run looks like:
*
* oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
*
* o: runOffset (page offset in the chunk), 15bit
* s: size (number of pages) of this run, 15bit
* u: isUsed?, 1bit
* e: isSubpage?, 1bit
* b: bitmapIdx of subpage, zero if it's not subpage, 32bit
*
* runsAvailMap:
* ------
* a map which manages all runs (in use and not in use).
* For each run, the first runOffset and last runOffset are stored in runsAvailMap.
* key: runOffset
* value: handle
*
* runsAvail:
* ----------
* an array of {@link PriorityQueue}.
* Each queue manages same size of runs.
* Runs are sorted by offset, so that we always allocate runs with smaller offset.
*
*
* Algorithm:
* ----------
*
* As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
*
* Initialization -
* In the beginning we store the initial run which is the whole chunk.
* The initial run:
* runOffset = 0
* size = chunkSize / pageSize (the number of pages in the chunk)
* isUsed = no
* isSubpage = no
* bitmapIdx = 0
*
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) find the first available run in runsAvail according to size
* 2) if the run has more pages than requested then split it, and save the trailing run
* for later use
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) find a not full subpage according to size.
* if it already exists just return, otherwise allocate a new PoolSubpage and call init()
* note that this subpage object is added to subpagesPool in the PoolArena when we init() it
* 2) call subpage.allocate()
*
* Algorithm: [free(handle, normCapacity)]
* ----------
* 1) if it is a subpage, return the slab back into this subpage
* 2) if the subpage is not used, or it is a run, then start freeing this run
* 3) merge contiguous available runs
* 4) save the merged run
*
*/
final class PoolChunk implements PoolChunkMetric {
// Bit widths of the individual fields packed into a 64-bit run handle.
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;
// Shift amounts derived from the widths above: the bitmapIdx occupies the low 32 bits,
// followed by the isSubpage bit (32), the isUsed bit (33), the size field (34) and the run offset (49).
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
// The arena that owns this chunk; also used as the lock object when reading freeBytes for metrics.
final PoolArena arena;
final Buffer base; // The buffer that is the source of the memory. Closing it will free the memory.
// The value returned by MemoryManager.unwrapRecoverableMemory(base); sliced for each allocation.
final Object memory;
final Drop<Buffer> baseDrop; // An ArcDrop that manages references to the base Buffer.
/**
* Stores the handle of each available run, keyed by the run's first page and by its last page.
*/
private final LongLongHashMap runsAvailMap;
/**
* Queues of available run handles, indexed by page-size index; also used as the lock object
* while runs are allocated and freed.
*/
private final LongPriorityQueue[] runsAvail;
/**
* The subpages of this chunk, indexed by the run offset of the run backing each subpage.
*/
private final PoolSubpage[] subpages;
private final int pageSize;
private final int pageShifts;
private final int chunkSize;
// Number of bytes not currently handed out; decremented in allocateRun and incremented in free.
int freeBytes;
// NOTE(review): parent/prev/next are never assigned in this class; presumably they are
// maintained by PoolChunkList as linked-list bookkeeping - confirm against that class.
PoolChunkList parent;
PoolChunk prev;
PoolChunk next;
/**
 * Creates a chunk whose memory is obtained from the arena's {@link MemoryManager} and registers
 * the whole chunk as a single available run.
 *
 * @param arena      the arena that owns this chunk
 * @param pageSize   the size of a page in bytes
 * @param pageShifts the shift that converts between bytes and pages ({@code chunkSize >> pageShifts} pages)
 * @param chunkSize  the size of the chunk in bytes
 * @param maxPageIdx the number of available-run queues to create
 */
PoolChunk(PoolArena arena, int pageSize, int pageShifts, int chunkSize,
          int maxPageIdx) {
    this.arena = arena;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    this.freeBytes = chunkSize;

    final MemoryManager memoryManager = arena.manager;
    this.base = memoryManager.allocateShared(arena, chunkSize, memoryManager.drop(), Statics.CLEANER);
    this.memory = memoryManager.unwrapRecoverableMemory(this.base);
    this.baseDrop = ArcDrop.wrap(Buffer::close);

    this.runsAvail = newRunsAvailqueueArray(maxPageIdx);
    this.runsAvailMap = new LongLongHashMap(-1);

    final int totalPages = chunkSize >> pageShifts;
    this.subpages = new PoolSubpage[totalPages];

    // The initial run starts at offset 0 and spans every page of the chunk.
    final long wholeChunkRun = (long) totalPages << SIZE_SHIFT;
    insertAvailRun(0, totalPages, wholeChunkRun);
}
/**
 * Builds an array of {@code size} queues, each slot holding its own empty {@link LongPriorityQueue}.
 *
 * @param size the number of queues to create
 * @return the fully populated array
 */
private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
    final LongPriorityQueue[] queues = new LongPriorityQueue[size];
    int slot = 0;
    while (slot < queues.length) {
        queues[slot] = new LongPriorityQueue();
        slot++;
    }
    return queues;
}
/**
 * Registers a run as available: offers its handle to the queue chosen by
 * {@code arena.pages2pageIdxFloor(pages)} and records it in {@code runsAvailMap}
 * under its first page and, for multi-page runs, its last page.
 *
 * @param runOffset the first page of the run
 * @param pages     the number of pages in the run
 * @param handle    the handle describing the run
 */
private void insertAvailRun(int runOffset, int pages, long handle) {
    runsAvail[arena.pages2pageIdxFloor(pages)].offer(handle);

    // Always index the run by its first page.
    insertAvailRun0(runOffset, handle);
    if (pages <= 1) {
        // A single-page run has the same first and last page.
        return;
    }
    // Also index the run by its last page.
    insertAvailRun0(lastPage(runOffset, pages), handle);
}
/**
 * Stores {@code handle} in {@code runsAvailMap} under {@code runOffset} and asserts that
 * the previous value returned by the map was -1.
 *
 * @param runOffset the page offset used as the key
 * @param handle    the run handle to store
 */
private void insertAvailRun0(int runOffset, long handle) {
    final long previous = runsAvailMap.put(runOffset, handle);
    assert previous == -1;
}
/**
 * Removes an available run, looking up its queue from the run's page count.
 *
 * @param handle the handle of the run to remove
 */
private void removeAvailRun(long handle) {
    final int pages = runPages(handle);
    final LongPriorityQueue owningQueue = runsAvail[arena.pages2pageIdxFloor(pages)];
    removeAvailRun(owningQueue, handle);
}
/**
 * Removes an available run from the given queue and from {@code runsAvailMap}.
 *
 * @param queue  the queue to remove the handle from
 * @param handle the handle of the run to remove
 */
private void removeAvailRun(LongPriorityQueue queue, long handle) {
    queue.remove(handle);

    final int firstPage = runOffset(handle);
    final int pageCount = runPages(handle);

    // Drop the entry keyed by the first page of the run.
    runsAvailMap.remove(firstPage);
    if (pageCount <= 1) {
        return;
    }
    // Multi-page runs are also keyed by their last page.
    runsAvailMap.remove(lastPage(firstPage, pageCount));
}
/**
 * Computes the offset of the last page of a run.
 *
 * @param runOffset the first page of the run
 * @param pages     the number of pages in the run
 * @return {@code runOffset + pages - 1}
 */
private static int lastPage(int runOffset, int pages) {
    final int onePastEnd = runOffset + pages;
    return onePastEnd - 1;
}
/**
 * Looks up the handle stored in {@code runsAvailMap} for the given page offset.
 * Callers in this class treat a result of -1 as "no available run at this offset".
 *
 * @param runOffset the page offset to look up
 * @return the value the map holds for {@code runOffset}
 */
private long getAvailRunByOffset(int runOffset) {
    final long storedHandle = runsAvailMap.get(runOffset);
    return storedHandle;
}
/**
 * Returns the usage percentage of this chunk, computed from a value of
 * {@code freeBytes} read while holding the arena's monitor.
 */
@Override
public int usage() {
    final int freeSnapshot;
    synchronized (arena) {
        freeSnapshot = freeBytes;
    }
    return usage(freeSnapshot);
}
/**
 * Converts a free-byte count into a usage percentage.
 *
 * @param freeBytes the number of free bytes in the chunk
 * @return 100 when nothing is free, 99 when the free share rounds down to 0%,
 *         otherwise 100 minus the (truncated) free percentage
 */
private int usage(int freeBytes) {
    if (freeBytes == 0) {
        return 100;
    }

    // Multiply as long so the intermediate product cannot overflow an int.
    final long freeShare = freeBytes * 100L / chunkSize;
    final int freePercentage = (int) freeShare;
    return freePercentage == 0 ? 99 : 100 - freePercentage;
}
/**
 * Allocates memory from this chunk, either from a subpage (when {@code sizeIdx} is at most
 * {@code arena.smallMaxSizeIdx}) or as a run of whole pages.
 *
 * @param size    the requested size in bytes
 * @param sizeIdx the size-class index of the request
 * @param cache   the thread cache forwarded to {@link #allocateBuffer}
 * @param control the allocator control forwarded to {@link #allocateBuffer}
 * @return the allocation, or {@code null} when no handle could be obtained from this chunk
 */
UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) {
    final boolean small = sizeIdx <= arena.smallMaxSizeIdx;
    final long handle;
    if (small) {
        handle = allocateSubpage(sizeIdx);
    } else {
        // Normal allocation: the run size must be a multiple of pageSize.
        handle = allocateRun(arena.sizeIdx2size(sizeIdx));
    }

    if (handle < 0) {
        return null;
    }
    assert !small || isSubpage(handle);
    return allocateBuffer(handle, size, cache, control);
}
/**
 * Allocates a run of whole pages from this chunk.
 * <p>
 * The first non-empty queue able to satisfy the request is chosen, the run with the smallest
 * value in that queue is taken, any surplus pages are split off and re-registered as available,
 * and {@code freeBytes} is reduced by the size of the run that is handed out.
 *
 * @param runSize the requested run size in bytes; converted to pages with {@code runSize >> pageShifts}
 * @return the handle of the allocated run (marked as used), or -1 when no run could be obtained
 */
private long allocateRun(int runSize) {
    int pages = runSize >> pageShifts;
    int pageIdx = arena.pages2pageIdx(pages);

    synchronized (runsAvail) {
        // Find the first queue which has at least one big enough run.
        int queueIdx = runFirstBestFit(pageIdx);
        if (queueIdx == -1) {
            return -1;
        }

        // Get the run with the minimum offset in this queue.
        LongPriorityQueue queue = runsAvail[queueIdx];
        long handle = queue.poll();

        assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;

        if (handle == -1) {
            // Nothing was polled. Report failure *before* touching any bookkeeping: removing the
            // bogus handle from the map or subtracting its decoded "size" from freeBytes would
            // corrupt the chunk state.
            return -1;
        }

        removeAvailRun(queue, handle);
        handle = splitLargeRun(handle, pages);

        freeBytes -= runSize(pageShifts, handle);
        return handle;
    }
}
/**
* Computes the size in bytes of the run that backs a subpage for the given size class.
* The result is a multiple of {@code pageSize}.
*
* @param sizeIdx the size-class index whose element size is obtained via {@code arena.sizeIdx2size}
* @return the run size in bytes
*/
private int calculateRunSize(int sizeIdx) {
// '-' binds tighter than '<<', so this is 1 << (pageShifts - LOG2_QUANTUM).
int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
int runSize = 0;
int nElements;
final int elemSize = arena.sizeIdx2size(sizeIdx);
// Find the lowest common multiple of pageSize and elemSize
// Grow the run one page at a time until it holds a whole number of elements,
// or until the element count reaches maxElements.
do {
runSize += pageSize;
nElements = runSize / elemSize;
} while (nElements < maxElements && runSize != nElements * elemSize);
// Shrink again, one page at a time, while the run holds more than maxElements elements.
while (nElements > maxElements) {
runSize -= pageSize;
nElements = runSize / elemSize;
}
assert nElements > 0;
assert runSize <= chunkSize;
assert runSize >= elemSize;
return runSize;
}
/**
 * Finds the index of the first queue, starting at {@code pageIdx}, that holds an available run.
 *
 * @param pageIdx the smallest queue index that can satisfy the request
 * @return {@code arena.nPSizes - 1} when the chunk is completely free, otherwise the index of
 *         the first non-empty queue at or after {@code pageIdx}, or -1 if there is none
 */
private int runFirstBestFit(int pageIdx) {
    if (freeBytes == chunkSize) {
        return arena.nPSizes - 1;
    }

    int candidateIdx = pageIdx;
    while (candidateIdx < arena.nPSizes) {
        final LongPriorityQueue candidate = runsAvail[candidateIdx];
        final boolean unusable = candidate == null || candidate.isEmpty();
        if (!unusable) {
            return candidateIdx;
        }
        candidateIdx++;
    }
    return -1;
}
/**
 * Carves {@code needPages} pages off the front of a run. Any remaining pages are registered
 * as a new available run.
 *
 * @param handle    the handle of the run to split
 * @param needPages the number of pages required; must be positive and no larger than the run
 * @return a handle marked as used that covers the first {@code needPages} pages of the run
 */
private long splitLargeRun(long handle, int needPages) {
    assert needPages > 0;

    final int totalPages = runPages(handle);
    assert needPages <= totalPages;

    final int leftoverPages = totalPages - needPages;
    if (leftoverPages <= 0) {
        // The run fits exactly: just mark it as used.
        return handle | (1L << IS_USED_SHIFT);
    }

    final int firstPage = runOffset(handle);

    // Keep track of the trailing unused pages for later use.
    final int tailOffset = firstPage + needPages;
    final long tailRun = toRunHandle(tailOffset, leftoverPages, 0);
    insertAvailRun(tailOffset, leftoverPages, tailRun);

    // The leading part is handed out, so it is flagged as used.
    return toRunHandle(firstPage, needPages, 1);
}
/**
 * Allocates a new run, wraps it in a new {@link PoolSubpage} for the given size class, and
 * allocates one element from that subpage. The work is done while holding the monitor of the
 * subpage pool head obtained from the arena, which is also handed to the new subpage.
 *
 * @param sizeIdx sizeIdx of normalized size
 *
 * @return the value returned by {@code PoolSubpage.allocate()}, or -1 if no run could be allocated
 */
private long allocateSubpage(int sizeIdx) {
    // The head of the PoolSubpage pool is owned by the PoolArena; synchronize on it because
    // the linked-list structure may be altered.
    final PoolSubpage poolHead = arena.findSubpagePoolHead(sizeIdx);
    synchronized (poolHead) {
        // The run size is always a multiple of pageSize.
        final long runHandle = allocateRun(calculateRunSize(sizeIdx));
        if (runHandle < 0) {
            return -1;
        }

        final int firstPage = runOffset(runHandle);
        assert subpages[firstPage] == null;

        final int elemSize = arena.sizeIdx2size(sizeIdx);
        final int runBytes = runSize(pageShifts, runHandle);
        final PoolSubpage created =
                new PoolSubpage(poolHead, this, pageShifts, firstPage, runBytes, elemSize);

        subpages[firstPage] = created;
        return created.allocate();
    }
}
/**
* Free a subpage element, or a run of pages. When an element is freed from a PoolSubpage, the subpage might be
* added back to the subpage pool of the owning PoolArena. If the subpage itself is no longer used, the run
* backing it is freed as well, so it is available for subsequent allocations.
* <p>
* Every call first drops one reference on the base buffer via {@code baseDrop}.
*
* @param handle handle to free
* @param normCapacity the normalized capacity of the allocation; only used to find the subpage pool head
* when {@code handle} refers to a subpage
*/
void free(long handle, int normCapacity) {
baseDrop.drop(base); // Decrement reference count.
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
int sIdx = runOffset(handle);
PoolSubpage subpage = subpages[sIdx];
assert subpage != null && subpage.doNotDestroy;
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is needed as we may add it back and so alter the linked-list structure.
synchronized (head) {
if (subpage.free(head, bitmapIdx(handle))) {
// The subpage is still used, do not free its run.
return;
}
assert !subpage.doNotDestroy;
// Null out slot in the array as it was freed, and we should not use it anymore.
subpages[sIdx] = null;
}
}
// Start freeing the run. The page count is taken from the handle being freed (before merging),
// because only those pages are returned to freeBytes below.
int pages = runPages(handle);
synchronized (runsAvail) {
// Collapse contiguous runs; successfully collapsed runs
// will be removed from runsAvail and runsAvailMap.
long finalRun = collapseRuns(handle);
// Set the run as not used.
finalRun &= ~(1L << IS_USED_SHIFT);
// If it was a subpage, turn it back into a plain run.
// NOTE(review): when no neighbour was merged, collapseRuns returns the handle unchanged, so the
// low 32 bitmapIdx bits of a subpage handle are not cleared here - confirm this is intended.
finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
freeBytes += pages << pageShifts;
}
}
/**
 * Merges the given run with available neighbouring runs, first towards lower offsets and
 * then towards higher offsets.
 *
 * @param handle the handle of the run being freed
 * @return the handle of the merged run, or {@code handle} itself when nothing was merged
 */
private long collapseRuns(long handle) {
    final long mergedWithPast = collapsePast(handle);
    return collapseNext(mergedWithPast);
}
/**
 * Repeatedly merges the run with the available run that ends directly before it, until no
 * such neighbour exists. Each merged neighbour is removed from the available-run bookkeeping.
 *
 * @param handle the handle of the run to extend towards lower offsets
 * @return the handle of the merged run, or {@code handle} itself when nothing was merged
 */
private long collapsePast(long handle) {
    long merged = handle;
    while (true) {
        final int mergedOffset = runOffset(merged);
        final int mergedPages = runPages(merged);

        final long neighbour = getAvailRunByOffset(mergedOffset - 1);
        if (neighbour == -1) {
            return merged;
        }

        final int neighbourOffset = runOffset(neighbour);
        final int neighbourPages = runPages(neighbour);

        // The neighbour must be a different run that ends exactly where this one starts.
        final boolean contiguous = neighbour != merged && neighbourOffset + neighbourPages == mergedOffset;
        if (!contiguous) {
            return merged;
        }

        removeAvailRun(neighbour);
        merged = toRunHandle(neighbourOffset, neighbourPages + mergedPages, 0);
    }
}
/**
 * Repeatedly merges the run with the available run that starts directly after it, until no
 * such neighbour exists. Each merged neighbour is removed from the available-run bookkeeping.
 *
 * @param handle the handle of the run to extend towards higher offsets
 * @return the handle of the merged run, or {@code handle} itself when nothing was merged
 */
private long collapseNext(long handle) {
    long merged = handle;
    while (true) {
        final int mergedOffset = runOffset(merged);
        final int mergedPages = runPages(merged);

        final long neighbour = getAvailRunByOffset(mergedOffset + mergedPages);
        if (neighbour == -1) {
            return merged;
        }

        final int neighbourOffset = runOffset(neighbour);
        final int neighbourPages = runPages(neighbour);

        // The neighbour must be a different run that starts exactly where this one ends.
        final boolean contiguous = neighbour != merged && mergedOffset + mergedPages == neighbourOffset;
        if (!contiguous) {
            return merged;
        }

        removeAvailRun(neighbour);
        merged = toRunHandle(mergedOffset, mergedPages + neighbourPages, 0);
    }
}
/**
 * Packs a run description into a handle. The isSubpage bit and the bitmapIdx bits are left zero.
 *
 * @param runOffset the first page of the run
 * @param runPages  the number of pages in the run
 * @param inUsed    1 to mark the run as used, 0 to mark it as available
 * @return the encoded handle
 */
private static long toRunHandle(int runOffset, int runPages, int inUsed) {
    final long offsetBits = (long) runOffset << RUN_OFFSET_SHIFT;
    final long sizeBits = (long) runPages << SIZE_SHIFT;
    final long usedBits = (long) inUsed << IS_USED_SHIFT;
    return offsetBits | sizeBits | usedBits;
}
/**
 * Builds the {@link UntetheredMemory} for an already allocated handle. Subpage handles are
 * delegated to {@link #allocateBufferWithSubpage}; run handles are described directly.
 * For run handles the thread cache is taken from {@code arena.parent.threadCache()} rather
 * than from the {@code threadCache} argument.
 *
 * @param handle      the allocated handle
 * @param size        the requested size in bytes
 * @param threadCache the thread cache passed on for subpage handles
 * @param control     the allocator control to populate
 * @return the untethered memory describing the allocation
 */
UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache,
                                PooledAllocatorControl control) {
    if (!isRun(handle)) {
        return allocateBufferWithSubpage(handle, size, threadCache, control);
    }

    final int byteOffset = runOffset(handle) << pageShifts;
    final int runBytes = runSize(pageShifts, handle);
    final PoolThreadCache arenaThreadCache = arena.parent.threadCache();
    initAllocatorControl(control, arenaThreadCache, handle, runBytes);
    // Each outstanding allocation holds one reference on the base buffer.
    ArcDrop.acquire(baseDrop);
    return new UntetheredChunkAllocation(
            memory, this, arenaThreadCache, handle, runBytes, byteOffset, size);
}
/**
 * Builds the {@link UntetheredMemory} for a handle that refers to an element of a subpage.
 *
 * @param handle      the allocated subpage handle
 * @param size        the requested size in bytes; must not exceed the subpage's element size
 * @param threadCache the thread cache to associate with the allocation
 * @param control     the allocator control to populate
 * @return the untethered memory describing the allocation
 */
UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache,
                                           PooledAllocatorControl control) {
    final int firstPage = runOffset(handle);
    final int elementIdx = bitmapIdx(handle);

    final PoolSubpage subpage = subpages[firstPage];
    assert subpage.doNotDestroy;
    assert size <= subpage.elemSize;

    // Start of the backing run, plus the position of the element inside that run.
    final int byteOffset = (firstPage << pageShifts) + elementIdx * subpage.elemSize;
    initAllocatorControl(control, threadCache, handle, subpage.elemSize);
    // Each outstanding allocation holds one reference on the base buffer.
    ArcDrop.acquire(baseDrop);
    return new UntetheredChunkAllocation(
            memory, this, threadCache, handle, subpage.elemSize, byteOffset, size);
}
/**
 * Describes one allocation carved out of a {@link PoolChunk}: which slice of the chunk's
 * memory it covers, and how to build the {@link Drop} that releases it.
 */
private static final class UntetheredChunkAllocation implements UntetheredMemory {
    private final Object memory;
    private final PoolChunk chunk;
    private final PoolThreadCache threadCache;
    private final long handle;
    private final int maxLength;
    private final int offset;
    private final int size;

    /**
     * @param memory      the chunk's recoverable memory that will be sliced
     * @param chunk       the chunk the allocation belongs to
     * @param threadCache the thread cache passed on to the {@link PooledDrop}
     * @param handle      the handle of the allocation within the chunk
     * @param maxLength   the length passed on to the {@link PooledDrop}
     * @param offset      the byte offset of the slice within {@code memory}
     * @param size        the length of the slice in bytes
     */
    private UntetheredChunkAllocation(
            Object memory, PoolChunk chunk, PoolThreadCache threadCache,
            long handle, int maxLength, int offset, int size) {
        this.memory = memory;
        this.chunk = chunk;
        this.threadCache = threadCache;
        this.handle = handle;
        this.maxLength = maxLength;
        this.offset = offset;
        this.size = size;
    }

    /**
     * Returns the slice of the chunk's memory that backs this allocation, as produced by the
     * arena's memory manager.
     */
    @Override
    @SuppressWarnings("unchecked") // The caller chooses Memory; the manager's slice is cast to it.
    public <Memory> Memory memory() {
        return (Memory) chunk.arena.manager.sliceMemory(memory, offset, size);
    }

    /**
     * Returns a new {@link PooledDrop} for this allocation, wrapped by {@link CleanerDrop}.
     */
    @Override
    @SuppressWarnings("unchecked") // The caller chooses BufferType; the wrapped drop is cast to it.
    public <BufferType extends Buffer> Drop<BufferType> drop() {
        PooledDrop pooledDrop = new PooledDrop(chunk.arena, chunk, threadCache, handle, maxLength);
        return (Drop<BufferType>) CleanerDrop.wrap(pooledDrop);
    }
}
/**
 * Records on {@code control} where an allocation came from: this chunk, its arena, the
 * thread cache, the handle and the normalized size.
 *
 * @param control     the control object to populate
 * @param threadCache the thread cache associated with the allocation
 * @param handle      the handle of the allocation
 * @param normSize    the normalized size of the allocation
 */
private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle,
                                  int normSize) {
    control.handle = handle;
    control.normSize = normSize;
    control.threadCache = threadCache;
    control.chunk = this;
    control.arena = this.arena;
}
/**
 * Returns the total size of this chunk in bytes, as given to the constructor.
 */
@Override
public int chunkSize() {
    return this.chunkSize;
}
/**
 * Returns the number of free bytes in this chunk, read while holding the arena's monitor.
 */
@Override
public int freeBytes() {
    final int freeSnapshot;
    synchronized (arena) {
        freeSnapshot = this.freeBytes;
    }
    return freeSnapshot;
}
/**
 * Renders the chunk as {@code Chunk(<identity hash in hex>: <usage>%, <used bytes>/<chunk size>)},
 * using a value of {@code freeBytes} read while holding the arena's monitor.
 */
@Override
public String toString() {
    final int freeSnapshot;
    synchronized (arena) {
        freeSnapshot = this.freeBytes;
    }

    final String identity = Integer.toHexString(System.identityHashCode(this));
    final int usedBytes = chunkSize - freeSnapshot;
    return "Chunk(" + identity + ": " + usage(freeSnapshot) + "%, " + usedBytes + '/' + chunkSize + ')';
}
/**
 * Drops the chunk's own reference on the base buffer. Allocated buffers hold their own
 * references through {@code baseDrop}, so they may keep the base alive after this call.
 */
void destroy() {
    final Buffer chunkBase = base;
    baseDrop.drop(chunkBase);
}
/**
 * Extracts the run offset (in pages) from a handle.
 *
 * @param handle the handle to decode
 * @return the bits of {@code handle} above {@code RUN_OFFSET_SHIFT}, as an int
 */
static int runOffset(long handle) {
    final long offsetBits = handle >> RUN_OFFSET_SHIFT;
    return (int) offsetBits;
}
/**
 * Computes the size in bytes of the run described by a handle.
 *
 * @param pageShifts the shift that converts pages to bytes
 * @param handle     the handle to decode
 * @return the run's page count shifted left by {@code pageShifts}
 */
static int runSize(int pageShifts, long handle) {
    final int pages = runPages(handle);
    return pages << pageShifts;
}
/**
 * Extracts the number of pages from a handle.
 *
 * @param handle the handle to decode
 * @return the {@code SIZE_BIT_LENGTH}-bit size field of the handle
 */
static int runPages(long handle) {
    // (1 << 15) - 1 == 0x7fff: a mask covering exactly the size field.
    final long sizeMask = (1L << SIZE_BIT_LENGTH) - 1;
    return (int) ((handle >> SIZE_SHIFT) & sizeMask);
}
/**
 * Tells whether the isUsed bit of a handle is set.
 *
 * @param handle the handle to inspect
 * @return {@code true} if the bit at {@code IS_USED_SHIFT} is 1
 */
static boolean isUsed(long handle) {
    final long usedBit = (handle >> IS_USED_SHIFT) & 1L;
    return usedBit != 0L;
}
/**
 * Tells whether a handle describes a plain run, i.e. it is not a subpage handle.
 *
 * @param handle the handle to inspect
 * @return the negation of {@link #isSubpage(long)}
 */
static boolean isRun(long handle) {
    final boolean subpage = isSubpage(handle);
    return !subpage;
}
/**
 * Tells whether the isSubpage bit of a handle is set.
 *
 * @param handle the handle to inspect
 * @return {@code true} if the bit at {@code IS_SUBPAGE_SHIFT} is 1
 */
static boolean isSubpage(long handle) {
    final long subpageBit = (handle >> IS_SUBPAGE_SHIFT) & 1L;
    return subpageBit != 0L;
}
/**
 * Extracts the bitmap index from a handle.
 *
 * @param handle the handle to decode
 * @return the low 32 bits of {@code handle}, as an int
 */
static int bitmapIdx(long handle) {
    final long lowBits = handle & 0xFFFFFFFFL;
    return (int) lowBits;
}
}