Adding window maintenance flag to HTTP/2 inbound flow control

Motivation:

Currently, window maintenance is automatically performed when a flow
control window drops below half its initial size. We should provide a
way for advanced applications to determine whether or not this should be
done on a per-stream basis.

Modifications:

Modifying DefaultHttp2InboundFlowController to allow enabling/disabling
of window maintenance per stream.

Result:

Inbound flow control window maintenance will be dynamically
configurable.
This commit is contained in:
nmittler 2014-08-25 11:22:54 -07:00
parent e166780f0b
commit 6409a5a1d5
2 changed files with 107 additions and 23 deletions

View File

@ -26,6 +26,17 @@ import io.netty.channel.ChannelHandlerContext;
* Basic implementation of {@link Http2InboundFlowController}.
*/
public class DefaultHttp2InboundFlowController implements Http2InboundFlowController {
/**
* The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
* is sent to expand the window.
*/
public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5;
/**
* A value for the window update ratio to be use in order to disable window updates for
* a stream (i.e. {@code 0}).
*/
public static final double WINDOW_UPDATE_OFF = 0.0;
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
@ -70,6 +81,38 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return initialWindowSize;
}
/**
* Sets the per-stream ratio used to control when window update is performed. The value
* specified is a ratio of the window size to the initial window size, below which a
* {@code WINDOW_UPDATE} should be sent to expand the window. If the given value is
* {@link #WINDOW_UPDATE_OFF} (i.e. {@code 0}) window updates will be disabled for the
* stream.
*
* @throws IllegalArgumentException if the stream does not exist or if the ratio value is
* outside of the range 0 (inclusive) to 1 (exclusive).
*/
public void setWindowUpdateRatio(ChannelHandlerContext ctx, int streamId, double ratio) {
if (ratio < WINDOW_UPDATE_OFF || ratio >= 1.0) {
throw new IllegalArgumentException("Invalid ratio: " + ratio);
}
InboundFlowState state = state(streamId);
if (state == null) {
throw new IllegalArgumentException("Stream does not exist: " + streamId);
}
// Set the ratio.
state.setWindowUpdateRatio(ratio);
// In the event of enabling window update, check to see if we need to update the window now.
try {
state.updateWindowIfAppropriate(ctx);
} catch (Http2Exception e) {
// Shouldn't happen since we only increase the window.
throw new RuntimeException(e);
}
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
@ -124,14 +167,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
InboundFlowState connectionState = connectionState();
connectionState.addAndGet(-dataLength);
// If less than the window update threshold remains, restore the window size
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (connectionState.window() <= getWindowUpdateThreshold()) {
connectionState.updateWindow(ctx);
return true;
}
return false;
// If appropriate, send a WINDOW_UPDATE frame to open the window.
return connectionState.updateWindowIfAppropriate(ctx);
}
/**
@ -145,22 +182,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// was exceeded.
InboundFlowState state = stateOrFail(streamId);
state.addAndGet(-dataLength);
state.endOfStream(endOfStream);
// If less than the window update threshold remains, restore the window size
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (state.window() <= getWindowUpdateThreshold() && !endOfStream) {
state.updateWindow(ctx);
return true;
}
return false;
}
/**
* Gets the threshold for a window size below which a window update should be issued.
*/
private int getWindowUpdateThreshold() {
return initialWindowSize / 2;
// If appropriate, send a WINDOW_UPDATE frame to open the window.
return state.updateWindowIfAppropriate(ctx);
}
/**
@ -170,6 +195,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
private final int streamId;
private int window;
private int lowerBound;
private boolean endOfStream;
private double windowUpdateRatio = DEFAULT_WINDOW_UPDATE_RATIO;
InboundFlowState(int streamId) {
this.streamId = streamId;
@ -181,6 +208,35 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return window;
}
void endOfStream(boolean endOfStream) {
this.endOfStream = endOfStream;
}
/**
* Enables or disables window updates for this stream.
*/
void setWindowUpdateRatio(double ratio) {
windowUpdateRatio = ratio;
}
/**
* Updates the flow control window for this stream if it is appropriate.
*
* @return {@code} true if a {@code WINDOW_UPDATE} frame was sent.
*/
boolean updateWindowIfAppropriate(ChannelHandlerContext ctx) throws Http2Exception {
if (windowUpdateRatio == WINDOW_UPDATE_OFF || endOfStream || initialWindowSize <= 0) {
return false;
}
int threshold = (int) (initialWindowSize * windowUpdateRatio);
if (window <= threshold) {
updateWindow(ctx);
return true;
}
return false;
}
/**
* Adds the given delta to the window size and returns the new value.
*

View File

@ -15,6 +15,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.mockito.Matchers.any;
@ -91,6 +92,16 @@ public class DefaultHttp2InboundFlowControllerTest {
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
}
@Test
public void windowMaintenanceDisabledForConnectionShouldNotUpdateWindow() throws Http2Exception {
controller.setWindowUpdateRatio(ctx, CONNECTION_STREAM_ID, WINDOW_UPDATE_OFF);
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(dataSize, 0, true);
verifyWindowUpdateNotSent();
}
@Test
public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
@ -103,6 +114,19 @@ public class DefaultHttp2InboundFlowControllerTest {
verifyWindowUpdateSent(STREAM_ID, windowDelta);
}
@Test
public void windowMaintenanceDisabledForStreamShouldNotUpdateWindow() throws Http2Exception {
controller.setWindowUpdateRatio(ctx, STREAM_ID, WINDOW_UPDATE_OFF);
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
int initialWindowSize = DEFAULT_WINDOW_SIZE;
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(dataSize, 0, false);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateNotSent(STREAM_ID);
}
@Test
public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception {
// Send a frame that takes up the entire window.
@ -145,6 +169,10 @@ public class DefaultHttp2InboundFlowControllerTest {
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
}
private void verifyWindowUpdateNotSent(int streamId) {
verify(frameWriter, never()).writeWindowUpdate(eq(ctx), eq(streamId), anyInt(), eq(promise));
}
private void verifyWindowUpdateNotSent() throws Http2Exception {
verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(),
anyInt(), any(ChannelPromise.class));