HTTP/2 Flow Controller interface updates

Motivation:
Flow control is a required part of the HTTP/2 specification but it is currently structured more like an optional item. It must be accessed through the property map which is time consuming and does not represent its required nature. This access pattern does not give any insight into flow control outside of the codec (or flow controller implementation).

Modifications:
1. Create a read only public interface for LocalFlowState and RemoteFlowState.
2. Add a LocalFlowState localFlowState(); and RemoteFlowState remoteFlowState(); to Http2Stream.

Result:
Flow control is not part of the Http2Stream interface. This clarifies its responsibility and logical relationship to other interfaces. The flow controller no longer must be acquired though a map lookup.
This commit is contained in:
Scott Mitchell 2015-04-09 00:07:08 -07:00
parent f467d695be
commit 541137cc93
10 changed files with 122 additions and 86 deletions

View File

@ -246,6 +246,8 @@ public class DefaultHttp2Connection implements Http2Connection {
private int prioritizableForTree = 1;
private boolean resetSent;
private PropertyMap data;
private FlowControlState localFlowState;
private FlowControlState remoteFlowState;
DefaultStream(int id) {
this.id = id;
@ -262,6 +264,26 @@ public class DefaultHttp2Connection implements Http2Connection {
return state;
}
@Override
public FlowControlState localFlowState() {
return localFlowState;
}
@Override
public void localFlowState(FlowControlState state) {
localFlowState = state;
}
@Override
public FlowControlState remoteFlowState() {
return remoteFlowState;
}
@Override
public void remoteFlowState(FlowControlState state) {
remoteFlowState = state;
}
@Override
public boolean isResetSent() {
return resetSent;
@ -917,6 +939,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private void addStream(DefaultStream stream) {
// Add the stream to the map and priority tree.
streamMap.put(stream.id(), stream);
List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
connectionStream.takeChild(stream, false, events);

View File

@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
/**
* Basic implementation of {@link Http2LocalFlowController}.
@ -57,14 +58,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
windowUpdateRatio(windowUpdateRatio);
// Add a flow state for the connection.
final Http2Stream connectionStream = connection.connectionStream();
connectionStream.setProperty(FlowState.class, new FlowState(connectionStream, initialWindowSize));
connection.connectionStream().localFlowState(
new DefaultFlowState(connection.connectionStream(), initialWindowSize));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
stream.setProperty(FlowState.class, new FlowState(stream, 0));
stream.localFlowState(new DefaultFlowState(stream, 0));
}
@Override
@ -91,15 +92,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
return initialWindowSize;
}
@Override
public int windowSize(Http2Stream stream) {
return state(stream).window();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
checkNotNull(ctx, "ctx");
FlowState state = state(stream);
DefaultFlowState state = state(stream);
// Just add the delta to the stream-specific initial window size so that the next time the window
// expands it will grow to the new initial size.
state.incrementInitialStreamWindow(delta);
@ -160,7 +156,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
*/
public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception {
checkValidRatio(ratio);
FlowState state = state(stream);
DefaultFlowState state = state(stream);
state.windowUpdateRatio(ratio);
state.writeWindowUpdateIfNeeded(ctx);
}
@ -184,24 +180,23 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
connectionState().receiveFlowControlledFrame(dataLength);
// Apply the stream-level flow control
FlowState state = state(stream);
DefaultFlowState state = state(stream);
state.endOfStream(endOfStream);
state.receiveFlowControlledFrame(dataLength);
}
private FlowState connectionState() {
return state(connection.connectionStream());
private DefaultFlowState connectionState() {
return (DefaultFlowState) connection.connectionStream().localFlowState();
}
private static FlowState state(Http2Stream stream) {
checkNotNull(stream, "stream");
return stream.getProperty(FlowState.class);
private static DefaultFlowState state(Http2Stream stream) {
return (DefaultFlowState) checkNotNull(stream, "stream").localFlowState();
}
/**
* Flow control window state for an individual stream.
*/
private final class FlowState {
private final class DefaultFlowState implements FlowControlState {
private final Http2Stream stream;
/**
@ -233,16 +228,22 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
private int lowerBound;
private boolean endOfStream;
FlowState(Http2Stream stream, int initialWindowSize) {
DefaultFlowState(Http2Stream stream, int initialWindowSize) {
this.stream = stream;
window(initialWindowSize);
streamWindowUpdateRatio = windowUpdateRatio;
}
int window() {
@Override
public int windowSize() {
return window;
}
@Override
public int initialWindowSize() {
return initialStreamWindowSize;
}
void window(int initialWindowSize) {
window = processedWindow = initialStreamWindowSize = initialWindowSize;
}
@ -330,7 +331,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
// Return bytes to the connection window
FlowState connectionState = connectionState();
DefaultFlowState connectionState = connectionState();
connectionState.returnProcessedBytes(numBytes);
connectionState.writeWindowUpdateIfNeeded(ctx);
@ -392,7 +393,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
public boolean visit(Http2Stream stream) throws Http2Exception {
try {
// Increment flow control window first so state will be consistent if overflow is detected.
FlowState state = state(stream);
DefaultFlowState state = state(stream);
state.incrementFlowControlWindows(delta);
state.incrementInitialStreamWindow(delta);
} catch (StreamException e) {

View File

@ -22,8 +22,8 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Stream.FlowControlState;
import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque;
@ -50,15 +50,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
this.connection = checkNotNull(connection, "connection");
// Add a flow state for the connection.
connection.connectionStream().setProperty(FlowState.class,
new FlowState(connection.connectionStream(), initialWindowSize));
connection.connectionStream().remoteFlowState(
new DefaultFlowState(connection.connectionStream(), initialWindowSize));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
// Just add a new flow state to the stream.
stream.setProperty(FlowState.class, new FlowState(stream, 0));
stream.remoteFlowState(new DefaultFlowState(stream, 0));
}
@Override
@ -145,11 +145,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return initialWindowSize;
}
@Override
public int windowSize(Http2Stream stream) {
return state(stream).window();
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
@ -158,7 +153,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
writePendingBytes();
} else {
// Update the stream window and write any pending frames for the stream.
FlowState state = state(stream);
DefaultFlowState state = state(stream);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
flush();
@ -174,7 +169,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
// Save the context. We'll use this later when we write pending bytes.
this.ctx = ctx;
final FlowState state;
final DefaultFlowState state;
try {
state = state(stream);
state.enqueueFrame(frame);
@ -198,20 +193,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return state(stream).streamableBytesForTree();
}
private static FlowState state(Http2Stream stream) {
checkNotNull(stream, "stream");
return stream.getProperty(FlowState.class);
private static DefaultFlowState state(Http2Stream stream) {
return (DefaultFlowState) checkNotNull(stream, "stream").remoteFlowState();
}
private FlowState connectionState() {
return state(connection.connectionStream());
private DefaultFlowState connectionState() {
return (DefaultFlowState) connection.connectionStream().remoteFlowState();
}
/**
* Returns the flow control window for the entire connection.
*/
private int connectionWindow() {
return connectionState().window();
private int connectionWindowSize() {
return connectionState().windowSize();
}
/**
@ -229,11 +223,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/
private void writePendingBytes() throws Http2Exception {
Http2Stream connectionStream = connection.connectionStream();
int connectionWindow = state(connectionStream).window();
int connectionWindowSize = state(connectionStream).windowSize();
if (connectionWindow > 0) {
if (connectionWindowSize > 0) {
// Allocate the bytes for the connection window to the streams, but do not write.
allocateBytesForTree(connectionStream, connectionWindow);
allocateBytesForTree(connectionStream, connectionWindowSize);
// Now write all of the allocated bytes.
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
@ -248,23 +242,23 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* bytes to send) and we may need some sort of rounding to accomplish this.
*
* @param parent The parent of the tree.
* @param connectionWindow The connection window this is available for use at this point in the tree.
* @param connectionWindowSize The connection window this is available for use at this point in the tree.
* @return An object summarizing the write and allocation results.
*/
static int allocateBytesForTree(Http2Stream parent, int connectionWindow) throws Http2Exception {
FlowState state = state(parent);
static int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
DefaultFlowState state = state(parent);
if (state.streamableBytesForTree() <= 0) {
return 0;
}
// If the number of streamable bytes for this tree will fit in the connection window
// then there is no need to prioritize the bytes...everyone sends what they have
if (state.streamableBytesForTree() <= connectionWindow) {
SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindow);
if (state.streamableBytesForTree() <= connectionWindowSize) {
SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize);
parent.forEachChild(childFeeder);
return childFeeder.bytesAllocated;
}
ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindow);
ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize);
// Iterate once over all children of this parent and try to feed all the children.
parent.forEachChild(childFeeder);
@ -303,7 +297,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight)));
int bytesForTree = min(nextConnectionWindow, connectionWindowChunk);
FlowState state = state(child);
DefaultFlowState state = state(child);
int bytesForChild = min(state.streamableBytes(), bytesForTree);
// Allocate the bytes to this child.
@ -399,7 +393,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public boolean visit(Http2Stream child) throws Http2Exception {
FlowState childState = state(child);
DefaultFlowState childState = state(child);
int bytesForChild = childState.streamableBytes();
if (bytesForChild > 0 || childState.hasFrame()) {
@ -415,9 +409,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
* The outbound flow control state for a single stream.
* The remote flow control state for a single stream.
*/
private final class FlowState {
private final class DefaultFlowState implements FlowControlState {
private final Deque<FlowControlled> pendingWriteQueue;
private final Http2Stream stream;
private int window;
@ -429,16 +423,22 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Set to true if cancel() was called.
private boolean cancelled;
FlowState(Http2Stream stream, int initialWindowSize) {
DefaultFlowState(Http2Stream stream, int initialWindowSize) {
this.stream = stream;
window(initialWindowSize);
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
}
int window() {
@Override
public int windowSize() {
return window;
}
@Override
public int initialWindowSize() {
return initialWindowSize;
}
void window(int initialWindowSize) {
window = initialWindowSize;
}
@ -498,7 +498,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
int writableWindow() {
return min(window, connectionWindow());
return min(window, connectionWindowSize());
}
/**

View File

@ -303,11 +303,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return flowController.initialWindowSize();
}
@Override
public int windowSize(Http2Stream stream) {
return flowController.windowSize(stream);
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {

View File

@ -39,16 +39,6 @@ public interface Http2FlowController {
*/
int initialWindowSize();
/**
* Gets the number of bytes remaining in the flow control window size for the given stream.
*
* @param stream The subject stream. Use {@link Http2Connection#connectionStream()} for
* requesting the size of the connection window.
* @return the current size of the flow control window.
* @throws IllegalArgumentException if the given stream does not exist.
*/
int windowSize(Http2Stream stream);
/**
* Increments the size of the stream's flow control window by the given delta.
* <p>

View File

@ -33,6 +33,24 @@ public interface Http2Stream {
CLOSED
}
/**
* Represents the state which flow controller implementations are expected to track.
*/
interface FlowControlState {
/**
* Get the portion of the flow control window that is available for sending/receiving frames which are subject
* to flow control. This quantity is measured in number of bytes.
*/
int windowSize();
/**
* Get the initial flow control window size. This quantity is measured in number of bytes.
* Note the unavailable window portion can be calculated by
* {@link #initialWindowSize()} - {@link #windowSize()}.
*/
int initialWindowSize();
}
/**
* Gets the unique identifier for this stream within the connection.
*/
@ -43,6 +61,26 @@ public interface Http2Stream {
*/
State state();
/**
* Get the state as related to the {@link Http2LocalFlowController}.
*/
FlowControlState localFlowState();
/**
* Set the state as related to the {@link Http2LocalFlowController}.
*/
void localFlowState(FlowControlState state);
/**
* Get the state as related to {@link Http2RemoteFlowController}.
*/
FlowControlState remoteFlowState();
/**
* Set the state as related to {@link Http2RemoteFlowController}.
*/
void remoteFlowState(FlowControlState state);
/**
* Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and
* transition state to:

View File

@ -287,7 +287,7 @@ public class DefaultHttp2LocalFlowControllerTest {
}
private int window(int streamId) throws Http2Exception {
return controller.windowSize(stream(streamId));
return stream(streamId).localFlowState().windowSize();
}
private Http2Stream stream(int streamId) throws Http2Exception {

View File

@ -256,7 +256,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
final int initWindow = 20;
final int secondWindowSize = 10;
controller.initialWindowSize(initWindow);
Http2Stream streamA = connection.stream(STREAM_A);
FakeFlowControlled data1 = new FakeFlowControlled(initWindow);
FakeFlowControlled data2 = new FakeFlowControlled(5);
@ -267,7 +266,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Make the window size for stream A negative
controller.initialWindowSize(initWindow - secondWindowSize);
assertEquals(-secondWindowSize, controller.windowSize(streamA));
assertEquals(-secondWindowSize, window(STREAM_A));
// Queue up a write. It should not be written now because the window is negative
sendData(STREAM_A, data2);
@ -275,12 +274,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
assertEquals(-5, controller.windowSize(streamA));
assertEquals(-5, window(STREAM_A));
data2.assertNotWritten();
// Open the window size back up a bit (no send should happen)
incrementWindowSize(STREAM_A, 5);
assertEquals(0, controller.windowSize(streamA));
assertEquals(0, window(STREAM_A));
data2.assertNotWritten();
// Open the window size back up and allow the write to happen
@ -1223,7 +1222,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
private int window(int streamId) throws Http2Exception {
return controller.windowSize(stream(streamId));
return stream(streamId).remoteFlowState().windowSize();
}
private void incrementWindowSize(int streamId, int delta) throws Http2Exception {

View File

@ -35,11 +35,6 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int windowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {

View File

@ -33,11 +33,6 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public int windowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override
public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta)
throws Http2Exception {