Make sure we always flush window update frames in AbstractHttp2StreamChannel (#10075)
Motivation: Under certain read patters the AbstractHttp2StreamChannel can fail to flush, resulting in flow window starvation. Modifications: - Ensure we flush if we exit the `doBeginRead()` method. - Account for the Http2FrameCodec always synchronously finishing writes of window update frames. Result: Fixes #10072
This commit is contained in:
parent
1b0e3d95f4
commit
7b946a781e
@ -558,10 +558,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
|
||||
// cost of additional readComplete notifications on the rare path.
|
||||
if (allocHandle.continueReading()) {
|
||||
if (!readCompletePending) {
|
||||
readCompletePending = true;
|
||||
addChannelToReadCompletePendingQueue();
|
||||
}
|
||||
maybeAddChannelToReadCompletePendingQueue();
|
||||
} else {
|
||||
unsafe.notifyReadComplete(allocHandle, true);
|
||||
}
|
||||
@ -807,6 +804,9 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
if (readEOS) {
|
||||
unsafe.closeForcibly();
|
||||
}
|
||||
// We need to double check that there is nothing left to flush such as a
|
||||
// window update frame.
|
||||
flush();
|
||||
break;
|
||||
}
|
||||
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
|
||||
@ -822,10 +822,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
// currently reading it is possible that more frames will be delivered to this child channel. In
|
||||
// the case that this child channel still wants to read we delay the channelReadComplete on this
|
||||
// child channel until the parent is done reading.
|
||||
if (!readCompletePending) {
|
||||
readCompletePending = true;
|
||||
addChannelToReadCompletePendingQueue();
|
||||
}
|
||||
maybeAddChannelToReadCompletePendingQueue();
|
||||
} else {
|
||||
notifyReadComplete(allocHandle, true);
|
||||
}
|
||||
@ -841,6 +838,10 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
int bytes = flowControlledBytes;
|
||||
flowControlledBytes = 0;
|
||||
ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
|
||||
// window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
|
||||
// completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
|
||||
// to assume there was a write done that needs to be flushed or we risk flow control starvation.
|
||||
writeDoneAndNoFlush = true;
|
||||
// Add a listener which will notify and teardown the stream
|
||||
// when a window update fails if needed or check the result of the future directly if it was completed
|
||||
// already.
|
||||
@ -849,7 +850,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
|
||||
} else {
|
||||
future.addListener(windowUpdateFrameWriteListener);
|
||||
writeDoneAndNoFlush = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -920,59 +920,58 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
try {
|
||||
if (msg instanceof Http2StreamFrame) {
|
||||
Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
|
||||
if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
|
||||
if (!(frame instanceof Http2HeadersFrame)) {
|
||||
ReferenceCountUtil.release(frame);
|
||||
promise.setFailure(
|
||||
new IllegalArgumentException("The first frame must be a headers frame. Was: "
|
||||
+ frame.name()));
|
||||
return;
|
||||
}
|
||||
firstFrameWritten = true;
|
||||
ChannelFuture f = write0(parentContext(), frame);
|
||||
if (f.isDone()) {
|
||||
firstWriteComplete(f, promise);
|
||||
} else {
|
||||
final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
|
||||
incrementPendingOutboundBytes(bytes, false);
|
||||
f.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
firstWriteComplete(future, promise);
|
||||
decrementPendingOutboundBytes(bytes, false);
|
||||
}
|
||||
});
|
||||
writeDoneAndNoFlush = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
writeHttp2StreamFrame(frame, promise);
|
||||
} else {
|
||||
String msgStr = msg.toString();
|
||||
ReferenceCountUtil.release(msg);
|
||||
promise.setFailure(new IllegalArgumentException(
|
||||
"Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
|
||||
": " + msgStr));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
promise.tryFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
|
||||
if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
|
||||
ReferenceCountUtil.release(frame);
|
||||
promise.setFailure(
|
||||
new IllegalArgumentException("The first frame must be a headers frame. Was: "
|
||||
+ frame.name()));
|
||||
return;
|
||||
}
|
||||
|
||||
ChannelFuture f = write0(parentContext(), msg);
|
||||
if (f.isDone()) {
|
||||
writeComplete(f, promise);
|
||||
final boolean firstWrite;
|
||||
if (firstFrameWritten) {
|
||||
firstWrite = false;
|
||||
} else {
|
||||
final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(msg);
|
||||
firstWrite = firstFrameWritten = true;
|
||||
}
|
||||
|
||||
ChannelFuture f = write0(parentContext(), frame);
|
||||
if (f.isDone()) {
|
||||
if (firstWrite) {
|
||||
firstWriteComplete(f, promise);
|
||||
} else {
|
||||
writeComplete(f, promise);
|
||||
}
|
||||
} else {
|
||||
final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
|
||||
incrementPendingOutboundBytes(bytes, false);
|
||||
f.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (firstWrite) {
|
||||
firstWriteComplete(future, promise);
|
||||
} else {
|
||||
writeComplete(future, promise);
|
||||
}
|
||||
decrementPendingOutboundBytes(bytes, false);
|
||||
}
|
||||
});
|
||||
writeDoneAndNoFlush = true;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
promise.tryFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
|
||||
@ -1084,6 +1083,13 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeAddChannelToReadCompletePendingQueue() {
|
||||
if (!readCompletePending) {
|
||||
readCompletePending = true;
|
||||
addChannelToReadCompletePendingQueue();
|
||||
}
|
||||
}
|
||||
|
||||
protected void flush0(ChannelHandlerContext ctx) {
|
||||
ctx.flush();
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
@ -63,7 +64,6 @@ import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyShort;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.never;
|
||||
@ -1152,6 +1152,66 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
||||
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 3);
|
||||
}
|
||||
|
||||
private static final class FlushSniffer extends ChannelOutboundHandlerAdapter {
|
||||
|
||||
private boolean didFlush;
|
||||
|
||||
public boolean checkFlush() {
|
||||
boolean r = didFlush;
|
||||
didFlush = false;
|
||||
return r;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||
didFlush = true;
|
||||
super.flush(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void windowUpdatesAreFlushed() {
|
||||
LastInboundHandler inboundHandler = new LastInboundHandler();
|
||||
FlushSniffer flushSniffer = new FlushSniffer();
|
||||
parentChannel.pipeline().addFirst(flushSniffer);
|
||||
|
||||
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
|
||||
assertTrue(childChannel.config().isAutoRead());
|
||||
childChannel.config().setAutoRead(false);
|
||||
assertFalse(childChannel.config().isAutoRead());
|
||||
|
||||
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
|
||||
assertNotNull(headersFrame);
|
||||
|
||||
assertTrue(flushSniffer.checkFlush());
|
||||
|
||||
// Write some bytes to get the channel into the idle state with buffered data and also verify we
|
||||
// do not dispatch it until we receive a read() call.
|
||||
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false);
|
||||
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false);
|
||||
assertTrue(flushSniffer.checkFlush());
|
||||
|
||||
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
|
||||
// only the first one was read because it was legacy auto-read behavior.
|
||||
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1);
|
||||
assertFalse(flushSniffer.checkFlush());
|
||||
|
||||
// Trigger a read of the second frame.
|
||||
childChannel.read();
|
||||
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1);
|
||||
// We expect a flush here because the StreamChannel will flush the smaller increment but the
|
||||
// connection will collect the bytes and decide not to send a wire level frame until more are consumed.
|
||||
assertTrue(flushSniffer.checkFlush());
|
||||
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
|
||||
|
||||
// Call read one more time which should trigger the writing of the flow control update.
|
||||
childChannel.read();
|
||||
verify(frameWriter).writeWindowUpdate(eqCodecCtx(), eq(0), eq(32 * 1024), anyChannelPromise());
|
||||
verify(frameWriter).writeWindowUpdate(
|
||||
eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise());
|
||||
assertTrue(flushSniffer.checkFlush());
|
||||
}
|
||||
|
||||
private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel,
|
||||
LastInboundHandler inboundHandler,
|
||||
int numFrames) {
|
||||
|
@ -687,6 +687,10 @@ public final class Http2TestUtil {
|
||||
return ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, s);
|
||||
}
|
||||
|
||||
static ByteBuf bb(int size) {
|
||||
return UnpooledByteBufAllocator.DEFAULT.buffer().writeZero(size);
|
||||
}
|
||||
|
||||
static void assertEqualsAndRelease(Http2Frame expected, Http2Frame actual) {
|
||||
try {
|
||||
assertEquals(expected, actual);
|
||||
|
Loading…
x
Reference in New Issue
Block a user