Revert "Only call ctx.fireChannelReadComplete() if ByteToMessageDecoder decoded at least one message."

This reverts commit d63bb4811ed8ccd5d9e45853f3ac6aee9da7ecab as this not covered correctly all cases and so could lead to missing fireChannelReadComplete() calls. We will re-evalute d63bb4811ed8ccd5d9e45853f3ac6aee9da7ecab and resbumit a pr once we are sure all is handled correctly
This commit is contained in:
Norman Maurer 2017-08-17 19:00:19 +02:00
parent 5de38051c9
commit 123e07ca80
4 changed files with 18 additions and 58 deletions

View File

@ -125,10 +125,13 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
boolean wasRead = read; if (!read) {
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
read = false; read = false;
super.channelReadComplete(ctx);
channelReadComplete(ctx, wasRead);
} }
@Override @Override

View File

@ -313,18 +313,15 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
channelReadComplete(ctx, !decodeWasNull);
}
protected final void channelReadComplete(ChannelHandlerContext ctx, boolean readData) throws Exception {
numReads = 0; numReads = 0;
discardSomeReadBytes(); discardSomeReadBytes();
decodeWasNull = false; if (decodeWasNull) {
if (readData) { decodeWasNull = false;
ctx.fireChannelReadComplete(); if (!ctx.channel().config().isAutoRead()) {
} else if (!ctx.channel().config().isAutoRead()) { ctx.read();
ctx.read(); }
} }
ctx.fireChannelReadComplete();
} }
protected final void discardSomeReadBytes() { protected final void discardSomeReadBytes() {

View File

@ -26,8 +26,6 @@ import org.junit.Test;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -307,41 +305,4 @@ public class ByteToMessageDecoderTest {
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 }))); assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 })));
assertFalse(channel.finish()); assertFalse(channel.finish());
} }
@Test
public void testFireChannelReadComplete() {
final AtomicBoolean readCompleteExpected = new AtomicBoolean();
final AtomicInteger readCompleteCount = new AtomicInteger();
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() > 1) {
readCompleteExpected.set(true);
out.add(in.readBytes(in.readableBytes()));
}
}
}, new ChannelInboundHandlerAdapter() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
assertTrue(readCompleteExpected.get());
readCompleteCount.incrementAndGet();
}
});
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {'a'})));
assertTrue(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {'b'})));
ByteBuf b = channel.readInbound();
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {'a', 'b'});
assertEquals(expected, b);
b.release();
expected.release();
assertTrue(readCompleteExpected.get());
assertFalse(channel.finish());
assertEquals(1, readCompleteCount.get());
}
} }

View File

@ -1167,15 +1167,14 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Discard bytes of the cumulation buffer if needed.
discardSomeReadBytes();
flushIfNeeded(ctx); flushIfNeeded(ctx);
boolean readData = firedChannelRead; readIfNeeded(ctx);
firedChannelRead = false; firedChannelRead = false;
// if readData is false channelReadComplete(....) will take care of calling read. ctx.fireChannelReadComplete();
if (readData && !handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) {
// If handshake is not finished yet, we need more data.
ctx.read();
}
channelReadComplete(ctx, readData);
} }
private void readIfNeeded(ChannelHandlerContext ctx) { private void readIfNeeded(ChannelHandlerContext ctx) {