From 6504d52b9431e7ec785377e29c34584f7f876f88 Mon Sep 17 00:00:00 2001 From: nmittler Date: Fri, 30 Oct 2015 09:14:52 -0700 Subject: [PATCH] Add HTTP/2 local flow control option for auto refill Motivation: For many HTTP/2 applications (such as gRPC) it is necessary to autorefill the connection window in order to prevent application-level deadlocking. Consider an application with 2 streams, A and B. A receives a stream of messages and the application pops off one message at a time and makes a request on stream B. However, if receiving of data on A has caused the connection window to collapse, B will not be able to receive any data and the application will deadlock. The only way (currently) to get around this is 1) use multiple connections, or 2) manually refill the connection window. Both are undesirable and could needlessly complicate the application code. Modifications: Add a configuration option to DefaultHttp2LocalFlowController, allowing it to autorefill the connection window. Result: Applications can configure HTTP/2 to avoid inter-stream deadlocking. --- .../http2/DefaultHttp2ConnectionDecoder.java | 4 +- .../http2/DefaultHttp2ConnectionEncoder.java | 15 ++--- .../DefaultHttp2LocalFlowController.java | 62 ++++++++++++++--- .../DefaultHttp2RemoteFlowController.java | 20 +++--- .../DelegatingDecompressorFrameListener.java | 5 ++ .../codec/http2/Http2ConnectionHandler.java | 3 +- .../codec/http2/Http2LocalFlowController.java | 8 ++- .../http2/Http2RemoteFlowController.java | 10 +-- .../codec/http2/DataCompressionHttp2Test.java | 15 ++++- .../DefaultHttp2ConnectionDecoderTest.java | 2 + .../DefaultHttp2LocalFlowControllerTest.java | 67 ++++++++++++++----- .../http2/StreamBufferingEncoderTest.java | 2 + .../http2/NoopHttp2LocalFlowController.java | 6 ++ 13 files changed, 164 insertions(+), 55 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 882beb1200..ccb189fda8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -62,9 +62,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { this.encoder = checkNotNull(encoder, "encoder"); this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier"); if (connection.local().flowController() == null) { - connection.local().flowController( - new DefaultHttp2LocalFlowController(connection, encoder.frameWriter())); + connection.local().flowController(new DefaultHttp2LocalFlowController(connection)); } + connection.local().flowController().frameWriter(encoder.frameWriter()); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 1112a10146..cac68747fa 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -198,8 +198,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return promise.setFailure(t); } - ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); - return future; + return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); } @Override @@ -222,20 +221,17 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return promise.setFailure(e); } - ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise); - return future; + return frameWriter.writeSettings(ctx, settings, promise); } @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { - ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise); - return future; + return frameWriter.writeSettingsAck(ctx, promise); } @Override public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { - ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise); - return future; + return frameWriter.writePing(ctx, ack, data, promise); } @Override @@ -253,8 +249,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return promise.setFailure(e); } - ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); - return future; + return frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 11fada19aa..60c4633f68 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -46,26 +46,41 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f; private final Http2Connection connection; - private final Http2FrameWriter frameWriter; private final Http2Connection.PropertyKey stateKey; + private Http2FrameWriter frameWriter; private ChannelHandlerContext ctx; private float windowUpdateRatio; private int initialWindowSize = DEFAULT_WINDOW_SIZE; - public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { - this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); + public DefaultHttp2LocalFlowController(Http2Connection connection) { + this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false); } + /** + * Constructs a controller with the given settings. + * + * @param connection the connection state. + * @param windowUpdateRatio the window percentage below which to send a {@code WINDOW_UPDATE}. + * @param autoRefillConnectionWindow if {@code true}, effectively disables the connection window + * in the flow control algorithm as they will always refill automatically without requiring the + * application to consume the bytes. When enabled, the maximum bytes you must be prepared to + * queue is proportional to {@code maximum number of concurrent streams * the initial window + * size per stream} + * (SETTINGS_MAX_CONCURRENT_STREAMS + * SETTINGS_INITIAL_WINDOW_SIZE). + */ public DefaultHttp2LocalFlowController(Http2Connection connection, - Http2FrameWriter frameWriter, float windowUpdateRatio) { + float windowUpdateRatio, + boolean autoRefillConnectionWindow) { this.connection = checkNotNull(connection, "connection"); - this.frameWriter = checkNotNull(frameWriter, "frameWriter"); windowUpdateRatio(windowUpdateRatio); // Add a flow state for the connection. stateKey = connection.newKey(); - connection.connectionStream() - .setProperty(stateKey, new DefaultState(connection.connectionStream(), initialWindowSize)); + FlowState connectionState = autoRefillConnectionWindow ? + new AutoRefillState(connection.connectionStream(), initialWindowSize) : + new DefaultState(connection.connectionStream(), initialWindowSize); + connection.connectionStream().setProperty(stateKey, connectionState); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @@ -106,9 +121,15 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController }); } + @Override + public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) { + this.frameWriter = checkNotNull(frameWriter, "frameWriter"); + return this; + } + @Override public void channelHandlerContext(ChannelHandlerContext ctx) { - this.ctx = ctx; + this.ctx = checkNotNull(ctx, "ctx"); } @Override @@ -269,10 +290,33 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController return stream.state() == Http2Stream.State.CLOSED; } + /** + * Flow control state that does autorefill of the flow control window when the data is + * received. + */ + private final class AutoRefillState extends DefaultState { + public AutoRefillState(Http2Stream stream, int initialWindowSize) { + super(stream, initialWindowSize); + } + + @Override + public void receiveFlowControlledFrame(int dataLength) throws Http2Exception { + super.receiveFlowControlledFrame(dataLength); + // Need to call the super to consume the bytes, since this.consumeBytes does nothing. + super.consumeBytes(dataLength); + } + + @Override + public boolean consumeBytes(int numBytes) throws Http2Exception { + // Do nothing, since the bytes are already consumed upon receiving the data. + return false; + } + } + /** * Flow control window state for an individual stream. */ - private final class DefaultState implements FlowState { + private class DefaultState implements FlowState { private final Http2Stream stream; /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index bc0db77844..366adc7006 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -14,14 +14,6 @@ */ package io.netty.handler.codec.http2; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.StreamByteDistributor.Writer; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.util.ArrayDeque; -import java.util.Deque; - import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; @@ -32,6 +24,14 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.StreamByteDistributor.Writer; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.ArrayDeque; +import java.util.Deque; + /** * Basic implementation of {@link Http2RemoteFlowController}. *

@@ -149,7 +149,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ @Override public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception { - this.ctx = ctx; + this.ctx = checkNotNull(ctx, "ctx"); // Writing the pending bytes will not check writability change and instead a writability change notification // to be provided by an explicit call. @@ -652,7 +652,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * Abstract class which provides common functionality for {@link WritabilityMonitorfoo} implementations. + * Abstract class which provides common functionality for writability monitor implementations. */ private abstract class WritabilityMonitor { private long totalPendingBytes; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index dca4209a9a..aa9a6114a5 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -294,6 +294,11 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor this.flowController = checkNotNull(flowController, "flowController"); } + @Override + public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) { + return flowController.frameWriter(frameWriter); + } + @Override public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception { flowController.channelHandlerContext(ctx); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 19097bad29..8b2a25e787 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -772,7 +772,8 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http * @param cause the exception that was caught * @param http2Ex the {@link StreamException} that is embedded in the causality chain. */ - protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, StreamException http2Ex) { + protected void onStreamError(ChannelHandlerContext ctx, @SuppressWarnings("unused") Throwable cause, + StreamException http2Ex) { resetStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise()); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java index e9f3a6308c..25e6b4d9b3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java @@ -20,6 +20,13 @@ import io.netty.buffer.ByteBuf; * A {@link Http2FlowController} for controlling the inbound flow of {@code DATA} frames from the remote endpoint. */ public interface Http2LocalFlowController extends Http2FlowController { + /** + * Sets the writer to be use for sending {@code WINDOW_UPDATE} frames. This must be called before any flow + * controlled data is received. + * + * @param frameWriter the HTTP/2 frame writer. + */ + Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter); /** * Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control policies to it for both @@ -29,7 +36,6 @@ public interface Http2LocalFlowController extends Http2FlowController { * If {@code stream} is {@code null} or closed, flow control should only be applied to the connection window and the * bytes are immediately consumed. * - * @param ctx the context from the handler where the frame was read. * @param stream the subject stream for the received frame. The connection stream object must not be used. If {@code * stream} is {@code null} or closed, flow control should only be applied to the connection window and the bytes are * immediately consumed. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java index 052146f644..279129c363 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -90,8 +90,8 @@ public interface Http2RemoteFlowController extends Http2FlowController { /** * Called to indicate that an error occurred before this object could be completely written. *

- * The {@link Http2RemoteFlowController} will make exactly one call to either {@link #error(Throwable)} or - * {@link #writeComplete()}. + * The {@link Http2RemoteFlowController} will make exactly one call to either + * this method or {@link #writeComplete()}. *

* * @param ctx The context to use if any communication needs to occur as a result of the error. @@ -103,8 +103,8 @@ public interface Http2RemoteFlowController extends Http2FlowController { /** * Called after this object has been successfully written. *

- * The {@link Http2RemoteFlowController} will make exactly one call to either {@link #error(Throwable)} or - * {@link #writeComplete()}. + * The {@link Http2RemoteFlowController} will make exactly one call to either + * this method or {@link #error(ChannelHandlerContext, Throwable)}. *

*/ void writeComplete(); @@ -116,7 +116,7 @@ public interface Http2RemoteFlowController extends Http2FlowController { * the payload is fully written, i.e it's size after the write is 0. *

* When an exception is thrown the {@link Http2RemoteFlowController} will make a call to - * {@link #error(Throwable)}. + * {@link #error(ChannelHandlerContext, Throwable)}. *

* * @param ctx The context to use for writing. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java index bd7513a3da..8f3a1ca3f5 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java @@ -296,8 +296,13 @@ public class DataCompressionHttp2Test { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); + Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); + serverConnection.remote().flowController( + new DefaultHttp2RemoteFlowController(serverConnection)); + serverConnection.local().flowController( + new DefaultHttp2LocalFlowController(serverConnection).frameWriter(frameWriter)); Http2ConnectionEncoder encoder = new CompressorHttp2ConnectionEncoder( - new DefaultHttp2ConnectionEncoder(serverConnection, new DefaultHttp2FrameWriter())); + new DefaultHttp2ConnectionEncoder(serverConnection, frameWriter)); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(serverConnection, encoder, new DefaultHttp2FrameReader()); Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler.Builder() @@ -314,8 +319,14 @@ public class DataCompressionHttp2Test { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); + Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); + clientConnection.remote().flowController( + new DefaultHttp2RemoteFlowController(clientConnection)); + clientConnection.local().flowController( + new DefaultHttp2LocalFlowController(clientConnection).frameWriter(frameWriter)); clientEncoder = new CompressorHttp2ConnectionEncoder( - new DefaultHttp2ConnectionEncoder(clientConnection, new DefaultHttp2FrameWriter())); + new DefaultHttp2ConnectionEncoder(clientConnection, frameWriter)); + Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(clientConnection, clientEncoder, new DefaultHttp2FrameReader()); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 8c41d38bdc..042a399913 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -221,6 +221,7 @@ public class DefaultHttp2ConnectionDecoderTest { verify(localFlow) .receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true)); verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes)); + verify(localFlow).frameWriter(any(Http2FrameWriter.class)); verifyNoMoreInteractions(localFlow); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); } finally { @@ -240,6 +241,7 @@ public class DefaultHttp2ConnectionDecoderTest { verify(localFlow) .receiveFlowControlledFrame(eq((Http2Stream) null), eq(data), eq(padding), eq(true)); verify(localFlow).consumeBytes(eq((Http2Stream) null), eq(processedBytes)); + verify(localFlow).frameWriter(any(Http2FrameWriter.class)); verifyNoMoreInteractions(localFlow); // Verify that the event was absorbed and not propagated to the observer. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 553f1e769b..2b834adc11 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -15,6 +15,7 @@ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; @@ -26,14 +27,15 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; import junit.framework.AssertionFailedError; - import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -64,23 +66,16 @@ public class DefaultHttp2LocalFlowControllerTest { private DefaultHttp2Connection connection; - private static float updateRatio = 0.5f; - @Before public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); when(ctx.newPromise()).thenReturn(promise); when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); - - connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio); - connection.local().flowController(controller); - - connection.local().createStream(STREAM_ID, false); - controller.channelHandlerContext(ctx); when(ctx.executor()).thenReturn(executor); when(executor.inEventLoop()).thenReturn(true); + + initController(false); } @Test @@ -91,17 +86,39 @@ public class DefaultHttp2LocalFlowControllerTest { @Test public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception { - int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. assertFalse(consumeBytes(STREAM_ID, 10)); + verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); // Return the rest and verify the WINDOW_UPDATE is sent. assertTrue(consumeBytes(STREAM_ID, dataSize - 10)); verifyWindowUpdateSent(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); + verifyNoMoreInteractions(frameWriter); + } + + @Test + public void connectionWindowShouldAutoRefillWhenDataReceived() throws Http2Exception { + // Reconfigure controller to auto-refill the connection window. + initController(true); + + int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; + receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); + // Verify that we immediately refill the connection window. + verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); + + // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent for the stream. + assertFalse(consumeBytes(STREAM_ID, 10)); + verifyWindowUpdateNotSent(STREAM_ID); + + // Return the rest and verify the WINDOW_UPDATE is sent for the stream. + assertTrue(consumeBytes(STREAM_ID, dataSize - 10)); + verifyWindowUpdateSent(STREAM_ID, dataSize); + verifyNoMoreInteractions(frameWriter); } @Test(expected = Http2Exception.class) @@ -112,7 +129,7 @@ public class DefaultHttp2LocalFlowControllerTest { @Test public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception { - int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; // Set end-of-stream on the frame, so no window update will be sent for the stream. receiveFlowControlledFrame(STREAM_ID, dataSize, 0, true); @@ -126,7 +143,7 @@ public class DefaultHttp2LocalFlowControllerTest { @Test public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { - int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; int initialWindowSize = DEFAULT_WINDOW_SIZE; int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); @@ -175,7 +192,7 @@ public class DefaultHttp2LocalFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); // Test that both stream and connection window are updated (or not updated) together - int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; + int data1 = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) + 1; receiveFlowControlledFrame(STREAM_ID, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); @@ -221,6 +238,17 @@ public class DefaultHttp2LocalFlowControllerTest { assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); } + @Test + public void closeShouldNotConsumeConnectionWindowWhenAutoRefilled() throws Http2Exception { + // Reconfigure controller to auto-refill the connection window. + initController(true); + + receiveFlowControlledFrame(STREAM_ID, 10, 0, false); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + stream(STREAM_ID).close(); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + } + @Test public void dataReceivedForClosedStreamShouldImmediatelyConsumeBytes() throws Http2Exception { Http2Stream stream = stream(STREAM_ID); @@ -276,7 +304,7 @@ public class DefaultHttp2LocalFlowControllerTest { reset(frameWriter); try { int data1 = (int) (newDefaultWindowSize * ratio) + 1; - int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1; + int data2 = (int) (DEFAULT_WINDOW_SIZE * DEFAULT_WINDOW_UPDATE_RATIO) >> 1; receiveFlowControlledFrame(STREAM_ID, data2, 0, false); receiveFlowControlledFrame(newStreamId, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID); @@ -348,4 +376,13 @@ public class DefaultHttp2LocalFlowControllerTest { private Http2Stream stream(int streamId) { return connection.stream(streamId); } + + private void initController(boolean autoRefillConnectionWindow) throws Http2Exception { + connection = new DefaultHttp2Connection(false); + controller = new DefaultHttp2LocalFlowController(connection, + DEFAULT_WINDOW_UPDATE_RATIO, autoRefillConnectionWindow).frameWriter(frameWriter); + connection.local().flowController(controller); + connection.local().createStream(STREAM_ID, false); + controller.channelHandlerContext(ctx); + } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java index 0d1bed87e8..4e787d6e0a 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java @@ -105,6 +105,8 @@ public class StreamBufferingEncoderTest { .thenAnswer(successAnswer()); connection = new DefaultHttp2Connection(false); + connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection)); + connection.local().flowController(new DefaultHttp2LocalFlowController(connection).frameWriter(writer)); DefaultHttp2ConnectionEncoder defaultEncoder = new DefaultHttp2ConnectionEncoder(connection, writer); diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java index e1c3478df4..5e7230c014 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2LocalFlowController.java @@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2Stream; @@ -68,4 +69,9 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl @Override public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception { } + + @Override + public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) { + return this; + } }