Make sure that every allocation get their own unique Drop instance.

This allows the pooling allocator to precisely control how each allocation should be dropped.
This is important to the pooling allocator, because it needs to know what arena, chunk, page, run, etc. is being freed, exactly.
This commit is contained in:
Chris Vest 2021-05-17 15:15:19 +02:00
parent fa75c81c6c
commit 12b38234e5
17 changed files with 245 additions and 138 deletions

View File

@ -22,7 +22,7 @@ package io.netty.buffer.api;
*/
public interface AllocatorControl {
/**
* Allocate a buffer that is not tethered to any particular {@link Drop} implementation,
* Allocate a buffer that is not tethered to any particular {@link Buffer} object,
* and return the recoverable memory object from it.
* <p>
* This allows a buffer to implement {@link Buffer#ensureWritable(int)} by having new memory allocated to it,
@ -30,9 +30,9 @@ public interface AllocatorControl {
*
* @param originator The buffer that originated the request for an untethered memory allocated.
* @param size The size of the requested memory allocation, in bytes.
* @return A "recoverable memory" object that is the requested allocation.
* @return A {@link UntetheredMemory} object that is the requested allocation.
*/
Object allocateUntethered(Buffer originator, int size);
UntetheredMemory allocateUntethered(Buffer originator, int size);
/**
* Return memory to the allocator, after it has been untethered from it's lifetime.
@ -42,4 +42,12 @@ public interface AllocatorControl {
* @param memory The untethered memory to return to the allocator.
*/
void recoverMemory(Object memory);
/**
* Memory that isn't attached to any particular buffer.
*/
interface UntetheredMemory {
<Memory> Memory memory();
<BufferType extends Buffer> Drop<BufferType> drop();
}
}

View File

@ -41,11 +41,22 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
return () -> manager.allocateConstChild(constantBuffer);
}
@SuppressWarnings("unchecked")
@Override
public Object allocateUntethered(Buffer originator, int size) {
public UntetheredMemory allocateUntethered(Buffer originator, int size) {
BufferAllocator.checkSize(size);
var buf = manager.allocateShared(this, size, NO_OP_DROP, Statics.CLEANER);
return manager.unwrapRecoverableMemory(buf);
return new UntetheredMemory() {
@Override
public <Memory> Memory memory() {
return (Memory) manager.unwrapRecoverableMemory(buf);
}
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
return (Drop<BufferType>) manager.drop();
}
};
}
@Override

View File

@ -27,6 +27,5 @@ public interface MemoryManager {
void discardRecoverableMemory(Object recoverableMemory);
// todo should recoverMemory re-attach a cleaner?
Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop);
Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
int offset, int length, Drop<Buffer> drop);
Object sliceMemory(Object memory, int offset, int length);
}

View File

@ -118,15 +118,29 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
return "SizeClassedMemoryPool";
}
@SuppressWarnings("unchecked")
@Override
public Object allocateUntethered(Buffer originator, int size) {
public UntetheredMemory allocateUntethered(Buffer originator, int size) {
var sizeClassPool = getSizeClassPool(size);
Object memory = sizeClassPool.poll();
if (memory == null) {
Object candidateMemory = sizeClassPool.poll();
if (candidateMemory == null) {
Buffer untetheredBuf = createBuf(size, NO_OP_DROP);
memory = manager.unwrapRecoverableMemory(untetheredBuf);
candidateMemory = manager.unwrapRecoverableMemory(untetheredBuf);
}
return memory;
Object memory = candidateMemory;
return new UntetheredMemory() {
@Override
public <Memory> Memory memory() {
return (Memory) memory;
}
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
return (Drop<BufferType>) getDrop();
}
};
}
@Override

View File

@ -82,10 +82,8 @@ public class ByteBufferMemoryManager implements MemoryManager {
}
@Override
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
int offset, int length, Drop<Buffer> drop) {
ByteBuffer memory = (ByteBuffer) recoverableMemoryBase;
memory = memory.slice(offset, length);
return new NioBuffer(memory, memory, allocatorControl, convert(drop));
public Object sliceMemory(Object memory, int offset, int length) {
var buffer = (ByteBuffer) memory;
return buffer.slice(offset, length);
}
}

