From 7b946a781e9ed78d9283f7907aeb4b8471dc9a3a Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 4 Mar 2020 02:50:41 -0700 Subject: [PATCH] Make sure we always flush window update frames in AbstractHttp2StreamChannel (#10075) Motivation: Under certain read patters the AbstractHttp2StreamChannel can fail to flush, resulting in flow window starvation. Modifications: - Ensure we flush if we exit the `doBeginRead()` method. - Account for the Http2FrameCodec always synchronously finishing writes of window update frames. Result: Fixes #10072 --- .../http2/AbstractHttp2StreamChannel.java | 110 +++++++++--------- .../codec/http2/Http2MultiplexTest.java | 62 +++++++++- .../handler/codec/http2/Http2TestUtil.java | 4 + 3 files changed, 123 insertions(+), 53 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index 547d724f55..92abf19a24 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -558,10 +558,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the // cost of additional readComplete notifications on the rare path. if (allocHandle.continueReading()) { - if (!readCompletePending) { - readCompletePending = true; - addChannelToReadCompletePendingQueue(); - } + maybeAddChannelToReadCompletePendingQueue(); } else { unsafe.notifyReadComplete(allocHandle, true); } @@ -807,6 +804,9 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements if (readEOS) { unsafe.closeForcibly(); } + // We need to double check that there is nothing left to flush such as a + // window update frame. + flush(); break; } final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); @@ -822,10 +822,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // currently reading it is possible that more frames will be delivered to this child channel. In // the case that this child channel still wants to read we delay the channelReadComplete on this // child channel until the parent is done reading. - if (!readCompletePending) { - readCompletePending = true; - addChannelToReadCompletePendingQueue(); - } + maybeAddChannelToReadCompletePendingQueue(); } else { notifyReadComplete(allocHandle, true); } @@ -841,6 +838,10 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements int bytes = flowControlledBytes; flowControlledBytes = 0; ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream)); + // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously + // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need, + // to assume there was a write done that needs to be flushed or we risk flow control starvation. + writeDoneAndNoFlush = true; // Add a listener which will notify and teardown the stream // when a window update fails if needed or check the result of the future directly if it was completed // already. @@ -849,7 +850,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this); } else { future.addListener(windowUpdateFrameWriteListener); - writeDoneAndNoFlush = true; } } } @@ -920,61 +920,60 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements try { if (msg instanceof Http2StreamFrame) { Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream()); - if (!firstFrameWritten && !isStreamIdValid(stream().id())) { - if (!(frame instanceof Http2HeadersFrame)) { - ReferenceCountUtil.release(frame); - promise.setFailure( - new IllegalArgumentException("The first frame must be a headers frame. Was: " - + frame.name())); - return; - } - firstFrameWritten = true; - ChannelFuture f = write0(parentContext(), frame); - if (f.isDone()) { - firstWriteComplete(f, promise); - } else { - final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg); - incrementPendingOutboundBytes(bytes, false); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - firstWriteComplete(future, promise); - decrementPendingOutboundBytes(bytes, false); - } - }); - writeDoneAndNoFlush = true; - } - return; - } + writeHttp2StreamFrame(frame, promise); } else { String msgStr = msg.toString(); ReferenceCountUtil.release(msg); promise.setFailure(new IllegalArgumentException( "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) + ": " + msgStr)); - return; - } - - ChannelFuture f = write0(parentContext(), msg); - if (f.isDone()) { - writeComplete(f, promise); - } else { - final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg); - incrementPendingOutboundBytes(bytes, false); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - writeComplete(future, promise); - decrementPendingOutboundBytes(bytes, false); - } - }); - writeDoneAndNoFlush = true; } } catch (Throwable t) { promise.tryFailure(t); } } + private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) { + if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) { + ReferenceCountUtil.release(frame); + promise.setFailure( + new IllegalArgumentException("The first frame must be a headers frame. Was: " + + frame.name())); + return; + } + + final boolean firstWrite; + if (firstFrameWritten) { + firstWrite = false; + } else { + firstWrite = firstFrameWritten = true; + } + + ChannelFuture f = write0(parentContext(), frame); + if (f.isDone()) { + if (firstWrite) { + firstWriteComplete(f, promise); + } else { + writeComplete(f, promise); + } + } else { + final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame); + incrementPendingOutboundBytes(bytes, false); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (firstWrite) { + firstWriteComplete(future, promise); + } else { + writeComplete(future, promise); + } + decrementPendingOutboundBytes(bytes, false); + } + }); + writeDoneAndNoFlush = true; + } + } + private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) { Throwable cause = future.cause(); if (cause == null) { @@ -1084,6 +1083,13 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements } } + private void maybeAddChannelToReadCompletePendingQueue() { + if (!readCompletePending) { + readCompletePending = true; + addChannelToReadCompletePendingQueue(); + } + } + protected void flush0(ChannelHandlerContext ctx) { ctx.flush(); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java index 8dcc579797..c1207f686e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.embedded.EmbeddedChannel; @@ -63,7 +64,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; @@ -1152,6 +1152,66 @@ public abstract class Http2MultiplexTest { verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 3); } + private static final class FlushSniffer extends ChannelOutboundHandlerAdapter { + + private boolean didFlush; + + public boolean checkFlush() { + boolean r = didFlush; + didFlush = false; + return r; + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + didFlush = true; + super.flush(ctx); + } + } + + @Test + public void windowUpdatesAreFlushed() { + LastInboundHandler inboundHandler = new LastInboundHandler(); + FlushSniffer flushSniffer = new FlushSniffer(); + parentChannel.pipeline().addFirst(flushSniffer); + + Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler); + assertTrue(childChannel.config().isAutoRead()); + childChannel.config().setAutoRead(false); + assertFalse(childChannel.config().isAutoRead()); + + Http2HeadersFrame headersFrame = inboundHandler.readInbound(); + assertNotNull(headersFrame); + + assertTrue(flushSniffer.checkFlush()); + + // Write some bytes to get the channel into the idle state with buffered data and also verify we + // do not dispatch it until we receive a read() call. + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false); + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false); + assertTrue(flushSniffer.checkFlush()); + + verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise()); + // only the first one was read because it was legacy auto-read behavior. + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1); + assertFalse(flushSniffer.checkFlush()); + + // Trigger a read of the second frame. + childChannel.read(); + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1); + // We expect a flush here because the StreamChannel will flush the smaller increment but the + // connection will collect the bytes and decide not to send a wire level frame until more are consumed. + assertTrue(flushSniffer.checkFlush()); + verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise()); + + // Call read one more time which should trigger the writing of the flow control update. + childChannel.read(); + verify(frameWriter).writeWindowUpdate(eqCodecCtx(), eq(0), eq(32 * 1024), anyChannelPromise()); + verify(frameWriter).writeWindowUpdate( + eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise()); + assertTrue(flushSniffer.checkFlush()); + } + private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel, LastInboundHandler inboundHandler, int numFrames) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java index 6fa3449709..ff420bf6da 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java @@ -687,6 +687,10 @@ public final class Http2TestUtil { return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s); } + static ByteBuf bb(int size) { + return UnpooledByteBufAllocator.DEFAULT.buffer().writeZero(size); + } + static void assertEqualsAndRelease(Http2Frame expected, Http2Frame actual) { try { assertEquals(expected, actual);