Respect ctx.read() calls while processing reads for the child channels when using the Http2MultiplexCodec. (#8617)
Motivation: We did not correct respect ctx.read() calls while processing a read for a child Channel. This could lead to read stales when auto read is disabled and no other read was requested. Modifications: - Keep track of extra read() calls while processing reads - Add unit tests that verify that read() is respected when triggered either in channelRead(...) or channelReadComplete(...) Result: Fixes https://github.com/netty/netty/issues/8209.
This commit is contained in:
parent
d0d30f1fbe
commit
9f9aa1ae01
@ -446,6 +446,26 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
return !isStreamIdValid(stream.id()) || isWritable(stream);
|
return !isStreamIdValid(stream.id()) || isWritable(stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current status of the read-processing for a {@link Http2StreamChannel}.
|
||||||
|
*/
|
||||||
|
private enum ReadStatus {
|
||||||
|
/**
|
||||||
|
* No read in progress and no read was requested (yet)
|
||||||
|
*/
|
||||||
|
IDLE,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reading in progress
|
||||||
|
*/
|
||||||
|
IN_PROGRESS,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A read operation was requested.
|
||||||
|
*/
|
||||||
|
REQUESTED
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Handle writability changes due writing from outside the eventloop.
|
// TODO: Handle writability changes due writing from outside the eventloop.
|
||||||
private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
|
private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
|
||||||
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
|
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
|
||||||
@ -461,13 +481,15 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
private volatile boolean writable;
|
private volatile boolean writable;
|
||||||
|
|
||||||
private boolean outboundClosed;
|
private boolean outboundClosed;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This variable represents if a read is in progress for the current channel. Note that depending upon the
|
* This variable represents if a read is in progress for the current channel or was requested.
|
||||||
* {@link RecvByteBufAllocator} behavior a read may extend beyond the {@link Http2ChannelUnsafe#beginRead()}
|
* Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
|
||||||
* method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may drain all pending data, and then if the
|
* {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
|
||||||
* parent channel is reading this channel may still accept frames.
|
* drain all pending data, and then if the parent channel is reading this channel may still accept frames.
|
||||||
*/
|
*/
|
||||||
private boolean readInProgress;
|
private ReadStatus readStatus = ReadStatus.IDLE;
|
||||||
|
|
||||||
private Queue<Object> inboundBuffer;
|
private Queue<Object> inboundBuffer;
|
||||||
|
|
||||||
/** {@code true} after the first HEADERS frame has been written **/
|
/** {@code true} after the first HEADERS frame has been written **/
|
||||||
@ -757,9 +779,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
assert eventLoop().inEventLoop();
|
assert eventLoop().inEventLoop();
|
||||||
if (!isActive()) {
|
if (!isActive()) {
|
||||||
ReferenceCountUtil.release(frame);
|
ReferenceCountUtil.release(frame);
|
||||||
} else if (readInProgress) {
|
} else if (readStatus != ReadStatus.IDLE) {
|
||||||
// If readInProgress there cannot be anything in the queue, otherwise we would have drained it from the
|
// If a read is in progress or has been requested, there cannot be anything in the queue,
|
||||||
// queue and processed it during the read cycle.
|
// otherwise we would have drained it from the queue and processed it during the read cycle.
|
||||||
assert inboundBuffer == null || inboundBuffer.isEmpty();
|
assert inboundBuffer == null || inboundBuffer.isEmpty();
|
||||||
final Handle allocHandle = unsafe.recvBufAllocHandle();
|
final Handle allocHandle = unsafe.recvBufAllocHandle();
|
||||||
unsafe.doRead0(frame, allocHandle);
|
unsafe.doRead0(frame, allocHandle);
|
||||||
@ -783,7 +805,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
|
|
||||||
void fireChildReadComplete() {
|
void fireChildReadComplete() {
|
||||||
assert eventLoop().inEventLoop();
|
assert eventLoop().inEventLoop();
|
||||||
assert readInProgress;
|
assert readStatus == ReadStatus.IN_PROGRESS;
|
||||||
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
|
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -985,11 +1007,20 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beginRead() {
|
public void beginRead() {
|
||||||
if (readInProgress || !isActive()) {
|
if (!isActive()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
readInProgress = true;
|
switch (readStatus) {
|
||||||
doBeginRead();
|
case IDLE:
|
||||||
|
readStatus = ReadStatus.IN_PROGRESS;
|
||||||
|
doBeginRead();
|
||||||
|
break;
|
||||||
|
case IN_PROGRESS:
|
||||||
|
readStatus = ReadStatus.REQUESTED;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBeginRead() {
|
void doBeginRead() {
|
||||||
@ -1026,7 +1057,11 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
|
|
||||||
void notifyReadComplete(Handle allocHandle) {
|
void notifyReadComplete(Handle allocHandle) {
|
||||||
assert next == null && previous == null;
|
assert next == null && previous == null;
|
||||||
readInProgress = false;
|
if (readStatus == ReadStatus.REQUESTED) {
|
||||||
|
readStatus = ReadStatus.IN_PROGRESS;
|
||||||
|
} else {
|
||||||
|
readStatus = ReadStatus.IDLE;
|
||||||
|
}
|
||||||
allocHandle.readComplete();
|
allocHandle.readComplete();
|
||||||
pipeline().fireChannelReadComplete();
|
pipeline().fireChannelReadComplete();
|
||||||
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
|
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
|
||||||
|
@ -260,6 +260,60 @@ public class Http2MultiplexCodecTest {
|
|||||||
verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
|
verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void readInChannelReadWithoutAutoRead() {
|
||||||
|
useReadWithoutAutoRead(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void readInChannelReadCompleteWithoutAutoRead() {
|
||||||
|
useReadWithoutAutoRead(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void useReadWithoutAutoRead(final boolean readComplete) {
|
||||||
|
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
|
||||||
|
Channel childChannel = inboundHandler.channel();
|
||||||
|
assertTrue(childChannel.config().isAutoRead());
|
||||||
|
childChannel.config().setAutoRead(false);
|
||||||
|
assertFalse(childChannel.config().isAutoRead());
|
||||||
|
|
||||||
|
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
|
||||||
|
assertNotNull(headersFrame);
|
||||||
|
|
||||||
|
// Add a handler which will request reads.
|
||||||
|
childChannel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
if (!readComplete) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
|
if (readComplete) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
codec.onHttp2Frame(
|
||||||
|
new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
|
||||||
|
codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
|
||||||
|
codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
|
||||||
|
codec.onChannelReadComplete();
|
||||||
|
|
||||||
|
codec.onHttp2Frame(
|
||||||
|
new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream));
|
||||||
|
codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream));
|
||||||
|
codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream));
|
||||||
|
codec.onChannelReadComplete();
|
||||||
|
|
||||||
|
verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 6);
|
||||||
|
}
|
||||||
|
|
||||||
private Http2StreamChannel newOutboundStream() {
|
private Http2StreamChannel newOutboundStream() {
|
||||||
return new Http2StreamChannelBootstrap(parentChannel).handler(childChannelInitializer)
|
return new Http2StreamChannelBootstrap(parentChannel).handler(childChannelInitializer)
|
||||||
.open().syncUninterruptibly().getNow();
|
.open().syncUninterruptibly().getNow();
|
||||||
|
Loading…
Reference in New Issue
Block a user