HTTP/2 Inbound Flow Control Connection Window Issues

Motivation:
The inbound flow control code was returning too many bytes to the connection window.  This was resulting in GO_AWAYs being generated by peers with the error code indicating a flow control issue.  Bytes were being returned to the connection window before the call to returnProcessedBytes. All of the state representing the connection window was not updated when a local settings event occurred.

Modifications:
The DefaultHttp2InboundFlowController will be updated to correct the above defects.
The unit tests will be updated to reflect the changes.

Result:
Inbound flow control algorithm does not cause peers to send flow control errors for the above mentioned cases.
This commit is contained in:
Scott Mitchell 2014-11-29 23:53:54 -05:00
parent 91da1e35ab
commit 2d10b252f9
13 changed files with 459 additions and 330 deletions

View File

@ -141,7 +141,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2FrameReader.Configuration config = frameReader.configuration(); Http2FrameReader.Configuration config = frameReader.configuration();
Http2HeaderTable headerTable = config.headerTable(); Http2HeaderTable headerTable = config.headerTable();
Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
settings.initialWindowSize(inboundFlow.initialInboundWindowSize()); settings.initialWindowSize(inboundFlow.initialWindowSize());
settings.maxConcurrentStreams(connection.remote().maxStreams()); settings.maxConcurrentStreams(connection.remote().maxStreams());
settings.headerTableSize(headerTable.maxHeaderTableSize()); settings.headerTableSize(headerTable.maxHeaderTableSize());
settings.maxFrameSize(frameSizePolicy.maxFrameSize()); settings.maxFrameSize(frameSizePolicy.maxFrameSize());
@ -189,7 +189,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Integer initialWindowSize = settings.initialWindowSize(); Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) { if (initialWindowSize != null) {
inboundFlow.initialInboundWindowSize(initialWindowSize); inboundFlow.initialWindowSize(initialWindowSize);
} }
} }
@ -452,7 +452,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Integer initialWindowSize = settings.initialWindowSize(); Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) { if (initialWindowSize != null) {
inboundFlow.initialInboundWindowSize(initialWindowSize); inboundFlow.initialWindowSize(initialWindowSize);
} }
} }
@ -462,7 +462,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Acknowledge receipt of the settings. // Acknowledge receipt of the settings.
encoder.writeSettingsAck(ctx, ctx.newPromise()); encoder.writeSettingsAck(ctx, ctx.newPromise());
ctx.flush();
// We've received at least one non-ack settings frame from the remote endpoint. // We've received at least one non-ack settings frame from the remote endpoint.
prefaceReceived = true; prefaceReceived = true;

View File

@ -398,7 +398,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
return frameWriter.writeSettingsAck(ctx, promise); ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise);
ctx.flush();
return future;
} }
@Override @Override
@ -446,13 +448,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
ChannelPromise promise) { ChannelPromise promise) {
if (streamId > 0) { return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
Http2Stream stream = connection().stream(streamId); " objects to control window sizes"));
if (stream != null && stream.isResetSent()) {
throw new IllegalStateException("Sending data after sending RST_STREAM.");
}
}
return frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
} }
@Override @Override

View File

@ -14,12 +14,14 @@
*/ */
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.FRAME_HEADER_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.FRAME_HEADER_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.INT_FIELD_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.INT_FIELD_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.FRAME_SIZE_ERROR; import static io.netty.handler.codec.http2.Http2Error.FRAME_SIZE_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid; import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid;
@ -485,10 +487,13 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
try { try {
settings.put(id, value); settings.put(id, value);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
if (id == SETTINGS_MAX_FRAME_SIZE) { switch(id) {
throw new Http2Exception(FRAME_SIZE_ERROR, e.getMessage(), e); case SETTINGS_MAX_FRAME_SIZE:
} else { throw connectionError(FRAME_SIZE_ERROR, e, e.getMessage());
throw new Http2Exception(PROTOCOL_ERROR, e.getMessage(), e); case SETTINGS_INITIAL_WINDOW_SIZE:
throw connectionError(FLOW_CONTROL_ERROR, e, e.getMessage());
default:
throw connectionError(PROTOCOL_ERROR, e, e.getMessage());
} }
} }
} }

View File

