diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
index 67c09041b6..b36186f8c9 100644
--- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@@ -147,8 +148,14 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
- private boolean decodeWasNull;
private boolean first;
+
+ /**
+ * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
+ * when {@link ChannelConfig#isAutoRead()} is {@code false}.
+ */
+ private boolean firedChannelRead;
+
/**
* A bitmask where the bits are defined as
*
@@ -285,7 +292,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
}
int size = out.size();
- decodeWasNull = !out.insertSinceRecycled();
+ firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
@@ -320,12 +327,10 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
- if (decodeWasNull) {
- decodeWasNull = false;
- if (!ctx.channel().config().isAutoRead()) {
- ctx.read();
- }
+ if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
+ ctx.read();
}
+ firedChannelRead = false;
ctx.fireChannelReadComplete();
}
diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java
index cac324470a..85182d8ba2 100644
--- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java
+++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java
@@ -22,6 +22,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.*;
public class ByteToMessageDecoderTest {
@@ -345,4 +347,56 @@ public class ByteToMessageDecoderTest {
assertEquals(0, in.refCnt());
}
}
+
+ @Test
+ public void testDoesNotOverRead() {
+ class ReadInterceptingHandler extends ChannelOutboundHandlerAdapter {
+ private int readsTriggered;
+
+ @Override
+ public void read(ChannelHandlerContext ctx) throws Exception {
+ readsTriggered++;
+ super.read(ctx);
+ }
+ }
+ ReadInterceptingHandler interceptor = new ReadInterceptingHandler();
+
+ EmbeddedChannel channel = new EmbeddedChannel();
+ channel.config().setAutoRead(false);
+ channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
+ assertEquals(0, interceptor.readsTriggered);
+
+ // 0 complete frames, 1 partial frame: SHOULD trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 0, 1 }));
+ assertEquals(1, interceptor.readsTriggered);
+
+ // 2 complete frames, 0 partial frames: should NOT trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 2 }), wrappedBuffer(new byte[] { 3, 4, 5 }));
+ assertEquals(1, interceptor.readsTriggered);
+
+ // 1 complete frame, 1 partial frame: should NOT trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 6, 7, 8 }), wrappedBuffer(new byte[] { 9 }));
+ assertEquals(1, interceptor.readsTriggered);
+
+ // 1 complete frame, 1 partial frame: should NOT trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 10, 11 }), wrappedBuffer(new byte[] { 12 }));
+ assertEquals(1, interceptor.readsTriggered);
+
+ // 0 complete frames, 1 partial frame: SHOULD trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 13 }));
+ assertEquals(2, interceptor.readsTriggered);
+
+ // 1 complete frame, 0 partial frames: should NOT trigger a read
+ channel.writeInbound(wrappedBuffer(new byte[] { 14 }));
+ assertEquals(2, interceptor.readsTriggered);
+
+ for (int i = 0; i < 5; i++) {
+ ByteBuf read = channel.readInbound();
+ assertEquals(i * 3 + 0, read.getByte(0));
+ assertEquals(i * 3 + 1, read.getByte(1));
+ assertEquals(i * 3 + 2, read.getByte(2));
+ read.release();
+ }
+ assertFalse(channel.finish());
+ }
}