diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index bd880f9748..7dc3a932e6 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -212,16 +212,24 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } catch (Exception e) { throw new DecoderException(e); } finally { - if (cumulation != null) { - cumulation.release(); - cumulation = null; + try { + if (cumulation != null) { + cumulation.release(); + cumulation = null; + } + int size = out.size(); + for (int i = 0; i < size; i++) { + ctx.fireChannelRead(out.get(i)); + } + if (size > 0) { + // Something was read, call fireChannelReadComplete() + ctx.fireChannelReadComplete(); + } + ctx.fireChannelInactive(); + } finally { + // recycle in all cases + out.recycle(); } - int size = out.size(); - for (int i = 0; i < size; i ++) { - ctx.fireChannelRead(out.get(i)); - } - ctx.fireChannelInactive(); - out.recycle(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index b3605f0d38..d6e93dcf5a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -336,17 +336,24 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } catch (Exception e) { throw new DecoderException(e); } finally { - if (cumulation != null) { - cumulation.release(); - cumulation = null; + try { + if (cumulation != null) { + cumulation.release(); + cumulation = null; + } + int size = out.size(); + for (int i = 0; i < size; i++) { + ctx.fireChannelRead(out.get(i)); + } + if (size > 0) { + // Something was read, call fireChannelReadComplete() + ctx.fireChannelReadComplete(); + } + ctx.fireChannelInactive(); + } finally { + // recycle in all cases + out.recycle(); } - - int size = out.size(); - for (int i = 0; i < size; i ++) { - ctx.fireChannelRead(out.get(i)); - } - ctx.fireChannelInactive(); - out.recycle(); } } diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index 3cd8584a62..aad5e13128 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -18,12 +18,15 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.ReferenceCountUtil; import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; public class ByteToMessageDecoderTest { @@ -119,4 +122,42 @@ public class ByteToMessageDecoderTest { Assert.assertEquals(channel.readInbound(), Unpooled.wrappedBuffer(new byte[] {'b'})); Assert.assertNull(channel.readInbound()); } + + @Test + public void testFireChannelReadCompleteOnInactive() throws InterruptedException { + final BlockingQueue queue = new LinkedBlockingDeque(); + final ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'})); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + in.skipBytes(in.readableBytes()); + if (!ctx.channel().isActive()) { + out.add("data"); + } + } + }, new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + queue.add(3); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + queue.add(1); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (!ctx.channel().isActive()) { + queue.add(2); + } + } + }); + Assert.assertFalse(channel.writeInbound(buf)); + channel.finish(); + Assert.assertEquals(1, (int) queue.take()); + Assert.assertEquals(2, (int) queue.take()); + Assert.assertEquals(3, (int) queue.take()); + Assert.assertTrue(queue.isEmpty()); + } } diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index 3989d0bbe3..493a1614e3 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -23,6 +23,8 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import static io.netty.util.ReferenceCountUtil.releaseLater; import static org.junit.Assert.*; @@ -177,4 +179,44 @@ public class ReplayingDecoderTest { b.release(); buf.release(); } + + @Test + public void testFireChannelReadCompleteOnInactive() throws InterruptedException { + final BlockingQueue queue = new LinkedBlockingDeque(); + final ByteBuf buf = releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'})); + EmbeddedChannel channel = new EmbeddedChannel(new ReplayingDecoder() { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + in.skipBytes(in.readableBytes()); + if (!ctx.channel().isActive()) { + out.add("data"); + } + } + }, new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + queue.add(3); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + queue.add(1); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (!ctx.channel().isActive()) { + queue.add(2); + } + } + }); + assertFalse(channel.writeInbound(buf)); + channel.finish(); + assertEquals(1, (int) queue.take()); + assertEquals(1, (int) queue.take()); + assertEquals(2, (int) queue.take()); + assertEquals(3, (int) queue.take()); + assertTrue(queue.isEmpty()); + } }