Address review comments on bifurcate PR

This commit is contained in:
Chris Vest 2020-12-10 12:51:18 +01:00
parent b749106c0c
commit bb2264ac5b
8 changed files with 50 additions and 28 deletions

View File

@ -130,9 +130,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
@Override
public Buf order(ByteOrder order) {
this.order = order;
for (Buf buf : bufs) {
buf.order(order);
if (this.order != order) {
this.order = order;
for (Buf buf : bufs) {
buf.order(order);
}
}
return this;
}
@ -891,7 +893,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
drop.reconnect(composite);
drop.attach(composite);
return composite;
}
};

View File

@ -21,6 +21,7 @@ package io.netty.buffer.api;
*
* @param <T>
*/
@FunctionalInterface
public interface Drop<T> {
/**
* Dispose of the resources in the given Rc.
@ -34,6 +35,6 @@ public interface Drop<T> {
*
* @param obj The new Rc instance with the new owner.
*/
default void reconnect(T obj) {
default void attach(T obj) {
}
}

View File

@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop<Buf> {
}
@Override
public void reconnect(Buf buf) {
public void attach(Buf buf) {
// Unregister old cleanable, if any, to avoid uncontrolled build-up.
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {

View File

@ -118,8 +118,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
protected Drop<T> unsafeExchangeDrop(Drop<T> replacement) {
Objects.requireNonNull(replacement, "Replacement drop cannot be null.");
drop = replacement;
drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null.");
return replacement;
}

View File

@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
protected Buf createBuf(int size, Drop<Buf> drop) {
var buf = manager.allocateShared(this, size, drop, null);
drop.reconnect(buf);
drop.attach(buf);
return buf;
}
@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
public void recoverMemory(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.reconnect(buf);
drop.attach(buf);
buf.close();
}

View File

@ -37,7 +37,7 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
public I receive() {
gateReception();
var copy = outgoing.transferOwnership(drop);
drop.reconnect(copy);
drop.attach(copy);
return (I) copy;
}

View File

@ -17,45 +17,65 @@ package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Drop;
public class BifurcatedDrop<T> implements Drop<T> {
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
class BifurcatedDrop<T> implements Drop<T> {
private static final VarHandle COUNT;
static {
try {
COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
private final T originalBuf;
private final Drop<T> delegate;
private int count;
private Exception closeTrace;
@SuppressWarnings("FieldMayBeFinal")
private volatile int count;
public BifurcatedDrop(T originalBuf, Drop<T> delegate) {
BifurcatedDrop(T originalBuf, Drop<T> delegate) {
this.originalBuf = originalBuf;
this.delegate = delegate;
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
}
public synchronized void increment() {
checkValidState();
count++;
void increment() {
int c;
do {
c = count;
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, c + 1));
}
@Override
public synchronized void drop(T obj) {
checkValidState();
if (--count == 0) {
closeTrace = new Exception("close: " + delegate);
delegate.reconnect(originalBuf);
int c;
int n;
do {
c = count;
n = c - 1;
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, n));
if (n == 0) {
delegate.attach(originalBuf);
delegate.drop(originalBuf);
}
}
@Override
public void reconnect(T obj) {
delegate.reconnect(obj);
public void attach(T obj) {
delegate.attach(obj);
}
Drop<T> unwrap() {
return delegate;
}
private void checkValidState() {
private static void checkValidState(int count) {
if (count == 0) {
throw new IllegalStateException("Underlying resources have already been freed.", closeTrace);
throw new IllegalStateException("Underlying resources have already been freed.");
}
}
}

View File

@ -328,7 +328,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
seg = newSegment;
drop.reconnect(this);
drop.attach(this);
}
}
@ -340,7 +340,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
var drop = unsafeGetDrop();
if (seg.ownerThread() != null) {
seg = seg.share();
drop.reconnect(this);
drop.attach(this);
}
if (drop instanceof BifurcatedDrop) {
((BifurcatedDrop<?>) drop).increment();