View File

@ -408,27 +408,27 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
// Allocate a bigger buffer.
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize);
var untethered = control.allocateUntethered(this, (int) newSize);
ByteBuffer buffer = untethered.memory();
buffer.order(order());
// Copy contents.
copyInto(0, buffer, 0, capacity());
// Release the old memory and install the new:
Drop<NioBuffer> drop = disconnectDrop();
Drop<NioBuffer> drop = untethered.drop();
disconnectDrop(drop);
attachNewBuffer(buffer, drop);
}
private Drop<NioBuffer> disconnectDrop() {
private void disconnectDrop(Drop<NioBuffer> newDrop) {
var drop = (Drop<NioBuffer>) unsafeGetDrop();
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
unsafeSetDrop(new ArcDrop<>(newDrop));
this.roff = roff;
this.woff = woff;
return drop;
}
private void attachNewBuffer(ByteBuffer buffer, Drop<NioBuffer> drop) {

View File

@ -75,10 +75,8 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
}
@Override
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
int offset, int length, Drop<Buffer> drop) {
var segment = (MemorySegment) recoverableMemoryBase;
segment = segment.asSlice(offset, length);
return new MemSegBuffer(segment, segment, convert(ArcDrop.acquire(drop)), allocatorControl);
public Object sliceMemory(Object memory, int offset, int length) {
var segment = (MemorySegment) memory;
return segment.asSlice(offset, length);
}
}

View File

