diff --git a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java index abe1603..5544479 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java @@ -36,20 +36,20 @@ final class CompositeBuf extends RcSupport implements Buf { private final TornBufAccessors tornBufAccessors; private final boolean isSendable; - private Buf[] bufs; - private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts. - private int[] offsetSums; // The cumulative composite buffer offset. - private int capacity; + private final Buf[] bufs; + private final int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts. + private final int[] offsetSums; // The cumulative composite buffer offset. + private final int capacity; private int roff; private int woff; private int subOffset; // The next offset *within* a consituent buffer to read from or write to. CompositeBuf(Buf[] bufs) { - this(true, bufs.clone()); // Clone prevents external modification of array. + this(true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. } - private CompositeBuf(boolean isSendable, Buf[] bufs) { - super(COMPOSITE_DROP); + private CompositeBuf(boolean isSendable, Buf[] bufs, Drop drop) { + super(drop); this.isSendable = isSendable; for (Buf buf : bufs) { buf.acquire(); @@ -196,6 +196,11 @@ final class CompositeBuf extends RcSupport implements Buf { } Buf choice = (Buf) chooseBuffer(offset, 0); Buf[] slices = null; + acquire(); // Increase reference count of the original composite buffer. + Drop drop = obj -> { + close(); // Decrement the reference count of the original composite buffer. + COMPOSITE_DROP.drop(obj); + }; try { if (length > 0) { @@ -216,7 +221,11 @@ final class CompositeBuf extends RcSupport implements Buf { slices = new Buf[] { choice.slice(subOffset, 0) }; } - return new CompositeBuf(false, slices).writerIndex(length); + return new CompositeBuf(false, slices, drop).writerIndex(length); + } catch (Throwable throwable) { + // We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer: + close(); + throw throwable; } finally { if (slices != null) { for (Buf slice : slices) { @@ -496,10 +505,6 @@ final class CompositeBuf extends RcSupport implements Buf { @Override protected Owned prepareSend() { - if (!isSendable) { - throw new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } @SuppressWarnings("unchecked") Send[] sends = new Send[bufs.length]; try { @@ -526,14 +531,26 @@ final class CompositeBuf extends RcSupport implements Buf { for (int i = 0; i < sends.length; i++) { received[i] = sends[i].receive(); } - var composite = new CompositeBuf(true, received); + var composite = new CompositeBuf(true, received, drop); drop.accept(composite); return composite; } }; } + @Override + protected IllegalStateException notSendableException() { + if (!isSendable) { + return new IllegalStateException( + "Cannot send() this buffer. This buffer might be a slice of another buffer."); + } + return super.notSendableException(); + } + @Override + public boolean isSendable() { + return isSendable && super.isSendable(); + } long readPassThrough() { var buf = choosePassThroughBuffer(subOffset++); diff --git a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java index 5ef018a..8ddb52d 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java @@ -607,10 +607,6 @@ getByteAtOffset_BE(seg, roff) & 0xFF | @Override protected Owned prepareSend() { - if (!isSendable) { - throw new IllegalStateException( - "Cannot send() this buffer. This buffer might be a slice of another buffer."); - } MemSegBuf outer = this; boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); @@ -627,6 +623,20 @@ getByteAtOffset_BE(seg, roff) & 0xFF | }; } + @Override + protected IllegalStateException notSendableException() { + if (!isSendable) { + return new IllegalStateException( + "Cannot send() this buffer. This buffer might be a slice of another buffer."); + } + return super.notSendableException(); + } + + @Override + public boolean isSendable() { + return isSendable && super.isSendable(); + } + private void checkRead(int index, int size) { if (index < 0 || woff < index + size) { throw indexOutOfBounds(index); diff --git a/buffer/src/main/java/io/netty/buffer/b2/Rc.java b/buffer/src/main/java/io/netty/buffer/b2/Rc.java index e76da94..81ff808 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Rc.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Rc.java @@ -30,7 +30,7 @@ public interface Rc> extends AutoCloseable { /** * Increment the reference count. *

- * Note, this method is not thread-safe because Rc's are meant to thread-confined. + * Note, this method is not thread-safe because reference counted objects are meant to thread-confined. * * @return This Rc instance. */ @@ -39,7 +39,7 @@ public interface Rc> extends AutoCloseable { /** * Decrement the reference count, and despose of the resource if the last reference is closed. *

- * Note, this method is not thread-safe because Rc's are meant to be thread-confined. + * Note, this method is not thread-safe because reference counted objects are meant to be thread-confined. * * @throws IllegalStateException If this Rc has already been closed. */ @@ -47,10 +47,25 @@ public interface Rc> extends AutoCloseable { void close(); /** - * Send this Rc instance to another Thread, transferring the ownership to the recipient. + * Send this reference counted object instance to another Thread, transferring the ownership to the recipient. *

- * This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link - * #close()} will have no effect, so this method is safe to call within a try-with-resources statement. + * Note that the object must be {@linkplain #isSendable() sendable}, and cannot have any outstanding borrows, + * when it's being sent. + * That is, all previous acquires must have been closed, and {@link #isSendable()} must return {@code true}. + *

+ * This instance immediately becomes inaccessible, and all attempts at accessing this reference counted object + * will throw. Calling {@link #close()} will have no effect, so this method is safe to call within a + * try-with-resources statement. */ Send send(); + + /** + * Check that this reference counted object is sendable. + *

+ * To be sendable, the object must have no outstanding acquires, and no other implementation defined restrictions. + * + * @return {@code true} if this object can be {@linkplain #send() sent}, + * or {@code false} if calling {@link #send()} would throw an exception. + */ + boolean isSendable(); } diff --git a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java index 87d5270..2d0b76c 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java @@ -72,15 +72,24 @@ public abstract class RcSupport, T extends RcSupport> impl */ @Override public final Send send() { - if (acquires != 0) { - throw new IllegalStateException( - "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this); + if (!isSendable()) { + throw notSendableException(); } var owned = prepareSend(); acquires = -2; // close without dropping (also ignore future double-free attempts) return new TransferSend(owned, drop); } + protected IllegalStateException notSendableException() { + return new IllegalStateException( + "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this); + } + + @Override + public boolean isSendable() { + return acquires == 0; + } + /** * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning diff --git a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java index 27ff4aa..d7f6213 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.SynchronousQueue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -152,6 +153,7 @@ public abstract class BufTest { public void sendMustThrowWhenBufIsAcquired() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.acquire()) { + assertFalse(buf.isSendable()); try { buf.send(); fail("Should not be able to send() a borrowed buffer."); @@ -160,6 +162,7 @@ public abstract class BufTest { } } // Now send() should work again. + assertTrue(buf.isSendable()); buf.send().receive().close(); } } @@ -387,6 +390,7 @@ public abstract class BufTest { public void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.slice()) { + assertFalse(buf.isSendable()); buf.send(); fail("Should have refused send() of acquired buffer."); } catch (IllegalStateException ignore) { @@ -399,6 +403,7 @@ public abstract class BufTest { public void sliceWithOffsetAndSizeWillIncreaseReferenceCount() { try (Buf buf = allocate(8)) { try (Buf ignored = buf.slice(0, 8)) { + assertFalse(buf.isSendable()); buf.send(); fail("Should have refused send() of acquired buffer."); } catch (IllegalStateException ignore) { @@ -441,12 +446,14 @@ public abstract class BufTest { public void sendOnSliceWithoutOffsetAndSizeMustThrow() { try (Buf buf = allocate(8)) { try (Buf slice = buf.slice()) { + assertFalse(buf.isSendable()); slice.send(); fail("Should not be able to send a slice."); } catch (IllegalStateException ignore) { // Good. } // Verify that the slice is closed properly afterwards. + assertTrue(buf.isSendable()); buf.send().receive().close(); } } @@ -455,13 +462,14 @@ public abstract class BufTest { public void sendOnSliceWithOffsetAndSizeMustThrow() { try (Buf buf = allocate(8)) { try (Buf slice = buf.slice(0, 8)) { + assertFalse(buf.isSendable()); slice.send(); fail("Should not be able to send a slice."); } catch (IllegalStateException ignore) { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -474,7 +482,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -487,7 +495,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } } @@ -506,7 +514,7 @@ public abstract class BufTest { // Good. } // Verify that the slice is closed properly afterwards. - buf.send().receive().close(); + assertTrue(buf.isSendable()); } }