diff --git a/src/main/java/io/netty/buffer/api/AllocatorControl.java b/src/main/java/io/netty/buffer/api/AllocatorControl.java index 35fa502..7010e12 100644 --- a/src/main/java/io/netty/buffer/api/AllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/AllocatorControl.java @@ -22,7 +22,7 @@ package io.netty.buffer.api; */ public interface AllocatorControl { /** - * Allocate a buffer that is not tethered to any particular {@link Drop} implementation, + * Allocate a buffer that is not tethered to any particular {@link Buffer} object, * and return the recoverable memory object from it. *

* This allows a buffer to implement {@link Buffer#ensureWritable(int)} by having new memory allocated to it, @@ -30,9 +30,9 @@ public interface AllocatorControl { * * @param originator The buffer that originated the request for an untethered memory allocated. * @param size The size of the requested memory allocation, in bytes. - * @return A "recoverable memory" object that is the requested allocation. + * @return A {@link UntetheredMemory} object that is the requested allocation. */ - Object allocateUntethered(Buffer originator, int size); + UntetheredMemory allocateUntethered(Buffer originator, int size); /** * Return memory to the allocator, after it has been untethered from it's lifetime. @@ -42,4 +42,12 @@ public interface AllocatorControl { * @param memory The untethered memory to return to the allocator. */ void recoverMemory(Object memory); + + /** + * Memory that isn't attached to any particular buffer. + */ + interface UntetheredMemory { + Memory memory(); + Drop drop(); + } } diff --git a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java index 09ba957..a132772 100644 --- a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java @@ -41,11 +41,22 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl { return () -> manager.allocateConstChild(constantBuffer); } + @SuppressWarnings("unchecked") @Override - public Object allocateUntethered(Buffer originator, int size) { + public UntetheredMemory allocateUntethered(Buffer originator, int size) { BufferAllocator.checkSize(size); var buf = manager.allocateShared(this, size, NO_OP_DROP, Statics.CLEANER); - return manager.unwrapRecoverableMemory(buf); + return new UntetheredMemory() { + @Override + public Memory memory() { + return (Memory) manager.unwrapRecoverableMemory(buf); + } + + @Override + public Drop drop() { + return (Drop) manager.drop(); + } + }; } @Override diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index 81e72cf..730b0ca 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -27,6 +27,5 @@ public interface MemoryManager { void discardRecoverableMemory(Object recoverableMemory); // todo should recoverMemory re-attach a cleaner? Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop); - Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop); + Object sliceMemory(Object memory, int offset, int length); } diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index a759639..8a8095b 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -118,15 +118,29 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop Memory memory() { + return (Memory) memory; + } + + @Override + public Drop drop() { + return (Drop) getDrop(); + } + }; } @Override diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java index 26c3017..bd7bfea 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java @@ -82,10 +82,8 @@ public class ByteBufferMemoryManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - ByteBuffer memory = (ByteBuffer) recoverableMemoryBase; - memory = memory.slice(offset, length); - return new NioBuffer(memory, memory, allocatorControl, convert(drop)); + public Object sliceMemory(Object memory, int offset, int length) { + var buffer = (ByteBuffer) memory; + return buffer.slice(offset, length); } } diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java index e96f2d0..4364958 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -408,27 +408,27 @@ class NioBuffer extends RcSupport implements Buffer, Readable // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + ByteBuffer buffer = untethered.memory(); buffer.order(order()); // Copy contents. copyInto(0, buffer, 0, capacity()); // Release the old memory and install the new: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewBuffer(buffer, drop); } - private Drop disconnectDrop() { + private void disconnectDrop(Drop newDrop) { var drop = (Drop) unsafeGetDrop(); int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); + unsafeSetDrop(new ArcDrop<>(newDrop)); this.roff = roff; this.woff = woff; - return drop; } private void attachNewBuffer(ByteBuffer buffer, Drop drop) { diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index 2cc7c82..1aabd96 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -75,10 +75,8 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - var segment = (MemorySegment) recoverableMemoryBase; - segment = segment.asSlice(offset, length); - return new MemSegBuffer(segment, segment, convert(ArcDrop.acquire(drop)), allocatorControl); + public Object sliceMemory(Object memory, int offset, int length) { + var segment = (MemorySegment) memory; + return segment.asSlice(offset, length); } } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 6e31e04..426f99b 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -524,32 +524,27 @@ class MemSegBuffer extends RcSupport implements Buffer, Re // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - MemorySegment newSegment = (MemorySegment) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + MemorySegment newSegment = untethered.memory(); // Copy contents. newSegment.copyFrom(seg); // Release the old memory segment and install the new one: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewMemorySegment(newSegment, drop); } - private Drop disconnectDrop() { + private void disconnectDrop(Drop newDrop) { var drop = unsafeGetDrop(); - if (drop instanceof ArcDrop) { - // Disconnect from the current arc drop, since we'll get our own fresh memory segment. - int roff = this.roff; - int woff = this.woff; - drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); - this.roff = roff; - this.woff = woff; - } else { - // TODO would we ever get here? - control.recoverMemory(recoverableMemory()); - } - return drop; + // Disconnect from the current arc drop, since we'll get our own fresh memory segment. + int roff = this.roff; + int woff = this.woff; + drop.drop(this); + unsafeSetDrop(new ArcDrop<>(newDrop)); + this.roff = roff; + this.woff = woff; } private void attachNewMemorySegment(MemorySegment newSegment, Drop drop) { diff --git a/src/main/java/io/netty/buffer/api/pool/PoolArena.java b/src/main/java/io/netty/buffer/api/pool/PoolArena.java index 7362887..c4c97eb 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolArena.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolArena.java @@ -114,7 +114,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl return manager.isNative(); } - Buffer allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) { + UntetheredMemory allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) { final int sizeIdx = size2SizeIdx(size); if (sizeIdx <= smallMaxSizeIdx) { @@ -129,12 +129,12 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl } } - private Buffer tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size, - final int sizeIdx) { - Buffer buffer = cache.allocateSmall(control, size, sizeIdx); - if (buffer != null) { + private UntetheredMemory tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size, + final int sizeIdx) { + UntetheredMemory memory = cache.allocateSmall(control, size, sizeIdx); + if (memory != null) { // was able to allocate out of the cache so move on - return buffer; + return memory; } /* @@ -150,73 +150,72 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; - buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache, control); + memory = s.chunk.allocateBufferWithSubpage(handle, size, cache, control); } } if (needsNormalAllocation) { synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache, control); + memory = allocateNormal(size, sizeIdx, cache, control); } } incSmallAllocation(); - return buffer; + return memory; } - private Buffer tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) { - Buffer buffer = cache.allocateNormal(this, control, size, sizeIdx); - if (buffer != null) { + private UntetheredMemory tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) { + UntetheredMemory memory = cache.allocateNormal(this, control, size, sizeIdx); + if (memory != null) { // was able to allocate out of the cache so move on - return buffer; + return memory; } synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache, control); + memory = allocateNormal(size, sizeIdx, cache, control); allocationsNormal++; } - return buffer; + return memory; } // Method must be called inside synchronized(this) { ... } block - private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { - Buffer buffer = q050.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + private UntetheredMemory allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory memory = q050.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q025.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q025.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q000.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q000.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = qInit.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = qInit.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q075.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q075.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } // Add a new chunk. PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); - buffer = c.allocate(size, sizeIdx, threadCache, control); - assert buffer != null; + memory = c.allocate(size, sizeIdx, threadCache, control); + assert memory != null; qInit.add(c); - return buffer; + return memory; } private void incSmallAllocation() { allocationsSmall.increment(); } - private Buffer allocateHuge(int size) { + private UntetheredMemory allocateHuge(int size) { activeBytesHuge.add(size); allocationsHuge.increment(); - BufferAllocator allocator = isDirect()? BufferAllocator.direct() : BufferAllocator.heap(); - return allocator.allocate(size); + return new UnpooledUnthetheredMemory(manager, size); } void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) { @@ -258,7 +257,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl } @Override - public Object allocateUntethered(Buffer originator, int size) { + public UntetheredMemory allocateUntethered(Buffer originator, int size) { throw new AssertionError("PoolChunk base buffers should never need to reallocate."); } diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java index 81aef2a..e09292e 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java @@ -15,7 +15,9 @@ */ package io.netty.buffer.api.pool; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; import java.util.PriorityQueue; @@ -262,7 +264,7 @@ final class PoolChunk implements PoolChunkMetric { return 100 - freePercentage; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) { + UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) { final long handle; if (sizeIdx <= arena.smallMaxSizeIdx) { // small @@ -513,21 +515,21 @@ final class PoolChunk implements PoolChunkMetric { | (long) inUsed << IS_USED_SHIFT; } - Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { if (isRun(handle)) { int offset = runOffset(handle) << pageShifts; int maxLength = runSize(pageShifts, handle); PoolThreadCache poolThreadCache = arena.parent.threadCache(); initAllocatorControl(control, poolThreadCache, handle, maxLength); - return arena.manager.recoverMemory(control, memory, offset, size, - new PooledDrop(control, arena, this, poolThreadCache, handle, maxLength)); + return new UntetheredChunkAllocation( + memory, control, this, poolThreadCache, handle, maxLength, offset, size); } else { return allocateBufferWithSubpage(handle, size, threadCache, control); } } - Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache, - PooledAllocatorControl control) { + UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control) { int runOffset = runOffset(handle); int bitmapIdx = bitmapIdx(handle); @@ -537,8 +539,42 @@ final class PoolChunk implements PoolChunkMetric { int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize; initAllocatorControl(control, threadCache, handle, s.elemSize); - return arena.manager.recoverMemory(control, memory, offset, size, - new PooledDrop(control, arena, this, threadCache, handle, s.elemSize)); + return new UntetheredChunkAllocation(memory, control, this, threadCache, handle, s.elemSize, offset, size); + } + + @SuppressWarnings("unchecked") + private static class UntetheredChunkAllocation implements UntetheredMemory { + private final Object memory; + private final PooledAllocatorControl control; + private final PoolChunk chunk; + private final PoolThreadCache threadCache; + private final long handle; + private final int maxLength; + private final int offset; + private final int size; + + private UntetheredChunkAllocation( + Object memory, PooledAllocatorControl control, PoolChunk chunk, PoolThreadCache threadCache, + long handle, int maxLength, int offset, int size) { + this.memory = memory; + this.control = control; + this.chunk = chunk; + this.threadCache = threadCache; + this.handle = handle; + this.maxLength = maxLength; + this.offset = offset; + this.size = size; + } + + @Override + public Memory memory() { + return (Memory) chunk.arena.manager.sliceMemory(memory, offset, size); + } + + @Override + public Drop drop() { + return (Drop) new PooledDrop(control, chunk.arena, chunk, threadCache, handle, maxLength); + } } private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle, diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java index 6fe5bae..03347f6 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java @@ -15,7 +15,7 @@ */ package io.netty.buffer.api.pool; -import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.util.internal.StringUtil; import java.util.ArrayList; @@ -23,7 +23,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import static java.lang.Math.*; +import static java.lang.Math.max; +import static java.lang.Math.min; final class PoolChunkList implements PoolChunkListMetric { private static final Iterator EMPTY_METRICS = Collections.emptyIterator(); @@ -91,7 +92,7 @@ final class PoolChunkList implements PoolChunkListMetric { this.prevList = prevList; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { int normCapacity = arena.sizeIdx2size(sizeIdx); if (normCapacity > maxCapacity) { // Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can @@ -100,13 +101,13 @@ final class PoolChunkList implements PoolChunkListMetric { } for (PoolChunk cur = head; cur != null; cur = cur.next) { - Buffer buffer = cur.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { + UntetheredMemory memory = cur.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { if (cur.freeBytes <= freeMinThreshold) { remove(cur); nextList.add(cur); } - return buffer; + return memory; } } return null; diff --git a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java index c74399d..14dc67d 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java @@ -15,10 +15,7 @@ */ package io.netty.buffer.api.pool; -import static io.netty.buffer.api.pool.PoolArena.SizeClass.*; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; - -import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.pool.PoolArena.SizeClass; import io.netty.util.internal.MathUtil; import io.netty.util.internal.ObjectPool; @@ -31,6 +28,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; +import static io.netty.buffer.api.pool.PoolArena.SizeClass.Normal; +import static io.netty.buffer.api.pool.PoolArena.SizeClass.Small; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + /** * Acts a Thread cache for allocations. This implementation is modelled after * jemalloc and the described @@ -121,23 +122,23 @@ final class PoolThreadCache { /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) { + UntetheredMemory allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) { return allocate(cacheForSmall(sizeIdx), control, size); } /** * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) { + UntetheredMemory allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) { return allocate(cacheForNormal(area, sizeIdx), control, size); } - private Buffer allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) { + private UntetheredMemory allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) { if (cache == null) { // no cache found so just return false here return null; } - Buffer allocated = cache.allocate(size, this, control); + UntetheredMemory allocated = cache.allocate(size, this, control); if (++allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); @@ -160,14 +161,13 @@ final class PoolThreadCache { } private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) { - switch (sizeClass) { - case Normal: + if (sizeClass == Normal) { return cacheForNormal(area, sizeIdx); - case Small: - return cacheForSmall(sizeIdx); - default: - throw new Error(); } + if (sizeClass == Small) { + return cacheForSmall(sizeIdx); + } + throw new AssertionError("Unexpected size class: " + sizeClass); } /** @@ -252,7 +252,7 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { return chunk.allocateBufferWithSubpage(handle, size, threadCache, control); } @@ -267,7 +267,7 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { return chunk.allocateBuffer(handle, size, threadCache, control); } @@ -286,9 +286,9 @@ final class PoolThreadCache { } /** - * Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions. + * Allocate a new {@link UntetheredMemory} using the provided chunk and handle with the capacity restrictions. */ - protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected abstract UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control); /** @@ -308,12 +308,12 @@ final class PoolThreadCache { /** * Allocate something out of the cache if possible and remove the entry from the cache. */ - public final Buffer allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) { + public final UntetheredMemory allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) { Entry entry = queue.poll(); if (entry == null) { return null; } - Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control); + UntetheredMemory buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control); entry.recycle(); // allocations are not thread-safe which is fine as this is only called from the same thread all time. diff --git a/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java index a4e53ca..87917da 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java @@ -27,9 +27,8 @@ class PooledAllocatorControl implements AllocatorControl { public int updates; @Override - public Object allocateUntethered(Buffer originator, int size) { - Buffer allocate = arena.parent.allocate(this, size); - return arena.manager.unwrapRecoverableMemory(allocate); + public UntetheredMemory allocateUntethered(Buffer originator, int size) { + return arena.parent.allocate(this, size); } @Override diff --git a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java index 29ffc9f..458318c 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java @@ -15,9 +15,7 @@ */ package io.netty.buffer.api.pool; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; -import static java.util.Objects.requireNonNull; - +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.MemoryManager; @@ -38,6 +36,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static java.util.Objects.requireNonNull; + public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMetricProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledBufferAllocator.class); @@ -287,18 +288,24 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe if (size < 1) { throw new IllegalArgumentException("Allocation size must be positive, but was " + size + '.'); } - return allocate(new PooledAllocatorControl(), size); + PooledAllocatorControl control = new PooledAllocatorControl(); + UntetheredMemory memory = allocate(control, size); + Buffer buffer = manager.recoverMemory(control, memory.memory(), memory.drop()); + return buffer.fill((byte) 0).order(ByteOrder.nativeOrder()); } - Buffer allocate(PooledAllocatorControl control, int size) { + UntetheredMemory allocate(PooledAllocatorControl control, int size) { PoolThreadCache cache = threadCache.get(); PoolArena arena = cache.arena; if (arena != null) { - return arena.allocate(control, cache, size).fill((byte) 0).order(ByteOrder.nativeOrder()); + return arena.allocate(control, cache, size); } - BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap(); - return unpooled.allocate(size); + return allocateUnpooled(size); + } + + private UntetheredMemory allocateUnpooled(int size) { + return new UnpooledUnthetheredMemory(manager, size); } @Override diff --git a/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java b/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java new file mode 100644 index 0000000..0f537c8 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.pool; + +import io.netty.buffer.api.AllocatorControl; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.internal.Statics; + +@SuppressWarnings("unchecked") +class UnpooledUnthetheredMemory implements AllocatorControl.UntetheredMemory { + private final MemoryManager manager; + private final Buffer buffer; + + UnpooledUnthetheredMemory(MemoryManager manager, int size) { + this.manager = manager; + PooledAllocatorControl allocatorControl = new PooledAllocatorControl(); + buffer = manager.allocateShared(allocatorControl, size, manager.drop(), Statics.CLEANER); + } + + @Override + public Memory memory() { + return (Memory) manager.unwrapRecoverableMemory(buffer); + } + + @Override + public Drop drop() { + return (Drop) manager.drop(); + } +} diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java index e4b72aa..bdbe06c 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java @@ -447,7 +447,8 @@ class UnsafeBuffer extends RcSupport implements Buffer, Re // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + UnsafeMemory memory = untethered.memory(); // Copy contents. try { @@ -458,17 +459,17 @@ class UnsafeBuffer extends RcSupport implements Buffer, Re } // Release the old memory, and install the new memory: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewMemory(memory, drop); } - private Drop disconnectDrop() { + private Drop disconnectDrop(Drop newDrop) { var drop = (Drop) unsafeGetDrop(); int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); + unsafeSetDrop(new ArcDrop<>(newDrop)); this.roff = roff; this.woff = woff; return drop; diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java index 1298795..bb51202 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java @@ -96,10 +96,7 @@ public class UnsafeMemoryManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - UnsafeMemory memory = (UnsafeMemory) recoverableMemoryBase; - memory = memory.slice(offset, length); - return new UnsafeBuffer(memory, 0, memory.size, allocatorControl, convert(drop)); + public Object sliceMemory(Object memory, int offset, int length) { + return ((UnsafeMemory) memory).slice(offset, length); } }