@ -524,32 +524,27 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
// Allocate a bigger buffer.
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
MemorySegment newSegment = (MemorySegment) control.allocateUntethered(this, (int) newSize);
var untethered = control.allocateUntethered(this, (int) newSize);
MemorySegment newSegment = untethered.memory();
// Copy contents.
newSegment.copyFrom(seg);
// Release the old memory segment and install the new one:
Drop<MemSegBuffer> drop = disconnectDrop();
Drop<MemSegBuffer> drop = untethered.drop();
disconnectDrop(drop);
attachNewMemorySegment(newSegment, drop);
}
private Drop<MemSegBuffer> disconnectDrop() {
private void disconnectDrop(Drop<MemSegBuffer> newDrop) {
var drop = unsafeGetDrop();
if (drop instanceof ArcDrop) {
// Disconnect from the current arc drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
this.roff = roff;
this.woff = woff;
} else {
// TODO would we ever get here?
control.recoverMemory(recoverableMemory());
}
return drop;
// Disconnect from the current arc drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
unsafeSetDrop(new ArcDrop<>(newDrop));
this.roff = roff;
this.woff = woff;
}
private void attachNewMemorySegment(MemorySegment newSegment, Drop<MemSegBuffer> drop) {

View File

@ -114,7 +114,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl
return manager.isNative();
}
Buffer allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) {
UntetheredMemory allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) {
final int sizeIdx = size2SizeIdx(size);
if (sizeIdx <= smallMaxSizeIdx) {
@ -129,12 +129,12 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl
}
}
private Buffer tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size,
final int sizeIdx) {
Buffer buffer = cache.allocateSmall(control, size, sizeIdx);
if (buffer != null) {
private UntetheredMemory tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size,
final int sizeIdx) {
UntetheredMemory memory = cache.allocateSmall(control, size, sizeIdx);
if (memory != null) {
// was able to allocate out of the cache so move on
return buffer;
return memory;
}
/*
@ -150,73 +150,72 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
long handle = s.allocate();
assert handle >= 0;
buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache, control);
memory = s.chunk.allocateBufferWithSubpage(handle, size, cache, control);
}
}
if (needsNormalAllocation) {
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache, control);
memory = allocateNormal(size, sizeIdx, cache, control);
}
}
incSmallAllocation();
return buffer;
return memory;
}
private Buffer tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) {
Buffer buffer = cache.allocateNormal(this, control, size, sizeIdx);
if (buffer != null) {
private UntetheredMemory tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) {
UntetheredMemory memory = cache.allocateNormal(this, control, size, sizeIdx);
if (memory != null) {
// was able to allocate out of the cache so move on
return buffer;
return memory;
}
synchronized (this) {
buffer = allocateNormal(size, sizeIdx, cache, control);
memory = allocateNormal(size, sizeIdx, cache, control);
allocationsNormal++;
}
return buffer;
return memory;
}
// Method must be called inside synchronized(this) { ... } block
private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
Buffer buffer = q050.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
private UntetheredMemory allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
UntetheredMemory memory = q050.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
return memory;
}
buffer = q025.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
memory = q025.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
return memory;
}
buffer = q000.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
memory = q000.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
return memory;
}
buffer = qInit.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
memory = qInit.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
return memory;
}
buffer = q075.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
return buffer;
memory = q075.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
return memory;
}
// Add a new chunk.
PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
buffer = c.allocate(size, sizeIdx, threadCache, control);
assert buffer != null;
memory = c.allocate(size, sizeIdx, threadCache, control);
assert memory != null;
qInit.add(c);
return buffer;
return memory;
}
private void incSmallAllocation() {
allocationsSmall.increment();
}
private Buffer allocateHuge(int size) {
private UntetheredMemory allocateHuge(int size) {
activeBytesHuge.add(size);
allocationsHuge.increment();
BufferAllocator allocator = isDirect()? BufferAllocator.direct() : BufferAllocator.heap();
return allocator.allocate(size);
return new UnpooledUnthetheredMemory(manager, size);
}
void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) {
@ -258,7 +257,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl
}
@Override
public Object allocateUntethered(Buffer originator, int size) {
public UntetheredMemory allocateUntethered(Buffer originator, int size) {
throw new AssertionError("PoolChunk base buffers should never need to reallocate.");
}

View File

@ -15,7 +15,9 @@
*/
package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import java.util.PriorityQueue;
@ -262,7 +264,7 @@ final class PoolChunk implements PoolChunkMetric {
return 100 - freePercentage;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) {
UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
@ -513,21 +515,21 @@ final class PoolChunk implements PoolChunkMetric {
| (long) inUsed << IS_USED_SHIFT;
}
Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
if (isRun(handle)) {
int offset = runOffset(handle) << pageShifts;
int maxLength = runSize(pageShifts, handle);
PoolThreadCache poolThreadCache = arena.parent.threadCache();
initAllocatorControl(control, poolThreadCache, handle, maxLength);
return arena.manager.recoverMemory(control, memory, offset, size,
new PooledDrop(control, arena, this, poolThreadCache, handle, maxLength));
return new UntetheredChunkAllocation(
memory, control, this, poolThreadCache, handle, maxLength, offset, size);
} else {
return allocateBufferWithSubpage(handle, size, threadCache, control);
}
}
Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);
@ -537,8 +539,42 @@ final class PoolChunk implements PoolChunkMetric {
int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
initAllocatorControl(control, threadCache, handle, s.elemSize);
return arena.manager.recoverMemory(control, memory, offset, size,
new PooledDrop(control, arena, this, threadCache, handle, s.elemSize));
return new UntetheredChunkAllocation(memory, control, this, threadCache, handle, s.elemSize, offset, size);
}
@SuppressWarnings("unchecked")
private static class UntetheredChunkAllocation implements UntetheredMemory {
private final Object memory;
private final PooledAllocatorControl control;
private final PoolChunk chunk;
private final PoolThreadCache threadCache;
private final long handle;
private final int maxLength;
private final int offset;
private final int size;
private UntetheredChunkAllocation(
Object memory, PooledAllocatorControl control, PoolChunk chunk, PoolThreadCache threadCache,
long handle, int maxLength, int offset, int size) {
this.memory = memory;
this.control = control;
this.chunk = chunk;
this.threadCache = threadCache;
this.handle = handle;
this.maxLength = maxLength;
this.offset = offset;
this.size = size;
}
@Override
public <Memory> Memory memory() {
return (Memory) chunk.arena.manager.sliceMemory(memory, offset, size);
}
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
return (Drop<BufferType>) new PooledDrop(control, chunk.arena, chunk, threadCache, handle, maxLength);
}
}
private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle,

