From bb2264ac5b38ac0355274608a4a2a0989f47c41d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 10 Dec 2020 12:51:18 +0100 Subject: [PATCH] Address review comments on bifurcate PR --- .../io/netty/buffer/api/CompositeBuf.java | 10 ++-- src/main/java/io/netty/buffer/api/Drop.java | 3 +- .../buffer/api/NativeMemoryCleanerDrop.java | 2 +- .../java/io/netty/buffer/api/RcSupport.java | 3 +- .../buffer/api/SizeClassedMemoryPool.java | 4 +- .../io/netty/buffer/api/TransferSend.java | 2 +- .../buffer/api/memseg/BifurcatedDrop.java | 50 +++++++++++++------ .../io/netty/buffer/api/memseg/MemSegBuf.java | 4 +- 8 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index 22c222c..7d2577e 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -130,9 +130,11 @@ final class CompositeBuf extends RcSupport implements Buf { @Override public Buf order(ByteOrder order) { - this.order = order; - for (Buf buf : bufs) { - buf.order(order); + if (this.order != order) { + this.order = order; + for (Buf buf : bufs) { + buf.order(order); + } } return this; } @@ -891,7 +893,7 @@ final class CompositeBuf extends RcSupport implements Buf { received[i] = sends[i].receive(); } var composite = new CompositeBuf(allocator, true, received, drop); - drop.reconnect(composite); + drop.attach(composite); return composite; } }; diff --git a/src/main/java/io/netty/buffer/api/Drop.java b/src/main/java/io/netty/buffer/api/Drop.java index de949c5..1658c7e 100644 --- a/src/main/java/io/netty/buffer/api/Drop.java +++ b/src/main/java/io/netty/buffer/api/Drop.java @@ -21,6 +21,7 @@ package io.netty.buffer.api; * * @param */ +@FunctionalInterface public interface Drop { /** * Dispose of the resources in the given Rc. @@ -34,6 +35,6 @@ public interface Drop { * * @param obj The new Rc instance with the new owner. */ - default void reconnect(T obj) { + default void attach(T obj) { } } diff --git a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java index 3f1321a..1f0a9ef 100644 --- a/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java +++ b/src/main/java/io/netty/buffer/api/NativeMemoryCleanerDrop.java @@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop { } @Override - public void reconnect(Buf buf) { + public void attach(Buf buf) { // Unregister old cleanable, if any, to avoid uncontrolled build-up. GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null); if (c != null) { diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index dd39e83..4e59120 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -118,8 +118,7 @@ public abstract class RcSupport, T extends RcSupport> impl } protected Drop unsafeExchangeDrop(Drop replacement) { - Objects.requireNonNull(replacement, "Replacement drop cannot be null."); - drop = replacement; + drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null."); return replacement; } diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index bd24d3d..5b178ae 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { protected Buf createBuf(int size, Drop drop) { var buf = manager.allocateShared(this, size, drop, null); - drop.reconnect(buf); + drop.attach(buf); return buf; } @@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop { public void recoverMemory(Object memory) { var drop = getDrop(); var buf = manager.recoverMemory(memory, drop); - drop.reconnect(buf); + drop.attach(buf); buf.close(); } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index a21ff73..24098f4 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -37,7 +37,7 @@ class TransferSend, T extends Rc> implements Send { public I receive() { gateReception(); var copy = outgoing.transferOwnership(drop); - drop.reconnect(copy); + drop.attach(copy); return (I) copy; } diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java index d3523ff..2e17421 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -17,45 +17,65 @@ package io.netty.buffer.api.memseg; import io.netty.buffer.api.Drop; -public class BifurcatedDrop implements Drop { +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +class BifurcatedDrop implements Drop { + private static final VarHandle COUNT; + static { + try { + COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + private final T originalBuf; private final Drop delegate; - private int count; - private Exception closeTrace; + @SuppressWarnings("FieldMayBeFinal") + private volatile int count; - public BifurcatedDrop(T originalBuf, Drop delegate) { + BifurcatedDrop(T originalBuf, Drop delegate) { this.originalBuf = originalBuf; this.delegate = delegate; count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. } - public synchronized void increment() { - checkValidState(); - count++; + void increment() { + int c; + do { + c = count; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, c + 1)); } @Override public synchronized void drop(T obj) { - checkValidState(); - if (--count == 0) { - closeTrace = new Exception("close: " + delegate); - delegate.reconnect(originalBuf); + int c; + int n; + do { + c = count; + n = c - 1; + checkValidState(c); + } while (!COUNT.compareAndSet(this, c, n)); + if (n == 0) { + delegate.attach(originalBuf); delegate.drop(originalBuf); } } @Override - public void reconnect(T obj) { - delegate.reconnect(obj); + public void attach(T obj) { + delegate.attach(obj); } Drop unwrap() { return delegate; } - private void checkValidState() { + private static void checkValidState(int count) { if (count == 0) { - throw new IllegalStateException("Underlying resources have already been freed.", closeTrace); + throw new IllegalStateException("Underlying resources have already been freed."); } } } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index 9aed640..7a11636 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -328,7 +328,7 @@ class MemSegBuf extends RcSupport implements Buf { } seg = newSegment; - drop.reconnect(this); + drop.attach(this); } } @@ -340,7 +340,7 @@ class MemSegBuf extends RcSupport implements Buf { var drop = unsafeGetDrop(); if (seg.ownerThread() != null) { seg = seg.share(); - drop.reconnect(this); + drop.attach(this); } if (drop instanceof BifurcatedDrop) { ((BifurcatedDrop) drop).increment();