commit
8d05092c36
@ -100,9 +100,9 @@ import java.nio.ByteOrder;
|
||||
* 0 <= readerOffset <= writerOffset <= capacity
|
||||
* </pre>
|
||||
*
|
||||
* <h3 name="slice-bifurcate">Slice vs. Bifurcate</h3>
|
||||
* <h3 name="slice-split">Slice vs. Split</h3>
|
||||
*
|
||||
* The {@link #slice()} and {@link #bifurcate()} methods both return new buffers on the memory of the buffer they're
|
||||
* The {@link #slice()} and {@link #split()} methods both return new buffers on the memory of the buffer they're
|
||||
* called on.
|
||||
* However, there are also important differences between the two, as they are aimed at different use cases that were
|
||||
* previously (in the {@code ByteBuf} API) covered by {@code slice()} alone.
|
||||
@ -114,7 +114,7 @@ import java.nio.ByteOrder;
|
||||
* Since the memory is shared, changes to the data made through one will be visible through the other.
|
||||
* </li>
|
||||
* <li>
|
||||
* Bifurcation breaks the ownership of the memory in two.
|
||||
* Split breaks the ownership of the memory in two.
|
||||
* Both resulting buffers retain ownership of their respective region of memory.
|
||||
* They can do this because the regions are guaranteed to not overlap; data changes through one buffer will not
|
||||
* be visible through the other.
|
||||
@ -124,10 +124,10 @@ import java.nio.ByteOrder;
|
||||
* These differences mean that slicing is mostly suitable for when you temporarily want to share a focused area of a
|
||||
* buffer.
|
||||
* Examples of this include doing IO, or decoding a bounded part of a larger message.
|
||||
* On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other,
|
||||
* On the other hand, split is suitable for when you want to hand over a region of a buffer to some other,
|
||||
* perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process.
|
||||
* Examples include aggregating messages into an accumulator buffer, and sending messages down the pipeline for
|
||||
* further processing, as bifurcated buffer regions, once their data has been received in its entirety.
|
||||
* further processing, as split buffer regions, once their data has been received in its entirety.
|
||||
*/
|
||||
public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
/**
|
||||
@ -454,8 +454,8 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
|
||||
* so that the entire contents of the slice is ready to be read.
|
||||
* <p>
|
||||
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
|
||||
* and bifurcate.
|
||||
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
|
||||
* and split.
|
||||
*
|
||||
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
|
||||
* that is a view of the readable region of this buffer.
|
||||
@ -476,8 +476,8 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
|
||||
* so that the entire contents of the slice is ready to be read.
|
||||
* <p>
|
||||
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
|
||||
* and bifurcate.
|
||||
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
|
||||
* and split.
|
||||
*
|
||||
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
|
||||
* that is a view of the given region of this buffer.
|
||||
@ -516,23 +516,23 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
* +---+---------------------+ +---------------+
|
||||
* Returned buffer. This buffer.
|
||||
* }</pre>
|
||||
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
|
||||
* When the buffers are in this state, both of the split parts retain an atomic reference count on the
|
||||
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
|
||||
* all the bifurcated parts have been closed.
|
||||
* all the split parts have been closed.
|
||||
* <p>
|
||||
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
|
||||
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
|
||||
* split. If the split point lands perfectly between two constituent buffers, then a composite buffer can
|
||||
* simply split its internal array in two.
|
||||
* <p>
|
||||
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
|
||||
* Split buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
|
||||
* <p>
|
||||
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
|
||||
* and bifurcate.
|
||||
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
|
||||
* and split.
|
||||
*
|
||||
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
|
||||
*/
|
||||
default Buffer bifurcate() {
|
||||
return bifurcate(writerOffset());
|
||||
default Buffer split() {
|
||||
return split(writerOffset());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -549,7 +549,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
* <p>
|
||||
* The memory region in the returned buffer will become inaccessible through this buffer. If the
|
||||
* {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset},
|
||||
* then those offsets will be moved forward, so they land on offset 0 after the bifurcation.
|
||||
* then those offsets will be moved forward, so they land on offset 0 after the split.
|
||||
* <p>
|
||||
* Effectively, the following transformation takes place:
|
||||
* <pre>{@code
|
||||
@ -567,22 +567,22 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
|
||||
* +---------------+ +---------------+
|
||||
* Returned buffer. This buffer.
|
||||
* }</pre>
|
||||
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
|
||||
* When the buffers are in this state, both of the split parts retain an atomic reference count on the
|
||||
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
|
||||
* all the bifurcated parts have been closed.
|
||||
* all the split parts have been closed.
|
||||
* <p>
|
||||
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
|
||||
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
|
||||
* split. If the split point lands perfectly between two constituent buffers, then a composite buffer can
|
||||
* simply split its internal array in two.
|
||||
* <p>
|
||||
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
|
||||
* Split buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
|
||||
* <p>
|
||||
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
|
||||
* and bifurcate.
|
||||
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
|
||||
* and split.
|
||||
*
|
||||
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
|
||||
*/
|
||||
Buffer bifurcate(int splitOffset);
|
||||
Buffer split(int splitOffset);
|
||||
|
||||
/**
|
||||
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
|
||||
|
@ -922,7 +922,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeBuffer bifurcate(int splitOffset) {
|
||||
public CompositeBuffer split(int splitOffset) {
|
||||
if (splitOffset < 0) {
|
||||
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
|
||||
}
|
||||
@ -931,29 +931,29 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
|
||||
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
|
||||
}
|
||||
if (!isOwned()) {
|
||||
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
|
||||
throw new IllegalStateException("Cannot split a buffer that is not owned.");
|
||||
}
|
||||
if (bufs.length == 0) {
|
||||
// Bifurcating a zero-length buffer is trivial.
|
||||
// Splitting a zero-length buffer is trivial.
|
||||
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
|
||||
}
|
||||
|
||||
int i = searchOffsets(splitOffset);
|
||||
int off = splitOffset - offsets[i];
|
||||
Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i);
|
||||
Buffer[] splits = Arrays.copyOf(bufs, off == 0? i : 1 + i);
|
||||
bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length);
|
||||
if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) {
|
||||
bifs[bifs.length - 1] = bufs[0].bifurcate(off);
|
||||
if (off > 0 && splits.length > 0 && off < splits[splits.length - 1].capacity()) {
|
||||
splits[splits.length - 1] = bufs[0].split(off);
|
||||
}
|
||||
computeBufferOffsets();
|
||||
try {
|
||||
var compositeBuf = new CompositeBuffer(allocator, bifs, unsafeGetDrop(), true);
|
||||
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
|
||||
var compositeBuf = new CompositeBuffer(allocator, splits, unsafeGetDrop(), true);
|
||||
compositeBuf.order = order; // Preserve byte order even if splits array is empty.
|
||||
return compositeBuf;
|
||||
} finally {
|
||||
// Drop our references to the buffers in the bifs array. They belong to the new composite buffer now.
|
||||
for (Buffer bif : bifs) {
|
||||
bif.close();
|
||||
// Drop our references to the buffers in the splits array. They belong to the new composite buffer now.
|
||||
for (Buffer split : splits) {
|
||||
split.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -417,7 +417,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer bifurcate(int splitOffset) {
|
||||
public Buffer split(int splitOffset) {
|
||||
if (splitOffset < 0) {
|
||||
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
|
||||
}
|
||||
@ -426,25 +426,25 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
|
||||
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
|
||||
}
|
||||
if (!isOwned()) {
|
||||
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
|
||||
throw attachTrace(new IllegalStateException("Cannot split a buffer that is not owned."));
|
||||
}
|
||||
var drop = (ArcDrop<NioBuffer>) unsafeGetDrop();
|
||||
unsafeSetDrop(new ArcDrop<>(drop));
|
||||
var bifurcatedBuffer = rmem.slice(0, splitOffset);
|
||||
var splitByteBuffer = rmem.slice(0, splitOffset);
|
||||
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
|
||||
var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment()));
|
||||
bifurcatedBuf.woff = Math.min(woff, splitOffset);
|
||||
bifurcatedBuf.roff = Math.min(roff, splitOffset);
|
||||
bifurcatedBuf.order(order());
|
||||
var splitBuffer = new NioBuffer(base, splitByteBuffer, control, new ArcDrop<>(drop.increment()));
|
||||
splitBuffer.woff = Math.min(woff, splitOffset);
|
||||
splitBuffer.roff = Math.min(roff, splitOffset);
|
||||
splitBuffer.order(order());
|
||||
boolean readOnly = readOnly();
|
||||
bifurcatedBuf.readOnly(readOnly);
|
||||
splitBuffer.readOnly(readOnly);
|
||||
rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset);
|
||||
if (!readOnly) {
|
||||
wmem = rmem;
|
||||
}
|
||||
woff = Math.max(woff, splitOffset) - splitOffset;
|
||||
roff = Math.max(roff, splitOffset) - splitOffset;
|
||||
return bifurcatedBuf;
|
||||
return splitBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -548,7 +548,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer bifurcate(int splitOffset) {
|
||||
public Buffer split(int splitOffset) {
|
||||
if (splitOffset < 0) {
|
||||
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
|
||||
}
|
||||
@ -557,24 +557,24 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
|
||||
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
|
||||
}
|
||||
if (!isOwned()) {
|
||||
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
|
||||
throw attachTrace(new IllegalStateException("Cannot split a buffer that is not owned."));
|
||||
}
|
||||
var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop();
|
||||
unsafeSetDrop(new ArcDrop<>(drop));
|
||||
var bifurcatedSeg = seg.asSlice(0, splitOffset);
|
||||
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc);
|
||||
bifurcatedBuf.woff = Math.min(woff, splitOffset);
|
||||
bifurcatedBuf.roff = Math.min(roff, splitOffset);
|
||||
bifurcatedBuf.order(order);
|
||||
var splitSegment = seg.asSlice(0, splitOffset);
|
||||
var splitBuffer = new MemSegBuffer(base, splitSegment, new ArcDrop<>(drop.increment()), alloc);
|
||||
splitBuffer.woff = Math.min(woff, splitOffset);
|
||||
splitBuffer.roff = Math.min(roff, splitOffset);
|
||||
splitBuffer.order(order);
|
||||
boolean readOnly = readOnly();
|
||||
bifurcatedBuf.readOnly(readOnly);
|
||||
splitBuffer.readOnly(readOnly);
|
||||
seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset);
|
||||
if (!readOnly) {
|
||||
wseg = seg;
|
||||
}
|
||||
woff = Math.max(woff, splitOffset) - splitOffset;
|
||||
roff = Math.max(roff, splitOffset) - splitOffset;
|
||||
return bifurcatedBuf;
|
||||
return splitBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -458,7 +458,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer bifurcate(int splitOffset) {
|
||||
public Buffer split(int splitOffset) {
|
||||
if (splitOffset < 0) {
|
||||
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
|
||||
}
|
||||
@ -467,17 +467,17 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
|
||||
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
|
||||
}
|
||||
if (!isOwned()) {
|
||||
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
|
||||
throw attachTrace(new IllegalStateException("Cannot split a buffer that is not owned."));
|
||||
}
|
||||
var drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop();
|
||||
unsafeSetDrop(new ArcDrop<>(drop));
|
||||
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
|
||||
var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment()));
|
||||
bifurcatedBuf.woff = Math.min(woff, splitOffset);
|
||||
bifurcatedBuf.roff = Math.min(roff, splitOffset);
|
||||
bifurcatedBuf.order(order());
|
||||
var splitBuffer = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment()));
|
||||
splitBuffer.woff = Math.min(woff, splitOffset);
|
||||
splitBuffer.roff = Math.min(roff, splitOffset);
|
||||
splitBuffer.order(order());
|
||||
boolean readOnly = readOnly();
|
||||
bifurcatedBuf.readOnly(readOnly);
|
||||
splitBuffer.readOnly(readOnly);
|
||||
rsize -= splitOffset;
|
||||
baseOffset += splitOffset;
|
||||
address += splitOffset;
|
||||
@ -486,7 +486,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
|
||||
}
|
||||
woff = Math.max(woff, splitOffset) - splitOffset;
|
||||
roff = Math.max(roff, splitOffset) - splitOffset;
|
||||
return bifurcatedBuf;
|
||||
return splitBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -403,32 +403,32 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateWithNegativeOffsetMustThrow(Fixture fixture) {
|
||||
public void splitWithNegativeOffsetMustThrow(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.bifurcate(0).close();
|
||||
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(-1));
|
||||
buf.split(0).close();
|
||||
assertThrows(IllegalArgumentException.class, () -> buf.split(-1));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateWithOversizedOffsetMustThrow(Fixture fixture) {
|
||||
public void splitWithOversizedOffsetMustThrow(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(9));
|
||||
buf.bifurcate(8).close();
|
||||
assertThrows(IllegalArgumentException.class, () -> buf.split(9));
|
||||
buf.split(8).close();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
|
||||
public void splitOfNonOwnedBufferMustThrow(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.writeInt(1);
|
||||
try (Buffer acquired = buf.acquire()) {
|
||||
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate());
|
||||
var exc = assertThrows(IllegalStateException.class, () -> acquired.split());
|
||||
assertThat(exc).hasMessageContaining("owned");
|
||||
}
|
||||
}
|
||||
@ -436,11 +436,11 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) {
|
||||
public void splitOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
try (Buffer acquired = buf.acquire()) {
|
||||
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate(4));
|
||||
var exc = assertThrows(IllegalStateException.class, () -> acquired.split(4));
|
||||
assertThat(exc).hasMessageContaining("owned");
|
||||
}
|
||||
}
|
||||
@ -448,47 +448,47 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateOnOffsetMustTruncateGreaterOffsets(Fixture fixture) {
|
||||
public void splitOnOffsetMustTruncateGreaterOffsets(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.writeInt(0x01020304);
|
||||
buf.writeByte((byte) 0x05);
|
||||
buf.readInt();
|
||||
try (Buffer bif = buf.bifurcate(2)) {
|
||||
try (Buffer split = buf.split(2)) {
|
||||
assertThat(buf.readerOffset()).isEqualTo(2);
|
||||
assertThat(buf.writerOffset()).isEqualTo(3);
|
||||
|
||||
assertThat(bif.readerOffset()).isEqualTo(2);
|
||||
assertThat(bif.writerOffset()).isEqualTo(2);
|
||||
assertThat(split.readerOffset()).isEqualTo(2);
|
||||
assertThat(split.writerOffset()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateOnOffsetMustExtendLesserOffsets(Fixture fixture) {
|
||||
public void splitOnOffsetMustExtendLesserOffsets(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.writeInt(0x01020304);
|
||||
buf.readInt();
|
||||
try (Buffer bif = buf.bifurcate(6)) {
|
||||
try (Buffer split = buf.split(6)) {
|
||||
assertThat(buf.readerOffset()).isEqualTo(0);
|
||||
assertThat(buf.writerOffset()).isEqualTo(0);
|
||||
|
||||
assertThat(bif.readerOffset()).isEqualTo(4);
|
||||
assertThat(bif.writerOffset()).isEqualTo(4);
|
||||
assertThat(split.readerOffset()).isEqualTo(4);
|
||||
assertThat(split.writerOffset()).isEqualTo(4);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
|
||||
public void splitPartMustContainFirstHalfOfBuffer(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
|
||||
try (Buffer bif = buf.bifurcate()) {
|
||||
try (Buffer split = buf.split()) {
|
||||
// Original buffer:
|
||||
assertThat(buf.capacity()).isEqualTo(8);
|
||||
assertThat(buf.readerOffset()).isZero();
|
||||
@ -496,19 +496,19 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
assertThat(buf.readableBytes()).isZero();
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
|
||||
|
||||
// Bifurcated part:
|
||||
assertThat(bif.capacity()).isEqualTo(8);
|
||||
assertThat(bif.readerOffset()).isOne();
|
||||
assertThat(bif.writerOffset()).isEqualTo(8);
|
||||
assertThat(bif.readableBytes()).isEqualTo(7);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(bif.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> bif.readByte());
|
||||
// Split part:
|
||||
assertThat(split.capacity()).isEqualTo(8);
|
||||
assertThat(split.readerOffset()).isOne();
|
||||
assertThat(split.writerOffset()).isEqualTo(8);
|
||||
assertThat(split.readableBytes()).isEqualTo(7);
|
||||
assertThat(split.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(split.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(split.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(split.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> split.readByte());
|
||||
}
|
||||
|
||||
// Bifurcated part does NOT return when closed:
|
||||
// Split part does NOT return when closed:
|
||||
assertThat(buf.capacity()).isEqualTo(8);
|
||||
assertThat(buf.readerOffset()).isZero();
|
||||
assertThat(buf.writerOffset()).isZero();
|
||||
@ -519,12 +519,12 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) {
|
||||
public void splitPartsMustBeIndividuallySendable(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
|
||||
try (Buffer sentBif = buf.bifurcate().send().receive()) {
|
||||
try (Buffer sentSplit = buf.split().send().receive()) {
|
||||
try (Buffer sentBuf = buf.send().receive()) {
|
||||
assertThat(sentBuf.capacity()).isEqualTo(8);
|
||||
assertThat(sentBuf.readerOffset()).isZero();
|
||||
@ -533,28 +533,28 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> sentBuf.readByte());
|
||||
}
|
||||
|
||||
assertThat(sentBif.capacity()).isEqualTo(8);
|
||||
assertThat(sentBif.readerOffset()).isOne();
|
||||
assertThat(sentBif.writerOffset()).isEqualTo(8);
|
||||
assertThat(sentBif.readableBytes()).isEqualTo(7);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(sentBif.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> sentBif.readByte());
|
||||
assertThat(sentSplit.capacity()).isEqualTo(8);
|
||||
assertThat(sentSplit.readerOffset()).isOne();
|
||||
assertThat(sentSplit.writerOffset()).isEqualTo(8);
|
||||
assertThat(sentSplit.readableBytes()).isEqualTo(7);
|
||||
assertThat(sentSplit.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(sentSplit.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(sentSplit.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(sentSplit.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> sentSplit.readByte());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) {
|
||||
public void mustBePossibleToSplitMoreThanOnce(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buffer a = buf.bifurcate()) {
|
||||
try (Buffer a = buf.split()) {
|
||||
a.writerOffset(4);
|
||||
try (Buffer b = a.bifurcate()) {
|
||||
try (Buffer b = a.split()) {
|
||||
assertEquals(0x01020304, b.readInt());
|
||||
a.writerOffset(4);
|
||||
assertEquals(0x05060708, a.readInt());
|
||||
@ -562,7 +562,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> a.readByte());
|
||||
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
buf.writerOffset(4);
|
||||
try (Buffer c = buf.bifurcate()) {
|
||||
try (Buffer c = buf.split()) {
|
||||
assertEquals(0xA1A2A3A4, c.readInt());
|
||||
buf.writerOffset(4);
|
||||
assertEquals(0xA5A6A7A8, buf.readInt());
|
||||
@ -576,19 +576,19 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void mustBePossibleToBifurcateOwnedSlices(Fixture fixture) {
|
||||
public void mustBePossibleToSplitOwnedSlices(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator()) {
|
||||
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN);
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buffer slice = buf.slice()) {
|
||||
buf.close();
|
||||
assertTrue(slice.isOwned());
|
||||
try (Buffer bifurcate = slice.bifurcate(4)) {
|
||||
bifurcate.reset().ensureWritable(Long.BYTES);
|
||||
try (Buffer split = slice.split(4)) {
|
||||
split.reset().ensureWritable(Long.BYTES);
|
||||
slice.reset().ensureWritable(Long.BYTES);
|
||||
assertThat(bifurcate.capacity()).isEqualTo(Long.BYTES);
|
||||
assertThat(split.capacity()).isEqualTo(Long.BYTES);
|
||||
assertThat(slice.capacity()).isEqualTo(Long.BYTES);
|
||||
assertThat(bifurcate.getLong(0)).isEqualTo(0x01020304_00000000L);
|
||||
assertThat(split.getLong(0)).isEqualTo(0x01020304_00000000L);
|
||||
assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L);
|
||||
}
|
||||
}
|
||||
@ -597,15 +597,15 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {
|
||||
public void splitBufferMustHaveSameByteOrderAsParent(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8).order(BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buffer a = buf.bifurcate()) {
|
||||
try (Buffer a = buf.split()) {
|
||||
assertThat(a.order()).isEqualTo(BIG_ENDIAN);
|
||||
a.order(LITTLE_ENDIAN);
|
||||
a.writerOffset(4);
|
||||
try (Buffer b = a.bifurcate()) {
|
||||
try (Buffer b = a.split()) {
|
||||
assertThat(b.order()).isEqualTo(LITTLE_ENDIAN);
|
||||
assertThat(buf.order()).isEqualTo(BIG_ENDIAN);
|
||||
}
|
||||
@ -615,11 +615,11 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void ensureWritableOnBifurcatedBuffers(Fixture fixture) {
|
||||
public void ensureWritableOnSplitBuffers(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buffer a = buf.bifurcate()) {
|
||||
try (Buffer a = buf.split()) {
|
||||
assertEquals(0x0102030405060708L, a.readLong());
|
||||
a.ensureWritable(8);
|
||||
a.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
@ -634,13 +634,13 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) {
|
||||
public void ensureWritableOnSplitBuffersWithOddOffsets(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(10).order(BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
buf.writeByte((byte) 0x09);
|
||||
buf.readByte();
|
||||
try (Buffer a = buf.bifurcate()) {
|
||||
try (Buffer a = buf.split()) {
|
||||
assertEquals(0x0203040506070809L, a.readLong());
|
||||
a.ensureWritable(8);
|
||||
a.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
@ -654,28 +654,28 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bifurcateOnEmptyBigEndianCompositeBuffer() {
|
||||
public void splitOnEmptyBigEndianCompositeBuffer() {
|
||||
try (BufferAllocator allocator = BufferAllocator.heap();
|
||||
Buffer buf = CompositeBuffer.compose(allocator).order(BIG_ENDIAN)) {
|
||||
verifyBifurcateEmptyCompositeBuffer(buf);
|
||||
verifySplitEmptyCompositeBuffer(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
|
||||
public void splitOnEmptyLittleEndianCompositeBuffer() {
|
||||
try (BufferAllocator allocator = BufferAllocator.heap();
|
||||
Buffer buf = CompositeBuffer.compose(allocator).order(LITTLE_ENDIAN)) {
|
||||
verifyBifurcateEmptyCompositeBuffer(buf);
|
||||
verifySplitEmptyCompositeBuffer(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception {
|
||||
public void splitBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(8)) {
|
||||
buf.writeInt(42);
|
||||
var send = buf.bifurcate().send();
|
||||
var send = buf.split().send();
|
||||
var fut = executor.submit(() -> {
|
||||
try (Buffer receive = send.receive()) {
|
||||
assertEquals(42, receive.readInt());
|
||||
@ -716,13 +716,13 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void bifurcateOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
|
||||
public void splitOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(16)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
buf.readOnly(true);
|
||||
try (Buffer bifurcate = buf.bifurcate()) {
|
||||
assertTrue(bifurcate.readOnly());
|
||||
try (Buffer split = buf.split()) {
|
||||
assertTrue(split.readOnly());
|
||||
assertTrue(buf.readOnly());
|
||||
}
|
||||
}
|
||||
|
@ -111,15 +111,15 @@ public class BufferSendTest extends BufferTestSupport {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception {
|
||||
public void sendMustNotMakeSplitBuffersInaccessible(Fixture fixture) throws Exception {
|
||||
try (BufferAllocator allocator = fixture.createAllocator();
|
||||
Buffer buf = allocator.allocate(16)) {
|
||||
buf.writeInt(64);
|
||||
var bifA = buf.bifurcate();
|
||||
var splitA = buf.split();
|
||||
buf.writeInt(42);
|
||||
var send = buf.bifurcate().send();
|
||||
var send = buf.split().send();
|
||||
buf.writeInt(72);
|
||||
var bifB = buf.bifurcate();
|
||||
var splitB = buf.split();
|
||||
var fut = executor.submit(() -> {
|
||||
try (Buffer receive = send.receive()) {
|
||||
assertEquals(42, receive.readInt());
|
||||
@ -128,8 +128,8 @@ public class BufferSendTest extends BufferTestSupport {
|
||||
fut.get();
|
||||
buf.writeInt(32);
|
||||
assertEquals(32, buf.readInt());
|
||||
assertEquals(64, bifA.readInt());
|
||||
assertEquals(72, bifB.readInt());
|
||||
assertEquals(64, splitA.readInt());
|
||||
assertEquals(72, splitB.readInt());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,22 +243,22 @@ public abstract class BufferTestSupport {
|
||||
}
|
||||
|
||||
var stream = builder.build();
|
||||
return stream.flatMap(BufferTestSupport::injectBifurcations)
|
||||
return stream.flatMap(BufferTestSupport::injectSplits)
|
||||
.flatMap(BufferTestSupport::injectSlices)
|
||||
.flatMap(BufferTestSupport::injectReadOnlyToggling);
|
||||
}
|
||||
|
||||
private static Stream<Fixture> injectBifurcations(Fixture f) {
|
||||
private static Stream<Fixture> injectSplits(Fixture f) {
|
||||
Builder<Fixture> builder = Stream.builder();
|
||||
builder.add(f);
|
||||
builder.add(new Fixture(f + ".bifurcate", () -> {
|
||||
builder.add(new Fixture(f + ".split", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new BufferAllocator() {
|
||||
@Override
|
||||
public Buffer allocate(int size) {
|
||||
try (Buffer buf = allocatorBase.allocate(size + 1)) {
|
||||
buf.writerOffset(size);
|
||||
return buf.bifurcate().writerOffset(0);
|
||||
return buf.split().writerOffset(0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -369,7 +369,7 @@ public abstract class BufferTestSupport {
|
||||
}
|
||||
}
|
||||
|
||||
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
|
||||
assertThrows(IllegalStateException.class, () -> buf.split());
|
||||
assertThrows(IllegalStateException.class, () -> buf.send());
|
||||
assertThrows(IllegalStateException.class, () -> buf.acquire());
|
||||
assertThrows(IllegalStateException.class, () -> buf.slice());
|
||||
@ -832,8 +832,8 @@ public abstract class BufferTestSupport {
|
||||
assertEquals(woff, buf.writerOffset());
|
||||
}
|
||||
|
||||
public static void verifyBifurcateEmptyCompositeBuffer(Buffer buf) {
|
||||
try (Buffer a = buf.bifurcate()) {
|
||||
public static void verifySplitEmptyCompositeBuffer(Buffer buf) {
|
||||
try (Buffer a = buf.split()) {
|
||||
a.ensureWritable(4);
|
||||
buf.ensureWritable(4);
|
||||
a.writeInt(1);
|
||||
|
@ -226,9 +226,9 @@ public class SendExample {
|
||||
BufferAllocator allocator = BufferAllocator.heap();
|
||||
|
||||
try (Buffer buf = allocator.allocate(4096)) {
|
||||
var futA = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
|
||||
var futB = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
|
||||
var futC = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
|
||||
var futA = executor.submit(new Task(buf.writerOffset(1024).split().send()));
|
||||
var futB = executor.submit(new Task(buf.writerOffset(1024).split().send()));
|
||||
var futC = executor.submit(new Task(buf.writerOffset(1024).split().send()));
|
||||
var futD = executor.submit(new Task(buf.send()));
|
||||
futA.get();
|
||||
futB.get();
|
||||
|
@ -51,7 +51,7 @@ public class AlternativeMessageDecoderTest {
|
||||
}
|
||||
|
||||
// We can read our message in full.
|
||||
Buffer messageBuffer = input.bifurcate(input.readerOffset() + length);
|
||||
Buffer messageBuffer = input.split(input.readerOffset() + length);
|
||||
ctx.fireChannelRead(messageBuffer);
|
||||
return true;
|
||||
}
|
||||
@ -80,7 +80,7 @@ public class AlternativeMessageDecoderTest {
|
||||
if (length == messagesBuffer.readableBytes()) {
|
||||
channel.writeInbound(messagesBuffer);
|
||||
} else {
|
||||
channel.writeInbound(messagesBuffer.bifurcate(length));
|
||||
channel.writeInbound(messagesBuffer.split(length));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ import static java.util.Objects.requireNonNull;
|
||||
* {@code @Override}
|
||||
* public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
|
||||
* throws {@link Exception} {
|
||||
* ctx.fireChannelRead(in.bifurcate());
|
||||
* ctx.fireChannelRead(in.split());
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
@ -85,7 +85,7 @@ import static java.util.Objects.requireNonNull;
|
||||
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
|
||||
* annotated with {@link @Sharable}.
|
||||
* <p>
|
||||
* Some methods such as {@link Buffer#bifurcate(int)} will cause a memory leak if the returned buffer
|
||||
* Some methods such as {@link Buffer#split(int)} will cause a memory leak if the returned buffer
|
||||
* is not released or fired through the {@link ChannelPipeline} via
|
||||
* {@link ChannelHandlerContext#fireChannelRead(Object)}.
|
||||
*/
|
||||
@ -276,7 +276,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
}
|
||||
assert context.ctx == ctx || ctx == context;
|
||||
|
||||
callDecode(context, cumulation); // TODO we'll want to bifurcate here, and simplify lifetime handling
|
||||
callDecode(context, cumulation); // TODO we'll want to split here, and simplify lifetime handling
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
|
Loading…
Reference in New Issue
Block a user