Port over the ByteToMessageDecoder as an example
This commit is contained in:
parent
7775460984
commit
c081c73885
535
src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java
Normal file
535
src/main/java/io/netty/buffer/api/adaptor/BufferAdaptor.java
Normal file
@ -0,0 +1,535 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.adaptor;
|
||||
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.buffer.api.BufferHolder;
|
||||
import io.netty.buffer.api.ByteCursor;
|
||||
import io.netty.buffer.api.ReadableComponentProcessor;
|
||||
import io.netty.buffer.api.Send;
|
||||
import io.netty.buffer.api.WritableComponentProcessor;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A {@link Buffer} implementation that delegates all method calls to a given delegate buffer instance.
|
||||
*/
|
||||
public abstract class BufferAdaptor implements Buffer {
|
||||
protected Buffer buffer;
|
||||
|
||||
protected BufferAdaptor(Buffer buffer) {
|
||||
this.buffer = Objects.requireNonNull(buffer, "Delegate buffer cannot be null.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer order(ByteOrder order) {
|
||||
buffer.order(order);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteOrder order() {
|
||||
return buffer.order();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int capacity() {
|
||||
return buffer.capacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readerOffset() {
|
||||
return buffer.readerOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer readerOffset(int offset) {
|
||||
buffer.readerOffset(offset);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writerOffset() {
|
||||
return buffer.writerOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writerOffset(int offset) {
|
||||
buffer.writerOffset(offset);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readableBytes() {
|
||||
return buffer.readableBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int writableBytes() {
|
||||
return buffer.writableBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer fill(byte value) {
|
||||
buffer.fill(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nativeAddress() {
|
||||
return buffer.nativeAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer readOnly(boolean readOnly) {
|
||||
buffer.readOnly(readOnly);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return buffer.readOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
|
||||
buffer.copyInto(srcPos, dest, destPos, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
|
||||
buffer.copyInto(srcPos, dest, destPos, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyInto(int srcPos, Buffer dest, int destPos, int length) {
|
||||
buffer.copyInto(srcPos, dest, destPos, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer reset() {
|
||||
buffer.reset();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteCursor openCursor() {
|
||||
return buffer.openCursor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteCursor openCursor(int fromOffset, int length) {
|
||||
return buffer.openCursor(fromOffset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteCursor openReverseCursor() {
|
||||
return buffer.openReverseCursor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteCursor openReverseCursor(int fromOffset, int length) {
|
||||
return buffer.openReverseCursor(fromOffset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ensureWritable(int size) {
|
||||
buffer.ensureWritable(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
|
||||
buffer.ensureWritable(size, minimumGrowth, allowCompaction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer slice() {
|
||||
buffer.slice();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer slice(int offset, int length) {
|
||||
buffer.slice(offset, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer bifurcate() {
|
||||
buffer.bifurcate();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer bifurcate(int splitOffset) {
|
||||
buffer.bifurcate(splitOffset);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void compact() {
|
||||
buffer.compact();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countComponents() {
|
||||
return buffer.countComponents();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countReadableComponents() {
|
||||
return buffer.countReadableComponents();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countWritableComponents() {
|
||||
return buffer.countWritableComponents();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor)
|
||||
throws E {
|
||||
return buffer.forEachReadable(initialIndex, processor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor)
|
||||
throws E {
|
||||
return buffer.forEachWritable(initialIndex, processor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() {
|
||||
return buffer.readByte();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByte(int roff) {
|
||||
return buffer.getByte(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedByte() {
|
||||
return buffer.readUnsignedByte();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnsignedByte(int roff) {
|
||||
return buffer.getUnsignedByte(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeByte(byte value) {
|
||||
buffer.writeByte(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setByte(int woff, byte value) {
|
||||
buffer.setByte(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeUnsignedByte(int value) {
|
||||
buffer.writeUnsignedByte(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setUnsignedByte(int woff, int value) {
|
||||
buffer.setUnsignedByte(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public char readChar() {
|
||||
return buffer.readChar();
|
||||
}
|
||||
|
||||
@Override
|
||||
public char getChar(int roff) {
|
||||
return buffer.getChar(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeChar(char value) {
|
||||
buffer.writeChar(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setChar(int woff, char value) {
|
||||
buffer.setChar(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public short readShort() {
|
||||
return buffer.readShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getShort(int roff) {
|
||||
return buffer.getShort(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedShort() {
|
||||
return buffer.readUnsignedShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnsignedShort(int roff) {
|
||||
return buffer.getUnsignedShort(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeShort(short value) {
|
||||
buffer.writeShort(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setShort(int woff, short value) {
|
||||
buffer.setShort(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeUnsignedShort(int value) {
|
||||
buffer.writeUnsignedShort(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setUnsignedShort(int woff, int value) {
|
||||
buffer.setUnsignedShort(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readMedium() {
|
||||
return buffer.readMedium();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMedium(int roff) {
|
||||
return buffer.getMedium(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedMedium() {
|
||||
return buffer.readUnsignedMedium();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnsignedMedium(int roff) {
|
||||
return buffer.getUnsignedMedium(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeMedium(int value) {
|
||||
buffer.writeMedium(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setMedium(int woff, int value) {
|
||||
buffer.setMedium(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeUnsignedMedium(int value) {
|
||||
buffer.writeUnsignedMedium(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setUnsignedMedium(int woff, int value) {
|
||||
buffer.setUnsignedMedium(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readInt() {
|
||||
return buffer.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInt(int roff) {
|
||||
return buffer.getInt(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readUnsignedInt() {
|
||||
return buffer.readUnsignedInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUnsignedInt(int roff) {
|
||||
return buffer.getUnsignedInt(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeInt(int value) {
|
||||
buffer.writeInt(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setInt(int woff, int value) {
|
||||
buffer.setInt(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeUnsignedInt(long value) {
|
||||
buffer.writeUnsignedInt(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setUnsignedInt(int woff, long value) {
|
||||
buffer.setUnsignedInt(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float readFloat() {
|
||||
return buffer.readFloat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(int roff) {
|
||||
return buffer.getFloat(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeFloat(float value) {
|
||||
buffer.writeFloat(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setFloat(int woff, float value) {
|
||||
buffer.setFloat(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readLong() {
|
||||
return buffer.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(int roff) {
|
||||
return buffer.getLong(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeLong(long value) {
|
||||
buffer.writeLong(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setLong(int woff, long value) {
|
||||
buffer.setLong(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double readDouble() {
|
||||
return buffer.readDouble();
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDouble(int roff) {
|
||||
return buffer.getDouble(roff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer writeDouble(double value) {
|
||||
buffer.writeDouble(value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer setDouble(int woff, double value) {
|
||||
buffer.setDouble(woff, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer acquire() {
|
||||
buffer.acquire();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer get() {
|
||||
buffer.get();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInstanceOf(Class<?> cls) {
|
||||
return buffer.isInstanceOf(cls);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
buffer.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Send<Buffer> send() {
|
||||
Class<Buffer> aClass = (Class<Buffer>) (Class<?>) getClass();
|
||||
Function<Buffer, Buffer> receive = this::receive;
|
||||
return buffer.send().map(aClass, receive);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a {@linkplain #send() sent} {@link BufferAdaptor} is received by the recipient.
|
||||
* The {@link BufferAdaptor} should return a new concrete instance, that wraps the given {@link Buffer} object.
|
||||
*
|
||||
* @param buf The {@link Buffer} that is {@linkplain Send#receive() received} by the recipient,
|
||||
* and needs to be wrapped in a new {@link BufferHolder} instance.
|
||||
* @return A new buffer adaptor instance, containing the given {@linkplain Buffer buffer}.
|
||||
*/
|
||||
protected abstract BufferAdaptor receive(Buffer buf);
|
||||
|
||||
@Override
|
||||
public boolean isOwned() {
|
||||
return buffer.isOwned();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countBorrows() {
|
||||
return buffer.countBorrows();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccessible() {
|
||||
return buffer.isAccessible();
|
||||
}
|
||||
}
|
@ -0,0 +1,751 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.examples.bytetomessagedecoder;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.buffer.api.BufferAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerAdapter;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.handler.codec.DecoderException;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.FixedLengthFrameDecoder;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import io.netty.handler.codec.LineBasedFrameDecoder;
|
||||
import io.netty.util.Attribute;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.internal.MathUtil;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositive;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
/**
|
||||
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to an
|
||||
* other Message type.
|
||||
*
|
||||
* For example here is an implementation which reads all readable bytes from
|
||||
* the input {@link Buffer}, creates a new {@link Buffer} and forward it to the
|
||||
* next {@link ChannelHandler} in the {@link ChannelPipeline}.
|
||||
*
|
||||
* <pre>
|
||||
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
|
||||
* {@code @Override}
|
||||
* public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
|
||||
* throws {@link Exception} {
|
||||
* ctx.fireChannelRead(in.bifurcate());
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h3>Frame detection</h3>
|
||||
* <p>
|
||||
* Generally frame detection should be handled earlier in the pipeline by adding a
|
||||
* {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
|
||||
* or {@link LineBasedFrameDecoder}.
|
||||
* <p>
|
||||
* If a custom frame decoder is required, then one needs to be careful when implementing
|
||||
* one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
|
||||
* complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes
|
||||
* for a complete frame, return without modifying the reader index to allow more bytes to arrive.
|
||||
* <p>
|
||||
* To check for complete frames without modifying the reader index, use methods like
|
||||
* {@link Buffer#getInt(int)}.
|
||||
* One <strong>MUST</strong> use the reader index when using methods like
|
||||
* {@link Buffer#getInt(int)}.
|
||||
* For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
|
||||
* is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
|
||||
* <h3>Pitfalls</h3>
|
||||
* <p>
|
||||
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
|
||||
* annotated with {@link @Sharable}.
|
||||
* <p>
|
||||
* Some methods such as {@link Buffer#bifurcate(int)} will cause a memory leak if the returned buffer
|
||||
* is not released or fired through the {@link ChannelPipeline} via
|
||||
* {@link ChannelHandlerContext#fireChannelRead(Object)}.
|
||||
*/
|
||||
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
|
||||
/**
|
||||
* Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies.
|
||||
*/
|
||||
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
|
||||
if (cumulation.readableBytes() == 0 && !Buffer.isComposite(cumulation)) {
|
||||
// If cumulation is empty and input buffer is contiguous, use it directly
|
||||
cumulation.close();
|
||||
return in;
|
||||
}
|
||||
// We must release 'in' in all cases as otherwise it may produce a leak if writeBytes(...) throw
|
||||
// for whatever release (for example because of OutOfMemoryError)
|
||||
try (in) {
|
||||
final int required = in.readableBytes();
|
||||
if (required > cumulation.writableBytes() || !cumulation.isOwned() || cumulation.readOnly()) {
|
||||
// Expand cumulation (by replacing it) under the following conditions:
|
||||
// - cumulation cannot be resized to accommodate the additional data
|
||||
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
|
||||
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
|
||||
return expandCumulation(alloc, cumulation, in);
|
||||
}
|
||||
in.copyInto(in.readerOffset(), cumulation, cumulation.writerOffset(), required);
|
||||
cumulation.writerOffset(cumulation.writerOffset() + required);
|
||||
in.readerOffset(in.writerOffset());
|
||||
return cumulation;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Cumulate {@link Buffer}s by add them to a composite buffer and so do no memory copy whenever
|
||||
* possible.
|
||||
* Be aware that composite buffer use a more complex indexing implementation so depending on your use-case
|
||||
* and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
|
||||
*/
|
||||
public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> {
|
||||
if (cumulation.readableBytes() == 0) {
|
||||
cumulation.close();
|
||||
return in;
|
||||
}
|
||||
Buffer composite;
|
||||
try (in) {
|
||||
if (Buffer.isComposite(cumulation) && cumulation.isOwned()) {
|
||||
composite = cumulation;
|
||||
if (composite.writerOffset() != composite.capacity()) {
|
||||
// Writer index must equal capacity if we are going to "write"
|
||||
// new components to the end.
|
||||
composite = cumulation.slice(0, composite.writerOffset());
|
||||
cumulation.close();
|
||||
}
|
||||
} else {
|
||||
composite = Buffer.compose(alloc, cumulation);
|
||||
}
|
||||
Buffer.extendComposite(composite, in);
|
||||
return composite;
|
||||
}
|
||||
};
|
||||
|
||||
Buffer cumulation;
|
||||
private Cumulator cumulator = MERGE_CUMULATOR;
|
||||
private boolean singleDecode;
|
||||
private boolean first;
|
||||
|
||||
/**
|
||||
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
|
||||
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
|
||||
*/
|
||||
private boolean firedChannelRead;
|
||||
|
||||
private int discardAfterReads = 16;
|
||||
private int numReads;
|
||||
private ByteToMessageDecoderContext context;
|
||||
|
||||
protected ByteToMessageDecoder() {
|
||||
ensureNotSharable();
|
||||
}
|
||||
|
||||
/**
|
||||
* If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
|
||||
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
|
||||
*
|
||||
* Default is {@code false} as this has performance impacts.
|
||||
*/
|
||||
public void setSingleDecode(boolean singleDecode) {
|
||||
this.singleDecode = singleDecode;
|
||||
}
|
||||
|
||||
/**
|
||||
* If {@code true} then only one message is decoded on each
|
||||
* {@link #channelRead(ChannelHandlerContext, Object)} call.
|
||||
*
|
||||
* Default is {@code false} as this has performance impacts.
|
||||
*/
|
||||
public boolean isSingleDecode() {
|
||||
return singleDecode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link Cumulator} to use for cumulate the received {@link Buffer}s.
|
||||
*/
|
||||
public void setCumulator(Cumulator cumulator) {
|
||||
requireNonNull(cumulator, "cumulator");
|
||||
this.cumulator = cumulator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the number of reads after which {@link Buffer#compact()} is called to free up memory.
|
||||
* The default is {@code 16}.
|
||||
*/
|
||||
public void setDiscardAfterReads(int discardAfterReads) {
|
||||
checkPositive(discardAfterReads, "discardAfterReads");
|
||||
this.discardAfterReads = discardAfterReads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the actual number of readable bytes in the internal cumulative
|
||||
* buffer of this decoder. You usually do not need to rely on this value
|
||||
* to write a decoder. Use it only when you must use it at your own risk.
|
||||
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
|
||||
*/
|
||||
protected int actualReadableBytes() {
|
||||
return internalBuffer().readableBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the internal cumulative buffer of this decoder. You usually
|
||||
* do not need to access the internal buffer directly to write a decoder.
|
||||
* Use it only when you must use it at your own risk.
|
||||
*/
|
||||
protected Buffer internalBuffer() {
|
||||
if (cumulation != null) {
|
||||
return cumulation;
|
||||
} else {
|
||||
return newEmptyBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
private static Buffer newEmptyBuffer() {
|
||||
return Buffer.compose(BufferAllocator.heap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
context = new ByteToMessageDecoderContext(ctx);
|
||||
handlerAdded0(context);
|
||||
}
|
||||
|
||||
protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
Buffer buf = cumulation;
|
||||
if (buf != null) {
|
||||
// Directly set this to null so we are sure we not access it in any other method here anymore.
|
||||
cumulation = null;
|
||||
numReads = 0;
|
||||
int readable = buf.readableBytes();
|
||||
if (readable > 0) {
|
||||
ctx.fireChannelRead(buf);
|
||||
ctx.fireChannelReadComplete();
|
||||
} else {
|
||||
buf.close();
|
||||
}
|
||||
}
|
||||
handlerRemoved0(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
|
||||
* events anymore.
|
||||
*/
|
||||
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof Buffer) {
|
||||
try {
|
||||
Buffer data = (Buffer) msg;
|
||||
first = cumulation == null;
|
||||
if (first) {
|
||||
cumulation = data;
|
||||
} else {
|
||||
// ByteBufAllocator alloc = ctx.alloc(); // TODO this API integration needs more work
|
||||
BufferAllocator alloc = BufferAllocator.heap();
|
||||
cumulation = cumulator.cumulate(alloc, cumulation, data);
|
||||
}
|
||||
assert context.ctx == ctx || ctx == context;
|
||||
|
||||
callDecode(context, cumulation); // TODO we'll want to bifurcate here, and simplify lifetime handling
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new DecoderException(e);
|
||||
} finally {
|
||||
if (cumulation != null && cumulation.readableBytes() == 0) {
|
||||
numReads = 0;
|
||||
cumulation.close();
|
||||
cumulation = null;
|
||||
} else if (++ numReads >= discardAfterReads) {
|
||||
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
|
||||
// See https://github.com/netty/netty/issues/4275
|
||||
numReads = 0;
|
||||
discardSomeReadBytes(); // TODO no need to so this dance because ensureWritable can compact for us
|
||||
}
|
||||
|
||||
firedChannelRead |= context.fireChannelReadCallCount() > 0;
|
||||
context.reset();
|
||||
}
|
||||
} else {
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
numReads = 0;
|
||||
discardSomeReadBytes();
|
||||
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
firedChannelRead = false;
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
protected final void discardSomeReadBytes() {
|
||||
if (cumulation != null && !first && cumulation.isOwned()) {
|
||||
// discard some bytes if possible to make more room in the
|
||||
// buffer but only if the refCnt == 1 as otherwise the user may have
|
||||
// used slice().retain() or duplicate().retain().
|
||||
//
|
||||
// See:
|
||||
// - https://github.com/netty/netty/issues/2327
|
||||
// - https://github.com/netty/netty/issues/1764
|
||||
cumulation.compact();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
assert context.ctx == ctx || ctx == context;
|
||||
channelInputClosed(context, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
if (evt instanceof ChannelInputShutdownEvent) {
|
||||
// The decodeLast method is invoked when a channelInactive event is encountered.
|
||||
// This method is responsible for ending requests in some situations and must be called
|
||||
// when the input has been shutdown.
|
||||
assert context.ctx == ctx || ctx == context;
|
||||
channelInputClosed(context, false);
|
||||
}
|
||||
}
|
||||
|
||||
private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
|
||||
try {
|
||||
channelInputClosed(ctx);
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new DecoderException(e);
|
||||
} finally {
|
||||
if (cumulation != null) {
|
||||
cumulation.close();
|
||||
cumulation = null;
|
||||
}
|
||||
if (ctx.fireChannelReadCallCount() > 0) {
|
||||
ctx.reset();
|
||||
// Something was read, call fireChannelReadComplete()
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
if (callChannelInactive) {
|
||||
ctx.fireChannelInactive();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the input of the channel was closed which may be because it changed to inactive or because of
|
||||
* {@link ChannelInputShutdownEvent}.
|
||||
*/
|
||||
void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
|
||||
if (cumulation != null) {
|
||||
callDecode(ctx, cumulation);
|
||||
// If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
|
||||
// be unexpected.
|
||||
if (!ctx.isRemoved()) {
|
||||
// Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
|
||||
// See https://github.com/netty/netty/issues/10802.
|
||||
Buffer buffer = cumulation == null ? newEmptyBuffer() : cumulation;
|
||||
decodeLast(ctx, buffer);
|
||||
}
|
||||
} else {
|
||||
decodeLast(ctx, newEmptyBuffer());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once data should be decoded from the given {@link Buffer}. This method will call
|
||||
* {@link #decode(ChannelHandlerContext, Buffer)} as long as decoding should take place.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
||||
* @param in the {@link Buffer} from which to read data
|
||||
*/
|
||||
void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
|
||||
try {
|
||||
while (in.readableBytes() > 0 && !ctx.isRemoved()) {
|
||||
|
||||
int oldInputLength = in.readableBytes();
|
||||
int numReadCalled = ctx.fireChannelReadCallCount();
|
||||
decodeRemovalReentryProtection(ctx, in);
|
||||
|
||||
// Check if this handler was removed before continuing the loop.
|
||||
// If it was removed, it is not safe to continue to operate on the buffer.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/1664
|
||||
if (ctx.isRemoved()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (numReadCalled == ctx.fireChannelReadCallCount()) {
|
||||
if (oldInputLength == in.readableBytes()) {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldInputLength == in.readableBytes()) {
|
||||
throw new DecoderException(
|
||||
StringUtil.simpleClassName(getClass()) +
|
||||
".decode() did not read anything but decoded a message.");
|
||||
}
|
||||
|
||||
if (isSingleDecode()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception cause) {
|
||||
throw new DecoderException(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode the from one {@link Buffer} to an other. This method will be called till either the input
|
||||
* {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
|
||||
* {@link Buffer}.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
||||
* @param in the {@link Buffer} from which to read data
|
||||
* @throws Exception is thrown if an error occurs
|
||||
*/
|
||||
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
|
||||
|
||||
/**
|
||||
* Decode the from one {@link Buffer} to an other. This method will be called till either the input
|
||||
* {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
|
||||
* {@link Buffer}.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
||||
* @param in the {@link Buffer} from which to read data
|
||||
* @throws Exception is thrown if an error occurs
|
||||
*/
|
||||
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
|
||||
throws Exception {
|
||||
decode(ctx, in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
|
||||
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
|
||||
*
|
||||
* By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} but sub-classes may
|
||||
* override this for some special cleanup operation.
|
||||
*/
|
||||
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
|
||||
if (in.readableBytes() > 0) {
|
||||
// Only call decode() if there is something left in the buffer to decode.
|
||||
// See https://github.com/netty/netty/issues/4386
|
||||
decodeRemovalReentryProtection(ctx, in);
|
||||
}
|
||||
}
|
||||
|
||||
private static Buffer expandCumulation(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
|
||||
int newSize = MathUtil.safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
|
||||
Buffer newCumulation = alloc.allocate(newSize, oldCumulation.order());
|
||||
Buffer toRelease = newCumulation;
|
||||
try {
|
||||
oldCumulation.copyInto(oldCumulation.readerOffset(), newCumulation, 0, oldCumulation.readableBytes());
|
||||
in.copyInto(in.readerOffset(), newCumulation, oldCumulation.readableBytes(), in.readableBytes());
|
||||
newCumulation.writerOffset(oldCumulation.readableBytes() + in.readableBytes());
|
||||
toRelease = oldCumulation;
|
||||
return newCumulation;
|
||||
} finally {
|
||||
toRelease.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cumulate {@link Buffer}s.
|
||||
*/
|
||||
public interface Cumulator {
|
||||
/**
|
||||
* Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes.
|
||||
* The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so
|
||||
* call {@link Buffer#close()} if a {@link Buffer} is fully consumed.
|
||||
*/
|
||||
Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
|
||||
}
|
||||
|
||||
// Package private so we can also make use of it in ReplayingDecoder.
|
||||
static final class ByteToMessageDecoderContext implements ChannelHandlerContext {
|
||||
private final ChannelHandlerContext ctx;
|
||||
private int fireChannelReadCalled;
|
||||
|
||||
private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
fireChannelReadCalled = 0;
|
||||
}
|
||||
|
||||
int fireChannelReadCallCount() {
|
||||
return fireChannelReadCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel channel() {
|
||||
return ctx.channel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventExecutor executor() {
|
||||
return ctx.executor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return ctx.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandler handler() {
|
||||
return ctx.handler();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRemoved() {
|
||||
return ctx.isRemoved();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelRegistered() {
|
||||
ctx.fireChannelRegistered();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelUnregistered() {
|
||||
ctx.fireChannelUnregistered();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelActive() {
|
||||
ctx.fireChannelActive();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelInactive() {
|
||||
ctx.fireChannelInactive();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireUserEventTriggered(Object evt) {
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelRead(Object msg) {
|
||||
fireChannelReadCalled ++;
|
||||
ctx.fireChannelRead(msg);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelReadComplete() {
|
||||
ctx.fireChannelReadComplete();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||
ctx.fireChannelWritabilityChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext read() {
|
||||
ctx.read();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext flush() {
|
||||
ctx.flush();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline pipeline() {
|
||||
return ctx.pipeline();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBufAllocator alloc() {
|
||||
return ctx.alloc();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
||||
return ctx.attr(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public <T> boolean hasAttr(AttributeKey<T> key) {
|
||||
return ctx.hasAttr(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress) {
|
||||
return ctx.bind(localAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress) {
|
||||
return ctx.connect(remoteAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
||||
return ctx.connect(remoteAddress, localAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect() {
|
||||
return ctx.disconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close() {
|
||||
return ctx.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture deregister() {
|
||||
return ctx.deregister();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
return ctx.bind(localAddress, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
||||
return ctx.connect(remoteAddress, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
return ctx.connect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect(ChannelPromise promise) {
|
||||
return ctx.disconnect(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close(ChannelPromise promise) {
|
||||
return ctx.close(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register() {
|
||||
return ctx.register();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register(ChannelPromise promise) {
|
||||
return ctx.register(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture deregister(ChannelPromise promise) {
|
||||
return ctx.deregister(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object msg) {
|
||||
return ctx.write(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||
return ctx.write(msg, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||
return ctx.writeAndFlush(msg, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeAndFlush(Object msg) {
|
||||
return ctx.writeAndFlush(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise newPromise() {
|
||||
return ctx.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return ctx.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return ctx.newSucceededFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||
return ctx.newFailedFuture(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise voidPromise() {
|
||||
return ctx.voidPromise();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,561 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.examples.bytetomessagedecoder;
|
||||
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.buffer.api.BufferAllocator;
|
||||
import io.netty.buffer.api.adaptor.BufferAdaptor;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static io.netty.buffer.api.BufferAllocator.heap;
|
||||
import static io.netty.buffer.api.BufferTestSupport.assertEquals;
|
||||
import static java.nio.ByteOrder.BIG_ENDIAN;
|
||||
import static java.nio.ByteOrder.LITTLE_ENDIAN;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class ByteToMessageDecoderTest {
|
||||
|
||||
@Test
|
||||
public void testRemoveItself() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
private boolean removed;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
assertFalse(removed);
|
||||
in.readByte();
|
||||
ctx.pipeline().remove(this);
|
||||
removed = true;
|
||||
}
|
||||
});
|
||||
|
||||
try (Buffer buf = heap().allocate(4).writeInt(0x01020304)) {
|
||||
channel.writeInbound(buf.slice());
|
||||
try (Buffer b = channel.readInbound()) {
|
||||
buf.readByte();
|
||||
assertEquals(b, buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveItselfWriteBuffer() {
|
||||
final Buffer buf = heap().allocate(5, BIG_ENDIAN).writeInt(0x01020304);
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
private boolean removed;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
assertFalse(removed);
|
||||
in.readByte();
|
||||
ctx.pipeline().remove(this);
|
||||
|
||||
// This should not let it keep call decode
|
||||
buf.writeByte((byte) 0x05);
|
||||
removed = true;
|
||||
}
|
||||
});
|
||||
|
||||
channel.writeInbound(buf.slice());
|
||||
try (Buffer expected = heap().allocate(3, BIG_ENDIAN).writeShort((short) 0x0203).writeByte((byte) 0x04);
|
||||
Buffer b = channel.readInbound();
|
||||
Buffer actual = b.slice(); // Only compare readable bytes.
|
||||
buf) {
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
|
||||
* this case input is read fully.
|
||||
*/
|
||||
@Test
|
||||
public void testInternalBufferClearReadAll() {
|
||||
Buffer buf = heap().allocate(1).writeByte((byte) 'a');
|
||||
EmbeddedChannel channel = newInternalBufferTestChannel();
|
||||
assertFalse(channel.writeInbound(buf));
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
|
||||
* this case input was not fully read.
|
||||
*/
|
||||
@Test
|
||||
public void testInternalBufferClearReadPartly() {
|
||||
final Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102);
|
||||
EmbeddedChannel channel = newInternalBufferTestChannel();
|
||||
assertTrue(channel.writeInbound(buf));
|
||||
assertTrue(channel.finish());
|
||||
try (Buffer expected = heap().allocate(1).writeByte((byte) 0x02);
|
||||
Buffer b = channel.readInbound();
|
||||
Buffer actual = b.slice()) {
|
||||
assertEquals(expected, actual);
|
||||
assertNull(channel.readInbound());
|
||||
}
|
||||
}
|
||||
|
||||
private EmbeddedChannel newInternalBufferTestChannel() {
|
||||
return new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
Buffer buf = internalBuffer();
|
||||
assertTrue(buf.isOwned());
|
||||
in.readByte();
|
||||
// Removal from pipeline should clear internal buffer
|
||||
ctx.pipeline().remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handlerRemoved0(ChannelHandlerContext ctx) {
|
||||
assertCumulationReleased(internalBuffer());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handlerRemovedWillNotReleaseBufferIfDecodeInProgress() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
|
||||
ctx.pipeline().remove(this);
|
||||
assertTrue(in.isAccessible());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handlerRemoved0(ChannelHandlerContext ctx) {
|
||||
assertCumulationReleased(internalBuffer());
|
||||
}
|
||||
});
|
||||
byte[] bytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
|
||||
Buffer buffer = heap().allocate(bytes.length);
|
||||
for (byte b : bytes) {
|
||||
buffer.writeByte(b);
|
||||
}
|
||||
assertTrue(channel.writeInbound(buffer));
|
||||
assertTrue(channel.finishAndReleaseAll());
|
||||
}
|
||||
|
||||
private static void assertCumulationReleased(Buffer buffer) {
|
||||
assertTrue("unexpected value: " + buffer,
|
||||
buffer == null || buffer.capacity() == 0 || !buffer.isAccessible());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
|
||||
final BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
int readable = in.readableBytes();
|
||||
assertTrue(readable > 0);
|
||||
in.readerOffset(in.readerOffset() + readable);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) {
|
||||
assertEquals(0, in.readableBytes());
|
||||
ctx.fireChannelRead("data");
|
||||
}
|
||||
}, new ChannelHandler() {
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
queue.add(3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
queue.add(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||
if (!ctx.channel().isActive()) {
|
||||
queue.add(2);
|
||||
}
|
||||
}
|
||||
});
|
||||
Buffer buf = heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0102);
|
||||
assertFalse(channel.writeInbound(buf));
|
||||
channel.finish();
|
||||
assertEquals(1, queue.take());
|
||||
assertEquals(2, queue.take());
|
||||
assertEquals(3, queue.take());
|
||||
assertTrue(queue.isEmpty());
|
||||
assertFalse(buf.isAccessible());
|
||||
}
|
||||
|
||||
// See https://github.com/netty/netty/issues/4635
|
||||
@Test
|
||||
public void testRemoveWhileInCallDecode() {
|
||||
final Object upgradeMessage = new Object();
|
||||
final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
assertEquals(1, in.readByte());
|
||||
ctx.fireChannelRead(upgradeMessage);
|
||||
}
|
||||
};
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel(decoder, new ChannelHandler() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if (msg == upgradeMessage) {
|
||||
ctx.pipeline().remove(decoder);
|
||||
return;
|
||||
}
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
});
|
||||
|
||||
try (Buffer buf = heap().allocate(4, BIG_ENDIAN).writeInt(0x01020304)) {
|
||||
assertTrue(channel.writeInbound(buf.slice()));
|
||||
try (Buffer expected = buf.slice(1, 3);
|
||||
Buffer b = channel.readInbound();
|
||||
Buffer actual = b.slice()) {
|
||||
assertEquals(expected, actual);
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecodeLastEmptyBuffer() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
assertTrue(in.readableBytes() > 0);
|
||||
Buffer slice = in.slice();
|
||||
in.readerOffset(in.readerOffset() + in.readableBytes());
|
||||
ctx.fireChannelRead(slice);
|
||||
}
|
||||
});
|
||||
byte[] bytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
|
||||
try (Buffer buf = heap().allocate(bytes.length)) {
|
||||
for (byte b : bytes) {
|
||||
buf.writeByte(b);
|
||||
}
|
||||
assertTrue(channel.writeInbound(buf.slice()));
|
||||
try (Buffer b = channel.readInbound()) {
|
||||
assertEquals(buf, b);
|
||||
assertNull(channel.readInbound());
|
||||
assertFalse(channel.finish());
|
||||
assertNull(channel.readInbound());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecodeLastNonEmptyBuffer() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
private boolean decodeLast;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
int readable = in.readableBytes();
|
||||
assertTrue(readable > 0);
|
||||
if (!decodeLast && readable == 1) {
|
||||
return;
|
||||
}
|
||||
int read = decodeLast ? readable : readable - 1;
|
||||
Buffer slice = in.slice(in.readerOffset(), read);
|
||||
in.readerOffset(in.readerOffset() + read);
|
||||
ctx.fireChannelRead(slice);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
|
||||
assertFalse(decodeLast);
|
||||
decodeLast = true;
|
||||
super.decodeLast(ctx, in);
|
||||
}
|
||||
});
|
||||
byte[] bytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
try (Buffer buf = heap().allocate(bytes.length, BIG_ENDIAN);
|
||||
Buffer part1 = buf.slice(0, bytes.length - 1);
|
||||
Buffer part2 = buf.slice(bytes.length - 1, 1)) {
|
||||
for (byte b : bytes) {
|
||||
buf.writeByte(b);
|
||||
}
|
||||
assertTrue(channel.writeInbound(buf.slice()));
|
||||
try (Buffer actual = channel.readInbound()) {
|
||||
assertEquals(part1, actual);
|
||||
}
|
||||
assertNull(channel.readInbound());
|
||||
assertTrue(channel.finish());
|
||||
try (Buffer actual = channel.readInbound()) {
|
||||
assertEquals(part2, actual);
|
||||
}
|
||||
assertNull(channel.readInbound());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOnlyBuffer() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) { }
|
||||
});
|
||||
assertFalse(channel.writeInbound(heap().allocate(8).writeByte((byte) 1).readOnly(true)));
|
||||
assertFalse(channel.writeInbound(heap().allocate(1).writeByte((byte) 2)));
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
static class WriteFailingByteBuf extends BufferAdaptor {
|
||||
private final Error error = new Error();
|
||||
private int untilFailure;
|
||||
|
||||
WriteFailingByteBuf(int untilFailure, int capacity) {
|
||||
this(untilFailure, heap().allocate(capacity, BIG_ENDIAN));
|
||||
this.untilFailure = untilFailure;
|
||||
}
|
||||
|
||||
private WriteFailingByteBuf(int untilFailure, Buffer buffer) {
|
||||
super(buffer);
|
||||
this.untilFailure = untilFailure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer order(ByteOrder order) {
|
||||
if (order == LITTLE_ENDIAN && --untilFailure <= 0) {
|
||||
throw error;
|
||||
}
|
||||
return super.order(order);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BufferAdaptor receive(Buffer buf) {
|
||||
return new WriteFailingByteBuf(untilFailure, buf);
|
||||
}
|
||||
|
||||
Error writeError() {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrows() {
|
||||
WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64);
|
||||
oldCumulation.writeByte((byte) 0);
|
||||
Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12);
|
||||
|
||||
Throwable thrown = null;
|
||||
try {
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(heap(), oldCumulation, in);
|
||||
} catch (Throwable t) {
|
||||
thrown = t;
|
||||
}
|
||||
|
||||
assertSame(oldCumulation.writeError(), thrown);
|
||||
assertFalse(in.isAccessible());
|
||||
assertTrue(oldCumulation.isOwned());
|
||||
oldCumulation.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrowsInExpand() {
|
||||
releaseWhenMergeCumulateThrowsInExpand(1, true);
|
||||
releaseWhenMergeCumulateThrowsInExpand(2, true);
|
||||
releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case
|
||||
}
|
||||
|
||||
private static void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
|
||||
Buffer oldCumulation = heap().allocate(8, BIG_ENDIAN).writeByte((byte) 0);
|
||||
final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16);
|
||||
|
||||
BufferAllocator allocator = new BufferAllocator() {
|
||||
@Override
|
||||
public Buffer allocate(int capacity) {
|
||||
return newCumulation;
|
||||
}
|
||||
};
|
||||
|
||||
Buffer in = heap().allocate(12, BIG_ENDIAN).writerOffset(12);
|
||||
Throwable thrown = null;
|
||||
try {
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in);
|
||||
} catch (Throwable t) {
|
||||
thrown = t;
|
||||
}
|
||||
|
||||
assertFalse(in.isAccessible());
|
||||
|
||||
if (shouldFail) {
|
||||
assertSame(newCumulation.writeError(), thrown);
|
||||
assertTrue(oldCumulation.isOwned());
|
||||
oldCumulation.close();
|
||||
assertFalse(newCumulation.isAccessible());
|
||||
} else {
|
||||
assertNull(thrown);
|
||||
assertFalse(oldCumulation.isAccessible());
|
||||
assertTrue(newCumulation.isOwned());
|
||||
newCumulation.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenCompositeCumulateThrows() {
|
||||
Buffer in = heap().allocate(12, LITTLE_ENDIAN).writerOffset(12);
|
||||
try (Buffer cumulation = Buffer.compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
|
||||
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(heap(), cumulation, in);
|
||||
fail();
|
||||
} catch (IllegalArgumentException expected) {
|
||||
assertThat(expected).hasMessageContaining("byte order");
|
||||
assertFalse(in.isAccessible());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoesNotOverRead() {
|
||||
class ReadInterceptingHandler implements ChannelHandler {
|
||||
private int readsTriggered;
|
||||
|
||||
@Override
|
||||
public void read(ChannelHandlerContext ctx) {
|
||||
readsTriggered++;
|
||||
ctx.read();
|
||||
}
|
||||
}
|
||||
ReadInterceptingHandler interceptor = new ReadInterceptingHandler();
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel();
|
||||
channel.config().setAutoRead(false);
|
||||
channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
|
||||
assertEquals(0, interceptor.readsTriggered);
|
||||
|
||||
// 0 complete frames, 1 partial frame: SHOULD trigger a read
|
||||
channel.writeInbound(heap().allocate(2, BIG_ENDIAN).writeShort((short) 0x0001));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 2 complete frames, 0 partial frames: should NOT trigger a read
|
||||
channel.writeInbound(heap().allocate(1).writeByte((byte) 2),
|
||||
heap().allocate(3).writeByte((byte) 3).writeByte((byte) 4).writeByte((byte) 5));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 1 partial frame: should NOT trigger a read
|
||||
channel.writeInbound(heap().allocate(3).writeByte((byte) 6).writeByte((byte) 7).writeByte((byte) 8),
|
||||
heap().allocate(1).writeByte((byte) 9));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 1 partial frame: should NOT trigger a read
|
||||
channel.writeInbound(heap().allocate(2).writeByte((byte) 10).writeByte((byte) 11),
|
||||
heap().allocate(1).writeByte((byte) 12));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 0 complete frames, 1 partial frame: SHOULD trigger a read
|
||||
channel.writeInbound(heap().allocate(1).writeByte((byte) 13));
|
||||
assertEquals(2, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 0 partial frames: should NOT trigger a read
|
||||
channel.writeInbound(heap().allocate(1).writeByte((byte) 14));
|
||||
assertEquals(2, interceptor.readsTriggered);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
try (Buffer read = channel.readInbound()) {
|
||||
assertEquals(i * 3, read.getByte(0));
|
||||
assertEquals(i * 3 + 1, read.getByte(1));
|
||||
assertEquals(i * 3 + 2, read.getByte(2));
|
||||
}
|
||||
}
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisorder() {
|
||||
ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
|
||||
int count;
|
||||
|
||||
//read 4 byte then remove this decoder
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
ctx.fireChannelRead(in.readByte());
|
||||
if (++count >= 4) {
|
||||
ctx.pipeline().remove(this);
|
||||
}
|
||||
}
|
||||
};
|
||||
EmbeddedChannel channel = new EmbeddedChannel(decoder);
|
||||
byte[] bytes = {1, 2, 3, 4, 5};
|
||||
Buffer buf = heap().allocate(bytes.length);
|
||||
for (byte b : bytes) {
|
||||
buf.writeByte(b);
|
||||
}
|
||||
assertTrue(channel.writeInbound(buf));
|
||||
assertEquals((byte) 1, channel.readInbound());
|
||||
assertEquals((byte) 2, channel.readInbound());
|
||||
assertEquals((byte) 3, channel.readInbound());
|
||||
assertEquals((byte) 4, channel.readInbound());
|
||||
Buffer buffer5 = channel.readInbound();
|
||||
assertEquals((byte) 5, buffer5.readByte());
|
||||
assertEquals(0, buffer5.readableBytes());
|
||||
buffer5.close();
|
||||
assertFalse(buffer5.isAccessible());
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecodeLast() {
|
||||
final AtomicBoolean removeHandler = new AtomicBoolean();
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, Buffer in) {
|
||||
if (removeHandler.get()) {
|
||||
ctx.pipeline().remove(this);
|
||||
}
|
||||
}
|
||||
});
|
||||
byte[] bytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
try (Buffer buf = heap().allocate(bytes.length)) {
|
||||
for (byte b : bytes) {
|
||||
buf.writeByte(b);
|
||||
}
|
||||
|
||||
assertFalse(channel.writeInbound(buf.slice()));
|
||||
assertNull(channel.readInbound());
|
||||
removeHandler.set(true);
|
||||
// This should trigger channelInputClosed(...)
|
||||
channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
|
||||
assertTrue(channel.finish());
|
||||
try (Buffer actual = channel.readInbound()) {
|
||||
assertEquals(buf.slice(), actual);
|
||||
}
|
||||
assertNull(channel.readInbound());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.examples.bytetomessagedecoder;
|
||||
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositive;
|
||||
|
||||
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
|
||||
private final int frameLength;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param frameLength the length of the frame
|
||||
*/
|
||||
public FixedLengthFrameDecoder(int frameLength) {
|
||||
checkPositive(frameLength, "frameLength");
|
||||
this.frameLength = frameLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
|
||||
Object decoded = decode0(ctx, in);
|
||||
if (decoded != null) {
|
||||
ctx.fireChannelRead(decoded);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a frame out of the {@link Buffer} and return it.
|
||||
*
|
||||
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
||||
* @param in the {@link Buffer} from which to read data
|
||||
* @return frame the {@link Buffer} which represent the frame or {@code null} if no frame could
|
||||
* be created.
|
||||
*/
|
||||
protected Object decode0(
|
||||
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, Buffer in) throws Exception {
|
||||
if (in.readableBytes() < frameLength) {
|
||||
return null;
|
||||
} else {
|
||||
try {
|
||||
return in.slice(in.readerOffset(), frameLength);
|
||||
} finally {
|
||||
in.readerOffset(in.readerOffset() + frameLength);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user