Add an Rc.isSendable method

Motivation:
Reference counted objects may be stateful and cannot always be sent or transfer their ownership.
It's desirable that integrators can check whether or not an object can be sent.

Modification:
Add an Rc.isSendable method that returns true if the object can be sent, and false otherwise.
Implementors of the Rc interface, and extenders or RcSupport, can then implement whatever special logic they need for restricting sending in certain situations.

Result:
It's possible to test if an object supports send() or not in any given situation.
This commit is contained in:
Chris Vest 2020-10-30 14:21:20 +01:00
parent 6d4ad29149
commit 2c5be51ec6
5 changed files with 88 additions and 29 deletions

View File

@ -36,20 +36,20 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private final TornBufAccessors tornBufAccessors;
private final boolean isSendable;
private Buf[] bufs;
private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private int[] offsetSums; // The cumulative composite buffer offset.
private int capacity;
private final Buf[] bufs;
private final int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private final int[] offsetSums; // The cumulative composite buffer offset.
private final int capacity;
private int roff;
private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
CompositeBuf(Buf[] bufs) {
this(true, bufs.clone()); // Clone prevents external modification of array.
this(true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
}
private CompositeBuf(boolean isSendable, Buf[] bufs) {
super(COMPOSITE_DROP);
private CompositeBuf(boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
super(drop);
this.isSendable = isSendable;
for (Buf buf : bufs) {
buf.acquire();
@ -196,6 +196,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
Buf choice = (Buf) chooseBuffer(offset, 0);
Buf[] slices = null;
acquire(); // Increase reference count of the original composite buffer.
Drop<CompositeBuf> drop = obj -> {
close(); // Decrement the reference count of the original composite buffer.
COMPOSITE_DROP.drop(obj);
};
try {
if (length > 0) {
@ -216,7 +221,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(false, slices).writerIndex(length);
return new CompositeBuf(false, slices, drop).writerIndex(length);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
throw throwable;
} finally {
if (slices != null) {
for (Buf slice : slices) {
@ -496,10 +505,6 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
@Override
protected Owned<CompositeBuf> prepareSend() {
if (!isSendable) {
throw new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
@SuppressWarnings("unchecked")
Send<Buf>[] sends = new Send[bufs.length];
try {
@ -526,14 +531,26 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(true, received);
var composite = new CompositeBuf(true, received, drop);
drop.accept(composite);
return composite;
}
};
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
return new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
return super.notSendableException();
}
@Override
public boolean isSendable() {
return isSendable && super.isSendable();
}
long readPassThrough() {
var buf = choosePassThroughBuffer(subOffset++);

View File

@ -607,10 +607,6 @@ getByteAtOffset_BE(seg, roff) & 0xFF |
@Override
protected Owned<MemSegBuf> prepareSend() {
if (!isSendable) {
throw new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
MemSegBuf outer = this;
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
@ -627,6 +623,20 @@ getByteAtOffset_BE(seg, roff) & 0xFF |
};
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
return new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
return super.notSendableException();
}
@Override
public boolean isSendable() {
return isSendable && super.isSendable();
}
private void checkRead(int index, int size) {
if (index < 0 || woff < index + size) {
throw indexOutOfBounds(index);

View File

@ -30,7 +30,7 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
* Note, this method is not thread-safe because Rc's are meant to thread-confined.
* Note, this method is not thread-safe because reference counted objects are meant to thread-confined.
*
* @return This Rc instance.
*/
@ -39,7 +39,7 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
* Note, this method is not thread-safe because Rc's are meant to be thread-confined.
* Note, this method is not thread-safe because reference counted objects are meant to be thread-confined.
*
* @throws IllegalStateException If this Rc has already been closed.
*/
@ -47,10 +47,25 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
void close();
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient.
* Send this reference counted object instance to another Thread, transferring the ownership to the recipient.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link
* #close()} will have no effect, so this method is safe to call within a try-with-resources statement.
* Note that the object must be {@linkplain #isSendable() sendable}, and cannot have any outstanding borrows,
* when it's being sent.
* That is, all previous acquires must have been closed, and {@link #isSendable()} must return {@code true}.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this reference counted object
* will throw. Calling {@link #close()} will have no effect, so this method is safe to call within a
* try-with-resources statement.
*/
Send<I> send();
/**
* Check that this reference counted object is sendable.
* <p>
* To be sendable, the object must have no outstanding acquires, and no other implementation defined restrictions.
*
* @return {@code true} if this object can be {@linkplain #send() sent},
* or {@code false} if calling {@link #send()} would throw an exception.
*/
boolean isSendable();
}

View File

@ -72,15 +72,24 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
*/
@Override
public final Send<I> send() {
if (acquires != 0) {
throw new IllegalStateException(
"Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this);
if (!isSendable()) {
throw notSendableException();
}
var owned = prepareSend();
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<I, T>(owned, drop);
}
protected IllegalStateException notSendableException() {
return new IllegalStateException(
"Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this);
}
@Override
public boolean isSendable() {
return acquires == 0;
}
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning

View File

@ -31,6 +31,7 @@ import java.util.concurrent.SynchronousQueue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -152,6 +153,7 @@ public abstract class BufTest {
public void sendMustThrowWhenBufIsAcquired() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.acquire()) {
assertFalse(buf.isSendable());
try {
buf.send();
fail("Should not be able to send() a borrowed buffer.");
@ -160,6 +162,7 @@ public abstract class BufTest {
}
}
// Now send() should work again.
assertTrue(buf.isSendable());
buf.send().receive().close();
}
}
@ -387,6 +390,7 @@ public abstract class BufTest {
public void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice()) {
assertFalse(buf.isSendable());
buf.send();
fail("Should have refused send() of acquired buffer.");
} catch (IllegalStateException ignore) {
@ -399,6 +403,7 @@ public abstract class BufTest {
public void sliceWithOffsetAndSizeWillIncreaseReferenceCount() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice(0, 8)) {
assertFalse(buf.isSendable());
buf.send();
fail("Should have refused send() of acquired buffer.");
} catch (IllegalStateException ignore) {
@ -441,12 +446,14 @@ public abstract class BufTest {
public void sendOnSliceWithoutOffsetAndSizeMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf slice = buf.slice()) {
assertFalse(buf.isSendable());
slice.send();
fail("Should not be able to send a slice.");
} catch (IllegalStateException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isSendable());
buf.send().receive().close();
}
}
@ -455,13 +462,14 @@ public abstract class BufTest {
public void sendOnSliceWithOffsetAndSizeMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf slice = buf.slice(0, 8)) {
assertFalse(buf.isSendable());
slice.send();
fail("Should not be able to send a slice.");
} catch (IllegalStateException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
assertTrue(buf.isSendable());
}
}
@ -474,7 +482,7 @@ public abstract class BufTest {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
assertTrue(buf.isSendable());
}
}
@ -487,7 +495,7 @@ public abstract class BufTest {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
assertTrue(buf.isSendable());
}
}
@ -506,7 +514,7 @@ public abstract class BufTest {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
assertTrue(buf.isSendable());
}
}