Second, more complete draft of porting over the pooling allocator from Netty

This commit is contained in:
Chris Vest 2021-05-12 10:44:33 +02:00
parent ae2abdd2aa
commit b4b0afd787
6 changed files with 122 additions and 70 deletions

View File

@ -1,5 +1,6 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.MemoryManager;
@ -14,7 +15,7 @@ import java.util.concurrent.atomic.LongAdder;
import static io.netty.buffer.api.pool.PoolChunk.isSubpage;
import static java.lang.Math.max;
class PoolArena extends SizeClasses implements PoolArenaMetric {
class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl {
enum SizeClass {
Small,
Normal
@ -98,13 +99,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
return manager.isNative();
}
Buffer allocate(PoolThreadCache cache, int size) {
Buffer allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) {
final int sizeIdx = size2SizeIdx(size);
if (sizeIdx <= smallMaxSizeIdx) {
return tcacheAllocateSmall(cache, size, sizeIdx);
return tcacheAllocateSmall(control, cache, size, sizeIdx);
} else if (sizeIdx < nSizes) {
return tcacheAllocateNormal(cache, size, sizeIdx);
return tcacheAllocateNormal(control, cache, size, sizeIdx);
} else {
int normCapacity = directMemoryCacheAlignment > 0
? normalizeSize(size) : size;
@ -113,9 +114,9 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
}
}
private Buffer tcacheAllocateSmall(PoolThreadCache cache, final int size,
final int sizeIdx) {
Buffer buffer = cache.allocateSmall(size, sizeIdx);
private Buffer tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size,
final int sizeIdx) {
Buffer buffer = cache.allocateSmall(control, size, sizeIdx);
if (buffer != null) {
// was able to allocate out of the cache so move on
return buffer;
@ -134,13 +135,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
long handle = s.allocate();
assert handle >= 0;
buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache);
buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache, control);
}
}
if (needsNormalAllocation) {
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache);
buffer = allocateNormal(size, sizeIdx, cache, control);
}
}
@ -148,45 +149,45 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
return buffer;
}
private Buffer tcacheAllocateNormal(PoolThreadCache cache, int size, int sizeIdx) {
Buffer buffer = cache.allocateNormal(this, size, sizeIdx);
private Buffer tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) {
Buffer buffer = cache.allocateNormal(this, control, size, sizeIdx);
if (buffer != null) {
// was able to allocate out of the cache so move on
return buffer;
}
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache);
buffer = allocateNormal(size, sizeIdx, cache, control);
allocationsNormal++;
}
return buffer;
}
// Method must be called inside synchronized(this) { ... } block
private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache) {
Buffer buffer = q050.allocate(size, sizeIdx, threadCache);
private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
Buffer buffer = q050.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
}
buffer = q025.allocate(size, sizeIdx, threadCache);
buffer = q025.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
}
buffer = q000.allocate(size, sizeIdx, threadCache);
buffer = q000.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
}
buffer = qInit.allocate(size, sizeIdx, threadCache);
buffer = qInit.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
}
buffer = q075.allocate(size, sizeIdx, threadCache);
buffer = q075.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
}
// Add a new chunk.
PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
buffer = c.allocate(size, sizeIdx, threadCache);
buffer = c.allocate(size, sizeIdx, threadCache, control);
assert buffer != null;
qInit.add(c);
return buffer;
@ -249,6 +250,18 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
return smallSubpagePools[sizeIdx];
}
@Override
public Object allocateUntethered(Buffer originator, int size) {
throw new AssertionError("PoolChunk base buffers should never need to reallocate.");
}
@Override
public void recoverMemory(Object memory) {
// This means we've lost all strong references to a PoolChunk.
// Probably means we don't need it anymore, so just free its memory.
manager.discardRecoverableMemory(memory);
}
@Override
public int numThreadCaches() {
return numThreadCaches.get();
@ -397,7 +410,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
}
protected final PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
Buffer base = manager.allocateShared(parent, chunkSize, manager.drop(), Statics.CLEANER);
Buffer base = manager.allocateShared(this, chunkSize, manager.drop(), Statics.CLEANER);
Object memory = manager.unwrapRecoverableMemory(base);
return new PoolChunk(
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
@ -454,4 +467,13 @@ class PoolArena extends SizeClasses implements PoolArenaMetric {
} while (s != head);
}
}
public void close() {
for (PoolSubpage page : smallSubpagePools) {
page.destroy();
}
for (PoolChunkList list : new PoolChunkList[] {qInit, q000, q025, q050, q100}) {
list.destroy();
}
}
}

View File

