Changing HTTP/2 inbound flow control to use Http2FrameWriter

Motivation:

This is just some general cleanup to get rid of the FrameWriter inner
interface withing Http2InboundFlowController.  It's not necessary since
the flow controller can just use the Http2FrameWriter to send
WINDOW_UPDATE frames.

Modifications:

Updated DefaultHttp2InboundFlowController to use Http2FrameWriter.

Result:

The inbound flow control code is somewhat less smelly :).
This commit is contained in:
nmittler 2014-08-22 12:32:05 -07:00
parent 8785ff5256
commit 5b1d50fa7c
9 changed files with 109 additions and 82 deletions

View File

@ -81,7 +81,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
protected AbstractHttp2ConnectionHandler(Http2Connection connection,
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
this(connection, frameReader, frameWriter,
new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2InboundFlowController(connection, frameWriter),
new DefaultHttp2OutboundFlowController(connection, frameWriter));
}
@ -824,16 +824,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL);
// Apply flow control.
inboundFlow.applyInboundFlowControl(streamId, data, padding, endOfStream,
new Http2InboundFlowController.FrameWriter() {
@Override
public void writeFrame(int streamId, int windowSizeIncrement)
throws Http2Exception {
frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement,
ctx.newPromise());
ctx.flush();
}
});
inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);

View File

