Merge pull request #44 from netty/slice-bifurcate

More docs and examples of slice() and bifurcate()
This commit is contained in:
Chris Vest 2021-04-27 16:10:55 +02:00 committed by GitHub
commit ec0dbb6b5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 2005 additions and 95 deletions

View File

@ -44,7 +44,8 @@ jobs:
- name: Make build
run: make build
- name: Publish Test Report
uses: scacap/action-surefire-report@v1.0.7
uses: scacap/action-surefire-report@v1.0.9
if: ${{ always() }}
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
report_paths: '**/target/surefire-reports/TEST-*.xml'

View File

@ -16,6 +16,9 @@ clean:
docker rm -fv build-container-dbg
docker rm -fv build-container
clean-layer-cache:
docker builder prune -f -a
build: image
docker create --name build-container netty-incubator-buffer:build
mkdir -p target/container-output
@ -23,3 +26,5 @@ build: image
docker wait build-container || (docker cp build-container:/home/build target/container-output && false)
docker cp build-container:/home/build/target .
docker rm build-container
rebuild: clean clean-layer-cache build

View File

@ -399,6 +399,12 @@
<version>3.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-build-common</artifactId>

View File

@ -54,8 +54,6 @@ import java.nio.ByteOrder;
* such as with the {@link #getByte(int)} method,
* from multiple threads.
* <p>
* Confined buffers will initially be confined to the thread that allocates them.
* <p>
* If a buffer needs to be accessed by a different thread,
* then the ownership of that buffer must be sent to that thread.
* This can be done with the {@link #send()} method.
@ -101,6 +99,34 @@ import java.nio.ByteOrder;
* 0 <= readerOffset <= writerOffset <= capacity
* </pre>
*
* <h3 id="slice-bifurcate">Slice vs. Bifurcate</h3>
*
* The {@link #slice()} and {@link #bifurcate()} methods both return new buffers on the memory of the buffer they're
* called on.
* However, there are also important differences between the two, as they are aimed at different use cases that were
* previously (in the {@code ByteBuf} API) covered by {@code slice()} alone.
*
* <ul>
* <li>
* Slices create a new view onto the memory, that is shared between the slice and the buffer.
* As long as both the slice and the originating buffer are alive, neither will have ownership of the memory.
* Since the memory is shared, changes to the data made through one will be visible through the other.
* </li>
* <li>
* Bifurcation breaks the ownership of the memory in two.
* Both resulting buffers retain ownership of their respective region of memory.
* They can do this because the regions are guaranteed to not overlap; data changes through one buffer will not
* be visible through the other.
* </li>
* </ul>
*
* These differences mean that slicing is mostly suitable for when you temporarily want to share a focused area of a
* buffer.
* Examples of this include doing IO, or decoding a bounded part of a larger message.
* On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other,
* perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process.
* Examples include aggregating messages into an accumulator buffer, and sending messages down the pipeline for
* further processing, as bifurcated buffer regions, once their data has been received in its entirety.
*/
public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
@ -291,43 +317,6 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
*/
boolean readOnly();
/**
* Returns a slice of this buffer's readable bytes.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate offsets. This method is identical to
* {@code buf.slice(buf.readerOffset(), buf.readableBytes())}.
* This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
* <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the readable region of this buffer.
*/
default Buffer slice() {
return slice(readerOffset(), readableBytes());
}
/**
* Returns a slice of the given region of this buffer.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate offsets.
* This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
* <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the given region of this buffer.
*/
Buffer slice(int offset, int length);
/**
* Copies the given length of data from this buffer into the given destination array, beginning at the given source
* position in this buffer, and the given destination position in the destination array.
@ -388,6 +377,23 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
*/
void copyInto(int srcPos, Buffer dest, int destPos, int length);
/**
* Transfers all readable bytes from the given source buffer into this buffer.
* The {@linkplain #writerOffset() write offset} of this buffer, and the
* {@linkplain #readerOffset() reader offset} of the source buffer, are both advanced by the number
* of bytes transferred.
*
* @param source The buffer to read from.
* @return This buffer.
*/
default Buffer writeBytes(Buffer source) {
int srcOffset = source.readerOffset(); // Snapshot before any offsets move.
int length = source.readableBytes();
int destOffset = writerOffset();
// Reserve the space up front; copyInto is offset-based and does not move offsets itself.
writerOffset(destOffset + length);
source.copyInto(srcOffset, this, destOffset, length);
source.readerOffset(srcOffset + length);
return this;
}
/**
* Resets the {@linkplain #readerOffset() read offset} and the {@linkplain #writerOffset() write offset} on this
* buffer to their initial values.
@ -469,7 +475,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the
* {@link BufferAllocator} the buffer was created with.
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
* This method is the same as calling {@link #ensureWritable(int, int, boolean)} where {@code allowCompaction} is
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
@ -477,7 +483,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* or is {@linkplain #readOnly() read-only}.
*/
default void ensureWritable(int size) {
// Delegate with a minimum growth of 1 byte, and compaction allowed.
ensureWritable(size, 1, true);
}
/**
@ -509,13 +515,60 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* </ul>
*
* @param size The requested number of bytes of space that should be available for writing.
* @param minimumGrowth The minimum number of bytes to grow by. If it is determined that memory should be allocated
* and copied, make sure that the new memory allocation is bigger than the old one by at least
* this many bytes. This way, the buffer can grow by more than what is immediately necessary,
* thus amortising the costs of allocating and copying.
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* * or is {@linkplain #readOnly() read-only}.
* or is {@linkplain #readOnly() read-only}.
*/
void ensureWritable(int size, boolean allowCompaction);
void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
/**
* Returns a buffer that is a shared view of the readable region of this buffer.
* The view and this buffer share the underlying memory, so data changes through one are visible through the
* other, while each maintains its own offsets. This method is identical to
* {@code buf.slice(buf.readerOffset(), buf.readableBytes())}.
* The {@link #readerOffset()} and {@link #writerOffset()} of this buffer are left untouched.
* <p>
* The reference count of this buffer is incremented, and decremented again when the slice is deallocated.
* <p>
* The returned slice has its {@linkplain #writerOffset() write offset} set to its length, so the entire
* contents of the slice are immediately ready to be read.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the readable region of this buffer.
*/
default Buffer slice() {
int offset = readerOffset();
int length = readableBytes();
return slice(offset, length);
}
/**
* Returns a slice of the given region of this buffer.
* Modifying the content of the returned buffer or this buffer affects each other's content,
* while they maintain separate offsets.
* This method does not modify {@link #readerOffset()} or {@link #writerOffset()} of this buffer.
* <p>
* This method increments the reference count of this buffer.
* The reference count is decremented again when the slice is deallocated.
* <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
*
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the given region of this buffer.
*/
Buffer slice(int offset, int length);
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
@ -523,7 +576,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that contains the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()}
@ -558,10 +611,64 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* simply split its internal array in two.
* <p>
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
*
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/
default Buffer bifurcate() {
// Split at the write offset, so the returned buffer holds exactly the read and readable bytes.
return bifurcate(writerOffset());
}
/**
* Split the buffer into two, at the given {@code splitOffset}.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that precedes the {@code splitOffset} will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} and {@link #writerOffset()} of this buffer,
* but truncated to fit within the capacity dictated by the {@code splitOffset}.
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. If the
* {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset},
* then those offsets will be moved forward so they land on offset 0 after the bifurcation.
* <p>
* Effectively, the following transformation takes place:
* <pre>{@code
* This buffer:
* +--------------------------------+
* 0| |splitOffset |cap
* +---------------+----------------+
* / / \ \
* / / \ \
* / / \ \
* / / \ \
* / / \ \
* +---------------+ +---------------+
* | |cap | |cap
* +---------------+ +---------------+
* Returned buffer. This buffer.
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
* simply split its internal array in two.
* <p>
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
*
* @param splitOffset The offset at which to split this buffer; must be between 0 and {@link #capacity()},
* both inclusive.
* @return A new buffer with independent and exclusive ownership over the bytes of this buffer that precede the
* {@code splitOffset}.
* @throws IllegalArgumentException if the {@code splitOffset} is negative, or greater than the capacity.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state.
*/
Buffer bifurcate(int splitOffset);
/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.

View File

@ -606,13 +606,16 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public void ensureWritable(int size, boolean allowCompaction) {
public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (readOnly) {
throw bufferIsReadOnly();
}
@ -649,13 +652,20 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
// Now we have enough space.
return;
}
} else if (bufs.length == 1) {
// If we only have a single component buffer, then we can safely compact that in-place.
bufs[0].compact();
computeBufferOffsets();
if (writableBytes() >= size) {
// Now we have enough space.
return;
}
}
}
long newSize = capacity() + (long) size;
BufferAllocator.checkSize(newSize);
int growth = size - writableBytes();
Buffer extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
int growth = Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(capacity() + (long) growth);
Buffer extension = allocator.allocate(growth, order());
unsafeExtendWith(extension);
}
@ -739,7 +749,14 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer bifurcate() {
public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) {
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
}
@ -748,12 +765,12 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(woff);
int off = woff - offsets[i];
int i = searchOffsets(splitOffset);
int off = splitOffset - offsets[i];
Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i);
bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length);
if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) {
bifs[bifs.length - 1] = bufs[0].bifurcate();
bifs[bifs.length - 1] = bufs[0].bifurcate(off);
}
computeBufferOffsets();
try {

View File

@ -101,7 +101,7 @@ public interface Send<T extends Rc<T>> extends Deref<T> {
* @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, ? extends R> mapper) {
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, R> mapper) {
return sending(type, () -> mapper.apply(receive()));
}

View File

@ -364,7 +364,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
}
@Override
public void ensureWritable(int size, boolean allowCompaction) {
public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) {
throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -372,6 +372,9 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (rmem != wmem) {
throw bufferIsReadOnly();
}
@ -387,7 +390,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
}
// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize);
buffer.order(order());
@ -414,26 +417,33 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
}
@Override
public Buffer bifurcate() {
public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
}
var drop = (ArcDrop<NioBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop));
var bifurcatedBuffer = rmem.slice(0, woff);
var bifurcatedBuffer = rmem.slice(0, splitOffset);
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment()));
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order());
boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly);
rmem = rmem.slice(woff, rmem.capacity() - woff);
rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset);
if (!readOnly) {
wmem = rmem;
}
woff = 0;
roff = 0;
woff = Math.max(woff, splitOffset) - splitOffset;
roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf;
}

