diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index 2ebf1abfd4..adbf64eba8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -26,6 +26,17 @@ import io.netty.channel.ChannelHandlerContext; * Basic implementation of {@link Http2InboundFlowController}. */ public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { + /** + * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE} + * is sent to expand the window. + */ + public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5; + + /** + * A value for the window update ratio to be use in order to disable window updates for + * a stream (i.e. {@code 0}). + */ + public static final double WINDOW_UPDATE_OFF = 0.0; private final Http2Connection connection; private final Http2FrameWriter frameWriter; @@ -70,6 +81,38 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return initialWindowSize; } + /** + * Sets the per-stream ratio used to control when window update is performed. The value + * specified is a ratio of the window size to the initial window size, below which a + * {@code WINDOW_UPDATE} should be sent to expand the window. If the given value is + * {@link #WINDOW_UPDATE_OFF} (i.e. {@code 0}) window updates will be disabled for the + * stream. + * + * @throws IllegalArgumentException if the stream does not exist or if the ratio value is + * outside of the range 0 (inclusive) to 1 (exclusive). + */ + public void setWindowUpdateRatio(ChannelHandlerContext ctx, int streamId, double ratio) { + if (ratio < WINDOW_UPDATE_OFF || ratio >= 1.0) { + throw new IllegalArgumentException("Invalid ratio: " + ratio); + } + + InboundFlowState state = state(streamId); + if (state == null) { + throw new IllegalArgumentException("Stream does not exist: " + streamId); + } + + // Set the ratio. + state.setWindowUpdateRatio(ratio); + + // In the event of enabling window update, check to see if we need to update the window now. + try { + state.updateWindowIfAppropriate(ctx); + } catch (Http2Exception e) { + // Shouldn't happen since we only increase the window. + throw new RuntimeException(e); + } + } + @Override public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { @@ -124,14 +167,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro InboundFlowState connectionState = connectionState(); connectionState.addAndGet(-dataLength); - // If less than the window update threshold remains, restore the window size - // to the initial value and send a window update to the remote endpoint indicating - // the new window size. - if (connectionState.window() <= getWindowUpdateThreshold()) { - connectionState.updateWindow(ctx); - return true; - } - return false; + // If appropriate, send a WINDOW_UPDATE frame to open the window. + return connectionState.updateWindowIfAppropriate(ctx); } /** @@ -145,22 +182,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // was exceeded. InboundFlowState state = stateOrFail(streamId); state.addAndGet(-dataLength); + state.endOfStream(endOfStream); - // If less than the window update threshold remains, restore the window size - // to the initial value and send a window update to the remote endpoint indicating - // the new window size. - if (state.window() <= getWindowUpdateThreshold() && !endOfStream) { - state.updateWindow(ctx); - return true; - } - return false; - } - - /** - * Gets the threshold for a window size below which a window update should be issued. - */ - private int getWindowUpdateThreshold() { - return initialWindowSize / 2; + // If appropriate, send a WINDOW_UPDATE frame to open the window. + return state.updateWindowIfAppropriate(ctx); } /** @@ -170,6 +195,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro private final int streamId; private int window; private int lowerBound; + private boolean endOfStream; + private double windowUpdateRatio = DEFAULT_WINDOW_UPDATE_RATIO; InboundFlowState(int streamId) { this.streamId = streamId; @@ -181,6 +208,35 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return window; } + void endOfStream(boolean endOfStream) { + this.endOfStream = endOfStream; + } + + /** + * Enables or disables window updates for this stream. + */ + void setWindowUpdateRatio(double ratio) { + windowUpdateRatio = ratio; + } + + /** + * Updates the flow control window for this stream if it is appropriate. + * + * @return {@code} true if a {@code WINDOW_UPDATE} frame was sent. + */ + boolean updateWindowIfAppropriate(ChannelHandlerContext ctx) throws Http2Exception { + if (windowUpdateRatio == WINDOW_UPDATE_OFF || endOfStream || initialWindowSize <= 0) { + return false; + } + + int threshold = (int) (initialWindowSize * windowUpdateRatio); + if (window <= threshold) { + updateWindow(ctx); + return true; + } + return false; + } + /** * Adds the given delta to the window size and returns the new value. * diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index 50365086c8..69436402ae 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -15,6 +15,7 @@ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.mockito.Matchers.any; @@ -91,6 +92,16 @@ public class DefaultHttp2InboundFlowControllerTest { verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); } + @Test + public void windowMaintenanceDisabledForConnectionShouldNotUpdateWindow() throws Http2Exception { + controller.setWindowUpdateRatio(ctx, CONNECTION_STREAM_ID, WINDOW_UPDATE_OFF); + int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; + + // Set end-of-stream on the frame, so no window update will be sent for the stream. + applyFlowControl(dataSize, 0, true); + verifyWindowUpdateNotSent(); + } + @Test public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; @@ -103,6 +114,19 @@ public class DefaultHttp2InboundFlowControllerTest { verifyWindowUpdateSent(STREAM_ID, windowDelta); } + @Test + public void windowMaintenanceDisabledForStreamShouldNotUpdateWindow() throws Http2Exception { + controller.setWindowUpdateRatio(ctx, STREAM_ID, WINDOW_UPDATE_OFF); + int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; + int initialWindowSize = DEFAULT_WINDOW_SIZE; + int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); + + // Don't set end-of-stream so we'll get a window update for the stream as well. + applyFlowControl(dataSize, 0, false); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); + verifyWindowUpdateNotSent(STREAM_ID); + } + @Test public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { // Send a frame that takes up the entire window. @@ -145,6 +169,10 @@ public class DefaultHttp2InboundFlowControllerTest { verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); } + private void verifyWindowUpdateNotSent(int streamId) { + verify(frameWriter, never()).writeWindowUpdate(eq(ctx), eq(streamId), anyInt(), eq(promise)); + } + private void verifyWindowUpdateNotSent() throws Http2Exception { verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(), any(ChannelPromise.class));