diff --git a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java index 8abb685cee..ec9067239c 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java @@ -207,29 +207,6 @@ public abstract class AbstractMessageBuf extends AbstractQueue implements return super.element(); } - @Override - @SuppressWarnings("unchecked") - public boolean unfoldAndAdd(Object o) { - if (o == null) { - return false; - } - - if (o instanceof Object[]) { - Object[] a = (Object[]) o; - int i; - for (i = 0; i < a.length; i ++) { - Object m = a[i]; - if (m == null) { - break; - } - add((T) m); - } - return i != 0; - } - - return add((T) o); - } - @Override public int drainTo(Collection c) { ensureAccessible(); diff --git a/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java b/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java index cef27554bf..496814db08 100644 --- a/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/FilteredMessageBuf.java @@ -194,11 +194,6 @@ public abstract class FilteredMessageBuf implements MessageBuf { buf.clear(); } - @Override - public boolean unfoldAndAdd(Object o) { - return buf.unfoldAndAdd(o); - } - @Override public int refCnt() { return buf.refCnt(); diff --git a/buffer/src/main/java/io/netty/buffer/MessageBuf.java b/buffer/src/main/java/io/netty/buffer/MessageBuf.java index 00d45c5156..0e2b6ada22 100644 --- a/buffer/src/main/java/io/netty/buffer/MessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/MessageBuf.java @@ -25,17 +25,6 @@ import java.util.Queue; */ public interface MessageBuf extends Buf, Queue { - /** - * Unfold the specified object if necessary, and then add the unfolded objects (or the specified object if - * unfonding was not necessary) to this buffer. If the specified object is an object array ({@code Object[]}), - * this method adds the elements of the array to this buffer until {@code null} is encountered. If the specified - * object is {@code null}, nothing is added to this buffer. Otherwise, the specified object is added to this - * buffer as-is. - * - * @return {@code true} if one or more messages were added to this buffer. {@code false} otherwise. - */ - boolean unfoldAndAdd(Object o); - /** * Drain the content of te {@link MessageBuf} to the given {@link Collection}. * diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 162e0e248a..36a9195af9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.FilteredMessageBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -119,6 +120,7 @@ public final class HttpClientCodec } private final class Encoder extends HttpRequestEncoder { + @Override protected void encode( ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { @@ -139,28 +141,32 @@ public final class HttpClientCodec } private final class Decoder extends HttpResponseDecoder { - Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { super(maxInitialLineLength, maxHeaderSize, maxChunkSize); } @Override - protected Object decode( - ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode( + ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { if (done) { int readable = actualReadableBytes(); if (readable == 0) { // if non is readable just return null // https://github.com/netty/netty/issues/1159 - return null; + return; } - return buffer.readBytes(readable); + out.add(buffer.readBytes(readable)); } else { - Object msg = super.decode(ctx, buffer); if (failOnMissingResponse) { - decrement(msg); + out = new FilteredMessageBuf(out) { + @Override + protected Object filter(Object msg) { + decrement(msg); + return msg; + } + }; } - return msg; + super.decode(ctx, buffer, out); } } @@ -172,11 +178,6 @@ public final class HttpClientCodec // check if it's an Header and its transfer encoding is not chunked. if (msg instanceof LastHttpContent) { requestResponseCounter.decrementAndGet(); - } else if (msg instanceof Object[]) { - Object[] objects = (Object[]) msg; - for (Object obj: objects) { - decrement(obj); - } } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java index 2ec27ad97f..df3ce33c54 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.BufUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedByteChannel; @@ -50,14 +51,15 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder out) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) { - // 100-continue response must be passed through. - BufUtil.retain(msg); + if (!(msg instanceof LastHttpContent)) { continueResponse = true; } - return msg; + // 100-continue response must be passed through. + out.add(BufUtil.retain(msg)); + return; } if (continueResponse) { @@ -65,8 +67,8 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder out) throws Exception { String acceptedEncoding = msg.headers().get(HttpHeaders.Names.ACCEPT_ENCODING); if (acceptedEncoding == null) { acceptedEncoding = HttpHeaders.Values.IDENTITY; } acceptEncodingQueue.add(acceptedEncoding); - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } @Override - protected Object encode(ChannelHandlerContext ctx, HttpObject msg) + protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) { - // 100-continue response must be passed through. - BufUtil.retain(msg); + if (!(msg instanceof LastHttpContent)) { continueResponse = true; } - return msg; + // 100-continue response must be passed through. + out.add(BufUtil.retain(msg)); + return; } if (continueResponse) { @@ -84,8 +86,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec { } @Override - protected Object decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { FullHttpMessage currentMessage = this.currentMessage; if (msg instanceof HttpMessage) { @@ -126,8 +127,8 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { if (!m.getDecoderResult().isSuccess()) { removeTransferEncodingChunked(m); this.currentMessage = null; - BufUtil.retain(m); - return m; + out.add(BufUtil.retain(m)); + return; } if (msg instanceof HttpRequest) { HttpRequest header = (HttpRequest) msg; @@ -146,8 +147,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { // A streamed message - initialize the cumulative buffer, and wait for incoming chunks. removeTransferEncodingChunked(currentMessage); - return null; - } else if (msg instanceof HttpContent) { assert currentMessage != null; @@ -196,9 +195,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder { String.valueOf(content.readableBytes())); // All done - return currentMessage; - } else { - return null; + out.add(currentMessage); } } else { throw new Error(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java index dab0c44e8a..dc4c78c087 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -167,7 +168,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case SKIP_CONTROL_CHARS: { try { @@ -182,14 +183,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder maxChunkSize || HttpHeaders.is100ContinueExpected(message)) { // Generate FullHttpMessage first. HttpChunks will follow. checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS); - return message; + out.add(message); + return; } break; default: throw new IllegalStateException("Unexpected state: " + nextState); } - // We return null here, this forces decode to be called again where we will decode the content - return null; + // We return here, this forces decode to be called again where we will decode the content + return; } catch (Exception e) { - return invalidMessage(e); + out.add(invalidMessage(e)); + return; } case READ_VARIABLE_LENGTH_CONTENT: { int toRead = actualReadableBytes(); if (toRead > maxChunkSize) { toRead = maxChunkSize; } - return new Object[] { message, new DefaultHttpContent(buffer.readBytes(toRead))}; + out.add(message); + out.add(new DefaultHttpContent(buffer.readBytes(toRead))); + return; } case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: { // Keep reading data as a chunk until the end of connection is reached. @@ -253,12 +269,18 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder maxChunkSize) { // A chunk is too large. Split them into multiple chunks again. checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS); @@ -314,13 +338,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { in.skipBytes(actualReadableBytes()); - return null; + return; } // Decode a frame otherwise. byte type = in.readByte(); if ((type & 0x80) == 0x80) { // If the MSB on type is set, decode the frame length - return decodeBinaryFrame(ctx, type, in); + out.add(decodeBinaryFrame(ctx, type, in)); } else { // Decode a 0xff terminated UTF-8 string - return decodeTextFrame(ctx, in); + out.add(decodeTextFrame(ctx, in)); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java index b1601de28e..2e0be188d7 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java @@ -54,6 +54,7 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.CorruptedFrameException; @@ -117,12 +118,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder out) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { in.skipBytes(actualReadableBytes()); - return null; + return; } switch (state()) { @@ -148,31 +149,31 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder 7) { // control frame (have MSB in opcode set) // control frames MUST NOT be fragmented if (!frameFinalFlag) { protocolViolation(ctx, "fragmented control frame"); - return null; + return; } // control frames MUST have payload 125 octets or less if (framePayloadLen1 > 125) { protocolViolation(ctx, "control frame with payload length > 125 octets"); - return null; + return; } // check for reserved control frame opcodes if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode); - return null; + return; } // close frame : if there is a body, the first two bytes of the @@ -180,25 +181,25 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder maxFramePayloadLength) { protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded."); - return null; + return; } if (logger.isDebugEnabled()) { @@ -262,7 +263,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder framePayloadLength) { // We have more than what we need so read up to the end of frame // Leave the remainder in the buffer for next frame @@ -291,15 +292,18 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder out) throws Exception { if (currentFrame == null) { if (msg.isFinalFragment()) { - return msg.retain(); + out.add(msg.retain()); + return; } ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.data().retain()); buf.writerIndex(buf.writerIndex() + msg.data().readableBytes()); @@ -61,7 +63,7 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder out) throws Exception { try { - return decode(ctx, in); + decode(ctx, in, out); } finally { headerBlockDecompressor.end(); } } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { switch(state) { case READ_COMMON_HEADER: state = readCommonHeader(buffer); @@ -121,19 +122,20 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (streamID == 0) { state = State.FRAME_ERROR; fireProtocolException(ctx, "Received invalid data frame"); - return null; + return; } SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID); spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0); state = State.READ_COMMON_HEADER; - return spdyDataFrame; + out.add(spdyDataFrame); + return; } // There are no length 0 control frames state = State.READ_COMMON_HEADER; } - return null; + return; case READ_CONTROL_FRAME: try { @@ -141,18 +143,19 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (frame != null) { state = State.READ_COMMON_HEADER; } - return frame; + out.add(frame); + return; } catch (IllegalArgumentException e) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); } - return null; + return; case READ_SETTINGS_FRAME: if (spdySettingsFrame == null) { // Validate frame length against number of entries if (buffer.readableBytes() < 4) { - return null; + return; } int numEntries = getUnsignedInt(buffer, buffer.readerIndex()); buffer.skipBytes(4); @@ -162,7 +165,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if ((length & 0x07) != 0 || length >> 3 != numEntries) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); - return null; + return; } spdySettingsFrame = new DefaultSpdySettingsFrame(); @@ -196,7 +199,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.FRAME_ERROR; spdySettingsFrame = null; fireInvalidControlFrameException(ctx); - return null; + return; } if (!spdySettingsFrame.isSet(ID)) { @@ -211,9 +214,10 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.READ_COMMON_HEADER; Object frame = spdySettingsFrame; spdySettingsFrame = null; - return frame; + out.add(frame); + return; } - return null; + return; case READ_HEADER_BLOCK_FRAME: try { @@ -223,15 +227,16 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { state = State.READ_COMMON_HEADER; Object frame = spdyHeaderBlock; spdyHeaderBlock = null; - return frame; + out.add(frame); + return; } state = State.READ_HEADER_BLOCK; } - return null; + return; } catch (IllegalArgumentException e) { state = State.FRAME_ERROR; fireInvalidControlFrameException(ctx); - return null; + return; } case READ_HEADER_BLOCK: @@ -245,7 +250,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { spdyHeaderBlock = null; decompressed = null; ctx.fireExceptionCaught(e); - return null; + return; } if (spdyHeaderBlock != null && spdyHeaderBlock.isInvalid()) { @@ -255,22 +260,24 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (length == 0) { state = State.READ_COMMON_HEADER; } - return frame; + out.add(frame); + return; } if (length == 0) { Object frame = spdyHeaderBlock; spdyHeaderBlock = null; state = State.READ_COMMON_HEADER; - return frame; + out.add(frame); + return; } - return null; + return; case READ_DATA_FRAME: if (streamID == 0) { state = State.FRAME_ERROR; fireProtocolException(ctx, "Received invalid data frame"); - return null; + return; } // Generate data frames that do not exceed maxChunkSize @@ -278,7 +285,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { // Wait until entire frame is readable if (buffer.readableBytes() < dataLength) { - return null; + return; } ByteBuf data = ctx.alloc().buffer(dataLength); @@ -290,7 +297,8 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0); state = State.READ_COMMON_HEADER; } - return spdyDataFrame; + out.add(spdyDataFrame); + return; case DISCARD_FRAME: int numBytes = Math.min(buffer.readableBytes(), length); @@ -299,11 +307,11 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder { if (length == 0) { state = State.READ_COMMON_HEADER; } - return null; + return; case FRAME_ERROR: buffer.skipBytes(buffer.readableBytes()); - return null; + return; default: throw new Error("Shouldn't reach here."); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index 485e906ca7..75c0e26a35 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; @@ -65,7 +66,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder out) + throws Exception { if (msg instanceof SpdySynStreamFrame) { // HTTP requests/responses are mapped one-to-one to SPDY streams. @@ -106,7 +108,8 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder e: spdyHeadersFrame.headers().entries()) { @@ -193,7 +198,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder { } @Override - protected Object encode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { - - List out = new ArrayList(); + protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageBuf out) throws Exception { boolean valid = false; @@ -199,8 +197,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder { if (!valid) { throw new UnsupportedMessageTypeException(msg); } - - return out.toArray(); } private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java index 1cbcbdcc4a..cb18684d67 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpResponseStreamIdHandler.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.BufUtil; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.http.HttpMessage; @@ -39,18 +40,17 @@ public class SpdyHttpResponseStreamIdHandler extends } @Override - protected Object encode(ChannelHandlerContext ctx, HttpMessage msg) throws Exception { + protected void encode(ChannelHandlerContext ctx, HttpMessage msg, MessageBuf out) throws Exception { Integer id = ids.poll(); if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) { SpdyHttpHeaders.setStreamId(msg, id); } - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } @Override - protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { if (msg instanceof HttpMessage) { boolean contains = ((HttpMessage) msg).headers().contains(SpdyHttpHeaders.Names.STREAM_ID); if (!contains) { @@ -62,7 +62,6 @@ public class SpdyHttpResponseStreamIdHandler extends ids.remove(((SpdyRstStreamFrame) msg).getStreamId()); } - BufUtil.retain(msg); - return msg; + out.add(BufUtil.retain(msg)); } } diff --git a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java index 60bb857a6a..53101ed862 100644 --- a/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java +++ b/codec-socks/src/main/java/io/netty/handler/codec/socks/SocksAuthRequestDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.socks; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import io.netty.util.CharsetUtil; @@ -42,7 +43,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte()); @@ -63,7 +64,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder out) + throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte()); @@ -54,7 +56,7 @@ public class SocksAuthResponseDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -87,7 +88,7 @@ public class SocksCmdRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -87,7 +88,7 @@ public class SocksCmdResponseDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -63,7 +64,7 @@ public class SocksInitRequestDecoder extends ReplayingDecoder out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.fromByte(byteBuf.readByte()); @@ -56,7 +57,7 @@ public class SocksInitResponseDecoder extends ReplayingDecoder extends ChannelDuplexHandler private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() { @Override - public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decode(ctx, in); + public void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + ByteToMessageCodec.this.decode(ctx, in, out); } @Override - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return ByteToMessageCodec.this.decodeLast(ctx, in); + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + ByteToMessageCodec.this.decodeLast(ctx, in, out); } }; @@ -105,8 +105,8 @@ public abstract class ByteToMessageCodec extends ChannelDuplexHandler } protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; - protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception; - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return decode(ctx, in); + protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception; + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + decode(ctx, in, out); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 157d4b3b6e..04b0fed5f7 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -17,7 +17,9 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandlerAdapter; @@ -31,9 +33,9 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter; *
  *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
  *         {@code @Override}
- *         public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in)
+ *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, {@link MessageBuf} out)
  *                 throws {@link Exception} {
- *             return in.readBytes(in.readableBytes());
+ *             out.add(in.readBytes(in.readableBytes()));
  *         }
  *     }
  * 
@@ -43,6 +45,19 @@ public abstract class ByteToMessageDecoder private volatile boolean singleDecode; private boolean decodeWasNull; + private MessageBuf decoderOutput; + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + decoderOutput = Unpooled.messageBuffer(); + return super.newInboundBuffer(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + decoderOutput.release(); + } /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. @@ -82,22 +97,33 @@ public abstract class ByteToMessageDecoder @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + MessageBuf out = decoderOutput(); try { ByteBuf in = ctx.inboundByteBuffer(); if (in.isReadable()) { callDecode(ctx, in); } + decodeLast(ctx, in, out); - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) { - ctx.fireInboundBufferUpdated(); - } } catch (Throwable t) { if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); + throw (CodecException) t; } else { - ctx.fireExceptionCaught(new DecoderException(t)); + throw new DecoderException(t); } } finally { + boolean decoded = false; + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { + ctx.fireInboundBufferUpdated(); + } ctx.fireChannelInactive(); } } @@ -106,12 +132,16 @@ public abstract class ByteToMessageDecoder boolean wasNull = false; boolean decoded = false; - MessageBuf out = ctx.nextInboundMessageBuffer(); + MessageBuf out = decoderOutput(); + + assert out.isEmpty(); + while (in.isReadable()) { try { + int outSize = out.size(); int oldInputLength = in.readableBytes(); - Object o = decode(ctx, in); - if (o == null) { + decode(ctx, in, out); + if (outSize == out.size()) { wasNull = true; if (oldInputLength == in.readableBytes()) { break; @@ -119,33 +149,36 @@ public abstract class ByteToMessageDecoder continue; } } + wasNull = false; if (oldInputLength == in.readableBytes()) { throw new IllegalStateException( "decode() did not read anything but decoded a message."); } - if (out.unfoldAndAdd(o)) { - decoded = true; - if (isSingleDecode()) { - break; - } - } else { + if (isSingleDecode()) { break; } } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; + } else { + throw new DecoderException(t); + } + } finally { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { decoded = false; ctx.fireInboundBufferUpdated(); } - - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new DecoderException(t)); - } - - break; } } @@ -166,20 +199,25 @@ public abstract class ByteToMessageDecoder * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToByteDecoder} belongs to * @param in the {@link ByteBuf} from which to read data - * @return message the message to which the content of the {@link ByteBuf} was decoded, or {@code null} if - * there was not enough data left in the {@link ByteBuf} to decode. + * @param out the {@link MessageBuf} to which decoded messages should be added + * @throws Exception is thrown if an error accour */ - protected abstract Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception; /** * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the * {@link #channelInactive(ChannelHandlerContext)} was triggered. * - * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf)} but sub-classes may + * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, MessageBuf)} but sub-classes may * override this for some special cleanup operation. */ - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - return decode(ctx, in); + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + decode(ctx, in, out); } + + final MessageBuf decoderOutput() { + return decoderOutput; + } + } diff --git a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java index 438a984d02..3d22a7dc83 100644 --- a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; /** @@ -210,6 +211,13 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); diff --git a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java index bc86dc4d6d..600c19babd 100644 --- a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; @@ -75,6 +76,13 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; diff --git a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java index a42f923b8e..b9a6e357b2 100644 --- a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.serialization.ObjectDecoder; @@ -347,6 +348,13 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; @@ -446,12 +454,12 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { discardingTooLongFrame = false; if (!failFast || failFast && firstDetectionOfTooLongFrame) { - fail(ctx, tooLongFrameLength); + fail(tooLongFrameLength); } } else { // Keep discarding and notify handlers if necessary. if (failFast && firstDetectionOfTooLongFrame) { - fail(ctx, tooLongFrameLength); + fail(tooLongFrameLength); } } } @@ -473,17 +481,15 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { return frame; } - private void fail(ChannelHandlerContext ctx, long frameLength) { + private void fail(long frameLength) { if (frameLength > 0) { - ctx.fireExceptionCaught( - new TooLongFrameException( + throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + - ": " + frameLength + " - discarded")); + ": " + frameLength + " - discarded"); } else { - ctx.fireExceptionCaught( - new TooLongFrameException( + throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + - " - discarding")); + " - discarding"); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java index 1c064bc52e..cc77c8e0ae 100644 --- a/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; /** @@ -67,6 +68,13 @@ public class LineBasedFrameDecoder extends ByteToMessageDecoder { } @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (eol != -1) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index fb4b586405..f451a2effd 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -62,8 +62,8 @@ public abstract class MessageToMessageCodec @Override @SuppressWarnings("unchecked") - protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { - return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg); + protected void encode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { + MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out); } }; @@ -77,8 +77,8 @@ public abstract class MessageToMessageCodec @Override @SuppressWarnings("unchecked") - protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { - return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg); + protected void decode(ChannelHandlerContext ctx, Object msg, MessageBuf out) throws Exception { + MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out); } }; @@ -147,6 +147,6 @@ public abstract class MessageToMessageCodec return outboundMsgMatcher.match(msg); } - protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception; - protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception; + protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, MessageBuf out) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 0c52e0002e..3cdb30678d 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -16,7 +16,9 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandlerAdapter; @@ -29,14 +31,11 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; *
  *     public class StringToIntegerDecoder extends
  *             {@link MessageToMessageDecoder}<{@link String}> {
- *         public StringToIntegerDecoder() {
- *             super(String.class);
- *         }
  *
  *         {@code @Override}
- *         public {@link Object} decode({@link ChannelHandlerContext} ctx, {@link String} message)
- *                 throws {@link Exception} {
- *             return message.length());
+ *         public void decode({@link ChannelHandlerContext} ctx, {@link String} message,
+ *                 {@link MessageBuf} out) throws {@link Exception} {
+ *             out.add(message.length());
  *         }
  *     }
  * 
@@ -44,6 +43,14 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; */ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHandlerAdapter { + private static final ThreadLocal> decoderOutput = + new ThreadLocal>() { + @Override + protected MessageBuf initialValue() { + return Unpooled.messageBuffer(); + } + }; + protected MessageToMessageDecoder() { } protected MessageToMessageDecoder(Class inboundMessageType) { @@ -52,7 +59,18 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa @Override public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { - ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg)); + MessageBuf out = decoderOutput.get(); + try { + decode(ctx, msg, out); + } finally { + for (;;) { + Object obj = out.poll(); + if (obj == null) { + break; + } + ChannelHandlerUtil.addToNextInboundBuffer(ctx, obj); + } + } } /** @@ -61,9 +79,8 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to * @param msg the message to decode to an other one - * @return message the decoded message or {@code null} if more messages are needed be cause the implementation - * needs to do some kind of aggragation + * @param out the {@link MessageBuf} to which decoded messages should be added * @throws Exception is thrown if an error accour */ - protected abstract Object decode(ChannelHandlerContext ctx, I msg) throws Exception; + protected abstract void decode(ChannelHandlerContext ctx, I msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index ed6d8745e1..d1482051c7 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; @@ -28,20 +29,24 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter; *
  *     public class IntegerToStringEncoder extends
  *             {@link MessageToMessageEncoder}<{@link Integer}> {
- *         public StringToIntegerDecoder() {
- *             super(String.class);
- *         }
  *
  *         {@code @Override}
- *         public {@link Object} encode({@link ChannelHandlerContext} ctx, {@link Integer} message)
+ *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, {@link MessageBuf} out)
  *                 throws {@link Exception} {
- *             return message.toString();
+ *             out.add(message.toString());
  *         }
  *     }
  * 
* */ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { + private static final ThreadLocal> encoderOutput = + new ThreadLocal>() { + @Override + protected MessageBuf initialValue() { + return Unpooled.messageBuffer(); + } + }; protected MessageToMessageEncoder() { } @@ -51,16 +56,30 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH @Override public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { - try { - Object encoded = encode(ctx, msg); - // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline - // accept bytes. Related to #1222 - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); + MessageBuf out = encoderOutput.get(); + assert out.isEmpty(); + + try { + encode(ctx, msg, out); } catch (CodecException e) { throw e; - } catch (Exception e) { - throw new CodecException(e); + } catch (Throwable cause) { + if (cause instanceof CodecException) { + throw (CodecException) cause; + } else { + throw new EncoderException(cause); + } + } finally { + for (;;) { + Object encoded = out.poll(); + if (encoded == null) { + break; + } + // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline + // accept bytes. Related to #1222 + ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); + } } } @@ -70,9 +89,9 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to * @param msg the message to encode to an other one - * @return message the encoded message or {@code null} if more messages are needed be cause the implementation + * @param out the {@link MessageBuf} into which the encoded msg should be added * needs to do some kind of aggragation * @throws Exception is thrown if an error accour */ - protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception; + protected abstract void encode(ChannelHandlerContext ctx, I msg, MessageBuf out) throws Exception; } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index d712d3ff70..f5704c1e9b 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelPipeline; import io.netty.util.Signal; @@ -364,6 +365,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + MessageBuf out = decoderOutput(); + try { replayable.terminate(); ByteBuf in = cumulation; @@ -371,19 +374,31 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { callDecode(ctx, in); } - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, replayable))) { - ctx.fireInboundBufferUpdated(); - } + decodeLast(ctx, replayable, out); } catch (Signal replay) { // Ignore replay.expect(REPLAY); } catch (Throwable t) { if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); + throw (CodecException) t; } else { - ctx.fireExceptionCaught(new DecoderException(t)); + throw new DecoderException(t); } } finally { + + boolean decoded = false; + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { + ctx.fireInboundBufferUpdated(); + } + ctx.fireChannelInactive(); } } @@ -393,16 +408,17 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { boolean wasNull = false; ByteBuf in = cumulation; - MessageBuf out = ctx.nextInboundMessageBuffer(); + MessageBuf out = decoderOutput(); boolean decoded = false; while (in.isReadable()) { try { int oldReaderIndex = checkpoint = in.readerIndex(); - Object result = null; + int outSize = out.size(); S oldState = state; try { - result = decode(ctx, replayable); - if (result == null) { + decode(ctx, replayable, out); + if (outSize == out.size()) { + wasNull = true; if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new IllegalStateException( "null cannot be returned if no data is consumed and state didn't change."); @@ -422,13 +438,6 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } - } - - if (result == null) { - wasNull = true; - - // Seems like more data is required. - // Let us wait for the next notification. break; } wasNull = false; @@ -439,27 +448,26 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { "if it returned a decoded message (caused by: " + getClass() + ')'); } + } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; + } else { + throw new DecoderException(t); + } + } finally { - // A successful decode - if (out.unfoldAndAdd(result)) { - decoded = true; - if (isSingleDecode()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { break; } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } - } catch (Throwable t) { if (decoded) { decoded = false; ctx.fireInboundBufferUpdated(); } - - if (t instanceof CodecException) { - ctx.fireExceptionCaught(t); - } else { - ctx.fireExceptionCaught(new DecoderException(t)); - } - - break; } } diff --git a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java index bd8bb840ee..ff1eb4458d 100644 --- a/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java +++ b/codec/src/main/java/io/netty/handler/codec/base64/Base64Decoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.base64; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -58,7 +59,7 @@ public class Base64Decoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect); + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { + out.add(Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect)); } } diff --git a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java index 145de1ca1a..1d0f8acf9e 100644 --- a/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/bytes/ByteArrayDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.bytes; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -49,7 +50,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; public class ByteArrayDecoder extends MessageToMessageDecoder { @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { byte[] array; if (msg.hasArray()) { if (msg.arrayOffset() == 0 && msg.readableBytes() == msg.capacity()) { @@ -67,6 +68,6 @@ public class ByteArrayDecoder extends MessageToMessageDecoder { msg.getBytes(0, array); } - return array; + out.add(array); } } diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java b/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java index 2ed6b7be63..7af3ebd75e 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/CompatibleMarshallingDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; @@ -54,11 +55,11 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { if (discardingTooLongFrame) { buffer.skipBytes(actualReadableBytes()); checkpoint(); - return null; + return; } Unmarshaller unmarshaller = provider.getUnmarshaller(ctx); @@ -70,7 +71,7 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { unmarshaller.start(input); Object obj = unmarshaller.readObject(); unmarshaller.finish(); - return obj; + out.add(obj); } catch (LimitingByteInput.TooBigObjectException e) { discardingTooLongFrame = true; throw new TooLongFrameException(); @@ -82,19 +83,19 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder { } @Override - protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf out) throws Exception { switch (buffer.readableBytes()) { case 0: - return null; + return; case 1: // Ignore the last TC_RESET if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) { buffer.skipBytes(1); - return null; + return; } } - return decode(ctx, buffer); + decode(ctx, buffer, out); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java index 3a5c1fea25..7bce890497 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufDecoder.java @@ -19,6 +19,7 @@ import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.Message; import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -94,7 +95,7 @@ public class ProtobufDecoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { final byte[] array; final int offset; final int length = msg.readableBytes(); @@ -109,15 +110,15 @@ public class ProtobufDecoder extends MessageToMessageDecoder { if (extensionRegistry == null) { if (HAS_PARSER) { - return prototype.getParserForType().parseFrom(array, offset, length); + out.add(prototype.getParserForType().parseFrom(array, offset, length)); } else { - return prototype.newBuilderForType().mergeFrom(array, offset, length).build(); + out.add(prototype.newBuilderForType().mergeFrom(array, offset, length).build()); } } else { if (HAS_PARSER) { - return prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry); + out.add(prototype.getParserForType().parseFrom(array, offset, length, extensionRegistry)); } else { - return prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build(); + out.add(prototype.newBuilderForType().mergeFrom(array, offset, length, extensionRegistry).build()); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java index 70d8a8719e..874f402af6 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufEncoder.java @@ -19,6 +19,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -59,13 +60,14 @@ import static io.netty.buffer.Unpooled.*; public class ProtobufEncoder extends MessageToMessageEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg) throws Exception { + protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, MessageBuf out) + throws Exception { if (msg instanceof MessageLite) { - return wrappedBuffer(((MessageLite) msg).toByteArray()); + out.add(wrappedBuffer(((MessageLite) msg).toByteArray())); + return; } if (msg instanceof MessageLite.Builder) { - return wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()); + out.add(wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray())); } - return null; } } diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java index 02f01c2020..c9b0059112 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.protobuf; import com.google.protobuf.CodedInputStream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -42,13 +43,13 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder { // (just like LengthFieldBasedFrameDecoder) @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception { in.markReaderIndex(); final byte[] buf = new byte[5]; for (int i = 0; i < buf.length; i ++) { if (!in.isReadable()) { in.resetReaderIndex(); - return null; + return; } buf[i] = in.readByte(); @@ -60,9 +61,10 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder { if (in.readableBytes() < length) { in.resetReaderIndex(); - return null; + return; } else { - return in.readBytes(length); + out.add(in.readBytes(length)); + return; } } } diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java index 26c8004707..2f465866fc 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.string; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -74,7 +75,7 @@ public class StringDecoder extends MessageToMessageDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - return msg.toString(charset); + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageBuf out) throws Exception { + out.add(msg.toString(charset)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index f28e076a59..90d219e2c8 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -17,6 +17,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufIndexFinder; +import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedByteChannel; @@ -53,10 +54,10 @@ public class ReplayingDecoderTest { } @Override - protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) { ByteBuf msg = in.readBytes(in.bytesBefore(ByteBufIndexFinder.LF)); in.skipBytes(1); - return msg; + out.add(msg); } } } diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java index 67e736e671..3be464deff 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java @@ -19,8 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedByteChannel; -import io.netty.handler.codec.CodecException; -import io.netty.handler.codec.TooLongFrameException; import org.jboss.marshalling.Marshaller; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; @@ -112,12 +110,12 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { marshaller.close(); byte[] testBytes = bout.toByteArray(); - try { - ch.writeInbound(input(testBytes)); - fail(); - } catch (CodecException e) { - assertEquals(TooLongFrameException.class, e.getClass()); - } + onTooBigFrame(ch, input(testBytes)); + } + + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + ch.writeInbound(input); + assertFalse(ch.isActive()); } protected ChannelHandler createDecoder(int maxObjectSize) { diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java index fec50f21c6..17d7cb2d59 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/RiverMarshallingDecoderTest.java @@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedByteChannel; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.TooLongFrameException; + +import static org.junit.Assert.*; public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecoderTest { @@ -34,4 +39,13 @@ public class RiverMarshallingDecoderTest extends RiverCompatibleMarshallingDecod createMarshallingConfig()), maxObjectSize); } + @Override + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + try { + ch.writeInbound(input); + fail(); + } catch (CodecException e) { + assertEquals(TooLongFrameException.class, e.getClass()); + } + } } diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java index cf8f4f61f3..6f27baf078 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/SerialMarshallingDecoderTest.java @@ -18,6 +18,11 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedByteChannel; +import io.netty.handler.codec.CodecException; +import io.netty.handler.codec.TooLongFrameException; + +import static org.junit.Assert.*; public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDecoderTest { @@ -34,4 +39,13 @@ public class SerialMarshallingDecoderTest extends SerialCompatibleMarshallingDec createMarshallingConfig()), maxObjectSize); } + @Override + protected void onTooBigFrame(EmbeddedByteChannel ch, ByteBuf input) { + try { + ch.writeInbound(input); + fail(); + } catch (CodecException e) { + assertEquals(TooLongFrameException.class, e.getClass()); + } + } } diff --git a/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java b/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java index 692fe8a944..459e842b32 100644 --- a/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java +++ b/example/src/main/java/io/netty/example/factorial/BigIntegerDecoder.java @@ -16,6 +16,7 @@ package io.netty.example.factorial; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -31,10 +32,10 @@ import java.math.BigInteger; public class BigIntegerDecoder extends ByteToMessageDecoder { @Override - protected BigInteger decode(ChannelHandlerContext ctx, ByteBuf in) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) { // Wait until the length prefix is available. if (in.readableBytes() < 5) { - return null; + return; } in.markReaderIndex(); @@ -51,13 +52,13 @@ public class BigIntegerDecoder extends ByteToMessageDecoder { int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); - return null; + return; } // Convert the received data into a new BigInteger. byte[] decoded = new byte[dataLength]; in.readBytes(decoded); - return new BigInteger(decoded); + out.add(new BigInteger(decoded)); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index eb81d63574..596265abc4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -221,7 +221,7 @@ public final class ChannelHandlerUtil { return true; } } - return ctx.nextOutboundMessageBuffer().unfoldAndAdd(msg); + return ctx.nextOutboundMessageBuffer().add(msg); } /** @@ -235,7 +235,7 @@ public final class ChannelHandlerUtil { return true; } } - return ctx.nextInboundMessageBuffer().unfoldAndAdd(msg); + return ctx.nextInboundMessageBuffer().add(msg); } private ChannelHandlerUtil() { }