From 6a3723c6194f46540575db6eb0a0d12e68fa52bc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 5 Dec 2018 15:29:33 +0100 Subject: [PATCH] Respect ctx.read() calls while processing reads for the child channels when using the Http2MultiplexCodec. (#8617) Motivation: We did not correct respect ctx.read() calls while processing a read for a child Channel. This could lead to read stales when auto read is disabled and no other read was requested. Modifications: - Keep track of extra read() calls while processing reads - Add unit tests that verify that read() is respected when triggered either in channelRead(...) or channelReadComplete(...) Result: Fixes https://github.com/netty/netty/issues/8209. --- .../codec/http2/Http2MultiplexCodec.java | 61 +++++++++++++++---- .../codec/http2/Http2MultiplexCodecTest.java | 54 ++++++++++++++++ 2 files changed, 102 insertions(+), 13 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index a7ca0bb831..6de1dd2df9 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -446,6 +446,26 @@ public class Http2MultiplexCodec extends Http2FrameCodec { return !isStreamIdValid(stream.id()) || isWritable(stream); } + /** + * The current status of the read-processing for a {@link Http2StreamChannel}. + */ + private enum ReadStatus { + /** + * No read in progress and no read was requested (yet) + */ + IDLE, + + /** + * Reading in progress + */ + IN_PROGRESS, + + /** + * A read operation was requested. + */ + REQUESTED + } + // TODO: Handle writability changes due writing from outside the eventloop. private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel { private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this); @@ -461,13 +481,15 @@ public class Http2MultiplexCodec extends Http2FrameCodec { private volatile boolean writable; private boolean outboundClosed; + /** - * This variable represents if a read is in progress for the current channel. Note that depending upon the - * {@link RecvByteBufAllocator} behavior a read may extend beyond the {@link Http2ChannelUnsafe#beginRead()} - * method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may drain all pending data, and then if the - * parent channel is reading this channel may still accept frames. + * This variable represents if a read is in progress for the current channel or was requested. + * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the + * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may + * drain all pending data, and then if the parent channel is reading this channel may still accept frames. */ - private boolean readInProgress; + private ReadStatus readStatus = ReadStatus.IDLE; + private Queue inboundBuffer; /** {@code true} after the first HEADERS frame has been written **/ @@ -757,9 +779,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec { assert eventLoop().inEventLoop(); if (!isActive()) { ReferenceCountUtil.release(frame); - } else if (readInProgress) { - // If readInProgress there cannot be anything in the queue, otherwise we would have drained it from the - // queue and processed it during the read cycle. + } else if (readStatus != ReadStatus.IDLE) { + // If a read is in progress or has been requested, there cannot be anything in the queue, + // otherwise we would have drained it from the queue and processed it during the read cycle. assert inboundBuffer == null || inboundBuffer.isEmpty(); final Handle allocHandle = unsafe.recvBufAllocHandle(); unsafe.doRead0(frame, allocHandle); @@ -783,7 +805,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { void fireChildReadComplete() { assert eventLoop().inEventLoop(); - assert readInProgress; + assert readStatus == ReadStatus.IN_PROGRESS; unsafe.notifyReadComplete(unsafe.recvBufAllocHandle()); } @@ -985,11 +1007,20 @@ public class Http2MultiplexCodec extends Http2FrameCodec { @Override public void beginRead() { - if (readInProgress || !isActive()) { + if (!isActive()) { return; } - readInProgress = true; - doBeginRead(); + switch (readStatus) { + case IDLE: + readStatus = ReadStatus.IN_PROGRESS; + doBeginRead(); + break; + case IN_PROGRESS: + readStatus = ReadStatus.REQUESTED; + break; + default: + break; + } } void doBeginRead() { @@ -1026,7 +1057,11 @@ public class Http2MultiplexCodec extends Http2FrameCodec { void notifyReadComplete(Handle allocHandle) { assert next == null && previous == null; - readInProgress = false; + if (readStatus == ReadStatus.REQUESTED) { + readStatus = ReadStatus.IN_PROGRESS; + } else { + readStatus = ReadStatus.IDLE; + } allocHandle.readComplete(); pipeline().fireChannelReadComplete(); // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index f09d55e916..d857cf8d1e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -260,6 +260,60 @@ public class Http2MultiplexCodecTest { verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 2); } + @Test + public void readInChannelReadWithoutAutoRead() { + useReadWithoutAutoRead(false); + } + + @Test + public void readInChannelReadCompleteWithoutAutoRead() { + useReadWithoutAutoRead(true); + } + + private void useReadWithoutAutoRead(final boolean readComplete) { + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream); + Channel childChannel = inboundHandler.channel(); + assertTrue(childChannel.config().isAutoRead()); + childChannel.config().setAutoRead(false); + assertFalse(childChannel.config().isAutoRead()); + + Http2HeadersFrame headersFrame = inboundHandler.readInbound(); + assertNotNull(headersFrame); + + // Add a handler which will request reads. + childChannel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.fireChannelRead(msg); + if (!readComplete) { + ctx.read(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + if (readComplete) { + ctx.read(); + } + } + }); + + codec.onHttp2Frame( + new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream)); + codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream)); + codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream)); + codec.onChannelReadComplete(); + + codec.onHttp2Frame( + new DefaultHttp2DataFrame(bb("hello world"), false).stream(inboundStream)); + codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("foo"), false).stream(inboundStream)); + codec.onHttp2Frame(new DefaultHttp2DataFrame(bb("bar"), true).stream(inboundStream)); + codec.onChannelReadComplete(); + + verifyFramesMultiplexedToCorrectChannel(inboundStream, inboundHandler, 6); + } + private Http2StreamChannel newOutboundStream() { return new Http2StreamChannelBootstrap(parentChannel).handler(childChannelInitializer) .open().syncUninterruptibly().getNow();