From af4b71a00e30faf86ef1c2365e965e8f04f65bee Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 3 Apr 2013 11:32:33 +0200 Subject: [PATCH] Remove special handling of Object[] in codec framework (a.k.a unfolding) - Fixes #1229 - Primarily written by @normanmaurer and revised by @trustin This commit removes the notion of unfolding from the codec framework completely. Unfolding was introduced in Netty 3.x to work around the shortcoming of the codec framework where encode() and decode() did not allow generating multiple messages. Such a shortcoming can be fixed by changing the signature of encode() and decode() instead of introducing an obscure workaround like unfolding. Therefore, we changed the signature of them in 4.0. The change is simple, but backward-incompatible. encode() and decode() do not return anything. Instead, the codec framework will pass a MessageBuf so encode() and decode() can add the generated messages into the MessageBuf. --- .../io/netty/buffer/AbstractMessageBuf.java | 23 ----- .../io/netty/buffer/FilteredMessageBuf.java | 5 - .../main/java/io/netty/buffer/MessageBuf.java | 11 --- .../handler/codec/http/HttpClientCodec.java | 27 ++--- .../codec/http/HttpContentDecoder.java | 35 ++++--- .../codec/http/HttpContentEncoder.java | 65 +++++++----- .../codec/http/HttpObjectAggregator.java | 13 +-- .../handler/codec/http/HttpObjectDecoder.java | 99 ++++++++++++------- .../websocketx/WebSocket00FrameDecoder.java | 9 +- .../websocketx/WebSocket08FrameDecoder.java | 51 +++++----- .../websocketx/WebSocketFrameAggregator.java | 15 +-- .../handler/codec/spdy/SpdyFrameDecoder.java | 58 ++++++----- .../handler/codec/spdy/SpdyHttpDecoder.java | 22 +++-- .../handler/codec/spdy/SpdyHttpEncoder.java | 8 +- .../spdy/SpdyHttpResponseStreamIdHandler.java | 11 +-- .../codec/socks/SocksAuthRequestDecoder.java | 5 +- .../codec/socks/SocksAuthResponseDecoder.java | 6 +- .../codec/socks/SocksCmdRequestDecoder.java | 5 +- .../codec/socks/SocksCmdResponseDecoder.java | 5 +- .../codec/socks/SocksInitRequestDecoder.java | 5 +- .../codec/socks/SocksInitResponseDecoder.java | 5 +- .../handler/codec/ByteToMessageCodec.java | 14 +-- .../handler/codec/ByteToMessageDecoder.java | 98 ++++++++++++------ .../codec/DelimiterBasedFrameDecoder.java | 8 ++ .../codec/FixedLengthFrameDecoder.java | 8 ++ .../codec/LengthFieldBasedFrameDecoder.java | 24 +++-- .../handler/codec/LineBasedFrameDecoder.java | 8 ++ .../handler/codec/MessageToMessageCodec.java | 12 +-- .../codec/MessageToMessageDecoder.java | 37 +++++-- .../codec/MessageToMessageEncoder.java | 47 ++++++--- .../netty/handler/codec/ReplayingDecoder.java | 66 +++++++------ .../handler/codec/base64/Base64Decoder.java | 5 +- .../handler/codec/bytes/ByteArrayDecoder.java | 5 +- .../CompatibleMarshallingDecoder.java | 15 +-- .../codec/protobuf/ProtobufDecoder.java | 11 ++- .../codec/protobuf/ProtobufEncoder.java | 10 +- .../ProtobufVarint32FrameDecoder.java | 10 +- .../handler/codec/string/StringDecoder.java | 5 +- .../handler/codec/ReplayingDecoderTest.java | 5 +- ...tractCompatibleMarshallingDecoderTest.java | 14 ++- .../RiverMarshallingDecoderTest.java | 14 +++ .../SerialMarshallingDecoderTest.java | 14 +++ .../example/factorial/BigIntegerDecoder.java | 9 +- .../io/netty/channel/ChannelHandlerUtil.java | 4 +- 44 files changed, 558 insertions(+), 368 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java index 8abb685cee..ec9067239c 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java @@ -207,29 +207,6 @@ public abstract class AbstractMessageBuf extends AbstractQueue implements return super.element(); } - @Override - @SuppressWarnings("unchecked") - public boolean unfoldAndAdd(Object o) { - if (o == null) { - return false; - } - - if (o instanceof Object[]) { - Object[] a = (Object[]) o; - int i; - for (i = 0; i < a.length; i ++) { - Object m = a[i]; - if (m == null) { - break; - } - add((T) m); - } - return i != 0; - } - - return add((T) o); - } - @Override public int drainTo(Collection c) { ensureAccessible(); diff --git a/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java b/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java index cef27554bf..496814db08 100644 --- a/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java @@ -194,11 +194,6 @@ public abstract class FilteredMessageBuf implements MessageBuf { buf.clear(); } - @Override - public boolean unfoldAndAdd(Object o) { - return buf.unfoldAndAdd(o); - } - @Override public int refCnt() { return buf.refCnt(); diff --git a/buffer/src/main/java/io/netty/buffer/MessageBuf.java b/buffer/src/main/java/io/netty/buffer/MessageBuf.java index 00d45c5156..0e2b6ada22 100644 --- a/buffer/src/main/java/io/netty/buffer/MessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/MessageBuf.java @@ -25,17 +25,6 @@ import java.util.Queue; */ public interface MessageBuf extends Buf, Queue { - /** - * Unfold the specified object if necessary, and then add the unfolded objects (or the specified object if - * unfonding was not necessary) to this buffer. If the specified object is an object array ({@code Object[]}), - * this method adds the elements of the array to this buffer until {@code null} is encountered. If the specified - * object is {@code null}, nothing is added to this buffer. Otherwise, the specified object is added to this - * buffer as-is. - * - * @return {@code true} if one or more messages were added to this buffer. {@code false} otherwise. - */ - boolean unfoldAndAdd(Object o); - /** * Drain the content of te {@link MessageBuf} to the given {@link Collection}. * diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 162e0e248a..36a9195af9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.FilteredMessageBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -119,6 +120,7 @@ public final class HttpClientCodec } private final class Encoder extends HttpRequestEncoder { + @Override protected void encode( ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { @@ -139,28 +141,32 @@ public final class HttpClientCodec } private final class Decoder extends HttpResponseDecoder { - Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { super(maxInitialLineLength, maxHeaderSize, maxChunkSize); } @Override - protected Object decode( - ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode( + ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { if (done) { int readable = actualReadableBytes(); if (readable == 0) { // if non is readable just return null // https://github.com/netty/netty/issues/1159 - return null; + return; } - return buffer.readBytes(readable); + out.add(buffer.readBytes(readable)); } else { - Object msg = super.decode(ctx, buffer); if (failOnMissingResponse) { - decrement(msg); + out = new FilteredMessageBuf(out) { + @Override + protected Object filter(Object msg) { + decrement(msg); + return msg; + } + }; } - return msg; + super.decode(ctx, buffer, out); } } @@ -172,11 +178,6 @@ public final class HttpClientCodec // check if it's an Header and its transfer encoding is not chunked. if (msg instanceof LastHttpContent) { requestResponseCounter.decrementAndGet(); - } else if (msg instanceof Object[]) { - Object[] objects = (Object[]) msg; - for (Object obj: objects) { - decrement(obj); - } } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java index 2ec27ad97f..df3ce33c54 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.BufUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedByteChannel; @@ -50,14 +51,15 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder out) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) { - // 100-continue response must be passed through. - BufUtil.retain(msg); + if (!(msg instanceof LastHttpContent)) { continueResponse = true; } - return msg; + // 100-continue response must be passed through. + out.add(BufUtil.retain(msg)); + return; } if (continueResponse) { @@ -65,8 +67,8 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder out) throws Exception { String acceptedEncoding = msg.headers().get(HttpHeaders.Names.ACCEPT_ENCODING); if (acceptedEncoding == null) { acceptedEncoding = HttpHeaders.Values.IDENTITY; } acceptEncodingQueue.add(acceptedEncoding); - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } @Override - protected Object encode(ChannelHandlerContext ctx, HttpObject msg) + protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) { - // 100-continue response must be passed through. - BufUtil.retain(msg); + if (!(msg instanceof LastHttpContent)) { continueResponse = true; } - return msg; + // 100-continue response must be passed through. + out.add(BufUtil.retain(msg)); + return; } if (continueResponse) { @@ -84,8 +86,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec { } @Override - protected Object decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { FullHttpMessage currentMessage = this.currentMessage; if (msg instanceof HttpMessage) { @@ -126,8 +127,8 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { if (!m.getDecoderResult().isSuccess()) { removeTransferEncodingChunked(m); this.currentMessage = null; - BufUtil.retain(m); - return m; + out.add(BufUtil.retain(m)); + return; } if (msg instanceof HttpRequest) { HttpRequest header = (HttpRequest) msg; @@ -146,8 +147,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { // A streamed message - initialize the cumulative buffer, and wait for incoming chunks. removeTransferEncodingChunked(currentMessage); - return null; - } else if (msg instanceof HttpContent) { assert currentMessage != null; @@ -196,9 +195,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { String.valueOf(content.readableBytes())); // All done - return currentMessage; - } else { - return null; + out.add(currentMessage); } } else { throw new Error(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java index dab0c44e8a..dc4c78c087 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -167,7 +168,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case SKIP_CONTROL_CHARS: { try { @@ -182,14 +183,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder maxChunkSize || HttpHeaders.is100ContinueExpected(message)) { // Generate FullHttpMessage first. HttpChunks will follow. checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS); - return message; + out.add(message); + return; } break; default: throw new IllegalStateException("Unexpected state: " + nextState); } - // We return null here, this forces decode to be called again where we will decode the content - return null; + // We return here, this forces decode to be called again where we will decode the content + return; } catch (Exception e) { - return invalidMessage(e); + out.add(invalidMessage(e)); + return; } case READ_VARIABLE_LENGTH_CONTENT: { int toRead = actualReadableBytes(); if (toRead > maxChunkSize) { toRead = maxChunkSize; } - return new Object[] { message, new DefaultHttpContent(buffer.readBytes(toRead))}; + out.add(message); + out.add(new DefaultHttpContent(buffer.readBytes(toRead))); + return; } case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: { // Keep reading data as a chunk until the end of connection is reached. @@ -253,12 +269,18 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder maxChunkSize) { // A chunk is too large. Split them into multiple chunks again. checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS); @@ -314,13 +338,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { in.skipBytes(actualReadableBytes()); - return null; + return; } // Decode a frame otherwise. byte type = in.readByte(); if ((type & 0x80) == 0x80) { // If the MSB on type is set, decode the frame length - return decodeBinaryFrame(ctx, type, in); + out.add(decodeBinaryFrame(ctx, type, in)); } else { // Decode a 0xff terminated UTF-8 string - return decodeTextFrame(ctx, in); + out.add(decodeTextFrame(ctx, in)); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java index b1601de28e..2e0be188d7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java @@ -54,6 +54,7 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.CorruptedFrameException; @@ -117,12 +118,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder out) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { in.skipBytes(actualReadableBytes()); - return null; + return; } switch (state()) { @@ -148,31 +149,31 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder 7) { // control frame (have MSB in opcode set) // control frames MUST NOT be fragmented if (!frameFinalFlag) { protocolViolation(ctx, "fragmented control frame"); - return null; + return; } // control frames MUST have payload 125 octets or less if (framePayloadLen1 > 125) { protocolViolation(ctx, "control frame with payload length > 125 octets"); - return null; + return; } // check for reserved control frame opcodes if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode); - return null; + return; } // close frame : if there is a body, the first two bytes of the @@ -180,25 +181,25 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder maxFramePayloadLength) { protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded."); - return null; + return; } if (logger.isDebugEnabled()) { @@ -262,7 +263,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder framePayloadLength) { // We have more than what we need so read up to the end of frame // Leave the remainder in the buffer for next frame @@ -291,15 +292,18 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder out) throws Exception { if (currentFrame == null) { if (msg.isFinalFragment()) { - return msg.retain(); + out.add(msg.retain()); + return; } ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.data().retain()); buf.writerIndex(buf.writerIndex() + msg.data().readableBytes()); @@ -61,7 +63,7 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder out) throws Exception { try { - return decode(ctx, in); + decode(ctx, in, out); } finally { headerBlockDecompressor.end(); } } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { switch(state) { case READ_COMMON_HEADER: state = readCommonHeader(buffer); @@ -121,19 +122,20 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (streamID == 0) { state = State.FRAME_ERROR; fireProtocolException(ctx, "Received invalid data frame"); - return null; + return; } SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID); spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0); state = State.READ_COMMON_HEADER; - return spdyDataFrame; + out.add(spdyDataFrame); + return; } // There are no length 0 control frames state = State.READ_COMMON_HEADER; } - return null; + return; case READ_CONTROL_FRAME: try { @@ -141,18 +143,19 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (frame != null) { state = State.READ_COMMON_HEADER; } - return frame; + out.add(frame); + return; } catch (IllegalArgumentException e) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); } - return null; + return; case READ_SETTINGS_FRAME: if (spdySettingsFrame == null) { // Validate frame length against number of entries if (buffer.readableBytes() < 4) { - return null; + return; } int numEntries = getUnsignedInt(buffer, buffer.readerIndex()); buffer.skipBytes(4); @@ -162,7 +165,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if ((length & 0x07) != 0 || length >> 3 != numEntries) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); - return null; + return; } spdySettingsFrame = new DefaultSpdySettingsFrame(); @@ -196,7 +199,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.FRAME_ERROR; spdySettingsFrame = null; fireInvalidControlFrameException(ctx); - return null; + return; } if (!spdySettingsFrame.isSet(ID)) { @@ -211,9 +214,10 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.READ_COMMON_HEADER; Object frame = spdySettingsFrame; spdySettingsFrame = null; - return frame; + out.add(frame); + return; } - return null; + return; case READ_HEADER_BLOCK_FRAME: try { @@ -223,15 +227,16 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.READ_COMMON_HEADER; Object frame = spdyHeaderBlock; spdyHeaderBlock = null; - return frame; + out.add(frame); + return; } state = State.READ_HEADER_BLOCK; } - return null; + return; } catch (IllegalArgumentException e) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); - return null; + return; } case READ_HEADER_BLOCK: @@ -245,7 +250,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { spdyHeaderBlock = null; decompressed = null; ctx.fireExceptionCaught(e); - return null; + return; } if (spdyHeaderBlock != null && spdyHeaderBlock.isInvalid()) { @@ -255,22 +260,24 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (length == 0) { state = State.READ_COMMON_HEADER; } - return frame; + out.add(frame); + return; } if (length == 0) { Object frame = spdyHeaderBlock; spdyHeaderBlock = null; state = State.READ_COMMON_HEADER; - return frame; + out.add(frame); + return; } - return null; + return; case READ_DATA_FRAME: if (streamID == 0) { state = State.FRAME_ERROR; fireProtocolException(ctx, "Received invalid data frame"); - return null; + return; } // Generate data frames that do not exceed maxChunkSize @@ -278,7 +285,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { // Wait until entire frame is readable if (buffer.readableBytes() < dataLength) { - return null; + return; } ByteBuf data = ctx.alloc().buffer(dataLength); @@ -290,7 +297,8 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0); state = State.READ_COMMON_HEADER; } - return spdyDataFrame; + out.add(spdyDataFrame); + return; case DISCARD_FRAME: int numBytes = Math.min(buffer.readableBytes(), length); @@ -299,11 +307,11 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (length == 0) { state = State.READ_COMMON_HEADER; } - return null; + return; case FRAME_ERROR: buffer.skipBytes(buffer.readableBytes()); - return null; + return; default: throw new Error("Shouldn't reach here."); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index 485e906ca7..75c0e26a35 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; @@ -65,7 +66,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder out) + throws Exception { if (msg instanceof SpdySynStreamFrame) { // HTTP requests/responses are mapped one-to-one to SPDY streams. @@ -106,7 +108,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder e: spdyHeadersFrame.headers().entries()) { @@ -193,7 +198,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { } @Override - protected Object encode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { - - List out = new ArrayList(); + protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { boolean valid = false; @@ -199,8 +197,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { if (!valid) { throw new UnsupportedMessageTypeException(msg); } - - return out.toArray(); } private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java index 1cbcbdcc4a..cb18684d67 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.BufUtil; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.http.HttpMessage; @@ -39,18 +40,17 @@ public class SpdyHttpResponseStreamIdHandler extends } @Override - protected Object encode(ChannelHandlerContext ctx, HttpMessage msg) throws Exception { + protected void encode(ChannelHandlerContext ctx, HttpMessage msg, MessageBuf out) throws Exception { Integer id = ids.poll(); if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) { SpdyHttpHeaders.setStreamId(msg, id); } - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } @Override - protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { if (msg instanceof HttpMessage) { boolean contains = ((HttpMessage) msg).headers().contains(SpdyHttpHeaders.Names.STREAM_ID); if (!contains) { @@ -62,7 +62,6 @@ public class SpdyHttpResponseStreamIdHandler extends ids.remove(((SpdyRstStreamFrame) msg).getStreamId()); } - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } } diff --git a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java index 60bb857a6a..53101ed862 100644 --- a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java +++ b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.socks; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import io.netty.util.CharsetUtil; @@ -42,7 +43,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte()); @@ -63,7 +64,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder out) + throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte()); @@ -54,7 +56,7 @@ public class SocksAuthResponseDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -87,7 +88,7 @@ public class SocksCmdRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -87,7 +88,7 @@ public class SocksCmdResponseDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -63,7 +64,7 @@ public class SocksInitRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -56,7 +57,7 @@ public class SocksInitResponseDecoder extends ReplayingDecoder extends ChannelDuplexHandler private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() { @Override - public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decode(ctx, in); + public void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + ByteToMessageCodec.this.decode(ctx, in, out); } @Override - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decodeLast(ctx, in); + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + ByteToMessageCodec.this.decodeLast(ctx, in, out); } }; @@ -105,8 +105,8 @@ public abstract class ByteToMessageCodec extends ChannelDuplexHandler } protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; - protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception; - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return decode(ctx, in); + protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception; + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + decode(ctx, in, out); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 157d4b3b6e..04b0fed5f7 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -17,7 +17,9 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandlerAdapter; @@ -31,9 +33,9 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter; *
  *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
  *         {@code @Override}
