Implement Buf.slice()

Motivation:
Slicing gives you a derived buffer. This is useful for sending along just the part of a buffer that has the relevant data, or to get a new buffer instance for the same data, but with independent read and write offsets.

Modification:
Add slice() methods to the Buf interface, and implement them for MemSegBuf.
Buffer slices increments the reference count of the parent buffer, which prevents the parent from being send()-able.
Slices are themselves also not send()-able.
This is because send() involves ownership transfer, while slicing is like lending out mutable borrows.
The send() capability returns to the parent buffer once all slices are closed.
This commit is contained in:
Chris Vest 2020-10-15 16:20:26 +02:00
parent 99ad2cc120
commit 7f9ed7dec7
4 changed files with 266 additions and 5 deletions

View File

@ -111,6 +111,37 @@ public interface Buf extends Rc<Buf> {
*/
long getNativeAddress();
/**
* Returns a slice of this buffer's readable bytes.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate indexes. This method is identical to
* {@code buf.slice(buf.readerIndex(), buf.readableBytes())}.
* This method does not modify {@link #readerIndex()} or {@link #writerIndex()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
*
* @return A new buffer instance, with independent {@link #readerIndex()} and {@link #writerIndex()},
* that is a view of the readable region of this buffer.
*/
default Buf slice() {
return slice(readerIndex(), readableBytes());
}
/**
* Returns a slice of the given region of this buffer.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate indexes.
* This method does not modify {@link #readerIndex()} or {@link #writerIndex()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
*
* @return A new buffer instance, with independent {@link #readerIndex()} and {@link #writerIndex()},
* that is a view of the given region of this buffer.
*/
Buf slice(int offset, int length);
// ### CODEGEN START primitive accessors interface
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors interface.">

View File

@ -52,12 +52,18 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
final MemorySegment seg;
private boolean isBigEndian;
private boolean isSendable;
private int roff;
private int woff;
MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop) {
MemSegBuf(MemorySegment segmet, Drop<MemSegBuf> drop) {
this(segmet, drop, true);
}
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, boolean isSendable) {
super(drop);
seg = segment;
this.isSendable = isSendable;
isBigEndian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
}
@ -131,6 +137,15 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
}
@Override
public Buf slice(int offset, int length) {
var slice = seg.asSlice(offset, length);
acquire();
Drop<MemSegBuf> drop = b -> close();
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, sendable).writerIndex(length).order(order());
}
// ### CODEGEN START primitive accessors implementation
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors implementation.">
@ -602,6 +617,10 @@ getByteAtOffset_BE(seg, roff) & 0xFF |
@Override
protected Owned<MemSegBuf> prepareSend() {
if (!isSendable) {
throw new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
MemSegBuf outer = this;
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.withOwnerThread(null);

View File

@ -35,6 +35,9 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
}
if (acquires == Integer.MAX_VALUE) {
throw new IllegalStateException("Cannot acquire more references; counter would overflow.");
}
acquires++;
return self();
}
@ -49,7 +52,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
@Override
public final void close() {
if (acquires == -1) {
throw new IllegalStateException("Double-free: Already closed and dropped.");
throw new IllegalStateException("Double-free: Resource already closed and dropped.");
}
if (acquires == 0) {
drop.drop(impl());
@ -64,13 +67,18 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
* This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link
* #close()} will have no effect, so this method is safe to call within a try-with-resources statement.
*
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
* @throws IllegalStateException if this object has any outstanding acquires; that is, if this object has been
* {@link #acquire() acquired} more times than it has been {@link #close() closed}.
*/
@Override
public final Send<I> send() {
if (acquires != 0) {
throw new IllegalStateException(
"Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this);
}
var owned = prepareSend();
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<I, T>(prepareSend(), drop);
return new TransferSend<I, T>(owned, drop);
}
/**

View File

@ -147,6 +147,22 @@ public abstract class BufTest {
assertEquals((byte) 42, future.get().byteValue());
}
@Test
public void sendMustThrowWhenBufIsAcquired() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.acquire()) {
try {
buf.send();
fail("Should not be able to send() a borrowed buffer.");
} catch (IllegalStateException ignore) {
// Good.
}
}
// Now send() should work again.
buf.send().receive().close();
}
}
@Test
public void mustThrowWhenAllocatingZeroSizedBuffer() {
try {
@ -304,6 +320,193 @@ public abstract class BufTest {
}
}
@Test
public void sliceWithoutOffsetAndSizeMustReturnReadableRegion() {
try (Buf buf = allocate(8)) {
for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) {
buf.writeByte(b);
}
assertEquals(0x01, buf.readByte());
buf.writerIndex(buf.writerIndex() - 1);
try (Buf slice = buf.slice()) {
assertArrayEquals(new byte[] {0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, slice.copy());
assertEquals(0, slice.readerIndex());
assertEquals(6, slice.readableBytes());
assertEquals(6, slice.writerIndex());
assertEquals(6, slice.capacity());
assertEquals(0x02, slice.readByte());
assertEquals(0x03, slice.readByte());
assertEquals(0x04, slice.readByte());
assertEquals(0x05, slice.readByte());
assertEquals(0x06, slice.readByte());
assertEquals(0x07, slice.readByte());
try {
slice.readByte();
fail("Should have bounds checked.");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
}
}
}
@Test
public void sliceWithOffsetAndSizeMustReturnGivenRegion() {
try (Buf buf = allocate(8)) {
for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) {
buf.writeByte(b);
}
buf.readerIndex(3); // Reader and writer offsets must be ignored.
buf.writerIndex(6);
try (Buf slice = buf.slice(1, 6)) {
assertArrayEquals(new byte[] {0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, slice.copy());
assertEquals(0, slice.readerIndex());
assertEquals(6, slice.readableBytes());
assertEquals(6, slice.writerIndex());
assertEquals(6, slice.capacity());
assertEquals(0x02, slice.readByte());
assertEquals(0x03, slice.readByte());
assertEquals(0x04, slice.readByte());
assertEquals(0x05, slice.readByte());
assertEquals(0x06, slice.readByte());
assertEquals(0x07, slice.readByte());
try {
slice.readByte();
fail("Should have bounds checked.");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
}
}
}
@Test
public void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice()) {
buf.send();
fail("Should have refused send() of acquired buffer.");
} catch (IllegalStateException ignore) {
// Good.
}
}
}
@Test
public void sliceWithOffsetAndSizeWillIncreaseReferenceCount() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice(0, 8)) {
buf.send();
fail("Should have refused send() of acquired buffer.");
} catch (IllegalStateException ignore) {
// Good.
}
}
}
@Test
public void sliceWithoutOffsetAndSizeHasSameEndianAsParent() {
try (Buf buf = allocate(8)) {
buf.order(ByteOrder.BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buf slice = buf.slice()) {
assertEquals(0x0102030405060708L, slice.readLong());
}
buf.order(ByteOrder.LITTLE_ENDIAN);
try (Buf slice = buf.slice()) {
assertEquals(0x0807060504030201L, slice.readLong());
}
}
}
@Test
public void sliceWithOffsetAndSizeHasSameEndianAsParent() {
try (Buf buf = allocate(8)) {
buf.order(ByteOrder.BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buf slice = buf.slice(0, 8)) {
assertEquals(0x0102030405060708L, slice.readLong());
}
buf.order(ByteOrder.LITTLE_ENDIAN);
try (Buf slice = buf.slice(0, 8)) {
assertEquals(0x0807060504030201L, slice.readLong());
}
}
}
@Test
public void sendOnSliceWithoutOffsetAndSizeMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf slice = buf.slice()) {
slice.send();
fail("Should not be able to send a slice.");
} catch (IllegalStateException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
}
}
@Test
public void sendOnSliceWithOffsetAndSizeMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf slice = buf.slice(0, 8)) {
slice.send();
fail("Should not be able to send a slice.");
} catch (IllegalStateException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
}
}
@Test
public void sliceWithNegativeOffsetMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice(-1, 1)) {
fail("Should not allow negative offsets to slice().");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
}
}
@Test
public void sliceWithNegativeSizeMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice(0, -1)) {
fail("Should not allow negative size to slice().");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
}
}
@Test
public void sliceWithSizeGreaterThanCapacityMustThrow() {
try (Buf buf = allocate(8)) {
try (Buf ignored = buf.slice(0, 9)) {
fail("Should not allow slice() size greater than parent capacity.");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
buf.slice(0, 8).close(); // This is still fine.
try (Buf ignored = buf.slice(1, 8)) {
fail("Should not allow slice() size greater than parent capacity.");
} catch (IndexOutOfBoundsException ignore) {
// Good.
}
// Verify that the slice is closed properly afterwards.
buf.send().receive().close();
}
}
// ### CODEGEN START primitive accessors tests
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors tests.">