View File

@ -490,7 +490,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
@Override
public void ensureWritable(int size, boolean allowCompaction) {
public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) {
throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -498,6 +498,9 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (seg != wseg) {
throw bufferIsReadOnly();
}
@ -513,7 +516,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize);
@ -545,25 +548,32 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
@Override
public Buffer bifurcate() {
public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
}
var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop));
var bifurcatedSeg = seg.asSlice(0, woff);
var bifurcatedSeg = seg.asSlice(0, splitOffset);
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc);
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order);
boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly);
seg = seg.asSlice(woff, seg.byteSize() - woff);
seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset);
if (!readOnly) {
wseg = seg;
}
woff = 0;
roff = 0;
woff = Math.max(woff, splitOffset) - splitOffset;
roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf;
}

View File

@ -398,7 +398,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
}
@Override
public void ensureWritable(int size, boolean allowCompaction) {
public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) {
throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -406,6 +406,9 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (rsize != wsize) {
throw bufferIsReadOnly();
}
@ -421,7 +424,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
}
// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize);
@ -455,27 +458,34 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
}
@Override
public Buffer bifurcate() {
public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
}
var drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop));
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, woff, control, new ArcDrop<>(drop.increment()));
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment()));
bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order());
boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly);
rsize -= woff;
baseOffset += woff;
address += woff;
rsize -= splitOffset;
baseOffset += splitOffset;
address += splitOffset;
if (!readOnly) {
wsize = rsize;
}
woff = 0;
roff = 0;
woff = Math.max(woff, splitOffset) - splitOffset;
roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf;
}

View File

@ -18,7 +18,7 @@ module netty.incubator.buffer {
requires io.netty.common;
requires io.netty.buffer;
// Optional dependencies, needed for some of the examples.
// Optional dependencies, needed for some examples.
requires static java.logging;
exports io.netty.buffer.api;

View File

@ -19,6 +19,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
@ -301,4 +302,58 @@ public class BufferBulkAccessTest extends BufferTestSupport {
assertThat(buf.nativeAddress()).isNotZero();
}
}
private static final Memoize<Fixture[]> OTHER_FIXTURES = new Memoize<Fixture[]>(
() -> Arrays.stream(allocators()).filter(filterOfTheDay(10)).toArray(Fixture[]::new));
@ParameterizedTest
@MethodSource("allocators")
public void writeBytesMustTransferDataAndUpdateOffsets(Fixture fixture) {
try (BufferAllocator alloc1 = fixture.createAllocator()) {
// Only test 10% of available combinations. Otherwise, this takes too long.
Fixture[] allocators = OTHER_FIXTURES.get();
Arrays.stream(allocators).parallel().forEach(otherFixture -> {
try (BufferAllocator alloc2 = otherFixture.createAllocator();
Buffer target = alloc1.allocate(37);
Buffer source = alloc2.allocate(35)) {
// BE to BE
target.order(BIG_ENDIAN);
source.order(BIG_ENDIAN);
verifyWriteBytes(target, source);
// LE to BE
target.fill((byte) 0).reset().order(BIG_ENDIAN);
source.fill((byte) 0).reset().order(LITTLE_ENDIAN);
verifyWriteBytes(target, source);
// BE to LE
target.fill((byte) 0).reset().order(LITTLE_ENDIAN);
source.fill((byte) 0).reset().order(BIG_ENDIAN);
verifyWriteBytes(target, source);
// LE to LE
target.fill((byte) 0).reset().order(LITTLE_ENDIAN);
source.fill((byte) 0).reset().order(LITTLE_ENDIAN); // Was BIG_ENDIAN, which duplicated the BE-to-LE case.
verifyWriteBytes(target, source);
} catch (Exception e) {
e.addSuppressed(new RuntimeException("other fixture was: " + otherFixture));
throw e;
}
});
}
}
/**
* Fills {@code source} with the byte sequence 1..35, transfers it into {@code target} via
* {@link Buffer#writeBytes(Buffer)}, then verifies the resulting offsets and contents.
*/
private static void verifyWriteBytes(Buffer target, Buffer source) {
final int byteCount = 35;
for (int value = 1; value <= byteCount; value++) {
source.writeByte((byte) value);
}
target.writeBytes(source);
assertThat(target.readerOffset()).isZero();
assertThat(target.writerOffset()).isEqualTo(byteCount);
assertThat(source.readerOffset()).isEqualTo(byteCount);
assertThat(source.writerOffset()).isEqualTo(byteCount);
try (Buffer readableSlice = target.slice()) {
assertEquals(source, readableSlice);
}
}
}

View File

