Prevent ByteToMessageDecoder from overreading when !isAutoRead (#9252)
Motivation: ByteToMessageDecoder only looks at the last channelRead() in the batch of channelRead()-s when determining whether or not it should call ChannelHandlerContext#read() to consume more data when !isAutoRead. This will lead to read() calls issued unnecessaily and unprompted if the very last channelRead() didn't result in at least one decoded message, even if there have been messages decoded from other channelRead()-s in the current batch. Modifications: Track decode outcomes for the entire batch of channelRead() calls and only issue a read in BTMD if the entire batch of channelRead() calls yielded no complete messages. Result: ByteToMessageDecoder will no longer overread when the very last read yielded no message, but the batch of reads did.
This commit is contained in:
parent
efe40ac17d
commit
6b6475fb56
@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
@ -151,8 +152,14 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
ByteBuf cumulation;
|
||||
private Cumulator cumulator = MERGE_CUMULATOR;
|
||||
private boolean singleDecode;
|
||||
private boolean decodeWasNull;
|
||||
private boolean first;
|
||||
|
||||
/**
|
||||
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
|
||||
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
|
||||
*/
|
||||
private boolean firedChannelRead;
|
||||
|
||||
/**
|
||||
* A bitmask where the bits are defined as
|
||||
* <ul>
|
||||
@ -291,7 +298,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
}
|
||||
|
||||
int size = out.size();
|
||||
decodeWasNull = !out.insertSinceRecycled();
|
||||
firedChannelRead |= out.insertSinceRecycled();
|
||||
fireChannelRead(ctx, out, size);
|
||||
out.recycle();
|
||||
}
|
||||
@ -326,12 +333,10 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
numReads = 0;
|
||||
discardSomeReadBytes();
|
||||
if (decodeWasNull) {
|
||||
decodeWasNull = false;
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
firedChannelRead = false;
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledHeapByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import org.junit.Test;
|
||||
@ -30,6 +31,7 @@ import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ByteToMessageDecoderTest {
|
||||
@ -345,4 +347,56 @@ public class ByteToMessageDecoderTest {
|
||||
assertEquals(0, in.refCnt());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoesNotOverRead() {
|
||||
class ReadInterceptingHandler extends ChannelOutboundHandlerAdapter {
|
||||
private int readsTriggered;
|
||||
|
||||
@Override
|
||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
||||
readsTriggered++;
|
||||
super.read(ctx);
|
||||
}
|
||||
}
|
||||
ReadInterceptingHandler interceptor = new ReadInterceptingHandler();
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel();
|
||||
channel.config().setAutoRead(false);
|
||||
channel.pipeline().addLast(interceptor, new FixedLengthFrameDecoder(3));
|
||||
assertEquals(0, interceptor.readsTriggered);
|
||||
|
||||
// 0 complete frames, 1 partial frame: SHOULD trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 0, 1 }));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 2 complete frames, 0 partial frames: should NOT trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 2 }), wrappedBuffer(new byte[] { 3, 4, 5 }));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 1 partial frame: should NOT trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 6, 7, 8 }), wrappedBuffer(new byte[] { 9 }));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 1 partial frame: should NOT trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 10, 11 }), wrappedBuffer(new byte[] { 12 }));
|
||||
assertEquals(1, interceptor.readsTriggered);
|
||||
|
||||
// 0 complete frames, 1 partial frame: SHOULD trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 13 }));
|
||||
assertEquals(2, interceptor.readsTriggered);
|
||||
|
||||
// 1 complete frame, 0 partial frames: should NOT trigger a read
|
||||
channel.writeInbound(wrappedBuffer(new byte[] { 14 }));
|
||||
assertEquals(2, interceptor.readsTriggered);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
ByteBuf read = channel.readInbound();
|
||||
assertEquals(i * 3 + 0, read.getByte(0));
|
||||
assertEquals(i * 3 + 1, read.getByte(1));
|
||||
assertEquals(i * 3 + 2, read.getByte(2));
|
||||
read.release();
|
||||
}
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user