@ -175,20 +175,6 @@ final class PoolChunk implements PoolChunkMetric {
insertAvailRun(0, pages, initHandle);
}
/** Creates a special chunk that is not pooled. */
PoolChunk(PoolArena arena, Buffer base, Object memory, int size) {
unpooled = true;
this.arena = arena;
this.base = base;
this.memory = memory;
pageSize = 0;
pageShifts = 0;
runsAvailMap = null;
runsAvail = null;
subpages = null;
chunkSize = size;
}
private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
for (int i = 0; i < queueArray.length; i++) {
@ -263,7 +249,7 @@ final class PoolChunk implements PoolChunkMetric {
return 100 - freePercentage;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache cache) {
Buffer allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
@ -282,7 +268,7 @@ final class PoolChunk implements PoolChunkMetric {
}
}
return allocateBuffer(handle, size, cache);
return allocateBuffer(handle, size, cache, control);
}
private long allocateRun(int runSize) {
@ -514,23 +500,25 @@ final class PoolChunk implements PoolChunkMetric {
| (long) inUsed << IS_USED_SHIFT;
}
Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache) {
Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
if (isRun(handle)) {
int offset = runOffset(handle) << pageShifts;
int maxLength = runSize(pageShifts, handle);
PoolThreadCache poolThreadCache = arena.parent.threadCache();
return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop<Buffer>() {
initAllocatorControl(control, poolThreadCache, handle, maxLength, offset, size);
return arena.manager.recoverMemory(control, memory, offset, size, new Drop<Buffer>() {
@Override
public void drop(Buffer obj) {
arena.free(PoolChunk.this, handle, maxLength, poolThreadCache);
}
});
} else {
return allocateBufferWithSubpage(handle, size, threadCache);
return allocateBufferWithSubpage(handle, size, threadCache, control);
}
}
Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache) {
Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);
@ -539,7 +527,8 @@ final class PoolChunk implements PoolChunkMetric {
assert size <= s.elemSize;
int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
return arena.manager.recoverMemory(arena.parent, memory, offset, size, new Drop<Buffer>() {
initAllocatorControl(control, threadCache, handle, s.elemSize, offset, size);
return arena.manager.recoverMemory(control, memory, offset, size, new Drop<Buffer>() {
@Override
public void drop(Buffer obj) {
arena.free(PoolChunk.this, handle, s.elemSize, threadCache);
@ -547,6 +536,18 @@ final class PoolChunk implements PoolChunkMetric {
});
}
private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle,
int normSize, int offset, int size) {
control.arena = arena;
control.chunk = this;
control.threadCache = threadCache;
control.handle = handle;
control.normSize = normSize;
control.memory = memory;
control.offset = offset;
control.size = size;
}
@Override
public int chunkSize() {
return chunkSize;

View File

@ -76,7 +76,7 @@ final class PoolChunkList implements PoolChunkListMetric {
this.prevList = prevList;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache) {
Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can
@ -85,7 +85,7 @@ final class PoolChunkList implements PoolChunkListMetric {
}
for (PoolChunk cur = head; cur != null; cur = cur.next) {
Buffer buffer = cur.allocate(size, sizeIdx, threadCache);
Buffer buffer = cur.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
@ -223,10 +223,10 @@ final class PoolChunkList implements PoolChunkListMetric {
return buf.toString();
}
void destroy(PoolArena arena) {
void destroy() {
PoolChunk chunk = head;
while (chunk != null) {
arena.destroyChunk(chunk);
chunk.destroy();
chunk = chunk.next;
}
head = null;

View File

@ -106,24 +106,24 @@ final class PoolThreadCache {
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateSmall(int size, int sizeIdx) {
return allocate(cacheForSmall(sizeIdx), size);
Buffer allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) {
return allocate(cacheForSmall(sizeIdx), control, size);
}
/**
* Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateNormal(PoolArena area, int size, int sizeIdx) {
return allocate(cacheForNormal(area, sizeIdx), size);
Buffer allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) {
return allocate(cacheForNormal(area, sizeIdx), control, size);
}
private Buffer allocate(MemoryRegionCache cache, int size) {
private Buffer allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) {
if (cache == null) {
// no cache found so just return false here
return null;
}
Buffer allocated = cache.allocate(size, this);
if (++ allocations >= freeSweepAllocationThreshold) {
Buffer allocated = cache.allocate(size, this, control);
if (++allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
@ -237,8 +237,9 @@ final class PoolThreadCache {
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
return chunk.allocateBufferWithSubpage(handle, size, threadCache);
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
return chunk.allocateBufferWithSubpage(handle, size, threadCache, control);
}
}
@ -251,8 +252,9 @@ final class PoolThreadCache {
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
return chunk.allocateBuffer(handle, size, threadCache);
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
return chunk.allocateBuffer(handle, size, threadCache, control);
}
}
@ -271,7 +273,8 @@ final class PoolThreadCache {
/**
* Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache);
protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control);
/**
* Add to cache if not already full.
@ -290,12 +293,12 @@ final class PoolThreadCache {
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public final Buffer allocate(int size, PoolThreadCache threadCache) {
public final Buffer allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
Entry entry = queue.poll();
if (entry == null) {
return null;
}
Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache);
Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control);
entry.recycle();
// allocations are not thread-safe which is fine as this is only called from the same thread all time.

View File

@ -0,0 +1,26 @@
package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
class PooledAllocatorControl implements AllocatorControl {
public PoolArena arena;
public PoolChunk chunk;
public PoolThreadCache threadCache;
public long handle;
public int normSize;
public Object memory;
public int offset;
public int size;
@Override
public Object allocateUntethered(Buffer originator, int size) {
Buffer allocate = arena.parent.allocate(this, size);
return arena.manager.unwrapRecoverableMemory(allocate);
}
@Override
public void recoverMemory(Object memory) {
arena.free(chunk, handle, normSize, threadCache);
}
}

View File

@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocatorMetricProvider, AllocatorControl {
public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocatorMetricProvider {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA;
@ -257,7 +257,7 @@ public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocator
// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i --) {
for (int i = maxOrder; i > 0; i--) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(String.format(
"pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
@ -269,25 +269,25 @@ public class PooledByteBufAllocator implements BufferAllocator, ByteBufAllocator
@Override
public Buffer allocate(int size) {
return allocate(new PooledAllocatorControl(), size);
}
Buffer allocate(PooledAllocatorControl control, int size) {
PoolThreadCache cache = threadCache.get();
PoolArena arena = cache.arena;
if (arena != null) {
return arena.allocate(cache, size);
return arena.allocate(control, cache, size);
}
BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap();
return unpooled.allocate(size);
}
@Override
public Object allocateUntethered(Buffer originator, int size) {
// TODO
return null;
}
@Override
public void recoverMemory(Object memory) {
// TODO
public void close() {
for (PoolArena arena : arenas) {
arena.close();
}
}
/**