diff --git a/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java index f8560c8549..300b3d46b1 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java @@ -25,10 +25,12 @@ import java.util.Iterator; import java.util.NoSuchElementException; /** - * Default {@link MessageBuf} implementation + * Default {@link MessageBuf} implementation. + * + * You should use {@link Unpooled#messageBuffer()} to create an instance * */ -final class DefaultMessageBuf extends AbstractMessageBuf { +public class DefaultMessageBuf extends AbstractMessageBuf { private static final int MIN_INITIAL_CAPACITY = 8; private static final Object[] PLACEHOLDER = new Object[2]; @@ -37,15 +39,15 @@ final class DefaultMessageBuf extends AbstractMessageBuf { private int head; private int tail; - DefaultMessageBuf() { + protected DefaultMessageBuf() { this(MIN_INITIAL_CAPACITY << 1); } - DefaultMessageBuf(int initialCapacity) { + protected DefaultMessageBuf(int initialCapacity) { this(initialCapacity, Integer.MAX_VALUE); } - DefaultMessageBuf(int initialCapacity, int maxCapacity) { + protected DefaultMessageBuf(int initialCapacity, int maxCapacity) { super(maxCapacity); if (initialCapacity < 0) { diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index ad971ffa7a..813f4b8ca0 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -17,7 +17,6 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; @@ -46,24 +45,14 @@ public abstract class ByteToMessageDecoder private volatile boolean singleDecode; private boolean decodeWasNull; - private static final ThreadLocal> decoderOutput = - new ThreadLocal>() { + private static final ThreadLocal decoderOutput = + new ThreadLocal() { @Override - protected MessageBuf initialValue() { - return Unpooled.messageBuffer(); + protected OutputMessageBuf initialValue() { + return new OutputMessageBuf(); } }; - @Override - public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { - return super.newInboundBuffer(ctx); - } - - @Override - public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { - super.freeInboundBuffer(ctx); - } - /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. * This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. @@ -102,7 +91,7 @@ public abstract class ByteToMessageDecoder @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - MessageBuf out = decoderOutput(); + OutputMessageBuf out = decoderOutput(); try { ByteBuf in = ctx.inboundByteBuffer(); if (in.isReadable()) { @@ -118,13 +107,19 @@ public abstract class ByteToMessageDecoder } } finally { boolean decoded = false; - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + } else { + if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { + decoded = true; } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } if (decoded) { ctx.fireInboundBufferUpdated(); @@ -136,7 +131,7 @@ public abstract class ByteToMessageDecoder protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { boolean wasNull = false; boolean decoded = false; - MessageBuf out = decoderOutput(); + OutputMessageBuf out = decoderOutput(); assert out.isEmpty(); @@ -173,13 +168,19 @@ public abstract class ByteToMessageDecoder } } } finally { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + } else { + if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { + decoded = true; } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } if (decoded) { @@ -217,7 +218,7 @@ public abstract class ByteToMessageDecoder decode(ctx, in, out); } - final MessageBuf decoderOutput() { + final OutputMessageBuf decoderOutput() { return decoderOutput.get(); } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 3cdb30678d..f66c0e922d 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -16,7 +16,6 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; @@ -43,11 +42,11 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; */ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHandlerAdapter { - private static final ThreadLocal> decoderOutput = - new ThreadLocal>() { + private static final ThreadLocal decoderOutput = + new ThreadLocal() { @Override - protected MessageBuf initialValue() { - return Unpooled.messageBuffer(); + protected OutputMessageBuf initialValue() { + return new OutputMessageBuf(); } }; @@ -59,16 +58,20 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa @Override public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { - MessageBuf out = decoderOutput.get(); + OutputMessageBuf out = decoderOutput.get(); try { decode(ctx, msg, out); } finally { - for (;;) { - Object obj = out.poll(); - if (obj == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object decoded = out.poll(); + if (decoded == null) { + break; + } + ChannelHandlerUtil.addToNextInboundBuffer(ctx, decoded); } - ChannelHandlerUtil.addToNextInboundBuffer(ctx, obj); + } else { + out.drainTo(ctx.nextInboundMessageBuffer()); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index d1482051c7..32dccdb90e 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -16,7 +16,6 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; @@ -40,11 +39,11 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter; * */ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { - private static final ThreadLocal> encoderOutput = - new ThreadLocal>() { + private static final ThreadLocal encoderOutput = + new ThreadLocal() { @Override - protected MessageBuf initialValue() { - return Unpooled.messageBuffer(); + protected OutputMessageBuf initialValue() { + return new OutputMessageBuf(); } }; @@ -56,7 +55,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH @Override public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { - MessageBuf out = encoderOutput.get(); + OutputMessageBuf out = encoderOutput.get(); assert out.isEmpty(); @@ -71,14 +70,18 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH throw new EncoderException(cause); } } finally { - for (;;) { - Object encoded = out.poll(); - if (encoded == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object encoded = out.poll(); + if (encoded == null) { + break; + } + // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline + // accept bytes. Related to #1222 + ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); } - // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline - // accept bytes. Related to #1222 - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); + } else { + out.drainTo(ctx.nextOutboundMessageBuffer()); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java b/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java new file mode 100644 index 0000000000..bd8f98b1ad --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultMessageBuf; + +final class OutputMessageBuf extends DefaultMessageBuf { + private int byteBufs; + public OutputMessageBuf() { + } + + public OutputMessageBuf(int initialCapacity) { + super(initialCapacity); + } + + public OutputMessageBuf(int initialCapacity, int maxCapacity) { + super(initialCapacity, maxCapacity); + } + + @Override + public boolean offer(Object e) { + boolean added = super.offer(e); + if (added && e instanceof ByteBuf) { + byteBufs++; + } + return added; + } + + @Override + public boolean remove(Object o) { + boolean removed = super.remove(o); + + if (removed && o instanceof ByteBuf) { + byteBufs--; + } + return removed; + } + + @Override + public Object poll() { + Object o = super.poll(); + if (o instanceof ByteBuf) { + byteBufs--; + } + return o; + } + + @Override + public void clear() { + super.clear(); + byteBufs = 0; + } + + public boolean containsByteBuf() { + return byteBufs > 0; + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index a66e6f9b91..c1f2071924 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -16,7 +16,6 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; @@ -365,7 +364,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - MessageBuf out = decoderOutput(); + OutputMessageBuf out = decoderOutput(); try { replayable.terminate(); @@ -387,13 +386,19 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } finally { boolean decoded = false; - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + } else { + if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { + decoded = true; } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } if (decoded) { ctx.fireInboundBufferUpdated(); @@ -407,7 +412,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) { boolean wasNull = false; ByteBuf in = cumulation; - MessageBuf out = decoderOutput(); + OutputMessageBuf out = decoderOutput(); boolean decoded = false; assert out.isEmpty(); @@ -460,13 +465,19 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } } } finally { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; + if (out.containsByteBuf()) { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + } else { + if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { + decoded = true; } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); } if (decoded) { decodeWasNull = false;