From f94edd2e92f6e978c0b911aabbd101df54d86279 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 11 Apr 2016 09:15:30 +0200 Subject: [PATCH] Correctly handle ChannelInputShutdownEvent in ReplayingDecoder Motivation: b112673554bafc1eccfd43913a3e8605337dd7fb added ChannelInputShutdownEvent support to ByteToMessageDecoder but missed updating the code for ReplayingDecoder. This has the effect: - If a ChannelInputShutdownEvent is fired ByteToMessageDecoder (the super-class of ReplayingDecoder) will call the channelInputClosed(...) method which will pass the incorrect buffer to the decode method of ReplayingDecoder. Modifications: Share more code between ByteToMessageDEcoder and ReplayingDecoder and so also support ChannelInputShutdownEvent correctly in ReplayingDecoder Result: ChannelInputShutdownEvent is corrrectly handle in ReplayingDecoder as well. --- .../handler/codec/ByteToMessageDecoder.java | 20 +++++++--- .../netty/handler/codec/ReplayingDecoder.java | 35 +++++------------- .../handler/codec/ReplayingDecoderTest.java | 37 +++++++++++++++++++ 3 files changed, 60 insertions(+), 32 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index ad42f602e7..04c573e6d0 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -323,12 +323,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { - if (cumulation != null) { - callDecode(ctx, cumulation, out); - decodeLast(ctx, cumulation, out); - } else { - decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); - } + channelInputClosed(ctx, out); } catch (DecoderException e) { throw e; } catch (Exception e) { @@ -355,6 +350,19 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } + /** + * Called when the input of the channel was closed which may be because it changed to inactive or because of + * {@link ChannelInputShutdownEvent}. + */ + void channelInputClosed(ChannelHandlerContext ctx, List out) throws Exception { + if (cumulation != null) { + callDecode(ctx, cumulation, out); + decodeLast(ctx, cumulation, out); + } else { + decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); + } + } + /** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 67c33c7880..f53728cdf6 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -322,37 +323,19 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - RecyclableArrayList out = RecyclableArrayList.newInstance(); + final void channelInputClosed(ChannelHandlerContext ctx, List out) throws Exception { try { replayable.terminate(); - callDecode(ctx, internalBuffer(), out); - decodeLast(ctx, replayable, out); + if (cumulation != null) { + callDecode(ctx, internalBuffer(), out); + decodeLast(ctx, replayable, out); + } else { + replayable.setCumulation(Unpooled.EMPTY_BUFFER); + decodeLast(ctx, replayable, out); + } } catch (Signal replay) { // Ignore replay.expect(REPLAY); - } catch (DecoderException e) { - throw e; - } catch (Exception e) { - throw new DecoderException(e); - } finally { - try { - if (cumulation != null) { - cumulation.release(); - cumulation = null; - } - - int size = out.size(); - if (size > 0) { - fireChannelRead(ctx, out, size); - // Something was read, call fireChannelReadComplete() - ctx.fireChannelReadComplete(); - } - ctx.fireChannelInactive(); - } finally { - // recycle in all cases - out.recycle(); - } } } diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index 8968c2002f..657fd0d7b1 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -20,11 +20,13 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.ChannelInputShutdownEvent; import org.junit.Test; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicReference; import static io.netty.util.ReferenceCountUtil.releaseLater; import static org.junit.Assert.*; @@ -225,4 +227,39 @@ public class ReplayingDecoderTest { assertEquals(3, (int) queue.take()); assertTrue(queue.isEmpty()); } + + @Test + public void testChannelInputShutdownEvent() { + final AtomicReference error = new AtomicReference(); + + EmbeddedChannel channel = new EmbeddedChannel(new ReplayingDecoder(0) { + private boolean decoded; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (!(in instanceof ReplayingDecoderByteBuf)) { + error.set(new AssertionError("in must be of type " + ReplayingDecoderByteBuf.class + + " but was " + in.getClass())); + return; + } + if (!decoded) { + decoded = true; + in.readByte(); + state(1); + } else { + // This will throw an ReplayingError + in.skipBytes(Integer.MAX_VALUE); + } + } + }); + + assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {0, 1}))); + channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + assertFalse(channel.finishAndReleaseAll()); + + Error err = error.get(); + if (err != null) { + throw err; + } + } }