diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 60308f947a..8102a79991 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -217,8 +217,8 @@ public class DefaultHttp2Connection implements Http2Connection { private boolean resetReceived; private boolean endOfStreamSent; private boolean endOfStreamReceived; - private FlowState inboundFlow; - private FlowState outboundFlow; + private Http2InboundFlowState inboundFlow; + private Http2FlowState outboundFlow; private EmbeddedChannel decompressor; private EmbeddedChannel compressor; private Object data; @@ -324,22 +324,22 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public FlowState inboundFlow() { + public Http2InboundFlowState inboundFlow() { return inboundFlow; } @Override - public void inboundFlow(FlowState state) { + public void inboundFlow(Http2InboundFlowState state) { inboundFlow = state; } @Override - public FlowState outboundFlow() { + public Http2FlowState outboundFlow() { return outboundFlow; } @Override - public void outboundFlow(FlowState state) { + public void outboundFlow(Http2FlowState state) { outboundFlow = state; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index ee6f3f80e8..73988cd2a8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -198,8 +198,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { private final class FrameReadListener implements Http2FrameListener { @Override - public void onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception { + public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endOfStream) throws Http2Exception { verifyPrefaceReceived(); // Check if we received a data frame for a stream which is half-closed @@ -211,9 +211,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a // lower stream ID. - boolean shouldIgnore = shouldIgnoreFrame(stream); - boolean shouldApplyFlowControl = false; + int processedBytes = data.readableBytes() + padding; + boolean shouldIgnore = shouldIgnoreFrame(stream); Http2Exception error = null; switch (stream.state()) { case OPEN: @@ -240,26 +240,36 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { break; } - // If we should apply flow control, do so now. - if (shouldApplyFlowControl) { - inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); - } + try { + // If we should apply flow control, do so now. + if (shouldApplyFlowControl) { + inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream); + } - // If we should ignore this frame, do so now. - if (shouldIgnore) { - return; - } + // If we should ignore this frame, do so now. + if (shouldIgnore) { + return processedBytes; + } - // If the stream was in an invalid state to receive the frame, throw the error. - if (error != null) { - throw error; - } + // If the stream was in an invalid state to receive the frame, throw the error. + if (error != null) { + throw error; + } - listener.onDataRead(ctx, streamId, data, padding, endOfStream); + // Call back the application and retrieve the number of bytes that have been + // immediately processed. + processedBytes = listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return processedBytes; + } finally { + // If appropriate, returned the processed bytes to the flow controller. + if (shouldApplyFlowControl && processedBytes > 0) { + stream.inboundFlow().returnProcessedBytes(ctx, processedBytes); + } - if (endOfStream) { - stream.endOfStreamReceived(); - lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); + if (endOfStream) { + stream.endOfStreamReceived(); + lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); + } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index 6fb518fe44..d5fce6d9ea 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -39,29 +39,33 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2; - /** - * A value for the window update ratio to be use in order to disable window updates for - * a stream (i.e. {@code 0}). - */ - public static final double WINDOW_UPDATE_OFF = 0.0; - private final Http2Connection connection; private final Http2FrameWriter frameWriter; + private final double windowUpdateRatio; private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE; public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { + this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); + } + + public DefaultHttp2InboundFlowController(Http2Connection connection, + Http2FrameWriter frameWriter, double windowUpdateRatio) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); + if (Double.compare(windowUpdateRatio, 0.0) <= 0 || Double.compare(windowUpdateRatio, 1.0) >= 0) { + throw new IllegalArgumentException("Invalid ratio: " + windowUpdateRatio); + } + this.windowUpdateRatio = windowUpdateRatio; // Add a flow state for the connection. - connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID)); + connection.connectionStream().inboundFlow(new FlowState(CONNECTION_STREAM_ID)); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @Override public void streamAdded(Http2Stream stream) { - stream.inboundFlow(new InboundFlowState(stream.id())); + stream.inboundFlow(new FlowState(stream.id())); } }); } @@ -80,7 +84,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro initialWindowSize = newWindowSize; // Apply the delta to all of the windows. - connectionState().addAndGet(deltaWindowSize); for (Http2Stream stream : connection.activeStreams()) { state(stream).updatedInitialWindowSize(deltaWindowSize); } @@ -91,125 +94,75 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return initialWindowSize; } - /** - * Sets the per-stream ratio used to control when window update is performed. The value - * specified is a ratio of the window size to the initial window size, below which a - * {@code WINDOW_UPDATE} should be sent to expand the window. If the given value is - * {@link #WINDOW_UPDATE_OFF} (i.e. {@code 0}) window updates will be disabled for the - * stream. - * - * @throws IllegalArgumentException if the stream does not exist or if the ratio value is - * outside of the range 0 (inclusive) to 1 (exclusive). - */ - public void setWindowUpdateRatio(ChannelHandlerContext ctx, int streamId, double ratio) { - if (ratio < WINDOW_UPDATE_OFF || ratio >= 1.0) { - throw new IllegalArgumentException("Invalid ratio: " + ratio); - } - - InboundFlowState state = state(streamId); - if (state == null) { - throw new IllegalArgumentException("Stream does not exist: " + streamId); - } - - // Set the ratio. - state.setWindowUpdateRatio(ratio); - - // In the event of enabling window update, check to see if we need to update the window now. - try { - state.updateWindowIfAppropriate(ctx); - } catch (Http2Exception e) { - // Shouldn't happen since we only increase the window. - throw new RuntimeException(e); - } - } - @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) - throws Http2Exception { + public void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endOfStream) throws Http2Exception { int dataLength = data.readableBytes() + padding; - boolean windowUpdateSent = false; - try { - windowUpdateSent = applyConnectionFlowControl(ctx, dataLength); + int delta = -dataLength; - // Apply the stream-level flow control. - windowUpdateSent |= applyStreamFlowControl(ctx, streamId, dataLength, endOfStream); - } finally { - // Optimization: only flush once for any sent WINDOW_UPDATE frames. - if (windowUpdateSent) { - ctx.flush(); - } - } + // Apply the connection-level flow control. Immediately return the bytes for the connection + // window so that data on this stream does not starve other stream. + FlowState connectionState = connectionState(); + connectionState.addAndGet(delta); + connectionState.returnProcessedBytes(dataLength); + connectionState.updateWindowIfAppropriate(ctx); + + // Apply the stream-level flow control, but do not return the bytes immediately. + FlowState state = stateOrFail(streamId); + state.endOfStream(endOfStream); + state.addAndGet(delta); } - private InboundFlowState connectionState() { + private FlowState connectionState() { return state(connection.connectionStream()); } - private InboundFlowState state(int streamId) { - return state(connection.stream(streamId)); + private FlowState state(int streamId) { + Http2Stream stream = connection.stream(streamId); + return stream != null ? state(stream) : null; } - private InboundFlowState state(Http2Stream stream) { - return stream != null ? (InboundFlowState) stream.inboundFlow() : null; + private FlowState state(Http2Stream stream) { + return (FlowState) stream.inboundFlow(); } /** * Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found. */ - private InboundFlowState stateOrFail(int streamId) throws Http2Exception { - InboundFlowState state = state(streamId); + private FlowState stateOrFail(int streamId) throws Http2Exception { + FlowState state = state(streamId); if (state == null) { throw protocolError("Flow control window missing for stream: %d", streamId); } return state; } - /** - * Apply connection-wide flow control to the incoming data frame. - * - * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the connection. - */ - private boolean applyConnectionFlowControl(ChannelHandlerContext ctx, int dataLength) - throws Http2Exception { - // Remove the data length from the available window size. Throw if the lower bound - // was exceeded. - InboundFlowState connectionState = connectionState(); - connectionState.addAndGet(-dataLength); - - // If appropriate, send a WINDOW_UPDATE frame to open the window. - return connectionState.updateWindowIfAppropriate(ctx); - } - - /** - * Apply stream-based flow control to the incoming data frame. - * - * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the stream. - */ - private boolean applyStreamFlowControl(ChannelHandlerContext ctx, int streamId, int dataLength, - boolean endOfStream) throws Http2Exception { - // Remove the data length from the available window size. Throw if the lower bound - // was exceeded. - InboundFlowState state = stateOrFail(streamId); - state.addAndGet(-dataLength); - state.endOfStream(endOfStream); - - // If appropriate, send a WINDOW_UPDATE frame to open the window. - return state.updateWindowIfAppropriate(ctx); - } - /** * Flow control window state for an individual stream. */ - private final class InboundFlowState implements FlowState { + private final class FlowState implements Http2InboundFlowState { private final int streamId; + + /** + * The actual flow control window that is decremented as soon as {@code DATA} arrives. + */ private int window; + + /** + * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE} + * frames. Decrementing this window for received {@code DATA} frames is delayed until the + * application has indicated that the data has been fully processed. This prevents sending + * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold. + */ + private int processedWindow; + private int lowerBound; private boolean endOfStream; - private double windowUpdateRatio = DEFAULT_WINDOW_UPDATE_RATIO; - InboundFlowState(int streamId) { + FlowState(int streamId) { this.streamId = streamId; window = initialWindowSize; + processedWindow = window; } @Override @@ -221,13 +174,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro this.endOfStream = endOfStream; } - /** - * Enables or disables window updates for this stream. - */ - void setWindowUpdateRatio(double ratio) { - windowUpdateRatio = ratio; - } - /** * Returns the initial size of this window. */ @@ -247,22 +193,44 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return maxWindowSize; } + @Override + public void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { + if (streamId == CONNECTION_STREAM_ID) { + throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); + } + checkNotNull(ctx, "ctx"); + if (numBytes <= 0) { + throw new IllegalArgumentException("numBytes must be positive"); + } + + // Return the bytes processed and update the window. + returnProcessedBytes(numBytes); + updateWindowIfAppropriate(ctx); + } + /** * Updates the flow control window for this stream if it is appropriate. - * - * @return {@code} true if a {@code WINDOW_UPDATE} frame was sent. */ - boolean updateWindowIfAppropriate(ChannelHandlerContext ctx) throws Http2Exception { - if (windowUpdateRatio == WINDOW_UPDATE_OFF || endOfStream || initialWindowSize <= 0) { - return false; + void updateWindowIfAppropriate(ChannelHandlerContext ctx) { + if (endOfStream || initialWindowSize <= 0) { + return; } int threshold = (int) (initialWindowSize() * windowUpdateRatio); - if (window <= threshold) { + if (processedWindow <= threshold) { updateWindow(ctx); - return true; } - return false; + } + + /** + * Returns the processed bytes for this stream. + */ + void returnProcessedBytes(int delta) { + if (processedWindow - delta < window) { + throw new IllegalArgumentException( + "Attempting to return too many bytes for stream " + streamId); + } + processedWindow -= delta; } /** @@ -284,7 +252,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // The value is bounded by the length that SETTINGS frame decrease the window. // This difference is stored for the connection when writing the SETTINGS frame // and is cleared once we send a WINDOW_UPDATE frame. - if (window < lowerBound) { + if (delta < 0 && window < lowerBound) { if (streamId == CONNECTION_STREAM_ID) { throw protocolError("Connection flow control window exceeded"); } else { @@ -310,6 +278,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro throw flowControlError("Flow control window overflowed for stream: %d", streamId); } window += delta; + processedWindow += delta; if (delta < 0) { lowerBound = delta; @@ -321,11 +290,21 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * size back to the size of the initial window and sends a window update frame to the remote * endpoint. */ - void updateWindow(ChannelHandlerContext ctx) throws Http2Exception { + void updateWindow(ChannelHandlerContext ctx) { // Expand the window for this stream back to the size of the initial window. int deltaWindowSize = initialWindowSize() - window; - addAndGet(deltaWindowSize); + processedWindow += deltaWindowSize; + try { + addAndGet(deltaWindowSize); + } catch (Http2Exception e) { + // This should never fail since we're adding. + throw new AssertionError("Caught exception while updating window with delta: " + + deltaWindowSize); + } + + // Send a window update for the stream/connection. frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); + ctx.flush(); } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java index d962a656af..fa7cfe076e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java @@ -353,7 +353,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont /** * The outbound flow control state for a single stream. */ - final class OutboundFlowState implements FlowState { + final class OutboundFlowState implements Http2FlowState { private final Queue pendingWriteQueue; private final Http2Stream stream; private int window = initialWindowSize; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java index 56bbd59dd5..817bbdbf7b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DelegatingDecompressorFrameListener.java @@ -62,16 +62,18 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor } @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { final Http2Stream stream = connection.stream(streamId); final EmbeddedChannel decompressor = stream == null ? null : stream.decompressor(); if (decompressor == null) { // The decompressor may be null if no compatible encoding type was found in this stream's headers - listener.onDataRead(ctx, streamId, data, padding, endOfStream); - return; + return listener.onDataRead(ctx, streamId, data, padding, endOfStream); } + // When decompressing, always opt-out of application-level flow control. + // TODO: investigate how to apply application-level flow control when decompressing. + int processedBytes = data.readableBytes() + padding; try { // call retain here as it will call release after its written to the channel decompressor.writeInbound(data.retain()); @@ -96,6 +98,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor buf = nextBuf; } } + return processedBytes; } finally { if (endOfStream) { cleanup(stream, decompressor); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataListener.java deleted file mode 100644 index b32bb2170f..0000000000 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.netty.handler.codec.http2; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; - -/** - * An listener of HTTP/2 {@code DATA} frames. - */ -public interface Http2DataListener { - - /** - * Handles an inbound {@code DATA} frame. - * - * @param ctx the context from the handler where the frame was read. - * @param streamId the subject stream for the frame. - * @param data payload buffer for the frame. If this buffer needs to be retained by the listener - * they must make a copy. - * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. - */ - void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; -} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2EventAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2EventAdapter.java index db9f5ec438..002b36817f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2EventAdapter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2EventAdapter.java @@ -23,8 +23,9 @@ import io.netty.channel.ChannelHandlerContext; */ public class Http2EventAdapter implements Http2Connection.Listener, Http2FrameListener { @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + return data.readableBytes() + padding; } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/FlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java similarity index 96% rename from codec-http2/src/main/java/io/netty/handler/codec/http2/FlowState.java rename to codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java index b587ef21d2..a6972a4fdf 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/FlowState.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FlowState.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.http2; /** * Base interface for flow-control state for a particular stream. */ -public interface FlowState { +public interface Http2FlowState { /** * Returns the current remaining flow control window (in bytes) for the stream. Depending on the diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java index 190aa3e267..62204b179e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java @@ -23,8 +23,9 @@ import io.netty.channel.ChannelHandlerContext; public class Http2FrameAdapter implements Http2FrameListener { @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + return data.readableBytes() + padding; } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java index f5f91199a8..69805d3621 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java @@ -21,7 +21,30 @@ import io.netty.channel.ChannelHandlerContext; /** * An listener of HTTP/2 frames. */ -public interface Http2FrameListener extends Http2DataListener { +public interface Http2FrameListener { + + /** + * Handles an inbound {@code DATA} frame. + * + * @param ctx the context from the handler where the frame was read. + * @param streamId the subject stream for the frame. + * @param data payload buffer for the frame. If this buffer will be released by the codec. + * @param padding the number of padding bytes found at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote + * endpoint for this stream. + * @return the number of bytes that have been processed by the application. The returned bytes + * are used by the inbound flow controller to determine the appropriate time to expand + * the inbound flow control window (i.e. send {@code WINDOW_UPDATE}). Returning a value + * equal to the length of {@code data} + {@code padding} will effectively opt-out of + * application-level flow control for this frame. Returning a value less than the length + * of {@code data} + {@code padding} will defer the returning of the processed bytes, + * which the application must later return via + * {@link Http2InboundFlowState#returnProcessedBytes(ChannelHandlerContext, int)}. The + * returned value must be >= {@code 0} and <= {@code data.readableBytes()} + + * {@code padding}. + */ + int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception; /** * Handles an inbound HEADERS frame. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java index 8664d64fe2..365cf17589 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListenerDecorator.java @@ -29,9 +29,9 @@ public class Http2FrameListenerDecorator implements Http2FrameListener { } @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { - listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return listener.onDataRead(ctx, streamId, data, padding, endOfStream); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java index af5c3c24d9..cfd70fbd98 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java @@ -15,14 +15,30 @@ package io.netty.handler.codec.http2; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + /** * Controls the inbound flow of data frames from the remote endpoint. */ -public interface Http2InboundFlowController extends Http2DataListener { +public interface Http2InboundFlowController { + + /** + * Applies inbound flow control to the given {@code DATA} frame. + * + * @param ctx the context from the handler where the frame was read. + * @param streamId the subject stream for the frame. + * @param data payload buffer for the frame. + * @param padding the number of padding bytes found at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote + * endpoint for this stream. + */ + void applyFlowControl(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception; /** * Sets the initial inbound flow control window size and updates all stream window sizes by the - * delta. This is called as part of the processing for an outbound SETTINGS frame. + * delta. * * @param newWindowSize the new initial window size. * @throws Http2Exception thrown if any protocol-related error occurred. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java new file mode 100644 index 0000000000..ad1a5e7c4a --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowState.java @@ -0,0 +1,36 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.channel.ChannelHandlerContext; + +/** + * The inbound flow control state for a stream. This object is created and managed by the + * {@link Http2InboundFlowController}. + */ +public interface Http2InboundFlowState extends Http2FlowState { + + /** + * Used by applications that participate in application-level inbound flow control. Allows the + * application to return a number of bytes that has been processed and thereby enabling the + * {@link Http2InboundFlowController} to send a {@code WINDOW_UPDATE} to restore at least part + * of the flow control window. + * + * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if + * appropriate + * @param numBytes the number of bytes to be returned to the flow control window. + */ + void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java index 2b1942f9e3..d74e645e0c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFrameLogger.java @@ -39,11 +39,11 @@ public class Http2InboundFrameLogger implements Http2FrameReader { reader.readFrame(ctx, input, new Http2FrameListener() { @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { logger.logData(INBOUND, streamId, data, padding, endOfStream); - listener.onDataRead(ctx, streamId, data, padding, endOfStream); + return listener.onDataRead(ctx, streamId, data, padding, endOfStream); } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 34f5c3971e..029c919d37 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -166,22 +166,22 @@ public interface Http2Stream { /** * Gets the in-bound flow control state for this stream. */ - FlowState inboundFlow(); + Http2InboundFlowState inboundFlow(); /** * Sets the in-bound flow control state for this stream. */ - void inboundFlow(FlowState state); + void inboundFlow(Http2InboundFlowState state); /** * Gets the out-bound flow control window for this stream. */ - FlowState outboundFlow(); + Http2FlowState outboundFlow(); /** * Sets the out-bound flow control window for this stream. */ - void outboundFlow(FlowState state); + void outboundFlow(Http2FlowState state); /** * Updates an priority for this stream. Calling this method may affect the straucture of the diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java index 239bb6fdf1..bbde4fd438 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapter.java @@ -252,7 +252,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter { } @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { FullHttpMessage msg = messageMap.get(streamId); if (msg == null) { @@ -271,6 +271,9 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter { if (endOfStream) { fireChannelRead(ctx, msg, streamId); } + + // All bytes have been processed. + return dataReadableBytes + padding; } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 9fb84ebaa3..44020f4ca3 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -101,6 +101,9 @@ public class DefaultHttp2ConnectionDecoderTest { @Mock private Http2ConnectionEncoder encoder; + @Mock + private Http2InboundFlowState inFlowState; + @Mock private Http2LifecycleManager lifecycleManager; @@ -113,6 +116,7 @@ public class DefaultHttp2ConnectionDecoderTest { when(channel.isActive()).thenReturn(true); when(stream.id()).thenReturn(STREAM_ID); when(stream.state()).thenReturn(OPEN); + when(stream.inboundFlow()).thenReturn(inFlowState); when(pushStream.id()).thenReturn(PUSH_STREAM_ID); when(connection.activeStreams()).thenReturn(Collections.singletonList(stream)); when(connection.stream(STREAM_ID)).thenReturn(stream); @@ -161,9 +165,13 @@ public class DefaultHttp2ConnectionDecoderTest { public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { when(connection.goAwaySent()).thenReturn(true); final ByteBuf data = dummyData(); + int padding = 10; + int processedBytes = data.readableBytes() + padding; + mockFlowControl(processedBytes); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true)); + verify(inFlowState).returnProcessedBytes(eq(ctx), eq(processedBytes)); // Verify that the event was absorbed and not propagated to the oberver. verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); @@ -192,7 +200,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow, never()).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(inboundFlow, never()).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); } finally { data.release(); @@ -207,7 +215,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); } finally { data.release(); @@ -219,7 +227,7 @@ public class DefaultHttp2ConnectionDecoderTest { final ByteBuf data = dummyData(); try { decode().onDataRead(ctx, STREAM_ID, data, 10, true); - verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(inboundFlow).applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future)); verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); } finally { @@ -402,5 +410,14 @@ public class DefaultHttp2ConnectionDecoderTest { decoder.decodeFrame(ctx, EMPTY_BUFFER, Collections.emptyList()); return internallistener.getValue(); } -} + private void mockFlowControl(final int processedBytes) throws Http2Exception { + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + return processedBytes; + } + }).when(listener).onDataRead(any(ChannelHandlerContext.class), anyInt(), + any(ByteBuf.class), anyInt(), anyBoolean()); + } +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index bcb1af3154..90184c1b49 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -15,7 +15,6 @@ package io.netty.handler.codec.http2; -import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; @@ -76,6 +75,20 @@ public class DefaultHttp2InboundFlowControllerTest { verifyWindowUpdateNotSent(); } + @Test + public void windowUpdateShouldSendOnceBytesReturned() throws Http2Exception { + int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; + applyFlowControl(STREAM_ID, dataSize, 0, false); + + // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. + returnProcessedBytes(STREAM_ID, 10); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); + + // Return the rest and verify the WINDOW_UPDATE is sent. + returnProcessedBytes(STREAM_ID, dataSize - 10); + verifyWindowUpdateSent(STREAM_ID, dataSize); + } + @Test(expected = Http2Exception.class) public void connectionFlowControlExceededShouldThrow() throws Http2Exception { // Window exceeded because of the padding. @@ -83,7 +96,7 @@ public class DefaultHttp2InboundFlowControllerTest { } @Test - public void halfWindowRemainingShouldUpdateConnectionWindow() throws Http2Exception { + public void windowUpdateShouldNotBeSentAfterEndOfStream() throws Http2Exception { int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; int newWindow = DEFAULT_WINDOW_SIZE - dataSize; int windowDelta = DEFAULT_WINDOW_SIZE - newWindow; @@ -91,16 +104,10 @@ public class DefaultHttp2InboundFlowControllerTest { // Set end-of-stream on the frame, so no window update will be sent for the stream. applyFlowControl(STREAM_ID, dataSize, 0, true); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); - } + verifyWindowUpdateNotSent(STREAM_ID); - @Test - public void windowMaintenanceDisabledForConnectionShouldNotUpdateWindow() throws Http2Exception { - controller.setWindowUpdateRatio(ctx, CONNECTION_STREAM_ID, WINDOW_UPDATE_OFF); - int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; - - // Set end-of-stream on the frame, so no window update will be sent for the stream. - applyFlowControl(STREAM_ID, dataSize, 0, true); - verifyWindowUpdateNotSent(); + returnProcessedBytes(STREAM_ID, dataSize); + verifyWindowUpdateNotSent(STREAM_ID); } @Test @@ -111,40 +118,36 @@ public class DefaultHttp2InboundFlowControllerTest { // Don't set end-of-stream so we'll get a window update for the stream as well. applyFlowControl(STREAM_ID, dataSize, 0, false); + returnProcessedBytes(STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta); } - @Test - public void windowMaintenanceDisabledForStreamShouldNotUpdateWindow() throws Http2Exception { - controller.setWindowUpdateRatio(ctx, STREAM_ID, WINDOW_UPDATE_OFF); - int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; - int initialWindowSize = DEFAULT_WINDOW_SIZE; - int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); - - // Don't set end-of-stream so we'll get a window update for the stream as well. - applyFlowControl(STREAM_ID, dataSize, 0, false); - verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); - verifyWindowUpdateNotSent(STREAM_ID); - } - @Test public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { // Send a frame that takes up the entire window. int initialWindowSize = DEFAULT_WINDOW_SIZE; applyFlowControl(STREAM_ID, initialWindowSize, 0, false); + assertEquals(0, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + returnProcessedBytes(STREAM_ID, initialWindowSize); + assertEquals(initialWindowSize, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); // Update the initial window size to allow another frame. int newInitialWindowSize = 2 * initialWindowSize; controller.initialInboundWindowSize(newInitialWindowSize); + assertEquals(newInitialWindowSize, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); // Clear any previous calls to the writer. reset(frameWriter); // Send the next frame and verify that the expected window updates were sent. applyFlowControl(STREAM_ID, initialWindowSize, 0, false); + returnProcessedBytes(STREAM_ID, initialWindowSize); int delta = newInitialWindowSize - initialWindowSize; - verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, newInitialWindowSize); verifyWindowUpdateSent(STREAM_ID, delta); } @@ -175,6 +178,7 @@ public class DefaultHttp2InboundFlowControllerTest { // connection window continues collapsing. int data2 = window(STREAM_ID); applyFlowControl(STREAM_ID, data2, 0, false); + returnProcessedBytes(STREAM_ID, data2); verifyWindowUpdateSent(STREAM_ID, data1 + data2); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); @@ -186,6 +190,7 @@ public class DefaultHttp2InboundFlowControllerTest { // verify the new maximum of the connection window. int data3 = window(STREAM_ID); applyFlowControl(STREAM_ID, data3, 0, false); + returnProcessedBytes(STREAM_ID, data3); verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE); verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE - (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3))); @@ -201,7 +206,7 @@ public class DefaultHttp2InboundFlowControllerTest { private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception { final ByteBuf buf = dummyData(dataSize); try { - controller.onDataRead(ctx, streamId, buf, padding, endOfStream); + controller.applyFlowControl(ctx, streamId, buf, padding, endOfStream); } finally { buf.release(); } @@ -213,7 +218,11 @@ public class DefaultHttp2InboundFlowControllerTest { return buffer; } - private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) { + private void returnProcessedBytes(int streamId, int processedBytes) throws Http2Exception { + connection.requireStream(streamId).inboundFlow().returnProcessedBytes(ctx, processedBytes); + } + + private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception { verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java index d7c36fe316..ff8b912bf6 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java @@ -140,14 +140,15 @@ final class Http2TestUtil { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { reader.readFrame(ctx, in, new Http2FrameListener() { @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { Http2Stream stream = getOrCreateStream(streamId, endOfStream); - listener.onDataRead(ctx, streamId, data, padding, endOfStream); + int processed = listener.onDataRead(ctx, streamId, data, padding, endOfStream); if (endOfStream) { closeStream(stream, true); } latch.countDown(); + return processed; } @Override @@ -281,16 +282,17 @@ final class Http2TestUtil { } @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { int numBytes = data.readableBytes(); - listener.onDataRead(ctx, streamId, data, padding, endOfStream); + int processed = listener.onDataRead(ctx, streamId, data, padding, endOfStream); messageLatch.countDown(); if (dataLatch != null) { for (int i = 0; i < numBytes; ++i) { dataLatch.countDown(); } } + return processed; } @Override diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index 8aed6610b7..b608c3086e 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -95,11 +95,13 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler { * If receive a frame with end-of-stream set, send a pre-canned response. */ @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + int processed = data.readableBytes() + padding; if (endOfStream) { sendResponse(ctx, streamId, data.retain()); } + return processed; } /**