- *         public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in)
+ *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, {@link MessageBuf} out)
  *                 throws {@link Exception} {
- *             return in.readBytes(in.readableBytes());
+ *             out.add(in.readBytes(in.readableBytes()));
  *         }
  *     }
  * 
@@ -43,6 +45,19 @@ public abstract class ByteToMessageDecoder private volatile boolean singleDecode; private boolean decodeWasNull; + private MessageBuf decoderOutput; + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoderOutput = Unpooled.messageBuffer(); + return super.newInboundBuffer(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + decoderOutput.release(); + } /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. @@ -82,22 +97,33 @@ public abstract class ByteToMessageDecoder @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + MessageBuf out = decoderOutput(); try { ByteBuf in = ctx.inboundByteBuffer(); if (in.isReadable()) { callDecode(ctx, in); } + decodeLast(ctx, in, out); - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) { - ctx.fireInboundBufferUpdated(); - } } catch (Throwable t) { if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); + throw (CodecException) t; } else { - ctx.fireExceptionCaught(new DecoderException(t)); + throw new DecoderException(t); } } finally { + boolean decoded = false; + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { + ctx.fireInboundBufferUpdated(); + } ctx.fireChannelInactive(); } } @@ -106,12 +132,16 @@ public abstract class ByteToMessageDecoder boolean wasNull = false; boolean decoded = false; - MessageBuf out = ctx.nextInboundMessageBuffer(); + MessageBuf out = decoderOutput(); + + assert out.isEmpty(); + while (in.isReadable()) { try { + int outSize = out.size(); int oldInputLength = in.readableBytes(); - Object o = decode(ctx, in); - if (o == null) { + decode(ctx, in, out); + if (outSize == out.size()) { wasNull = true; if (oldInputLength == in.readableBytes()) { break; @@ -119,33 +149,36 @@ public abstract class ByteToMessageDecoder continue; } } + wasNull = false; if (oldInputLength == in.readableBytes()) { throw new IllegalStateException( "decode() did not read anything but decoded a message."); } - if (out.unfoldAndAdd(o)) { - decoded = true; - if (isSingleDecode()) { - break; - } - } else { + if (isSingleDecode()) { break; } } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; + } else { + throw new DecoderException(t); + } + } finally { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { decoded = false; ctx.fireInboundBufferUpdated(); } - - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new DecoderException(t)); - } - - break; } } @@ -166,20 +199,25 @@ public abstract class ByteToMessageDecoder * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToByteDecoder} belongs to * @param in the {@link ByteBuf} from which to read data - * @return message the message to which the content of the {@link ByteBuf} was decoded, or {@code null} if - * there was not enough data left in the {@link ByteBuf} to decode. + * @param out the {@link MessageBuf} to which decoded messages should be added + * @throws Exception is thrown if an error accour */ - protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception; /** * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the * {@link #channelInactive(ChannelHandlerContext)} was triggered. * - * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf)} but sub-classes may + * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, MessageBuf)} but sub-classes may * override this for some special cleanup operation. */ - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return decode(ctx, in); + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + decode(ctx, in, out); } + + final MessageBuf decoderOutput() { + return decoderOutput; + } + } diff --git a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java index 438a984d02..3d22a7dc83 100644 --- a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; /** @@ -210,6 +211,13 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); diff --git a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java index bc86dc4d6d..600c19babd 100644 --- a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; @@ -75,6 +76,13 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; diff --git a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java index a42f923b8e..b9a6e357b2 100644 --- a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.serialization.ObjectDecoder; @@ -347,6 +348,13 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; @@ -446,12 +454,12 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { discardingTooLongFrame = false; if (!failFast || failFast && firstDetectionOfTooLongFrame) { - fail(ctx, tooLongFrameLength); + fail(tooLongFrameLength); } } else { // Keep discarding and notify handlers if necessary. if (failFast && firstDetectionOfTooLongFrame) { - fail(ctx, tooLongFrameLength); + fail(tooLongFrameLength); } } } @@ -473,17 +481,15 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { return frame; } - private void fail(ChannelHandlerContext ctx, long frameLength) { + private void fail(long frameLength) { if (frameLength > 0) { - ctx.fireExceptionCaught( - new TooLongFrameException( + throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + - ": " + frameLength + " - discarded")); + ": " + frameLength + " - discarded"); } else { - ctx.fireExceptionCaught( - new TooLongFrameException( + throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + - " - discarding")); + " - discarding"); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java index 1c064bc52e..cc77c8e0ae 100644 --- a/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; /** @@ -67,6 +68,13 @@ public class LineBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (eol != -1) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index fb4b586405..f451a2effd 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -62,8 +62,8 @@ public abstract class MessageToMessageCodec @Override @SuppressWarnings("unchecked") - protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { - return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg); + protected void encode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { + MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out); } }; @@ -77,8 +77,8 @@ public abstract class MessageToMessageCodec @Override @SuppressWarnings("unchecked") - protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { - return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg); + protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { + MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out); } }; @@ -147,6 +147,6 @@ public abstract class MessageToMessageCodec return outboundMsgMatcher.match(msg); } - protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception; - protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception; + protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, MessageBuf out) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 0c52e0002e..3cdb30678d 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -16,7 +16,9 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandlerAdapter; @@ -29,14 +31,11 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; *
  *     public class StringToIntegerDecoder extends
  *             {@link MessageToMessageDecoder}<{@link String}> {
- *         public StringToIntegerDecoder() {
- *             super(String.class);
- *         }
  *
  *         {@code @Override}
- *         public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link String} message)
- *                 throws {@link Exception} {
- *             return message.length());
+ *         public void decode({@link ChannelHandlerContext} ctx, {@link String} message,
+ *                 {@link MessageBuf} out) throws {@link Exception} {
+ *             out.add(message.length());
  *         }
  *     }
  * 
@@ -44,6 +43,14 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; */ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHandlerAdapter { + private static final ThreadLocal> decoderOutput = + new ThreadLocal>() { + @Override + protected MessageBuf initialValue() { + return Unpooled.messageBuffer(); + } + }; + protected MessageToMessageDecoder() { } protected MessageToMessageDecoder(Class inboundMessageType) { @@ -52,7 +59,18 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa @Override public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { - ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg)); + MessageBuf out = decoderOutput.get(); + try { + decode(ctx, msg, out); + } finally { + for (;;) { + Object obj = out.poll(); + if (obj == null) { + break; + } + ChannelHandlerUtil.addToNextInboundBuffer(ctx, obj); + } + } } /** @@ -61,9 +79,8 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to * @param msg the message to decode to an other one - * @return message the decoded message or {@code null} if more messages are needed be cause the implementation - * needs to do some kind of aggragation + * @param out the {@link MessageBuf} to which decoded messages should be added * @throws Exception is thrown if an error accour */ - protected abstract Object decode(ChannelHandlerContext ctx, I msg) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, I msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index ed6d8745e1..d1482051c7 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; @@ -28,20 +29,24 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter; *
  *     public class IntegerToStringEncoder extends
  *             {@link MessageToMessageEncoder}<{@link Integer}> {
- *         public StringToIntegerDecoder() {
- *             super(String.class);
- *         }
  *
  *         {@code @Override}
- *         public {@link Object} encode({@link ChannelHandlerContext} ctx, {@link Integer} message)
+ *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, {@link MessageBuf} out)
  *                 throws {@link Exception} {
- *             return message.toString();
+ *             out.add(message.toString());
  *         }
  *     }
  * 
