Prevent ByteToMessageDecoder from overreading when !isAutoRead (#9252)

Motivation:

ByteToMessageDecoder only looks at the last channelRead() in the batch
of channelRead()-s when determining whether or not it should call
ChannelHandlerContext#read() to consume more data when !isAutoRead. This
will lead to read() calls issued unnecessaily and unprompted if the very
last channelRead() didn't result in at least one decoded message, even
if there have been messages decoded from other channelRead()-s in the
current batch.

Modifications:

Track decode outcomes for the entire batch of channelRead() calls and
only issue a read in BTMD if the entire batch of channelRead() calls
yielded no complete messages.

Result:

ByteToMessageDecoder will no longer overread when the very last read
yielded no message, but the batch of reads did.
This commit is contained in:
Aleksey Yeschenko 2019-06-28 12:43:25 +01:00 committed by Norman Maurer
parent cc1528bdad
commit 8d43869369
2 changed files with 66 additions and 7 deletions

View File

@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@ -147,8 +148,14 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
private boolean decodeWasNull;
private boolean first;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;
/**
* A bitmask where the bits are defined as
* <ul>
@ -285,7 +292,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
@ -320,12 +327,10 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
ctx.read();
}
firedChannelRead = false;
ctx.fireChannelReadComplete();
}

View File

@ -22,6 +22,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.*;
public class ByteToMessageDecoderTest {
@ -345,4 +347,56 @@ public class ByteToMessageDecoderTest {
assertEquals(0, in.refCnt());
}
}
@Test
public void testDoesNotOverRead() {
class ReadInterceptingHandler extends ChannelOutboundHandlerAdapter {
private int readsTriggered;
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
readsTriggered++;
super.read(ctx);
}
}
ReadInterceptingHandler interceptor = new ReadInterceptingHandler();
EmbeddedChannel channel = new EmbeddedChannel();
channel.config().setAutoRead(false);
channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
assertEquals(0, interceptor.readsTriggered);
// 0 complete frames, 1 partial frame: SHOULD trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 0, 1 }));
assertEquals(1, interceptor.readsTriggered);
// 2 complete frames, 0 partial frames: should NOT trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 2 }), wrappedBuffer(new byte[] { 3, 4, 5 }));
assertEquals(1, interceptor.readsTriggered);
// 1 complete frame, 1 partial frame: should NOT trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 6, 7, 8 }), wrappedBuffer(new byte[] { 9 }));
assertEquals(1, interceptor.readsTriggered);
// 1 complete frame, 1 partial frame: should NOT trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 10, 11 }), wrappedBuffer(new byte[] { 12 }));
assertEquals(1, interceptor.readsTriggered);
// 0 complete frames, 1 partial frame: SHOULD trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 13 }));
assertEquals(2, interceptor.readsTriggered);
// 1 complete frame, 0 partial frames: should NOT trigger a read
channel.writeInbound(wrappedBuffer(new byte[] { 14 }));
assertEquals(2, interceptor.readsTriggered);
for (int i = 0; i < 5; i++) {
ByteBuf read = channel.readInbound();
assertEquals(i * 3 + 0, read.getByte(0));
assertEquals(i * 3 + 1, read.getByte(1));
assertEquals(i * 3 + 2, read.getByte(2));
read.release();
}
assertFalse(channel.finish());
}
}