diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 372a1fed8c..9e24d9ddc5 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -1144,19 +1144,19 @@ public class DefaultHttp2Connection implements Http2Connection { void removeFromActiveStreams(DefaultStream stream) { if (streams.remove(stream)) { - try { - // Update the number of active streams initiated by the endpoint. - stream.createdBy().numActiveStreams--; + // Update the number of active streams initiated by the endpoint. + stream.createdBy().numActiveStreams--; + } + notifyClosed(stream); + removeStream(stream); + } - for (int i = 0; i < listeners.size(); i++) { - try { - listeners.get(i).onStreamClosed(stream); - } catch (RuntimeException e) { - logger.error("Caught RuntimeException from listener onStreamClosed.", e); - } - } - } finally { - removeStream(stream); + private void notifyClosed(DefaultStream stream) { + for (int i = 0; i < listeners.size(); i++) { + try { + listeners.get(i).onStreamClosed(stream); + } catch (RuntimeException e) { + logger.error("Caught RuntimeException from listener onStreamClosed.", e); } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 479d5d5d76..43ad3a5731 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -31,6 +31,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Stream.FlowControlState; +import io.netty.util.internal.PlatformDependent; /** * Basic implementation of {@link Http2LocalFlowController}. @@ -44,6 +45,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController private final Http2Connection connection; private final Http2FrameWriter frameWriter; + private ChannelHandlerContext ctx; private volatile float windowUpdateRatio; private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; @@ -74,6 +76,22 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController // frames which may have been exchanged while it was in IDLE state(stream).window(initialWindowSize); } + + @Override + public void onStreamClosed(Http2Stream stream) { + try { + // When a stream is closed, consume any remaining bytes so that they + // are restored to the connection window. + DefaultFlowState state = state(stream); + int unconsumedBytes = state.unconsumedBytes(); + if (ctx != null && unconsumedBytes > 0) { + connectionState().consumeBytes(ctx, unconsumedBytes); + state.consumeBytes(ctx, unconsumedBytes); + } + } catch (Http2Exception e) { + PlatformDependent.throwException(e); + } + } }); } @@ -105,7 +123,19 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { - state(stream).consumeBytes(ctx, numBytes); + if (stream.id() == CONNECTION_STREAM_ID) { + throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); + } + if (numBytes <= 0) { + throw new IllegalArgumentException("numBytes must be positive"); + } + + // Streams automatically consume all remaining bytes when they are closed, so just ignore + // if already closed. + if (!isClosed(stream)) { + connectionState().consumeBytes(ctx, numBytes); + state(stream).consumeBytes(ctx, numBytes); + } } @Override @@ -174,15 +204,23 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + this.ctx = checkNotNull(ctx, "ctx"); int dataLength = data.readableBytes() + padding; // Apply the connection-level flow control - connectionState().receiveFlowControlledFrame(dataLength); - // Apply the stream-level flow control - DefaultFlowState state = state(stream); - state.endOfStream(endOfStream); - state.receiveFlowControlledFrame(dataLength); + DefaultFlowState connectionState = connectionState(); + connectionState.receiveFlowControlledFrame(dataLength); + + if (!isClosed(stream)) { + // Apply the stream-level flow control + DefaultFlowState state = state(stream); + state.endOfStream(endOfStream); + state.receiveFlowControlledFrame(dataLength); + } else if (dataLength > 0) { + // Immediately consume the bytes for the connection window. + connectionState.consumeBytes(ctx, dataLength); + } } private DefaultFlowState connectionState() { @@ -193,6 +231,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController return (DefaultFlowState) checkNotNull(stream, "stream").localFlowState(); } + private static boolean isClosed(Http2Stream stream) { + return stream.state() == Http2Stream.State.CLOSED; + } + /** * Flow control window state for an individual stream. */ @@ -323,18 +365,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController } void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { - if (stream.id() == CONNECTION_STREAM_ID) { - throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); - } - if (numBytes <= 0) { - throw new IllegalArgumentException("numBytes must be positive"); - } - - // Return bytes to the connection window - DefaultFlowState connectionState = connectionState(); - connectionState.returnProcessedBytes(numBytes); - connectionState.writeWindowUpdateIfNeeded(ctx); - // Return the bytes processed and update the window. returnProcessedBytes(numBytes); writeWindowUpdateIfNeeded(ctx); @@ -348,7 +378,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * Updates the flow control window for this stream if it is appropriate. */ void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { - if (endOfStream || initialStreamWindowSize <= 0) { + if (endOfStream || initialStreamWindowSize <= 0 || isClosed(stream)) { return; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java index b852b249f5..83d193350e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java @@ -28,6 +28,8 @@ public interface Http2LocalFlowController extends Http2FlowController { * policies to it for both the {@code stream} as well as the connection. If any flow control * policies have been violated, an exception is raised immediately, otherwise the frame is * considered to have "passed" flow control. + *
+ * If {@code stream} is closed, flow control should only be applied to the connection window. * * @param ctx the context from the handler where the frame was read. * @param stream the subject stream for the received frame. The connection stream object must @@ -39,22 +41,24 @@ public interface Http2LocalFlowController extends Http2FlowController { * @throws Http2Exception if any flow control errors are encountered. */ void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; + boolean endOfStream) throws Http2Exception; /** - * Indicates that the application has consumed a number of bytes for the given stream and is - * therefore ready to receive more data from the remote endpoint. The application must consume - * any bytes that it receives or the flow control window will collapse. Consuming bytes enables - * the flow controller to send {@code WINDOW_UPDATE} to restore a portion of the flow control - * window for the stream. + * Indicates that the application has consumed a number of bytes for the given stream and is therefore ready to + * receive more data from the remote endpoint. The application must consume any bytes that it receives or the flow + * control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to + * restore a portion of the flow control window for the stream. + * + * If {@code stream} is closed (i.e. {@link Http2Stream#state()} method returns {@link Http2Stream.State#CLOSED}), + * the consumed bytes are only restored to the connection window. When a stream is closed, the flow controller + * automatically restores any unconsumed bytes for that stream to the connection window. This is done to ensure that + * the connection window does not degrade over time as streams are closed. * - * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if - * appropriate - * @param stream the stream for which window space should be freed. The connection stream object - * must not be used. + * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate + * @param stream the stream for which window space should be freed. The connection stream object must not be used. * @param numBytes the number of bytes to be returned to the flow control window. - * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes} - * for the stream. + * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the + * stream. */ void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception; diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index c35639f0fc..567e625fd9 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -201,6 +201,22 @@ public class DefaultHttp2LocalFlowControllerTest { } } + @Test + public void closeShouldConsumeBytes() throws Http2Exception { + receiveFlowControlledFrame(STREAM_ID, 10, 0, false); + assertEquals(10, controller.unconsumedBytes(connection.connectionStream())); + stream(STREAM_ID).close(); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + } + + @Test + public void dataReceivedForClosedStreamShouldImmediatelyConsumeBytes() throws Http2Exception { + Http2Stream stream = stream(STREAM_ID); + stream.close(); + receiveFlowControlledFrame(stream, 10, 0, false); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + } + @Test public void globalRatioShouldImpactStreams() throws Http2Exception { float ratio = 0.6f; @@ -254,10 +270,15 @@ public class DefaultHttp2LocalFlowControllerTest { } private void receiveFlowControlledFrame(int streamId, int dataSize, int padding, - boolean endOfStream) throws Http2Exception { + boolean endOfStream) throws Http2Exception { + receiveFlowControlledFrame(stream(streamId), dataSize, padding, endOfStream); + } + + private void receiveFlowControlledFrame(Http2Stream stream, int dataSize, int padding, + boolean endOfStream) throws Http2Exception { final ByteBuf buf = dummyData(dataSize); try { - controller.receiveFlowControlledFrame(ctx, stream(streamId), buf, padding, endOfStream); + controller.receiveFlowControlledFrame(ctx, stream, buf, padding, endOfStream); } finally { buf.release(); }