* */ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { + private static final ThreadLocal> encoderOutput = + new ThreadLocal>() { + @Override + protected MessageBuf initialValue() { + return Unpooled.messageBuffer(); + } + }; protected MessageToMessageEncoder() { } @@ -51,16 +56,30 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH @Override public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { - try { - Object encoded = encode(ctx, msg); - // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline - // accept bytes. Related to #1222 - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); + MessageBuf out = encoderOutput.get(); + assert out.isEmpty(); + + try { + encode(ctx, msg, out); } catch (CodecException e) { throw e; - } catch (Exception e) { - throw new CodecException(e); + } catch (Throwable cause) { + if (cause instanceof CodecException) { + throw (CodecException) cause; + } else { + throw new EncoderException(cause); + } + } finally { + for (;;) { + Object encoded = out.poll(); + if (encoded == null) { + break; + } + // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline + // accept bytes. Related to #1222 + ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); + } } } @@ -70,9 +89,9 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to * @param msg the message to encode to an other one - * @return message the encoded message or {@code null} if more messages are needed be cause the implementation + * @param out the {@link MessageBuf} into which the encoded msg should be added * needs to do some kind of aggragation * @throws Exception is thrown if an error accour */ - protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception; + protected abstract void encode(ChannelHandlerContext ctx, I msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index d712d3ff70..f5704c1e9b 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelPipeline; import io.netty.util.Signal; @@ -364,6 +365,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + MessageBuf out = decoderOutput(); + try { replayable.terminate(); ByteBuf in = cumulation; @@ -371,19 +374,31 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { callDecode(ctx, in); } - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, replayable))) { - ctx.fireInboundBufferUpdated(); - } + decodeLast(ctx, replayable, out); } catch (Signal replay) { // Ignore replay.expect(REPLAY); } catch (Throwable t) { if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); + throw (CodecException) t; } else { - ctx.fireExceptionCaught(new DecoderException(t)); + throw new DecoderException(t); } } finally { + + boolean decoded = false; + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { + ctx.fireInboundBufferUpdated(); + } + ctx.fireChannelInactive(); } } @@ -393,16 +408,17 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { boolean wasNull = false; ByteBuf in = cumulation; - MessageBuf out = ctx.nextInboundMessageBuffer(); + MessageBuf out = decoderOutput(); boolean decoded = false; while (in.isReadable()) { try { int oldReaderIndex = checkpoint = in.readerIndex(); - Object result = null; + int outSize = out.size(); S oldState = state; try { - result = decode(ctx, replayable); - if (result == null) { + decode(ctx, replayable, out); + if (outSize == out.size()) { + wasNull = true; if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new IllegalStateException( "null cannot be returned if no data is consumed and state didn't change."); @@ -422,13 +438,6 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } - } - - if (result == null) { - wasNull = true; - - // Seems like more data is required. - // Let us wait for the next notification. break; } wasNull = false; @@ -439,27 +448,26 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { "if it returned a decoded message (caused by: " + getClass() + ')'); } + } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; + } else { + throw new DecoderException(t); + } + } finally { - // A successful decode - if (out.unfoldAndAdd(result)) { - decoded = true; - if (isSingleDecode()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { break; } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } - } catch (Throwable t) { if (decoded) { decoded = false; ctx.fireInboundBufferUpdated(); } - - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new DecoderException(t)); - } - - break; } } diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java index bd8bb840ee..ff1eb4458d 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.base64; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -58,7 +59,7 @@ public class Base64Decoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect); + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { + out.add(Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect)); } } diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java index 145de1ca1a..1d0f8acf9e 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.bytes; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -49,7 +50,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; public class ByteArrayDecoder extends MessageToMessageDecoder { @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { byte[] array; if (msg.hasArray()) { if (msg.arrayOffset() == 0 && msg.readableBytes() == msg.capacity()) { @@ -67,6 +68,6 @@ public class ByteArrayDecoder extends MessageToMessageDecoder { msg.getBytes(0, array); } - return array; + out.add(array); } } diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java b/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java index 2ed6b7be63..7af3ebd75e 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; @@ -54,11 +55,11 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { if (discardingTooLongFrame) { buffer.skipBytes(actualReadableBytes()); checkpoint(); - return null; + return; } Unmarshaller unmarshaller = provider.getUnmarshaller(ctx); @@ -70,7 +71,7 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { unmarshaller.start(input); Object obj = unmarshaller.readObject(); unmarshaller.finish(); - return obj; + out.add(obj); } catch (LimitingByteInput.TooBigObjectException e) { discardingTooLongFrame = true; throw new TooLongFrameException(); @@ -82,19 +83,19 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { } @Override - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { switch (buffer.readableBytes()) { case 0: - return null; + return; case 1: // Ignore the last TC_RESET if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) { buffer.skipBytes(1); - return null; + return; } } - return decode(ctx, buffer); + decode(ctx, buffer, out); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java index 3a5c1fea25..7bce890497 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java @@ -19,6 +19,7 @@ import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.Message; import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -94,7 +95,7 @@ public class ProtobufDecoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { final byte[] array; final int offset; final int length = msg.readableBytes(); @@ -109,15 +110,15 @@ public class ProtobufDecoder extends MessageToMessageDecoder { if (extensionRegistry == null) { if (HAS_PARSER) { - return prototype.getParserForType().parseFrom(array, offset, length); + out.add(prototype.getParserForType().parseFrom(array, offset, length)); } else { - return prototype.newBuilderForType().mergeFrom(array, offset, length).build(); + out.add(prototype.newBuilderForType().mergeFrom(array, offset, length).build()); } } else { if (HAS_PARSER) { - return prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry); + out.add(prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry)); } else { - return prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build(); + out.add(prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build()); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java index 70d8a8719e..874f402af6 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java @@ -19,6 +19,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -59,13 +60,14 @@ import static io.netty.buffer.Unpooled.*; public class ProtobufEncoder extends MessageToMessageEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg) throws Exception { + protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, MessageBuf out) + throws Exception { if (msg instanceof MessageLite) { - return wrappedBuffer(((MessageLite) msg).toByteArray()); + out.add(wrappedBuffer(((MessageLite) msg).toByteArray())); + return; } if (msg instanceof MessageLite.Builder) { - return wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()); + out.add(wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray())); } - return null; } } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java index 02f01c2020..c9b0059112 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.protobuf; import com.google.protobuf.CodedInputStream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -42,13 +43,13 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder { // (just like LengthFieldBasedFrameDecoder) @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { in.markReaderIndex(); final byte[] buf = new byte[5]; for (int i = 0; i < buf.length; i ++) { if (!in.isReadable()) { in.resetReaderIndex(); - return null; + return; } buf[i] = in.readByte(); @@ -60,9 +61,10 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder { if (in.readableBytes() < length) { in.resetReaderIndex(); - return null; + return; } else { - return in.readBytes(length); + out.add(in.readBytes(length)); + return; } } } diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java index 26c8004707..2f465866fc 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.string; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -74,7 +75,7 @@ public class StringDecoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - return msg.toString(charset); + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { + out.add(msg.toString(charset)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index f28e076a59..90d219e2c8 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -17,6 +17,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufIndexFinder; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedByteChannel; @@ -53,10 +54,10 @@ public class ReplayingDecoderTest { } @Override - protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) { ByteBuf msg = in.readBytes(in.bytesBefore(ByteBufIndexFinder.LF)); in.skipBytes(1); - return msg; + out.add(msg); } } } diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java index 67e736e671..3be464deff 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java @@ -19,8 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedByteChannel; -import io.netty.handler.codec.CodecException; -import io.netty.handler.codec.TooLongFrameException; import org.jboss.marshalling.Marshaller; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; @@ -112,12 +110,12 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { marshaller.close(); byte[] testBytes = bout.toByteArray(); - try { - ch.writeInbound(input(testBytes)); - fail(); - } catch (CodecException e) { - assertEquals(TooLongFrameException.class, e.getClass()); - } + onTooBigFrame(ch, input(testBytes)); + } + + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + ch.writeInbound(input); + assertFalse(ch.isActive()); } protected ChannelHandler createDecoder(int maxObjectSize) { diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java index fec50f21c6..17d7cb2d59 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java @@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedByteChannel; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.TooLongFrameException; + +import static org.junit.Assert.*; public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecoderTest { @@ -34,4 +39,13 @@ public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecod createMarshallingConfig()), maxObjectSize); } + @Override + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + try { + ch.writeInbound(input); + fail(); + } catch (CodecException e) { + assertEquals(TooLongFrameException.class, e.getClass()); + } + } } diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java index cf8f4f61f3..6f27baf078 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java @@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedByteChannel; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.TooLongFrameException; + +import static org.junit.Assert.*; public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDecoderTest { @@ -34,4 +39,13 @@ public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDec createMarshallingConfig()), maxObjectSize); } + @Override + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + try { + ch.writeInbound(input); + fail(); + } catch (CodecException e) { + assertEquals(TooLongFrameException.class, e.getClass()); + } + } } diff --git a/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java b/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java index 692fe8a944..459e842b32 100644 --- a/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java +++ b/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java @@ -16,6 +16,7 @@ package io.netty.example.factorial; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -31,10 +32,10 @@ import java.math.BigInteger; public class BigIntegerDecoder extends ByteToMessageDecoder { @Override - protected BigInteger decode(ChannelHandlerContext ctx, ByteBuf in) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) { // Wait until the length prefix is available. if (in.readableBytes() < 5) { - return null; + return; } in.markReaderIndex(); @@ -51,13 +52,13 @@ public class BigIntegerDecoder extends ByteToMessageDecoder { int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); - return null; + return; } // Convert the received data into a new BigInteger. byte[] decoded = new byte[dataLength]; in.readBytes(decoded); - return new BigInteger(decoded); + out.add(new BigInteger(decoded)); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index eb81d63574..596265abc4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -221,7 +221,7 @@ public final class ChannelHandlerUtil { return true; } } - return ctx.nextOutboundMessageBuffer().unfoldAndAdd(msg); + return ctx.nextOutboundMessageBuffer().add(msg); } /** @@ -235,7 +235,7 @@ public final class ChannelHandlerUtil { return true; } } - return ctx.nextInboundMessageBuffer().unfoldAndAdd(msg); + return ctx.nextInboundMessageBuffer().add(msg); } private ChannelHandlerUtil() { }