@ -17,49 +17,46 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
/** /**
* Basic implementation of {@link Http2InboundFlowController}. * Basic implementation of {@link Http2InboundFlowController}.
*/ */
public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { public class DefaultHttp2InboundFlowController implements Http2InboundFlowController {
private static final int DEFAULT_COMPOSITE_EXCEPTION_SIZE = 4;
/** /**
* The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE} * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
* is sent to expand the window. * is sent to expand the window.
*/ */
public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5; public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
/**
* The default maximum connection size used as a limit when the number of active streams is
* large. Set to 2 MiB.
*/
public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2;
private final Http2Connection connection; private final Http2Connection connection;
private final Http2FrameWriter frameWriter; private final Http2FrameWriter frameWriter;
private final double windowUpdateRatio; private volatile float windowUpdateRatio;
private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE; private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
} }
public DefaultHttp2InboundFlowController(Http2Connection connection, public DefaultHttp2InboundFlowController(Http2Connection connection,
Http2FrameWriter frameWriter, double windowUpdateRatio) { Http2FrameWriter frameWriter, float windowUpdateRatio) {
this.connection = checkNotNull(connection, "connection"); this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter"); this.frameWriter = checkNotNull(frameWriter, "frameWriter");
if (Double.compare(windowUpdateRatio, 0.0) <= 0 || Double.compare(windowUpdateRatio, 1.0) >= 0) { windowUpdateRatio(windowUpdateRatio);
throw new IllegalArgumentException("Invalid ratio: " + windowUpdateRatio);
}
this.windowUpdateRatio = windowUpdateRatio;
// Add a flow state for the connection. // Add a flow state for the connection.
final Http2Stream connectionStream = connection.connectionStream(); final Http2Stream connectionStream = connection.connectionStream();
@ -78,47 +75,124 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
}); });
} }
public DefaultHttp2InboundFlowController setMaxConnectionWindowSize(int maxConnectionWindowSize) {
if (maxConnectionWindowSize <= 0) {
throw new IllegalArgumentException("maxConnectionWindowSize must be > 0");
}
this.maxConnectionWindowSize = maxConnectionWindowSize;
return this;
}
@Override @Override
public void initialInboundWindowSize(int newWindowSize) throws Http2Exception { public void initialWindowSize(int newWindowSize) throws Http2Exception {
int deltaWindowSize = newWindowSize - initialWindowSize; int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize; initialWindowSize = newWindowSize;
// Apply the delta to all of the windows. CompositeStreamException compositeException = null;
for (Http2Stream stream : connection.activeStreams()) { for (Http2Stream stream : connection.activeStreams()) {
state(stream).updatedInitialWindowSize(deltaWindowSize); try {
// Increment flow control window first so state will be consistent if overflow is detected
FlowState state = state(stream);
state.incrementFlowControlWindows(delta);
state.incrementInitialStreamWindow(delta);
} catch (StreamException e) {
if (compositeException == null) {
compositeException = new CompositeStreamException(e.error(), DEFAULT_COMPOSITE_EXCEPTION_SIZE);
}
compositeException.add(e);
}
}
if (compositeException != null) {
throw compositeException;
} }
} }
@Override @Override
public int initialInboundWindowSize() { public void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize)
throws Http2Exception {
checkNotNull(ctx, "ctx");
if (newWindowSize < MIN_INITIAL_WINDOW_SIZE || newWindowSize > MAX_INITIAL_WINDOW_SIZE) {
throw new IllegalArgumentException("Invalid newWindowSize: " + newWindowSize);
}
FlowState state = stateOrFail(streamId);
state.initialStreamWindowSize(newWindowSize);
state.writeWindowUpdateIfNeeded(ctx);
}
@Override
public int initialWindowSize() {
return initialWindowSize; return initialWindowSize;
} }
@Override
public int initialStreamWindowSize(int streamId) throws Http2Exception {
return stateOrFail(streamId).initialStreamWindowSize();
}
private static void checkValidRatio(float ratio) {
if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
throw new IllegalArgumentException("Invalid ratio: " + ratio);
}
}
/**
* The window update ratio is used to determine when a window update must be sent. If the ratio
* of bytes processed since the last update has meet or exceeded this ratio then a window update will
* be sent. This is the global window update ratio that will be used for new streams.
* @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
* @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
*/
public void windowUpdateRatio(float ratio) {
checkValidRatio(ratio);
windowUpdateRatio = ratio;
}
/**
* The window update ratio is used to determine when a window update must be sent. If the ratio
* of bytes processed since the last update has meet or exceeded this ratio then a window update will
* be sent. This is the global window update ratio that will be used for new streams.
*/
public float windowUpdateRatio() {
return windowUpdateRatio;
}
/**
* The window update ratio is used to determine when a window update must be sent. If the ratio
* of bytes processed since the last update has meet or exceeded this ratio then a window update will
* be sent. This window update ratio will only be applied to {@code streamId}.
* <p>
* Note it is the responsibly of the caller to ensure that the the
* initial {@code SETTINGS} frame is sent before this is called. It would
* be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
* was generated by this method before the initial {@code SETTINGS} frame is sent.
* @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
* @param streamId the stream for which {@code ratio} applies to.
* @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
* @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
*/
public void windowUpdateRatio(ChannelHandlerContext ctx, int streamId, float ratio) throws Http2Exception {
checkNotNull(ctx, "ctx");
checkValidRatio(ratio);
FlowState state = stateOrFail(streamId);
state.windowUpdateRatio(ratio);
state.writeWindowUpdateIfNeeded(ctx);
}
/**
* The window update ratio is used to determine when a window update must be sent. If the ratio
* of bytes processed since the last update has meet or exceeded this ratio then a window update will
* be sent. This window update ratio will only be applied to {@code streamId}.
* @throws Http2Exception If no stream corresponding to {@code stream} could be found.
*/
public float windowUpdateRatio(int streamId) throws Http2Exception {
return stateOrFail(streamId).windowUpdateRatio();
}
@Override @Override
public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception { int padding, boolean endOfStream) throws Http2Exception {
int dataLength = data.readableBytes() + padding; int dataLength = data.readableBytes() + padding;
int delta = -dataLength;
// Apply the connection-level flow control. Immediately return the bytes for the connection // Apply the connection-level flow control
// window so that data on this stream does not starve other stream. connectionState().applyFlowControl(dataLength);
FlowState connectionState = connectionState();
connectionState.addAndGet(delta);
connectionState.returnProcessedBytes(dataLength);
connectionState.updateWindowIfAppropriate(ctx);
// Apply the stream-level flow control, but do not return the bytes immediately. // Apply the stream-level flow control
FlowState state = stateOrFail(streamId); FlowState state = stateOrFail(streamId);
state.endOfStream(endOfStream); state.endOfStream(endOfStream);
state.addAndGet(delta); state.applyFlowControl(dataLength);
} }
private FlowState connectionState() { private FlowState connectionState() {
@ -164,13 +238,26 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/ */
private int processedWindow; private int processedWindow;
/**
* This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
* Each stream has their own initial window size.
*/
private volatile int initialStreamWindowSize;
/**
* This is used to determine when {@link #processedWindow} is sufficiently far away from
* {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
* Each stream has their own window update ratio.
*/
private volatile float streamWindowUpdateRatio;
private int lowerBound; private int lowerBound;
private boolean endOfStream; private boolean endOfStream;
FlowState(Http2Stream stream) { FlowState(Http2Stream stream) {
this.stream = stream; this.stream = stream;
window = initialWindowSize; window = processedWindow = initialStreamWindowSize = initialWindowSize;
processedWindow = window; streamWindowUpdateRatio = windowUpdateRatio;
} }
@Override @Override
@ -182,23 +269,82 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
this.endOfStream = endOfStream; this.endOfStream = endOfStream;
} }
float windowUpdateRatio() {
return streamWindowUpdateRatio;
}
void windowUpdateRatio(float ratio) {
streamWindowUpdateRatio = ratio;
}
int initialStreamWindowSize() {
return initialStreamWindowSize;
}
void initialStreamWindowSize(int initialWindowSize) {
initialStreamWindowSize = initialWindowSize;
}
/** /**
* Returns the initial size of this window. * Increment the initial window size for this stream.
* @param delta The amount to increase the initial window size by.
*/ */
int initialWindowSize() { void incrementInitialStreamWindow(int delta) {
int maxWindowSize = initialWindowSize; // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
if (stream.id() == CONNECTION_STREAM_ID) { int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
// Determine the maximum number of streams that we can allow without integer overflow max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
// of maxWindowSize * numStreams. Also take care to avoid division by zero when delta = newValue - initialStreamWindowSize;
// maxWindowSize == 0.
int maxNumStreams = Integer.MAX_VALUE; initialStreamWindowSize += delta;
if (maxWindowSize > 0) { }
maxNumStreams /= maxWindowSize;
} /**
int numStreams = Math.min(maxNumStreams, Math.max(1, connection.numActiveStreams())); * Increment the windows which are used to determine many bytes have been processed.
maxWindowSize = Math.min(maxConnectionWindowSize, maxWindowSize * numStreams); * @param delta The amount to increment the window by.
* @throws Http2Exception if integer overflow occurs on the window.
*/
void incrementFlowControlWindows(int delta) throws Http2Exception {
if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
throw streamError(stream.id(), FLOW_CONTROL_ERROR,
"Flow control window overflowed for stream: %d", stream.id());
} }
return maxWindowSize;
window += delta;
processedWindow += delta;
lowerBound = delta < 0 ? delta : 0;
}
/**
* A flow control event has occurred and we should decrement the amount of available bytes for this stream.
* @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
* @throws Http2Exception If too much data is used relative to how much is available.
*/
void applyFlowControl(int dataLength) throws Http2Exception {
assert dataLength > 0;
// Apply the delta. Even if we throw an exception we want to have taken this delta into account.
window -= dataLength;
// Window size can become negative if we sent a SETTINGS frame that reduces the
// size of the transfer window after the peer has written data frames.
// The value is bounded by the length that SETTINGS frame decrease the window.
// This difference is stored for the connection when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame.
if (window < lowerBound) {
throw streamError(stream.id(), FLOW_CONTROL_ERROR,
"Flow control window exceeded for stream: %d", stream.id());
}
}
/**
* Returns the processed bytes for this stream.
*/
void returnProcessedBytes(int delta) throws Http2Exception {
if (processedWindow - delta < window) {
throw streamError(stream.id(), INTERNAL_ERROR,
"Attempting to return too many bytes for stream %d", stream.id());
}
processedWindow -= delta;
} }
@Override @Override
@ -211,9 +357,14 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
throw new IllegalArgumentException("numBytes must be positive"); throw new IllegalArgumentException("numBytes must be positive");
} }
// Return bytes to the connection window
FlowState connectionState = connectionState();
connectionState.returnProcessedBytes(numBytes);
connectionState.writeWindowUpdateIfNeeded(ctx);
// Return the bytes processed and update the window. // Return the bytes processed and update the window.
returnProcessedBytes(numBytes); returnProcessedBytes(numBytes);
updateWindowIfAppropriate(ctx); writeWindowUpdateIfNeeded(ctx);
} }
@Override @Override
@ -229,95 +380,29 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
/** /**
* Updates the flow control window for this stream if it is appropriate. * Updates the flow control window for this stream if it is appropriate.
*/ */
void updateWindowIfAppropriate(ChannelHandlerContext ctx) { void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
if (endOfStream || initialWindowSize <= 0) { if (endOfStream || initialStreamWindowSize <= 0) {
return; return;
} }
int threshold = (int) (initialWindowSize() * windowUpdateRatio); int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
if (processedWindow <= threshold) { if (processedWindow <= threshold) {
updateWindow(ctx); writeWindowUpdate(ctx);
} }
} }
/** /**
* Returns the processed bytes for this stream. * Called to perform a window update for this stream (or connection). Updates the window size back
* to the size of the initial window and sends a window update frame to the remote endpoint.
*/ */
void returnProcessedBytes(int delta) throws Http2Exception { void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception {
if (processedWindow - delta < window) {
throw streamError(stream.id(), INTERNAL_ERROR,
"Attempting to return too many bytes for stream %d", stream.id());
}
processedWindow -= delta;
}
/**
* Adds the given delta to the window size and returns the new value.
*
* @param delta the delta in the initial window size.
* @throws Http2Exception thrown if the new window is less than the allowed lower bound.
*/
int addAndGet(int delta) throws Http2Exception {
// Apply the delta. Even if we throw an exception we want to have taken this delta into
// account.
window += delta;
if (delta > 0) {
lowerBound = 0;
}
// Window size can become negative if we sent a SETTINGS frame that reduces the
// size of the transfer window after the peer has written data frames.
// The value is bounded by the length that SETTINGS frame decrease the window.
// This difference is stored for the connection when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame.
if (delta < 0 && window < lowerBound) {
if (stream.id() == CONNECTION_STREAM_ID) {
throw connectionError(FLOW_CONTROL_ERROR, "Connection flow control window exceeded");
} else {
throw streamError(stream.id(), FLOW_CONTROL_ERROR,
"Flow control window exceeded for stream: %d", stream.id());
}
}
return window;
}
/**
* Called when sending a SETTINGS frame with a new initial window size. If the window has
* gotten smaller (i.e. deltaWindowSize < 0), the lower bound is set to that value. This
* will temporarily allow for receipt of data frames which were sent by the remote endpoint
* before receiving the SETTINGS frame.
*
* @param delta the delta in the initial window size.
* @throws Http2Exception thrown if integer overflow occurs on the window.
*/
void updatedInitialWindowSize(int delta) throws Http2Exception {
if (delta > 0 && window > Integer.MAX_VALUE - delta) { // Integer overflow.
throw connectionError(PROTOCOL_ERROR, "Flow control window overflowed for stream: %d", stream.id());
}
window += delta;
processedWindow += delta;
if (delta < 0) {
lowerBound = delta;
}
}
/**
* Called to perform a window update for this stream (or connection). Updates the window
* size back to the size of the initial window and sends a window update frame to the remote
* endpoint.
*/
void updateWindow(ChannelHandlerContext ctx) {
// Expand the window for this stream back to the size of the initial window. // Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize() - processedWindow; int deltaWindowSize = initialStreamWindowSize - processedWindow;
processedWindow += deltaWindowSize;
try { try {
addAndGet(deltaWindowSize); incrementFlowControlWindows(deltaWindowSize);
} catch (Http2Exception e) { } catch (Throwable t) {
// This should never fail since we're adding. throw connectionError(INTERNAL_ERROR, t,
throw new AssertionError("Caught exception while updating window with delta: " "Attempting to return too many bytes for stream %d", stream.id());
+ deltaWindowSize);
} }
// Send a window update for the stream/connection. // Send a window update for the stream/connection.

View File

@ -22,6 +22,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY; import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE; import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP; import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -77,53 +78,63 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
final int compressedBytes = data.readableBytes() + padding; final int compressedBytes = data.readableBytes() + padding;
int processedBytes = 0; int processedBytes = 0;
decompressor.incrementCompressedBytes(compressedBytes); decompressor.incrementCompressedBytes(compressedBytes);
// call retain here as it will call release after its written to the channel try {
channel.writeInbound(data.retain()); // call retain here as it will call release after its written to the channel
ByteBuf buf = nextReadableBuf(channel); channel.writeInbound(data.retain());
if (buf == null && endOfStream && channel.finish()) { ByteBuf buf = nextReadableBuf(channel);
buf = nextReadableBuf(channel); if (buf == null && endOfStream && channel.finish()) {
} buf = nextReadableBuf(channel);
if (buf == null) {
if (endOfStream) {
listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true);
} }
// No new decompressed data was extracted from the compressed data. This means the application could not be if (buf == null) {
// provided with data and thus could not return how many bytes were processed. We will assume there is more if (endOfStream) {
// data coming which will complete the decompression block. To allow for more data we return all bytes to listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true);
// the flow control window (so the peer can send more data).
decompressor.incrementDecompressedByes(compressedBytes);
processedBytes = compressedBytes;
} else {
try {
decompressor.incrementDecompressedByes(padding);
for (;;) {
ByteBuf nextBuf = nextReadableBuf(channel);
boolean decompressedEndOfStream = nextBuf == null && endOfStream;
if (decompressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
decompressedEndOfStream = nextBuf == null;
}
decompressor.incrementDecompressedByes(buf.readableBytes());
processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream);
if (nextBuf == null) {
break;
}
padding = 0; // Padding is only communicated once on the first iteration
buf.release();
buf = nextBuf;
} }
} finally { // No new decompressed data was extracted from the compressed data. This means the application could
if (buf != null) { // not be provided with data and thus could not return how many bytes were processed. We will assume
buf.release(); // there is more data coming which will complete the decompression block. To allow for more data we
// return all bytes to the flow control window (so the peer can send more data).
decompressor.incrementDecompressedByes(compressedBytes);
processedBytes = compressedBytes;
} else {
try {
decompressor.incrementDecompressedByes(padding);
for (;;) {
ByteBuf nextBuf = nextReadableBuf(channel);
boolean decompressedEndOfStream = nextBuf == null && endOfStream;
if (decompressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
decompressedEndOfStream = nextBuf == null;
}
decompressor.incrementDecompressedByes(buf.readableBytes());
processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream);
if (nextBuf == null) {
break;
}
padding = 0; // Padding is only communicated once on the first iteration
buf.release();
buf = nextBuf;
}
} finally {
if (buf != null) {
buf.release();
}
} }
} }
decompressor.incrementProcessedBytes(processedBytes);
// The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector
return processedBytes;
} catch (Http2Exception e) {
// Consider all the bytes consumed because there was an error
decompressor.incrementProcessedBytes(compressedBytes);
throw e;
} catch (Throwable t) {
// Consider all the bytes consumed because there was an error
decompressor.incrementProcessedBytes(compressedBytes);
throw streamError(stream.id(), INTERNAL_ERROR, t,
"Decompressor error detected while delegating data read on streamId %d", stream.id());
} }
decompressor.incrementProcessedBytes(processedBytes);
// The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector
return processedBytes;
} }
@Override @Override

