Only call ctx.fireChannelReadComplete() if ByteToMessageDecoder decoded at least one message.
Motivation: Its wasteful and also confusing that channelReadComplete() is called even if there was no message forwarded to the next handler. Modifications: - Only call ctx.fireChannelReadComplete() if at least one message was decoded - Add unit test Result: Less confusing behavior. Fixes [#4312].
This commit is contained in:
parent
a75ac747f0
commit
d63bb4811e
|
@ -125,13 +125,10 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
if (!read) {
|
boolean wasRead = read;
|
||||||
if (!ctx.channel().config().isAutoRead()) {
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
read = false;
|
read = false;
|
||||||
super.channelReadComplete(ctx);
|
|
||||||
|
channelReadComplete(ctx, wasRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -313,15 +313,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
channelReadComplete(ctx, !decodeWasNull);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void channelReadComplete(ChannelHandlerContext ctx, boolean readData) throws Exception {
|
||||||
numReads = 0;
|
numReads = 0;
|
||||||
discardSomeReadBytes();
|
discardSomeReadBytes();
|
||||||
if (decodeWasNull) {
|
decodeWasNull = false;
|
||||||
decodeWasNull = false;
|
if (readData) {
|
||||||
if (!ctx.channel().config().isAutoRead()) {
|
ctx.fireChannelReadComplete();
|
||||||
ctx.read();
|
} else if (!ctx.channel().config().isAutoRead()) {
|
||||||
}
|
ctx.read();
|
||||||
}
|
}
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void discardSomeReadBytes() {
|
protected final void discardSomeReadBytes() {
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.junit.Test;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
@ -305,4 +307,41 @@ public class ByteToMessageDecoderTest {
|
||||||
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 })));
|
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 })));
|
||||||
assertFalse(channel.finish());
|
assertFalse(channel.finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFireChannelReadComplete() {
|
||||||
|
final AtomicBoolean readCompleteExpected = new AtomicBoolean();
|
||||||
|
final AtomicInteger readCompleteCount = new AtomicInteger();
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
if (in.readableBytes() > 1) {
|
||||||
|
readCompleteExpected.set(true);
|
||||||
|
out.add(in.readBytes(in.readableBytes()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
assertTrue(readCompleteExpected.get());
|
||||||
|
readCompleteCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {'a'})));
|
||||||
|
assertTrue(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {'b'})));
|
||||||
|
ByteBuf b = channel.readInbound();
|
||||||
|
|
||||||
|
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {'a', 'b'});
|
||||||
|
assertEquals(expected, b);
|
||||||
|
b.release();
|
||||||
|
expected.release();
|
||||||
|
|
||||||
|
assertTrue(readCompleteExpected.get());
|
||||||
|
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
|
||||||
|
assertEquals(1, readCompleteCount.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1167,14 +1167,15 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
// Discard bytes of the cumulation buffer if needed.
|
|
||||||
discardSomeReadBytes();
|
|
||||||
|
|
||||||
flushIfNeeded(ctx);
|
flushIfNeeded(ctx);
|
||||||
readIfNeeded(ctx);
|
boolean readData = firedChannelRead;
|
||||||
|
|
||||||
firedChannelRead = false;
|
firedChannelRead = false;
|
||||||
ctx.fireChannelReadComplete();
|
// if readData is false channelReadComplete(....) will take care of calling read.
|
||||||
|
if (readData && !handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) {
|
||||||
|
// If handshake is not finished yet, we need more data.
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
channelReadComplete(ctx, readData);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readIfNeeded(ChannelHandlerContext ctx) {
|
private void readIfNeeded(ChannelHandlerContext ctx) {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user