@ -53,8 +53,8 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
@MethodSource("allocators")
public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(512)) {
assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 8));
Buffer buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 7));
}
}
@ -130,15 +130,31 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
while (buf.readableBytes() > 0) {
buf.readByte();
}
buf.ensureWritable(4, true);
buf.ensureWritable(4, 4, true);
buf.writeInt(42);
assertThat(buf.capacity()).isEqualTo(64);
buf.writerOffset(60).readerOffset(60);
buf.ensureWritable(8, true);
buf.ensureWritable(8, 8, true);
buf.writeLong(42);
// Don't assert the capacity on this one, because single-component
// composite buffers may choose to allocate rather than compact.
}
}
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableWithLargeMinimumGrowthMustGrowByAtLeastThatMuch(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16)) {
// Fill the buffer, then read everything back, so compaction can reclaim the full capacity.
buf.writeLong(0);
buf.writeInt(0);
buf.readLong();
buf.readInt();
// Compaction satisfies the request, so no allocation (and thus no minimum growth) takes place.
buf.ensureWritable(8, 32, true);
assertThat(buf.capacity()).isEqualTo(16);
buf.writeByte((byte) 1);
// Compaction alone cannot yield 16 writable bytes now, so an allocation of at least the
// minimum growth is forced.
buf.ensureWritable(16, 32, true);
assertThat(buf.capacity()).isEqualTo(16 /* existing capacity */ + 32 /* minimum growth */);
}
}
}

View File

