From 5b1d50fa7c404108140160a48aa2f5d08c2121af Mon Sep 17 00:00:00 2001 From: nmittler Date: Fri, 22 Aug 2014 12:32:05 -0700 Subject: [PATCH] Changing HTTP/2 inbound flow control to use Http2FrameWriter Motivation: This is just some general cleanup to get rid of the FrameWriter inner interface withing Http2InboundFlowController. It's not necessary since the flow controller can just use the Http2FrameWriter to send WINDOW_UPDATE frames. Modifications: Updated DefaultHttp2InboundFlowController to use Http2FrameWriter. Result: The inbound flow control code is somewhat less smelly :). --- .../http2/AbstractHttp2ConnectionHandler.java | 13 +---- .../DefaultHttp2InboundFlowController.java | 48 ++++++++++++++----- .../codec/http2/Http2DataObserver.java | 38 +++++++++++++++ .../codec/http2/Http2FrameObserver.java | 16 +------ .../http2/Http2InboundFlowController.java | 29 +---------- ...DefaultHttp2InboundFlowControllerTest.java | 37 ++++++++++---- .../DelegatingHttp2ConnectionHandlerTest.java | 6 +-- .../http2/client/Http2ClientInitializer.java | 2 +- .../http2/server/HelloWorldHttp2Handler.java | 2 +- 9 files changed, 109 insertions(+), 82 deletions(-) create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java index 5745a8a1ba..a7664eefef 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java @@ -81,7 +81,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode protected AbstractHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter) { this(connection, frameReader, frameWriter, - new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter)); } @@ -824,16 +824,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL); // Apply flow control. - inboundFlow.applyInboundFlowControl(streamId, data, padding, endOfStream, - new Http2InboundFlowController.FrameWriter() { - @Override - public void writeFrame(int streamId, int windowSizeIncrement) - throws Http2Exception { - frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, - ctx.newPromise()); - ctx.flush(); - } - }); + inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); verifyGoAwayNotReceived(); verifyRstStreamNotReceived(stream); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index dd9c042798..2ebf1abfd4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -20,6 +20,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2Exception.flowControlError; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; /** * Basic implementation of {@link Http2InboundFlowController}. @@ -27,13 +28,18 @@ import io.netty.buffer.ByteBuf; public class DefaultHttp2InboundFlowController implements Http2InboundFlowController { private final Http2Connection connection; + private final Http2FrameWriter frameWriter; private int initialWindowSize = DEFAULT_WINDOW_SIZE; - public DefaultHttp2InboundFlowController(Http2Connection connection) { + public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { if (connection == null) { throw new NullPointerException("connection"); } + if (frameWriter == null) { + throw new NullPointerException("frameWriter"); + } this.connection = connection; + this.frameWriter = frameWriter; // Add a flow state for the connection. connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID)); @@ -65,12 +71,22 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro } @Override - public void applyInboundFlowControl(int streamId, ByteBuf data, int padding, - boolean endOfStream, FrameWriter frameWriter) + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { int dataLength = data.readableBytes() + padding; - applyConnectionFlowControl(dataLength, frameWriter); - applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter); + boolean windowUpdateSent = false; + try { + // Apply the connection-level flow control. + windowUpdateSent = applyConnectionFlowControl(ctx, dataLength); + + // Apply the stream-level flow control. + windowUpdateSent |= applyStreamFlowControl(ctx, streamId, dataLength, endOfStream); + } finally { + // Optimization: only flush once for any sent WINDOW_UPDATE frames. + if (windowUpdateSent) { + ctx.flush(); + } + } } private InboundFlowState connectionState() { @@ -98,8 +114,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro /** * Apply connection-wide flow control to the incoming data frame. + * + * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the connection. */ - private void applyConnectionFlowControl(int dataLength, FrameWriter frameWriter) + private boolean applyConnectionFlowControl(ChannelHandlerContext ctx, int dataLength) throws Http2Exception { // Remove the data length from the available window size. Throw if the lower bound // was exceeded. @@ -110,15 +128,19 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // to the initial value and send a window update to the remote endpoint indicating // the new window size. if (connectionState.window() <= getWindowUpdateThreshold()) { - connectionState.updateWindow(frameWriter); + connectionState.updateWindow(ctx); + return true; } + return false; } /** * Apply stream-based flow control to the incoming data frame. + * + * @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the stream. */ - private void applyStreamFlowControl(int streamId, int dataLength, boolean endOfStream, - FrameWriter frameWriter) throws Http2Exception { + private boolean applyStreamFlowControl(ChannelHandlerContext ctx, int streamId, int dataLength, + boolean endOfStream) throws Http2Exception { // Remove the data length from the available window size. Throw if the lower bound // was exceeded. InboundFlowState state = stateOrFail(streamId); @@ -128,8 +150,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro // to the initial value and send a window update to the remote endpoint indicating // the new window size. if (state.window() <= getWindowUpdateThreshold() && !endOfStream) { - state.updateWindow(frameWriter); + state.updateWindow(ctx); + return true; } + return false; } /** @@ -213,13 +237,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro * size back to the size of the initial window and sends a window update frame to the remote * endpoint. */ - void updateWindow(FrameWriter frameWriter) throws Http2Exception { + void updateWindow(ChannelHandlerContext ctx) throws Http2Exception { // Expand the window for this stream back to the size of the initial window. int deltaWindowSize = initialWindowSize - window; addAndGet(deltaWindowSize); // Send a window update for the stream/connection. - frameWriter.writeFrame(streamId, deltaWindowSize); + frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java new file mode 100644 index 0000000000..6c6f402c24 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2DataObserver.java @@ -0,0 +1,38 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * An observer of HTTP/2 {@code DATA} frames. + */ +public interface Http2DataObserver { + + /** + * Handles an inbound {@code DATA} frame. + * + * @param ctx the context from the handler where the frame was read. + * @param streamId the subject stream for the frame. + * @param data payload buffer for the frame. If this buffer needs to be retained by the observer + * they must make a copy. + * @param padding the number of padding bytes found at the end of the frame. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote + * endpoint for this stream. + */ + void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception; +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java index 4f1f877fd4..cc5d339330 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameObserver.java @@ -21,21 +21,7 @@ import io.netty.channel.ChannelHandlerContext; /** * An observer of HTTP/2 frames. */ -public interface Http2FrameObserver { - - /** - * Handles an inbound DATA frame. - * - * @param ctx the context from the handler where the frame was read. - * @param streamId the subject stream for the frame. - * @param data payload buffer for the frame. If this buffer needs to be retained by the observer - * they must make a copy. - * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. - */ - void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; +public interface Http2FrameObserver extends Http2DataObserver { /** * Handles an inbound HEADERS frame. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java index 7be4506cad..2fc31ebed7 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2InboundFlowController.java @@ -15,24 +15,10 @@ package io.netty.handler.codec.http2; -import io.netty.buffer.ByteBuf; - /** * Controls the inbound flow of data frames from the remote endpoint. */ -public interface Http2InboundFlowController { - - /** - * A writer of window update frames. - * TODO: Use Http2FrameWriter instead. - */ - interface FrameWriter { - - /** - * Writes a window update frame to the remote endpoint. - */ - void writeFrame(int streamId, int windowSizeIncrement) throws Http2Exception; - } +public interface Http2InboundFlowController extends Http2DataObserver { /** * Sets the initial inbound flow control window size and updates all stream window sizes by the @@ -47,17 +33,4 @@ public interface Http2InboundFlowController { * Gets the initial inbound flow control window size. */ int initialInboundWindowSize(); - - /** - * Applies flow control for the received data frame. - * - * @param streamId the ID of the stream receiving the data - * @param data the data portion of the data frame. Does not contain padding. - * @param padding the amount of padding received in the original frame. - * @param endOfStream indicates whether this is the last frame for the stream. - * @param frameWriter allows this flow controller to send window updates to the remote endpoint. - * @throws Http2Exception thrown if any protocol-related error occurred. - */ - void applyInboundFlowControl(int streamId, ByteBuf data, int padding, boolean endOfStream, - FrameWriter frameWriter) throws Http2Exception; } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index d3b5d326de..50365086c8 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -17,14 +17,17 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http2.Http2InboundFlowController.FrameWriter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import org.junit.Before; import org.junit.Test; @@ -43,7 +46,13 @@ public class DefaultHttp2InboundFlowControllerTest { private ByteBuf buffer; @Mock - private FrameWriter frameWriter; + private Http2FrameWriter frameWriter; + + @Mock + private ChannelHandlerContext ctx; + + @Mock + private ChannelPromise promise; private DefaultHttp2Connection connection; @@ -51,8 +60,10 @@ public class DefaultHttp2InboundFlowControllerTest { public void setup() throws Http2Exception { MockitoAnnotations.initMocks(this); + when(ctx.newPromise()).thenReturn(promise); + connection = new DefaultHttp2Connection(false); - controller = new DefaultHttp2InboundFlowController(connection); + controller = new DefaultHttp2InboundFlowController(connection, frameWriter); connection.local().createStream(STREAM_ID, false); } @@ -77,7 +88,7 @@ public class DefaultHttp2InboundFlowControllerTest { // Set end-of-stream on the frame, so no window update will be sent for the stream. applyFlowControl(dataSize, 0, true); - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); } @Test @@ -88,8 +99,8 @@ public class DefaultHttp2InboundFlowControllerTest { // Don't set end-of-stream so we'll get a window update for the stream as well. applyFlowControl(dataSize, 0, false); - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta)); - verify(frameWriter).writeFrame(eq(STREAM_ID), eq(windowDelta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); + verifyWindowUpdateSent(STREAM_ID, windowDelta); } @Test @@ -108,8 +119,8 @@ public class DefaultHttp2InboundFlowControllerTest { // Send the next frame and verify that the expected window updates were sent. applyFlowControl(initialWindowSize, 0, false); int delta = newInitialWindowSize - initialWindowSize; - verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(delta)); - verify(frameWriter).writeFrame(eq(STREAM_ID), eq(delta)); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); + verifyWindowUpdateSent(STREAM_ID, delta); } private static int getWindowDelta(int initialSize, int windowSize, int dataSize) { @@ -119,7 +130,7 @@ public class DefaultHttp2InboundFlowControllerTest { private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception { ByteBuf buf = dummyData(dataSize); - controller.applyInboundFlowControl(STREAM_ID, buf, padding, endOfStream, frameWriter); + controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream); buf.release(); } @@ -129,7 +140,13 @@ public class DefaultHttp2InboundFlowControllerTest { return buffer; } + private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) + throws Http2Exception { + verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); + } + private void verifyWindowUpdateNotSent() throws Http2Exception { - verify(frameWriter, never()).writeFrame(anyInt(), anyInt()); + verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), + anyInt(), any(ChannelPromise.class)); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java index 9aa5f69ea0..7960da38b4 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DelegatingHttp2ConnectionHandlerTest.java @@ -254,8 +254,7 @@ public class DelegatingHttp2ConnectionHandlerTest { public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception { when(remote.isGoAwayReceived()).thenReturn(true); decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true); - verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10), - eq(true), any(Http2InboundFlowController.FrameWriter.class)); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); // Verify that the event was absorbed and not propagated to the oberver. verify(observer, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), @@ -265,8 +264,7 @@ public class DelegatingHttp2ConnectionHandlerTest { @Test public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception { decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true); - verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10), - eq(true), any(Http2InboundFlowController.FrameWriter.class)); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); verify(stream).closeRemoteSide(); verify(observer).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true)); } diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 51f72deea8..18aabe4104 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -69,7 +69,7 @@ public class Http2ClientInitializer extends ChannelInitializer { Http2Connection connection = new DefaultHttp2Connection(false); Http2FrameWriter frameWriter = frameWriter(); connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection, - frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection), + frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter), InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)); responseHandler = new HttpResponseHandler(); diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index e63c9002bb..320a693167 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -55,7 +55,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) { super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger), frameWriter, - new DefaultHttp2InboundFlowController(connection), + new DefaultHttp2InboundFlowController(connection, frameWriter), new DefaultHttp2OutboundFlowController(connection, frameWriter)); }