netty-incubator-buffer-api/buffer-api/src/main/java/io/netty/buffer/api/pool/PoolChunk.java

663 lines
22 KiB
Java

/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.pool;
import io.netty.buffer.api.internal.CleanerDrop;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.internal.ArcDrop;
import io.netty.buffer.api.internal.Statics;
import java.util.PriorityQueue;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > run - a run is a collection of pages
* > chunk - a chunk is a collection of runs
* > in this code chunkSize = maxPages * pageSize
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved, so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
* This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
* equals the next nearest size in {@link SizeClasses}.
*
*
* A chunk has the following layout:
*
* /-----------------\
* | run |
* | |
* | |
* |-----------------|
* | run |
* | |
* |-----------------|
* | unalloctated |
* | (freed) |
* | |
* |-----------------|
* | subpage |
* |-----------------|
* | unallocated |
* | (freed) |
* | ... |
* | ... |
* | ... |
* | |
* | |
* | |
* \-----------------/
*
*
* handle:
* -------
* a handle is a long number, the bit layout of a run looks like:
*
* oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
*
* o: runOffset (page offset in the chunk), 15bit
* s: size (number of pages) of this run, 15bit
* u: isUsed?, 1bit
* e: isSubpage?, 1bit
* b: bitmapIdx of subpage, zero if it's not subpage, 32bit
*
* runsAvailMap:
* ------
* a map which manages all runs (used and not in used).
* For each run, the first runOffset and last runOffset are stored in runsAvailMap.
* key: runOffset
* value: handle
*
* runsAvail:
* ----------
* an array of {@link PriorityQueue}.
* Each queue manages same size of runs.
* Runs are sorted by offset, so that we always allocate runs with smaller offset.
*
*
* Algorithm:
* ----------
*
* As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
*
* Initialization -
* In the beginning we store the initial run which is the whole chunk.
* The initial run:
* runOffset = 0
* size = chunkSize
* isUsed = no
* isSubpage = no
* bitmapIdx = 0
*
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) find the first avail run using in runsAvails according to size
* 2) if pages of run is larger than request pages then split it, and save the tailing run
* for later using
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) find a not full subpage according to size.
* if it already exists just return, otherwise allocate a new PoolSubpage and call init()
* note that this subpage object is added to subpagesPool in the PoolArena when we init() it
* 2) call subpage.allocate()
*
* Algorithm: [free(handle, length, nioBuffer)]
* ----------
* 1) if it is a subpage, return the slab back into this subpage
* 2) if the subpage is not used, or it is a run, then start free this run
* 3) merge continuous avail runs
* 4) save the merged run
*
*/
final class PoolChunk implements PoolChunkMetric {
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
final PoolArena arena;
final Buffer base; // The buffer that is the source of the memory. Closing it will free the memory.
final Object memory;
final Drop<Buffer> baseDrop; // An ArcDrop that manages references to the base Buffer.
/**
* store the first page and last page of each avail run
*/
private final LongLongHashMap runsAvailMap;
/**
* manage all avail runs
*/
private final LongPriorityQueue[] runsAvail;
/**
* manage all subpages in this chunk
*/
private final PoolSubpage[] subpages;
private final int pageSize;
private final int pageShifts;
private final int chunkSize;
int freeBytes;
PoolChunkList parent;
PoolChunk prev;
PoolChunk next;
PoolChunk(PoolArena arena, int pageSize, int pageShifts, int chunkSize,
int maxPageIdx) {
this.arena = arena;
MemoryManager manager = arena.manager;
base = manager.allocateShared(arena, chunkSize, manager.drop(), Statics.CLEANER);
memory = manager.unwrapRecoverableMemory(base);
baseDrop = ArcDrop.wrap(Buffer::close);
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
freeBytes = chunkSize;
runsAvail = newRunsAvailqueueArray(maxPageIdx);
runsAvailMap = new LongLongHashMap(-1);
subpages = new PoolSubpage[chunkSize >> pageShifts];
//insert initial run, offset = 0, pages = chunkSize / pageSize
int pages = chunkSize >> pageShifts;
long initHandle = (long) pages << SIZE_SHIFT;
insertAvailRun(0, pages, initHandle);
}
private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
for (int i = 0; i < queueArray.length; i++) {
queueArray[i] = new LongPriorityQueue();
}
return queueArray;
}
private void insertAvailRun(int runOffset, int pages, long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
LongPriorityQueue queue = runsAvail[pageIdxFloor];
queue.offer(handle);
//insert first page of run
insertAvailRun0(runOffset, handle);
if (pages > 1) {
//insert last page of run
insertAvailRun0(lastPage(runOffset, pages), handle);
}
}
private void insertAvailRun0(int runOffset, long handle) {
long pre = runsAvailMap.put(runOffset, handle);
assert pre == -1;
}
private void removeAvailRun(long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
LongPriorityQueue queue = runsAvail[pageIdxFloor];
removeAvailRun(queue, handle);
}
private void removeAvailRun(LongPriorityQueue queue, long handle) {
queue.remove(handle);
int runOffset = runOffset(handle);
int pages = runPages(handle);
//remove first page of run
runsAvailMap.remove(runOffset);
if (pages > 1) {
//remove last page of run
runsAvailMap.remove(lastPage(runOffset, pages));
}
}
private static int lastPage(int runOffset, int pages) {
return runOffset + pages - 1;
}
private long getAvailRunByOffset(int runOffset) {
return runsAvailMap.get(runOffset);
}
@Override
public int usage() {
final int freeBytes;
synchronized (arena) {
freeBytes = this.freeBytes;
}
return usage(freeBytes);
}
private int usage(int freeBytes) {
if (freeBytes == 0) {
return 100;
}
int freePercentage = (int) (freeBytes * 100L / chunkSize);
if (freePercentage == 0) {
return 99;
}
return 100 - freePercentage;
}
UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
handle = allocateSubpage(sizeIdx);
if (handle < 0) {
return null;
}
assert isSubpage(handle);
} else {
// normal
// runSize must be multiple of pageSize
int runSize = arena.sizeIdx2size(sizeIdx);
handle = allocateRun(runSize);
if (handle < 0) {
return null;
}
}
return allocateBuffer(handle, size, cache, control);
}
private long allocateRun(int runSize) {
int pages = runSize >> pageShifts;
int pageIdx = arena.pages2pageIdx(pages);
synchronized (runsAvail) {
//find first queue which has at least one big enough run
int queueIdx = runFirstBestFit(pageIdx);
if (queueIdx == -1) {
return -1;
}
//get run with min offset in this queue
LongPriorityQueue queue = runsAvail[queueIdx];
long handle = queue.poll();
assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
removeAvailRun(queue, handle);
if (handle != -1) {
handle = splitLargeRun(handle, pages);
}
freeBytes -= runSize(pageShifts, handle);
return handle;
}
}
private int calculateRunSize(int sizeIdx) {
int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
int runSize = 0;
int nElements;
final int elemSize = arena.sizeIdx2size(sizeIdx);
// Find the lowest common multiple of pageSize and elemSize
do {
runSize += pageSize;
nElements = runSize / elemSize;
} while (nElements < maxElements && runSize != nElements * elemSize);
while (nElements > maxElements) {
runSize -= pageSize;
nElements = runSize / elemSize;
}
assert nElements > 0;
assert runSize <= chunkSize;
assert runSize >= elemSize;
return runSize;
}
private int runFirstBestFit(int pageIdx) {
if (freeBytes == chunkSize) {
return arena.nPSizes - 1;
}
for (int i = pageIdx; i < arena.nPSizes; i++) {
LongPriorityQueue queue = runsAvail[i];
if (queue != null && !queue.isEmpty()) {
return i;
}
}
return -1;
}
private long splitLargeRun(long handle, int needPages) {
assert needPages > 0;
int totalPages = runPages(handle);
assert needPages <= totalPages;
int remPages = totalPages - needPages;
if (remPages > 0) {
int runOffset = runOffset(handle);
// keep track of trailing unused pages for later use
int availOffset = runOffset + needPages;
long availRun = toRunHandle(availOffset, remPages, 0);
insertAvailRun(availOffset, remPages, availRun);
// not avail
return toRunHandle(runOffset, needPages, 1);
}
//mark it as used
handle |= 1L << IS_USED_SHIFT;
return handle;
}
/**
* Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
* subpage pool in the PoolArena that owns this PoolChunk
*
* @param sizeIdx sizeIdx of normalized size
*
* @return index in memoryMap
*/
private long allocateSubpage(int sizeIdx) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
synchronized (head) {
//allocate a new run
int runSize = calculateRunSize(sizeIdx);
//runSize must be multiples of pageSize
long runHandle = allocateRun(runSize);
if (runHandle < 0) {
return -1;
}
int runOffset = runOffset(runHandle);
assert subpages[runOffset] == null;
int elemSize = arena.sizeIdx2size(sizeIdx);
PoolSubpage subpage = new PoolSubpage(head, this, pageShifts, runOffset,
runSize(pageShifts, runHandle), elemSize);
subpages[runOffset] = subpage;
return subpage.allocate();
}
}
/**
* Free a subpage, or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage
* pool of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given
* elemSize, we can completely free the owning Page, so it is available for subsequent allocations.
*
* @param handle handle to free
*/
void free(long handle, int normCapacity) {
baseDrop.drop(base); // Decrement reference count.
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
int sIdx = runOffset(handle);
PoolSubpage subpage = subpages[sIdx];
assert subpage != null && subpage.doNotDestroy;
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
synchronized (head) {
if (subpage.free(head, bitmapIdx(handle))) {
//the subpage is still used, do not free it
return;
}
assert !subpage.doNotDestroy;
// Null out slot in the array as it was freed, and we should not use it anymore.
subpages[sIdx] = null;
}
}
//start free run
int pages = runPages(handle);
synchronized (runsAvail) {
// collapse continuous runs, successfully collapsed runs
// will be removed from runsAvail and runsAvailMap
long finalRun = collapseRuns(handle);
//set run as not used
finalRun &= ~(1L << IS_USED_SHIFT);
//if it is a subpage, set it to run
finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
freeBytes += pages << pageShifts;
}
}
private long collapseRuns(long handle) {
return collapseNext(collapsePast(handle));
}
private long collapsePast(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
long pastRun = getAvailRunByOffset(runOffset - 1);
if (pastRun == -1) {
return handle;
}
int pastOffset = runOffset(pastRun);
int pastPages = runPages(pastRun);
//is continuous
if (pastRun != handle && pastOffset + pastPages == runOffset) {
//remove past run
removeAvailRun(pastRun);
handle = toRunHandle(pastOffset, pastPages + runPages, 0);
} else {
return handle;
}
}
}
private long collapseNext(long handle) {
for (;;) {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
long nextRun = getAvailRunByOffset(runOffset + runPages);
if (nextRun == -1) {
return handle;
}
int nextOffset = runOffset(nextRun);
int nextPages = runPages(nextRun);
//is continuous
if (nextRun != handle && runOffset + runPages == nextOffset) {
//remove next run
removeAvailRun(nextRun);
handle = toRunHandle(runOffset, runPages + nextPages, 0);
} else {
return handle;
}
}
}
private static long toRunHandle(int runOffset, int runPages, int inUsed) {
return (long) runOffset << RUN_OFFSET_SHIFT
| (long) runPages << SIZE_SHIFT
| (long) inUsed << IS_USED_SHIFT;
}
UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
if (isRun(handle)) {
int offset = runOffset(handle) << pageShifts;
int maxLength = runSize(pageShifts, handle);
PoolThreadCache poolThreadCache = arena.parent.threadCache();
initAllocatorControl(control, poolThreadCache, handle, maxLength);
ArcDrop.acquire(baseDrop);
return new UntetheredChunkAllocation(
memory, this, poolThreadCache, handle, maxLength, offset, size);
} else {
return allocateBufferWithSubpage(handle, size, threadCache, control);
}
}
UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);
PoolSubpage s = subpages[runOffset];
assert s.doNotDestroy;
assert size <= s.elemSize;
int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
initAllocatorControl(control, threadCache, handle, s.elemSize);
ArcDrop.acquire(baseDrop);
return new UntetheredChunkAllocation(memory, this, threadCache, handle, s.elemSize, offset, size);
}
@SuppressWarnings("unchecked")
private static final class UntetheredChunkAllocation implements UntetheredMemory {
private final Object memory;
private final PoolChunk chunk;
private final PoolThreadCache threadCache;
private final long handle;
private final int maxLength;
private final int offset;
private final int size;
private UntetheredChunkAllocation(
Object memory, PoolChunk chunk, PoolThreadCache threadCache,
long handle, int maxLength, int offset, int size) {
this.memory = memory;
this.chunk = chunk;
this.threadCache = threadCache;
this.handle = handle;
this.maxLength = maxLength;
this.offset = offset;
this.size = size;
}
@Override
public <Memory> Memory memory() {
return (Memory) chunk.arena.manager.sliceMemory(memory, offset, size);
}
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
PooledDrop pooledDrop = new PooledDrop(chunk.arena, chunk, threadCache, handle, maxLength);
return (Drop<BufferType>) CleanerDrop.wrap(pooledDrop);
}
}
private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle,
int normSize) {
control.arena = arena;
control.chunk = this;
control.threadCache = threadCache;
control.handle = handle;
control.normSize = normSize;
}
@Override
public int chunkSize() {
return chunkSize;
}
@Override
public int freeBytes() {
synchronized (arena) {
return freeBytes;
}
}
@Override
public String toString() {
final int freeBytes;
synchronized (arena) {
freeBytes = this.freeBytes;
}
return new StringBuilder()
.append("Chunk(")
.append(Integer.toHexString(System.identityHashCode(this)))
.append(": ")
.append(usage(freeBytes))
.append("%, ")
.append(chunkSize - freeBytes)
.append('/')
.append(chunkSize)
.append(')')
.toString();
}
void destroy() {
baseDrop.drop(base); // Decrement reference count from the chunk (allocated buffers may keep the base alive)
}
static int runOffset(long handle) {
return (int) (handle >> RUN_OFFSET_SHIFT);
}
static int runSize(int pageShifts, long handle) {
return runPages(handle) << pageShifts;
}
static int runPages(long handle) {
return (int) (handle >> SIZE_SHIFT & 0x7fff);
}
static boolean isUsed(long handle) {
return (handle >> IS_USED_SHIFT & 1) == 1L;
}
static boolean isRun(long handle) {
return !isSubpage(handle);
}
static boolean isSubpage(long handle) {
return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
}
static int bitmapIdx(long handle) {
return (int) handle;
}
}