@ -401,6 +401,26 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateWithNegativeOffsetMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.bifurcate(0).close(); // Offset zero is the lower bound, and must be allowed.
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(-1));
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateWithOversizedOffsetMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(9)); // One past capacity is out of bounds.
buf.bifurcate(8).close(); // The capacity itself is the upper bound, and must be allowed.
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
@ -414,6 +434,53 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
// The extra acquire makes the buffer shared, hence no longer "owned".
try (Buffer acquired = buf.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate(4));
assertThat(exc).hasMessageContaining("owned");
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetMustTruncateGreaterOffsets(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeInt(0x01020304);
buf.writeByte((byte) 0x05);
buf.readInt();
// Offsets are now roff = 4 and woff = 5; both beyond the split point at 2.
try (Buffer bif = buf.bifurcate(2)) {
// The remainder keeps its offsets, shifted back by the split offset (4 - 2 and 5 - 2).
assertThat(buf.readerOffset()).isEqualTo(2);
assertThat(buf.writerOffset()).isEqualTo(3);
// The bifurcated part has its offsets truncated down to its capacity of 2.
assertThat(bif.readerOffset()).isEqualTo(2);
assertThat(bif.writerOffset()).isEqualTo(2);
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetMustExtendLesserOffsets(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeInt(0x01020304);
buf.readInt();
// Offsets are now roff = woff = 4; both before the split point at 6.
try (Buffer bif = buf.bifurcate(6)) {
// The remainder's offsets are moved forward so they land on zero.
assertThat(buf.readerOffset()).isEqualTo(0);
assertThat(buf.writerOffset()).isEqualTo(0);
// The bifurcated part adopts the original offsets unchanged.
assertThat(bif.readerOffset()).isEqualTo(4);
assertThat(bif.writerOffset()).isEqualTo(4);
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
@ -507,6 +574,27 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void mustBePossibleToBifurcateOwnedSlices(Fixture fixture) {
    try (BufferAllocator allocator = fixture.createAllocator()) {
        // Deliberately NOT in try-with-resources: the parent is closed manually
        // below so that the slice becomes the sole owner of the memory.
        Buffer buf = allocator.allocate(16).order(BIG_ENDIAN);
        buf.writeLong(0x0102030405060708L);
        try (Buffer slice = buf.slice()) {
            buf.close(); // Drop the parent; 'slice' is now the only reference.
            assertTrue(slice.isOwned());
            try (Buffer bifurcate = slice.bifurcate(4)) {
                // Reset offsets and grow both halves back to 8 bytes; per the
                // asserted values, the newly acquired tail region reads as zero.
                bifurcate.reset().ensureWritable(Long.BYTES);
                slice.reset().ensureWritable(Long.BYTES);
                assertThat(bifurcate.capacity()).isEqualTo(Long.BYTES);
                assertThat(slice.capacity()).isEqualTo(Long.BYTES);
                assertThat(bifurcate.getLong(0)).isEqualTo(0x01020304_00000000L);
                assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L);
            }
        }
    }
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {

View File

@ -92,13 +92,15 @@ public abstract class BufferTestSupport {
if ("nosample".equalsIgnoreCase(sampleSetting)) {
return fixture -> true;
}
Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS);
// Filter out 95% of tests.
return filterOfTheDay(5);
}
protected static Predicate<Fixture> filterOfTheDay(int percentage) {
Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); // New seed every day.
SplittableRandom rng = new SplittableRandom(today.hashCode());
AtomicInteger counter = new AtomicInteger();
return fixture -> {
boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2;
return res;
}; // Filter out 97% of tests.
return fixture -> counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < percentage;
}
static Fixture[] allocators() {
@ -977,6 +979,10 @@ public abstract class BufferTestSupport {
return bs;
}
/** Asserts that two buffers hold identical readable contents, compared as byte arrays. */
public static void assertEquals(Buffer expected, Buffer actual) {
    assertThat(toByteArray(actual)).containsExactly(toByteArray(expected));
}
public static void assertEquals(byte expected, byte actual) {
if (expected != actual) {
fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual));

View File

@ -0,0 +1,115 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Objects;
/**
 * Example of an alternative decoder design: inbound {@link Buffer}s are accumulated into an
 * internal "collector" buffer, and {@link #decodeAndFireRead(ChannelHandlerContext, Buffer)}
 * is invoked repeatedly until no further message can be decoded from it.
 */
public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter {
    /** Default size (8 KiB) used when allocating or growing the collector buffer. */
    public static final int DEFAULT_CHUNK_SIZE = 1 << 13; // 8 KiB
    // Accumulates inbound bytes until complete messages can be decoded.
    private Buffer collector;
    // Allocator used when the collector must be replaced or grown.
    private BufferAllocator allocator;

    protected AlternativeMessageDecoder() {
        // NOTE(review): calls overridable methods from the constructor; a subclass
        // override will run before the subclass' own fields are initialised — confirm
        // this is intended for an example class.
        allocator = initAllocator();
        collector = initCollector(allocator, DEFAULT_CHUNK_SIZE);
    }

    /** Hook for choosing the allocator; defaults to on-heap allocation. */
    protected BufferAllocator initAllocator() {
        return BufferAllocator.heap();
    }

    /** Hook for creating the initial collector buffer. */
    protected Buffer initCollector(BufferAllocator allocator, int defaultChunkSize) {
        return allocator.allocate(defaultChunkSize);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // Flush out anything still decodable, then release the collector.
        drainCollector(ctx);
        collector.close();
        super.handlerRemoved(ctx);
    }

    // Keep decoding until the subclass reports that no message could be produced.
    private void drainCollector(ChannelHandlerContext ctx) {
        boolean madeProgress;
        do {
            madeProgress = decodeAndFireRead(ctx, collector);
        } while (madeProgress);
    }

    /**
     * Attempts to decode one message from {@code input} and fire it down the pipeline.
     *
     * @param ctx the current handler context.
     * @param input the accumulated inbound bytes.
     * @return {@code true} if progress was made (another attempt will follow),
     *         {@code false} if more bytes are needed.
     */
    protected abstract boolean decodeAndFireRead(ChannelHandlerContext ctx, Buffer input);

    public BufferAllocator getAllocator() {
        return allocator;
    }

    public void setAllocator(BufferAllocator allocator) {
        this.allocator = Objects.requireNonNull(allocator, "BufferAllocator cannot be null.");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Accept both plain buffers and Send-wrapped (ownership-transferred) buffers;
        // anything else is passed on untouched.
        if (msg instanceof Buffer) {
            try (Buffer input = (Buffer) msg) {
                processRead(ctx, input);
            }
        } else if (Send.isSendOf(Buffer.class, msg)) {
            //noinspection unchecked
            try (Buffer input = ((Send<Buffer>) msg).receive()) {
                processRead(ctx, input);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }

    private void processRead(ChannelHandlerContext ctx, Buffer input) {
        // Fast path: when the collector is an owned composite, the input is owned,
        // the offsets line up, and byte orders match, append the input as a new
        // component without copying any bytes.
        if (collector.isOwned() && Buffer.isComposite(collector) && input.isOwned()
            && (collector.writableBytes() == 0 || input.writerOffset() == 0)
            && (collector.readableBytes() == 0 || input.readerOffset() == 0)
            && collector.order() == input.order()) {
            Buffer.extendComposite(collector, input);
            drainCollector(ctx);
            return;
        }
        if (collector.isOwned()) {
            // Sole owner: grow in place (third argument permits compaction).
            collector.ensureWritable(input.readableBytes(), DEFAULT_CHUNK_SIZE, true);
        } else {
            // Collector is shared: replace it with a fresh buffer big enough for
            // the old readable bytes plus the new input.
            int requiredCapacity = input.readableBytes() + collector.readableBytes();
            int allocationSize = Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE);
            try (Buffer newBuffer = allocator.allocate(allocationSize, input.order())) {
                newBuffer.writeBytes(collector);
                collector.close();
                collector = newBuffer.acquire();
            }
        }
        collector.writeBytes(input);
        drainCollector(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Request more data when auto-read is enabled, then propagate the event.
        if (ctx.channel().config().isAutoRead()) {
            ctx.read();
        }
        ctx.fireChannelReadComplete();
    }
}

View File

@ -0,0 +1,99 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SplittableRandom;
import static io.netty.buffer.api.BufferTestSupport.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Streams 1000 length-prefixed messages through an {@link EmbeddedChannel} in randomly
 * sized chunks and verifies that {@link AlternativeMessageDecoder} reassembles every
 * message intact and in order.
 */
public class AlternativeMessageDecoderTest {
    @Test
    public void splitAndParseMessagesDownThePipeline() {
        // Decoder under test: frames have the form [int length][length payload bytes].
        EmbeddedChannel channel = new EmbeddedChannel(new AlternativeMessageDecoder() {
            @Override
            protected boolean decodeAndFireRead(ChannelHandlerContext ctx, Buffer input) {
                // Can we read our length header?
                if (input.readableBytes() < 4) {
                    return false;
                }
                int start = input.readerOffset();
                int length = input.readInt();
                // Can we read the rest of the message?
                if (input.readableBytes() < length) {
                    input.readerOffset(start); // Rewind past the header; wait for more bytes.
                    return false;
                }
                // We can read our message in full.
                Buffer messageBuffer = input.bifurcate(input.readerOffset() + length);
                ctx.fireChannelRead(messageBuffer);
                return true;
            }
        });
        // Build one big buffer containing all the length-prefixed messages.
        List<byte[]> messages = new ArrayList<>();
        Buffer messagesBuffer = BufferAllocator.heap().allocate(132 * 1024);
        SplittableRandom rng = new SplittableRandom(42); // Fixed seed keeps the test deterministic.
        for (int i = 0; i < 1000; i++) {
            byte[] message = new byte[rng.nextInt(4, 256)];
            rng.nextBytes(message);
            // Stamp the message index into the first four bytes so ordering bugs are visible.
            message[0] = (byte) (i >> 24);
            message[1] = (byte) (i >> 16);
            message[2] = (byte) (i >> 8);
            message[3] = (byte) i;
            messages.add(message);
            messagesBuffer.ensureWritable(4 + message.length, 1024, false);
            messagesBuffer.writeInt(message.length);
            for (byte b : message) {
                messagesBuffer.writeByte(b);
            }
        }
        // Feed the channel chunks of random size, so frames are split at arbitrary points.
        while (messagesBuffer.readableBytes() > 0) {
            int length = rng.nextInt(1, Math.min(500, messagesBuffer.readableBytes() + 1));
            if (length == messagesBuffer.readableBytes()) {
                channel.writeInbound(messagesBuffer); // Final chunk: hand over the whole buffer.
            } else {
                channel.writeInbound(messagesBuffer.bifurcate(length));
            }
        }
        // Drain the pipeline and compare every decoded message against the original bytes.
        Iterator<byte[]> expectedItr = messages.iterator();
        Buffer actualMessage;
        while ((actualMessage = channel.readInbound()) != null) {
            try (Buffer ignore = actualMessage) {
                assertTrue(expectedItr.hasNext());
                try (Buffer actual = actualMessage.slice()) {
                    assertThat(toByteArray(actual)).containsExactly(expectedItr.next());
                }
            }
        }
        assertFalse(expectedItr.hasNext());
    }
}

View File

@ -0,0 +1,748 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.util.Objects.requireNonNull;
/**
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to an
* other Message type.
*
* For example here is an implementation which reads all readable bytes from
* the input {@link Buffer}, creates a new {@link Buffer} and forward it to the
* next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* <pre>
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
* {@code @Override}
* public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
* throws {@link Exception} {
* ctx.fireChannelRead(in.bifurcate());
* }
* }
* </pre>
*
* <h3>Frame detection</h3>
* <p>
* Generally frame detection should be handled earlier in the pipeline by adding a
* {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
* or {@link LineBasedFrameDecoder}.
* <p>
* If a custom frame decoder is required, then one needs to be careful when implementing
* one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
* complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes
* for a complete frame, return without modifying the reader index to allow more bytes to arrive.
* <p>
 * To check for complete frames without modifying the reader offset, use methods like
 * {@link Buffer#getInt(int)}.
 * One <strong>MUST</strong> take the current reader offset into account when using methods like
 * {@link Buffer#getInt(int)}.
 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
 * is not always the case. Use <tt>in.getInt(in.readerOffset())</tt> instead.
 * <h3>Pitfalls</h3>
 * <p>
 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
 * be annotated with {@code @Sharable}.
 * <p>
 * Some methods such as {@link Buffer#bifurcate(int)} will cause a memory leak if the returned buffer
 * is not released or fired through the {@link ChannelPipeline} via
 * {@link ChannelHandlerContext#fireChannelRead(Object)}.
*/
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
/**
 * Cumulates {@link Buffer}s by merging them into one {@link Buffer}, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
    if (cumulation.readableBytes() == 0 && !Buffer.isComposite(cumulation)) {
        // If cumulation is empty and input buffer is contiguous, use it directly.
        // NOTE(review): this guard checks compositeness of 'cumulation', while the
        // upstream ByteBuf version checks contiguity of the *input* — confirm intent.
        cumulation.close();
        return in;
    }
    // We must release 'in' in all cases as otherwise it may produce a leak if writeBytes(...) throw
    // for whatever release (for example because of OutOfMemoryError)
    try (in) {
        final int required = in.readableBytes();
        if (required > cumulation.writableBytes() || !cumulation.isOwned() || cumulation.readOnly()) {
            // Expand cumulation (by replacing it) under the following conditions:
            // - cumulation cannot be resized to accommodate the additional data
            // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
            //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
            return expandCumulation(alloc, cumulation, in);
        }
        cumulation.writeBytes(in);
        return cumulation;
    }
};
/**
 * Cumulates {@link Buffer}s by adding them to a composite buffer, avoiding memory copies
 * whenever possible.
 * Be aware that composite buffers use a more complex indexing implementation, so depending on your
 * use-case and the decoder implementation this may be slower than just using the {@link #MERGE_CUMULATOR}.
 */
public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> {
    if (cumulation.readableBytes() == 0) {
        // Nothing accumulated yet: adopt the input buffer as-is.
        cumulation.close();
        return in;
    }
    Buffer composite;
    try (in) {
        if (Buffer.isComposite(cumulation) && cumulation.isOwned()) {
            composite = cumulation;
            if (composite.writerOffset() != composite.capacity()) {
                // Writer index must equal capacity if we are going to "write"
                // new components to the end.
                // NOTE(review): the parent is closed right after slicing; this relies on
                // the slice keeping the underlying memory alive — confirm against the
                // Buffer.slice() ownership contract.
                composite = cumulation.slice(0, composite.writerOffset());
                cumulation.close();
            }
        } else {
            composite = Buffer.compose(alloc, cumulation);
        }
        Buffer.extendComposite(composite, in);
        return composite;
    }
};
Buffer cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
private boolean first;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;
private int discardAfterReads = 16;
private int numReads;
private ByteToMessageDecoderContext context;
protected ByteToMessageDecoder() {
ensureNotSharable();
}
/**
* If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
*
* Default is {@code false} as this has performance impacts.
*/
public void setSingleDecode(boolean singleDecode) {
this.singleDecode = singleDecode;
}
/**
* If {@code true} then only one message is decoded on each
* {@link #channelRead(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*/
public boolean isSingleDecode() {
return singleDecode;
}
/**
 * Sets the {@link Cumulator} used to cumulate the received {@link Buffer}s.
 *
 * @param cumulator the cumulation strategy; must not be {@code null}.
 */
public void setCumulator(Cumulator cumulator) {
    // requireNonNull returns its argument, so validation and assignment fold into one step.
    this.cumulator = requireNonNull(cumulator, "cumulator");
}
/**
* Set the number of reads after which {@link Buffer#compact()} is called to free up memory.
* The default is {@code 16}.
*/
public void setDiscardAfterReads(int discardAfterReads) {
checkPositive(discardAfterReads, "discardAfterReads");
this.discardAfterReads = discardAfterReads;
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you must use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
 * Returns the internal cumulative buffer of this decoder, or a fresh empty buffer
 * when nothing has been cumulated yet. You usually do not need to access the
 * internal buffer directly to write a decoder; use it only when you must, at your
 * own risk.
 */
protected Buffer internalBuffer() {
    return cumulation != null ? cumulation : newEmptyBuffer();
}
private static Buffer newEmptyBuffer() {
return Buffer.compose(BufferAllocator.heap());
}
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
context = new ByteToMessageDecoderContext(ctx);
handlerAdded0(context);
}
protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
}
@Override
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Buffer buf = cumulation;
if (buf != null) {
// Directly set this to null so we are sure we not access it in any other method here anymore.
cumulation = null;
numReads = 0;
int readable = buf.readableBytes();
if (readable > 0) {
ctx.fireChannelRead(buf);
ctx.fireChannelReadComplete();
} else {
buf.close();
}
}
handlerRemoved0(context);
}
/**
* Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
* events anymore.
*/
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Buffer) {
        try {
            Buffer data = (Buffer) msg;
            first = cumulation == null;
            if (first) {
                // First read since the cumulation was drained: adopt the buffer directly.
                cumulation = data;
            } else {
                // ByteBufAllocator alloc = ctx.alloc(); // TODO this API integration needs more work
                BufferAllocator alloc = BufferAllocator.heap();
                cumulation = cumulator.cumulate(alloc, cumulation, data);
            }
            assert context.ctx == ctx || ctx == context;
            callDecode(context, cumulation); // TODO we'll want to bifurcate here, and simplify lifetime handling
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && cumulation.readableBytes() == 0) {
                // Fully consumed: release the cumulation so the next read starts fresh.
                numReads = 0;
                cumulation.close();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes(); // TODO no need to do this dance because ensureWritable can compact for us
            }
            // Remember whether any message was produced, for channelReadComplete().
            firedChannelRead |= context.fireChannelReadCallCount() > 0;
            context.reset();
        }
    } else {
        // Not a Buffer: pass through untouched.
        ctx.fireChannelRead(msg);
    }
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
ctx.read();
}
firedChannelRead = false;
ctx.fireChannelReadComplete();
}
/**
 * Compacts the cumulation buffer to reclaim already-read space, but only when this
 * handler is the sole owner of it (compacting shared memory could corrupt other views).
 */