@ -20,6 +20,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Exception.flowControlError;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* Basic implementation of {@link Http2InboundFlowController}.
@ -27,13 +28,18 @@ import io.netty.buffer.ByteBuf;
public class DefaultHttp2InboundFlowController implements Http2InboundFlowController {
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection) {
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (frameWriter == null) {
throw new NullPointerException("frameWriter");
}
this.connection = connection;
this.frameWriter = frameWriter;
// Add a flow state for the connection.
connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID));
@ -65,12 +71,22 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
}
@Override
public void applyInboundFlowControl(int streamId, ByteBuf data, int padding,
boolean endOfStream, FrameWriter frameWriter)
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
int dataLength = data.readableBytes() + padding;
applyConnectionFlowControl(dataLength, frameWriter);
applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter);
boolean windowUpdateSent = false;
try {
// Apply the connection-level flow control.
windowUpdateSent = applyConnectionFlowControl(ctx, dataLength);
// Apply the stream-level flow control.
windowUpdateSent |= applyStreamFlowControl(ctx, streamId, dataLength, endOfStream);
} finally {
// Optimization: only flush once for any sent WINDOW_UPDATE frames.
if (windowUpdateSent) {
ctx.flush();
}
}
}
private InboundFlowState connectionState() {
@ -98,8 +114,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
/**
* Apply connection-wide flow control to the incoming data frame.
*
* @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the connection.
*/
private void applyConnectionFlowControl(int dataLength, FrameWriter frameWriter)
private boolean applyConnectionFlowControl(ChannelHandlerContext ctx, int dataLength)
throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
@ -110,15 +128,19 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (connectionState.window() <= getWindowUpdateThreshold()) {
connectionState.updateWindow(frameWriter);
connectionState.updateWindow(ctx);
return true;
}
return false;
}
/**
* Apply stream-based flow control to the incoming data frame.
*
* @return {@code true} if a {@code WINDOW_UPDATE} frame was sent for the stream.
*/
private void applyStreamFlowControl(int streamId, int dataLength, boolean endOfStream,
FrameWriter frameWriter) throws Http2Exception {
private boolean applyStreamFlowControl(ChannelHandlerContext ctx, int streamId, int dataLength,
boolean endOfStream) throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
InboundFlowState state = stateOrFail(streamId);
@ -128,8 +150,10 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (state.window() <= getWindowUpdateThreshold() && !endOfStream) {
state.updateWindow(frameWriter);
state.updateWindow(ctx);
return true;
}
return false;
}
/**
@ -213,13 +237,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* size back to the size of the initial window and sends a window update frame to the remote
* endpoint.
*/
void updateWindow(FrameWriter frameWriter) throws Http2Exception {
void updateWindow(ChannelHandlerContext ctx) throws Http2Exception {
// Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize - window;
addAndGet(deltaWindowSize);
// Send a window update for the stream/connection.
frameWriter.writeFrame(streamId, deltaWindowSize);
frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise());
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* An observer of HTTP/2 {@code DATA} frames.
*/
public interface Http2DataObserver {
/**
* Handles an inbound {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. If this buffer needs to be retained by the observer
* they must make a copy.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
*/
void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
}

View File

@ -21,21 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
/**
* An observer of HTTP/2 frames.
*/
public interface Http2FrameObserver {
/**
* Handles an inbound DATA frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. If this buffer needs to be retained by the observer
* they must make a copy.
* @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote
* endpoint for this stream.
*/
void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception;
public interface Http2FrameObserver extends Http2DataObserver {
/**
* Handles an inbound HEADERS frame.

View File

@ -15,24 +15,10 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
/**
* Controls the inbound flow of data frames from the remote endpoint.
*/
public interface Http2InboundFlowController {
/**
* A writer of window update frames.
* TODO: Use Http2FrameWriter instead.
*/
interface FrameWriter {
/**
* Writes a window update frame to the remote endpoint.
*/
void writeFrame(int streamId, int windowSizeIncrement) throws Http2Exception;
}
public interface Http2InboundFlowController extends Http2DataObserver {
/**
* Sets the initial inbound flow control window size and updates all stream window sizes by the
@ -47,17 +33,4 @@ public interface Http2InboundFlowController {
* Gets the initial inbound flow control window size.
*/
int initialInboundWindowSize();
/**
* Applies flow control for the received data frame.
*
* @param streamId the ID of the stream receiving the data
* @param data the data portion of the data frame. Does not contain padding.
* @param padding the amount of padding received in the original frame.
* @param endOfStream indicates whether this is the last frame for the stream.
* @param frameWriter allows this flow controller to send window updates to the remote endpoint.
* @throws Http2Exception thrown if any protocol-related error occurred.
*/
void applyInboundFlowControl(int streamId, ByteBuf data, int padding, boolean endOfStream,
FrameWriter frameWriter) throws Http2Exception;
}

View File

@ -17,14 +17,17 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2InboundFlowController.FrameWriter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.junit.Before;
import org.junit.Test;
@ -43,7 +46,13 @@ public class DefaultHttp2InboundFlowControllerTest {
private ByteBuf buffer;
@Mock
private FrameWriter frameWriter;
private Http2FrameWriter frameWriter;
@Mock
private ChannelHandlerContext ctx;
@Mock
private ChannelPromise promise;
private DefaultHttp2Connection connection;
@ -51,8 +60,10 @@ public class DefaultHttp2InboundFlowControllerTest {
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
when(ctx.newPromise()).thenReturn(promise);
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2InboundFlowController(connection);
controller = new DefaultHttp2InboundFlowController(connection, frameWriter);
connection.local().createStream(STREAM_ID, false);
}
@ -77,7 +88,7 @@ public class DefaultHttp2InboundFlowControllerTest {
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(dataSize, 0, true);
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
}
@Test
@ -88,8 +99,8 @@ public class DefaultHttp2InboundFlowControllerTest {
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(dataSize, 0, false);
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(windowDelta));
verify(frameWriter).writeFrame(eq(STREAM_ID), eq(windowDelta));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta);
}
@Test
@ -108,8 +119,8 @@ public class DefaultHttp2InboundFlowControllerTest {
// Send the next frame and verify that the expected window updates were sent.
applyFlowControl(initialWindowSize, 0, false);
int delta = newInitialWindowSize - initialWindowSize;
verify(frameWriter).writeFrame(eq(CONNECTION_STREAM_ID), eq(delta));
verify(frameWriter).writeFrame(eq(STREAM_ID), eq(delta));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
verifyWindowUpdateSent(STREAM_ID, delta);
}
private static int getWindowDelta(int initialSize, int windowSize, int dataSize) {
@ -119,7 +130,7 @@ public class DefaultHttp2InboundFlowControllerTest {
private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception {
ByteBuf buf = dummyData(dataSize);
controller.applyInboundFlowControl(STREAM_ID, buf, padding, endOfStream, frameWriter);
controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream);
buf.release();
}
@ -129,7 +140,13 @@ public class DefaultHttp2InboundFlowControllerTest {
return buffer;
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement)
throws Http2Exception {
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
}
private void verifyWindowUpdateNotSent() throws Http2Exception {
verify(frameWriter, never()).writeFrame(anyInt(), anyInt());
verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(),
anyInt(), any(ChannelPromise.class));
}
}

View File

@ -254,8 +254,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true);
verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10),
eq(true), any(Http2InboundFlowController.FrameWriter.class));
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
// Verify that the event was absorbed and not propagated to the oberver.
verify(observer, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(),
@ -265,8 +264,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
decode().onDataRead(ctx, STREAM_ID, dummyData(), 10, true);
verify(inboundFlow).applyInboundFlowControl(eq(STREAM_ID), eq(dummyData()), eq(10),
eq(true), any(Http2InboundFlowController.FrameWriter.class));
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
verify(stream).closeRemoteSide();
verify(observer).onDataRead(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(10), eq(true));
}

View File

@ -69,7 +69,7 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameWriter frameWriter = frameWriter();
connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection,
frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection),
frameReader(), frameWriter, new DefaultHttp2InboundFlowController(connection, frameWriter),
new DefaultHttp2OutboundFlowController(connection, frameWriter),
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength));
responseHandler = new HttpResponseHandler();

View File

@ -55,7 +55,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) {
super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
frameWriter,
new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2InboundFlowController(connection, frameWriter),
new DefaultHttp2OutboundFlowController(connection, frameWriter));
}