diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 93925705b7..fd46926d59 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -141,7 +141,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Http2FrameReader.Configuration config = frameReader.configuration(); Http2HeaderTable headerTable = config.headerTable(); Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy(); - settings.initialWindowSize(inboundFlow.initialInboundWindowSize()); + settings.initialWindowSize(inboundFlow.initialWindowSize()); settings.maxConcurrentStreams(connection.remote().maxStreams()); settings.headerTableSize(headerTable.maxHeaderTableSize()); settings.maxFrameSize(frameSizePolicy.maxFrameSize()); @@ -189,7 +189,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - inboundFlow.initialInboundWindowSize(initialWindowSize); + inboundFlow.initialWindowSize(initialWindowSize); } } @@ -452,7 +452,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { - inboundFlow.initialInboundWindowSize(initialWindowSize); + inboundFlow.initialWindowSize(initialWindowSize); } } @@ -462,7 +462,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // Acknowledge receipt of the settings. encoder.writeSettingsAck(ctx, ctx.newPromise()); - ctx.flush(); // We've received at least one non-ack settings frame from the remote endpoint. prefaceReceived = true; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index e3068564a2..5874c23bd0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -398,7 +398,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { - return frameWriter.writeSettingsAck(ctx, promise); + ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise); + ctx.flush(); + return future; } @Override @@ -446,13 +448,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { - if (streamId > 0) { - Http2Stream stream = connection().stream(streamId); - if (stream != null && stream.isResetSent()) { - throw new IllegalStateException("Sending data after sending RST_STREAM."); - } - } - return frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" + + " objects to control window sizes")); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java index b15fefea70..32b2fceeea 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameReader.java @@ -14,12 +14,14 @@ */ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_INITIAL_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.FRAME_HEADER_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.INT_FIELD_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH; +import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.FRAME_SIZE_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid; @@ -485,10 +487,13 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize try { settings.put(id, value); } catch (IllegalArgumentException e) { - if (id == SETTINGS_MAX_FRAME_SIZE) { - throw new Http2Exception(FRAME_SIZE_ERROR, e.getMessage(), e); - } else { - throw new Http2Exception(PROTOCOL_ERROR, e.getMessage(), e); + switch(id) { + case SETTINGS_MAX_FRAME_SIZE: + throw connectionError(FRAME_SIZE_ERROR, e, e.getMessage()); + case SETTINGS_INITIAL_WINDOW_SIZE: + throw connectionError(FLOW_CONTROL_ERROR, e, e.getMessage()); + default: + throw connectionError(PROTOCOL_ERROR, e, e.getMessage()); } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index 61aa1e8b44..bdeaa3d6b3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -17,49 +17,46 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE; +import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2Exception.connectionError; +import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.Math.max; +import static java.lang.Math.min; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; +import io.netty.handler.codec.http2.Http2Exception.StreamException; /** * Basic implementation of {@link Http2InboundFlowController}. */ public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { + private static final int DEFAULT_COMPOSITE_EXCEPTION_SIZE = 4; /** * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE} * is sent to expand the window. */ - public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5; - - /** - * The default maximum connection size used as a limit when the number of active streams is - * large. Set to 2 MiB. - */ - public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2; + public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f; private final Http2Connection connection; private final Http2FrameWriter frameWriter; - private final double windowUpdateRatio; - private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE; - private int initialWindowSize = DEFAULT_WINDOW_SIZE; + private volatile float windowUpdateRatio; + private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); } public DefaultHttp2InboundFlowController(Http2Connection connection, - Http2FrameWriter frameWriter, double windowUpdateRatio) { + Http2FrameWriter frameWriter, float windowUpdateRatio) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); - if (Double.compare(windowUpdateRatio, 0.0) <= 0 || Double.compare(windowUpdateRatio, 1.0) >= 0) { - throw new IllegalArgumentException("Invalid ratio: " + windowUpdateRatio); - } - this.windowUpdateRatio = windowUpdateRatio; + windowUpdateRatio(windowUpdateRatio); // Add a flow state for the connection. final Http2Stream connectionStream = connection.connectionStream(); @@ -78,47 +75,124 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro }); } - public DefaultHttp2InboundFlowController setMaxConnectionWindowSize(int maxConnectionWindowSize) { - if (maxConnectionWindowSize <= 0) { - throw new IllegalArgumentException("maxConnectionWindowSize must be > 0"); - } - this.maxConnectionWindowSize = maxConnectionWindowSize; - return this; - } - @Override - public void initialInboundWindowSize(int newWindowSize) throws Http2Exception { - int deltaWindowSize = newWindowSize - initialWindowSize; + public void initialWindowSize(int newWindowSize) throws Http2Exception { + int delta = newWindowSize - initialWindowSize; initialWindowSize = newWindowSize; - // Apply the delta to all of the windows. + CompositeStreamException compositeException = null; for (Http2Stream stream : connection.activeStreams()) { - state(stream).updatedInitialWindowSize(deltaWindowSize); + try { + // Increment flow control window first so state will be consistent if overflow is detected + FlowState state = state(stream); + state.incrementFlowControlWindows(delta); + state.incrementInitialStreamWindow(delta); + } catch (StreamException e) { + if (compositeException == null) { + compositeException = new CompositeStreamException(e.error(), DEFAULT_COMPOSITE_EXCEPTION_SIZE); + } + compositeException.add(e); + } + } + if (compositeException != null) { + throw compositeException; } } @Override - public int initialInboundWindowSize() { + public void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) + throws Http2Exception { + checkNotNull(ctx, "ctx"); + if (newWindowSize < MIN_INITIAL_WINDOW_SIZE || newWindowSize > MAX_INITIAL_WINDOW_SIZE) { + throw new IllegalArgumentException("Invalid newWindowSize: " + newWindowSize); + } + + FlowState state = stateOrFail(streamId); + state.initialStreamWindowSize(newWindowSize); + state.writeWindowUpdateIfNeeded(ctx); + } + + @Override + public int initialWindowSize() { return initialWindowSize; } + @Override + public int initialStreamWindowSize(int streamId) throws Http2Exception { + return stateOrFail(streamId).initialStreamWindowSize(); + } + + private static void checkValidRatio(float ratio) { + if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) { + throw new IllegalArgumentException("Invalid ratio: " + ratio); + } + } + + /** + * The window update ratio is used to determine when a window update must be sent. If the ratio + * of bytes processed since the last update has meet or exceeded this ratio then a window update will + * be sent. This is the global window update ratio that will be used for new streams. + * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams. + * @throws IllegalArgumentException If the ratio is out of bounds (0, 1). + */ + public void windowUpdateRatio(float ratio) { + checkValidRatio(ratio); + windowUpdateRatio = ratio; + } + + /** + * The window update ratio is used to determine when a window update must be sent. If the ratio + * of bytes processed since the last update has meet or exceeded this ratio then a window update will + * be sent. This is the global window update ratio that will be used for new streams. + */ + public float windowUpdateRatio() { + return windowUpdateRatio; + } + + /** + * The window update ratio is used to determine when a window update must be sent. If the ratio + * of bytes processed since the last update has meet or exceeded this ratio then a window update will + * be sent. This window update ratio will only be applied to {@code streamId}. + *

