Make Rc an interface

This is necessary in preparation for making BBuf an interface.
This commit is contained in:
Chris Vest 2020-08-24 16:34:32 +02:00
parent 917bfb24f7
commit 7ab05dae7a
6 changed files with 129 additions and 70 deletions

View File

@ -7,7 +7,7 @@ import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.Statics.*;
public class BBuf extends Rc<BBuf> {
public class BBuf extends RcSupport<BBuf> {
static final Drop<BBuf> NO_DROP = buf -> {};
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
@ -78,7 +78,7 @@ public class BBuf extends Rc<BBuf> {
}
@Override
protected BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
BBuf copy = new BBuf(segment.withOwnerThread(recipient), drop);
copy.read = read;
copy.write = write;
@ -91,7 +91,7 @@ public class BBuf extends Rc<BBuf> {
MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER);
return new BBuf(transferSegment, NO_DROP) {
@Override
protected BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
overwriteMemorySegmentOwner(transferSegment, recipient);
BBuf copy = new BBuf(transferSegment, drop);
copy.read = outer.read;

View File

@ -0,0 +1,24 @@
package io.netty.buffer.b2;
/**
* This interface encapsulates the ownership of an {@link Rc}, and exposes a method that may be used to transfer this
* ownership to the specified recipient thread.
*
* @param <T> The concrete type of {@link Rc} that is owned.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface Owned<T extends Rc<T>> {
/**
* Transfer the ownership of the owned Rc, to the given recipient thread. The owned Rc is invalidated but without
* disposing of its internal state. Then a new Rc with the given owner is produced in its stead.
* <p>
* This method is called by {@link Send} implementations. These implementations will ensure that the transfer of
* ownership (the calling of this method) happens-before the new owner begins accessing the new object. This ensures
* that the new Rc is safely published to the new owners.
*
* @param recipient The new owner of the state represented by this Rc.
* @param drop The drop object that knows how to dispose of the state represented by this Rc.
* @return A new Rc instance that is exactly the same as this Rc, except it has the new owner.
*/
T transferOwnership(Thread recipient, Drop<T> drop);
}

View File

@ -13,14 +13,7 @@ import java.util.function.Consumer;
*
* @param <T> The concrete subtype.
*/
public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
private int acquires; // Closed if negative.
private final Drop<T> drop;
Rc(Drop<T> drop) {
this.drop = drop;
}
public interface Rc<T extends Rc<T>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
@ -28,13 +21,7 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
*
* @return This Rc instance.
*/
public T acquire() {
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
}
acquires++;
return self();
}
T acquire();
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
@ -44,15 +31,7 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
* @throws IllegalStateException If this Rc has already been closed.
*/
@Override
public void close() {
if (acquires == -1) {
throw new IllegalStateException("Double-free: Already closed and dropped.");
}
if (acquires == 0) {
drop.drop(self());
}
acquires--;
}
void close();
/**
* Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz
@ -62,12 +41,7 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
* @throws InterruptedException If this thread was interrupted
*/
public void sendTo(Consumer<Send<T>> consumer) throws InterruptedException {
var send = new RendezvousSend<>(self(), drop);
consumer.accept(send);
send.finish();
acquires = -2; // close without dropping (also ignore future double-free attempts)
}
void sendTo(Consumer<Send<T>> consumer) throws InterruptedException;
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
@ -79,39 +53,5 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
*/
public Send<T> send() {
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<>(prepareSend(), drop);
}
/**
* Transfer the ownership of this Rc, to the given recipient thread. This Rc is invalidated but without disposing of
* its internal state. Then a new Rc with the given owner is produced in its stead.
* <p>
* This method is called by {@link Send} implementations. These implementations will ensure that the transfer of
* ownership (the calling of this method) happens-before the new owner begins accessing the new object. This ensures
* that the new Rc is safely published to the new owners.
*
* @param recipient The new owner of the state represented by this Rc.
* @param drop The drop object that knows how to dispose of the state represented by this Rc.
* @return A new Rc instance that is exactly the same as this Rc, except it has the new owner.
*/
protected abstract T transferOwnership(Thread recipient, Drop<T> drop);
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
* thread. In this state, the Rc instance should only allow a call to {@link #transferOwnership(Thread, Drop)} in
* the recipient thread.
*
* @return This Rc instance in a deactivated state.
*/
protected T prepareSend() {
return self();
}
@SuppressWarnings("unchecked")
private T self() {
return (T) this;
}
Send<T> send();
}

View File

@ -0,0 +1,95 @@
package io.netty.buffer.b2;
import java.util.function.Consumer;
public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc<T>, Owned<T> {
private int acquires; // Closed if negative.
private final Drop<T> drop;
RcSupport(Drop<T> drop) {
this.drop = drop;
}
/**
* Increment the reference count.
* <p>
* Note, this method is not thread-safe because Rc's are meant to thread-confined.
*
* @return This Rc instance.
*/
@Override
public T acquire() {
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
}
acquires++;
return self();
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
* Note, this method is not thread-safe because Rc's are meant to be thread-confined.
*
* @throws IllegalStateException If this Rc has already been closed.
*/
@Override
public void close() {
if (acquires == -1) {
throw new IllegalStateException("Double-free: Already closed and dropped.");
}
if (acquires == 0) {
drop.drop(self());
}
acquires--;
}
/**
* Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz
* protocol. This method can be used when the sender wishes to block until the transfer completes. This requires
* that both threads be alive an running for the transfer to complete.
*
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
* @throws InterruptedException If this thread was interrupted
*/
@Override
public void sendTo(Consumer<Send<T>> consumer) throws InterruptedException {
var send = new RendezvousSend<>(self(), drop);
consumer.accept(send);
send.finish();
acquires = -2; // close without dropping (also ignore future double-free attempts)
}
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
* when the receiving thread is not known up front.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link
* #close()} will have no effect, so this method is safe to call within a try-with-resources statement.
*
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
*/
@Override
public Send<T> send() {
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<>(prepareSend(), drop);
}
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
* thread. In this state, the Rc instance should only allow a call to {@link #transferOwnership(Thread, Drop)} in
* the recipient thread.
*
* @return This Rc instance in a deactivated state.
*/
protected T prepareSend() {
return self();
}
@SuppressWarnings("unchecked")
private T self() {
return (T) this;
}
}

View File

@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class RendezvousSend<T extends Rc<T>> implements Send<T> {
class RendezvousSend<T extends Rc<T> & Owned<T>> implements Send<T> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), RendezvousSend.class, "received", boolean.class);
private final CountDownLatch recipientLatch;
private final CountDownLatch sentLatch;

View File

@ -5,7 +5,7 @@ import java.lang.invoke.VarHandle;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class TransferSend<T extends Rc<T>> implements Send<T> {
class TransferSend<T extends Rc<T> & Owned<T>> implements Send<T> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final T outgoing;
private final Drop<T> drop;