diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 51dcad6473..16310dede7 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -18,7 +18,6 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandlerAdapter; @@ -98,22 +97,7 @@ public abstract class ByteToMessageDecoder throw new DecoderException(t); } } finally { - boolean decoded = false; - if (out.containsByteBuf()) { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; - } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - } else { - if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { - decoded = true; - } - } - if (decoded) { + if (out.drainToNextInbound(ctx)) { ctx.fireInboundBufferUpdated(); } ctx.fireChannelInactive(); @@ -122,7 +106,6 @@ public abstract class ByteToMessageDecoder protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { boolean wasNull = false; - boolean decoded = false; OutputMessageBuf out = OutputMessageBuf.get(); assert out.isEmpty(); @@ -160,22 +143,7 @@ public abstract class ByteToMessageDecoder } } } finally { - if (out.containsByteBuf()) { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; - } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - } else { - if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { - decoded = true; - } - } - - if (decoded) { + if (out.drainToNextInbound(ctx)) { decodeWasNull = false; ctx.fireInboundBufferUpdated(); } else { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index ea17a58475..4efa56ded3 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -17,7 +17,6 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandlerAdapter; @@ -54,17 +53,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa try { decode(ctx, msg, out); } finally { - if (out.containsByteBuf()) { - for (;;) { - Object decoded = out.poll(); - if (decoded == null) { - break; - } - ChannelHandlerUtil.addToNextInboundBuffer(ctx, decoded); - } - } else { - out.drainTo(ctx.nextInboundMessageBuffer()); - } + out.drainToNextInbound(ctx); } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 50790f56e3..5c7dda5b3a 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -17,7 +17,6 @@ package io.netty.handler.codec; import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelOutboundMessageHandlerAdapter; /** @@ -61,19 +60,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH throw new EncoderException(cause); } } finally { - if (out.containsByteBuf()) { - for (;;) { - Object encoded = out.poll(); - if (encoded == null) { - break; - } - // Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline - // accept bytes. Related to #1222 - ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded); - } - } else { - out.drainTo(ctx.nextOutboundMessageBuffer()); - } + out.drainToNextOutbound(ctx); } } diff --git a/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java b/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java index b60a2541a0..eeaaef2f39 100644 --- a/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java +++ b/codec/src/main/java/io/netty/handler/codec/OutputMessageBuf.java @@ -15,11 +15,15 @@ */ package io.netty.handler.codec; +import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.DefaultMessageBuf; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; final class OutputMessageBuf extends DefaultMessageBuf { private int byteBufs; + private int nonByteBufs; private static final ThreadLocal output = new ThreadLocal() { @@ -47,8 +51,12 @@ final class OutputMessageBuf extends DefaultMessageBuf { @Override public boolean offer(Object e) { boolean added = super.offer(e); - if (added && e instanceof ByteBuf) { - byteBufs++; + if (added) { + if (e instanceof ByteBuf) { + byteBufs++; + } else { + nonByteBufs++; + } } return added; } @@ -57,8 +65,12 @@ final class OutputMessageBuf extends DefaultMessageBuf { public boolean remove(Object o) { boolean removed = super.remove(o); - if (removed && o instanceof ByteBuf) { - byteBufs--; + if (removed) { + if (o instanceof ByteBuf) { + byteBufs--; + } else { + nonByteBufs--; + } } return removed; } @@ -66,8 +78,13 @@ final class OutputMessageBuf extends DefaultMessageBuf { @Override public Object poll() { Object o = super.poll(); + if (o == null) { + return o; + } if (o instanceof ByteBuf) { byteBufs--; + } else { + nonByteBufs--; } return o; } @@ -76,9 +93,84 @@ final class OutputMessageBuf extends DefaultMessageBuf { public void clear() { super.clear(); byteBufs = 0; + nonByteBufs = 0; } - public boolean containsByteBuf() { + private boolean containsByteBuf() { return byteBufs > 0; } + + private boolean containsNonByteBuf() { + return nonByteBufs > 0; + } + + public boolean drainToNextInbound(ChannelHandlerContext ctx) { + if (containsByteBuf() && ctx.nextInboundBufferType() == BufType.BYTE) { + ByteBuf buf = ctx.nextInboundByteBuffer(); + boolean drained = false; + if (!containsNonByteBuf()) { + for (;;) { + Object o = poll(); + if (o == null) { + break; + } + buf.writeBytes((ByteBuf) o); + drained = true; + } + } else { + // mixed case + MessageBuf msgBuf = ctx.nextInboundMessageBuffer(); + for (;;) { + Object o = poll(); + if (o == null) { + break; + } + if (o instanceof ByteBuf) { + buf.writeBytes((ByteBuf) o); + } else { + msgBuf.add(o); + } + drained = true; + } + } + return drained; + } else { + return drainTo(ctx.nextInboundMessageBuffer()) > 0; + } + } + + public boolean drainToNextOutbound(ChannelHandlerContext ctx) { + if (containsByteBuf() && ctx.nextOutboundBufferType() == BufType.BYTE) { + ByteBuf buf = ctx.nextOutboundByteBuffer(); + boolean drained = false; + if (!containsNonByteBuf()) { + for (;;) { + Object o = poll(); + if (o == null) { + break; + } + buf.writeBytes((ByteBuf) o); + drained = true; + } + } else { + // mixed case + MessageBuf msgBuf = ctx.nextOutboundMessageBuffer(); + for (;;) { + Object o = poll(); + if (o == null) { + break; + } + if (o instanceof ByteBuf) { + buf.writeBytes((ByteBuf) o); + } else { + msgBuf.add(o); + } + drained = true; + } + } + return drained; + } else { + return drainTo(ctx.nextOutboundMessageBuffer()) > 0; + } + } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 979fcdfaee..c0f6246dce 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -18,7 +18,6 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelPipeline; import io.netty.util.Signal; @@ -384,23 +383,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { throw new DecoderException(t); } } finally { - - boolean decoded = false; - if (out.containsByteBuf()) { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; - } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - } else { - if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { - decoded = true; - } - } - if (decoded) { + if (out.drainToNextInbound(ctx)) { ctx.fireInboundBufferUpdated(); } @@ -413,8 +396,6 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { boolean wasNull = false; ByteBuf in = cumulation; OutputMessageBuf out = OutputMessageBuf.get(); - boolean decoded = false; - assert out.isEmpty(); try { @@ -465,21 +446,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } } } finally { - if (out.containsByteBuf()) { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; - } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - } else { - if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) { - decoded = true; - } - } - if (decoded) { + if (out.drainToNextInbound(ctx)) { decodeWasNull = false; ctx.fireInboundBufferUpdated(); } else {