+ * Note it is the responsibly of the caller to ensure that the the + * initial {@code SETTINGS} frame is sent before this is called. It would + * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE} + * was generated by this method before the initial {@code SETTINGS} frame is sent. + * @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary. + * @param streamId the stream for which {@code ratio} applies to. + * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary. + * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames + */ + public void windowUpdateRatio(ChannelHandlerContext ctx, int streamId, float ratio) throws Http2Exception { + checkNotNull(ctx, "ctx"); + checkValidRatio(ratio); + FlowState state = stateOrFail(streamId); + state.windowUpdateRatio(ratio); + state.writeWindowUpdateIfNeeded(ctx); + } + + /** + * The window update ratio is used to determine when a window update must be sent. If the ratio + * of bytes processed since the last update has meet or exceeded this ratio then a window update will + * be sent. This window update ratio will only be applied to {@code streamId}. + * @throws Http2Exception If no stream corresponding to {@code stream} could be found. + */ + public float windowUpdateRatio(int streamId) throws Http2Exception { + return stateOrFail(streamId).windowUpdateRatio(); + } + @Override public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { int dataLength = data.readableBytes() + padding; - int delta = -dataLength; - // Apply the connection-level flow control. Immediately return the bytes for the connection - // window so that data on this stream does not starve other stream. - FlowState connectionState = connectionState(); - connectionState.addAndGet(delta); - connectionState.returnProcessedBytes(dataLength); - connectionState.updateWindowIfAppropriate(ctx); + // Apply the connection-level flow control + connectionState().applyFlowControl(dataLength); - // Apply the stream-level flow control, but do not return the bytes immediately. + // Apply the stream-level flow control FlowState state = stateOrFail(streamId); state.endOfStream(endOfStream); - state.addAndGet(delta); + state.applyFlowControl(dataLength); } private FlowState connectionState() { @@ -164,13 +238,26 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ private int processedWindow; + /** + * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}. + * Each stream has their own initial window size. + */ + private volatile int initialStreamWindowSize; + + /** + * This is used to determine when {@link #processedWindow} is sufficiently far away from + * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent. + * Each stream has their own window update ratio. + */ + private volatile float streamWindowUpdateRatio; + private int lowerBound; private boolean endOfStream; FlowState(Http2Stream stream) { this.stream = stream; - window = initialWindowSize; - processedWindow = window; + window = processedWindow = initialStreamWindowSize = initialWindowSize; + streamWindowUpdateRatio = windowUpdateRatio; } @Override @@ -182,23 +269,82 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro this.endOfStream = endOfStream; } + float windowUpdateRatio() { + return streamWindowUpdateRatio; + } + + void windowUpdateRatio(float ratio) { + streamWindowUpdateRatio = ratio; + } + + int initialStreamWindowSize() { + return initialStreamWindowSize; + } + + void initialStreamWindowSize(int initialWindowSize) { + initialStreamWindowSize = initialWindowSize; + } + /** - * Returns the initial size of this window. + * Increment the initial window size for this stream. + * @param delta The amount to increase the initial window size by. */ - int initialWindowSize() { - int maxWindowSize = initialWindowSize; - if (stream.id() == CONNECTION_STREAM_ID) { - // Determine the maximum number of streams that we can allow without integer overflow - // of maxWindowSize * numStreams. Also take care to avoid division by zero when - // maxWindowSize == 0. - int maxNumStreams = Integer.MAX_VALUE; - if (maxWindowSize > 0) { - maxNumStreams /= maxWindowSize; - } - int numStreams = Math.min(maxNumStreams, Math.max(1, connection.numActiveStreams())); - maxWindowSize = Math.min(maxConnectionWindowSize, maxWindowSize * numStreams); + void incrementInitialStreamWindow(int delta) { + // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range. + int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE, + max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta)); + delta = newValue - initialStreamWindowSize; + + initialStreamWindowSize += delta; + } + + /** + * Increment the windows which are used to determine many bytes have been processed. + * @param delta The amount to increment the window by. + * @throws Http2Exception if integer overflow occurs on the window. + */ + void incrementFlowControlWindows(int delta) throws Http2Exception { + if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) { + throw streamError(stream.id(), FLOW_CONTROL_ERROR, + "Flow control window overflowed for stream: %d", stream.id()); } - return maxWindowSize; + + window += delta; + processedWindow += delta; + lowerBound = delta < 0 ? delta : 0; + } + + /** + * A flow control event has occurred and we should decrement the amount of available bytes for this stream. + * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control. + * @throws Http2Exception If too much data is used relative to how much is available. + */ + void applyFlowControl(int dataLength) throws Http2Exception { + assert dataLength > 0; + + // Apply the delta. Even if we throw an exception we want to have taken this delta into account. + window -= dataLength; + + // Window size can become negative if we sent a SETTINGS frame that reduces the + // size of the transfer window after the peer has written data frames. + // The value is bounded by the length that SETTINGS frame decrease the window. + // This difference is stored for the connection when writing the SETTINGS frame + // and is cleared once we send a WINDOW_UPDATE frame. + if (window < lowerBound) { + throw streamError(stream.id(), FLOW_CONTROL_ERROR, + "Flow control window exceeded for stream: %d", stream.id()); + } + } + + /** + * Returns the processed bytes for this stream. + */ + void returnProcessedBytes(int delta) throws Http2Exception { + if (processedWindow - delta < window) { + throw streamError(stream.id(), INTERNAL_ERROR, + "Attempting to return too many bytes for stream %d", stream.id()); + } + processedWindow -= delta; } @Override @@ -211,9 +357,14 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro throw new IllegalArgumentException("numBytes must be positive"); } + // Return bytes to the connection window + FlowState connectionState = connectionState(); + connectionState.returnProcessedBytes(numBytes); + connectionState.writeWindowUpdateIfNeeded(ctx); + // Return the bytes processed and update the window. returnProcessedBytes(numBytes); - updateWindowIfAppropriate(ctx); + writeWindowUpdateIfNeeded(ctx); } @Override @@ -229,95 +380,29 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Updates the flow control window for this stream if it is appropriate. */ - void updateWindowIfAppropriate(ChannelHandlerContext ctx) { - if (endOfStream || initialWindowSize <= 0) { + void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { + if (endOfStream || initialStreamWindowSize <= 0) { return; } - int threshold = (int) (initialWindowSize() * windowUpdateRatio); + int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio); if (processedWindow <= threshold) { - updateWindow(ctx); + writeWindowUpdate(ctx); } } /** - * Returns the processed bytes for this stream. + * Called to perform a window update for this stream (or connection). Updates the window size back + * to the size of the initial window and sends a window update frame to the remote endpoint. */ - void returnProcessedBytes(int delta) throws Http2Exception { - if (processedWindow - delta < window) { - throw streamError(stream.id(), INTERNAL_ERROR, - "Attempting to return too many bytes for stream %d", stream.id()); - } - processedWindow -= delta; - } - - /** - * Adds the given delta to the window size and returns the new value. - * - * @param delta the delta in the initial window size. - * @throws Http2Exception thrown if the new window is less than the allowed lower bound. - */ - int addAndGet(int delta) throws Http2Exception { - // Apply the delta. Even if we throw an exception we want to have taken this delta into - // account. - window += delta; - if (delta > 0) { - lowerBound = 0; - } - - // Window size can become negative if we sent a SETTINGS frame that reduces the - // size of the transfer window after the peer has written data frames. - // The value is bounded by the length that SETTINGS frame decrease the window. - // This difference is stored for the connection when writing the SETTINGS frame - // and is cleared once we send a WINDOW_UPDATE frame. - if (delta < 0 && window < lowerBound) { - if (stream.id() == CONNECTION_STREAM_ID) { - throw connectionError(FLOW_CONTROL_ERROR, "Connection flow control window exceeded"); - } else { - throw streamError(stream.id(), FLOW_CONTROL_ERROR, - "Flow control window exceeded for stream: %d", stream.id()); - } - } - - return window; - } - - /** - * Called when sending a SETTINGS frame with a new initial window size. If the window has - * gotten smaller (i.e. deltaWindowSize < 0), the lower bound is set to that value. This - * will temporarily allow for receipt of data frames which were sent by the remote endpoint - * before receiving the SETTINGS frame. - * - * @param delta the delta in the initial window size. - * @throws Http2Exception thrown if integer overflow occurs on the window. - */ - void updatedInitialWindowSize(int delta) throws Http2Exception { - if (delta > 0 && window > Integer.MAX_VALUE - delta) { // Integer overflow. - throw connectionError(PROTOCOL_ERROR, "Flow control window overflowed for stream: %d", stream.id()); - } - window += delta; - processedWindow += delta; - - if (delta < 0) { - lowerBound = delta; - } - } - - /** - * Called to perform a window update for this stream (or connection). Updates the window - * size back to the size of the initial window and sends a window update frame to the remote - * endpoint. - */ - void updateWindow(ChannelHandlerContext ctx) { + void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception { // Expand the window for this stream back to the size of the initial window. - int deltaWindowSize = initialWindowSize() - processedWindow; - processedWindow += deltaWindowSize; + int deltaWindowSize = initialStreamWindowSize - processedWindow; try { - addAndGet(deltaWindowSize); - } catch (Http2Exception e) { - // This should never fail since we're adding. - throw new AssertionError("Caught exception while updating window with delta: " - + deltaWindowSize); + incrementFlowControlWindows(deltaWindowSize); + } catch (Throwable t) { + throw connectionError(INTERNAL_ERROR, t, + "Attempting to return too many bytes for stream %d", stream.id()); } // Send a window update for the stream/connection. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index a4166ad70e..3f4fa2e3da 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -22,6 +22,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP; import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY; import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE; import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP; +import static io.netty.handler.codec.http2.Http2Exception.streamError; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -77,53 +78,63 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor final int compressedBytes = data.readableBytes() + padding; int processedBytes = 0; decompressor.incrementCompressedBytes(compressedBytes); - // call retain here as it will call release after its written to the channel - channel.writeInbound(data.retain()); - ByteBuf buf = nextReadableBuf(channel); - if (buf == null && endOfStream && channel.finish()) { - buf = nextReadableBuf(channel); - } - if (buf == null) { - if (endOfStream) { - listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true); + try { + // call retain here as it will call release after its written to the channel + channel.writeInbound(data.retain()); + ByteBuf buf = nextReadableBuf(channel); + if (buf == null && endOfStream && channel.finish()) { + buf = nextReadableBuf(channel); } - // No new decompressed data was extracted from the compressed data. This means the application could not be - // provided with data and thus could not return how many bytes were processed. We will assume there is more - // data coming which will complete the decompression block. To allow for more data we return all bytes to - // the flow control window (so the peer can send more data). - decompressor.incrementDecompressedByes(compressedBytes); - processedBytes = compressedBytes; - } else { - try { - decompressor.incrementDecompressedByes(padding); - for (;;) { - ByteBuf nextBuf = nextReadableBuf(channel); - boolean decompressedEndOfStream = nextBuf == null && endOfStream; - if (decompressedEndOfStream && channel.finish()) { - nextBuf = nextReadableBuf(channel); - decompressedEndOfStream = nextBuf == null; - } - - decompressor.incrementDecompressedByes(buf.readableBytes()); - processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream); - if (nextBuf == null) { - break; - } - - padding = 0; // Padding is only communicated once on the first iteration - buf.release(); - buf = nextBuf; + if (buf == null) { + if (endOfStream) { + listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true); } - } finally { - if (buf != null) { - buf.release(); + // No new decompressed data was extracted from the compressed data. This means the application could + // not be provided with data and thus could not return how many bytes were processed. We will assume + // there is more data coming which will complete the decompression block. To allow for more data we + // return all bytes to the flow control window (so the peer can send more data). + decompressor.incrementDecompressedByes(compressedBytes); + processedBytes = compressedBytes; + } else { + try { + decompressor.incrementDecompressedByes(padding); + for (;;) { + ByteBuf nextBuf = nextReadableBuf(channel); + boolean decompressedEndOfStream = nextBuf == null && endOfStream; + if (decompressedEndOfStream && channel.finish()) { + nextBuf = nextReadableBuf(channel); + decompressedEndOfStream = nextBuf == null; + } + + decompressor.incrementDecompressedByes(buf.readableBytes()); + processedBytes += listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream); + if (nextBuf == null) { + break; + } + + padding = 0; // Padding is only communicated once on the first iteration + buf.release(); + buf = nextBuf; + } + } finally { + if (buf != null) { + buf.release(); + } } } + decompressor.incrementProcessedBytes(processedBytes); + // The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector + return processedBytes; + } catch (Http2Exception e) { + // Consider all the bytes consumed because there was an error + decompressor.incrementProcessedBytes(compressedBytes); + throw e; + } catch (Throwable t) { + // Consider all the bytes consumed because there was an error + decompressor.incrementProcessedBytes(compressedBytes); + throw streamError(stream.id(), INTERNAL_ERROR, t, + "Decompressor error detected while delegating data read on streamId %d", stream.id()); } - - decompressor.incrementProcessedBytes(processedBytes); - // The processed bytes will be translated to pre-decompressed byte amounts by DecompressorGarbageCollector - return processedBytes; } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index 58e75a1bd0..d8c89a34ed 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -69,7 +69,7 @@ public final class Http2CodecUtil { public static final long MIN_HEADER_TABLE_SIZE = 0; public static final long MIN_CONCURRENT_STREAMS = 0; - public static final long MIN_INITIAL_WINDOW_SIZE = 0; + public static final int MIN_INITIAL_WINDOW_SIZE = 0; public static final long MIN_HEADER_LIST_SIZE = 0; public static final int DEFAULT_WINDOW_SIZE = 65535; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 83a49b8b5c..282c2c9029 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -17,9 +17,9 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.isStreamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -29,6 +29,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; import java.util.Collection; @@ -279,6 +280,11 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http Http2Exception embedded = getEmbeddedHttp2Exception(cause); if (isStreamError(embedded)) { onStreamError(ctx, cause, (StreamException) embedded); + } else if (embedded instanceof CompositeStreamException) { + CompositeStreamException compositException = (CompositeStreamException) embedded; + for (StreamException streamException : compositException) { + onStreamError(ctx, cause, streamException); + } } else { onConnectionError(ctx, cause, embedded); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Exception.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Exception.java index 14c1a26ec5..15f52989ee 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Exception.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Exception.java @@ -17,6 +17,10 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + /** * Exception thrown when an HTTP/2 error was encountered. */ @@ -152,4 +156,26 @@ public class Http2Exception extends Exception { return streamId; } } + + /** + * Provides the ability to handle multiple stream exceptions with one throw statement. + */ + public static final class CompositeStreamException extends Http2Exception implements Iterable { + private static final long serialVersionUID = -434398146294199889L; + private final List exceptions; + + public CompositeStreamException(Http2Error error, int initialCapacity) { + super(error); + exceptions = new ArrayList(initialCapacity); + } + + public void add(StreamException e) { + exceptions.add(e); + } + + @Override + public Iterator iterator() { + return exceptions.iterator(); + } + } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java index cfd70fbd98..1f34b2797e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java @@ -37,16 +37,41 @@ public interface Http2InboundFlowController { boolean endOfStream) throws Http2Exception; /** - * Sets the initial inbound flow control window size and updates all stream window sizes by the - * delta. - * + * Sets the global inbound flow control window size and updates all stream window sizes by the delta. + *

+ * This method is used to apply the {@code SETTINGS_INITIAL_WINDOW_SIZE} value for an + * outbound {@code SETTINGS} frame. + *

+ * The connection stream windows will not be modified as a result of this call. * @param newWindowSize the new initial window size. * @throws Http2Exception thrown if any protocol-related error occurred. */ - void initialInboundWindowSize(int newWindowSize) throws Http2Exception; + void initialWindowSize(int newWindowSize) throws Http2Exception; /** - * Gets the initial inbound flow control window size. + * Gets the initial window size used as the basis for new stream flow control windows. */ - int initialInboundWindowSize(); + int initialWindowSize(); + + /** + * Sets the initial inbound flow control window size for a specific stream. + *

+ * Note it is the responsibly of the caller to ensure that the the + * initial {@code SETTINGS} frame is sent before this is called. It would + * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE} + * was generated by this method before the initial {@code SETTINGS} frame is sent. + * @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary. + * @param streamId The stream to update. + * @param newWindowSize the window size to apply to {@code streamId} + * @throws Http2Exception thrown if any protocol-related error occurred. + */ + void initialStreamWindowSize(ChannelHandlerContext ctx, int streamId, int newWindowSize) throws Http2Exception; + + /** + * Obtain the initial window size for a specific stream. + * @param streamId The stream id to get the initial window size for. + * @return The initial window size for {@code streamId}. + * @throws Http2Exception If no stream corresponding to {@code stream} could be found. + */ + int initialStreamWindowSize(int streamId) throws Http2Exception; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java index 9e4fad3c22..31ac151c05 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2OutboundFrameLogger.java @@ -69,6 +69,7 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter { @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { + logger.logRstStream(OUTBOUND, streamId, errorCode); return writer.writeRstStream(ctx, streamId, errorCode, promise); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java index 4192151628..47b4defbaf 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DataCompressionHttp2Test.java @@ -77,12 +77,10 @@ public class DataCompressionHttp2Test { @Mock private Http2FrameListener clientListener; - private Http2ConnectionEncoder serverEncoder; private Http2ConnectionEncoder clientEncoder; private ServerBootstrap sb; private Bootstrap cb; private Channel serverChannel; - private Channel serverConnectedChannel; private Channel clientChannel; private volatile CountDownLatch serverLatch; private volatile CountDownLatch clientLatch; @@ -226,53 +224,7 @@ public class DataCompressionHttp2Test { } @Test - public void deflateEncodingSingleLargeMessageReducedWindow() throws Exception { - final int BUFFER_SIZE = 1 << 16; - bootstrapEnv(1, BUFFER_SIZE, 1); - final ByteBuf data = Unpooled.buffer(BUFFER_SIZE); - try { - for (int i = 0; i < data.capacity(); ++i) { - data.writeByte((byte) 'a'); - } - final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) - .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); - final Http2Settings settings = new Http2Settings(); - - // Assume the compression operation will reduce the size by at least 10 bytes - settings.initialWindowSize(BUFFER_SIZE - 10); - runInChannel(serverConnectedChannel, new Http2Runnable() { - @Override - public void run() { - serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer()); - ctxServer().flush(); - } - }); - awaitClient(); - - // Required because the decompressor intercepts the onXXXRead events before - // our {@link Http2TestUtil$FrameAdapter} does. - Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false); - FrameAdapter.getOrCreateStream(clientConnection, 3, false); - runInChannel(clientChannel, new Http2Runnable() { - @Override - public void run() { - clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient()); - clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); - clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient()); - ctxClient().flush(); - } - }); - awaitServer(); - assertEquals(0, stream.garbageCollector().unProcessedBytes()); - assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8), - serverOut.toString(CharsetUtil.UTF_8.name())); - } finally { - data.release(); - } - } - - @Test - public void deflateEncodingMultipleWriteLargeMessageReducedWindow() throws Exception { + public void deflateEncodingWriteLargeMessage() throws Exception { final int BUFFER_SIZE = 1 << 12; final byte[] bytes = new byte[BUFFER_SIZE]; new Random().nextBytes(bytes); @@ -281,17 +233,6 @@ public class DataCompressionHttp2Test { try { final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); - final Http2Settings settings = new Http2Settings(); - - settings.initialWindowSize(BUFFER_SIZE / 2); - runInChannel(serverConnectedChannel, new Http2Runnable() { - @Override - public void run() { - serverEncoder.writeSettings(ctxServer(), settings, newPromiseServer()); - ctxServer().flush(); - } - }); - awaitClient(); // Required because the decompressor intercepts the onXXXRead events before // our {@link Http2TestUtil$FrameAdapter} does. @@ -300,7 +241,6 @@ public class DataCompressionHttp2Test { runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - clientEncoder.writeSettings(ctxClient(), settings, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient()); ctxClient().flush(); @@ -362,8 +302,6 @@ public class DataCompressionHttp2Test { .listener(new DelegatingDecompressorFrameListener(serverConnection, serverListener)), new CompressorHttp2ConnectionEncoder.Builder().connection(serverConnection).frameWriter(writer) .outboundFlow(new DefaultHttp2OutboundFlowController(serverConnection, writer))); - serverEncoder = connectionHandler.encoder(); - serverConnectedChannel = ch; p.addLast(connectionHandler); p.addLast(Http2CodecUtil.ignoreSettingsHandler()); serverChannelLatch.countDown(); @@ -401,10 +339,6 @@ public class DataCompressionHttp2Test { assertTrue(serverChannelLatch.await(5, SECONDS)); } - private void awaitClient() throws Exception { - assertTrue(clientLatch.await(5, SECONDS)); - } - private void awaitServer() throws Exception { assertTrue(serverLatch.await(5, SECONDS)); serverOut.flush(); @@ -417,12 +351,4 @@ public class DataCompressionHttp2Test { private ChannelPromise newPromiseClient() { return ctxClient().newPromise(); } - - private ChannelHandlerContext ctxServer() { - return serverConnectedChannel.pipeline().firstContext(); - } - - private ChannelPromise newPromiseServer() { - return ctxServer().newPromise(); - } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index af0c2542e5..b556e628e4 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -57,6 +57,8 @@ public class DefaultHttp2InboundFlowControllerTest { private DefaultHttp2Connection connection; + private static float updateRatio = 0.5f; + @Before public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); @@ -64,7 +66,7 @@ public class DefaultHttp2InboundFlowControllerTest { when(ctx.newPromise()).thenReturn(promise); connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2InboundFlowController(connection, frameWriter); + controller = new DefaultHttp2InboundFlowController(connection, frameWriter, updateRatio); connection.local().createStream(STREAM_ID, false); } @@ -77,16 +79,17 @@ public class DefaultHttp2InboundFlowControllerTest { @Test public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception { - int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; applyFlowControl(STREAM_ID, dataSize, 0, false); // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. returnProcessedBytes(STREAM_ID, 10); - verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); // Return the rest and verify the WINDOW_UPDATE is sent. returnProcessedBytes(STREAM_ID, dataSize - 10); verifyWindowUpdateSent(STREAM_ID, dataSize); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); } @Test(expected = Http2Exception.class) @@ -97,22 +100,21 @@ public class DefaultHttp2InboundFlowControllerTest { @Test public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception { - int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; - int newWindow = DEFAULT_WINDOW_SIZE - dataSize; - int windowDelta = DEFAULT_WINDOW_SIZE - newWindow; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; // Set end-of-stream on the frame, so no window update will be sent for the stream. applyFlowControl(STREAM_ID, dataSize, 0, true); - verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID); returnProcessedBytes(STREAM_ID, dataSize); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateNotSent(STREAM_ID); } @Test public void halfWindowRemainingShouldUpdateAllWindows() throws Http2Exception { - int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; + int dataSize = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; int initialWindowSize = DEFAULT_WINDOW_SIZE; int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); @@ -129,14 +131,14 @@ public class DefaultHttp2InboundFlowControllerTest { int initialWindowSize = DEFAULT_WINDOW_SIZE; applyFlowControl(STREAM_ID, initialWindowSize, 0, false); assertEquals(0, window(STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + assertEquals(0, window(CONNECTION_STREAM_ID)); returnProcessedBytes(STREAM_ID, initialWindowSize); assertEquals(initialWindowSize, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); // Update the initial window size to allow another frame. int newInitialWindowSize = 2 * initialWindowSize; - controller.initialInboundWindowSize(newInitialWindowSize); + controller.initialWindowSize(newInitialWindowSize); assertEquals(newInitialWindowSize, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); @@ -147,57 +149,102 @@ public class DefaultHttp2InboundFlowControllerTest { applyFlowControl(STREAM_ID, initialWindowSize, 0, false); returnProcessedBytes(STREAM_ID, initialWindowSize); int delta = newInitialWindowSize - initialWindowSize; - verifyWindowUpdateSent(CONNECTION_STREAM_ID, newInitialWindowSize); verifyWindowUpdateSent(STREAM_ID, delta); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); } @Test - public void connectionWindowShouldExpandWithNumberOfStreams() throws Http2Exception { - // Create another stream + public void connectionWindowShouldAdjustWithMultipleStreams() throws Http2Exception { int newStreamId = 3; connection.local().createStream(newStreamId, false); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + try { + assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); - // Receive some data - this should cause the connection window to expand. - int data1 = 50; - int expectedMaxConnectionWindow = DEFAULT_WINDOW_SIZE * 2; - applyFlowControl(STREAM_ID, data1, 0, false); - verifyWindowUpdateNotSent(STREAM_ID); - verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE + data1); - assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); - assertEquals(expectedMaxConnectionWindow, window(CONNECTION_STREAM_ID)); + // Test that both stream and connection window are updated (or not updated) together + int data1 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) + 1; + applyFlowControl(STREAM_ID, data1, 0, false); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID)); + returnProcessedBytes(STREAM_ID, data1); + verifyWindowUpdateSent(STREAM_ID, data1); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1); + reset(frameWriter); + + // Create a scenario where data is depleted from multiple streams, but not enough data + // to generate a window update on those streams. The amount will be enough to generate + // a window update for the connection stream. + --data1; + int data2 = data1 >> 1; + applyFlowControl(STREAM_ID, data1, 0, false); + applyFlowControl(newStreamId, data1, 0, false); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateNotSent(newStreamId); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId)); + assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID)); + returnProcessedBytes(STREAM_ID, data1); + returnProcessedBytes(newStreamId, data2); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateNotSent(newStreamId); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId)); + assertEquals(DEFAULT_WINDOW_SIZE - (data1 - data2), window(CONNECTION_STREAM_ID)); + } finally { + connection.stream(newStreamId).close(); + } + } + + @Test + public void globalRatioShouldImpactStreams() throws Http2Exception { + float ratio = 0.6f; + controller.windowUpdateRatio(ratio); + testRatio(ratio, DEFAULT_WINDOW_SIZE << 1, 3, false); + } + + @Test + public void streamlRatioShouldImpactStreams() throws Http2Exception { + float ratio = 0.6f; + testRatio(ratio, DEFAULT_WINDOW_SIZE << 1, 3, true); + } + + private void testRatio(float ratio, int newDefaultWindowSize, int newStreamId, boolean setStreamRatio) + throws Http2Exception { + controller.initialStreamWindowSize(ctx, 0, newDefaultWindowSize); + connection.local().createStream(newStreamId, false); + if (setStreamRatio) { + controller.windowUpdateRatio(ctx, newStreamId, ratio); + } + controller.initialStreamWindowSize(ctx, newStreamId, newDefaultWindowSize); reset(frameWriter); - - // Close the new stream. - connection.stream(newStreamId).close(); - - // Read more data and verify that the stream window refreshes but the - // connection window continues collapsing. - int data2 = window(STREAM_ID); - applyFlowControl(STREAM_ID, data2, 0, false); - returnProcessedBytes(STREAM_ID, data2); - verifyWindowUpdateSent(STREAM_ID, data2); - verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); - assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE * 2 - data2 , window(CONNECTION_STREAM_ID)); - - reset(frameWriter); - returnProcessedBytes(STREAM_ID, data1); - verifyWindowUpdateNotSent(STREAM_ID); - - // Read enough data to cause a WINDOW_UPDATE for both the stream and connection and - // verify the new maximum of the connection window. - int data3 = window(STREAM_ID); - applyFlowControl(STREAM_ID, data3, 0, false); - returnProcessedBytes(STREAM_ID, data3); - verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE); - verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE - - (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3))); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + try { + int data1 = (int) (newDefaultWindowSize * ratio) + 1; + int data2 = (int) (DEFAULT_WINDOW_SIZE * updateRatio) >> 1; + applyFlowControl(STREAM_ID, data2, 0, false); + applyFlowControl(newStreamId, data1, 0, false); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateNotSent(newStreamId); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); + assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID)); + assertEquals(newDefaultWindowSize - data1, window(newStreamId)); + assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID)); + returnProcessedBytes(STREAM_ID, data2); + returnProcessedBytes(newStreamId, data1); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateSent(newStreamId, data1); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); + assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID)); + assertEquals(newDefaultWindowSize, window(newStreamId)); + assertEquals(newDefaultWindowSize, window(CONNECTION_STREAM_ID)); + } finally { + connection.stream(newStreamId).close(); + } } private static int getWindowDelta(int initialSize, int windowSize, int dataSize) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java index 204a133852..760b5d2dd8 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java @@ -71,6 +71,7 @@ import org.mockito.stubbing.Answer; * Testing the {@link HttpToHttp2ConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames */ public class HttpToHttp2ConnectionHandlerTest { + private static final int WAIT_TIME_SECONDS = 5; @Mock private Http2FrameListener clientListener; @@ -122,9 +123,9 @@ public class HttpToHttp2ConnectionHandlerTest { ChannelPromise writePromise = newPromise(); ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); - writePromise.awaitUninterruptibly(2, SECONDS); + assertTrue(writePromise.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS)); assertTrue(writePromise.isSuccess()); - writeFuture.awaitUninterruptibly(2, SECONDS); + assertTrue(writeFuture.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS)); assertTrue(writeFuture.isSuccess()); awaitRequests(); verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(5), @@ -162,9 +163,9 @@ public class HttpToHttp2ConnectionHandlerTest { ChannelPromise writePromise = newPromise(); ChannelFuture writeFuture = clientChannel.writeAndFlush(request, writePromise); - writePromise.awaitUninterruptibly(2, SECONDS); + assertTrue(writePromise.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS)); assertTrue(writePromise.isSuccess()); - writeFuture.awaitUninterruptibly(2, SECONDS); + assertTrue(writeFuture.awaitUninterruptibly(WAIT_TIME_SECONDS, SECONDS)); assertTrue(writeFuture.isSuccess()); awaitRequests(); verify(serverListener).onHeadersRead(any(ChannelHandlerContext.class), eq(3), eq(http2Headers), eq(0), @@ -214,7 +215,7 @@ public class HttpToHttp2ConnectionHandlerTest { } private void awaitRequests() throws Exception { - assertTrue(requestLatch.await(2, SECONDS)); + assertTrue(requestLatch.await(WAIT_TIME_SECONDS, SECONDS)); } private ChannelHandlerContext ctx() {