From 7f9ed7dec7cb9281d9bd859c199c51642e968594 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 15 Oct 2020 16:20:26 +0200 Subject: [PATCH] Implement Buf.slice() Motivation: Slicing gives you a derived buffer. This is useful for sending along just the part of a buffer that has the relevant data, or to get a new buffer instance for the same data, but with independent read and write offsets. Modification: Add slice() methods to the Buf interface, and implement them for MemSegBuf. Buffer slices increments the reference count of the parent buffer, which prevents the parent from being send()-able. Slices are themselves also not send()-able. This is because send() involves ownership transfer, while slicing is like lending out mutable borrows. The send() capability returns to the parent buffer once all slices are closed. --- .../src/main/java/io/netty/buffer/b2/Buf.java | 31 +++ .../java/io/netty/buffer/b2/MemSegBuf.java | 21 +- .../java/io/netty/buffer/b2/RcSupport.java | 16 +- .../test/java/io/netty/buffer/b2/BufTest.java | 203 ++++++++++++++++++ 4 files changed, 266 insertions(+), 5 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/b2/Buf.java b/buffer/src/main/java/io/netty/buffer/b2/Buf.java index a1b386c..ea9d59b 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Buf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Buf.java @@ -111,6 +111,37 @@ public interface Buf extends Rc { */ long getNativeAddress(); + /** + * Returns a slice of this buffer's readable bytes. + * Modifying the content of the returned buffer or this buffer affects each other's content, + * while they maintain separate indexes. This method is identical to + * {@code buf.slice(buf.readerIndex(), buf.readableBytes())}. + * This method does not modify {@link #readerIndex()} or {@link #writerIndex()} of this buffer. + *

+ * This method increments the reference count of this buffer. + * The reference count is decremented again when the slice is deallocated. + * + * @return A new buffer instance, with independent {@link #readerIndex()} and {@link #writerIndex()}, + * that is a view of the readable region of this buffer. + */ + default Buf slice() { + return slice(readerIndex(), readableBytes()); + } + + /** + * Returns a slice of the given region of this buffer. + * Modifying the content of the returned buffer or this buffer affects each other's content, + * while they maintain separate indexes. + * This method does not modify {@link #readerIndex()} or {@link #writerIndex()} of this buffer. + *

+ * This method increments the reference count of this buffer. + * The reference count is decremented again when the slice is deallocated. + * + * @return A new buffer instance, with independent {@link #readerIndex()} and {@link #writerIndex()}, + * that is a view of the given region of this buffer. + */ + Buf slice(int offset, int length); + // ### CODEGEN START primitive accessors interface // diff --git a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java index 86fd16a..d48d957 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java @@ -52,12 +52,18 @@ class MemSegBuf extends RcSupport implements Buf { static final Drop SEGMENT_CLOSE = buf -> buf.seg.close(); final MemorySegment seg; private boolean isBigEndian; + private boolean isSendable; private int roff; private int woff; - MemSegBuf(MemorySegment segment, Drop drop) { + MemSegBuf(MemorySegment segmet, Drop drop) { + this(segmet, drop, true); + } + + private MemSegBuf(MemorySegment segment, Drop drop, boolean isSendable) { super(drop); seg = segment; + this.isSendable = isSendable; isBigEndian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; } @@ -131,6 +137,15 @@ class MemSegBuf extends RcSupport implements Buf { } } + @Override + public Buf slice(int offset, int length) { + var slice = seg.asSlice(offset, length); + acquire(); + Drop drop = b -> close(); + var sendable = false; // Sending implies ownership change, which we can't do for slices. + return new MemSegBuf(slice, drop, sendable).writerIndex(length).order(order()); + } + // ### CODEGEN START primitive accessors implementation // @@ -602,6 +617,10 @@ getByteAtOffset_BE(seg, roff) & 0xFF | @Override protected Owned prepareSend() { + if (!isSendable) { + throw new IllegalStateException( + "Cannot send() this buffer. This buffer might be a slice of another buffer."); + } MemSegBuf outer = this; boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.withOwnerThread(null); diff --git a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java index 1ee365e..c71089f 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java @@ -35,6 +35,9 @@ public abstract class RcSupport, T extends RcSupport> impl if (acquires < 0) { throw new IllegalStateException("Resource is closed."); } + if (acquires == Integer.MAX_VALUE) { + throw new IllegalStateException("Cannot acquire more references; counter would overflow."); + } acquires++; return self(); } @@ -49,7 +52,7 @@ public abstract class RcSupport, T extends RcSupport> impl @Override public final void close() { if (acquires == -1) { - throw new IllegalStateException("Double-free: Already closed and dropped."); + throw new IllegalStateException("Double-free: Resource already closed and dropped."); } if (acquires == 0) { drop.drop(impl()); @@ -64,13 +67,18 @@ public abstract class RcSupport, T extends RcSupport> impl * This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link * #close()} will have no effect, so this method is safe to call within a try-with-resources statement. * - * @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the - * currently owning thread. + * @throws IllegalStateException if this object has any outstanding acquires; that is, if this object has been + * {@link #acquire() acquired} more times than it has been {@link #close() closed}. */ @Override public final Send send() { + if (acquires != 0) { + throw new IllegalStateException( + "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this); + } + var owned = prepareSend(); acquires = -2; // close without dropping (also ignore future double-free attempts) - return new TransferSend(prepareSend(), drop); + return new TransferSend(owned, drop); } /** diff --git a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java index 11f347c..1fc1bf1 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java @@ -147,6 +147,22 @@ public abstract class BufTest { assertEquals((byte) 42, future.get().byteValue()); } + @Test + public void sendMustThrowWhenBufIsAcquired() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.acquire()) { + try { + buf.send(); + fail("Should not be able to send() a borrowed buffer."); + } catch (IllegalStateException ignore) { + // Good. + } + } + // Now send() should work again. + buf.send().receive().close(); + } + } + @Test public void mustThrowWhenAllocatingZeroSizedBuffer() { try { @@ -304,6 +320,193 @@ public abstract class BufTest { } } + @Test + public void sliceWithoutOffsetAndSizeMustReturnReadableRegion() { + try (Buf buf = allocate(8)) { + for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) { + buf.writeByte(b); + } + assertEquals(0x01, buf.readByte()); + buf.writerIndex(buf.writerIndex() - 1); + try (Buf slice = buf.slice()) { + assertArrayEquals(new byte[] {0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, slice.copy()); + assertEquals(0, slice.readerIndex()); + assertEquals(6, slice.readableBytes()); + assertEquals(6, slice.writerIndex()); + assertEquals(6, slice.capacity()); + assertEquals(0x02, slice.readByte()); + assertEquals(0x03, slice.readByte()); + assertEquals(0x04, slice.readByte()); + assertEquals(0x05, slice.readByte()); + assertEquals(0x06, slice.readByte()); + assertEquals(0x07, slice.readByte()); + try { + slice.readByte(); + fail("Should have bounds checked."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + } + } + } + + @Test + public void sliceWithOffsetAndSizeMustReturnGivenRegion() { + try (Buf buf = allocate(8)) { + for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) { + buf.writeByte(b); + } + buf.readerIndex(3); // Reader and writer offsets must be ignored. + buf.writerIndex(6); + try (Buf slice = buf.slice(1, 6)) { + assertArrayEquals(new byte[] {0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, slice.copy()); + assertEquals(0, slice.readerIndex()); + assertEquals(6, slice.readableBytes()); + assertEquals(6, slice.writerIndex()); + assertEquals(6, slice.capacity()); + assertEquals(0x02, slice.readByte()); + assertEquals(0x03, slice.readByte()); + assertEquals(0x04, slice.readByte()); + assertEquals(0x05, slice.readByte()); + assertEquals(0x06, slice.readByte()); + assertEquals(0x07, slice.readByte()); + try { + slice.readByte(); + fail("Should have bounds checked."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + } + } + } + + @Test + public void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.slice()) { + buf.send(); + fail("Should have refused send() of acquired buffer."); + } catch (IllegalStateException ignore) { + // Good. + } + } + } + + @Test + public void sliceWithOffsetAndSizeWillIncreaseReferenceCount() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.slice(0, 8)) { + buf.send(); + fail("Should have refused send() of acquired buffer."); + } catch (IllegalStateException ignore) { + // Good. + } + } + } + + @Test + public void sliceWithoutOffsetAndSizeHasSameEndianAsParent() { + try (Buf buf = allocate(8)) { + buf.order(ByteOrder.BIG_ENDIAN); + buf.writeLong(0x0102030405060708L); + try (Buf slice = buf.slice()) { + assertEquals(0x0102030405060708L, slice.readLong()); + } + buf.order(ByteOrder.LITTLE_ENDIAN); + try (Buf slice = buf.slice()) { + assertEquals(0x0807060504030201L, slice.readLong()); + } + } + } + + @Test + public void sliceWithOffsetAndSizeHasSameEndianAsParent() { + try (Buf buf = allocate(8)) { + buf.order(ByteOrder.BIG_ENDIAN); + buf.writeLong(0x0102030405060708L); + try (Buf slice = buf.slice(0, 8)) { + assertEquals(0x0102030405060708L, slice.readLong()); + } + buf.order(ByteOrder.LITTLE_ENDIAN); + try (Buf slice = buf.slice(0, 8)) { + assertEquals(0x0807060504030201L, slice.readLong()); + } + } + } + + @Test + public void sendOnSliceWithoutOffsetAndSizeMustThrow() { + try (Buf buf = allocate(8)) { + try (Buf slice = buf.slice()) { + slice.send(); + fail("Should not be able to send a slice."); + } catch (IllegalStateException ignore) { + // Good. + } + // Verify that the slice is closed properly afterwards. + buf.send().receive().close(); + } + } + + @Test + public void sendOnSliceWithOffsetAndSizeMustThrow() { + try (Buf buf = allocate(8)) { + try (Buf slice = buf.slice(0, 8)) { + slice.send(); + fail("Should not be able to send a slice."); + } catch (IllegalStateException ignore) { + // Good. + } + // Verify that the slice is closed properly afterwards. + buf.send().receive().close(); + } + } + + @Test + public void sliceWithNegativeOffsetMustThrow() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.slice(-1, 1)) { + fail("Should not allow negative offsets to slice()."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + // Verify that the slice is closed properly afterwards. + buf.send().receive().close(); + } + } + + @Test + public void sliceWithNegativeSizeMustThrow() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.slice(0, -1)) { + fail("Should not allow negative size to slice()."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + // Verify that the slice is closed properly afterwards. + buf.send().receive().close(); + } + } + + @Test + public void sliceWithSizeGreaterThanCapacityMustThrow() { + try (Buf buf = allocate(8)) { + try (Buf ignored = buf.slice(0, 9)) { + fail("Should not allow slice() size greater than parent capacity."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + buf.slice(0, 8).close(); // This is still fine. + try (Buf ignored = buf.slice(1, 8)) { + fail("Should not allow slice() size greater than parent capacity."); + } catch (IndexOutOfBoundsException ignore) { + // Good. + } + // Verify that the slice is closed properly afterwards. + buf.send().receive().close(); + } + } + // ### CODEGEN START primitive accessors tests //