diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java index 5745a8a1ba..a7664eefef 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java @@ -81,7 +81,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode protected AbstractHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter) { this(connection, frameReader, frameWriter, - new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter)); } @@ -824,16 +824,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL); // Apply flow control. - inboundFlow.applyInboundFlowControl(streamId, data, padding, endOfStream, - new Http2InboundFlowController.FrameWriter() { - @Override - public void writeFrame(int streamId, int windowSizeIncrement) - throws Http2Exception { - frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, - ctx.newPromise()); - ctx.flush(); - } - }); + inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); verifyGoAwayNotReceived(); verifyRstStreamNotReceived(stream); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index dd9c042798..2ebf1abfd4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -20,6 +20,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Exception.flowControlError; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; /** * Basic implementation of {@link Http2InboundFlowController}. @@ -27,13 +28,18 @@ import io.netty.buffer.ByteBuf; public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { private final Http2Connection connection; + private final Http2FrameWriter frameWriter; private int initialWindowSize = DEFAULT_WINDOW_SIZE; - public DefaultHttp2InboundFlowController(Http2Connection connection) { + public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { if (connection == null) { throw new NullPointerException("connection"); } + if (frameWriter == null) { + throw new NullPointerException("frameWriter"); + } this.connection = connection; + this.frameWriter = frameWriter; // Add a flow state for the connection. connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID)); @@ -65,12 +71,22 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro } @Override - public void applyInboundFlowControl(int streamId, ByteBuf data, int padding, - boolean endOfStream, FrameWriter frameWriter) + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { int dataLength = data.readableBytes() + padding; - applyConnectionFlowControl(dataLength, frameWriter); - applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter); + boolean windowUpdateSent = false; + try { + // Apply the connection-level flow control. + windowUpdateSent = applyConnectionFlowControl(ctx, dataLength); + + // Apply the stream-level flow control. + windowUpdateSent |= applyStreamFlowControl(ctx, streamId, dataLength, endOfStream); + } finally { + // Optimization: only flush once for any sent WINDOW_UPDATE frames. + if (windowUpdateSent) { + ctx.flush(); + } + } } private InboundFlowState connectionState() { @@ -98,8 +114,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Apply connection-wide flow control to the incoming data frame. + * + * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the connection. */ - private void applyConnectionFlowControl(int dataLength, FrameWriter frameWriter) + private boolean applyConnectionFlowControl(ChannelHandlerContext ctx, int dataLength) throws Http2Exception { // Remove the data length from the available window size. Throw if the lower bound // was exceeded. @@ -110,15 +128,19 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // to the initial value and send a window update to the remote endpoint indicating // the new window size. if (connectionState.window() <= getWindowUpdateThreshold()) { - connectionState.updateWindow(frameWriter); + connectionState.updateWindow(ctx); + return true; } + return false; } /** * Apply stream-based flow control to the incoming data frame. + * + * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the stream. */ - private void applyStreamFlowControl(int streamId, int dataLength, boolean endOfStream, - FrameWriter frameWriter) throws Http2Exception { + private boolean applyStreamFlowControl(ChannelHandlerContext ctx, int streamId, int dataLength, + boolean endOfStream) throws Http2Exception { // Remove the data length from the available window size. Throw if the lower bound // was exceeded. InboundFlowState state = stateOrFail(streamId); @@ -128,8 +150,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // to the initial value and send a window update to the remote endpoint indicating // the new window size. if (state.window() <= getWindowUpdateThreshold() && !endOfStream) { - state.updateWindow(frameWriter); + state.updateWindow(ctx); + return true; } + return false; } /** @@ -213,13 +237,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * size back to the size of the initial window and sends a window update frame to the remote * endpoint. */ - void updateWindow(FrameWriter frameWriter) throws Http2Exception { + void updateWindow(ChannelHandlerContext ctx) throws Http2Exception { // Expand the window for this stream back to the size of the initial window. int deltaWindowSize = initialWindowSize - window; addAndGet(deltaWindowSize); // Send a window update for the stream/connection. - frameWriter.writeFrame(streamId, deltaWindowSize); + frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java new file mode 100644 index 0000000000..6c6f402c24 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java @@ -0,0 +1,38 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * An observer of HTTP/2 {@code DATA} frames. + */ +public interface Http2DataObserver { + + /** + * Handles an inbound {@code DATA} frame. + * + * @param ctx the context from the handler where the frame was read. + * @param streamId the subject stream for the frame. + * @param data payload buffer for the frame. If this buffer needs to be retained by the observer + * they must make a copy. + * @param padding the number of padding bytes found at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote + * endpoint for this stream. + */ + void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception; +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java index 4f1f877fd4..cc5d339330 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java @@ -21,21 +21,7 @@ import io.netty.channel.ChannelHandlerContext; /** * An observer of HTTP/2 frames. */ -public interface Http2FrameObserver { - - /** - * Handles an inbound DATA frame. - * - * @param ctx the context from the handler where the frame was read. - * @param streamId the subject stream for the frame. - * @param data payload buffer for the frame. If this buffer needs to be retained by the observer - * they must make a copy. - * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. - */ - void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; +public interface Http2FrameObserver extends Http2DataObserver { /** * Handles an inbound HEADERS frame. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java index 7be4506cad..2fc31ebed7 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java @@ -15,24 +15,10 @@ package io.netty.handler.codec.http2; -import io.netty.buffer.ByteBuf; - /** * Controls the inbound flow of data frames from the remote endpoint. */ -public interface Http2InboundFlowController { - - /** - * A writer of window update frames. - * TODO: Use Http2FrameWriter instead. - */ - interface FrameWriter { - - /** - * Writes a window update frame to the remote endpoint. - */ - void writeFrame(int streamId, int windowSizeIncrement) throws Http2Exception; - } +public interface Http2InboundFlowController extends Http2DataObserver { /** * Sets the initial inbound flow control window size and updates all stream window sizes by the @@ -47,17 +33,4 @@ public interface Http2InboundFlowController { * Gets the initial inbound flow control window size. */ int initialInboundWindowSize(); - - /** - * Applies flow control for the received data frame. - * - * @param streamId the ID of the stream receiving the data - * @param data the data portion of the data frame. Does not contain padding. - * @param padding the amount of padding received in the original frame. - * @param endOfStream indicates whether this is the last frame for the stream. - * @param frameWriter allows this flow controller to send window updates to the remote endpoint. - * @throws Http2Exception thrown if any protocol-related error occurred. - */ - void applyInboundFlowControl(int streamId, ByteBuf data, int padding, boolean endOfStream, - FrameWriter frameWriter) throws Http2Exception; } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index d3b5d326de..50365086c8 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -17,14 +17,17 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http2.Http2InboundFlowController.FrameWriter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import org.junit.Before; import org.junit.Test; @@ -43,7 +46,13 @@ public class DefaultHttp2InboundFlowControllerTest { private ByteBuf buffer; @Mock - private FrameWriter frameWriter; + private Http2FrameWriter frameWriter; + + @Mock + private ChannelHandlerContext ctx; + + @Mock + private ChannelPromise promise; private DefaultHttp2Connection connection; @@ -51,8 +60,10 @@ public class DefaultHttp2InboundFlowControllerTest { public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); + when(ctx.newPromise()).thenReturn(promise); + connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2InboundFlowController(connection); + controller = new DefaultHttp2InboundFlowController(connection, frameWriter); connection.local().createStream(STREAM_ID, false); } @@ -77,7 +88,7 @@ public class DefaultHttp2InboundFlowControllerTest { // Set end-of-stream on the frame, so no window update will be sent for the stream. applyFlowControl(dataSize, 0, true); - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); } @Test @@ -88,8 +99,8 @@ public class DefaultHttp2InboundFlowControllerTest { // Don't set end-of-stream so we'll get a window update for the stream as well. applyFlowControl(dataSize, 0, false); - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); - verify(frameWriter).writeFrame(eq(STREAM_ID), eq(windowDelta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); + verifyWindowUpdateSent(STREAM_ID, windowDelta); } @Test @@ -108,8 +119,8 @@ public class DefaultHttp2InboundFlowControllerTest { // Send the next frame and verify that the expected window updates were sent. applyFlowControl(initialWindowSize, 0, false); int delta = newInitialWindowSize - initialWindowSize; - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(delta)); - verify(frameWriter).writeFrame(eq(STREAM_ID), eq(delta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); + verifyWindowUpdateSent(STREAM_ID, delta); } private static int getWindowDelta(int initialSize, int windowSize, int dataSize) { @@ -119,7 +130,7 @@ public class DefaultHttp2InboundFlowControllerTest { private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception { ByteBuf buf = dummyData(dataSize); - controller.applyInboundFlowControl(STREAM_ID, buf, padding, endOfStream, frameWriter); + controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream); buf.release(); } @@ -129,7 +140,13 @@ public class DefaultHttp2InboundFlowControllerTest { return buffer; } + private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) + throws Http2Exception { + verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); + } + private void verifyWindowUpdateNotSent() throws Http2Exception { - verify(frameWriter, never()).writeFrame(anyInt(), anyInt()); + verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), + anyInt(), any(ChannelPromise.class)); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java index 9aa5f69ea0..7960da38b4 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java @@ -254,8 +254,7 @@ public class DelegatingHttp2ConnectionHandlerTest { public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { when(remote.isGoAwayReceived()).thenReturn(true); decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true); - verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10), - eq(true), any(Http2InboundFlowController.FrameWriter.class)); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); // Verify that the event was absorbed and not propagated to the oberver. verify(observer, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), @@ -265,8 +264,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception { decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true); - verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10), - eq(true), any(Http2InboundFlowController.FrameWriter.class)); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); verify(stream).closeRemoteSide(); verify(observer).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); } diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 51f72deea8..18aabe4104 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -69,7 +69,7 @@ public class Http2ClientInitializer extends ChannelInitializer { Http2Connection connection = new DefaultHttp2Connection(false); Http2FrameWriter frameWriter = frameWriter(); connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection, - frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection), + frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter), InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)); responseHandler = new HttpResponseHandler(); diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index e63c9002bb..320a693167 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -55,7 +55,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) { super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger), frameWriter, - new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter)); }