Remove the slice methods, add copy methods

This commit is contained in:
Chris Vest 2021-05-27 14:07:31 +02:00
parent bfa8fd0b1f
commit 707e5e2afb
19 changed files with 178 additions and 442 deletions

View File

@ -74,35 +74,20 @@ import java.nio.ByteOrder;
* 0 <= readerOffset <= writerOffset <= capacity
* </pre>
*
* <h3 name="slice-split">Slice vs. Split</h3>
* <h3 name="slice-split">Splitting buffers</h3>
*
* The {@link #slice()} and {@link #split()} methods both return new buffers on the memory of the buffer they're
* called on.
* However, there are also important differences between the two, as they are aimed at different use cases that were
* previously (in the {@code ByteBuf} API) covered by {@code slice()} alone.
*
* <ul>
* <li>
* Slices create a new view onto the memory, that is shared between the slice and the buffer.
* As long as both the slice, and the originating buffer are alive, neither will have ownership of the memory.
* Since the memory is shared, changes to the data made through one will be visible through the other.
* </li>
* <li>
* Split breaks the ownership of the memory in two.
* Both resulting buffers retain ownership of their respective region of memory.
* They can do this because the regions are guaranteed to not overlap; data changes through one buffer will not
* be visible through the other.
* </li>
* </ul>
*
* These differences mean that slicing is mostly suitable for when you temporarily want to share a focused area of a
* buffer.
* Examples of this include doing IO, or decoding a bounded part of a larger message.
* On the other hand, split is suitable for when you want to hand over a region of a buffer to some other,
* The {@link #split()} method break a buffer into two.
* The two buffers will share the underlying memory, but their regions will not overlap, ensuring that the memory is
* safely shared between the two.
* <p>
* Splitting a buffer is useful for when you want to hand over a region of a buffer to some other,
* perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process.
* Examples include aggregating messages into an accumulator buffer, and sending messages down the pipeline for
* further processing, as split buffer regions, once their data has been received in its entirety.
*
* If you instead wish to temporarily share a region of a buffer, you will have to pass offset and length along with the
* buffer, or you will have to make a copy of the region in a new buffer.
*
* <h3>Buffers as constants</h3>
*
* Sometimes, the same bit of data will be processed or transmitted over and over again. In such cases, it can be
@ -439,47 +424,35 @@ public interface Buffer extends Resource<Buffer>, BufferAccessors {
void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
/**
* Returns a slice of this buffer's readable bytes.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate offsets. This method is identical to
* {@code buf.slice(buf.readerOffset(), buf.readableBytes())}.
* Returns a copy of this buffer's readable bytes.
* Modifying the content of the returned buffer will not affect this buffers contents.
* The two buffers will maintain separate offsets. This method is identical to
* {@code buf.copy(buf.readerOffset(), buf.readableBytes())}.
* This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
* <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read.
* <p>
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
* and split.
* The copy is created with a {@linkplain #writerOffset() write offset} equal to the length of the copied data,
* so that the entire contents of the copy is ready to be read.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the readable region of this buffer.
* that contains a copy of the readable region of this buffer.
*/
default Buffer slice() {
return slice(readerOffset(), readableBytes());
default Buffer copy() {
return copy(readerOffset(), readableBytes());
}
/**
* Returns a slice of the given region of this buffer.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate offsets.
* Returns a copy of the given region of this buffer.
* Modifying the content of the returned buffer will not affect this buffers contents.
* The two buffers will maintain separate offsets.
* This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
* <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read.
* <p>
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
* and split.
* The copy is created with a {@linkplain #writerOffset() write offset} equal to the length of the copy,
* so that the entire contents of the copy is ready to be read.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the given region of this buffer.
* that contains a copy of the given region of this buffer.
*/
Buffer slice(int offset, int length);
Buffer copy(int offset, int length);
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.

View File

@ -414,7 +414,7 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
}
@Override
public CompositeBuffer slice(int offset, int length) {
public CompositeBuffer copy(int offset, int length) {
checkWriteBounds(offset, length);
if (offset < 0 || length < 0) {
throw new IllegalArgumentException(
@ -422,31 +422,27 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
offset + ", and length was " + length + '.');
}
Buffer choice = (Buffer) chooseBuffer(offset, 0);
Buffer[] slices;
Buffer[] copies;
if (length > 0) {
slices = new Buffer[bufs.length];
copies = new Buffer[bufs.length];
int off = subOffset;
int cap = length;
int i;
for (i = searchOffsets(offset); cap > 0; i++) {
var buf = bufs[i];
int avail = buf.capacity() - off;
slices[i] = buf.slice(off, Math.min(cap, avail));
copies[i] = buf.copy(off, Math.min(cap, avail));
cap -= avail;
off = 0;
}
slices = Arrays.copyOf(slices, i);
copies = Arrays.copyOf(copies, i);
} else {
// Specialize for length == 0, since we must slice from at least one constituent buffer.
slices = new Buffer[] { choice.slice(subOffset, 0) };
copies = new Buffer[] { choice.copy(subOffset, 0) };
}
// Use the constructor that skips filtering out empty buffers, and skips acquiring on the buffers.
// This is important because 1) slice() already acquired the buffers, and 2) if this slice is empty
// then we need to keep holding on to it to prevent this originating composite buffer from getting
// ownership. If it did, its behaviour would be inconsistent with that of a non-composite buffer.
return new CompositeBuffer(allocator, slices, COMPOSITE_DROP);
return new CompositeBuffer(allocator, copies, COMPOSITE_DROP);
}
@Override

View File

@ -1418,11 +1418,8 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf retainedSlice(int index, int length) {
checkAccess();
try {
return wrap(buffer.slice(index, length));
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
retain();
return new Slice(this, index, length);
}
private static final class Slice extends SlicedByteBuf {

View File

@ -188,23 +188,22 @@ class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, Re
}
@Override
public Buffer slice(int offset, int length) {
public Buffer copy(int offset, int length) {
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative: " + length + '.');
}
if (!isAccessible()) {
throw new IllegalStateException("This buffer is closed: " + this + '.');
}
ByteBuffer slice = bbslice(rmem, offset, length);
ArcDrop<NioBuffer> drop = (ArcDrop<NioBuffer>) unsafeGetDrop();
drop.increment();
Buffer sliceBuffer = new NioBuffer(base, slice, control, drop)
.writerOffset(length)
.order(order());
AllocatorControl.UntetheredMemory memory = control.allocateUntethered(this, length);
ByteBuffer byteBuffer = memory.memory();
Buffer copy = new NioBuffer(byteBuffer, byteBuffer, control, memory.drop());
copyInto(0, copy, 0, length);
copy.writerOffset(length).order(order());
if (readOnly()) {
sliceBuffer = sliceBuffer.makeReadOnly();
copy = copy.makeReadOnly();
}
return sliceBuffer;
return copy;
}
@Override

View File

@ -165,20 +165,19 @@ class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buff
}
@Override
public Buffer slice(int offset, int length) {
public Buffer copy(int offset, int length) {
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative: " + length + '.');
}
checkGet(offset, length);
ArcDrop<UnsafeBuffer> drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop();
drop.increment();
Buffer sliceBuffer = new UnsafeBuffer(memory, baseOffset + offset, length, control, drop)
.writerOffset(length)
.order(order);
AllocatorControl.UntetheredMemory memory = control.allocateUntethered(this, length);
UnsafeMemory unsafeMemory = memory.memory();
Buffer copy = new UnsafeBuffer(unsafeMemory, unsafeMemory.address, length, control, memory.drop());
copy.writerOffset(length).order(order);
if (readOnly) {
sliceBuffer = sliceBuffer.makeReadOnly();
copy = copy.makeReadOnly();
}
return sliceBuffer;
return copy;
}
@Override

View File

@ -293,22 +293,22 @@ class MemSegBuffer extends ResourceSupport<Buffer, MemSegBuffer> implements Buff
}
@Override
public Buffer slice(int offset, int length) {
public Buffer copy(int offset, int length) {
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative: " + length + '.');
}
if (!isAccessible()) {
throw new IllegalStateException("This buffer is closed: " + this + '.');
}
var slice = seg.asSlice(offset, length);
Drop<MemSegBuffer> drop = ArcDrop.acquire(unsafeGetDrop());
Buffer sliceBuffer = new MemSegBuffer(base, slice, drop, control)
.writerOffset(length)
.order(order());
AllocatorControl.UntetheredMemory memory = control.allocateUntethered(this, length);
MemorySegment segment = memory.memory();
Buffer copy = new MemSegBuffer(segment, segment, memory.drop(), control);
copyInto(0, copy, 0, length);
copy.writerOffset(length).order(order());
if (readOnly()) {
sliceBuffer = sliceBuffer.makeReadOnly();
copy = copy.makeReadOnly();
}
return sliceBuffer;
return copy;
}
@Override

View File

@ -29,7 +29,6 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
public class BufferBulkAccessTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void fill(Fixture fixture) {
@ -86,28 +85,6 @@ public class BufferBulkAccessTest extends BufferTestSupport {
testCopyIntoBuf(fixture, BufferAllocator.direct()::allocate);
}
@ParameterizedTest
@MethodSource("allocators")
void copyIntoOnHeapBufSlice(Fixture fixture) {
try (BufferAllocator allocator = BufferAllocator.heap();
Scope scope = new Scope()) {
testCopyIntoBuf(fixture, size -> {
return scope.add(allocator.allocate(size)).writerOffset(size).slice();
});
}
}
@ParameterizedTest
@MethodSource("allocators")
void copyIntoOffHeapBufSlice(Fixture fixture) {
try (BufferAllocator allocator = BufferAllocator.direct();
Scope scope = new Scope()) {
testCopyIntoBuf(fixture, size -> {
return scope.add(allocator.allocate(size)).writerOffset(size).slice();
});
}
}
@ParameterizedTest
@MethodSource("allocators")
void copyIntoCompositeOnHeapOnHeapBuf(Fixture fixture) {
@ -174,7 +151,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void copyIntoCompositeOnHeapOnHeapBufSlice(Fixture fixture) {
void copyIntoCompositeOnHeapOnHeapBufCopy(Fixture fixture) {
try (var a = BufferAllocator.heap();
var b = BufferAllocator.heap();
var scope = new Scope()) {
@ -183,7 +160,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).copy();
}
});
}
@ -191,7 +168,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void copyIntoCompositeOnHeapOffHeapBufSlice(Fixture fixture) {
void copyIntoCompositeOnHeapOffHeapBufCopy(Fixture fixture) {
try (var a = BufferAllocator.heap();
var b = BufferAllocator.direct();
var scope = new Scope()) {
@ -200,7 +177,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).copy();
}
});
}
@ -208,7 +185,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void copyIntoCompositeOffHeapOnHeapBufSlice(Fixture fixture) {
void copyIntoCompositeOffHeapOnHeapBufCopy(Fixture fixture) {
try (var a = BufferAllocator.direct();
var b = BufferAllocator.heap();
var scope = new Scope()) {
@ -217,7 +194,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).copy();
}
});
}
@ -225,7 +202,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void copyIntoCompositeOffHeapOffHeapBufSlice(Fixture fixture) {
void copyIntoCompositeOffHeapOffHeapBufCopy(Fixture fixture) {
try (var a = BufferAllocator.direct();
var b = BufferAllocator.direct();
var scope = new Scope()) {
@ -234,7 +211,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).copy();
}
});
}

