diff --git a/src/main/java/io/netty/buffer/api/BufferAllocator.java b/src/main/java/io/netty/buffer/api/BufferAllocator.java index 9b24050..184c154 100644 --- a/src/main/java/io/netty/buffer/api/BufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/BufferAllocator.java @@ -126,11 +126,9 @@ public interface BufferAllocator extends AutoCloseable { static BufferAllocator pooledHeap() { return new PooledBufferAllocator(MemoryManagers.getManagers().getHeapMemoryManager()); -// return new SizeClassedMemoryPool(MemoryManagers.getManagers().getHeapMemoryManager()); } static BufferAllocator pooledDirect() { return new PooledBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager()); -// return new SizeClassedMemoryPool(MemoryManagers.getManagers().getNativeMemoryManager()); } } diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java deleted file mode 100644 index 35a0609..0000000 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer.api; - -import java.lang.invoke.VarHandle; -import java.lang.ref.Cleaner.Cleanable; -import java.lang.ref.WeakReference; -import java.util.concurrent.atomic.AtomicBoolean; - -import static io.netty.buffer.api.internal.Statics.CLEANER; -import static io.netty.buffer.api.internal.Statics.findVarHandle; -import static java.lang.invoke.MethodHandles.lookup; - -class CleanerPooledDrop implements Drop { - private static final VarHandle CLEANABLE = - findVarHandle(lookup(), CleanerPooledDrop.class, "cleanable", GatedCleanable.class); - private final SizeClassedMemoryPool pool; - private final MemoryManager manager; - private final Drop delegate; - @SuppressWarnings("unused") - private volatile GatedCleanable cleanable; - - CleanerPooledDrop(SizeClassedMemoryPool pool, MemoryManager manager, - Drop delegate) { - this.pool = pool; - this.manager = manager; - this.delegate = delegate; - } - - @Override - public void drop(Buffer buf) { - GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); - if (c != null) { - c.clean(); - delegate.drop(buf); - } - } - - @Override - public void attach(Buffer buf) { - // Unregister old cleanable, if any, to avoid uncontrolled build-up. - GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); - if (c != null) { - c.disable(); - c.clean(); - } - - var mem = manager.unwrapRecoverableMemory(buf); - WeakReference ref = new WeakReference<>(this); - AtomicBoolean gate = new AtomicBoolean(true); - cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate))); - } - - @Override - public String toString() { - return "CleanerPooledDrop(" + delegate + ')'; - } - - private static final class CleanAction implements Runnable { - private final SizeClassedMemoryPool pool; - private final Object mem; - private final WeakReference ref; - private final AtomicBoolean gate; - - private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference ref, - AtomicBoolean gate) { - this.pool = pool; - this.mem = mem; - this.ref = ref; - this.gate = gate; - } - - @Override - public void run() { - if (gate.getAndSet(false)) { - var monitored = ref.get(); - if (monitored == null) { - pool.recoverMemory(mem); - } - } - } - } - - private static final class GatedCleanable implements Cleanable { - private final AtomicBoolean gate; - private final Cleanable cleanable; - - GatedCleanable(AtomicBoolean gate, Cleanable cleanable) { - this.gate = gate; - this.cleanable = cleanable; - } - - public void disable() { - gate.set(false); - } - - @Override - public void clean() { - cleanable.clean(); - } - } -} diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java deleted file mode 100644 index 8a8095b..0000000 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer.api; - -import io.netty.buffer.api.internal.Statics; - -import java.lang.invoke.VarHandle; -import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Supplier; - -import static io.netty.buffer.api.internal.Statics.NO_OP_DROP; -import static java.lang.invoke.MethodHandles.lookup; - -class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop { - private static final VarHandle CLOSE = Statics.findVarHandle( - lookup(), SizeClassedMemoryPool.class, "closed", boolean.class); - private final MemoryManager manager; - private final ConcurrentHashMap> pool; - @SuppressWarnings("unused") - private volatile boolean closed; - - protected SizeClassedMemoryPool(MemoryManager manager) { - this.manager = manager; - pool = new ConcurrentHashMap<>(); - } - - @Override - public Buffer allocate(int size) { - BufferAllocator.checkSize(size); - var sizeClassPool = getSizeClassPool(size); - Object memory = sizeClassPool.poll(); - if (memory != null) { - return recoverMemoryIntoBuffer(memory) - .fill((byte) 0) - .order(ByteOrder.nativeOrder()); - } - return createBuf(size, getDrop()); - } - - @Override - public Supplier constBufferSupplier(byte[] bytes) { - Buffer constantBuffer = manager.allocateShared(this, bytes.length, manager.drop(), Statics.CLEANER); - constantBuffer.writeBytes(bytes).makeReadOnly(); - return () -> manager.allocateConstChild(constantBuffer); - } - - protected MemoryManager getMemoryManager() { - return manager; - } - - protected Buffer createBuf(int size, Drop drop) { - var buf = manager.allocateShared(this, size, drop, null); - drop.attach(buf); - return buf; - } - - protected Drop getDrop() { - return new CleanerPooledDrop(this, getMemoryManager(), this); - } - - @Override - public void close() { - if (CLOSE.compareAndSet(this, false, true)) { - var capturedExceptions = new ArrayList(4); - pool.forEach((k, v) -> { - Object memory; - while ((memory = v.poll()) != null) { - try { - manager.discardRecoverableMemory(memory); - } catch (Exception e) { - capturedExceptions.add(e); - } - } - }); - if (!capturedExceptions.isEmpty()) { - var exception = new ResourceDisposeFailedException(); - capturedExceptions.forEach(exception::addSuppressed); - throw exception; - } - } - } - - @Override - public void drop(Buffer buf) { - if (closed) { - manager.drop().drop(buf); - return; - } - Object mem = manager.unwrapRecoverableMemory(buf); - var sizeClassPool = getSizeClassPool(manager.capacityOfRecoverableMemory(mem)); - sizeClassPool.offer(mem); - if (closed) { - Object memory; - while ((memory = sizeClassPool.poll()) != null) { - manager.discardRecoverableMemory(memory); - } - } - } - - @Override - public String toString() { - return "SizeClassedMemoryPool"; - } - - @SuppressWarnings("unchecked") - @Override - public UntetheredMemory allocateUntethered(Buffer originator, int size) { - var sizeClassPool = getSizeClassPool(size); - Object candidateMemory = sizeClassPool.poll(); - if (candidateMemory == null) { - Buffer untetheredBuf = createBuf(size, NO_OP_DROP); - candidateMemory = manager.unwrapRecoverableMemory(untetheredBuf); - } - Object memory = candidateMemory; - - return new UntetheredMemory() { - - @Override - public Memory memory() { - return (Memory) memory; - } - - @Override - public Drop drop() { - return (Drop) getDrop(); - } - }; - } - - @Override - public void recoverMemory(Object memory) { - Buffer buf = recoverMemoryIntoBuffer(memory); - buf.close(); - } - - private Buffer recoverMemoryIntoBuffer(Object memory) { - var drop = getDrop(); - var buf = manager.recoverMemory(this, memory, drop); - drop.attach(buf); - return buf; - } - - private ConcurrentLinkedQueue getSizeClassPool(int size) { - return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); - } -} diff --git a/src/main/java/io/netty/buffer/api/internal/ArcDrop.java b/src/main/java/io/netty/buffer/api/internal/ArcDrop.java index 3f98b68..b2acd10 100644 --- a/src/main/java/io/netty/buffer/api/internal/ArcDrop.java +++ b/src/main/java/io/netty/buffer/api/internal/ArcDrop.java @@ -46,13 +46,6 @@ public final class ArcDrop implements Drop { return new ArcDrop(drop); } - public static Drop unwrapAllArcs(Drop drop) { - while (drop instanceof ArcDrop) { - drop = ((ArcDrop) drop).unwrap(); - } - return drop; - } - public static Drop acquire(Drop drop) { if (drop.getClass() == ArcDrop.class) { ((ArcDrop) drop).increment(); @@ -103,7 +96,10 @@ public final class ArcDrop implements Drop { @Override public String toString() { - StringBuilder builder = new StringBuilder().append("ArcDrop(").append(count).append(", "); + StringBuilder builder = new StringBuilder() + .append("ArcDrop@") + .append(Integer.toHexString(System.identityHashCode(this))) + .append('(').append(count).append(", "); Drop drop = this; while ((drop = ((ArcDrop) drop).unwrap()) instanceof ArcDrop) { builder.append(((ArcDrop) drop).count).append(", "); diff --git a/src/main/java/io/netty/buffer/api/pool/PoolArena.java b/src/main/java/io/netty/buffer/api/pool/PoolArena.java index b3e87f5..5d5c085 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolArena.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolArena.java @@ -18,7 +18,6 @@ package io.netty.buffer.api.pool; import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.MemoryManager; -import io.netty.buffer.api.internal.Statics; import io.netty.util.internal.StringUtil; import java.util.ArrayList; @@ -363,7 +362,6 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl @Override public long numActiveAllocations() { - long val = allocationsSmall.longValue() + allocationsHuge.longValue() - deallocationsHuge.longValue(); synchronized (this) { @@ -372,11 +370,6 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl return max(val, 0); } - @Override - public long numActiveTinyAllocations() { - return 0; - } - @Override public long numActiveSmallAllocations() { return max(numSmallAllocations() - numSmallDeallocations(), 0); @@ -410,10 +403,7 @@ class PoolArena extends SizeClasses implements PoolArenaMetric, AllocatorControl } protected final PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { - Buffer base = manager.allocateShared(this, chunkSize, manager.drop(), Statics.CLEANER); - Object memory = manager.unwrapRecoverableMemory(base); - return new PoolChunk( - this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx); + return new PoolChunk(this, pageSize, pageShifts, chunkSize, maxPageIdx); } @Override diff --git a/src/main/java/io/netty/buffer/api/pool/PoolArenaMetric.java b/src/main/java/io/netty/buffer/api/pool/PoolArenaMetric.java index bc4d51e..c5c2f26 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolArenaMetric.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolArenaMetric.java @@ -108,14 +108,6 @@ public interface PoolArenaMetric extends SizeClassesMetric { */ long numActiveAllocations(); - /** - * Return the number of currently active tiny allocations. - * - * @deprecated Tiny allocations have been merged into small allocations. - */ - @Deprecated - long numActiveTinyAllocations(); - /** * Return the number of currently active small allocations. */ diff --git a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java index 8e14ba0..78b07ed 100644 --- a/src/main/java/io/netty/buffer/api/pool/PoolChunk.java +++ b/src/main/java/io/netty/buffer/api/pool/PoolChunk.java @@ -18,7 +18,10 @@ package io.netty.buffer.api.pool; import io.netty.buffer.api.AllocatorControl.UntetheredMemory; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Drop; +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.internal.ArcDrop; import io.netty.buffer.api.internal.CleanerDrop; +import io.netty.buffer.api.internal.Statics; import java.util.PriorityQueue; @@ -145,6 +148,7 @@ final class PoolChunk implements PoolChunkMetric { final PoolArena arena; final Buffer base; // The buffer that is the source of the memory. Closing it will free the memory. final Object memory; + final Drop baseDrop; // An ArcDrop that manages references to the base Buffer. /** * store the first page and last page of each avail run @@ -171,11 +175,13 @@ final class PoolChunk implements PoolChunkMetric { PoolChunk prev; PoolChunk next; - PoolChunk(PoolArena arena, Buffer base, Object memory, int pageSize, int pageShifts, int chunkSize, + PoolChunk(PoolArena arena, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) { this.arena = arena; - this.base = base; - this.memory = memory; + MemoryManager manager = arena.manager; + base = manager.allocateShared(arena, chunkSize, manager.drop(), Statics.CLEANER); + memory = manager.unwrapRecoverableMemory(base); + baseDrop = ArcDrop.wrap(Buffer::close); this.pageSize = pageSize; this.pageShifts = pageShifts; this.chunkSize = chunkSize; @@ -419,6 +425,7 @@ final class PoolChunk implements PoolChunkMetric { * @param handle handle to free */ void free(long handle, int normCapacity) { + baseDrop.drop(base); // Decrement reference count. if (isSubpage(handle)) { int sizeIdx = arena.size2SizeIdx(normCapacity); PoolSubpage head = arena.findSubpagePoolHead(sizeIdx); @@ -523,6 +530,7 @@ final class PoolChunk implements PoolChunkMetric { int maxLength = runSize(pageShifts, handle); PoolThreadCache poolThreadCache = arena.parent.threadCache(); initAllocatorControl(control, poolThreadCache, handle, maxLength); + ArcDrop.acquire(baseDrop); return new UntetheredChunkAllocation( memory, this, poolThreadCache, handle, maxLength, offset, size); } else { @@ -541,6 +549,7 @@ final class PoolChunk implements PoolChunkMetric { int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize; initAllocatorControl(control, threadCache, handle, s.elemSize); + ArcDrop.acquire(baseDrop); return new UntetheredChunkAllocation(memory, this, threadCache, handle, s.elemSize, offset, size); } @@ -620,7 +629,7 @@ final class PoolChunk implements PoolChunkMetric { } void destroy() { - base.close(); + baseDrop.drop(base); // Decrement reference count from the chunk (allocated buffers may keep the base alive) } static int runOffset(long handle) { diff --git a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java index 211dc2b..8052f28 100644 --- a/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/pool/PooledBufferAllocator.java @@ -166,6 +166,7 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe private final int smallCacheSize; private final int normalCacheSize; private final List arenaMetrics; + private final List arenaMetricsView; private final PoolThreadLocalCache threadCache; private final int chunkSize; private final PooledBufferAllocatorMetric metric; @@ -235,10 +236,12 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe arenas[i] = arena; metrics.add(arena); } - arenaMetrics = Collections.unmodifiableList(metrics); + arenaMetrics = metrics; + arenaMetricsView = Collections.unmodifiableList(metrics); } else { arenas = null; - arenaMetrics = Collections.emptyList(); + arenaMetrics = new ArrayList<>(1); + arenaMetricsView = Collections.emptyList(); } metric = new PooledBufferAllocatorMetric(this); @@ -312,10 +315,15 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe @Override public void close() { trimCurrentThreadCache(); - for (PoolArena arena : arenas) { - arena.close(); - } threadCache.remove(); + for (int i = 0, arenasLength = arenas.length; i < arenasLength; i++) { + PoolArena arena = arenas[i]; + if (arena != null) { + arena.close(); + arenas[i] = null; + } + } + arenaMetrics.clear(); } /** @@ -416,7 +424,7 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe return cache; } // No caching so just use 0 as sizes. - return new PoolThreadCache(arena, 0, 0, 0, 0); + return new PoolThreadCache(null, 0, 0, 0, 0); } @Override @@ -450,7 +458,7 @@ public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMe * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool. */ List arenaMetrics() { - return arenaMetrics; + return arenaMetricsView; } /** diff --git a/src/test/java/io/netty/buffer/api/BufferTestSupport.java b/src/test/java/io/netty/buffer/api/BufferTestSupport.java index 184d698..a5a131b 100644 --- a/src/test/java/io/netty/buffer/api/BufferTestSupport.java +++ b/src/test/java/io/netty/buffer/api/BufferTestSupport.java @@ -167,12 +167,14 @@ public abstract class BufferTestSupport { // Add 2-way composite buffers of all combinations. for (Fixture first : initFixtures) { for (Fixture second : initFixtures) { - var a = first.get(); - var b = second.get(); builder.add(new Fixture("compose(" + first + ", " + second + ')', () -> { return new BufferAllocator() { + BufferAllocator a; + BufferAllocator b; @Override public Buffer allocate(int size) { + a = first.get(); + b = second.get(); int half = size / 2; try (Buffer firstHalf = a.allocate(half); Buffer secondHalf = b.allocate(size - half)) { @@ -182,8 +184,14 @@ public abstract class BufferTestSupport { @Override public void close() { - a.close(); - b.close(); + if (a != null) { + a.close(); + a = null; + } + if (b != null) { + b.close(); + b = null; + } } }; }, COMPOSITE)); @@ -193,9 +201,10 @@ public abstract class BufferTestSupport { // Also add a 3-way composite buffer. builder.add(new Fixture("compose(heap,heap,heap)", () -> { return new BufferAllocator() { - final BufferAllocator alloc = BufferAllocator.heap(); + BufferAllocator alloc; @Override public Buffer allocate(int size) { + alloc = BufferAllocator.heap(); int part = size / 3; try (Buffer a = alloc.allocate(part); Buffer b = alloc.allocate(part); @@ -206,17 +215,21 @@ public abstract class BufferTestSupport { @Override public void close() { - alloc.close(); + if (alloc != null) { + alloc.close(); + alloc = null; + } } }; }, COMPOSITE)); for (Fixture fixture : initFixtures) { builder.add(new Fixture(fixture + ".ensureWritable", () -> { - var allocator = fixture.createAllocator(); return new BufferAllocator() { + BufferAllocator allocator; @Override public Buffer allocate(int size) { + allocator = fixture.createAllocator(); if (size < 2) { return allocator.allocate(size); } @@ -227,15 +240,19 @@ public abstract class BufferTestSupport { @Override public void close() { - allocator.close(); + if (allocator != null) { + allocator.close(); + allocator = null; + } } }; }, fixture.getProperties())); builder.add(new Fixture(fixture + ".compose.ensureWritable", () -> { - var allocator = fixture.createAllocator(); return new BufferAllocator() { + BufferAllocator allocator; @Override public Buffer allocate(int size) { + allocator = fixture.createAllocator(); if (size < 2) { return allocator.allocate(size); } @@ -246,7 +263,10 @@ public abstract class BufferTestSupport { @Override public void close() { - allocator.close(); + if (allocator != null) { + allocator.close(); + allocator = null; + } } }; }, COMPOSITE)); @@ -261,10 +281,11 @@ public abstract class BufferTestSupport { Builder builder = Stream.builder(); builder.add(f); builder.add(new Fixture(f + ".split", () -> { - var allocatorBase = f.get(); return new BufferAllocator() { + BufferAllocator allocatorBase; @Override public Buffer allocate(int size) { + allocatorBase = f.get(); try (Buffer buf = allocatorBase.allocate(size + 1)) { buf.writerOffset(size); return buf.split().writerOffset(0); @@ -273,7 +294,10 @@ public abstract class BufferTestSupport { @Override public void close() { - allocatorBase.close(); + if (allocatorBase != null) { + allocatorBase.close(); + allocatorBase = null; + } } }; }, f.getProperties())); @@ -285,10 +309,11 @@ public abstract class BufferTestSupport { builder.add(f); var props = concat(f.getProperties(), Properties.SLICE); builder.add(new Fixture(f + ".slice(0, capacity())", () -> { - var allocatorBase = f.get(); return new BufferAllocator() { + BufferAllocator allocatorBase; @Override public Buffer allocate(int size) { + allocatorBase = f.get(); try (Buffer base = allocatorBase.allocate(size)) { return base.slice(0, base.capacity()).writerOffset(0); } @@ -296,15 +321,19 @@ public abstract class BufferTestSupport { @Override public void close() { - allocatorBase.close(); + if (allocatorBase != null) { + allocatorBase.close(); + allocatorBase = null; + } } }; }, props)); builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> { - var allocatorBase = f.get(); return new BufferAllocator() { + BufferAllocator allocatorBase; @Override public Buffer allocate(int size) { + allocatorBase = f.get(); try (Buffer base = allocatorBase.allocate(size + 2)) { return base.slice(1, size).writerOffset(0); } @@ -312,7 +341,10 @@ public abstract class BufferTestSupport { @Override public void close() { - allocatorBase.close(); + if (allocatorBase != null) { + allocatorBase.close(); + allocatorBase = null; + } } }; }, props));