From a2d49fed3ecbdf2902c82c18637d44dbf627f44c Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 15 Apr 2021 17:17:00 +0200 Subject: [PATCH 01/12] Bring slice and bifurcate methods together They are conceptually related. --- src/main/java/io/netty/buffer/api/Buffer.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index d44869f..81e1d53 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -291,43 +291,6 @@ public interface Buffer extends Rc, BufferAccessors { */ boolean readOnly(); - /** - * Returns a slice of this buffer's readable bytes. - * Modifying the content of the returned buffer or this buffer affects each other's content, - * while they maintain separate offsets. This method is identical to - * {@code buf.slice(buf.readerOffset(), buf.readableBytes())}. - * This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer. - *

- * This method increments the reference count of this buffer. - * The reference count is decremented again when the slice is deallocated. - *

- * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, - * so that the entire contents of the slice is ready to be read. - * - * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, - * that is a view of the readable region of this buffer. - */ - default Buffer slice() { - return slice(readerOffset(), readableBytes()); - } - - /** - * Returns a slice of the given region of this buffer. - * Modifying the content of the returned buffer or this buffer affects each other's content, - * while they maintain separate offsets. - * This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer. - *

- * This method increments the reference count of this buffer. - * The reference count is decremented again when the slice is deallocated. - *

- * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, - * so that the entire contents of the slice is ready to be read. - * - * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, - * that is a view of the given region of this buffer. - */ - Buffer slice(int offset, int length); - /** * Copies the given length of data from this buffer into the given destination array, beginning at the given source * position in this buffer, and the given destination position in the destination array. @@ -517,6 +480,43 @@ public interface Buffer extends Rc, BufferAccessors { */ void ensureWritable(int size, boolean allowCompaction); + /** + * Returns a slice of this buffer's readable bytes. + * Modifying the content of the returned buffer or this buffer affects each other's content, + * while they maintain separate offsets. This method is identical to + * {@code buf.slice(buf.readerOffset(), buf.readableBytes())}. + * This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer. + *

+ * This method increments the reference count of this buffer. + * The reference count is decremented again when the slice is deallocated. + *

+ * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, + * so that the entire contents of the slice is ready to be read. + * + * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, + * that is a view of the readable region of this buffer. + */ + default Buffer slice() { + return slice(readerOffset(), readableBytes()); + } + + /** + * Returns a slice of the given region of this buffer. + * Modifying the content of the returned buffer or this buffer affects each other's content, + * while they maintain separate offsets. + * This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer. + *

+ * This method increments the reference count of this buffer. + * The reference count is decremented again when the slice is deallocated. + *

+ * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, + * so that the entire contents of the slice is ready to be read. + * + * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, + * that is a view of the given region of this buffer. + */ + Buffer slice(int offset, int length); + /** * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. *

From 77754609840f8ce53c50e45255ebf79378a0e05d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 22 Apr 2021 16:57:31 +0200 Subject: [PATCH 02/12] Make bifurcate and ensureWritable more flexible This supports more use cases. The ensureWritable method can now amortise its allocation cost by allocating more than what is strictly necessary to satisfy the immediate call. The bifurcate method can now split at a given offset. --- src/main/java/io/netty/buffer/api/Buffer.java | 104 ++++++++++++++++-- .../io/netty/buffer/api/CompositeBuffer.java | 35 ++++-- src/main/java/io/netty/buffer/api/Send.java | 2 +- .../buffer/api/bytebuffer/NioBuffer.java | 28 +++-- .../netty/buffer/api/memseg/MemSegBuffer.java | 28 +++-- .../netty/buffer/api/unsafe/UnsafeBuffer.java | 32 ++++-- .../buffer/api/BufferEnsureWritableTest.java | 24 +++- .../api/BufferReferenceCountingTest.java | 88 +++++++++++++++ .../netty/buffer/api/BufferTestSupport.java | 12 +- 9 files changed, 299 insertions(+), 54 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index 81e1d53..b3f382d 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -54,8 +54,6 @@ import java.nio.ByteOrder; * such as with the {@link #getByte(int)} method, * from multiple threads. *

- * Confined buffers will initially be confined to the thread that allocates them. - *

* If a buffer needs to be accessed by a different thread, * then the ownership of that buffer must be sent to that thread. * This can be done with the {@link #send()} method. @@ -101,6 +99,34 @@ import java.nio.ByteOrder; * 0 <= readerOffset <= writerOffset <= capacity * * + *

Slice vs. Bifurcate

+ * + * The {@link #slice()} and {@link #bifurcate()} methods both return new buffers on the memory of the buffer they're + * called on. + * However, there are also important differences between the two, as they are aimed at different use cases that were + * previously (in the {@code ByteBuf} API) covered by {@code slice()} alone. + * + * + * + * These differences means that slicing is mostly suitable for when you temporarily want to share a focused area of a + * buffer. + * Examples of this include doing IO, or decoding a bounded part of a larger message. + * On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other, + * perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process. + * Examples of include aggregating messages into an accumulator buffer, and sending messages down the pipeline for + * further processing, as bifurcated buffer regions, once their data has been received in its entirety. */ public interface Buffer extends Rc, BufferAccessors { /** @@ -432,7 +458,7 @@ public interface Buffer extends Rc, BufferAccessors { * If this buffer already has the necessary space, then this method returns immediately. * If this buffer does not already have the necessary space, then it will be expanded using the * {@link BufferAllocator} the buffer was created with. - * This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is + * This method is the same as calling {@link #ensureWritable(int, int, boolean)} where {@code allowCompaction} is * {@code false}. * * @param size The requested number of bytes of space that should be available for writing. @@ -440,7 +466,7 @@ public interface Buffer extends Rc, BufferAccessors { * or is {@linkplain #readOnly() read-only}. */ default void ensureWritable(int size) { - ensureWritable(size, true); + ensureWritable(size, 1, true); } /** @@ -472,13 +498,17 @@ public interface Buffer extends Rc, BufferAccessors { * * * @param size The requested number of bytes of space that should be available for writing. + * @param minimumGrowth The minimum number of bytes to grow by. If it is determined that memory should be allocated + * and copied, make sure that the new memory allocation is bigger than the old one by at least + * this many bytes. This way, the buffer can grow by more than what is immediately necessary, + * thus amortising the costs of allocating and copying. * @param allowCompaction {@code true} if the method is allowed to modify the * {@linkplain #readerOffset() reader offset} and * {@linkplain #writerOffset() writer offset}, otherwise {@code false}. * @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state, - * * or is {@linkplain #readOnly() read-only}. + * or is {@linkplain #readOnly() read-only}. */ - void ensureWritable(int size, boolean allowCompaction); + void ensureWritable(int size, int minimumGrowth, boolean allowCompaction); /** * Returns a slice of this buffer's readable bytes. @@ -492,6 +522,9 @@ public interface Buffer extends Rc, BufferAccessors { *

* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * so that the entire contents of the slice is ready to be read. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * that is a view of the readable region of this buffer. @@ -511,6 +544,9 @@ public interface Buffer extends Rc, BufferAccessors { *

* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * so that the entire contents of the slice is ready to be read. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * that is a view of the given region of this buffer. @@ -558,10 +594,64 @@ public interface Buffer extends Rc, BufferAccessors { * simply split its internal array in two. *

* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. * * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. */ - Buffer bifurcate(); + default Buffer bifurcate() { + return bifurcate(writerOffset()); + } + + /** + * Split the buffer into two, at the given {@code splitOffset}. + *

+ * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. + *

+ * The region of this buffer that precede the {@code splitOffset}, will be captured and returned in a new + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * {@linkplain #send() sent} to other threads. + *

+ * The returned buffer will adopt the {@link #readerOffset()} and {@link #writerOffset()} of this buffer, + * but truncated to fit within the capacity dictated by the {@code splitOffset}. + *

+ * The memory region in the returned buffer will become inaccessible through this buffer. If the + * {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset}, + * then those offsets will be moved forward so they land on offset 0 after the bifurcation. + *

+ * Effectively, the following transformation takes place: + *

{@code
+     *         This buffer:
+     *          +--------------------------------+
+     *         0|               |splitOffset     |cap
+     *          +---------------+----------------+
+     *         /               / \               \
+     *        /               /   \               \
+     *       /               /     \               \
+     *      /               /       \               \
+     *     /               /         \               \
+     *    +---------------+           +---------------+
+     *    |               |cap        |               |cap
+     *    +---------------+           +---------------+
+     *    Returned buffer.            This buffer.
+     * }
+ * When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the + * underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until + * all of the bifurcated parts have been closed. + *

+ * Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be + * bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can + * simply split its internal array in two. + *

+ * Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. + *

+ * See the Slice vs. Bifurcate section for details on the difference between slice + * and bifurcate. + * + * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. + */ + Buffer bifurcate(int splitOffset); /** * Discards the read bytes, and moves the buffer contents to the beginning of the buffer. diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index c986d65..7c72a1c 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -606,13 +606,16 @@ final class CompositeBuffer extends RcSupport implement } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable."); } if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (readOnly) { throw bufferIsReadOnly(); } @@ -649,13 +652,20 @@ final class CompositeBuffer extends RcSupport implement // Now we have enough space. return; } + } else if (bufs.length == 1) { + // If we only have a single component buffer, then we can safely compact that in-place. + bufs[0].compact(); + computeBufferOffsets(); + if (writableBytes() >= size) { + // Now we have enough space. + return; + } } } - long newSize = capacity() + (long) size; - BufferAllocator.checkSize(newSize); - int growth = size - writableBytes(); - Buffer extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order()); + int growth = Math.max(size - writableBytes(), minimumGrowth); + BufferAllocator.checkSize(capacity() + (long) growth); + Buffer extension = allocator.allocate(growth, order()); unsafeExtendWith(extension); } @@ -739,7 +749,14 @@ final class CompositeBuffer extends RcSupport implement } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); } @@ -748,12 +765,12 @@ final class CompositeBuffer extends RcSupport implement return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order); } - int i = searchOffsets(woff); - int off = woff - offsets[i]; + int i = searchOffsets(splitOffset); + int off = splitOffset - offsets[i]; Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i); bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length); if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) { - bifs[bifs.length - 1] = bufs[0].bifurcate(); + bifs[bifs.length - 1] = bufs[0].bifurcate(off); } computeBufferOffsets(); try { diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index 005c9c5..4e4d62d 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -101,7 +101,7 @@ public interface Send> extends Deref { * @param The result type of the mapping function. * @return A new {@link Send} instance that will deliver an object that is the result of the mapping. */ - default > Send map(Class type, Function mapper) { + default > Send map(Class type, Function mapper) { return sending(type, () -> mapper.apply(receive())); } diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java index 7fbb7c6..24f5f27 100644 --- a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java +++ b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -364,7 +364,7 @@ class NioBuffer extends RcSupport implements Buffer, Readable } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -372,6 +372,9 @@ class NioBuffer extends RcSupport implements Buffer, Readable if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (rmem != wmem) { throw bufferIsReadOnly(); } @@ -387,7 +390,7 @@ class NioBuffer extends RcSupport implements Buffer, Readable } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); buffer.order(order()); @@ -414,26 +417,33 @@ class NioBuffer extends RcSupport implements Buffer, Readable } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); - var bifurcatedBuffer = rmem.slice(0, woff); + var bifurcatedBuffer = rmem.slice(0, splitOffset); // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment())); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order()); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - rmem = rmem.slice(woff, rmem.capacity() - woff); + rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset); if (!readOnly) { wmem = rmem; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 75d2f99..1f7f16f 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -490,7 +490,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -498,6 +498,9 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (seg != wseg) { throw bufferIsReadOnly(); } @@ -513,7 +516,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize); @@ -545,25 +548,32 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); - var bifurcatedSeg = seg.asSlice(0, woff); + var bifurcatedSeg = seg.asSlice(0, splitOffset); var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - seg = seg.asSlice(woff, seg.byteSize() - woff); + seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset); if (!readOnly) { wseg = seg; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java index 3b824e4..3be6fac 100644 --- a/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java +++ b/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java @@ -398,7 +398,7 @@ public class UnsafeBuffer extends RcSupport implements Buf } @Override - public void ensureWritable(int size, boolean allowCompaction) { + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { if (!isOwned()) { throw attachTrace(new IllegalStateException( "Buffer is not owned. Only owned buffers can call ensureWritable.")); @@ -406,6 +406,9 @@ public class UnsafeBuffer extends RcSupport implements Buf if (size < 0) { throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); } + if (minimumGrowth < 0) { + throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.'); + } if (rsize != wsize) { throw bufferIsReadOnly(); } @@ -421,7 +424,7 @@ public class UnsafeBuffer extends RcSupport implements Buf } // Allocate a bigger buffer. - long newSize = capacity() + size - (long) writableBytes(); + long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth); BufferAllocator.checkSize(newSize); UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize); @@ -455,27 +458,34 @@ public class UnsafeBuffer extends RcSupport implements Buf } @Override - public Buffer bifurcate() { + public Buffer bifurcate(int splitOffset) { + if (splitOffset < 0) { + throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.'); + } + if (capacity() < splitOffset) { + throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " + + "but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.'); + } if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } var drop = (ArcDrop) unsafeGetDrop(); unsafeSetDrop(new ArcDrop<>(drop)); // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. - var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, woff, control, new ArcDrop<>(drop.increment())); - bifurcatedBuf.woff = woff; - bifurcatedBuf.roff = roff; + var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment())); + bifurcatedBuf.woff = Math.min(woff, splitOffset); + bifurcatedBuf.roff = Math.min(roff, splitOffset); bifurcatedBuf.order(order()); boolean readOnly = readOnly(); bifurcatedBuf.readOnly(readOnly); - rsize -= woff; - baseOffset += woff; - address += woff; + rsize -= splitOffset; + baseOffset += splitOffset; + address += splitOffset; if (!readOnly) { wsize = rsize; } - woff = 0; - roff = 0; + woff = Math.max(woff, splitOffset) - splitOffset; + roff = Math.max(roff, splitOffset) - splitOffset; return bifurcatedBuf; } diff --git a/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java b/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java index 4d24bf3..eb436bb 100644 --- a/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java +++ b/src/test/java/io/netty/buffer/api/BufferEnsureWritableTest.java @@ -53,8 +53,8 @@ public class BufferEnsureWritableTest extends BufferTestSupport { @MethodSource("allocators") public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) { try (BufferAllocator allocator = fixture.createAllocator(); - Buffer buf = allocator.allocate(512)) { - assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 8)); + Buffer buf = allocator.allocate(8)) { + assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 7)); } } @@ -130,15 +130,31 @@ public class BufferEnsureWritableTest extends BufferTestSupport { while (buf.readableBytes() > 0) { buf.readByte(); } - buf.ensureWritable(4, true); + buf.ensureWritable(4, 4, true); buf.writeInt(42); assertThat(buf.capacity()).isEqualTo(64); buf.writerOffset(60).readerOffset(60); - buf.ensureWritable(8, true); + buf.ensureWritable(8, 8, true); buf.writeLong(42); // Don't assert the capacity on this one, because single-component // composite buffers may choose to allocate rather than compact. } } + + @ParameterizedTest + @MethodSource("allocators") + public void ensureWritableWithLargeMinimumGrowthMustGrowByAtLeastThatMuch(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(16)) { + buf.writeLong(0).writeInt(0); + buf.readLong(); + buf.readInt(); // Compaction is now possible as well. + buf.ensureWritable(8, 32, true); // We don't need to allocate. + assertThat(buf.capacity()).isEqualTo(16); + buf.writeByte((byte) 1); + buf.ensureWritable(16, 32, true); // Now we DO need to allocate, because we can't compact. + assertThat(buf.capacity()).isEqualTo(16 /* existing capacity */ + 32 /* minimum growth */); + } + } } diff --git a/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java b/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java index 5d8f27f..a8974fd 100644 --- a/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java +++ b/src/test/java/io/netty/buffer/api/BufferReferenceCountingTest.java @@ -401,6 +401,26 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateWithNegativeOffsetMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.bifurcate(0).close(); + assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(-1)); + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateWithOversizedOffsetMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(9)); + buf.bifurcate(8).close(); + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { @@ -414,6 +434,53 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + try (Buffer acquired = buf.acquire()) { + var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate(4)); + assertThat(exc).hasMessageContaining("owned"); + } + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetMustTruncateGreaterOffsets(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeInt(0x01020304); + buf.writeByte((byte) 0x05); + buf.readInt(); + try (Buffer bif = buf.bifurcate(2)) { + assertThat(buf.readerOffset()).isEqualTo(2); + assertThat(buf.writerOffset()).isEqualTo(3); + + assertThat(bif.readerOffset()).isEqualTo(2); + assertThat(bif.writerOffset()).isEqualTo(2); + } + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bifurcateOnOffsetMustExtendLesserOffsets(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator(); + Buffer buf = allocator.allocate(8)) { + buf.writeInt(0x01020304); + buf.readInt(); + try (Buffer bif = buf.bifurcate(6)) { + assertThat(buf.readerOffset()).isEqualTo(0); + assertThat(buf.writerOffset()).isEqualTo(0); + + assertThat(bif.readerOffset()).isEqualTo(4); + assertThat(bif.writerOffset()).isEqualTo(4); + } + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { @@ -507,6 +574,27 @@ public class BufferReferenceCountingTest extends BufferTestSupport { } } + @ParameterizedTest + @MethodSource("allocators") + public void mustBePossibleToBifurcateOwnedSlices(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(16).order(BIG_ENDIAN); + buf.writeLong(0x0102030405060708L); + try (Buffer slice = buf.slice()) { + buf.close(); + assertTrue(slice.isOwned()); + try (Buffer bifurcate = slice.bifurcate(4)) { + bifurcate.reset().ensureWritable(Long.BYTES); + slice.reset().ensureWritable(Long.BYTES); + assertThat(bifurcate.capacity()).isEqualTo(Long.BYTES); + assertThat(slice.capacity()).isEqualTo(Long.BYTES); + assertThat(bifurcate.getLong(0)).isEqualTo(0x01020304_00000000L); + assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L); + } + } + } + } + @ParameterizedTest @MethodSource("allocators") public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { diff --git a/src/test/java/io/netty/buffer/api/BufferTestSupport.java b/src/test/java/io/netty/buffer/api/BufferTestSupport.java index ca6bd14..7e56591 100644 --- a/src/test/java/io/netty/buffer/api/BufferTestSupport.java +++ b/src/test/java/io/netty/buffer/api/BufferTestSupport.java @@ -92,13 +92,13 @@ public abstract class BufferTestSupport { if ("nosample".equalsIgnoreCase(sampleSetting)) { return fixture -> true; } - Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); + Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); // New seed every day. SplittableRandom rng = new SplittableRandom(today.hashCode()); AtomicInteger counter = new AtomicInteger(); return fixture -> { - boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2; - return res; - }; // Filter out 97% of tests. + // Filter out 95% of tests. + return counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < 5; + }; } static Fixture[] allocators() { @@ -977,6 +977,10 @@ public abstract class BufferTestSupport { return bs; } + public static void assertEquals(Buffer expected, Buffer actual) { + assertThat(toByteArray(actual)).containsExactly(toByteArray(expected)); + } + public static void assertEquals(byte expected, byte actual) { if (expected != actual) { fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual)); From c081c7388596faa1b739f0712a3ae55b5e929296 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 22 Apr 2021 16:57:53 +0200 Subject: [PATCH 03/12] Port over the ByteToMessageDecoder as an example --- .../buffer/api/adaptor/BufferAdaptor.java | 535 +++++++++++++ .../ByteToMessageDecoder.java | 751 ++++++++++++++++++ .../ByteToMessageDecoderTest.java | 561 +++++++++++++ .../FixedLengthFrameDecoder.java | 64 ++ 4 files changed, 1911 insertions(+) create mode 100644 src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java create mode 100644 src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java create mode 100644 src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java create mode 100644 src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/FixedLengthFrameDecoder.java diff --git a/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java new file mode 100644 index 0000000..958e2ab --- /dev/null +++ b/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java @@ -0,0 +1,535 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.adaptor; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferHolder; +import io.netty.buffer.api.ByteCursor; +import io.netty.buffer.api.ReadableComponentProcessor; +import io.netty.buffer.api.Send; +import io.netty.buffer.api.WritableComponentProcessor; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Objects; +import java.util.function.Function; + +/** + * A {@link Buffer} implementation that delegates all method calls to a given delegate buffer instance. + */ +public abstract class BufferAdaptor implements Buffer { + protected Buffer buffer; + + protected BufferAdaptor(Buffer buffer) { + this.buffer = Objects.requireNonNull(buffer, "Delegate buffer cannot be null."); + } + + @Override + public Buffer order(ByteOrder order) { + buffer.order(order); + return this; + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public int readerOffset() { + return buffer.readerOffset(); + } + + @Override + public Buffer readerOffset(int offset) { + buffer.readerOffset(offset); + return this; + } + + @Override + public int writerOffset() { + return buffer.writerOffset(); + } + + @Override + public Buffer writerOffset(int offset) { + buffer.writerOffset(offset); + return this; + } + + @Override + public int readableBytes() { + return buffer.readableBytes(); + } + + @Override + public int writableBytes() { + return buffer.writableBytes(); + } + + @Override + public Buffer fill(byte value) { + buffer.fill(value); + return this; + } + + @Override + public long nativeAddress() { + return buffer.nativeAddress(); + } + + @Override + public Buffer readOnly(boolean readOnly) { + buffer.readOnly(readOnly); + return this; + } + + @Override + public boolean readOnly() { + return buffer.readOnly(); + } + + @Override + public void copyInto(int srcPos, byte[] dest, int destPos, int length) { + buffer.copyInto(srcPos, dest, destPos, length); + } + + @Override + public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { + buffer.copyInto(srcPos, dest, destPos, length); + } + + @Override + public void copyInto(int srcPos, Buffer dest, int destPos, int length) { + buffer.copyInto(srcPos, dest, destPos, length); + } + + @Override + public Buffer reset() { + buffer.reset(); + return this; + } + + @Override + public ByteCursor openCursor() { + return buffer.openCursor(); + } + + @Override + public ByteCursor openCursor(int fromOffset, int length) { + return buffer.openCursor(fromOffset, length); + } + + @Override + public ByteCursor openReverseCursor() { + return buffer.openReverseCursor(); + } + + @Override + public ByteCursor openReverseCursor(int fromOffset, int length) { + return buffer.openReverseCursor(fromOffset, length); + } + + @Override + public void ensureWritable(int size) { + buffer.ensureWritable(size); + } + + @Override + public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { + buffer.ensureWritable(size, minimumGrowth, allowCompaction); + } + + @Override + public Buffer slice() { + buffer.slice(); + return this; + } + + @Override + public Buffer slice(int offset, int length) { + buffer.slice(offset, length); + return this; + } + + @Override + public Buffer bifurcate() { + buffer.bifurcate(); + return this; + } + + @Override + public Buffer bifurcate(int splitOffset) { + buffer.bifurcate(splitOffset); + return this; + } + + @Override + public void compact() { + buffer.compact(); + } + + @Override + public int countComponents() { + return buffer.countComponents(); + } + + @Override + public int countReadableComponents() { + return buffer.countReadableComponents(); + } + + @Override + public int countWritableComponents() { + return buffer.countWritableComponents(); + } + + @Override + public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) + throws E { + return buffer.forEachReadable(initialIndex, processor); + } + + @Override + public int forEachWritable(int initialIndex, WritableComponentProcessor processor) + throws E { + return buffer.forEachWritable(initialIndex, processor); + } + + @Override + public byte readByte() { + return buffer.readByte(); + } + + @Override + public byte getByte(int roff) { + return buffer.getByte(roff); + } + + @Override + public int readUnsignedByte() { + return buffer.readUnsignedByte(); + } + + @Override + public int getUnsignedByte(int roff) { + return buffer.getUnsignedByte(roff); + } + + @Override + public Buffer writeByte(byte value) { + buffer.writeByte(value); + return this; + } + + @Override + public Buffer setByte(int woff, byte value) { + buffer.setByte(woff, value); + return this; + } + + @Override + public Buffer writeUnsignedByte(int value) { + buffer.writeUnsignedByte(value); + return this; + } + + @Override + public Buffer setUnsignedByte(int woff, int value) { + buffer.setUnsignedByte(woff, value); + return this; + } + + @Override + public char readChar() { + return buffer.readChar(); + } + + @Override + public char getChar(int roff) { + return buffer.getChar(roff); + } + + @Override + public Buffer writeChar(char value) { + buffer.writeChar(value); + return this; + } + + @Override + public Buffer setChar(int woff, char value) { + buffer.setChar(woff, value); + return this; + } + + @Override + public short readShort() { + return buffer.readShort(); + } + + @Override + public short getShort(int roff) { + return buffer.getShort(roff); + } + + @Override + public int readUnsignedShort() { + return buffer.readUnsignedShort(); + } + + @Override + public int getUnsignedShort(int roff) { + return buffer.getUnsignedShort(roff); + } + + @Override + public Buffer writeShort(short value) { + buffer.writeShort(value); + return this; + } + + @Override + public Buffer setShort(int woff, short value) { + buffer.setShort(woff, value); + return this; + } + + @Override + public Buffer writeUnsignedShort(int value) { + buffer.writeUnsignedShort(value); + return this; + } + + @Override + public Buffer setUnsignedShort(int woff, int value) { + buffer.setUnsignedShort(woff, value); + return this; + } + + @Override + public int readMedium() { + return buffer.readMedium(); + } + + @Override + public int getMedium(int roff) { + return buffer.getMedium(roff); + } + + @Override + public int readUnsignedMedium() { + return buffer.readUnsignedMedium(); + } + + @Override + public int getUnsignedMedium(int roff) { + return buffer.getUnsignedMedium(roff); + } + + @Override + public Buffer writeMedium(int value) { + buffer.writeMedium(value); + return this; + } + + @Override + public Buffer setMedium(int woff, int value) { + buffer.setMedium(woff, value); + return this; + } + + @Override + public Buffer writeUnsignedMedium(int value) { + buffer.writeUnsignedMedium(value); + return this; + } + + @Override + public Buffer setUnsignedMedium(int woff, int value) { + buffer.setUnsignedMedium(woff, value); + return this; + } + + @Override + public int readInt() { + return buffer.readInt(); + } + + @Override + public int getInt(int roff) { + return buffer.getInt(roff); + } + + @Override + public long readUnsignedInt() { + return buffer.readUnsignedInt(); + } + + @Override + public long getUnsignedInt(int roff) { + return buffer.getUnsignedInt(roff); + } + + @Override + public Buffer writeInt(int value) { + buffer.writeInt(value); + return this; + } + + @Override + public Buffer setInt(int woff, int value) { + buffer.setInt(woff, value); + return this; + } + + @Override + public Buffer writeUnsignedInt(long value) { + buffer.writeUnsignedInt(value); + return this; + } + + @Override + public Buffer setUnsignedInt(int woff, long value) { + buffer.setUnsignedInt(woff, value); + return this; + } + + @Override + public float readFloat() { + return buffer.readFloat(); + } + + @Override + public float getFloat(int roff) { + return buffer.getFloat(roff); + } + + @Override + public Buffer writeFloat(float value) { + buffer.writeFloat(value); + return this; + } + + @Override + public Buffer setFloat(int woff, float value) { + buffer.setFloat(woff, value); + return this; + } + + @Override + public long readLong() { + return buffer.readLong(); + } + + @Override + public long getLong(int roff) { + return buffer.getLong(roff); + } + + @Override + public Buffer writeLong(long value) { + buffer.writeLong(value); + return this; + } + + @Override + public Buffer setLong(int woff, long value) { + buffer.setLong(woff, value); + return this; + } + + @Override + public double readDouble() { + return buffer.readDouble(); + } + + @Override + public double getDouble(int roff) { + return buffer.getDouble(roff); + } + + @Override + public Buffer writeDouble(double value) { + buffer.writeDouble(value); + return this; + } + + @Override + public Buffer setDouble(int woff, double value) { + buffer.setDouble(woff, value); + return this; + } + + @Override + public Buffer acquire() { + buffer.acquire(); + return this; + } + + @Override + public Buffer get() { + buffer.get(); + return this; + } + + @Override + public boolean isInstanceOf(Class cls) { + return buffer.isInstanceOf(cls); + } + + @Override + public void close() { + buffer.close(); + } + + @SuppressWarnings("unchecked") + @Override + public Send send() { + Class aClass = (Class) (Class) getClass(); + Function receive = this::receive; + return buffer.send().map(aClass, receive); + } + + /** + * Called when a {@linkplain #send() sent} {@link BufferAdaptor} is received by the recipient. + * The {@link BufferAdaptor} should return a new concrete instance, that wraps the given {@link Buffer} object. + * + * @param buf The {@link Buffer} that is {@linkplain Send#receive() received} by the recipient, + * and needs to be wrapped in a new {@link BufferHolder} instance. + * @return A new buffer adaptor instance, containing the given {@linkplain Buffer buffer}. + */ + protected abstract BufferAdaptor receive(Buffer buf); + + @Override + public boolean isOwned() { + return buffer.isOwned(); + } + + @Override + public int countBorrows() { + return buffer.countBorrows(); + } + + @Override + public boolean isAccessible() { + return buffer.isAccessible(); + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java new file mode 100644 index 0000000..c5ac58c --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java @@ -0,0 +1,751 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.bytetomessagedecoder; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.MathUtil; +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; + +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.util.Objects.requireNonNull; + +/** + * {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to an + * other Message type. + * + * For example here is an implementation which reads all readable bytes from + * the input {@link Buffer}, creates a new {@link Buffer} and forward it to the + * next {@link ChannelHandler} in the {@link ChannelPipeline}. + * + *