protected final void discardSomeReadBytes() {
    if (cumulation != null && !first && cumulation.isOwned()) {
        // discard some bytes if possible to make more room in the
        // buffer but only if the refCnt == 1 as otherwise the user may have
        // used slice().retain() or duplicate().retain().
        //
        // See:
        // - https://github.com/netty/netty/issues/2327
        // - https://github.com/netty/netty/issues/1764
        cumulation.compact();
    }
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assert context.ctx == ctx || ctx == context;
channelInputClosed(context, true);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
if (evt instanceof ChannelInputShutdownEvent) {
// The decodeLast method is invoked when a channelInactive event is encountered.
// This method is responsible for ending requests in some situations and must be called
// when the input has been shutdown.
assert context.ctx == ctx || ctx == context;
channelInputClosed(context, false);
}
}
private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
try {
channelInputClosed(ctx);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null) {
cumulation.close();
cumulation = null;
}
if (ctx.fireChannelReadCallCount() > 0) {
ctx.reset();
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
if (callChannelInactive) {
ctx.fireChannelInactive();
}
}
}
/**
* Called when the input of the channel was closed which may be because it changed to inactive or because of
* {@link ChannelInputShutdownEvent}.
*/
void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
if (cumulation != null) {
callDecode(ctx, cumulation);
// If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
// be unexpected.
if (!ctx.isRemoved()) {
// Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
// See https://github.com/netty/netty/issues/10802.
Buffer buffer = cumulation == null ? newEmptyBuffer() : cumulation;
decodeLast(ctx, buffer);
}
} else {
decodeLast(ctx, newEmptyBuffer());
}
}
/**
 * Called once data should be decoded from the given {@link Buffer}. This method will call
 * {@link #decode(ChannelHandlerContext, Buffer)} as long as decoding should take place.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in the {@link Buffer} from which to read data
 */
