From 12b38234e5265fd7869dd19f21a5d8bb5fe55a6d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 17 May 2021 15:15:19 +0200 Subject: [PATCH] Make sure that every allocation get their own unique Drop instance. This allows the pooling allocator to precisely control how each allocation should be dropped. This is important to the pooling allocator, because it needs to know what arena, chunk, page, run, etc. is being freed, exactly. --- .../io/netty/buffer/api/AllocatorControl.java | 14 +++- .../buffer/api/ManagedBufferAllocator.java | 15 +++- .../io/netty/buffer/api/MemoryManager.java | 3 +- .../buffer/api/SizeClassedMemoryPool.java | 24 ++++-- .../bytebuffer/ByteBufferMemoryManager.java | 8 +- .../buffer/api/bytebuffer/NioBuffer.java | 12 +-- .../memseg/AbstractMemorySegmentManager.java | 8 +- .../netty/buffer/api/memseg/MemSegBuffer.java | 29 +++---- .../io/netty/buffer/api/pool/PoolArena.java | 75 +++++++++---------- .../io/netty/buffer/api/pool/PoolChunk.java | 52 +++++++++++-- .../netty/buffer/api/pool/PoolChunkList.java | 13 ++-- .../buffer/api/pool/PoolThreadCache.java | 40 +++++----- .../api/pool/PooledAllocatorControl.java | 5 +- .../api/pool/PooledBufferAllocator.java | 23 ++++-- .../api/pool/UnpooledUnthetheredMemory.java | 44 +++++++++++ .../netty/buffer/api/unsafe/UnsafeBuffer.java | 11 +-- .../api/unsafe/UnsafeMemoryManager.java | 7 +- 17 files changed, 245 insertions(+), 138 deletions(-) create mode 100644 src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java diff --git a/src/main/java/io/netty/buffer/api/AllocatorControl.java b/src/main/java/io/netty/buffer/api/AllocatorControl.java index 35fa502..7010e12 100644 --- a/src/main/java/io/netty/buffer/api/AllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/AllocatorControl.java @@ -22,7 +22,7 @@ package io.netty.buffer.api; */ public interface AllocatorControl { /** - * Allocate a buffer that is not tethered to any particular {@link Drop} implementation, + * Allocate a buffer that is not tethered to any particular {@link Buffer} object, * and return the recoverable memory object from it. *

* This allows a buffer to implement {@link Buffer#ensureWritable(int)} by having new memory allocated to it, @@ -30,9 +30,9 @@ public interface AllocatorControl { * * @param originator The buffer that originated the request for an untethered memory allocated. * @param size The size of the requested memory allocation, in bytes. - * @return A "recoverable memory" object that is the requested allocation. + * @return A {@link UntetheredMemory} object that is the requested allocation. */ - Object allocateUntethered(Buffer originator, int size); + UntetheredMemory allocateUntethered(Buffer originator, int size); /** * Return memory to the allocator, after it has been untethered from it's lifetime. @@ -42,4 +42,12 @@ public interface AllocatorControl { * @param memory The untethered memory to return to the allocator. */ void recoverMemory(Object memory); + + /** + * Memory that isn't attached to any particular buffer. + */ + interface UntetheredMemory { + Memory memory(); + Drop drop(); + } } diff --git a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java index 09ba957..a132772 100644 --- a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java @@ -41,11 +41,22 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl { return () -> manager.allocateConstChild(constantBuffer); } + @SuppressWarnings("unchecked") @Override - public Object allocateUntethered(Buffer originator, int size) { + public UntetheredMemory allocateUntethered(Buffer originator, int size) { BufferAllocator.checkSize(size); var buf = manager.allocateShared(this, size, NO_OP_DROP, Statics.CLEANER); - return manager.unwrapRecoverableMemory(buf); + return new UntetheredMemory() { + @Override + public Memory memory() { + return (Memory) manager.unwrapRecoverableMemory(buf); + } + + @Override + public Drop drop() { + return (Drop) manager.drop(); + } + }; } @Override diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index 81e72cf..730b0ca 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -27,6 +27,5 @@ public interface MemoryManager { void discardRecoverableMemory(Object recoverableMemory); // todo should recoverMemory re-attach a cleaner? Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop); - Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop); + Object sliceMemory(Object memory, int offset, int length); } diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index a759639..8a8095b 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -118,15 +118,29 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop Memory memory() { + return (Memory) memory; + } + + @Override + public Drop drop() { + return (Drop) getDrop(); + } + }; } @Override diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java index 26c3017..bd7bfea 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java @@ -82,10 +82,8 @@ public class ByteBufferMemoryManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - ByteBuffer memory = (ByteBuffer) recoverableMemoryBase; - memory = memory.slice(offset, length); - return new NioBuffer(memory, memory, allocatorControl, convert(drop)); + public Object sliceMemory(Object memory, int offset, int length) { + var buffer = (ByteBuffer) memory; + return buffer.slice(offset, length); } } diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java index e96f2d0..4364958 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -408,27 +408,27 @@ class NioBuffer extends RcSupport implements Buffer, Readable // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + ByteBuffer buffer = untethered.memory(); buffer.order(order()); // Copy contents. copyInto(0, buffer, 0, capacity()); // Release the old memory and install the new: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewBuffer(buffer, drop); } - private Drop disconnectDrop() { + private void disconnectDrop(Drop newDrop) { var drop = (Drop) unsafeGetDrop(); int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); + unsafeSetDrop(new ArcDrop<>(newDrop)); this.roff = roff; this.woff = woff; - return drop; } private void attachNewBuffer(ByteBuffer buffer, Drop drop) { diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index 2cc7c82..1aabd96 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -75,10 +75,8 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - var segment = (MemorySegment) recoverableMemoryBase; - segment = segment.asSlice(offset, length); - return new MemSegBuffer(segment, segment, convert(ArcDrop.acquire(drop)), allocatorControl); + public Object sliceMemory(Object memory, int offset, int length) { + var segment = (MemorySegment) memory; + return segment.asSlice(offset, length); } } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 6e31e04..426f99b 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -524,32 +524,27 @@ class MemSegBuffer extends RcSupport implements Buffer, Re // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - MemorySegment newSegment = (MemorySegment) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + MemorySegment newSegment = untethered.memory(); // Copy contents. newSegment.copyFrom(seg); // Release the old memory segment and install the new one: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewMemorySegment(newSegment, drop); } - private Drop disconnectDrop() { + private void disconnectDrop(Drop newDrop) { var drop = unsafeGetDrop(); - if (drop instanceof ArcDrop) { - // Disconnect from the current arc drop, since we'll get our own fresh memory segment. - int roff = this.roff; - int woff = this.woff; - drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); - this.roff = roff; - this.woff = woff; - } else { - // TODO would we ever get here? - control.recoverMemory(recoverableMemory()); - } - return drop; + // Disconnect from the current arc drop, since we'll get our own fresh memory segment. + int roff = this.roff; + int woff = this.woff; + drop.drop(this); + unsafeSetDrop(new ArcDrop<>(newDrop)); + this.roff = roff; + this.woff = woff; } private void attachNewMemorySegment(MemorySegment newSegment, Drop drop) { diff --git a/src/main/java/io/netty/buffer/api/pool/PoolArena.java b/src/main/java/io/netty/buffer/api/pool/PoolArena.java index 7362887..c4c97eb 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolArena.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolArena.java @@ -114,7 +114,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl return manager.isNative(); } - Buffer allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) { + UntetheredMemory allocate(PooledAllocatorControl control, PoolThreadCache cache, int size) { final int sizeIdx = size2SizeIdx(size); if (sizeIdx <= smallMaxSizeIdx) { @@ -129,12 +129,12 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl } } - private Buffer tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size, - final int sizeIdx) { - Buffer buffer = cache.allocateSmall(control, size, sizeIdx); - if (buffer != null) { + private UntetheredMemory tcacheAllocateSmall(PooledAllocatorControl control, PoolThreadCache cache, final int size, + final int sizeIdx) { + UntetheredMemory memory = cache.allocateSmall(control, size, sizeIdx); + if (memory != null) { // was able to allocate out of the cache so move on - return buffer; + return memory; } /* @@ -150,73 +150,72 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; - buffer = s.chunk.allocateBufferWithSubpage(handle, size, cache, control); + memory = s.chunk.allocateBufferWithSubpage(handle, size, cache, control); } } if (needsNormalAllocation) { synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache, control); + memory = allocateNormal(size, sizeIdx, cache, control); } } incSmallAllocation(); - return buffer; + return memory; } - private Buffer tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) { - Buffer buffer = cache.allocateNormal(this, control, size, sizeIdx); - if (buffer != null) { + private UntetheredMemory tcacheAllocateNormal(PooledAllocatorControl control, PoolThreadCache cache, int size, int sizeIdx) { + UntetheredMemory memory = cache.allocateNormal(this, control, size, sizeIdx); + if (memory != null) { // was able to allocate out of the cache so move on - return buffer; + return memory; } synchronized (this) { - buffer = allocateNormal(size, sizeIdx, cache, control); + memory = allocateNormal(size, sizeIdx, cache, control); allocationsNormal++; } - return buffer; + return memory; } // Method must be called inside synchronized(this) { ... } block - private Buffer allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { - Buffer buffer = q050.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + private UntetheredMemory allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory memory = q050.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q025.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q025.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q000.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q000.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = qInit.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = qInit.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } - buffer = q075.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { - return buffer; + memory = q075.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { + return memory; } // Add a new chunk. PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); - buffer = c.allocate(size, sizeIdx, threadCache, control); - assert buffer != null; + memory = c.allocate(size, sizeIdx, threadCache, control); + assert memory != null; qInit.add(c); - return buffer; + return memory; } private void incSmallAllocation() { allocationsSmall.increment(); } - private Buffer allocateHuge(int size) { + private UntetheredMemory allocateHuge(int size) { activeBytesHuge.add(size); allocationsHuge.increment(); - BufferAllocator allocator = isDirect()? BufferAllocator.direct() : BufferAllocator.heap(); - return allocator.allocate(size); + return new UnpooledUnthetheredMemory(manager, size); } void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) { @@ -258,7 +257,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl } @Override - public Object allocateUntethered(Buffer originator, int size) { + public UntetheredMemory allocateUntethered(Buffer originator, int size) { throw new AssertionError("PoolChunk base buffers should never need to reallocate."); } diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java index 81aef2a..e09292e 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java @@ -15,7 +15,9 @@ */ package io.netty.buffer.api.pool; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; import java.util.PriorityQueue; @@ -262,7 +264,7 @@ final class PoolChunk implements PoolChunkMetric { return 100 - freePercentage; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) { + UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache, PooledAllocatorControl control) { final long handle; if (sizeIdx <= arena.smallMaxSizeIdx) { // small @@ -513,21 +515,21 @@ final class PoolChunk implements PoolChunkMetric { | (long) inUsed << IS_USED_SHIFT; } - Buffer allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { if (isRun(handle)) { int offset = runOffset(handle) << pageShifts; int maxLength = runSize(pageShifts, handle); PoolThreadCache poolThreadCache = arena.parent.threadCache(); initAllocatorControl(control, poolThreadCache, handle, maxLength); - return arena.manager.recoverMemory(control, memory, offset, size, - new PooledDrop(control, arena, this, poolThreadCache, handle, maxLength)); + return new UntetheredChunkAllocation( + memory, control, this, poolThreadCache, handle, maxLength, offset, size); } else { return allocateBufferWithSubpage(handle, size, threadCache, control); } } - Buffer allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache, - PooledAllocatorControl control) { + UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache, + PooledAllocatorControl control) { int runOffset = runOffset(handle); int bitmapIdx = bitmapIdx(handle); @@ -537,8 +539,42 @@ final class PoolChunk implements PoolChunkMetric { int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize; initAllocatorControl(control, threadCache, handle, s.elemSize); - return arena.manager.recoverMemory(control, memory, offset, size, - new PooledDrop(control, arena, this, threadCache, handle, s.elemSize)); + return new UntetheredChunkAllocation(memory, control, this, threadCache, handle, s.elemSize, offset, size); + } + + @SuppressWarnings("unchecked") + private static class UntetheredChunkAllocation implements UntetheredMemory { + private final Object memory; + private final PooledAllocatorControl control; + private final PoolChunk chunk; + private final PoolThreadCache threadCache; + private final long handle; + private final int maxLength; + private final int offset; + private final int size; + + private UntetheredChunkAllocation( + Object memory, PooledAllocatorControl control, PoolChunk chunk, PoolThreadCache threadCache, + long handle, int maxLength, int offset, int size) { + this.memory = memory; + this.control = control; + this.chunk = chunk; + this.threadCache = threadCache; + this.handle = handle; + this.maxLength = maxLength; + this.offset = offset; + this.size = size; + } + + @Override + public Memory memory() { + return (Memory) chunk.arena.manager.sliceMemory(memory, offset, size); + } + + @Override + public Drop drop() { + return (Drop) new PooledDrop(control, chunk.arena, chunk, threadCache, handle, maxLength); + } } private void initAllocatorControl(PooledAllocatorControl control, PoolThreadCache threadCache, long handle, diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java index 6fe5bae..03347f6 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunkList.java @@ -15,7 +15,7 @@ */ package io.netty.buffer.api.pool; -import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.util.internal.StringUtil; import java.util.ArrayList; @@ -23,7 +23,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import static java.lang.Math.*; +import static java.lang.Math.max; +import static java.lang.Math.min; final class PoolChunkList implements PoolChunkListMetric { private static final Iterator EMPTY_METRICS = Collections.emptyIterator(); @@ -91,7 +92,7 @@ final class PoolChunkList implements PoolChunkListMetric { this.prevList = prevList; } - Buffer allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { + UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) { int normCapacity = arena.sizeIdx2size(sizeIdx); if (normCapacity > maxCapacity) { // Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can @@ -100,13 +101,13 @@ final class PoolChunkList implements PoolChunkListMetric { } for (PoolChunk cur = head; cur != null; cur = cur.next) { - Buffer buffer = cur.allocate(size, sizeIdx, threadCache, control); - if (buffer != null) { + UntetheredMemory memory = cur.allocate(size, sizeIdx, threadCache, control); + if (memory != null) { if (cur.freeBytes <= freeMinThreshold) { remove(cur); nextList.add(cur); } - return buffer; + return memory; } } return null; diff --git a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java index c74399d..14dc67d 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolThreadCache.java @@ -15,10 +15,7 @@ */ package io.netty.buffer.api.pool; -import static io.netty.buffer.api.pool.PoolArena.SizeClass.*; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; - -import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.pool.PoolArena.SizeClass; import io.netty.util.internal.MathUtil; import io.netty.util.internal.ObjectPool; @@ -31,6 +28,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; +import static io.netty.buffer.api.pool.PoolArena.SizeClass.Normal; +import static io.netty.buffer.api.pool.PoolArena.SizeClass.Small; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + /** * Acts a Thread cache for allocations. This implementation is modelled after * jemalloc and the described @@ -121,23 +122,23 @@ final class PoolThreadCache { /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) { + UntetheredMemory allocateSmall(PooledAllocatorControl control, int size, int sizeIdx) { return allocate(cacheForSmall(sizeIdx), control, size); } /** * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ - Buffer allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) { + UntetheredMemory allocateNormal(PoolArena area, PooledAllocatorControl control, int size, int sizeIdx) { return allocate(cacheForNormal(area, sizeIdx), control, size); } - private Buffer allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) { + private UntetheredMemory allocate(MemoryRegionCache cache, PooledAllocatorControl control, int size) { if (cache == null) { // no cache found so just return false here return null; } - Buffer allocated = cache.allocate(size, this, control); + UntetheredMemory allocated = cache.allocate(size, this, control); if (++allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); @@ -160,14 +161,13 @@ final class PoolThreadCache { } private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) { - switch (sizeClass) { - case Normal: + if (sizeClass == Normal) { return cacheForNormal(area, sizeIdx); - case Small: - return cacheForSmall(sizeIdx); - default: - throw new Error(); } + if (sizeClass == Small) { + return cacheForSmall(sizeIdx); + } + throw new AssertionError("Unexpected size class: " + sizeClass); } /** @@ -252,7 +252,7 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { return chunk.allocateBufferWithSubpage(handle, size, threadCache, control); } @@ -267,7 +267,7 @@ final class PoolThreadCache { } @Override - protected Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control) { return chunk.allocateBuffer(handle, size, threadCache, control); } @@ -286,9 +286,9 @@ final class PoolThreadCache { } /** - * Allocate a new {@link Buffer} using the provided chunk and handle with the capacity restrictions. + * Allocate a new {@link UntetheredMemory} using the provided chunk and handle with the capacity restrictions. */ - protected abstract Buffer allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, + protected abstract UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache, PooledAllocatorControl control); /** @@ -308,12 +308,12 @@ final class PoolThreadCache { /** * Allocate something out of the cache if possible and remove the entry from the cache. */ - public final Buffer allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) { + public final UntetheredMemory allocate(int size, PoolThreadCache threadCache, PooledAllocatorControl control) { Entry entry = queue.poll(); if (entry == null) { return null; } - Buffer buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control); + UntetheredMemory buffer = allocBuf(entry.chunk, entry.handle, size, threadCache, control); entry.recycle(); // allocations are not thread-safe which is fine as this is only called from the same thread all time. diff --git a/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java index a4e53ca..87917da 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledAllocatorControl.java @@ -27,9 +27,8 @@ class PooledAllocatorControl implements AllocatorControl { public int updates; @Override - public Object allocateUntethered(Buffer originator, int size) { - Buffer allocate = arena.parent.allocate(this, size); - return arena.manager.unwrapRecoverableMemory(allocate); + public UntetheredMemory allocateUntethered(Buffer originator, int size) { + return arena.parent.allocate(this, size); } @Override diff --git a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java index 29ffc9f..458318c 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java @@ -15,9 +15,7 @@ */ package io.netty.buffer.api.pool; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; -import static java.util.Objects.requireNonNull; - +import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.MemoryManager; @@ -38,6 +36,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static java.util.Objects.requireNonNull; + public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMetricProvider { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledBufferAllocator.class); @@ -287,18 +288,24 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe if (size < 1) { throw new IllegalArgumentException("Allocation size must be positive, but was " + size + '.'); } - return allocate(new PooledAllocatorControl(), size); + PooledAllocatorControl control = new PooledAllocatorControl(); + UntetheredMemory memory = allocate(control, size); + Buffer buffer = manager.recoverMemory(control, memory.memory(), memory.drop()); + return buffer.fill((byte) 0).order(ByteOrder.nativeOrder()); } - Buffer allocate(PooledAllocatorControl control, int size) { + UntetheredMemory allocate(PooledAllocatorControl control, int size) { PoolThreadCache cache = threadCache.get(); PoolArena arena = cache.arena; if (arena != null) { - return arena.allocate(control, cache, size).fill((byte) 0).order(ByteOrder.nativeOrder()); + return arena.allocate(control, cache, size); } - BufferAllocator unpooled = manager.isNative()? BufferAllocator.direct() : BufferAllocator.heap(); - return unpooled.allocate(size); + return allocateUnpooled(size); + } + + private UntetheredMemory allocateUnpooled(int size) { + return new UnpooledUnthetheredMemory(manager, size); } @Override diff --git a/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java b/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java new file mode 100644 index 0000000..0f537c8 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/pool/UnpooledUnthetheredMemory.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.pool; + +import io.netty.buffer.api.AllocatorControl; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.internal.Statics; + +@SuppressWarnings("unchecked") +class UnpooledUnthetheredMemory implements AllocatorControl.UntetheredMemory { + private final MemoryManager manager; + private final Buffer buffer; + + UnpooledUnthetheredMemory(MemoryManager manager, int size) { + this.manager = manager; + PooledAllocatorControl allocatorControl = new PooledAllocatorControl(); + buffer = manager.allocateShared(allocatorControl, size, manager.drop(), Statics.CLEANER); + } + + @Override + public Memory memory() { + return (Memory) manager.unwrapRecoverableMemory(buffer); + } + + @Override + public Drop drop() { + return (Drop) manager.drop(); + } +} diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java index e4b72aa..bdbe06c 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java @@ -447,7 +447,8 @@ class UnsafeBuffer extends RcSupport implements Buffer, Re // Allocate a bigger buffer. long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); - UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize); + var untethered = control.allocateUntethered(this, (int) newSize); + UnsafeMemory memory = untethered.memory(); // Copy contents. try { @@ -458,17 +459,17 @@ class UnsafeBuffer extends RcSupport implements Buffer, Re } // Release the old memory, and install the new memory: - Drop drop = disconnectDrop(); + Drop drop = untethered.drop(); + disconnectDrop(drop); attachNewMemory(memory, drop); } - private Drop disconnectDrop() { + private Drop disconnectDrop(Drop newDrop) { var drop = (Drop) unsafeGetDrop(); int roff = this.roff; int woff = this.woff; drop.drop(this); - drop = ArcDrop.unwrapAllArcs(drop); - unsafeSetDrop(new ArcDrop<>(drop)); + unsafeSetDrop(new ArcDrop<>(newDrop)); this.roff = roff; this.woff = woff; return drop; diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java index 1298795..bb51202 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManager.java @@ -96,10 +96,7 @@ public class UnsafeMemoryManager implements MemoryManager { } @Override - public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase, - int offset, int length, Drop drop) { - UnsafeMemory memory = (UnsafeMemory) recoverableMemoryBase; - memory = memory.slice(offset, length); - return new UnsafeBuffer(memory, 0, memory.size, allocatorControl, convert(drop)); + public Object sliceMemory(Object memory, int offset, int length) { + return ((UnsafeMemory) memory).slice(offset, length); } }