+ *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
+ *         {@code @Override}
+ *         public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
+ *                 throws {@link Exception} {
+ *             ctx.fireChannelRead(in.bifurcate());
+ *         }
+ *     }
+ * 
+ * + *

Frame detection

+ *

+ * Generally frame detection should be handled earlier in the pipeline by adding a + * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder}, + * or {@link LineBasedFrameDecoder}. + *

+ * If a custom frame decoder is required, then one needs to be careful when implementing + * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a + * complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes + * for a complete frame, return without modifying the reader index to allow more bytes to arrive. + *

+ * To check for complete frames without modifying the reader index, use methods like + * {@link Buffer#getInt(int)}. + * One MUST use the reader index when using methods like + * {@link Buffer#getInt(int)}. + * For example calling in.getInt(0) is assuming the frame starts at the beginning of the buffer, which + * is not always the case. Use in.getInt(in.readerIndex()) instead. + *

Pitfalls

+ *

+ * Be aware that sub-classes of {@link ByteToMessageDecoder} MUST NOT + * annotated with {@link @Sharable}. + *

+ * Some methods such as {@link Buffer#bifurcate(int)} will cause a memory leak if the returned buffer + * is not released or fired through the {@link ChannelPipeline} via + * {@link ChannelHandlerContext#fireChannelRead(Object)}. + */ +public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { + + /** + * Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies. + */ + public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> { + if (cumulation.readableBytes() == 0 && !Buffer.isComposite(cumulation)) { + // If cumulation is empty and input buffer is contiguous, use it directly + cumulation.close(); + return in; + } + // We must release 'in' in all cases as otherwise it may produce a leak if writeBytes(...) throw + // for whatever release (for example because of OutOfMemoryError) + try (in) { + final int required = in.readableBytes(); + if (required > cumulation.writableBytes() || !cumulation.isOwned() || cumulation.readOnly()) { + // Expand cumulation (by replacing it) under the following conditions: + // - cumulation cannot be resized to accommodate the additional data + // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is + // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. + return expandCumulation(alloc, cumulation, in); + } + in.copyInto(in.readerOffset(), cumulation, cumulation.writerOffset(), required); + cumulation.writerOffset(cumulation.writerOffset() + required); + in.readerOffset(in.writerOffset()); + return cumulation; + } + }; + + /** + * Cumulate {@link Buffer}s by add them to a composite buffer and so do no memory copy whenever + * possible. + * Be aware that composite buffer use a more complex indexing implementation so depending on your use-case + * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}. + */ + public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> { + if (cumulation.readableBytes() == 0) { + cumulation.close(); + return in; + } + Buffer composite; + try (in) { + if (Buffer.isComposite(cumulation) && cumulation.isOwned()) { + composite = cumulation; + if (composite.writerOffset() != composite.capacity()) { + // Writer index must equal capacity if we are going to "write" + // new components to the end. + composite = cumulation.slice(0, composite.writerOffset()); + cumulation.close(); + } + } else { + composite = Buffer.compose(alloc, cumulation); + } + Buffer.extendComposite(composite, in); + return composite; + } + }; + + Buffer cumulation; + private Cumulator cumulator = MERGE_CUMULATOR; + private boolean singleDecode; + private boolean first; + + /** + * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data + * when {@link ChannelConfig#isAutoRead()} is {@code false}. + */ + private boolean firedChannelRead; + + private int discardAfterReads = 16; + private int numReads; + private ByteToMessageDecoderContext context; + + protected ByteToMessageDecoder() { + ensureNotSharable(); + } + + /** + * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)} + * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. + * + * Default is {@code false} as this has performance impacts. + */ + public void setSingleDecode(boolean singleDecode) { + this.singleDecode = singleDecode; + } + + /** + * If {@code true} then only one message is decoded on each + * {@link #channelRead(ChannelHandlerContext, Object)} call. + * + * Default is {@code false} as this has performance impacts. + */ + public boolean isSingleDecode() { + return singleDecode; + } + + /** + * Set the {@link Cumulator} to use for cumulate the received {@link Buffer}s. + */ + public void setCumulator(Cumulator cumulator) { + requireNonNull(cumulator, "cumulator"); + this.cumulator = cumulator; + } + + /** + * Set the number of reads after which {@link Buffer#compact()} is called to free up memory. + * The default is {@code 16}. + */ + public void setDiscardAfterReads(int discardAfterReads) { + checkPositive(discardAfterReads, "discardAfterReads"); + this.discardAfterReads = discardAfterReads; + } + + /** + * Returns the actual number of readable bytes in the internal cumulative + * buffer of this decoder. You usually do not need to rely on this value + * to write a decoder. Use it only when you must use it at your own risk. + * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. + */ + protected int actualReadableBytes() { + return internalBuffer().readableBytes(); + } + + /** + * Returns the internal cumulative buffer of this decoder. You usually + * do not need to access the internal buffer directly to write a decoder. + * Use it only when you must use it at your own risk. + */ + protected Buffer internalBuffer() { + if (cumulation != null) { + return cumulation; + } else { + return newEmptyBuffer(); + } + } + + private static Buffer newEmptyBuffer() { + return Buffer.compose(BufferAllocator.heap()); + } + + @Override + public final void handlerAdded(ChannelHandlerContext ctx) throws Exception { + context = new ByteToMessageDecoderContext(ctx); + handlerAdded0(context); + } + + protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Buffer buf = cumulation; + if (buf != null) { + // Directly set this to null so we are sure we not access it in any other method here anymore. + cumulation = null; + numReads = 0; + int readable = buf.readableBytes(); + if (readable > 0) { + ctx.fireChannelRead(buf); + ctx.fireChannelReadComplete(); + } else { + buf.close(); + } + } + handlerRemoved0(context); + } + + /** + * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle + * events anymore. + */ + protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Buffer) { + try { + Buffer data = (Buffer) msg; + first = cumulation == null; + if (first) { + cumulation = data; + } else { +// ByteBufAllocator alloc = ctx.alloc(); // TODO this API integration needs more work + BufferAllocator alloc = BufferAllocator.heap(); + cumulation = cumulator.cumulate(alloc, cumulation, data); + } + assert context.ctx == ctx || ctx == context; + + callDecode(context, cumulation); // TODO we'll want to bifurcate here, and simplify lifetime handling + } catch (DecoderException e) { + throw e; + } catch (Exception e) { + throw new DecoderException(e); + } finally { + if (cumulation != null && cumulation.readableBytes() == 0) { + numReads = 0; + cumulation.close(); + cumulation = null; + } else if (++ numReads >= discardAfterReads) { + // We did enough reads already try to discard some bytes so we not risk to see a OOME. + // See https://github.com/netty/netty/issues/4275 + numReads = 0; + discardSomeReadBytes(); // TODO no need to so this dance because ensureWritable can compact for us + } + + firedChannelRead |= context.fireChannelReadCallCount() > 0; + context.reset(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + numReads = 0; + discardSomeReadBytes(); + if (!firedChannelRead && !ctx.channel().config().isAutoRead()) { + ctx.read(); + } + firedChannelRead = false; + ctx.fireChannelReadComplete(); + } + + protected final void discardSomeReadBytes() { + if (cumulation != null && !first && cumulation.isOwned()) { + // discard some bytes if possible to make more room in the + // buffer but only if the refCnt == 1 as otherwise the user may have + // used slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + cumulation.compact(); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + assert context.ctx == ctx || ctx == context; + channelInputClosed(context, true); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + ctx.fireUserEventTriggered(evt); + if (evt instanceof ChannelInputShutdownEvent) { + // The decodeLast method is invoked when a channelInactive event is encountered. + // This method is responsible for ending requests in some situations and must be called + // when the input has been shutdown. + assert context.ctx == ctx || ctx == context; + channelInputClosed(context, false); + } + } + + private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) { + try { + channelInputClosed(ctx); + } catch (DecoderException e) { + throw e; + } catch (Exception e) { + throw new DecoderException(e); + } finally { + if (cumulation != null) { + cumulation.close(); + cumulation = null; + } + if (ctx.fireChannelReadCallCount() > 0) { + ctx.reset(); + // Something was read, call fireChannelReadComplete() + ctx.fireChannelReadComplete(); + } + if (callChannelInactive) { + ctx.fireChannelInactive(); + } + } + } + + /** + * Called when the input of the channel was closed which may be because it changed to inactive or because of + * {@link ChannelInputShutdownEvent}. + */ + void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception { + if (cumulation != null) { + callDecode(ctx, cumulation); + // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would + // be unexpected. + if (!ctx.isRemoved()) { + // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...). + // See https://github.com/netty/netty/issues/10802. + Buffer buffer = cumulation == null ? newEmptyBuffer() : cumulation; + decodeLast(ctx, buffer); + } + } else { + decodeLast(ctx, newEmptyBuffer()); + } + } + + /** + * Called once data should be decoded from the given {@link Buffer}. This method will call + * {@link #decode(ChannelHandlerContext, Buffer)} as long as decoding should take place. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link Buffer} from which to read data + */ + void callDecode(ByteToMessageDecoderContext ctx, Buffer in) { + try { + while (in.readableBytes() > 0 && !ctx.isRemoved()) { + + int oldInputLength = in.readableBytes(); + int numReadCalled = ctx.fireChannelReadCallCount(); + decodeRemovalReentryProtection(ctx, in); + + // Check if this handler was removed before continuing the loop. + // If it was removed, it is not safe to continue to operate on the buffer. + // + // See https://github.com/netty/netty/issues/1664 + if (ctx.isRemoved()) { + break; + } + + if (numReadCalled == ctx.fireChannelReadCallCount()) { + if (oldInputLength == in.readableBytes()) { + break; + } else { + continue; + } + } + + if (oldInputLength == in.readableBytes()) { + throw new DecoderException( + StringUtil.simpleClassName(getClass()) + + ".decode() did not read anything but decoded a message."); + } + + if (isSingleDecode()) { + break; + } + } + } catch (DecoderException e) { + throw e; + } catch (Exception cause) { + throw new DecoderException(cause); + } + } + + /** + * Decode the from one {@link Buffer} to an other. This method will be called till either the input + * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input + * {@link Buffer}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link Buffer} from which to read data + * @throws Exception is thrown if an error occurs + */ + protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception; + + /** + * Decode the from one {@link Buffer} to an other. This method will be called till either the input + * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input + * {@link Buffer}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link Buffer} from which to read data + * @throws Exception is thrown if an error occurs + */ + final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in) + throws Exception { + decode(ctx, in); + } + + /** + * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the + * {@link #channelInactive(ChannelHandlerContext)} was triggered. + * + * By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} but sub-classes may + * override this for some special cleanup operation. + */ + protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception { + if (in.readableBytes() > 0) { + // Only call decode() if there is something left in the buffer to decode. + // See https://github.com/netty/netty/issues/4386 + decodeRemovalReentryProtection(ctx, in); + } + } + + private static Buffer expandCumulation(BufferAllocator alloc, Buffer oldCumulation, Buffer in) { + int newSize = MathUtil.safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes()); + Buffer newCumulation = alloc.allocate(newSize, oldCumulation.order()); + Buffer toRelease = newCumulation; + try { + oldCumulation.copyInto(oldCumulation.readerOffset(), newCumulation, 0, oldCumulation.readableBytes()); + in.copyInto(in.readerOffset(), newCumulation, oldCumulation.readableBytes(), in.readableBytes()); + newCumulation.writerOffset(oldCumulation.readableBytes() + in.readableBytes()); + toRelease = oldCumulation; + return newCumulation; + } finally { + toRelease.close(); + } + } + + /** + * Cumulate {@link Buffer}s. + */ + public interface Cumulator { + /** + * Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes. + * The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so + * call {@link Buffer#close()} if a {@link Buffer} is fully consumed. + */ + Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in); + } + + // Package private so we can also make use of it in ReplayingDecoder. + static final class ByteToMessageDecoderContext implements ChannelHandlerContext { + private final ChannelHandlerContext ctx; + private int fireChannelReadCalled; + + private ByteToMessageDecoderContext(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + void reset() { + fireChannelReadCalled = 0; + } + + int fireChannelReadCallCount() { + return fireChannelReadCalled; + } + + @Override + public Channel channel() { + return ctx.channel(); + } + + @Override + public EventExecutor executor() { + return ctx.executor(); + } + + @Override + public String name() { + return ctx.name(); + } + + @Override + public ChannelHandler handler() { + return ctx.handler(); + } + + @Override + public boolean isRemoved() { + return ctx.isRemoved(); + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + ctx.fireChannelRegistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + ctx.fireChannelUnregistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelActive() { + ctx.fireChannelActive(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + ctx.fireChannelInactive(); + return this; + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + ctx.fireExceptionCaught(cause); + return this; + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object evt) { + ctx.fireUserEventTriggered(evt); + return this; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + fireChannelReadCalled ++; + ctx.fireChannelRead(msg); + return this; + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + ctx.fireChannelReadComplete(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + ctx.fireChannelWritabilityChanged(); + return this; + } + + @Override + public ChannelHandlerContext read() { + ctx.read(); + return this; + } + + @Override + public ChannelHandlerContext flush() { + ctx.flush(); + return this; + } + + @Override + public ChannelPipeline pipeline() { + return ctx.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return ctx.alloc(); + } + + @Override + @Deprecated + public Attribute attr(AttributeKey key) { + return ctx.attr(key); + } + + @Override + @Deprecated + public boolean hasAttr(AttributeKey key) { + return ctx.hasAttr(key); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return ctx.bind(localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return ctx.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return ctx.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture disconnect() { + return ctx.disconnect(); + } + + @Override + public ChannelFuture close() { + return ctx.close(); + } + + @Override + public ChannelFuture deregister() { + return ctx.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return ctx.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return ctx.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return ctx.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return ctx.disconnect(promise); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return ctx.close(promise); + } + + @Override + public ChannelFuture register() { + return ctx.register(); + } + + @Override + public ChannelFuture register(ChannelPromise promise) { + return ctx.register(promise); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return ctx.deregister(promise); + } + + @Override + public ChannelFuture write(Object msg) { + return ctx.write(msg); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + return ctx.write(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return ctx.writeAndFlush(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return ctx.writeAndFlush(msg); + } + + @Override + public ChannelPromise newPromise() { + return ctx.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return ctx.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return ctx.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return ctx.newFailedFuture(cause); + } + + @Override + public ChannelPromise voidPromise() { + return ctx.voidPromise(); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java new file mode 100644 index 0000000..d2ab93e --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java @@ -0,0 +1,561 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.bytetomessagedecoder; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.adaptor.BufferAdaptor; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import org.junit.Test; + +import java.nio.ByteOrder; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.netty.buffer.api.BufferAllocator.heap; +import static io.netty.buffer.api.BufferTestSupport.assertEquals; +import static java.nio.ByteOrder.BIG_ENDIAN; +import static java.nio.ByteOrder.LITTLE_ENDIAN; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ByteToMessageDecoderTest { + + @Test + public void testRemoveItself() { + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + private boolean removed; + + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + assertFalse(removed); + in.readByte(); + ctx.pipeline().remove(this); + removed = true; + } + }); + + try (Buffer buf = heap().allocate(4).writeInt(0x01020304)) { + channel.writeInbound(buf.slice()); + try (Buffer b = channel.readInbound()) { + buf.readByte(); + assertEquals(b, buf); + } + } + } + + @Test + public void testRemoveItselfWriteBuffer() { + final Buffer buf = heap().allocate(5, BIG_ENDIAN).writeInt(0x01020304); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + private boolean removed; + + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + assertFalse(removed); + in.readByte(); + ctx.pipeline().remove(this); + + // This should not let it keep call decode + buf.writeByte((byte) 0x05); + removed = true; + } + }); + + channel.writeInbound(buf.slice()); + try (Buffer expected = heap().allocate(3, BIG_ENDIAN).writeShort((short) 0x0203).writeByte((byte) 0x04); + Buffer b = channel.readInbound(); + Buffer actual = b.slice(); // Only compare readable bytes. + buf) { + assertEquals(expected, actual); + } + } + + /** + * Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In + * this case input is read fully. + */ + @Test + public void testInternalBufferClearReadAll() { + Buffer buf = heap().allocate(1).writeByte((byte) 'a'); + EmbeddedChannel channel = newInternalBufferTestChannel(); + assertFalse(channel.writeInbound(buf)); + assertFalse(channel.finish()); + } + + /** + * Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In + * this case input was not fully read. + */ + @Test + public void testInternalBufferClearReadPartly() { + final Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102); + EmbeddedChannel channel = newInternalBufferTestChannel(); + assertTrue(channel.writeInbound(buf)); + assertTrue(channel.finish()); + try (Buffer expected = heap().allocate(1).writeByte((byte) 0x02); + Buffer b = channel.readInbound(); + Buffer actual = b.slice()) { + assertEquals(expected, actual); + assertNull(channel.readInbound()); + } + } + + private EmbeddedChannel newInternalBufferTestChannel() { + return new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + Buffer buf = internalBuffer(); + assertTrue(buf.isOwned()); + in.readByte(); + // Removal from pipeline should clear internal buffer + ctx.pipeline().remove(this); + } + + @Override + protected void handlerRemoved0(ChannelHandlerContext ctx) { + assertCumulationReleased(internalBuffer()); + } + }); + } + + @Test + public void handlerRemovedWillNotReleaseBufferIfDecodeInProgress() { + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception { + ctx.pipeline().remove(this); + assertTrue(in.isAccessible()); + } + + @Override + protected void handlerRemoved0(ChannelHandlerContext ctx) { + assertCumulationReleased(internalBuffer()); + } + }); + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + Buffer buffer = heap().allocate(bytes.length); + for (byte b : bytes) { + buffer.writeByte(b); + } + assertTrue(channel.writeInbound(buffer)); + assertTrue(channel.finishAndReleaseAll()); + } + + private static void assertCumulationReleased(Buffer buffer) { + assertTrue("unexpected value: " + buffer, + buffer == null || buffer.capacity() == 0 || !buffer.isAccessible()); + } + + @Test + public void testFireChannelReadCompleteOnInactive() throws InterruptedException { + final BlockingQueue queue = new LinkedBlockingDeque<>(); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + int readable = in.readableBytes(); + assertTrue(readable > 0); + in.readerOffset(in.readerOffset() + readable); + } + + @Override + protected void decodeLast(ChannelHandlerContext ctx, Buffer in) { + assertEquals(0, in.readableBytes()); + ctx.fireChannelRead("data"); + } + }, new ChannelHandler() { + @Override + public void channelInactive(ChannelHandlerContext ctx) { + queue.add(3); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + queue.add(1); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + if (!ctx.channel().isActive()) { + queue.add(2); + } + } + }); + Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102); + assertFalse(channel.writeInbound(buf)); + channel.finish(); + assertEquals(1, queue.take()); + assertEquals(2, queue.take()); + assertEquals(3, queue.take()); + assertTrue(queue.isEmpty()); + assertFalse(buf.isAccessible()); + } + + // See https://github.com/netty/netty/issues/4635 + @Test + public void testRemoveWhileInCallDecode() { + final Object upgradeMessage = new Object(); + final ByteToMessageDecoder decoder = new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + assertEquals(1, in.readByte()); + ctx.fireChannelRead(upgradeMessage); + } + }; + + EmbeddedChannel channel = new EmbeddedChannel(decoder, new ChannelHandler() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg == upgradeMessage) { + ctx.pipeline().remove(decoder); + return; + } + ctx.fireChannelRead(msg); + } + }); + + try (Buffer buf = heap().allocate(4, BIG_ENDIAN).writeInt(0x01020304)) { + assertTrue(channel.writeInbound(buf.slice())); + try (Buffer expected = buf.slice(1, 3); + Buffer b = channel.readInbound(); + Buffer actual = b.slice()) { + assertEquals(expected, actual); + assertFalse(channel.finish()); + } + } + } + + @Test + public void testDecodeLastEmptyBuffer() { + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + assertTrue(in.readableBytes() > 0); + Buffer slice = in.slice(); + in.readerOffset(in.readerOffset() + in.readableBytes()); + ctx.fireChannelRead(slice); + } + }); + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + try (Buffer buf = heap().allocate(bytes.length)) { + for (byte b : bytes) { + buf.writeByte(b); + } + assertTrue(channel.writeInbound(buf.slice())); + try (Buffer b = channel.readInbound()) { + assertEquals(buf, b); + assertNull(channel.readInbound()); + assertFalse(channel.finish()); + assertNull(channel.readInbound()); + } + } + } + + @Test + public void testDecodeLastNonEmptyBuffer() { + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + private boolean decodeLast; + + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + int readable = in.readableBytes(); + assertTrue(readable > 0); + if (!decodeLast && readable == 1) { + return; + } + int read = decodeLast ? readable : readable - 1; + Buffer slice = in.slice(in.readerOffset(), read); + in.readerOffset(in.readerOffset() + read); + ctx.fireChannelRead(slice); + } + + @Override + protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception { + assertFalse(decodeLast); + decodeLast = true; + super.decodeLast(ctx, in); + } + }); + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + try (Buffer buf = heap().allocate(bytes.length, BIG_ENDIAN); + Buffer part1 = buf.slice(0, bytes.length - 1); + Buffer part2 = buf.slice(bytes.length - 1, 1)) { + for (byte b : bytes) { + buf.writeByte(b); + } + assertTrue(channel.writeInbound(buf.slice())); + try (Buffer actual = channel.readInbound()) { + assertEquals(part1, actual); + } + assertNull(channel.readInbound()); + assertTrue(channel.finish()); + try (Buffer actual = channel.readInbound()) { + assertEquals(part2, actual); + } + assertNull(channel.readInbound()); + } + } + + @Test + public void testReadOnlyBuffer() { + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { } + }); + assertFalse(channel.writeInbound(heap().allocate(8).writeByte((byte) 1).readOnly(true))); + assertFalse(channel.writeInbound(heap().allocate(1).writeByte((byte) 2))); + assertFalse(channel.finish()); + } + + static class WriteFailingByteBuf extends BufferAdaptor { + private final Error error = new Error(); + private int untilFailure; + + WriteFailingByteBuf(int untilFailure, int capacity) { + this(untilFailure, heap().allocate(capacity, BIG_ENDIAN)); + this.untilFailure = untilFailure; + } + + private WriteFailingByteBuf(int untilFailure, Buffer buffer) { + super(buffer); + this.untilFailure = untilFailure; + } + + @Override + public Buffer order(ByteOrder order) { + if (order == LITTLE_ENDIAN && --untilFailure <= 0) { + throw error; + } + return super.order(order); + } + + @Override + protected BufferAdaptor receive(Buffer buf) { + return new WriteFailingByteBuf(untilFailure, buf); + } + + Error writeError() { + return error; + } + } + + @Test + public void releaseWhenMergeCumulateThrows() { + WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); + oldCumulation.writeByte((byte) 0); + Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12); + + Throwable thrown = null; + try { + ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(heap(), oldCumulation, in); + } catch (Throwable t) { + thrown = t; + } + + assertSame(oldCumulation.writeError(), thrown); + assertFalse(in.isAccessible()); + assertTrue(oldCumulation.isOwned()); + oldCumulation.close(); + } + + @Test + public void releaseWhenMergeCumulateThrowsInExpand() { + releaseWhenMergeCumulateThrowsInExpand(1, true); + releaseWhenMergeCumulateThrowsInExpand(2, true); + releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case + } + + private static void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { + Buffer oldCumulation = heap().allocate(8, BIG_ENDIAN).writeByte((byte) 0); + final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); + + BufferAllocator allocator = new BufferAllocator() { + @Override + public Buffer allocate(int capacity) { + return newCumulation; + } + }; + + Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12); + Throwable thrown = null; + try { + ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in); + } catch (Throwable t) { + thrown = t; + } + + assertFalse(in.isAccessible()); + + if (shouldFail) { + assertSame(newCumulation.writeError(), thrown); + assertTrue(oldCumulation.isOwned()); + oldCumulation.close(); + assertFalse(newCumulation.isAccessible()); + } else { + assertNull(thrown); + assertFalse(oldCumulation.isAccessible()); + assertTrue(newCumulation.isOwned()); + newCumulation.close(); + } + } + + @Test + public void releaseWhenCompositeCumulateThrows() { + Buffer in = heap().allocate(12, LITTLE_ENDIAN).writerOffset(12); + try (Buffer cumulation = Buffer.compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) { + ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(heap(), cumulation, in); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageContaining("byte order"); + assertFalse(in.isAccessible()); + } + } + + @Test + public void testDoesNotOverRead() { + class ReadInterceptingHandler implements ChannelHandler { + private int readsTriggered; + + @Override + public void read(ChannelHandlerContext ctx) { + readsTriggered++; + ctx.read(); + } + } + ReadInterceptingHandler interceptor = new ReadInterceptingHandler(); + + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead(false); + channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3)); + assertEquals(0, interceptor.readsTriggered); + + // 0 complete frames, 1 partial frame: SHOULD trigger a read + channel.writeInbound(heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0001)); + assertEquals(1, interceptor.readsTriggered); + + // 2 complete frames, 0 partial frames: should NOT trigger a read + channel.writeInbound(heap().allocate(1).writeByte((byte) 2), + heap().allocate(3).writeByte((byte) 3).writeByte((byte) 4).writeByte((byte) 5)); + assertEquals(1, interceptor.readsTriggered); + + // 1 complete frame, 1 partial frame: should NOT trigger a read + channel.writeInbound(heap().allocate(3).writeByte((byte) 6).writeByte((byte) 7).writeByte((byte) 8), + heap().allocate(1).writeByte((byte) 9)); + assertEquals(1, interceptor.readsTriggered); + + // 1 complete frame, 1 partial frame: should NOT trigger a read + channel.writeInbound(heap().allocate(2).writeByte((byte) 10).writeByte((byte) 11), + heap().allocate(1).writeByte((byte) 12)); + assertEquals(1, interceptor.readsTriggered); + + // 0 complete frames, 1 partial frame: SHOULD trigger a read + channel.writeInbound(heap().allocate(1).writeByte((byte) 13)); + assertEquals(2, interceptor.readsTriggered); + + // 1 complete frame, 0 partial frames: should NOT trigger a read + channel.writeInbound(heap().allocate(1).writeByte((byte) 14)); + assertEquals(2, interceptor.readsTriggered); + + for (int i = 0; i < 5; i++) { + try (Buffer read = channel.readInbound()) { + assertEquals(i * 3, read.getByte(0)); + assertEquals(i * 3 + 1, read.getByte(1)); + assertEquals(i * 3 + 2, read.getByte(2)); + } + } + assertFalse(channel.finish()); + } + + @Test + public void testDisorder() { + ByteToMessageDecoder decoder = new ByteToMessageDecoder() { + int count; + + //read 4 byte then remove this decoder + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + ctx.fireChannelRead(in.readByte()); + if (++count >= 4) { + ctx.pipeline().remove(this); + } + } + }; + EmbeddedChannel channel = new EmbeddedChannel(decoder); + byte[] bytes = {1, 2, 3, 4, 5}; + Buffer buf = heap().allocate(bytes.length); + for (byte b : bytes) { + buf.writeByte(b); + } + assertTrue(channel.writeInbound(buf)); + assertEquals((byte) 1, channel.readInbound()); + assertEquals((byte) 2, channel.readInbound()); + assertEquals((byte) 3, channel.readInbound()); + assertEquals((byte) 4, channel.readInbound()); + Buffer buffer5 = channel.readInbound(); + assertEquals((byte) 5, buffer5.readByte()); + assertEquals(0, buffer5.readableBytes()); + buffer5.close(); + assertFalse(buffer5.isAccessible()); + assertFalse(channel.finish()); + } + + @Test + public void testDecodeLast() { + final AtomicBoolean removeHandler = new AtomicBoolean(); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + + @Override + protected void decode(ChannelHandlerContext ctx, Buffer in) { + if (removeHandler.get()) { + ctx.pipeline().remove(this); + } + } + }); + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + try (Buffer buf = heap().allocate(bytes.length)) { + for (byte b : bytes) { + buf.writeByte(b); + } + + assertFalse(channel.writeInbound(buf.slice())); + assertNull(channel.readInbound()); + removeHandler.set(true); + // This should trigger channelInputClosed(...) + channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + + assertTrue(channel.finish()); + try (Buffer actual = channel.readInbound()) { + assertEquals(buf.slice(), actual); + } + assertNull(channel.readInbound()); + } + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/FixedLengthFrameDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/FixedLengthFrameDecoder.java new file mode 100644 index 0000000..9927c10 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/FixedLengthFrameDecoder.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.bytetomessagedecoder; + +import io.netty.buffer.api.Buffer; +import io.netty.channel.ChannelHandlerContext; + +import static io.netty.util.internal.ObjectUtil.checkPositive; + +public class FixedLengthFrameDecoder extends ByteToMessageDecoder { + private final int frameLength; + + /** + * Creates a new instance. + * + * @param frameLength the length of the frame + */ + public FixedLengthFrameDecoder(int frameLength) { + checkPositive(frameLength, "frameLength"); + this.frameLength = frameLength; + } + + @Override + protected final void decode(ChannelHandlerContext ctx, Buffer in) throws Exception { + Object decoded = decode0(ctx, in); + if (decoded != null) { + ctx.fireChannelRead(decoded); + } + } + + /** + * Create a frame out of the {@link Buffer} and return it. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link Buffer} from which to read data + * @return frame the {@link Buffer} which represent the frame or {@code null} if no frame could + * be created. + */ + protected Object decode0( + @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, Buffer in) throws Exception { + if (in.readableBytes() < frameLength) { + return null; + } else { + try { + return in.slice(in.readerOffset(), frameLength); + } finally { + in.readerOffset(in.readerOffset() + frameLength); + } + } + } +} From 0748d206d2e317b50946dc2a7fec5cca3686e6b3 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 23 Apr 2021 17:13:48 +0200 Subject: [PATCH 04/12] Add an alternative message decoder implementation This one is a rewrite, making use of the new APIs where possible. The test uses bifurcate to cut buffers into segments. --- .../AlternativeMessageDecoder.java | 115 ++++++++++++++++++ .../AlternativeMessageDecoderTest.java | 99 +++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java create mode 100644 src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java new file mode 100644 index 0000000..e8e1a89 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java @@ -0,0 +1,115 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.bytetomessagedecoder; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.Send; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; + +import java.util.Objects; + +public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter { + public static final int DEFAULT_CHUNK_SIZE = 1 << 13; // 8 KiB + private Buffer collector; + private BufferAllocator allocator; + + protected AlternativeMessageDecoder() { + allocator = initAllocator(); + collector = initCollector(allocator, DEFAULT_CHUNK_SIZE); + } + + protected BufferAllocator initAllocator() { + return BufferAllocator.heap(); + } + + protected Buffer initCollector(BufferAllocator allocator, int defaultChunkSize) { + return allocator.allocate(defaultChunkSize); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + drainCollector(ctx); + collector.close(); + super.handlerRemoved(ctx); + } + + private void drainCollector(ChannelHandlerContext ctx) { + boolean madeProgress; + do { + madeProgress = decodeAndFireRead(ctx, collector); + } while (madeProgress); + } + + protected abstract boolean decodeAndFireRead(ChannelHandlerContext ctx, Buffer input); + + public BufferAllocator getAllocator() { + return allocator; + } + + public void setAllocator(BufferAllocator allocator) { + this.allocator = Objects.requireNonNull(allocator, "BufferAllocator cannot be null."); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Buffer) { + try (Buffer input = (Buffer) msg) { + processRead(ctx, input); + } + } else if (msg instanceof Send && ((Send) msg).isInstanceOf(Buffer.class)) { + //noinspection unchecked + try (Buffer input = ((Send) msg).receive()) { + processRead(ctx, input); + } + } else { + super.channelRead(ctx, msg); + } + } + + private void processRead(ChannelHandlerContext ctx, Buffer input) { + if (collector.isOwned() && Buffer.isComposite(collector) && input.isOwned() + && (collector.writableBytes() == 0 || input.writerOffset() == 0) + && (collector.readableBytes() == 0 || input.readerOffset() == 0) + && collector.order() == input.order()) { + Buffer.extendComposite(collector, input); + drainCollector(ctx); + return; + } + if (collector.isOwned()) { + collector.ensureWritable(input.readableBytes(), DEFAULT_CHUNK_SIZE, true); + } else { + try (Buffer prev = collector) { + int requiredCapacity = input.capacity() + prev.readableBytes(); + collector = allocator.allocate(Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE), input.order()); + prev.copyInto(prev.readerOffset(), collector, 0, prev.readableBytes()); + collector.readerOffset(prev.readableBytes()); + } + } + input.copyInto(input.readerOffset(), collector, collector.writerOffset(), input.readableBytes()); + collector.writerOffset(collector.writerOffset() + input.readableBytes()); + drainCollector(ctx); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().config().isAutoRead()) { + ctx.read(); + } + ctx.fireChannelReadComplete(); + } +} diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java new file mode 100644 index 0000000..fb98f86 --- /dev/null +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.examples.bytetomessagedecoder; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.SplittableRandom; + +import static io.netty.buffer.api.BufferTestSupport.toByteArray; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AlternativeMessageDecoderTest { + @Test + public void splitAndParseMessagesDownThePipeline() { + EmbeddedChannel channel = new EmbeddedChannel(new AlternativeMessageDecoder() { + @Override + protected boolean decodeAndFireRead(ChannelHandlerContext ctx, Buffer input) { + // Can we read our length header? + if (input.readableBytes() < 4) { + return false; + } + + int start = input.readerOffset(); + int length = input.readInt(); + // Can we read the rest of the message? + if (input.readableBytes() < length) { + input.readerOffset(start); + return false; + } + + // We can read our message in full. + Buffer messageBuffer = input.bifurcate(input.readerOffset() + length); + ctx.fireChannelRead(messageBuffer); + return true; + } + }); + + List messages = new ArrayList<>(); + Buffer messagesBuffer = BufferAllocator.heap().allocate(132 * 1024); + SplittableRandom rng = new SplittableRandom(42); + for (int i = 0; i < 1000; i++) { + byte[] message = new byte[rng.nextInt(4, 256)]; + rng.nextBytes(message); + message[0] = (byte) (i >> 24); + message[1] = (byte) (i >> 16); + message[2] = (byte) (i >> 8); + message[3] = (byte) i; + messages.add(message); + messagesBuffer.ensureWritable(4 + message.length, 1024, false); + messagesBuffer.writeInt(message.length); + for (byte b : message) { + messagesBuffer.writeByte(b); + } + } + + while (messagesBuffer.readableBytes() > 0) { + int length = rng.nextInt(1, Math.min(500, messagesBuffer.readableBytes() + 1)); + if (length == messagesBuffer.readableBytes()) { + channel.writeInbound(messagesBuffer); + } else { + channel.writeInbound(messagesBuffer.bifurcate(length)); + } + } + + Iterator expectedItr = messages.iterator(); + Buffer actualMessage; + while ((actualMessage = channel.readInbound()) != null) { + try (Buffer ignore = actualMessage) { + assertTrue(expectedItr.hasNext()); + try (Buffer actual = actualMessage.slice()) { + assertThat(toByteArray(actual)).containsExactly(expectedItr.next()); + } + } + } + assertFalse(expectedItr.hasNext()); + } +} \ No newline at end of file From d72982a5ef018ffd97283fa5490f68ff629d29fa Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 26 Apr 2021 09:41:21 +0200 Subject: [PATCH 05/12] Fix checkstyle issue --- .../bytetomessagedecoder/AlternativeMessageDecoderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java index fb98f86..573f75d 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java @@ -96,4 +96,4 @@ class AlternativeMessageDecoderTest { } assertFalse(expectedItr.hasNext()); } -} \ No newline at end of file +} From 8c2987a824029701dca3d7ebc96f39069b2a5ce0 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 26 Apr 2021 10:34:54 +0200 Subject: [PATCH 06/12] Add a Buffer.writeBytes bulk transfer method This simplifies some of the ByteToMessageDecoder example code. --- src/main/java/io/netty/buffer/api/Buffer.java | 17 +++++++ .../buffer/api/BufferBulkAccessTest.java | 47 +++++++++++++++++++ .../AlternativeMessageDecoder.java | 6 +-- .../ByteToMessageDecoder.java | 9 ++-- 4 files changed, 69 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index b3f382d..b34a9dc 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -377,6 +377,23 @@ public interface Buffer extends Rc, BufferAccessors { */ void copyInto(int srcPos, Buffer dest, int destPos, int length); + /** + * Write into this buffer, all the readable bytes from the given buffer. + * This updates the {@linkplain #writerOffset() write offset} of this buffer, and the + * {@linkplain #readerOffset() reader offset} of the given buffer. + * + * @param source The buffer to read from. + * @return This buffer. + */ + default Buffer writeBytes(Buffer source) { + int size = source.readableBytes(); + int woff = writerOffset(); + writerOffset(woff + size); + source.copyInto(source.readerOffset(), this, woff, size); + source.readerOffset(source.readerOffset() + size); + return this; + } + /** * Resets the {@linkplain #readerOffset() read offset} and the {@linkplain #writerOffset() write offset} on this * buffer to their initial values. diff --git a/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java b/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java index e59277b..d1374c3 100644 --- a/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java +++ b/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import static java.nio.ByteOrder.BIG_ENDIAN; import static java.nio.ByteOrder.LITTLE_ENDIAN; +import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; public class BufferBulkAccessTest extends BufferTestSupport { @@ -301,4 +302,50 @@ public class BufferBulkAccessTest extends BufferTestSupport { assertThat(buf.nativeAddress()).isNotZero(); } } + + @ParameterizedTest + @MethodSource("allocators") + public void writeBytesMustTransferDataAndUpdateOffsets(Fixture fixture) { + try (BufferAllocator alloc1 = fixture.createAllocator()) { + for (Fixture otherFixture : allocators()) { + try (BufferAllocator alloc2 = otherFixture.createAllocator(); + Buffer target = alloc1.allocate(37); + Buffer source = alloc2.allocate(35)) { + // BE to BE + target.order(BIG_ENDIAN); + source.order(BIG_ENDIAN); + verifyWriteBytes(target, source); + + // LE to BE + target.fill((byte) 0).reset().order(BIG_ENDIAN); + source.fill((byte) 0).reset().order(LITTLE_ENDIAN); + verifyWriteBytes(target, source); + + // BE to LE + target.fill((byte) 0).reset().order(LITTLE_ENDIAN); + source.fill((byte) 0).reset().order(BIG_ENDIAN); + verifyWriteBytes(target, source); + + // LE to LE + target.fill((byte) 0).reset().order(LITTLE_ENDIAN); + source.fill((byte) 0).reset().order(BIG_ENDIAN); + verifyWriteBytes(target, source); + } + } + } + } + + private static void verifyWriteBytes(Buffer target, Buffer source) { + for (int i = 0; i < 35; i++) { + source.writeByte((byte) (i + 1)); + } + target.writeBytes(source); + assertThat(target.readerOffset()).isZero(); + assertThat(target.writerOffset()).isEqualTo(35); + assertThat(source.readerOffset()).isEqualTo(35); + assertThat(source.writerOffset()).isEqualTo(35); + try (Buffer readableSlice = target.slice()) { + assertEquals(source, readableSlice); + } + } } diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java index e8e1a89..d302af6 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java @@ -96,12 +96,10 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter { try (Buffer prev = collector) { int requiredCapacity = input.capacity() + prev.readableBytes(); collector = allocator.allocate(Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE), input.order()); - prev.copyInto(prev.readerOffset(), collector, 0, prev.readableBytes()); - collector.readerOffset(prev.readableBytes()); + collector.writeBytes(prev); } } - input.copyInto(input.readerOffset(), collector, collector.writerOffset(), input.readableBytes()); - collector.writerOffset(collector.writerOffset() + input.readableBytes()); + collector.writeBytes(input); drainCollector(ctx); } diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java index c5ac58c..a145d9c 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoder.java @@ -110,9 +110,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. return expandCumulation(alloc, cumulation, in); } - in.copyInto(in.readerOffset(), cumulation, cumulation.writerOffset(), required); - cumulation.writerOffset(cumulation.writerOffset() + required); - in.readerOffset(in.writerOffset()); + cumulation.writeBytes(in); return cumulation; } }; @@ -480,9 +478,8 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { Buffer newCumulation = alloc.allocate(newSize, oldCumulation.order()); Buffer toRelease = newCumulation; try { - oldCumulation.copyInto(oldCumulation.readerOffset(), newCumulation, 0, oldCumulation.readableBytes()); - in.copyInto(in.readerOffset(), newCumulation, oldCumulation.readableBytes(), in.readableBytes()); - newCumulation.writerOffset(oldCumulation.readableBytes() + in.readableBytes()); + newCumulation.writeBytes(oldCumulation); + newCumulation.writeBytes(in); toRelease = oldCumulation; return newCumulation; } finally { From c09276373e09583a7504d5b7dbe1590d0231d3f4 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 26 Apr 2021 11:24:24 +0200 Subject: [PATCH 07/12] Fix prose errors --- src/main/java/io/netty/buffer/api/Buffer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index b34a9dc..4a67118 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -125,7 +125,7 @@ import java.nio.ByteOrder; * Examples of this include doing IO, or decoding a bounded part of a larger message. * On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other, * perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process. - * Examples of include aggregating messages into an accumulator buffer, and sending messages down the pipeline for + * Examples include aggregating messages into an accumulator buffer, and sending messages down the pipeline for * further processing, as bifurcated buffer regions, once their data has been received in its entirety. */ public interface Buffer extends Rc, BufferAccessors { @@ -576,7 +576,7 @@ public interface Buffer extends Rc, BufferAccessors { * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. *

* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new - * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently * {@linkplain #send() sent} to other threads. *

* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()} @@ -627,7 +627,7 @@ public interface Buffer extends Rc, BufferAccessors { * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. *

* The region of this buffer that precede the {@code splitOffset}, will be captured and returned in a new - * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently + * buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently * {@linkplain #send() sent} to other threads. *

* The returned buffer will adopt the {@link #readerOffset()} and {@link #writerOffset()} of this buffer, From 926a1807b402fd8b4555b4f5002413a391a5e524 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 26 Apr 2021 15:29:56 +0200 Subject: [PATCH 08/12] Clean up code and remove the BufferAdaptor Instead use Mockito to implement the throwing behaviour on the buffers in those tests. Sadly Mockito cannot spy or mock our Buffer implementation classes, and does not allow mocking an interface while spying on an implementation, so we have to do a more complicated dance with our mocking. --- pom.xml | 6 + .../buffer/api/adaptor/BufferAdaptor.java | 535 ------------------ .../AlternativeMessageDecoder.java | 10 +- .../ByteToMessageDecoderTest.java | 70 +-- 4 files changed, 43 insertions(+), 578 deletions(-) delete mode 100644 src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java diff --git a/pom.xml b/pom.xml index 6bd8954..65bace2 100644 --- a/pom.xml +++ b/pom.xml @@ -399,6 +399,12 @@ 3.18.0 test + + org.mockito + mockito-core + 3.9.0 + test + io.netty netty-build-common diff --git a/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java deleted file mode 100644 index 958e2ab..0000000 --- a/src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java +++ /dev/null @@ -1,535 +0,0 @@ -/* - * Copyright 2021 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer.api.adaptor; - -import io.netty.buffer.api.Buffer; -import io.netty.buffer.api.BufferHolder; -import io.netty.buffer.api.ByteCursor; -import io.netty.buffer.api.ReadableComponentProcessor; -import io.netty.buffer.api.Send; -import io.netty.buffer.api.WritableComponentProcessor; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Objects; -import java.util.function.Function; - -/** - * A {@link Buffer} implementation that delegates all method calls to a given delegate buffer instance. - */ -public abstract class BufferAdaptor implements Buffer { - protected Buffer buffer; - - protected BufferAdaptor(Buffer buffer) { - this.buffer = Objects.requireNonNull(buffer, "Delegate buffer cannot be null."); - } - - @Override - public Buffer order(ByteOrder order) { - buffer.order(order); - return this; - } - - @Override - public ByteOrder order() { - return buffer.order(); - } - - @Override - public int capacity() { - return buffer.capacity(); - } - - @Override - public int readerOffset() { - return buffer.readerOffset(); - } - - @Override - public Buffer readerOffset(int offset) { - buffer.readerOffset(offset); - return this; - } - - @Override - public int writerOffset() { - return buffer.writerOffset(); - } - - @Override - public Buffer writerOffset(int offset) { - buffer.writerOffset(offset); - return this; - } - - @Override - public int readableBytes() { - return buffer.readableBytes(); - } - - @Override - public int writableBytes() { - return buffer.writableBytes(); - } - - @Override - public Buffer fill(byte value) { - buffer.fill(value); - return this; - } - - @Override - public long nativeAddress() { - return buffer.nativeAddress(); - } - - @Override - public Buffer readOnly(boolean readOnly) { - buffer.readOnly(readOnly); - return this; - } - - @Override - public boolean readOnly() { - return buffer.readOnly(); - } - - @Override - public void copyInto(int srcPos, byte[] dest, int destPos, int length) { - buffer.copyInto(srcPos, dest, destPos, length); - } - - @Override - public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { - buffer.copyInto(srcPos, dest, destPos, length); - } - - @Override - public void copyInto(int srcPos, Buffer dest, int destPos, int length) { - buffer.copyInto(srcPos, dest, destPos, length); - } - - @Override - public Buffer reset() { - buffer.reset(); - return this; - } - - @Override - public ByteCursor openCursor() { - return buffer.openCursor(); - } - - @Override - public ByteCursor openCursor(int fromOffset, int length) { - return buffer.openCursor(fromOffset, length); - } - - @Override - public ByteCursor openReverseCursor() { - return buffer.openReverseCursor(); - } - - @Override - public ByteCursor openReverseCursor(int fromOffset, int length) { - return buffer.openReverseCursor(fromOffset, length); - } - - @Override - public void ensureWritable(int size) { - buffer.ensureWritable(size); - } - - @Override - public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) { - buffer.ensureWritable(size, minimumGrowth, allowCompaction); - } - - @Override - public Buffer slice() { - buffer.slice(); - return this; - } - - @Override - public Buffer slice(int offset, int length) { - buffer.slice(offset, length); - return this; - } - - @Override - public Buffer bifurcate() { - buffer.bifurcate(); - return this; - } - - @Override - public Buffer bifurcate(int splitOffset) { - buffer.bifurcate(splitOffset); - return this; - } - - @Override - public void compact() { - buffer.compact(); - } - - @Override - public int countComponents() { - return buffer.countComponents(); - } - - @Override - public int countReadableComponents() { - return buffer.countReadableComponents(); - } - - @Override - public int countWritableComponents() { - return buffer.countWritableComponents(); - } - - @Override - public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) - throws E { - return buffer.forEachReadable(initialIndex, processor); - } - - @Override - public int forEachWritable(int initialIndex, WritableComponentProcessor processor) - throws E { - return buffer.forEachWritable(initialIndex, processor); - } - - @Override - public byte readByte() { - return buffer.readByte(); - } - - @Override - public byte getByte(int roff) { - return buffer.getByte(roff); - } - - @Override - public int readUnsignedByte() { - return buffer.readUnsignedByte(); - } - - @Override - public int getUnsignedByte(int roff) { - return buffer.getUnsignedByte(roff); - } - - @Override - public Buffer writeByte(byte value) { - buffer.writeByte(value); - return this; - } - - @Override - public Buffer setByte(int woff, byte value) { - buffer.setByte(woff, value); - return this; - } - - @Override - public Buffer writeUnsignedByte(int value) { - buffer.writeUnsignedByte(value); - return this; - } - - @Override - public Buffer setUnsignedByte(int woff, int value) { - buffer.setUnsignedByte(woff, value); - return this; - } - - @Override - public char readChar() { - return buffer.readChar(); - } - - @Override - public char getChar(int roff) { - return buffer.getChar(roff); - } - - @Override - public Buffer writeChar(char value) { - buffer.writeChar(value); - return this; - } - - @Override - public Buffer setChar(int woff, char value) { - buffer.setChar(woff, value); - return this; - } - - @Override - public short readShort() { - return buffer.readShort(); - } - - @Override - public short getShort(int roff) { - return buffer.getShort(roff); - } - - @Override - public int readUnsignedShort() { - return buffer.readUnsignedShort(); - } - - @Override - public int getUnsignedShort(int roff) { - return buffer.getUnsignedShort(roff); - } - - @Override - public Buffer writeShort(short value) { - buffer.writeShort(value); - return this; - } - - @Override - public Buffer setShort(int woff, short value) { - buffer.setShort(woff, value); - return this; - } - - @Override - public Buffer writeUnsignedShort(int value) { - buffer.writeUnsignedShort(value); - return this; - } - - @Override - public Buffer setUnsignedShort(int woff, int value) { - buffer.setUnsignedShort(woff, value); - return this; - } - - @Override - public int readMedium() { - return buffer.readMedium(); - } - - @Override - public int getMedium(int roff) { - return buffer.getMedium(roff); - } - - @Override - public int readUnsignedMedium() { - return buffer.readUnsignedMedium(); - } - - @Override - public int getUnsignedMedium(int roff) { - return buffer.getUnsignedMedium(roff); - } - - @Override - public Buffer writeMedium(int value) { - buffer.writeMedium(value); - return this; - } - - @Override - public Buffer setMedium(int woff, int value) { - buffer.setMedium(woff, value); - return this; - } - - @Override - public Buffer writeUnsignedMedium(int value) { - buffer.writeUnsignedMedium(value); - return this; - } - - @Override - public Buffer setUnsignedMedium(int woff, int value) { - buffer.setUnsignedMedium(woff, value); - return this; - } - - @Override - public int readInt() { - return buffer.readInt(); - } - - @Override - public int getInt(int roff) { - return buffer.getInt(roff); - } - - @Override - public long readUnsignedInt() { - return buffer.readUnsignedInt(); - } - - @Override - public long getUnsignedInt(int roff) { - return buffer.getUnsignedInt(roff); - } - - @Override - public Buffer writeInt(int value) { - buffer.writeInt(value); - return this; - } - - @Override - public Buffer setInt(int woff, int value) { - buffer.setInt(woff, value); - return this; - } - - @Override - public Buffer writeUnsignedInt(long value) { - buffer.writeUnsignedInt(value); - return this; - } - - @Override - public Buffer setUnsignedInt(int woff, long value) { - buffer.setUnsignedInt(woff, value); - return this; - } - - @Override - public float readFloat() { - return buffer.readFloat(); - } - - @Override - public float getFloat(int roff) { - return buffer.getFloat(roff); - } - - @Override - public Buffer writeFloat(float value) { - buffer.writeFloat(value); - return this; - } - - @Override - public Buffer setFloat(int woff, float value) { - buffer.setFloat(woff, value); - return this; - } - - @Override - public long readLong() { - return buffer.readLong(); - } - - @Override - public long getLong(int roff) { - return buffer.getLong(roff); - } - - @Override - public Buffer writeLong(long value) { - buffer.writeLong(value); - return this; - } - - @Override - public Buffer setLong(int woff, long value) { - buffer.setLong(woff, value); - return this; - } - - @Override - public double readDouble() { - return buffer.readDouble(); - } - - @Override - public double getDouble(int roff) { - return buffer.getDouble(roff); - } - - @Override - public Buffer writeDouble(double value) { - buffer.writeDouble(value); - return this; - } - - @Override - public Buffer setDouble(int woff, double value) { - buffer.setDouble(woff, value); - return this; - } - - @Override - public Buffer acquire() { - buffer.acquire(); - return this; - } - - @Override - public Buffer get() { - buffer.get(); - return this; - } - - @Override - public boolean isInstanceOf(Class cls) { - return buffer.isInstanceOf(cls); - } - - @Override - public void close() { - buffer.close(); - } - - @SuppressWarnings("unchecked") - @Override - public Send send() { - Class aClass = (Class) (Class) getClass(); - Function receive = this::receive; - return buffer.send().map(aClass, receive); - } - - /** - * Called when a {@linkplain #send() sent} {@link BufferAdaptor} is received by the recipient. - * The {@link BufferAdaptor} should return a new concrete instance, that wraps the given {@link Buffer} object. - * - * @param buf The {@link Buffer} that is {@linkplain Send#receive() received} by the recipient, - * and needs to be wrapped in a new {@link BufferHolder} instance. - * @return A new buffer adaptor instance, containing the given {@linkplain Buffer buffer}. - */ - protected abstract BufferAdaptor receive(Buffer buf); - - @Override - public boolean isOwned() { - return buffer.isOwned(); - } - - @Override - public int countBorrows() { - return buffer.countBorrows(); - } - - @Override - public boolean isAccessible() { - return buffer.isAccessible(); - } -} diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java index d302af6..2353d76 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java @@ -93,10 +93,12 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter { if (collector.isOwned()) { collector.ensureWritable(input.readableBytes(), DEFAULT_CHUNK_SIZE, true); } else { - try (Buffer prev = collector) { - int requiredCapacity = input.capacity() + prev.readableBytes(); - collector = allocator.allocate(Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE), input.order()); - collector.writeBytes(prev); + int requiredCapacity = input.readableBytes() + collector.readableBytes(); + int allocationSize = Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE); + try (Buffer newBuffer = allocator.allocate(allocationSize, input.order())) { + newBuffer.writeBytes(collector); + collector.close(); + collector = newBuffer.acquire(); } } collector.writeBytes(input); diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java index d2ab93e..ac8175d 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/ByteToMessageDecoderTest.java @@ -17,18 +17,18 @@ package io.netty.buffer.api.examples.bytetomessagedecoder; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; -import io.netty.buffer.api.adaptor.BufferAdaptor; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.socket.ChannelInputShutdownEvent; import org.junit.Test; +import org.mockito.stubbing.Answer; -import java.nio.ByteOrder; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static io.netty.buffer.api.BufferAllocator.heap; import static io.netty.buffer.api.BufferTestSupport.assertEquals; @@ -37,9 +37,12 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; public class ByteToMessageDecoderTest { @@ -334,41 +337,9 @@ public class ByteToMessageDecoderTest { assertFalse(channel.finish()); } - static class WriteFailingByteBuf extends BufferAdaptor { - private final Error error = new Error(); - private int untilFailure; - - WriteFailingByteBuf(int untilFailure, int capacity) { - this(untilFailure, heap().allocate(capacity, BIG_ENDIAN)); - this.untilFailure = untilFailure; - } - - private WriteFailingByteBuf(int untilFailure, Buffer buffer) { - super(buffer); - this.untilFailure = untilFailure; - } - - @Override - public Buffer order(ByteOrder order) { - if (order == LITTLE_ENDIAN && --untilFailure <= 0) { - throw error; - } - return super.order(order); - } - - @Override - protected BufferAdaptor receive(Buffer buf) { - return new WriteFailingByteBuf(untilFailure, buf); - } - - Error writeError() { - return error; - } - } - @Test public void releaseWhenMergeCumulateThrows() { - WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); + Buffer oldCumulation = writeFailingCumulation(1, 64); oldCumulation.writeByte((byte) 0); Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12); @@ -379,12 +350,33 @@ public class ByteToMessageDecoderTest { thrown = t; } - assertSame(oldCumulation.writeError(), thrown); + assertThat(thrown).hasMessage("boom"); assertFalse(in.isAccessible()); assertTrue(oldCumulation.isOwned()); oldCumulation.close(); } + private static Buffer writeFailingCumulation(int untilFailure, int capacity) { + Buffer realBuffer = heap().allocate(capacity, BIG_ENDIAN); + Answer callRealBuffer = inv -> { + Object result = inv.getMethod().invoke(realBuffer, inv.getArguments()); + if (result == realBuffer) { + // Preserve mock wrapper for methods that returns the callee ('this') buffer instance. + return inv.getMock(); + } + return result; + }; + Buffer buffer = mock(Buffer.class, withSettings().defaultAnswer(callRealBuffer)); + AtomicInteger countDown = new AtomicInteger(untilFailure); + doAnswer(inv -> { + if (countDown.decrementAndGet() <= 0) { + throw new Error("boom"); + } + return callRealBuffer.answer(inv); + }).when(buffer).writeBytes(any(Buffer.class)); + return buffer; + } + @Test public void releaseWhenMergeCumulateThrowsInExpand() { releaseWhenMergeCumulateThrowsInExpand(1, true); @@ -394,7 +386,7 @@ public class ByteToMessageDecoderTest { private static void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { Buffer oldCumulation = heap().allocate(8, BIG_ENDIAN).writeByte((byte) 0); - final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); + Buffer newCumulation = writeFailingCumulation(untilFailure, 16); BufferAllocator allocator = new BufferAllocator() { @Override @@ -414,7 +406,7 @@ public class ByteToMessageDecoderTest { assertFalse(in.isAccessible()); if (shouldFail) { - assertSame(newCumulation.writeError(), thrown); + assertThat(thrown).hasMessage("boom"); assertTrue(oldCumulation.isOwned()); oldCumulation.close(); assertFalse(newCumulation.isAccessible()); From 60954394a507515f5ddcaf1fe57fe6a94b0326f1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 27 Apr 2021 10:03:24 +0200 Subject: [PATCH 09/12] Use Send.isSendOf to simplify code --- .../bytetomessagedecoder/AlternativeMessageDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java index 2353d76..a1ed218 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoder.java @@ -71,7 +71,7 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter { try (Buffer input = (Buffer) msg) { processRead(ctx, input); } - } else if (msg instanceof Send && ((Send) msg).isInstanceOf(Buffer.class)) { + } else if (Send.isSendOf(Buffer.class, msg)) { //noinspection unchecked try (Buffer input = ((Send) msg).receive()) { processRead(ctx, input); From 25b234acd21a7d5c828379d142c0499c27474459 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 27 Apr 2021 13:33:28 +0200 Subject: [PATCH 10/12] Make the BufferBulkAccessTest.writeBytesMustTransferDataAndUpdateOffsets test run faster Only run a sample of 10% of the possible combinations, and then run them in parallel. --- .../io/netty/buffer/api/BufferBulkAccessTest.java | 14 +++++++++++--- .../io/netty/buffer/api/BufferTestSupport.java | 10 ++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java b/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java index d1374c3..739b73e 100644 --- a/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java +++ b/src/test/java/io/netty/buffer/api/BufferBulkAccessTest.java @@ -19,10 +19,10 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import java.nio.ByteBuffer; +import java.util.Arrays; import static java.nio.ByteOrder.BIG_ENDIAN; import static java.nio.ByteOrder.LITTLE_ENDIAN; -import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; public class BufferBulkAccessTest extends BufferTestSupport { @@ -303,11 +303,16 @@ public class BufferBulkAccessTest extends BufferTestSupport { } } + private static final Memoize OTHER_FIXTURES = new Memoize( + () -> Arrays.stream(allocators()).filter(filterOfTheDay(10)).toArray(Fixture[]::new)); + @ParameterizedTest @MethodSource("allocators") public void writeBytesMustTransferDataAndUpdateOffsets(Fixture fixture) { try (BufferAllocator alloc1 = fixture.createAllocator()) { - for (Fixture otherFixture : allocators()) { + // Only test 10% of available combinations. Otherwise, this takes too long. + Fixture[] allocators = OTHER_FIXTURES.get(); + Arrays.stream(allocators).parallel().forEach(otherFixture -> { try (BufferAllocator alloc2 = otherFixture.createAllocator(); Buffer target = alloc1.allocate(37); Buffer source = alloc2.allocate(35)) { @@ -330,8 +335,11 @@ public class BufferBulkAccessTest extends BufferTestSupport { target.fill((byte) 0).reset().order(LITTLE_ENDIAN); source.fill((byte) 0).reset().order(BIG_ENDIAN); verifyWriteBytes(target, source); + } catch (Exception e) { + e.addSuppressed(new RuntimeException("other fixture was: " + otherFixture)); + throw e; } - } + }); } } diff --git a/src/test/java/io/netty/buffer/api/BufferTestSupport.java b/src/test/java/io/netty/buffer/api/BufferTestSupport.java index 7e56591..722e1b2 100644 --- a/src/test/java/io/netty/buffer/api/BufferTestSupport.java +++ b/src/test/java/io/netty/buffer/api/BufferTestSupport.java @@ -92,13 +92,15 @@ public abstract class BufferTestSupport { if ("nosample".equalsIgnoreCase(sampleSetting)) { return fixture -> true; } + // Filter out 95% of tests. + return filterOfTheDay(5); + } + + protected static Predicate filterOfTheDay(int percentage) { Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); // New seed every day. SplittableRandom rng = new SplittableRandom(today.hashCode()); AtomicInteger counter = new AtomicInteger(); - return fixture -> { - // Filter out 95% of tests. - return counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < 5; - }; + return fixture -> counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < percentage; } static Fixture[] allocators() { From 49deb77fd37d5970f80066896f991af7a24a58dd Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 27 Apr 2021 14:09:27 +0200 Subject: [PATCH 11/12] Allow running AlternativeMessageDecoderTest from Maven --- src/main/java/module-info.java | 2 +- .../bytetomessagedecoder/AlternativeMessageDecoderTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 99f995b..53f3187 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -18,7 +18,7 @@ module netty.incubator.buffer { requires io.netty.common; requires io.netty.buffer; - // Optional dependencies, needed for some of the examples. + // Optional dependencies, needed for some examples. requires static java.logging; exports io.netty.buffer.api; diff --git a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java index 573f75d..35f99bf 100644 --- a/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java +++ b/src/test/java/io/netty/buffer/api/examples/bytetomessagedecoder/AlternativeMessageDecoderTest.java @@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -class AlternativeMessageDecoderTest { +public class AlternativeMessageDecoderTest { @Test public void splitAndParseMessagesDownThePipeline() { EmbeddedChannel channel = new EmbeddedChannel(new AlternativeMessageDecoder() { From 0db6a745dc5b287d1fcf21bdf411fb248b220db5 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 27 Apr 2021 15:06:43 +0200 Subject: [PATCH 12/12] Always attach test results to PR builds Previously the test results were only attached to passing builds, which is, like, the opposite of useful. Also finally figured out how to nerf the local docker layer cache. Recorded this new knowledge as make commands. --- .github/workflows/ci-workflow.yml | 3 ++- Makefile | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 4aeca98..8ba7711 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -44,7 +44,8 @@ jobs: - name: Make build run: make build - name: Publish Test Report - uses: scacap/action-surefire-report@v1.0.7 + uses: scacap/action-surefire-report@v1.0.9 + if: ${{ always() }} with: github_token: ${{ secrets.GITHUB_TOKEN }} report_paths: '**/target/surefire-reports/TEST-*.xml' diff --git a/Makefile b/Makefile index d7694cb..4fa84ec 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,9 @@ clean: docker rm -fv build-container-dbg docker rm -fv build-container +clean-layer-cache: + docker builder prune -f -a + build: image docker create --name build-container netty-incubator-buffer:build mkdir -p target/container-output @@ -23,3 +26,5 @@ build: image docker wait build-container || (docker cp build-container:/home/build target/container-output && false) docker cp build-container:/home/build/target . docker rm build-container + +rebuild: clean clean-layer-cache build