View File

@ -69,7 +69,7 @@ public final class Http2CodecUtil {
public static final long MIN_HEADER_TABLE_SIZE = 0; public static final long MIN_HEADER_TABLE_SIZE = 0;
public static final long MIN_CONCURRENT_STREAMS = 0; public static final long MIN_CONCURRENT_STREAMS = 0;
public static final long MIN_INITIAL_WINDOW_SIZE = 0; public static final int MIN_INITIAL_WINDOW_SIZE = 0;
public static final long MIN_HEADER_LIST_SIZE = 0; public static final long MIN_HEADER_LIST_SIZE = 0;
public static final int DEFAULT_WINDOW_SIZE = 65535; public static final int DEFAULT_WINDOW_SIZE = 65535;

View File

@ -17,9 +17,9 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.isStreamError; import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -29,6 +29,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException;
import java.util.Collection; import java.util.Collection;
@ -279,6 +280,11 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
Http2Exception embedded = getEmbeddedHttp2Exception(cause); Http2Exception embedded = getEmbeddedHttp2Exception(cause);
if (isStreamError(embedded)) { if (isStreamError(embedded)) {
onStreamError(ctx, cause, (StreamException) embedded); onStreamError(ctx, cause, (StreamException) embedded);
} else if (embedded instanceof CompositeStreamException) {
CompositeStreamException compositException = (CompositeStreamException) embedded;
for (StreamException streamException : compositException) {
onStreamError(ctx, cause, streamException);
}
} else { } else {
onConnectionError(ctx, cause, embedded); onConnectionError(ctx, cause, embedded);
} }

View File

@ -17,6 +17,10 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/** /**
* Exception thrown when an HTTP/2 error was encountered. * Exception thrown when an HTTP/2 error was encountered.
*/ */
@ -152,4 +156,26 @@ public class Http2Exception extends Exception {
return streamId; return streamId;
} }
} }
/**
* Provides the ability to handle multiple stream exceptions with one throw statement.
*/
public static final class CompositeStreamException extends Http2Exception implements Iterable<StreamException> {
private static final long serialVersionUID = -434398146294199889L;
private final List<StreamException> exceptions;
public CompositeStreamException(Http2Error error, int initialCapacity) {
super(error);
exceptions = new ArrayList<StreamException>(initialCapacity);
}
public void add(StreamException e) {
exceptions.add(e);
}
@Override
public Iterator<StreamException> iterator() {
return exceptions.iterator();
}
}
} }