View File

@ -92,15 +92,15 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
public void mustBeAbleToSliceAfterEnsureWritable(Fixture fixture) {
public void mustBeAbleToCopyAfterEnsureWritable(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(4)) {
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
assertThat(buf.capacity()).isGreaterThanOrEqualTo(8);
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice()) {
assertEquals(0x0102030405060708L, slice.readLong());
try (Buffer copy = buf.copy()) {
assertEquals(0x0102030405060708L, copy.readLong());
}
}
}

View File

@ -195,7 +195,7 @@ public class BufferReadOnlyTest extends BufferTestSupport {
assertTrue(asRS(b).isOwned());
assertThat(a.capacity()).isEqualTo(8);
assertThat(b.capacity()).isEqualTo(8);
try (Buffer c = b.slice()) {
try (Buffer c = b.copy()) {
assertTrue(c.readOnly());
assertFalse(asRS(c).isOwned());
assertFalse(asRS(b).isOwned());
@ -211,7 +211,7 @@ public class BufferReadOnlyTest extends BufferTestSupport {
Supplier<Buffer> supplier = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4});
try (Buffer a = supplier.get();
Buffer b = supplier.get();
Buffer c = a.slice()) {
Buffer c = a.copy()) {
assertEquals(1, a.readByte());
assertEquals(2, a.readByte());
assertThrows(IllegalStateException.class, () -> a.compact()); // Can't compact read-only buffer.

View File

@ -108,7 +108,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithoutOffsetAndSizeMustReturnReadableRegion(Fixture fixture) {
void copyWithoutOffsetAndSizeMustReturnReadableRegion(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) {
@ -116,7 +116,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
}
assertEquals(0x01, buf.readByte());
buf.writerOffset(buf.writerOffset() - 1);
try (Buffer slice = buf.slice()) {
try (Buffer slice = buf.copy()) {
assertThat(toByteArray(slice)).containsExactly(0x02, 0x03, 0x04, 0x05, 0x06, 0x07);
assertEquals(0, slice.readerOffset());
assertEquals(6, slice.readableBytes());
@ -135,7 +135,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithOffsetAndSizeMustReturnGivenRegion(Fixture fixture) {
void copyWithOffsetAndSizeMustReturnGivenRegion(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
for (byte b : new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 }) {
@ -143,7 +143,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
}
buf.readerOffset(3); // Reader and writer offsets must be ignored.
buf.writerOffset(6);
try (Buffer slice = buf.slice(1, 6)) {
try (Buffer slice = buf.copy(1, 6)) {
assertThat(toByteArray(slice)).containsExactly(0x02, 0x03, 0x04, 0x05, 0x06, 0x07);
assertEquals(0, slice.readerOffset());
assertEquals(6, slice.readableBytes());
@ -162,12 +162,14 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithoutOffsetAndSizeWillIncreaseReferenceCount(Fixture fixture) {
void copyWithoutOffsetAndSizeMustNotInfluenceOwnership(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer ignored = buf.slice()) {
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, buf::send);
try (Buffer copy = buf.copy()) {
assertTrue(asRS(buf).isOwned());
buf.send().discard();
assertTrue(asRS(copy).isOwned());
copy.send().discard();
}
assertTrue(asRS(buf).isOwned());
}
@ -175,12 +177,14 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithOffsetAndSizeWillIncreaseReferenceCount(Fixture fixture) {
void copyWithOffsetAndSizeMustNotInfluenceOwnership(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer ignored = buf.slice(0, 8)) {
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, buf::send);
try (Buffer copy = buf.copy(0, 8)) {
assertTrue(asRS(buf).isOwned());
buf.send().discard();
assertTrue(asRS(copy).isOwned());
copy.send().discard();
}
assertTrue(asRS(buf).isOwned());
}
@ -188,46 +192,46 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithoutOffsetAndSizeHasSameEndianAsParent(Fixture fixture) {
void copyWithoutOffsetAndSizeHasSameEndianAsParent(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.order(BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice()) {
assertEquals(0x0102030405060708L, slice.readLong());
try (Buffer copy = buf.copy()) {
assertEquals(0x0102030405060708L, copy.readLong());
}
buf.order(LITTLE_ENDIAN);
try (Buffer slice = buf.slice()) {
assertEquals(0x0807060504030201L, slice.readLong());
try (Buffer copy = buf.copy()) {
assertEquals(0x0807060504030201L, copy.readLong());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
void sliceWithOffsetAndSizeHasSameEndianAsParent(Fixture fixture) {
void copyWithOffsetAndSizeHasSameEndianAsParent(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.order(BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice(0, 8)) {
assertEquals(0x0102030405060708L, slice.readLong());
try (Buffer copy = buf.copy(0, 8)) {
assertEquals(0x0102030405060708L, copy.readLong());
}
buf.order(LITTLE_ENDIAN);
try (Buffer slice = buf.slice(0, 8)) {
assertEquals(0x0807060504030201L, slice.readLong());
try (Buffer copy = buf.copy(0, 8)) {
assertEquals(0x0807060504030201L, copy.readLong());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
void sendOnSliceWithoutOffsetAndSizeMustThrow(Fixture fixture) {
void sendOnCopyWithoutOffsetAndSizeMustNotThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer slice = buf.slice()) {
try (Buffer copy = buf.copy()) {
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, slice::send);
copy.send().discard();
}
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
@ -237,12 +241,12 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sendOnSliceWithOffsetAndSizeMustThrow(Fixture fixture) {
void sendOnCopyWithOffsetAndSizeMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer slice = buf.slice(0, 8)) {
try (Buffer copy = buf.copy(0, 8)) {
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, slice::send);
copy.send().discard();
}
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
@ -251,10 +255,10 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithNegativeOffsetMustThrow(Fixture fixture) {
void copyWithNegativeOffsetMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
assertThrows(IndexOutOfBoundsException.class, () -> buf.slice(-1, 1));
assertThrows(IndexOutOfBoundsException.class, () -> buf.copy(-1, 1));
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
}
@ -262,11 +266,11 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithNegativeSizeMustThrow(Fixture fixture) {
void copyWithNegativeSizeMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.slice(0, -1));
assertThrows(IllegalArgumentException.class, () -> buf.slice(2, -1));
assertThrows(IllegalArgumentException.class, () -> buf.copy(0, -1));
assertThrows(IllegalArgumentException.class, () -> buf.copy(2, -1));
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
}
@ -274,12 +278,12 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithSizeGreaterThanCapacityMustThrow(Fixture fixture) {
void copyWithSizeGreaterThanCapacityMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
assertThrows(IndexOutOfBoundsException.class, () -> buf.slice(0, 9));
buf.slice(0, 8).close(); // This is still fine.
assertThrows(IndexOutOfBoundsException.class, () -> buf.slice(1, 8));
assertThrows(IndexOutOfBoundsException.class, () -> buf.copy(0, 9));
buf.copy(0, 8).close(); // This is still fine.
assertThrows(IndexOutOfBoundsException.class, () -> buf.copy(1, 8));
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
}
@ -287,10 +291,10 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void sliceWithZeroSizeMustBeAllowed(Fixture fixture) {
void copyWithZeroSizeMustBeAllowed(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.slice(0, 0).close(); // This is fine.
buf.copy(0, 0).close(); // This is fine.
// Verify that the slice is closed properly afterwards.
assertTrue(asRS(buf).isOwned());
}
@ -298,17 +302,19 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) {
public void copyMustBeOwned(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer buf = allocator.allocate(8);
buf.writeInt(42);
try (Buffer slice = buf.slice()) {
try (Buffer copy = buf.copy()) {
assertTrue(asRS(copy).isOwned());
assertTrue(asRS(buf).isOwned());
buf.close();
assertFalse(buf.isAccessible());
assertTrue(asRS(slice).isOwned());
try (Buffer receive = slice.send().receive()) {
assertTrue(asRS(copy).isOwned());
try (Buffer receive = copy.send().receive()) {
assertTrue(asRS(receive).isOwned());
assertFalse(slice.isAccessible());
assertFalse(copy.isAccessible());
}
}
}
@ -521,20 +527,20 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
public void mustBePossibleToSplitOwnedSlices(Fixture fixture) {
public void mustBePossibleToSplitCopies(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice()) {
try (Buffer copy = buf.copy()) {
buf.close();
assertTrue(asRS(slice).isOwned());
try (Buffer split = slice.split(4)) {
assertTrue(asRS(copy).isOwned());
try (Buffer split = copy.split(4)) {
split.reset().ensureWritable(Long.BYTES);
slice.reset().ensureWritable(Long.BYTES);
copy.reset().ensureWritable(Long.BYTES);
assertThat(split.capacity()).isEqualTo(Long.BYTES);
assertThat(slice.capacity()).isEqualTo(Long.BYTES);
assertThat(copy.capacity()).isEqualTo(Long.BYTES);
assertThat(split.getLong(0)).isEqualTo(0x01020304_00000000L);
assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L);
assertThat(copy.getLong(0)).isEqualTo(0x05060708_00000000L);
}
}
}
@ -665,13 +671,13 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
public void sliceOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
public void copyOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeLong(0x0102030405060708L);
buf.makeReadOnly();
try (Buffer slice = buf.slice()) {
assertTrue(slice.readOnly());
try (Buffer copy = buf.copy()) {
assertTrue(copy.readOnly());
}
}
}

View File

@ -19,7 +19,6 @@ import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.MemoryManagers;
import io.netty.buffer.api.internal.ResourceSupport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -272,8 +271,7 @@ public abstract class BufferTestSupport {
}
var stream = builder.build();
return stream.flatMap(BufferTestSupport::injectSplits)
.flatMap(BufferTestSupport::injectSlices);
return stream.flatMap(BufferTestSupport::injectSplits);
}
private static Stream<Fixture> injectSplits(Fixture f) {
@ -303,59 +301,6 @@ public abstract class BufferTestSupport {
return builder.build();
}
private static Stream<Fixture> injectSlices(Fixture f) {
Builder<Fixture> builder = Stream.builder();
builder.add(f);
var props = concat(f.getProperties(), Fixture.Properties.SLICE);
builder.add(new Fixture(f + ".slice(0, capacity())", () -> {
return new BufferAllocator() {
BufferAllocator allocatorBase;
@Override
public Buffer allocate(int size) {
allocatorBase = f.get();
try (Buffer base = allocatorBase.allocate(size)) {
return base.slice(0, base.capacity()).writerOffset(0);
}
}
@Override
public void close() {
if (allocatorBase != null) {
allocatorBase.close();
allocatorBase = null;
}
}
};
}, props));
builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> {
return new BufferAllocator() {
BufferAllocator allocatorBase;
@Override
public Buffer allocate(int size) {
allocatorBase = f.get();
try (Buffer base = allocatorBase.allocate(size + 2)) {
return base.slice(1, size).writerOffset(0);
}
}
@Override
public void close() {
if (allocatorBase != null) {
allocatorBase.close();
allocatorBase = null;
}
}
};
}, props));
return builder.build();
}
private static Fixture.Properties[] concat(Fixture.Properties[] props, Fixture.Properties prop) {
props = Arrays.copyOf(props, props.length + 1);
props[props.length - 1] = prop;
return props;
}
@BeforeAll
static void startExecutor() throws IOException, ParseException {
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@ -392,7 +337,7 @@ public abstract class BufferTestSupport {
assertThrows(IllegalStateException.class, () -> buf.split());
assertThrows(IllegalStateException.class, () -> asRS(buf).send());
assertThrows(IllegalStateException.class, () -> asRS(buf).acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.copy());
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
@ -864,67 +809,6 @@ public abstract class BufferTestSupport {
}
}
public static void verifyWriteAccessible(Buffer buf) {
buf.reset().writeByte((byte) 32);
assertThat(buf.readByte()).isEqualTo((byte) 32);
buf.reset().writerOffset(0).writeUnsignedByte(32);
assertThat(buf.readUnsignedByte()).isEqualTo(32);
buf.reset().writerOffset(0).writeChar('3');
assertThat(buf.readChar()).isEqualTo('3');
buf.reset().writerOffset(0).writeShort((short) 32);
assertThat(buf.readShort()).isEqualTo((short) 32);
buf.reset().writerOffset(0).writeUnsignedShort(32);
assertThat(buf.readUnsignedShort()).isEqualTo(32);
buf.reset().writerOffset(0).writeMedium(32);
assertThat(buf.readMedium()).isEqualTo(32);
buf.reset().writerOffset(0).writeUnsignedMedium(32);
assertThat(buf.readUnsignedMedium()).isEqualTo(32);
buf.reset().writerOffset(0).writeInt(32);
assertThat(buf.readInt()).isEqualTo(32);
buf.reset().writerOffset(0).writeUnsignedInt(32);
assertThat(buf.readUnsignedInt()).isEqualTo(32L);
buf.reset().writerOffset(0).writeFloat(3.2f);
assertThat(buf.readFloat()).isEqualTo(3.2f);
buf.reset().writerOffset(0).writeLong(32);
assertThat(buf.readLong()).isEqualTo(32L);
buf.reset().writerOffset(0).writeDouble(3.2);
assertThat(buf.readDouble()).isEqualTo(3.2);
buf.setByte(0, (byte) 32);
assertThat(buf.getByte(0)).isEqualTo((byte) 32);
buf.setUnsignedByte(0, 32);
assertThat(buf.getUnsignedByte(0)).isEqualTo(32);
buf.setChar(0, '3');
assertThat(buf.getChar(0)).isEqualTo('3');
buf.setShort(0, (short) 32);
assertThat(buf.getShort(0)).isEqualTo((short) 32);
buf.setUnsignedShort(0, 32);
assertThat(buf.getUnsignedShort(0)).isEqualTo(32);
buf.setMedium(0, 32);
assertThat(buf.getMedium(0)).isEqualTo(32);
buf.setUnsignedMedium(0, 32);
assertThat(buf.getUnsignedMedium(0)).isEqualTo(32);
buf.setInt(0, 32);
assertThat(buf.getInt(0)).isEqualTo(32);
buf.setUnsignedInt(0, 32);
assertThat(buf.getUnsignedInt(0)).isEqualTo(32L);
buf.setFloat(0, 3.2f);
assertThat(buf.getFloat(0)).isEqualTo(3.2f);
buf.setLong(0, 32);
assertThat(buf.getLong(0)).isEqualTo(32L);
buf.setDouble(0, 3.2);
assertThat(buf.getDouble(0)).isEqualTo(3.2);
if (asRS(buf).isOwned()) {
buf.ensureWritable(1);
}
buf.fill((byte) 0);
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer source = allocator.allocate(8)) {
source.copyInto(0, buf, 0, 1);
}
}
public static void verifyForEachReadableSingleComponent(Fixture fixture, Buffer buf) {
buf.forEachReadable(0, (index, component) -> {
var buffer = component.readableBuffer();
@ -999,14 +883,21 @@ public abstract class BufferTestSupport {
return bs;
}
public static int countBorrows(Buffer buf) {
return ((ResourceSupport<?, ?>) buf).countBorrows();
public static byte[] readByteArray(Buffer buf) {
byte[] bs = new byte[buf.readableBytes()];
buf.copyInto(buf.readerOffset(), bs, 0, bs.length);
buf.readerOffset(buf.writerOffset());
return bs;
}
public static void assertEquals(Buffer expected, Buffer actual) {
assertThat(toByteArray(actual)).containsExactly(toByteArray(expected));
}
public static void assertReadableEquals(Buffer expected, Buffer actual) {
assertThat(readByteArray(actual)).containsExactly(readByteArray(expected));
}
public static void assertEquals(byte expected, byte actual) {
if (expected != actual) {
fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual));

View File

@ -76,8 +76,6 @@ public class BufferWriteBytesCombinationsTest extends BufferTestSupport {
assertThat(target.writerOffset()).isEqualTo(35);
assertThat(source.readerOffset()).isEqualTo(35);
assertThat(source.writerOffset()).isEqualTo(35);
try (Buffer readableSlice = target.slice()) {
assertEquals(source, readableSlice);
}
assertReadableEquals(source, target);
}
}

View File

@ -70,10 +70,6 @@ public final class Fixture implements Supplier<BufferAllocator> {
return properties.contains(Properties.CLEANER);
}
public boolean isSlice() {
return properties.contains(Properties.SLICE);
}
public boolean isConst() {
return properties.contains(Properties.CONST);
}
@ -84,7 +80,6 @@ public final class Fixture implements Supplier<BufferAllocator> {
CONST,
COMPOSITE,
CLEANER,
POOLED,
SLICE
POOLED
}
}

View File

@ -22,7 +22,7 @@ import io.netty.buffer.api.Scope;
import java.util.concurrent.ThreadLocalRandom;
public final class ComposingAndSlicingExample {
public final class ComposingAndSplittingExample {
public static void main(String[] args) {
try (BufferAllocator allocator = BufferAllocator.pooledDirect();
Buffer buf = createBigBuffer(allocator)) {
@ -32,7 +32,7 @@ public final class ComposingAndSlicingExample {
buf.writeByte((byte) tlr.nextInt());
}
try (Buffer slice = buf.slice()) {
try (Buffer slice = buf.split()) {
slice.send();
System.out.println("buf.capacity() = " + buf.capacity());
System.out.println("buf.readableBytes() = " + buf.readableBytes());

View File

@ -103,85 +103,6 @@ public class SendExample {
}
static final class Ex3 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();
try (Buffer buf = allocator.allocate(4096)) {
// !!! pit-fall: Rc decrement in other thread.
var futA = executor.submit(new Task(buf.slice(0, 1024)));
var futB = executor.submit(new Task(buf.slice(1024, 1024)));
var futC = executor.submit(new Task(buf.slice(2048, 1024)));
var futD = executor.submit(new Task(buf.slice(3072, 1024)));
futA.get();
futB.get();
futC.get();
futD.get();
}
allocator.close();
executor.shutdown();
}
private static class Task implements Runnable {
private final Buffer slice;
Task(Buffer slice) {
this.slice = slice;
}
@Override
public void run() {
try (slice) {
while (slice.writableBytes() > 0) {
slice.writeByte((byte) 42);
}
}
}
}
}
static final class Ex4 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();
try (Buffer buf = allocator.allocate(4096);
Buffer sliceA = buf.slice(0, 1024);
Buffer sliceB = buf.slice(1024, 1024);
Buffer sliceC = buf.slice(2048, 1024);
Buffer sliceD = buf.slice(3072, 1024)) {
var futA = executor.submit(new Task(sliceA));
var futB = executor.submit(new Task(sliceB));
var futC = executor.submit(new Task(sliceC));
var futD = executor.submit(new Task(sliceD));
futA.get();
futB.get();
futC.get();
futD.get();
}
allocator.close();
executor.shutdown();
}
private static class Task implements Runnable {
private final Buffer slice;
Task(Buffer slice) {
this.slice = slice;
}
@Override
public void run() {
while (slice.writableBytes() > 0) {
slice.writeByte((byte) 42);
}
}
}
}
static final class Ex5 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.SplittableRandom;
import static io.netty.buffer.api.tests.BufferTestSupport.readByteArray;
import static io.netty.buffer.api.tests.BufferTestSupport.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -89,9 +90,7 @@ public class AlternativeMessageDecoderTest {
while ((actualMessage = channel.readInbound()) != null) {
try (Buffer ignore = actualMessage) {
assertTrue(expectedItr.hasNext());
try (Buffer actual = actualMessage.slice()) {
assertThat(toByteArray(actual)).containsExactly(expectedItr.next());
}
assertThat(readByteArray(actualMessage)).containsExactly(expectedItr.next());
}
}
assertFalse(expectedItr.hasNext());

View File

@ -134,7 +134,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
if (composite.writerOffset() != composite.capacity()) {
// Writer index must equal capacity if we are going to "write"
// new components to the end.
composite = cumulation.slice(0, composite.writerOffset());
composite = cumulation.split(composite.writerOffset());
cumulation.close();
}
} else {

View File

@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.buffer.api.internal.Statics.asRS;
import static io.netty.buffer.api.tests.BufferTestSupport.assertEquals;
import static io.netty.buffer.api.CompositeBuffer.compose;
import static io.netty.buffer.api.tests.BufferTestSupport.assertReadableEquals;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -61,11 +62,13 @@ public class ByteToMessageDecoderTest {
}
});
try (Buffer buf = BufferAllocator.heap().allocate(4).writeInt(0x01020304)) {
channel.writeInbound(buf.slice());
try (Buffer buf = BufferAllocator.heap().allocate(4, BIG_ENDIAN).writeInt(0x01020304)) {
channel.writeInbound(buf);
try (Buffer b = channel.readInbound()) {
buf.readByte();
assertEquals(b, buf);
assertEquals(3, b.readableBytes());
assertEquals(0x02, b.readByte());
assertEquals(0x03, b.readByte());
assertEquals(0x04, b.readByte());
}
}
}
@ -88,12 +91,10 @@ public class ByteToMessageDecoderTest {
}
});
channel.writeInbound(buf.slice());
channel.writeInbound(buf);
try (Buffer expected = BufferAllocator.heap().allocate(3, BIG_ENDIAN).writeShort((short) 0x0203).writeByte((byte) 0x04);
Buffer b = channel.readInbound();
Buffer actual = b.slice(); // Only compare readable bytes.
buf) {
assertEquals(expected, actual);
Buffer actual = channel.readInbound()) {
assertReadableEquals(expected, actual);
}
}
@ -120,9 +121,8 @@ public class ByteToMessageDecoderTest {
assertTrue(channel.writeInbound(buf));
assertTrue(channel.finish());
try (Buffer expected = BufferAllocator.heap().allocate(1).writeByte((byte) 0x02);
Buffer b = channel.readInbound();
Buffer actual = b.slice()) {
assertEquals(expected, actual);
Buffer actual = channel.readInbound()) {
assertReadableEquals(expected, actual);
assertNull(channel.readInbound());
}
}
@ -243,11 +243,10 @@ public class ByteToMessageDecoderTest {
});
try (Buffer buf = BufferAllocator.heap().allocate(4, BIG_ENDIAN).writeInt(0x01020304)) {
assertTrue(channel.writeInbound(buf.slice()));
try (Buffer expected = buf.slice(1, 3);
Buffer b = channel.readInbound();
Buffer actual = b.slice()) {
assertEquals(expected, actual);
assertTrue(channel.writeInbound(buf.copy()));
try (Buffer expected = buf.copy(1, 3);
Buffer actual = channel.readInbound()) {
assertReadableEquals(expected, actual);
assertFalse(channel.finish());
}
}
@ -259,9 +258,8 @@ public class ByteToMessageDecoderTest {
@Override
protected void decode(ChannelHandlerContext ctx, Buffer in) {
assertTrue(in.readableBytes() > 0);
Buffer slice = in.slice();
in.readerOffset(in.readerOffset() + in.readableBytes());
ctx.fireChannelRead(slice);
Buffer chunk = in.split();
ctx.fireChannelRead(chunk);
}
});
byte[] bytes = new byte[1024];
@ -271,9 +269,9 @@ public class ByteToMessageDecoderTest {
for (byte b : bytes) {
buf.writeByte(b);
}
assertTrue(channel.writeInbound(buf.slice()));
assertTrue(channel.writeInbound(buf.copy()));
try (Buffer b = channel.readInbound()) {
assertEquals(buf, b);
assertReadableEquals(buf, b);
assertNull(channel.readInbound());
assertFalse(channel.finish());
assertNull(channel.readInbound());
@ -294,9 +292,8 @@ public class ByteToMessageDecoderTest {
return;
}
int read = decodeLast ? readable : readable - 1;
Buffer slice = in.slice(in.readerOffset(), read);
in.readerOffset(in.readerOffset() + read);
ctx.fireChannelRead(slice);
Buffer chunk = in.split(in.readerOffset() + read);
ctx.fireChannelRead(chunk);
}
@Override
@ -308,13 +305,10 @@ public class ByteToMessageDecoderTest {
});
byte[] bytes = new byte[1024];
ThreadLocalRandom.current().nextBytes(bytes);
try (Buffer buf = BufferAllocator.heap().allocate(bytes.length, BIG_ENDIAN);
Buffer part1 = buf.slice(0, bytes.length - 1);
Buffer part2 = buf.slice(bytes.length - 1, 1)) {
for (byte b : bytes) {
buf.writeByte(b);
}
assertTrue(channel.writeInbound(buf.slice()));
try (Buffer buf = BufferAllocator.heap().allocate(bytes.length, BIG_ENDIAN).writeBytes(bytes);
Buffer part1 = buf.copy(0, bytes.length - 1);
Buffer part2 = buf.copy(bytes.length - 1, 1)) {
assertTrue(channel.writeInbound(buf));
try (Buffer actual = channel.readInbound()) {
assertEquals(part1, actual);
}
@ -523,7 +517,6 @@ public class ByteToMessageDecoderTest {
public void testDecodeLast() {
final AtomicBoolean removeHandler = new AtomicBoolean();
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, Buffer in) {
if (removeHandler.get()) {
@ -533,12 +526,8 @@ public class ByteToMessageDecoderTest {
});
byte[] bytes = new byte[1024];
ThreadLocalRandom.current().nextBytes(bytes);
try (Buffer buf = BufferAllocator.heap().allocate(bytes.length)) {
for (byte b : bytes) {
buf.writeByte(b);
}
assertFalse(channel.writeInbound(buf.slice()));
try (Buffer buf = BufferAllocator.heap().allocate(bytes.length).writeBytes(bytes)) {
assertFalse(channel.writeInbound(buf.copy()));
assertNull(channel.readInbound());
removeHandler.set(true);
// This should trigger channelInputClosed(...)
@ -546,7 +535,7 @@ public class ByteToMessageDecoderTest {
assertTrue(channel.finish());
try (Buffer actual = channel.readInbound()) {
assertEquals(buf.slice(), actual);
assertReadableEquals(buf, actual);
}
assertNull(channel.readInbound());
}

View File

@ -54,11 +54,7 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
if (in.readableBytes() < frameLength) {
return null;
} else {
try {
return in.slice(in.readerOffset(), frameLength);
} finally {
in.readerOffset(in.readerOffset() + frameLength);
}
return in.split(in.readerOffset() + frameLength);
}
}
}