void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
    try {
        while (in.readableBytes() > 0 && !ctx.isRemoved()) {
            int oldInputLength = in.readableBytes();
            int numReadCalled = ctx.fireChannelReadCallCount();
            decodeRemovalReentryProtection(ctx, in);
            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
            if (numReadCalled == ctx.fireChannelReadCallCount()) {
                // No message was fired. If no bytes were consumed either we are
                // waiting for more input; otherwise keep decoding.
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            if (oldInputLength == in.readableBytes()) {
                // A message was fired without consuming any bytes: decoder bug.
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
/**
 * Decodes from one {@link Buffer} to another. This method will be called until either the input
 * {@link Buffer} has nothing to read when returning from this method, or until nothing was read
 * from the input {@link Buffer}.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in the {@link Buffer} from which to read data
 * @throws Exception is thrown if an error occurs
 */
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
/**
* Decode the from one {@link Buffer} to an other. This method will be called till either the input
* {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
* {@link Buffer}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link Buffer} from which to read data
* @throws Exception is thrown if an error occurs
*/
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
throws Exception {
decode(ctx, in);
}
/**
 * Is called one last time when the {@link ChannelHandlerContext} goes in-active, which means
 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
 *
 * By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} but sub-classes may
 * override this for some special cleanup operation.
 */
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
    if (in.readableBytes() > 0) {
        // Only call decode() if there is something left in the buffer to decode.
        // See https://github.com/netty/netty/issues/4386
        decodeRemovalReentryProtection(ctx, in);
    }
}
/**
 * Replaces the cumulation with a larger buffer holding the old readable bytes followed by
 * the input's readable bytes. Exactly one of the two source buffers is closed on exit:
 * the new buffer if copying fails, the old cumulation once the copy has succeeded.
 */
private static Buffer expandCumulation(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
    // Round the required capacity up to the next power of two.
    final int capacity =
            MathUtil.safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
    final Buffer expanded = alloc.allocate(capacity, oldCumulation.order());
    // 'doomed' tracks which buffer the finally-block must close.
    Buffer doomed = expanded;
    try {
        expanded.writeBytes(oldCumulation);
        expanded.writeBytes(in);
        doomed = oldCumulation; // Copy succeeded: retire the old cumulation instead.
        return expanded;
    } finally {
        doomed.close();
    }
}
/**
* Cumulate {@link Buffer}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so
* call {@link Buffer#close()} if a {@link Buffer} is fully consumed.
*/
Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
}
// Package private so we can also make use of it in ReplayingDecoder.
static final class ByteToMessageDecoderContext implements ChannelHandlerContext {
private final ChannelHandlerContext ctx;
private int fireChannelReadCalled;
private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
void reset() {
fireChannelReadCalled = 0;
}
int fireChannelReadCallCount() {
return fireChannelReadCalled;
}
@Override
public Channel channel() {
return ctx.channel();
}
@Override
public EventExecutor executor() {
return ctx.executor();
}
@Override
public String name() {
return ctx.name();
}
@Override
public ChannelHandler handler() {
return ctx.handler();
}
@Override
public boolean isRemoved() {
return ctx.isRemoved();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
ctx.fireChannelRegistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
ctx.fireChannelUnregistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
ctx.fireChannelActive();
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
ctx.fireChannelInactive();
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
ctx.fireExceptionCaught(cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object evt) {
ctx.fireUserEventTriggered(evt);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
fireChannelReadCalled ++;
ctx.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
ctx.fireChannelReadComplete();
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
ctx.fireChannelWritabilityChanged();
return this;
}
@Override
public ChannelHandlerContext read() {
ctx.read();
return this;
}
@Override
public ChannelHandlerContext flush() {
ctx.flush();
return this;
}
@Override
public ChannelPipeline pipeline() {
return ctx.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return ctx.alloc();
}
@Override
@Deprecated
public <T> Attribute<T> attr(AttributeKey<T> key) {
return ctx.attr(key);
}
@Override
@Deprecated
public <T> boolean hasAttr(AttributeKey<T> key) {
return ctx.hasAttr(key);
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return ctx.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return ctx.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return ctx.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return ctx.disconnect();
}
@Override
public ChannelFuture close() {
return ctx.close();
}
@Override
public ChannelFuture deregister() {
return ctx.deregister();
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return ctx.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return ctx.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return ctx.close(promise);
}
@Override
public ChannelFuture register() {
return ctx.register();
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return ctx.register(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return ctx.deregister(promise);
}
@Override
public ChannelFuture write(Object msg) {
return ctx.write(msg);
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return ctx.write(msg, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return ctx.writeAndFlush(msg, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return ctx.writeAndFlush(msg);
}
@Override
public ChannelPromise newPromise() {
return ctx.newPromise();
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return ctx.newProgressivePromise();
}
@Override
public ChannelFuture newSucceededFuture() {
return ctx.newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return ctx.newFailedFuture(cause);
}
@Override
public ChannelPromise voidPromise() {
return ctx.voidPromise();
}
}
}

View File

@ -0,0 +1,553 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.buffer.api.BufferAllocator.heap;
import static io.netty.buffer.api.BufferTestSupport.assertEquals;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;
/**
 * Tests for the {@link Buffer}-based {@link ByteToMessageDecoder}.
 * <p>
 * The tests exercise decoder removal mid-decode, cumulation-buffer lifecycle
 * (release on handler removal, release when a cumulator throws), read
 * suppression, and last-decode semantics on channel shutdown. Buffer ownership
 * and accessibility assertions ({@code isOwned()}, {@code isAccessible()})
 * verify the reference-counting/ownership model of the incubator Buffer API.
 */
public class ByteToMessageDecoderTest {

    // A decoder that removes itself from the pipeline during decode() must not
    // be invoked again afterwards.
    @Test
    public void testRemoveItself() {
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            private boolean removed;

            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                assertFalse(removed);
                in.readByte();
                ctx.pipeline().remove(this);
                removed = true;
            }
        });

        try (Buffer buf = heap().allocate(4).writeInt(0x01020304)) {
            // slice() transfers a view of buf into the channel; buf itself stays with the test.
            channel.writeInbound(buf.slice());
            try (Buffer b = channel.readInbound()) {
                // The decoder consumed one byte; advance buf by one so contents line up.
                buf.readByte();
                assertEquals(b, buf);
            }
        }
    }

    // Removing the decoder while an external buffer is written to must not
    // re-trigger decode; the remaining bytes are passed through unchanged.
    @Test
    public void testRemoveItselfWriteBuffer() {
        final Buffer buf = heap().allocate(5, BIG_ENDIAN).writeInt(0x01020304);
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            private boolean removed;

            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                assertFalse(removed);
                in.readByte();
                ctx.pipeline().remove(this);

                // This should not let it keep call decode
                buf.writeByte((byte) 0x05);
                removed = true;
            }
        });

        channel.writeInbound(buf.slice());
        try (Buffer expected = heap().allocate(3, BIG_ENDIAN).writeShort((short) 0x0203).writeByte((byte) 0x04);
             Buffer b = channel.readInbound();
             Buffer actual = b.slice(); // Only compare readable bytes.
             buf) {
            assertEquals(expected, actual);
        }
    }

    /**
     * Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
     * this case input is read fully.
     */
    @Test
    public void testInternalBufferClearReadAll() {
        Buffer buf = heap().allocate(1).writeByte((byte) 'a');
        EmbeddedChannel channel = newInternalBufferTestChannel();
        assertFalse(channel.writeInbound(buf));
        assertFalse(channel.finish());
    }

    /**
     * Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
     * this case input was not fully read.
     */
    @Test
    public void testInternalBufferClearReadPartly() {
        final Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102);
        EmbeddedChannel channel = newInternalBufferTestChannel();
        assertTrue(channel.writeInbound(buf));
        assertTrue(channel.finish());
        try (Buffer expected = heap().allocate(1).writeByte((byte) 0x02);
             Buffer b = channel.readInbound();
             Buffer actual = b.slice()) {
            assertEquals(expected, actual);
            assertNull(channel.readInbound());
        }
    }

    // Builds a channel whose decoder removes itself after one byte, and whose
    // handlerRemoved0 callback asserts the cumulation buffer was released.
    private EmbeddedChannel newInternalBufferTestChannel() {
        return new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                Buffer buf = internalBuffer();
                assertTrue(buf.isOwned());
                in.readByte();
                // Removal from pipeline should clear internal buffer
                ctx.pipeline().remove(this);
            }

            @Override
            protected void handlerRemoved0(ChannelHandlerContext ctx) {
                assertCumulationReleased(internalBuffer());
            }
        });
    }

    // Removing the handler while decode() is still on the stack must keep the
    // in-flight buffer accessible until decode returns.
    @Test
    public void handlerRemovedWillNotReleaseBufferIfDecodeInProgress() {
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
                ctx.pipeline().remove(this);
                assertTrue(in.isAccessible());
            }

            @Override
            protected void handlerRemoved0(ChannelHandlerContext ctx) {
                assertCumulationReleased(internalBuffer());
            }
        });
        byte[] bytes = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bytes);
        Buffer buffer = heap().allocate(bytes.length);
        for (byte b : bytes) {
            buffer.writeByte(b);
        }

        assertTrue(channel.writeInbound(buffer));
        assertTrue(channel.finishAndReleaseAll());
    }

    // A cumulation is considered released when it is null, empty, or no longer accessible.
    private static void assertCumulationReleased(Buffer buffer) {
        assertTrue("unexpected value: " + buffer,
                buffer == null || buffer.capacity() == 0 || !buffer.isAccessible());
    }

    // channelRead (1) must be fired before channelReadComplete-on-inactive (2),
    // which must be fired before channelInactive (3).
    @Test
    public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                int readable = in.readableBytes();
                assertTrue(readable > 0);
                // Consume everything without emitting a message.
                in.readerOffset(in.readerOffset() + readable);
            }

            @Override
            protected void decodeLast(ChannelHandlerContext ctx, Buffer in) {
                assertEquals(0, in.readableBytes());
                ctx.fireChannelRead("data");
            }
        }, new ChannelHandler() {
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                queue.add(3);
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                queue.add(1);
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                if (!ctx.channel().isActive()) {
                    queue.add(2);
                }
            }
        });
        Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102);
        assertFalse(channel.writeInbound(buf));
        channel.finish();
        assertEquals(1, queue.take());
        assertEquals(2, queue.take());
        assertEquals(3, queue.take());
        assertTrue(queue.isEmpty());
        assertFalse(buf.isAccessible());
    }

    // See https://github.com/netty/netty/issues/4635
    // Removing the decoder from within a downstream handler's channelRead must
    // flush the remaining cumulated bytes through the pipeline in order.
    @Test
    public void testRemoveWhileInCallDecode() {
        final Object upgradeMessage = new Object();
        final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                assertEquals(1, in.readByte());
                ctx.fireChannelRead(upgradeMessage);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(decoder, new ChannelHandler() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                if (msg == upgradeMessage) {
                    ctx.pipeline().remove(decoder);
                    return;
                }
                ctx.fireChannelRead(msg);
            }
        });

        try (Buffer buf = heap().allocate(4, BIG_ENDIAN).writeInt(0x01020304)) {
            assertTrue(channel.writeInbound(buf.slice()));
            // After removal the un-decoded tail (bytes 1..3) is forwarded as-is.
            try (Buffer expected = buf.slice(1, 3);
                 Buffer b = channel.readInbound();
                 Buffer actual = b.slice()) {
                assertEquals(expected, actual);
                assertFalse(channel.finish());
            }
        }
    }

    // decodeLast must not be invoked with a spurious empty buffer when all
    // input was already consumed.
    @Test
    public void testDecodeLastEmptyBuffer() {
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                assertTrue(in.readableBytes() > 0);
                Buffer slice = in.slice();
                in.readerOffset(in.readerOffset() + in.readableBytes());
                ctx.fireChannelRead(slice);
            }
        });
        byte[] bytes = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bytes);
        try (Buffer buf = heap().allocate(bytes.length)) {
            for (byte b : bytes) {
                buf.writeByte(b);
            }
            assertTrue(channel.writeInbound(buf.slice()));
            try (Buffer b = channel.readInbound()) {
                assertEquals(buf, b);
                assertNull(channel.readInbound());
                assertFalse(channel.finish());
                assertNull(channel.readInbound());
            }
        }
    }

    // The final withheld byte must be delivered via decodeLast on finish().
    @Test
    public void testDecodeLastNonEmptyBuffer() {
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            private boolean decodeLast;

            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                int readable = in.readableBytes();
                assertTrue(readable > 0);
                if (!decodeLast && readable == 1) {
                    // Hold back the last byte until decodeLast.
                    return;
                }
                int read = decodeLast ? readable : readable - 1;
                Buffer slice = in.slice(in.readerOffset(), read);
                in.readerOffset(in.readerOffset() + read);
                ctx.fireChannelRead(slice);
            }

            @Override
            protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
                assertFalse(decodeLast);
                decodeLast = true;
                super.decodeLast(ctx, in);
            }
        });
        byte[] bytes = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bytes);
        // NOTE(review): slices are taken before the writes below — assumes slices
        // share memory with buf and observe later writes; verify against Buffer API.
        try (Buffer buf = heap().allocate(bytes.length, BIG_ENDIAN);
             Buffer part1 = buf.slice(0, bytes.length - 1);
             Buffer part2 = buf.slice(bytes.length - 1, 1)) {
            for (byte b : bytes) {
                buf.writeByte(b);
            }
            assertTrue(channel.writeInbound(buf.slice()));
            try (Buffer actual = channel.readInbound()) {
                assertEquals(part1, actual);
            }
            assertNull(channel.readInbound());
            assertTrue(channel.finish());
            try (Buffer actual = channel.readInbound()) {
                assertEquals(part2, actual);
            }
            assertNull(channel.readInbound());
        }
    }

    // Read-only and writable inbound buffers must both cumulate without error.
    @Test
    public void testReadOnlyBuffer() {
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) { }
        });
        assertFalse(channel.writeInbound(heap().allocate(8).writeByte((byte) 1).readOnly(true)));
        assertFalse(channel.writeInbound(heap().allocate(1).writeByte((byte) 2)));
        assertFalse(channel.finish());
    }

    // If the merge cumulator's write throws, the input buffer must be closed
    // and the old cumulation returned to sole ownership.
    @Test
    public void releaseWhenMergeCumulateThrows() {
        Buffer oldCumulation = writeFailingCumulation(1, 64);
        oldCumulation.writeByte((byte) 0);
        Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12);

        Throwable thrown = null;
        try {
            ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(heap(), oldCumulation, in);
        } catch (Throwable t) {
            thrown = t;
        }

        assertThat(thrown).hasMessage("boom");
        assertFalse(in.isAccessible());
        assertTrue(oldCumulation.isOwned());
        oldCumulation.close();
    }

    /**
     * Builds a mock {@link Buffer} backed by a real heap buffer whose
     * {@code writeBytes(Buffer)} throws {@code Error("boom")} on the
     * {@code untilFailure}-th call; all other methods delegate to the real buffer.
     */
    private static Buffer writeFailingCumulation(int untilFailure, int capacity) {
        Buffer realBuffer = heap().allocate(capacity, BIG_ENDIAN);
        Answer<Object> callRealBuffer = inv -> {
            Object result = inv.getMethod().invoke(realBuffer, inv.getArguments());
            if (result == realBuffer) {
                // Preserve mock wrapper for methods that returns the callee ('this') buffer instance.
                return inv.getMock();
            }
            return result;
        };
        Buffer buffer = mock(Buffer.class, withSettings().defaultAnswer(callRealBuffer));
        AtomicInteger countDown = new AtomicInteger(untilFailure);
        doAnswer(inv -> {
            if (countDown.decrementAndGet() <= 0) {
                throw new Error("boom");
            }
            return callRealBuffer.answer(inv);
        }).when(buffer).writeBytes(any(Buffer.class));
        return buffer;
    }

    // When the merge cumulator must expand into a fresh buffer, a failure at any
    // write step must release input and new cumulation, leaving the old one owned.
    @Test
    public void releaseWhenMergeCumulateThrowsInExpand() {
        releaseWhenMergeCumulateThrowsInExpand(1, true);
        releaseWhenMergeCumulateThrowsInExpand(2, true);
        releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case
    }

    private static void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
        Buffer oldCumulation = heap().allocate(8, BIG_ENDIAN).writeByte((byte) 0);
        Buffer newCumulation = writeFailingCumulation(untilFailure, 16);
        // Allocator stub that always hands out the failure-injected buffer.
        BufferAllocator allocator = new BufferAllocator() {
            @Override
            public Buffer allocate(int capacity) {
                return newCumulation;
            }
        };

        Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12);
        Throwable thrown = null;
        try {
            ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in);
        } catch (Throwable t) {
            thrown = t;
        }

        assertFalse(in.isAccessible());

        if (shouldFail) {
            assertThat(thrown).hasMessage("boom");
            assertTrue(oldCumulation.isOwned());
            oldCumulation.close();
            assertFalse(newCumulation.isAccessible());
        } else {
            assertNull(thrown);
            assertFalse(oldCumulation.isAccessible());
            assertTrue(newCumulation.isOwned());
            newCumulation.close();
        }
    }

    // Composing buffers of mismatched byte order must fail, and the failed
    // cumulate must still close the input buffer.
    @Test
    public void releaseWhenCompositeCumulateThrows() {
        Buffer in = heap().allocate(12, LITTLE_ENDIAN).writerOffset(12);
        try (Buffer cumulation = Buffer.compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
            ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(heap(), cumulation, in);
            fail();
        } catch (IllegalArgumentException expected) {
            assertThat(expected).hasMessageContaining("byte order");
            assertFalse(in.isAccessible());
        }
    }

    // With auto-read off, the decoder must only trigger an extra read when no
    // complete frame was produced from the incoming data.
    @Test
    public void testDoesNotOverRead() {
        class ReadInterceptingHandler implements ChannelHandler {
            private int readsTriggered;

            @Override
            public void read(ChannelHandlerContext ctx) {
                readsTriggered++;
                ctx.read();
            }
        }
        ReadInterceptingHandler interceptor = new ReadInterceptingHandler();

        EmbeddedChannel channel = new EmbeddedChannel();
        channel.config().setAutoRead(false);
        channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
        assertEquals(0, interceptor.readsTriggered);

        // 0 complete frames, 1 partial frame: SHOULD trigger a read
        channel.writeInbound(heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0001));
        assertEquals(1, interceptor.readsTriggered);

        // 2 complete frames, 0 partial frames: should NOT trigger a read
        channel.writeInbound(heap().allocate(1).writeByte((byte) 2),
                heap().allocate(3).writeByte((byte) 3).writeByte((byte) 4).writeByte((byte) 5));
        assertEquals(1, interceptor.readsTriggered);

        // 1 complete frame, 1 partial frame: should NOT trigger a read
        channel.writeInbound(heap().allocate(3).writeByte((byte) 6).writeByte((byte) 7).writeByte((byte) 8),
                heap().allocate(1).writeByte((byte) 9));
        assertEquals(1, interceptor.readsTriggered);

        // 1 complete frame, 1 partial frame: should NOT trigger a read
        channel.writeInbound(heap().allocate(2).writeByte((byte) 10).writeByte((byte) 11),
                heap().allocate(1).writeByte((byte) 12));
        assertEquals(1, interceptor.readsTriggered);

        // 0 complete frames, 1 partial frame: SHOULD trigger a read
        channel.writeInbound(heap().allocate(1).writeByte((byte) 13));
        assertEquals(2, interceptor.readsTriggered);

        // 1 complete frame, 0 partial frames: should NOT trigger a read
        channel.writeInbound(heap().allocate(1).writeByte((byte) 14));
        assertEquals(2, interceptor.readsTriggered);

        // Drain the five 3-byte frames (values 0..14) and verify their contents.
        for (int i = 0; i < 5; i++) {
            try (Buffer read = channel.readInbound()) {
                assertEquals(i * 3, read.getByte(0));
                assertEquals(i * 3 + 1, read.getByte(1));
                assertEquals(i * 3 + 2, read.getByte(2));
            }
        }
        assertFalse(channel.finish());
    }

    // Messages fired from decode() and the pass-through tail after removal must
    // arrive in the original byte order.
    @Test
    public void testDisorder() {
        ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
            int count;

            //read 4 byte then remove this decoder
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                ctx.fireChannelRead(in.readByte());
                if (++count >= 4) {
                    ctx.pipeline().remove(this);
                }
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(decoder);
        byte[] bytes = {1, 2, 3, 4, 5};
        Buffer buf = heap().allocate(bytes.length);
        for (byte b : bytes) {
            buf.writeByte(b);
        }
        assertTrue(channel.writeInbound(buf));
        assertEquals((byte) 1, channel.readInbound());
        assertEquals((byte) 2, channel.readInbound());
        assertEquals((byte) 3, channel.readInbound());
        assertEquals((byte) 4, channel.readInbound());
        // Byte 5 was never decoded; it is forwarded as the remaining buffer.
        Buffer buffer5 = channel.readInbound();
        assertEquals((byte) 5, buffer5.readByte());
        assertEquals(0, buffer5.readableBytes());
        buffer5.close();
        assertFalse(buffer5.isAccessible());
        assertFalse(channel.finish());
    }

    // Input-shutdown must run the channelInputClosed path; a decoder removing
    // itself there must still hand the undecoded bytes downstream on finish().
    @Test
    public void testDecodeLast() {
        final AtomicBoolean removeHandler = new AtomicBoolean();
        EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, Buffer in) {
                if (removeHandler.get()) {
                    ctx.pipeline().remove(this);
                }
            }
        });
        byte[] bytes = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bytes);
        try (Buffer buf = heap().allocate(bytes.length)) {
            for (byte b : bytes) {
                buf.writeByte(b);
            }
            assertFalse(channel.writeInbound(buf.slice()));
            assertNull(channel.readInbound());
            removeHandler.set(true);
            // This should trigger channelInputClosed(...)
            channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);

            assertTrue(channel.finish());
            try (Buffer actual = channel.readInbound()) {
                assertEquals(buf.slice(), actual);
            }
            assertNull(channel.readInbound());
        }
    }
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.channel.ChannelHandlerContext;
import static io.netty.util.internal.ObjectUtil.checkPositive;
/**
 * A decoder that splits the inbound byte stream into frames of a fixed size.
 * <p>
 * Bytes are buffered (by the {@link ByteToMessageDecoder} base class) until at
 * least {@code frameLength} bytes are readable; each full frame is emitted as a
 * slice of the cumulation buffer and the reader offset is advanced past it.
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    /** Size, in bytes, of every emitted frame; strictly positive. */
    private final int frameLength;

    /**
     * Creates a new instance.
     *
     * @param frameLength the length of the frame; must be positive
     */
    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
        Object frame = decode0(ctx, in);
        if (frame == null) {
            // Not enough bytes yet; the base class will call again with more data.
            return;
        }
        ctx.fireChannelRead(frame);
    }

    /**
     * Create a frame out of the {@link Buffer} and return it.
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in the {@link Buffer} from which to read data
     * @return frame the {@link Buffer} which represent the frame or {@code null} if no frame could
     *         be created.
     */
    protected Object decode0(ChannelHandlerContext ctx, Buffer in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        }
        try {
            // Frame is a slice (view) of the readable region, not a copy.
            return in.slice(in.readerOffset(), frameLength);
        } finally {
            // Advance in a finally block so the offset moves even if slicing fails,
            // mirroring the original behavior.
            in.readerOffset(in.readerOffset() + frameLength);
        }
    }
}