View File

@ -37,16 +37,41 @@ public interface Http2InboundFlowController {
boolean endOfStream) throws Http2Exception; boolean endOfStream) throws Http2Exception;
/** /**
* Sets the initial inbound flow control window size and updates all stream window sizes by the * Sets the global inbound flow control window size and updates all stream window sizes by the delta.
* delta. * <p>
* * This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an
* outbound {@code SETTINGS} frame.
* <p>
* The connection stream windows will not be modified as a result of this call.
* @param newWindowSize the new initial window size. * @param newWindowSize the new initial window size.
* @throws Http2Exception thrown if any protocol-related error occurred. * @throws Http2Exception thrown if any protocol-related error occurred.
*/ */
void initialInboundWindowSize(int newWindowSize) throws Http2Exception; void initialWindowSize(int newWindowSize) throws Http2Exception;
/** /**
* Gets the initial inbound flow control window size. * Gets the initial window size used as the basis for new stream flow control windows.
*/ */
int initialInboundWindowSize(); int initialWindowSize();
/**
* Sets the initial inbound flow control window size for a specific stream.
* <p>
* Note it is the responsibly of the caller to ensure that the the
* initial {@code SETTINGS} frame is sent before this is called. It would
* be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
* was generated by this method before the initial {@code SETTINGS} frame is sent.
* @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
* @param streamId The stream to update.
* @param newWindowSize the window size to apply to {@code streamId}
* @throws Http2Exception thrown if any protocol-related error occurred.
*/
void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) throws Http2Exception;
/**
* Obtain the initial window size for a specific stream.
* @param streamId The stream id to get the initial window size for.
* @return The initial window size for {@code streamId}.
* @throws Http2Exception If no stream corresponding to {@code stream} could be found.
*/
int initialStreamWindowSize(int streamId) throws Http2Exception;
} }

