From 2c5be51ec6ec6c646e92e815b5b6d32af4d8395b Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 30 Oct 2020 14:21:20 +0100 Subject: [PATCH] Add an Rc.isSendable method Motivation: Reference counted objects may be stateful and cannot always be sent or transfer their ownership. It's desirable that integrators can check whether or not an object can be sent. Modification: Add an Rc.isSendable method that returns true if the object can be sent, and false otherwise. Implementors of the Rc interface, and extenders or RcSupport, can then implement whatever special logic they need for restricting sending in certain situations. Result: It's possible to test if an object supports send() or not in any given situation. --- .../java/io/netty/buffer/b2/CompositeBuf.java | 43 +++++++++++++------ .../java/io/netty/buffer/b2/MemSegBuf.java | 18 ++++++-- .../src/main/java/io/netty/buffer/b2/Rc.java | 25 ++++++++--- .../java/io/netty/buffer/b2/RcSupport.java | 15 +++++-- .../test/java/io/netty/buffer/b2/BufTest.java | 16 +++++-- 5 files changed, 88 insertions(+), 29 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java index abe1603..5544479 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java @@ -36,20 +36,20 @@ final class CompositeBuf extends RcSupport implements Buf { private final TornBufAccessors tornBufAccessors; private final boolean isSendable; - private Buf[] bufs; - private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts. - private int[] offsetSums; // The cumulative composite buffer offset. - private int capacity; + private final Buf[] bufs; + private final int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts. + private final int[] offsetSums; // The cumulative composite buffer offset. + private final int capacity; private int roff; private int woff; private int subOffset; // The next offset *within* a consituent buffer to read from or write to. CompositeBuf(Buf[] bufs) { - this(true, bufs.clone()); // Clone prevents external modification of array. + this(true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. } - private CompositeBuf(boolean isSendable, Buf[] bufs) { - super(COMPOSITE_DROP); + private CompositeBuf(boolean isSendable, Buf[] bufs, Drop drop) { + super(drop); this.isSendable = isSendable; for (Buf buf : bufs) { buf.acquire(); @@ -196,6 +196,11 @@ final class CompositeBuf extends RcSupport implements Buf { } Buf choice = (Buf) chooseBuffer(offset, 0); Buf[] slices = null; + acquire(); // Increase reference count of the original composite buffer. + Drop drop = obj -> { + close(); // Decrement the reference count of the original composite buffer. + COMPOSITE_DROP.drop(obj); + }; try { if (length > 0) { @@ -216,7 +221,11 @@ final class CompositeBuf extends RcSupport implements Buf { slices = new Buf[] { choice.slice(subOffset, 0) }; } - return new CompositeBuf(false, slices).writerIndex(length); + return new CompositeBuf(false, slices, drop).writerIndex(length); + } catch (Throwable throwable) { + // We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer: + close(); + throw throwable; } finally { if (slices != null) { for (Buf slice : slices) { @@ -496,10 +505,6 @@ final class CompositeBuf extends RcSupport implements Buf { @Override protected Owned prepareSend() { - if (!isSendable) { - throw new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } @SuppressWarnings("unchecked") Send[] sends = new Send[bufs.length]; try { @@ -526,14 +531,26 @@ final class CompositeBuf extends RcSupport implements Buf { for (int i = 0; i < sends.length; i++) { received[i] = sends[i].receive(); } - var composite = new CompositeBuf(true, received); + var composite = new CompositeBuf(true, received, drop); drop.accept(composite); return composite; } }; } + @Override + protected IllegalStateException notSendableException() { + if (!isSendable) { + return new IllegalStateException( + "Cannot send() this buffer. This buffer might be a slice of another buffer."); + } + return super.notSendableException(); + } + @Override + public boolean isSendable() { + return isSendable && super.isSendable(); + } long readPassThrough() { var buf = choosePassThroughBuffer(subOffset++); diff --git a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java index 5ef018a..8ddb52d 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java @@ -607,10 +607,6 @@ getByteAtOffset_BE(seg, roff) & 0xFF | @Override protected Owned prepareSend() { - if (!isSendable) { - throw new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } MemSegBuf outer = this; boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); @@ -627,6 +623,20 @@ getByteAtOffset_BE(seg, roff) & 0xFF | }; } + @Override + protected IllegalStateException notSendableException() { + if (!isSendable) { + return new IllegalStateException( + "Cannot send() this buffer. This buffer might be a slice of another buffer."); + } + return super.notSendableException(); + } + + @Override + public boolean isSendable() { + return isSendable && super.isSendable(); + } + private void checkRead(int index, int size) { if (index < 0 || woff < index + size) { throw indexOutOfBounds(index); diff --git a/buffer/src/main/java/io/netty/buffer/b2/Rc.java b/buffer/src/main/java/io/netty/buffer/b2/Rc.java index e76da94..81ff808 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Rc.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Rc.java @@ -30,7 +30,7 @@ public interface Rc> extends AutoCloseable { /** * Increment the reference count. *

- * Note, this method is not thread-safe because Rc's are meant to thread-confined. + * Note, this method is not thread-safe because reference counted objects are meant to thread-confined. * * @return This Rc instance. */ @@ -39,7 +39,7 @@ public interface Rc> extends AutoCloseable { /** * Decrement the reference count, and despose of the resource if the last reference is closed. *

- * Note, this method is not thread-safe because Rc's are meant to be thread-confined. + * Note, this method is not thread-safe because reference counted objects are meant to be thread-confined. * * @throws IllegalStateException If this Rc has already been closed. */ @@ -47,10 +47,25 @@ public interface Rc> extends AutoCloseable { void close(); /** - * Send this Rc instance to another Thread, transferring the ownership to the recipient. + * Send this reference counted object instance to another Thread, transferring the ownership to the recipient. *

- * This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link - * #close()} will have no effect, so this method is safe to call within a try-with-resources statement. + * Note that the object must be {@linkplain #isSendable() sendable}, and cannot have any outstanding borrows, + * when it's being sent. + * That is, all previous acquires must have been closed, and {@link #isSendable()} must return {@code true}. + *

+ * This instance immediately becomes inaccessible, and all attempts at accessing this reference counted object + * will throw. Calling {@link #close()} will have no effect, so this method is safe to call within a + * try-with-resources statement. */ Send send(); + + /** + * Check that this reference counted object is sendable. + *

+ * To be sendable, the object must have no outstanding acquires, and no other implementation defined restrictions. + * + * @return {@code true} if this object can be {@linkplain #send() sent}, + * or {@code false} if calling {@link #send()} would throw an exception. + */ + boolean isSendable(); } diff --git a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java index 87d5270..2d0b76c 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java @@ -72,15 +72,24 @@ public abstract class RcSupport, T extends RcSupport> impl */ @Override public final Send send() { - if (acquires != 0) { - throw new IllegalStateException( - "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this); + if (!isSendable()) { + throw notSendableException(); } var owned = prepareSend(); acquires = -2; // close without dropping (also ignore future double-free attempts) return new TransferSend(owned, drop); } + protected IllegalStateException notSendableException() { + return new IllegalStateException( + "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this); + } + + @Override + public boolean isSendable() { + return acquires == 0; + } + /** * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning diff --git a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java index 27ff4aa..d7f6213 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.SynchronousQueue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -152,6 +153,7 @@ public abstract class BufTest { public void sendMustThrowWhenBufIsAcquired() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.acquire()) { + assertFalse(buf.isSendable()); try { buf.send(); fail("Should not be able to send() a borrowed buffer."); @@ -160,6 +162,7 @@ public abstract class BufTest { } } // Now send() should work again. + assertTrue(buf.isSendable()); buf.send().receive().close(); } } @@ -387,6 +390,7 @@ public abstract class BufTest { public void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.slice()) { + assertFalse(buf.isSendable()); buf.send(); fail("Should have refused send() of acquired buffer."); } catch (IllegalStateException ignore) { @@ -399,6 +403,7 @@ public abstract class BufTest { public void sliceWithOffsetAndSizeWillIncreaseReferenceCount() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.slice(0, 8)) { + assertFalse(buf.isSendable()); buf.send(); fail("Should have refused send() of acquired buffer."); } catch (IllegalStateException ignore) { @@ -441,12 +446,14 @@ public abstract class BufTest { public void sendOnSliceWithoutOffsetAndSizeMustThrow() { try (Buf buf = allocate(8)) { try (Buf slice = buf.slice()) { + assertFalse(buf.isSendable()); slice.send(); fail("Should not be able to send a slice."); } catch (IllegalStateException ignore) { // Good. } // Verify that the slice is closed properly afterwards. + assertTrue(buf.isSendable()); buf.send().receive().close(); } } @@ -455,13 +462,14 @@ public abstract class BufTest { public void sendOnSliceWithOffsetAndSizeMustThrow() { try (Buf buf = allocate(8)) { try (Buf slice = buf.slice(0, 8)) { + assertFalse(buf.isSendable()); slice.send(); fail("Should not be able to send a slice."); } catch (IllegalStateException ignore) { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -474,7 +482,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -487,7 +495,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -506,7 +514,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } }