501 lines
20 KiB
Java
501 lines
20 KiB
Java
/*
|
|
* Copyright 2012 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.handler.codec;
|
|
|
|
import static io.netty.util.internal.ObjectUtil.checkPositive;
|
|
import static java.util.Objects.requireNonNull;
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.ByteBufAllocator;
|
|
import io.netty.buffer.CompositeByteBuf;
|
|
import io.netty.buffer.Unpooled;
|
|
import io.netty.channel.ChannelHandlerAdapter;
|
|
import io.netty.channel.ChannelConfig;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelHandler;
|
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
|
import io.netty.util.internal.StringUtil;
|
|
|
|
import java.util.List;
|
|
|
|
/**
|
|
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
|
|
* other Message type.
|
|
*
|
|
* For example here is an implementation which reads all readable bytes from
|
|
* the input {@link ByteBuf} and create a new {@link ByteBuf}.
|
|
*
|
|
* <pre>
|
|
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
|
|
* {@code @Override}
|
|
* public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
|
|
* throws {@link Exception} {
|
|
* out.add(in.readBytes(in.readableBytes()));
|
|
* }
|
|
* }
|
|
* </pre>
|
|
*
|
|
* <h3>Frame detection</h3>
|
|
* <p>
|
|
* Generally frame detection should be handled earlier in the pipeline by adding a
|
|
* {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
|
|
* or {@link LineBasedFrameDecoder}.
|
|
* <p>
|
|
* If a custom frame decoder is required, then one needs to be careful when implementing
|
|
* one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
|
|
* complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
|
|
* for a complete frame, return without modifying the reader index to allow more bytes to arrive.
|
|
* <p>
|
|
* To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
|
|
* One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
|
|
* For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
|
|
* is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
|
|
* <h3>Pitfalls</h3>
|
|
* <p>
|
|
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
|
|
* annotated with {@link @Sharable}.
|
|
* <p>
|
|
* Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
|
|
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
|
|
* to avoid leaking memory.
|
|
*/
|
|
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|
|
|
/**
|
|
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
|
|
*/
|
|
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
|
|
try {
|
|
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
|
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
|
|
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
|
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
|
|
// duplicate().retain() or if its read-only.
|
|
//
|
|
// See:
|
|
// - https://github.com/netty/netty/issues/2327
|
|
// - https://github.com/netty/netty/issues/1764
|
|
cumulation = expandCumulation(alloc, cumulation, in);
|
|
} else {
|
|
cumulation.writeBytes(in);
|
|
}
|
|
return cumulation;
|
|
} finally {
|
|
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
|
|
// for whatever release (for example because of OutOfMemoryError)
|
|
in.release();
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
|
|
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
|
|
* and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
|
|
*/
|
|
public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> {
|
|
ByteBuf buffer;
|
|
try {
|
|
if (cumulation.refCnt() > 1) {
|
|
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
|
|
// user use slice().retain() or duplicate().retain().
|
|
//
|
|
// See:
|
|
// - https://github.com/netty/netty/issues/2327
|
|
// - https://github.com/netty/netty/issues/1764
|
|
buffer = expandCumulation(alloc, cumulation, in);
|
|
} else {
|
|
CompositeByteBuf composite;
|
|
if (cumulation instanceof CompositeByteBuf) {
|
|
composite = (CompositeByteBuf) cumulation;
|
|
} else {
|
|
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
|
|
composite.addComponent(true, cumulation);
|
|
}
|
|
composite.addComponent(true, in);
|
|
in = null;
|
|
buffer = composite;
|
|
}
|
|
return buffer;
|
|
} finally {
|
|
if (in != null) {
|
|
// We must release if the ownership was not transferred as otherwise it may produce a leak if
|
|
// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
|
|
in.release();
|
|
}
|
|
}
|
|
};
|
|
|
|
ByteBuf cumulation;
|
|
private Cumulator cumulator = MERGE_CUMULATOR;
|
|
private boolean singleDecode;
|
|
private boolean first;
|
|
// TODO: Improve this...
|
|
private CodecOutputList out = CodecOutputList.newInstance();
|
|
|
|
/**
|
|
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
|
|
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
|
|
*/
|
|
private boolean firedChannelRead;
|
|
private int discardAfterReads = 16;
|
|
private int numReads;
|
|
|
|
protected ByteToMessageDecoder() {
|
|
ensureNotSharable();
|
|
}
|
|
|
|
/**
|
|
* If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
|
|
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
|
|
*
|
|
* Default is {@code false} as this has performance impacts.
|
|
*/
|
|
public void setSingleDecode(boolean singleDecode) {
|
|
this.singleDecode = singleDecode;
|
|
}
|
|
|
|
/**
|
|
* If {@code true} then only one message is decoded on each
|
|
* {@link #channelRead(ChannelHandlerContext, Object)} call.
|
|
*
|
|
* Default is {@code false} as this has performance impacts.
|
|
*/
|
|
public boolean isSingleDecode() {
|
|
return singleDecode;
|
|
}
|
|
|
|
/**
|
|
* Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
|
|
*/
|
|
public void setCumulator(Cumulator cumulator) {
|
|
requireNonNull(cumulator, "cumulator");
|
|
this.cumulator = cumulator;
|
|
}
|
|
|
|
/**
|
|
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
|
|
* The default is {@code 16}.
|
|
*/
|
|
public void setDiscardAfterReads(int discardAfterReads) {
|
|
checkPositive(discardAfterReads, "discardAfterReads");
|
|
this.discardAfterReads = discardAfterReads;
|
|
}
|
|
|
|
/**
|
|
* Returns the actual number of readable bytes in the internal cumulative
|
|
* buffer of this decoder. You usually do not need to rely on this value
|
|
* to write a decoder. Use it only when you must use it at your own risk.
|
|
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
|
|
*/
|
|
protected int actualReadableBytes() {
|
|
return internalBuffer().readableBytes();
|
|
}
|
|
|
|
/**
|
|
* Returns the internal cumulative buffer of this decoder. You usually
|
|
* do not need to access the internal buffer directly to write a decoder.
|
|
* Use it only when you must use it at your own risk.
|
|
*/
|
|
protected ByteBuf internalBuffer() {
|
|
if (cumulation != null) {
|
|
return cumulation;
|
|
} else {
|
|
return Unpooled.EMPTY_BUFFER;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
|
//fireChannelRead(ctx, out, out.size());
|
|
//out.clear();
|
|
|
|
ByteBuf buf = cumulation;
|
|
if (buf != null) {
|
|
// Directly set this to null so we are sure we not access it in any other method here anymore.
|
|
cumulation = null;
|
|
numReads = 0;
|
|
int readable = buf.readableBytes();
|
|
if (readable > 0) {
|
|
ctx.fireChannelRead(buf);
|
|
ctx.fireChannelReadComplete();
|
|
} else {
|
|
buf.release();
|
|
}
|
|
}
|
|
handlerRemoved0(ctx);
|
|
}
|
|
|
|
/**
|
|
* Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
|
|
* events anymore.
|
|
*/
|
|
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
|
|
|
|
@Override
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
if (msg instanceof ByteBuf) {
|
|
try {
|
|
ByteBuf data = (ByteBuf) msg;
|
|
first = cumulation == null;
|
|
if (first) {
|
|
cumulation = data;
|
|
} else {
|
|
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
|
|
}
|
|
callDecode(ctx, cumulation, out);
|
|
} catch (DecoderException e) {
|
|
throw e;
|
|
} catch (Exception e) {
|
|
throw new DecoderException(e);
|
|
} finally {
|
|
if (cumulation != null && !cumulation.isReadable()) {
|
|
numReads = 0;
|
|
cumulation.release();
|
|
cumulation = null;
|
|
} else if (++ numReads >= discardAfterReads) {
|
|
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
|
|
// See https://github.com/netty/netty/issues/4275
|
|
numReads = 0;
|
|
discardSomeReadBytes();
|
|
}
|
|
|
|
int size = out.size();
|
|
firedChannelRead |= size > 0;
|
|
fireChannelRead(ctx, out, size);
|
|
out.clear();
|
|
}
|
|
} else {
|
|
ctx.fireChannelRead(msg);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
|
|
*/
|
|
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
|
|
if (msgs instanceof CodecOutputList) {
|
|
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
|
|
} else {
|
|
for (int i = 0; i < numElements; i++) {
|
|
ctx.fireChannelRead(msgs.get(i));
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
|
|
*/
|
|
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
|
|
for (int i = 0; i < numElements; i ++) {
|
|
ctx.fireChannelRead(msgs.getUnsafe(i));
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
numReads = 0;
|
|
discardSomeReadBytes();
|
|
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
|
|
ctx.read();
|
|
}
|
|
firedChannelRead = false;
|
|
ctx.fireChannelReadComplete();
|
|
}
|
|
|
|
protected final void discardSomeReadBytes() {
|
|
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
|
// discard some bytes if possible to make more room in the
|
|
// buffer but only if the refCnt == 1 as otherwise the user may have
|
|
// used slice().retain() or duplicate().retain().
|
|
//
|
|
// See:
|
|
// - https://github.com/netty/netty/issues/2327
|
|
// - https://github.com/netty/netty/issues/1764
|
|
cumulation.discardSomeReadBytes();
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
channelInputClosed(ctx, true);
|
|
}
|
|
|
|
@Override
|
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
|
if (evt instanceof ChannelInputShutdownEvent) {
|
|
// The decodeLast method is invoked when a channelInactive event is encountered.
|
|
// This method is responsible for ending requests in some situations and must be called
|
|
// when the input has been shutdown.
|
|
channelInputClosed(ctx, false);
|
|
}
|
|
ctx.fireUserEventTriggered(evt);
|
|
}
|
|
|
|
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
|
|
try {
|
|
channelInputClosed(ctx, out);
|
|
} catch (DecoderException e) {
|
|
throw e;
|
|
} catch (Exception e) {
|
|
throw new DecoderException(e);
|
|
} finally {
|
|
if (cumulation != null) {
|
|
cumulation.release();
|
|
cumulation = null;
|
|
}
|
|
int size = out.size();
|
|
fireChannelRead(ctx, out, size);
|
|
out.clear();
|
|
if (size > 0) {
|
|
// Something was read, call fireChannelReadComplete()
|
|
ctx.fireChannelReadComplete();
|
|
}
|
|
if (callChannelInactive) {
|
|
ctx.fireChannelInactive();
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Called when the input of the channel was closed which may be because it changed to inactive or because of
|
|
* {@link ChannelInputShutdownEvent}.
|
|
*/
|
|
void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
|
|
if (cumulation != null) {
|
|
callDecode(ctx, cumulation, out);
|
|
decodeLast(ctx, cumulation, out);
|
|
} else {
|
|
decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
|
|
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
|
|
*
|
|
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
|
* @param in the {@link ByteBuf} from which to read data
|
|
* @param out the {@link List} to which decoded messages should be added
|
|
*/
|
|
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
|
|
try {
|
|
while (in.isReadable()) {
|
|
int outSize = out.size();
|
|
|
|
if (outSize > 0) {
|
|
fireChannelRead(ctx, out, outSize);
|
|
out.clear();
|
|
|
|
// Check if this handler was removed before continuing with decoding.
|
|
// If it was removed, it is not safe to continue to operate on the buffer.
|
|
//
|
|
// See:
|
|
// - https://github.com/netty/netty/issues/4635
|
|
if (ctx.isRemoved()) {
|
|
break;
|
|
}
|
|
outSize = 0;
|
|
}
|
|
|
|
int oldInputLength = in.readableBytes();
|
|
decode(ctx, in, out);
|
|
|
|
// Check if this handler was removed before continuing the loop.
|
|
// If it was removed, it is not safe to continue to operate on the buffer.
|
|
//
|
|
// See https://github.com/netty/netty/issues/1664
|
|
if (ctx.isRemoved()) {
|
|
break;
|
|
}
|
|
|
|
if (outSize == out.size()) {
|
|
if (oldInputLength == in.readableBytes()) {
|
|
break;
|
|
} else {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (oldInputLength == in.readableBytes()) {
|
|
throw new DecoderException(
|
|
StringUtil.simpleClassName(getClass()) +
|
|
".decode() did not read anything but decoded a message.");
|
|
}
|
|
|
|
if (isSingleDecode()) {
|
|
break;
|
|
}
|
|
}
|
|
} catch (DecoderException e) {
|
|
throw e;
|
|
} catch (Exception cause) {
|
|
throw new DecoderException(cause);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
|
|
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
|
|
* {@link ByteBuf}.
|
|
*
|
|
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
|
|
* @param in the {@link ByteBuf} from which to read data
|
|
* @param out the {@link List} to which decoded messages should be added
|
|
* @throws Exception is thrown if an error occurs
|
|
*/
|
|
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
|
|
|
|
/**
|
|
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
|
|
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
|
|
*
|
|
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
|
|
* override this for some special cleanup operation.
|
|
*/
|
|
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
|
if (in.isReadable()) {
|
|
// Only call decode() if there is something left in the buffer to decode.
|
|
// See https://github.com/netty/netty/issues/4386
|
|
decode(ctx, in, out);
|
|
}
|
|
}
|
|
|
|
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
|
|
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
|
|
ByteBuf toRelease = newCumulation;
|
|
try {
|
|
newCumulation.writeBytes(oldCumulation);
|
|
newCumulation.writeBytes(in);
|
|
toRelease = oldCumulation;
|
|
return newCumulation;
|
|
} finally {
|
|
toRelease.release();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Cumulate {@link ByteBuf}s.
|
|
*/
|
|
public interface Cumulator {
|
|
/**
|
|
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
|
|
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
|
|
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
|
|
*/
|
|
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
|
|
}
|
|
}
|