View File

@ -69,6 +69,7 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter {
@Override @Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, public ChannelFuture writeRstStream(ChannelHandlerContext ctx,
int streamId, long errorCode, ChannelPromise promise) { int streamId, long errorCode, ChannelPromise promise) {
logger.logRstStream(OUTBOUND, streamId, errorCode);
return writer.writeRstStream(ctx, streamId, errorCode, promise); return writer.writeRstStream(ctx, streamId, errorCode, promise);
} }

View File

@ -77,12 +77,10 @@ public class DataCompressionHttp2Test {
@Mock @Mock
private Http2FrameListener clientListener; private Http2FrameListener clientListener;
private Http2ConnectionEncoder serverEncoder;
private Http2ConnectionEncoder clientEncoder; private Http2ConnectionEncoder clientEncoder;
private ServerBootstrap sb; private ServerBootstrap sb;
private Bootstrap cb; private Bootstrap cb;
private Channel serverChannel; private Channel serverChannel;
private Channel serverConnectedChannel;
private Channel clientChannel; private Channel clientChannel;
private volatile CountDownLatch serverLatch; private volatile CountDownLatch serverLatch;
private volatile CountDownLatch clientLatch; private volatile CountDownLatch clientLatch;
@ -226,53 +224,7 @@ public class DataCompressionHttp2Test {
} }
@Test @Test
public void deflateEncodingSingleLargeMessageReducedWindow() throws Exception { public void deflateEncodingWriteLargeMessage() throws Exception {
final int BUFFER_SIZE = 1 << 16;
bootstrapEnv(1, BUFFER_SIZE, 1);
final ByteBuf data = Unpooled.buffer(BUFFER_SIZE);
try {
for (int i = 0; i < data.capacity(); ++i) {
data.writeByte((byte) 'a');
}
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
final Http2Settings settings = new Http2Settings();
// Assume the compression operation will reduce the size by at least 10 bytes
settings.initialWindowSize(BUFFER_SIZE - 10);
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer());
ctxServer().flush();
}
});
awaitClient();
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitServer();
assertEquals(0, stream.garbageCollector().unProcessedBytes());
assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
}
}
@Test
public void deflateEncodingMultipleWriteLargeMessageReducedWindow() throws Exception {
final int BUFFER_SIZE = 1 << 12; final int BUFFER_SIZE = 1 << 12;
final byte[] bytes = new byte[BUFFER_SIZE]; final byte[] bytes = new byte[BUFFER_SIZE];
new Random().nextBytes(bytes); new Random().nextBytes(bytes);
@ -281,17 +233,6 @@ public class DataCompressionHttp2Test {
try { try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
final Http2Settings settings = new Http2Settings();
settings.initialWindowSize(BUFFER_SIZE / 2);
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer());
ctxServer().flush();
}
});
awaitClient();
// Required because the decompressor intercepts the onXXXRead events before // Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does. // our {@link Http2TestUtil$FrameAdapter} does.
@ -300,7 +241,6 @@ public class DataCompressionHttp2Test {
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() { public void run() {
clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient());
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
ctxClient().flush(); ctxClient().flush();
@ -362,8 +302,6 @@ public class DataCompressionHttp2Test {
.listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)), .listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)),
new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer) new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer)
.outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer))); .outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer)));
serverEncoder = connectionHandler.encoder();
serverConnectedChannel = ch;
p.addLast(connectionHandler); p.addLast(connectionHandler);
p.addLast(Http2CodecUtil.ignoreSettingsHandler()); p.addLast(Http2CodecUtil.ignoreSettingsHandler());
serverChannelLatch.countDown(); serverChannelLatch.countDown();
@ -401,10 +339,6 @@ public class DataCompressionHttp2Test {
assertTrue(serverChannelLatch.await(5, SECONDS)); assertTrue(serverChannelLatch.await(5, SECONDS));
} }
private void awaitClient() throws Exception {
assertTrue(clientLatch.await(5, SECONDS));
}
private void awaitServer() throws Exception { private void awaitServer() throws Exception {
assertTrue(serverLatch.await(5, SECONDS)); assertTrue(serverLatch.await(5, SECONDS));
serverOut.flush(); serverOut.flush();
@ -417,12 +351,4 @@ public class DataCompressionHttp2Test {
private ChannelPromise newPromiseClient() { private ChannelPromise newPromiseClient() {
return ctxClient().newPromise(); return ctxClient().newPromise();
} }
private ChannelHandlerContext ctxServer() {
return serverConnectedChannel.pipeline().firstContext();
}
private ChannelPromise newPromiseServer() {
return ctxServer().newPromise();
}
} }

View File

@ -57,6 +57,8 @@ public class DefaultHttp2InboundFlowControllerTest {
private DefaultHttp2Connection connection; private DefaultHttp2Connection connection;
private static float updateRatio = 0.5f;
@Before @Before
public void setup() throws Http2Exception { public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
@ -64,7 +66,7 @@ public class DefaultHttp2InboundFlowControllerTest {
when(ctx.newPromise()).thenReturn(promise); when(ctx.newPromise()).thenReturn(promise);
connection = new DefaultHttp2Connection(false); connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2InboundFlowController(connection, frameWriter); controller = new DefaultHttp2InboundFlowController(connection, frameWriter, updateRatio);
connection.local().createStream(STREAM_ID, false); connection.local().createStream(STREAM_ID, false);
} }
@ -77,16 +79,17 @@ public class DefaultHttp2InboundFlowControllerTest {
@Test @Test
public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception { public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
applyFlowControl(STREAM_ID, dataSize, 0, false); applyFlowControl(STREAM_ID, dataSize, 0, false);
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
returnProcessedBytes(STREAM_ID, 10); returnProcessedBytes(STREAM_ID, 10);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
// Return the rest and verify the WINDOW_UPDATE is sent. // Return the rest and verify the WINDOW_UPDATE is sent.
returnProcessedBytes(STREAM_ID, dataSize - 10); returnProcessedBytes(STREAM_ID, dataSize - 10);
verifyWindowUpdateSent(STREAM_ID, dataSize); verifyWindowUpdateSent(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
} }
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
@ -97,22 +100,21 @@ public class DefaultHttp2InboundFlowControllerTest {
@Test @Test
public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception { public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
int newWindow = DEFAULT_WINDOW_SIZE - dataSize;
int windowDelta = DEFAULT_WINDOW_SIZE - newWindow;
// Set end-of-stream on the frame, so no window update will be sent for the stream. // Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(STREAM_ID, dataSize, 0, true); applyFlowControl(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
returnProcessedBytes(STREAM_ID, dataSize); returnProcessedBytes(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
} }
@Test @Test
public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
int initialWindowSize = DEFAULT_WINDOW_SIZE; int initialWindowSize = DEFAULT_WINDOW_SIZE;
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
@ -129,14 +131,14 @@ public class DefaultHttp2InboundFlowControllerTest {
int initialWindowSize = DEFAULT_WINDOW_SIZE; int initialWindowSize = DEFAULT_WINDOW_SIZE;
applyFlowControl(STREAM_ID, initialWindowSize, 0, false); applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
assertEquals(0, window(STREAM_ID)); assertEquals(0, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(0, window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, initialWindowSize); returnProcessedBytes(STREAM_ID, initialWindowSize);
assertEquals(initialWindowSize, window(STREAM_ID)); assertEquals(initialWindowSize, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
// Update the initial window size to allow another frame. // Update the initial window size to allow another frame.
int newInitialWindowSize = 2 * initialWindowSize; int newInitialWindowSize = 2 * initialWindowSize;
controller.initialInboundWindowSize(newInitialWindowSize); controller.initialWindowSize(newInitialWindowSize);
assertEquals(newInitialWindowSize, window(STREAM_ID)); assertEquals(newInitialWindowSize, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
@ -147,57 +149,102 @@ public class DefaultHttp2InboundFlowControllerTest {
applyFlowControl(STREAM_ID, initialWindowSize, 0, false); applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
returnProcessedBytes(STREAM_ID, initialWindowSize); returnProcessedBytes(STREAM_ID, initialWindowSize);
int delta = newInitialWindowSize - initialWindowSize; int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(CONNECTION_STREAM_ID, newInitialWindowSize);
verifyWindowUpdateSent(STREAM_ID, delta); verifyWindowUpdateSent(STREAM_ID, delta);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
} }
@Test @Test
public void connectionWindowShouldExpandWithNumberOfStreams() throws Http2Exception { public void connectionWindowShouldAdjustWithMultipleStreams() throws Http2Exception {
// Create another stream
int newStreamId = 3; int newStreamId = 3;
connection.local().createStream(newStreamId, false); connection.local().createStream(newStreamId, false);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); try {
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
// Receive some data - this should cause the connection window to expand. // Test that both stream and connection window are updated (or not updated) together
int data1 = 50; int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1;
int expectedMaxConnectionWindow = DEFAULT_WINDOW_SIZE * 2; applyFlowControl(STREAM_ID, data1, 0, false);
applyFlowControl(STREAM_ID, data1, 0, false); verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE + data1); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID));
assertEquals(expectedMaxConnectionWindow, window(CONNECTION_STREAM_ID)); returnProcessedBytes(STREAM_ID, data1);
verifyWindowUpdateSent(STREAM_ID, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1);
reset(frameWriter);
// Create a scenario where data is depleted from multiple streams, but not enough data
// to generate a window update on those streams. The amount will be enough to generate
// a window update for the connection stream.
--data1;
int data2 = data1 >> 1;
applyFlowControl(STREAM_ID, data1, 0, false);
applyFlowControl(newStreamId, data1, 0, false);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId));
assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID));
returnProcessedBytes(STREAM_ID, data1);
returnProcessedBytes(newStreamId, data2);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId));
assertEquals(DEFAULT_WINDOW_SIZE - (data1 - data2), window(CONNECTION_STREAM_ID));
} finally {
connection.stream(newStreamId).close();
}
}
@Test
public void globalRatioShouldImpactStreams() throws Http2Exception {
float ratio = 0.6f;
controller.windowUpdateRatio(ratio);
testRatio(ratio, DEFAULT_WINDOW_SIZE << 1, 3, false);
}
@Test
public void streamlRatioShouldImpactStreams() throws Http2Exception {
float ratio = 0.6f;
testRatio(ratio, DEFAULT_WINDOW_SIZE << 1, 3, true);
}
private void testRatio(float ratio, int newDefaultWindowSize, int newStreamId, boolean setStreamRatio)
throws Http2Exception {
controller.initialStreamWindowSize(ctx, 0, newDefaultWindowSize);
connection.local().createStream(newStreamId, false);
if (setStreamRatio) {
controller.windowUpdateRatio(ctx, newStreamId, ratio);
}
controller.initialStreamWindowSize(ctx, newStreamId, newDefaultWindowSize);
reset(frameWriter); reset(frameWriter);
try {
// Close the new stream. int data1 = (int) (newDefaultWindowSize * ratio) + 1;
connection.stream(newStreamId).close(); int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1;
applyFlowControl(STREAM_ID, data2, 0, false);
// Read more data and verify that the stream window refreshes but the applyFlowControl(newStreamId, data1, 0, false);
// connection window continues collapsing. verifyWindowUpdateNotSent(STREAM_ID);
int data2 = window(STREAM_ID); verifyWindowUpdateNotSent(newStreamId);
applyFlowControl(STREAM_ID, data2, 0, false); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
returnProcessedBytes(STREAM_ID, data2); assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID));
verifyWindowUpdateSent(STREAM_ID, data2); assertEquals(newDefaultWindowSize - data1, window(newStreamId));
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); returnProcessedBytes(STREAM_ID, data2);
assertEquals(DEFAULT_WINDOW_SIZE * 2 - data2 , window(CONNECTION_STREAM_ID)); returnProcessedBytes(newStreamId, data1);
verifyWindowUpdateNotSent(STREAM_ID);
reset(frameWriter); verifyWindowUpdateSent(newStreamId, data1);
returnProcessedBytes(STREAM_ID, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
verifyWindowUpdateNotSent(STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID));
assertEquals(newDefaultWindowSize, window(newStreamId));
// Read enough data to cause a WINDOW_UPDATE for both the stream and connection and assertEquals(newDefaultWindowSize, window(CONNECTION_STREAM_ID));
// verify the new maximum of the connection window. } finally {
int data3 = window(STREAM_ID); connection.stream(newStreamId).close();
applyFlowControl(STREAM_ID, data3, 0, false); }
returnProcessedBytes(STREAM_ID, data3);
verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE
- (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3)));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
} }
private static int getWindowDelta(int initialSize, int windowSize, int dataSize) { private static int getWindowDelta(int initialSize, int windowSize, int dataSize) {

View File

@ -71,6 +71,7 @@ import org.mockito.stubbing.Answer;
* Testing the {@link HttpToHttp2ConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames * Testing the {@link HttpToHttp2ConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
*/ */
public class HttpToHttp2ConnectionHandlerTest { public class HttpToHttp2ConnectionHandlerTest {
private static final int WAIT_TIME_SECONDS = 5;
@Mock @Mock
private Http2FrameListener clientListener; private Http2FrameListener clientListener;
@ -122,9 +123,9 @@ public class HttpToHttp2ConnectionHandlerTest {
ChannelPromise writePromise = newPromise(); ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
writePromise.awaitUninterruptibly(2, SECONDS); assertTrue(writePromise.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS));
assertTrue(writePromise.isSuccess()); assertTrue(writePromise.isSuccess());
writeFuture.awaitUninterruptibly(2, SECONDS); assertTrue(writeFuture.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS));
assertTrue(writeFuture.isSuccess()); assertTrue(writeFuture.isSuccess());
awaitRequests(); awaitRequests();
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(5), verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(5),
@ -162,9 +163,9 @@ public class HttpToHttp2ConnectionHandlerTest {
ChannelPromise writePromise = newPromise(); ChannelPromise writePromise = newPromise();
ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise);
writePromise.awaitUninterruptibly(2, SECONDS); assertTrue(writePromise.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS));
assertTrue(writePromise.isSuccess()); assertTrue(writePromise.isSuccess());
writeFuture.awaitUninterruptibly(2, SECONDS); assertTrue(writeFuture.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS));
assertTrue(writeFuture.isSuccess()); assertTrue(writeFuture.isSuccess());
awaitRequests(); awaitRequests();
verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0), verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0),
@ -214,7 +215,7 @@ public class HttpToHttp2ConnectionHandlerTest {
} }
private void awaitRequests() throws Exception { private void awaitRequests() throws Exception {
assertTrue(requestLatch.await(2, SECONDS)); assertTrue(requestLatch.await(WAIT_TIME_SECONDS, SECONDS));
} }
private ChannelHandlerContext ctx() { private ChannelHandlerContext ctx() {