View File

@ -15,7 +15,7 @@
*/
package io.netty.buffer.api.pool;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
@ -23,7 +23,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static java.lang.Math.*;
import static java.lang.Math.max;
import static java.lang.Math.min;
final class PoolChunkList implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.emptyIterator();
@ -91,7 +92,7 @@ final class PoolChunkList implements PoolChunkListMetric {
this.prevList = prevList;
}
Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can
@ -100,13 +101,13 @@ final class PoolChunkList implements PoolChunkListMetric {
}
for (PoolChunk cur = head; cur != null; cur = cur.next) {
Buffer buffer = cur.allocate(size, sizeIdx, threadCache, control);
if (buffer != null) {
UntetheredMemory memory = cur.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
nextList.add(cur);
}
return buffer;
return memory;
}
}
return null;

View File

@ -15,10 +15,7 @@
*/
package io.netty.buffer.api.pool;
import static io.netty.buffer.api.pool.PoolArena.SizeClass.*;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.pool.PoolArena.SizeClass;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
@ -31,6 +28,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import static io.netty.buffer.api.pool.PoolArena.SizeClass.Normal;
import static io.netty.buffer.api.pool.PoolArena.SizeClass.Small;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
/**
* Acts a Thread cache for allocations. This implementation is modelled after
* <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
@ -121,23 +122,23 @@ final class PoolThreadCache {
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) {
UntetheredMemory allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) {
return allocate(cacheForSmall(sizeIdx), control, size);
}
/**
* Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
Buffer allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) {
UntetheredMemory allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) {
return allocate(cacheForNormal(area, sizeIdx), control, size);
}
private Buffer allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) {
private UntetheredMemory allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) {
if (cache == null) {
// no cache found so just return false here
return null;
}
Buffer allocated = cache.allocate(size, this, control);
UntetheredMemory allocated = cache.allocate(size, this, control);
if (++allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
@ -160,14 +161,13 @@ final class PoolThreadCache {
}
private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
if (sizeClass == Normal) {
return cacheForNormal(area, sizeIdx);
case Small:
return cacheForSmall(sizeIdx);
default:
throw new Error();
}
if (sizeClass == Small) {
return cacheForSmall(sizeIdx);
}
throw new AssertionError("Unexpected size class: " + sizeClass);
}
/**
@ -252,7 +252,7 @@ final class PoolThreadCache {
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
return chunk.allocateBufferWithSubpage(handle, size, threadCache, control);
}
@ -267,7 +267,7 @@ final class PoolThreadCache {
}
@Override
protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control) {
return chunk.allocateBuffer(handle, size, threadCache, control);
}
@ -286,9 +286,9 @@ final class PoolThreadCache {
}
/**
* Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions.
* Allocate a new {@link UntetheredMemory} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
protected abstract UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache,
PooledAllocatorControl control);
/**
@ -308,12 +308,12 @@ final class PoolThreadCache {
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public final Buffer allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
public final UntetheredMemory allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) {
Entry entry = queue.poll();
if (entry == null) {
return null;
}
Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control);
UntetheredMemory buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control);
entry.recycle();
// allocations are not thread-safe which is fine as this is only called from the same thread all time.

View File

@ -27,9 +27,8 @@ class PooledAllocatorControl implements AllocatorControl {
public int updates;
@Override
public Object allocateUntethered(Buffer originator, int size) {
Buffer allocate = arena.parent.allocate(this, size);
return arena.manager.unwrapRecoverableMemory(allocate);
public UntetheredMemory allocateUntethered(Buffer originator, int size) {
return arena.parent.allocate(this, size);
}
@Override

View File

@ -15,9 +15,7 @@
*/
package io.netty.buffer.api.pool;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.MemoryManager;
@ -38,6 +36,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static java.util.Objects.requireNonNull;
public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMetricProvider {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledBufferAllocator.class);
@ -287,18 +288,24 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe
if (size < 1) {
throw new IllegalArgumentException("Allocation size must be positive, but was " + size + '.');
}
return allocate(new PooledAllocatorControl(), size);
PooledAllocatorControl control = new PooledAllocatorControl();
UntetheredMemory memory = allocate(control, size);
Buffer buffer = manager.recoverMemory(control, memory.memory(), memory.drop());
return buffer.fill((byte) 0).order(ByteOrder.nativeOrder());
}
Buffer allocate(PooledAllocatorControl control, int size) {
UntetheredMemory allocate(PooledAllocatorControl control, int size) {
PoolThreadCache cache = threadCache.get();
PoolArena arena = cache.arena;
if (arena != null) {
return arena.allocate(control, cache, size).fill((byte) 0).order(ByteOrder.nativeOrder());
return arena.allocate(control, cache, size);
}
BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap();
return unpooled.allocate(size);
return allocateUnpooled(size);
}
private UntetheredMemory allocateUnpooled(int size) {
return new UnpooledUnthetheredMemory(manager, size);
}
@Override

View File

@ -0,0 +1,44 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.pool;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.internal.Statics;
@SuppressWarnings("unchecked")
class UnpooledUnthetheredMemory implements AllocatorControl.UntetheredMemory {
private final MemoryManager manager;
private final Buffer buffer;
UnpooledUnthetheredMemory(MemoryManager manager, int size) {
this.manager = manager;
PooledAllocatorControl allocatorControl = new PooledAllocatorControl();
buffer = manager.allocateShared(allocatorControl, size, manager.drop(), Statics.CLEANER);
}
@Override
public <Memory> Memory memory() {
return (Memory) manager.unwrapRecoverableMemory(buffer);
}
@Override
public <BufferType extends Buffer> Drop<BufferType> drop() {
return (Drop<BufferType>) manager.drop();
}
}

View File

@ -447,7 +447,8 @@ class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buffer, Re
// Allocate a bigger buffer.
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize);
var untethered = control.allocateUntethered(this, (int) newSize);
UnsafeMemory memory = untethered.memory();
// Copy contents.
try {
@ -458,17 +459,17 @@ class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buffer, Re
}
// Release the old memory, and install the new memory:
Drop<UnsafeBuffer> drop = disconnectDrop();
Drop<UnsafeBuffer> drop = untethered.drop();
disconnectDrop(drop);
attachNewMemory(memory, drop);
}
private Drop<UnsafeBuffer> disconnectDrop() {
private Drop<UnsafeBuffer> disconnectDrop(Drop<UnsafeBuffer> newDrop) {
var drop = (Drop<UnsafeBuffer>) unsafeGetDrop();
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
unsafeSetDrop(new ArcDrop<>(newDrop));
this.roff = roff;
this.woff = woff;
return drop;

View File

@ -96,10 +96,7 @@ public class UnsafeMemoryManager implements MemoryManager {
}
@Override
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
int offset, int length, Drop<Buffer> drop) {
UnsafeMemory memory = (UnsafeMemory) recoverableMemoryBase;
memory = memory.slice(offset, length);
return new UnsafeBuffer(memory, 0, memory.size, allocatorControl, convert(drop));
public Object sliceMemory(Object memory, int offset, int length) {
return ((UnsafeMemory) memory).slice(offset, length);
}
}