diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 6f0df5f382..eb2cb42965 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; @@ -42,6 +43,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler { private volatile boolean singleDecode; + private boolean decodeWasNull; /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. @@ -62,7 +64,6 @@ public abstract class ByteToMessageDecoder public boolean isSingleDecode() { return singleDecode; } - @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return ctx.alloc().buffer(); @@ -78,6 +79,17 @@ public abstract class ByteToMessageDecoder callDecode(ctx); } + @Override + public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { + if (decodeWasNull) { + decodeWasNull = false; + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } + super.channelReadSuspended(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ByteBuf in = ctx.inboundByteBuffer(); @@ -101,6 +113,8 @@ public abstract class ByteToMessageDecoder } protected void callDecode(ChannelHandlerContext ctx) { + boolean wasNull = false; + ByteBuf in = ctx.inboundByteBuffer(); boolean decoded = false; @@ -109,12 +123,14 @@ public abstract class ByteToMessageDecoder int oldInputLength = in.readableBytes(); Object o = decode(ctx, in); if (o == null) { + wasNull = true; if (oldInputLength == in.readableBytes()) { break; } else { continue; } } + wasNull = false; if (oldInputLength == in.readableBytes()) { throw new IllegalStateException( "decode() did not read anything but decoded a message."); @@ -143,7 +159,12 @@ public abstract class ByteToMessageDecoder } if (decoded) { + decodeWasNull = false; ctx.fireInboundBufferUpdated(); + } else { + if (wasNull) { + decodeWasNull = true; + } } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 76bc1a0400..7e7c8d37d7 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -272,6 +272,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { private ReplayingDecoderBuffer replayable; private S state; private int checkpoint = -1; + private boolean decodeWasNull; /** * Creates a new instance with no initial state (i.e: {@code null}). @@ -385,6 +386,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override protected void callDecode(ChannelHandlerContext ctx) { + boolean wasNull = false; + ByteBuf in = cumulation; boolean decoded = false; while (in.readable()) { @@ -417,10 +420,13 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } if (result == null) { + wasNull = true; + // Seems like more data is required. // Let us wait for the next notification. break; } + wasNull = false; if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new IllegalStateException( @@ -451,7 +457,25 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } if (decoded) { + decodeWasNull = false; ctx.fireInboundBufferUpdated(); + } else { + if (wasNull) { + decodeWasNull = true; + } } } + + @Override + public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { + if (decodeWasNull) { + decodeWasNull = false; + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } + + super.channelReadSuspended(ctx); + } + }