Reducing memory usage for HTTP/2

Motivation:

The connection, priority tree, and inbound/outbound flow controllers
each maintain a separate map for stream information. This is wasteful
and complicates the design since as streams are added/removed, multiple
structures have to be updated.

Modifications:

- Merging the priority tree into Http2Connection. Then we can use
Http2Connection as the central stream repository.

- Adding observer pattern to Http2Connection so flow controllers can be
told when a new stream is created, closed, etc.

- Adding properties for inboundFlow/outboundFlow state to Http2Stream.
This allows the controller to access flow control state directly from
the stream without requiring additional structures.

- Separate out the StreamRemovalPolicy and created a "default"
implementation that runs periodic garbage collection.  This used to be
internal to the outbound flow controller, but I think it is more general
than that.

Result:

HTTP/2 classes will require less storage for new streams.
This commit is contained in:
nmittler 2014-06-05 08:05:25 -07:00 committed by Norman Maurer
parent 405d573715
commit bcb180b08d
22 changed files with 1349 additions and 1218 deletions

View File

@ -82,7 +82,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
protected AbstractHttp2ConnectionHandler(Http2Connection connection) {
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(),
new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController());
new DefaultHttp2InboundFlowController(connection), new DefaultHttp2OutboundFlowController(connection));
}
protected AbstractHttp2ConnectionHandler(Http2Connection connection,
@ -283,18 +283,13 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Sending headers on a reserved push stream ... open it for push to the remote
// endpoint.
stream.openForPush();
// Allow outbound traffic only.
if (!endStream) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
} else {
// The stream already exists, make sure it's in an allowed state.
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
// Update the priority for this stream only if we'll be sending more data.
if (!endStream) {
outboundFlow.updateStream(stream.id(), streamDependency, weight, exclusive);
stream.setPriority(streamDependency, weight, exclusive);
}
}
@ -319,7 +314,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
// Update the priority on this stream.
outboundFlow.updateStream(streamId, streamDependency, weight, exclusive);
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
return frameWriter.writePriority(ctx, promise, streamId, streamDependency, weight,
exclusive);
@ -543,7 +538,6 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
outboundFlow.removeStream(stream.id());
break;
default:
close(stream, ctx, future);
@ -556,7 +550,6 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
inboundFlow.removeStream(stream.id());
break;
default:
close(stream, ctx, future);
@ -567,10 +560,6 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
private void close(Http2Stream stream, ChannelHandlerContext ctx, ChannelFuture future) {
stream.close();
// Notify the flow controllers.
inboundFlow.removeStream(stream.id());
outboundFlow.removeStream(stream.id());
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && connection.numActiveStreams() == 0) {
@ -670,12 +659,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
*/
private Http2Stream createLocalStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.local().createStream(streamId, halfClosed);
inboundFlow.addStream(streamId);
if (!halfClosed) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
return stream;
return connection.local().createStream(streamId, halfClosed);
}
/**
@ -683,12 +667,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
*/
private Http2Stream createRemoteStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.remote().createStream(streamId, halfClosed);
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
if (!halfClosed) {
inboundFlow.addStream(streamId);
}
return stream;
return connection.remote().createStream(streamId, halfClosed);
}
/**
@ -759,11 +738,6 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// local endpoint.
stream.verifyState(PROTOCOL_ERROR, RESERVED_REMOTE);
stream.openForPush();
// Allow inbound traffic only.
if (!endStream) {
inboundFlow.addStream(streamId);
}
} else {
// Receiving headers on an existing stream. Make sure the stream is in an
// allowed
@ -772,7 +746,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
// Update the outbound priority if outbound traffic is allowed.
if (stream.state() == OPEN) {
outboundFlow.updateStream(streamId, streamDependency, weight, exclusive);
stream.setPriority(streamDependency, weight, exclusive);
}
}
@ -797,7 +771,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
}
// Set the priority for this stream on the flow controller.
outboundFlow.updateStream(streamId, streamDependency, weight, exclusive);
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
AbstractHttp2ConnectionHandler.this.onPriorityRead(ctx, streamId, streamDependency,
weight, exclusive);

View File

@ -15,16 +15,27 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@ -34,16 +45,71 @@ import java.util.Set;
*/
public class DefaultHttp2Connection implements Http2Connection {
private final Set<Listener> listeners = new HashSet<Listener>(4);
private final Map<Integer, Http2Stream> streamMap = new HashMap<Integer, Http2Stream>();
private final ConnectionStream connectionStream = new ConnectionStream();
private final Set<Http2Stream> activeStreams = new LinkedHashSet<Http2Stream>();
private final DefaultEndpoint localEndpoint;
private final DefaultEndpoint remoteEndpoint;
private final Http2StreamRemovalPolicy removalPolicy;
private boolean goAwaySent;
private boolean goAwayReceived;
/**
* Creates a connection with compression disabled and an immediate stream removal policy.
*
* @param server whether or not this end-point is the server-side of the HTTP/2 connection.
*/
public DefaultHttp2Connection(boolean server) {
this(server, false);
}
/**
* Creates a connection with an immediate stream removal policy.
*
* @param server whether or not this end-point is the server-side of the HTTP/2 connection.
* @param allowCompressedData if true, compressed frames are allowed from the remote end-point.
*/
public DefaultHttp2Connection(boolean server, boolean allowCompressedData) {
this(server, allowCompressedData, immediateRemovalPolicy());
}
/**
* Creates a new connection with the given settings.
*
* @param server whether or not this end-point is the server-side of the HTTP/2 connection.
* @param allowCompressedData if true, compressed frames are allowed from the remote end-point.
* @param removalPolicy the policy to be used for removal of closed stream.
*/
public DefaultHttp2Connection(boolean server, boolean allowCompressedData,
Http2StreamRemovalPolicy removalPolicy) {
if (removalPolicy == null) {
throw new NullPointerException("removalPolicy");
}
this.removalPolicy = removalPolicy;
localEndpoint = new DefaultEndpoint(server, allowCompressedData);
remoteEndpoint = new DefaultEndpoint(!server, false);
// Tell the removal policy how to remove a stream from this connection.
removalPolicy.setAction(new Action() {
@Override
public void removeStream(Http2Stream stream) {
DefaultHttp2Connection.this.removeStream((DefaultStream) stream);
}
});
// Add the connection stream to the map.
streamMap.put(connectionStream.id(), connectionStream);
}
@Override
public void addListener(Listener listener) {
listeners.add(listener);
}
@Override
public void removeListener(Listener listener) {
listeners.remove(listener);
}
@Override
@ -51,6 +117,11 @@ public class DefaultHttp2Connection implements Http2Connection {
return localEndpoint.isServer();
}
@Override
public Http2Stream connectionStream() {
return connectionStream;
}
@Override
public Http2Stream requireStream(int streamId) throws Http2Exception {
Http2Stream stream = stream(streamId);
@ -110,29 +181,216 @@ public class DefaultHttp2Connection implements Http2Connection {
return isGoAwaySent() || isGoAwayReceived();
}
private void addStream(DefaultStream stream) {
// Add the stream to the map and priority tree.
streamMap.put(stream.id(), stream);
connectionStream.addChild(stream, false);
// Notify the observers of the event.
for (Listener listener : listeners) {
listener.streamAdded(stream);
}
}
private void removeStream(DefaultStream stream) {
// Notify the observers of the event first.
for (Listener listener : listeners) {
listener.streamRemoved(stream);
}
// Remove it from the map and priority tree.
streamMap.remove(stream.id());
((DefaultStream) stream.parent()).removeChild(stream);
}
private void activate(Http2Stream stream) {
activeStreams.add(stream);
for (Listener listener : listeners) {
listener.streamActive(stream);
}
}
private void deactivate(Http2Stream stream) {
activeStreams.remove(stream);
for (Listener listener : listeners) {
listener.streamInactive(stream);
}
}
private void notifyHalfClosed(Http2Stream stream) {
for (Listener listener : listeners) {
listener.streamHalfClosed(stream);
}
}
private void notifyPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
for (Listener listener : listeners) {
listener.streamPriorityChanged(stream, previousParent);
}
}
private void notifyPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
for (Listener listener : listeners) {
listener.streamPrioritySubtreeChanged(stream, subtreeRoot);
}
}
/**
* Simple stream implementation. Streams can be compared to each other by priority.
*/
private final class DefaultStream implements Http2Stream {
private class DefaultStream implements Http2Stream {
private final int id;
private State state = State.IDLE;
private State state = IDLE;
private short weight = DEFAULT_PRIORITY_WEIGHT;
private DefaultStream parent;
private Map<Integer, DefaultStream> children = newChildMap();
private int totalChildWeights;
private FlowState inboundFlow;
private FlowState outboundFlow;
DefaultStream(int id) {
this.id = id;
}
@Override
public int id() {
public final int id() {
return id;
}
@Override
public State state() {
public final State state() {
return state;
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception {
public FlowState inboundFlow() {
return inboundFlow;
}
@Override
public void inboundFlow(FlowState state) {
inboundFlow = state;
}
@Override
public FlowState outboundFlow() {
return outboundFlow;
}
@Override
public void outboundFlow(FlowState state) {
outboundFlow = state;
}
@Override
public final boolean isRoot() {
return parent == null;
}
@Override
public final short weight() {
return weight;
}
@Override
public final int totalChildWeights() {
return totalChildWeights;
}
@Override
public final Http2Stream parent() {
return parent;
}
@Override
public final boolean isDescendantOf(Http2Stream stream) {
Http2Stream next = parent();
while (next != null) {
if (next == stream) {
return true;
}
next = next.parent();
}
return false;
}
@Override
public final boolean isLeaf() {
return numChildren() == 0;
}
@Override
public final int numChildren() {
return children.size();
}
@Override
public final Collection<? extends Http2Stream> children() {
return Collections.unmodifiableCollection(children.values());
}
@Override
public final boolean hasChild(int streamId) {
return child(streamId) != null;
}
@Override
public final Http2Stream child(int streamId) {
return children.get(streamId);
}
@Override
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive)
throws Http2Exception {
if (weight < MIN_WEIGHT || weight > MAX_WEIGHT) {
throw new IllegalArgumentException(String.format(
"Invalid weight: %d. Must be between %d and %d (inclusive).", weight,
MIN_WEIGHT, MAX_WEIGHT));
}
// Get the parent stream.
DefaultStream newParent = (DefaultStream) requireStream(parentStreamId);
if (this == newParent) {
throw new IllegalArgumentException("A stream cannot depend on itself");
}
// Already have a priority. Re-prioritize the stream.
weight(weight);
boolean needToRestructure = newParent.isDescendantOf(this);
DefaultStream oldParent = (DefaultStream) parent();
try {
if (newParent == oldParent && !exclusive) {
// No changes were made to the tree structure.
return this;
}
// Break off the priority branch from it's current parent.
oldParent.removeChildBranch(this);
if (needToRestructure) {
// Adding a circular dependency (priority<->newParent). Break off the new
// parent's branch and add it above this priority.
((DefaultStream) newParent.parent()).removeChildBranch(newParent);
oldParent.addChild(newParent, false);
}
// Add the priority under the new parent.
newParent.addChild(this, exclusive);
return this;
} finally {
// Notify observers.
if (needToRestructure) {
notifyPrioritySubtreeChanged(this, newParent);
} else {
notifyPriorityChanged(this, oldParent);
}
}
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates)
throws Http2Exception {
for (State allowedState : allowedStates) {
if (state == allowedState) {
return this;
@ -153,18 +411,21 @@ public class DefaultHttp2Connection implements Http2Connection {
default:
throw protocolError("Attempting to open non-reserved stream for push");
}
activate(this);
return this;
}
@Override
public Http2Stream close() {
if (state == State.CLOSED) {
if (state == CLOSED) {
return this;
}
state = State.CLOSED;
activeStreams.remove(this);
streamMap.remove(id);
state = CLOSED;
deactivate(this);
// Mark this stream for removal.
removalPolicy.markForRemoval(this);
return this;
}
@ -173,6 +434,7 @@ public class DefaultHttp2Connection implements Http2Connection {
switch (state) {
case OPEN:
state = HALF_CLOSED_LOCAL;
notifyHalfClosed(this);
break;
case HALF_CLOSED_LOCAL:
break;
@ -188,6 +450,7 @@ public class DefaultHttp2Connection implements Http2Connection {
switch (state) {
case OPEN:
state = HALF_CLOSED_REMOTE;
notifyHalfClosed(this);
break;
case HALF_CLOSED_REMOTE:
break;
@ -199,14 +462,124 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public boolean remoteSideOpen() {
public final boolean remoteSideOpen() {
return state == HALF_CLOSED_LOCAL || state == OPEN || state == RESERVED_REMOTE;
}
@Override
public boolean localSideOpen() {
public final boolean localSideOpen() {
return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
}
final void weight(short weight) {
if (parent != null && weight != this.weight) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
}
this.weight = weight;
}
final Map<Integer, DefaultStream> removeAllChildren() {
if (children.isEmpty()) {
return Collections.emptyMap();
}
totalChildWeights = 0;
Map<Integer, DefaultStream> prevChildren = children;
children = newChildMap();
return prevChildren;
}
/**
* Adds a child to this priority. If exclusive is set, any children of this node are moved
* to being dependent on the child.
*/
final void addChild(DefaultStream child, boolean exclusive) {
if (exclusive) {
// If it was requested that this child be the exclusive dependency of this node,
// move any previous children to the child node, becoming grand children
// of this node.
for (DefaultStream grandchild : removeAllChildren().values()) {
child.addChild(grandchild, false);
}
}
child.parent = this;
if (children.put(child.id(), child) == null) {
totalChildWeights += child.weight();
}
}
/**
* Removes the child priority and moves any of its dependencies to being direct dependencies
* on this node.
*/
final void removeChild(DefaultStream child) {
if (children.remove(child.id()) != null) {
child.parent = null;
totalChildWeights -= child.weight();
// Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values()) {
addChild(grandchild, false);
}
}
}
/**
* Removes the child priority but unlike {@link #removeChild}, leaves its branch unaffected.
*/
final void removeChildBranch(DefaultStream child) {
if (children.remove(child.id()) != null) {
child.parent = null;
totalChildWeights -= child.weight();
}
}
}
private static <T> Map<Integer, DefaultStream> newChildMap() {
return new LinkedHashMap<Integer, DefaultStream>(4);
}
/**
* Stream class representing the connection, itself.
*/
private final class ConnectionStream extends DefaultStream {
public ConnectionStream() {
super(CONNECTION_STREAM_ID);
}
@Override
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive)
throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates)
throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream openForPush() throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream close() {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream closeLocalSide() {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream closeRemoteSide() {
throw new UnsupportedOperationException();
}
}
/**
@ -259,8 +632,8 @@ public class DefaultHttp2Connection implements Http2Connection {
lastStreamCreated = streamId;
// Register the stream and mark it as active.
streamMap.put(streamId, stream);
activeStreams.add(stream);
addStream(stream);
activate(stream);
return stream;
}
@ -291,7 +664,7 @@ public class DefaultHttp2Connection implements Http2Connection {
lastStreamCreated = streamId;
// Register the stream.
streamMap.put(streamId, stream);
addStream(stream);
return stream;
}

View File

@ -21,32 +21,30 @@ import static io.netty.handler.codec.http2.Http2Exception.flowControlError;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
/**
* Basic implementation of {@link Http2InboundFlowController}.
*/
public class DefaultHttp2InboundFlowController implements Http2InboundFlowController {
private final Http2Connection connection;
private int initialWindowSize = DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
private final FlowState connectionState = new FlowState(CONNECTION_STREAM_ID);
private final Map<Integer, FlowState> streamStates = new HashMap<Integer, FlowState>();
@Override
public void addStream(int streamId) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
public DefaultHttp2InboundFlowController(Http2Connection connection) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (streamStates.containsKey(streamId)) {
throw new IllegalArgumentException("Stream " + streamId + " already exists.");
}
streamStates.put(streamId, new FlowState(streamId));
}
this.connection = connection;
@Override
public void removeStream(int streamId) {
streamStates.remove(streamId);
// Add a flow state for the connection.
connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamAdded(Http2Stream stream) {
stream.inboundFlow(new InboundFlowState(stream.id()));
}
});
}
@Override
@ -55,9 +53,9 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
initialWindowSize = newWindowSize;
// Apply the delta to all of the windows.
connectionState.addAndGet(deltaWindowSize);
for (FlowState window : streamStates.values()) {
window.updatedInitialWindowSize(deltaWindowSize);
connectionState().addAndGet(deltaWindowSize);
for (Http2Stream stream : connection.activeStreams()) {
state(stream).updatedInitialWindowSize(deltaWindowSize);
}
}
@ -75,6 +73,29 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
applyStreamFlowControl(streamId, dataLength, endOfStream, frameWriter);
}
private InboundFlowState connectionState() {
return state(connection.connectionStream());
}
private InboundFlowState state(int streamId) {
return state(connection.stream(streamId));
}
private InboundFlowState state(Http2Stream stream) {
return stream != null? (InboundFlowState) stream.inboundFlow() : null;
}
/**
* Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found.
*/
private InboundFlowState stateOrFail(int streamId) throws Http2Exception {
InboundFlowState state = state(streamId);
if (state == null) {
throw protocolError("Flow control window missing for stream: %d", streamId);
}
return state;
}
/**
* Apply connection-wide flow control to the incoming data frame.
*/
@ -82,12 +103,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
InboundFlowState connectionState = connectionState();
connectionState.addAndGet(-dataLength);
// If less than the window update threshold remains, restore the window size
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (connectionState.windowSize() <= getWindowUpdateThreshold()) {
if (connectionState.window() <= getWindowUpdateThreshold()) {
connectionState.updateWindow(frameWriter);
}
}
@ -99,13 +121,13 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
FrameWriter frameWriter) throws Http2Exception {
// Remove the data length from the available window size. Throw if the lower bound
// was exceeded.
FlowState state = getStateOrFail(streamId);
InboundFlowState state = stateOrFail(streamId);
state.addAndGet(-dataLength);
// If less than the window update threshold remains, restore the window size
// to the initial value and send a window update to the remote endpoint indicating
// the new window size.
if (state.windowSize() <= getWindowUpdateThreshold() && !endOfStream) {
if (state.window() <= getWindowUpdateThreshold() && !endOfStream) {
state.updateWindow(frameWriter);
}
}
@ -117,32 +139,22 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return initialWindowSize / 2;
}
/**
* Gets the window for the stream or raises a {@code PROTOCOL_ERROR} if not found.
*/
private FlowState getStateOrFail(int streamId) throws Http2Exception {
FlowState window = streamStates.get(streamId);
if (window == null) {
throw protocolError("Flow control window missing for stream: %d", streamId);
}
return window;
}
/**
* Flow control window state for an individual stream.
*/
private final class FlowState {
private int windowSize;
private int lowerBound;
private final class InboundFlowState implements FlowState {
private final int streamId;
private int window;
private int lowerBound;
FlowState(int streamId) {
InboundFlowState(int streamId) {
this.streamId = streamId;
windowSize = initialWindowSize;
window = initialWindowSize;
}
int windowSize() {
return windowSize;
@Override
public int window() {
return window;
}
/**
@ -154,7 +166,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
int addAndGet(int delta) throws Http2Exception {
// Apply the delta. Even if we throw an exception we want to have taken this delta into
// account.
windowSize += delta;
window += delta;
if (delta > 0) {
lowerBound = 0;
}
@ -164,7 +176,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
// The value is bounded by the length that SETTINGS frame decrease the window.
// This difference is stored for the connection when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame.
if (windowSize < lowerBound) {
if (window < lowerBound) {
if (streamId == CONNECTION_STREAM_ID) {
throw protocolError("Connection flow control window exceeded");
} else {
@ -172,7 +184,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
}
}
return windowSize;
return window;
}
/**
@ -185,11 +197,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
* @throws Http2Exception thrown if integer overflow occurs on the window.
*/
void updatedInitialWindowSize(int delta) throws Http2Exception {
if (delta > 0 && windowSize > Integer.MAX_VALUE - delta) {
if (delta > 0 && window > Integer.MAX_VALUE - delta) {
// Integer overflow.
throw flowControlError("Flow control window overflowed for stream: %d", streamId);
}
windowSize += delta;
window += delta;
if (delta < 0) {
lowerBound = delta;
@ -203,7 +215,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/
void updateWindow(FrameWriter frameWriter) throws Http2Exception {
// Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize - windowSize();
int deltaWindowSize = initialWindowSize - window;
addAndGet(deltaWindowSize);
// Send a window update for the stream/connection.

View File

@ -24,149 +24,105 @@ import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2PriorityTree.Priority;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
* Basic implementation of {@link Http2OutboundFlowController}.
*/
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
/**
* The interval (in ns) at which the removed priority garbage collector runs.
*/
private static final long GARBAGE_COLLECTION_INTERVAL = TimeUnit.SECONDS.toNanos(2);
/**
* A comparators that sorts priority nodes in ascending order by the amount
* of priority data available for its subtree.
* A comparators that sorts priority nodes in ascending order by the amount of priority data
* available for its subtree.
*/
private static final Comparator<Priority<FlowState>> DATA_WEIGHT =
new Comparator<Priority<FlowState>>() {
private static final int MAX_DATA_THRESHOLD = Integer.MAX_VALUE / 256;
@Override
public int compare(Priority<FlowState> o1, Priority<FlowState> o2) {
int o1Data = o1.data().priorityBytes();
int o2Data = o2.data().priorityBytes();
if (o1Data > MAX_DATA_THRESHOLD || o2Data > MAX_DATA_THRESHOLD) {
// Corner case to make sure we don't overflow an integer with
// the multiply.
return o1.data().priorityBytes() - o2.data().priorityBytes();
}
private static final Comparator<Http2Stream> DATA_WEIGHT = new Comparator<Http2Stream>() {
private static final int MAX_DATA_THRESHOLD = Integer.MAX_VALUE / 256;
// Scale the data by the weight.
return (o1Data * o1.weight()) - (o2Data * o2.weight());
}
};
@Override
public int compare(Http2Stream o1, Http2Stream o2) {
int o1Data = state(o1).priorityBytes();
int o2Data = state(o2).priorityBytes();
if (o1Data > MAX_DATA_THRESHOLD || o2Data > MAX_DATA_THRESHOLD) {
// Corner case to make sure we don't overflow an integer with
// the multiply.
return o1Data - o2Data;
}
private final Http2PriorityTree<FlowState> priorityTree;
private final FlowState connectionFlow;
private final GarbageCollector garbageCollector;
// Scale the data by the weight.
return (o1Data * o1.weight()) - (o2Data * o2.weight());
}
};
private final Http2Connection connection;
private int initialWindowSize = DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
public DefaultHttp2OutboundFlowController() {
priorityTree = new DefaultHttp2PriorityTree<FlowState>();
connectionFlow = new FlowState(priorityTree.root());
priorityTree.root().data(connectionFlow);
garbageCollector = new GarbageCollector();
}
@Override
public void addStream(int streamId, int parent, short weight, boolean exclusive) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
Priority<FlowState> priority = priorityTree.get(streamId);
if (priority != null) {
throw new IllegalArgumentException("stream " + streamId + " already exists");
public DefaultHttp2OutboundFlowController(Http2Connection connection) {
if (connection == null) {
throw new NullPointerException("connection");
}
this.connection = connection;
priority = priorityTree.prioritize(streamId, parent, weight, exclusive);
priority.data(new FlowState(priority));
}
// Add a flow state for the connection.
connection.connectionStream().outboundFlow(
new OutboundFlowState(connection.connectionStream()));
@Override
public void updateStream(int streamId, int parentId, short weight, boolean exclusive) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
Priority<FlowState> priority = priorityTree.get(streamId);
if (priority == null) {
throw new IllegalArgumentException("stream " + streamId + " does not exist");
}
// Get the parent stream.
Priority<FlowState> parent = priorityTree.root();
if (parentId != 0) {
parent = priorityTree.get(parentId);
if (parent == null) {
throw new IllegalArgumentException("Parent stream " + parentId + " does not exist");
}
}
// Determine whether we're adding a circular dependency on the parent. If so, this will
// restructure the tree to move this priority above the parent.
boolean circularDependency = parent.isDescendantOf(priority);
Priority<FlowState> previousParent = priority.parent();
boolean parentChange = previousParent != parent;
// If the parent is changing, remove the priority bytes from all relevant branches of the
// priority tree. We will restore them where appropriate after the move.
if (parentChange) {
// The parent is changing, remove the priority bytes for this subtree from its previous
// parent.
previousParent.data().incrementPriorityBytes(-priority.data().priorityBytes());
if (circularDependency) {
// The parent is currently a descendant of priority. Remove the priority bytes
// for its subtree starting at its parent node.
parent.parent().data().incrementPriorityBytes(-parent.data().priorityBytes());
}
}
// Adjust the priority tree.
priorityTree.prioritize(streamId, parentId, weight, exclusive);
// If the parent was changed, restore the priority bytes to the appropriate branches
// of the priority tree.
if (parentChange) {
if (circularDependency) {
// The parent was re-rooted. Update the priority bytes for its parent branch.
parent.parent().data().incrementPriorityBytes(parent.data().priorityBytes());
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamAdded(Http2Stream stream) {
// Just add a new flow state to the stream.
stream.outboundFlow(new OutboundFlowState(stream));
}
// Add the priority bytes for this subtree to the new parent branch.
parent.data().incrementPriorityBytes(priority.data().priorityBytes());
}
}
@Override
public void streamHalfClosed(Http2Stream stream) {
if (!stream.localSideOpen()) {
// Any pending frames can never be written, clear and
// write errors for any pending frames.
state(stream).clear();
}
}
@Override
public void removeStream(int streamId) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
@Override
public void streamInactive(Http2Stream stream) {
// Any pending frames can never be written, clear and
// write errors for any pending frames.
state(stream).clear();
}
Priority<FlowState> priority = priorityTree.get(streamId);
if (priority != null) {
priority.data().markForRemoval();
}
@Override
public void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
if (stream.parent() != previousParent) {
// The parent changed, move the priority bytes to the new parent.
int priorityBytes = state(stream).priorityBytes();
state(previousParent).incrementPriorityBytes(-priorityBytes);
state(stream.parent()).incrementPriorityBytes(priorityBytes);
}
}
@Override
public void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
// Reset the priority bytes for the entire subtree.
resetSubtree(subtreeRoot);
}
});
}
@Override
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
connectionFlow.incrementStreamWindow(delta);
for (Priority<FlowState> priority : priorityTree) {
FlowState state = priority.data();
if (!state.isMarkedForRemoval()) {
// Verify that the maximum value is not exceeded by this change.
state.incrementStreamWindow(delta);
}
connectionState().incrementStreamWindow(delta);
for (Http2Stream stream : connection.activeStreams()) {
// Verify that the maximum value is not exceeded by this change.
OutboundFlowState state = state(stream);
state.incrementStreamWindow(delta);
}
if (delta > 0) {
@ -184,11 +140,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
if (streamId == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
connectionFlow.incrementStreamWindow(delta);
connectionState().incrementStreamWindow(delta);
writePendingBytes();
} else {
// Update the stream window and write any pending frames for the stream.
FlowState state = getStateOrFail(streamId);
OutboundFlowState state = stateOrFail(streamId);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
}
@ -202,8 +158,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
@Override
public void sendFlowControlled(int streamId, ByteBuf data, int padding, boolean endStream,
boolean endSegment, boolean compressed, FrameWriter frameWriter) throws Http2Exception {
FlowState state = getStateOrFail(streamId);
FlowState.Frame frame =
OutboundFlowState state = stateOrFail(streamId);
OutboundFlowState.Frame frame =
state.newFrame(data, padding, endStream, endSegment, compressed, frameWriter);
int dataLength = data.readableBytes();
@ -225,12 +181,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
frame.split(state.writableWindow()).write();
}
private static OutboundFlowState state(Http2Stream stream) {
return (OutboundFlowState) stream.outboundFlow();
}
private OutboundFlowState connectionState() {
return state(connection.connectionStream());
}
private OutboundFlowState state(int streamId) {
return state(connection.stream(streamId));
}
/**
* Attempts to get the {@link FlowState} for the given stream. If not available, raises a
* {@code PROTOCOL_ERROR}.
* Attempts to get the {@link OutboundFlowState} for the given stream. If not available, raises
* a {@code PROTOCOL_ERROR}.
*/
private FlowState getStateOrFail(int streamId) throws Http2Exception {
FlowState state = getFlowState(streamId);
private OutboundFlowState stateOrFail(int streamId) throws Http2Exception {
OutboundFlowState state = state(streamId);
if (state == null) {
throw protocolError("Missing flow control window for stream: %d", streamId);
}
@ -241,19 +209,38 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* Returns the flow control window for the entire connection.
*/
private int connectionWindow() {
return priorityTree.root().data().streamWindow();
return connectionState().window();
}
/**
* Resets the priority bytes for the given subtree following a restructuring of the priority
* tree.
*/
private void resetSubtree(Http2Stream subtree) {
// Reset the state priority bytes for this node to its pending bytes and propagate the
// delta required for this change up the tree. It's important to note that the total number
// of priority bytes for this subtree hasn't changed. As we traverse the subtree we will
// subtract off values from the parent of this tree, but we'll add them back later as we
// traverse the rest of the subtree.
OutboundFlowState state = state(subtree);
int delta = state.pendingBytes - state.priorityBytes;
state.incrementPriorityBytes(delta);
// Now recurse this operation for each child.
for (Http2Stream child : subtree.children()) {
resetSubtree(child);
}
}
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
private void writePendingBytes() throws Http2Exception {
// Perform garbage collection to remove any priorities marked for deletion from the tree.
garbageCollector.run();
// Recursively write as many of the total writable bytes as possible.
Priority<FlowState> root = priorityTree.root();
writeAllowedBytes(root, root.data().priorityBytes());
Http2Stream connectionStream = connection.connectionStream();
int totalAllowance = state(connectionStream).priorityBytes();
writeAllowedBytes(connectionStream, totalAllowance);
}
/**
@ -265,15 +252,14 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* @param allowance an allowed number of bytes that may be written to the streams in this sub
* tree.
*/
private void writeAllowedBytes(Priority<FlowState> priority, int allowance)
throws Http2Exception {
private void writeAllowedBytes(Http2Stream stream, int allowance) throws Http2Exception {
// Write the allowed bytes for this node. If not all of the allowance was used,
// restore what's left so that it can be propagated to future nodes.
FlowState state = priority.data();
OutboundFlowState state = state(stream);
int bytesWritten = state.writeBytes(allowance);
allowance -= bytesWritten;
if (allowance <= 0 || priority.isLeaf()) {
if (allowance <= 0 || stream.isLeaf()) {
// Nothing left to do in this sub tree.
return;
}
@ -287,8 +273,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Optimization. If the window is big enough to fit all the data. Just write everything
// and skip the priority algorithm.
if (unallocatedBytes <= remainingWindow) {
for (Priority<FlowState> child : priority.children()) {
writeAllowedBytes(child, child.data().unallocatedPriorityBytes());
for (Http2Stream child : stream.children()) {
writeAllowedBytes(child, state(child).unallocatedPriorityBytes());
}
return;
}
@ -299,7 +285,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// increases in value as the list is iterated. This means that with this node ordering,
// the most bytes will be written to those nodes with the largest aggregate number of
// bytes and the highest priority.
ArrayList<Priority<FlowState>> states = new ArrayList<Priority<FlowState>>(priority.children());
List<Http2Stream> states = new ArrayList<Http2Stream>(stream.children());
Collections.sort(states, DATA_WEIGHT);
// Iterate over the children and spread the remaining bytes across them as is appropriate
@ -309,11 +295,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// allocated are then decremented from the totals, so that the subsequent
// nodes split the difference. If after being processed, a node still has writable data,
// it is added back to the queue for further processing in the next pass.
int remainingWeight = priority.totalChildWeights();
int remainingWeight = stream.totalChildWeights();
int nextTail = 0;
int unallocatedBytesForNextPass = 0;
int remainingWeightForNextPass = 0;
for (int head = 0, tail = states.size(); ; ++head) {
for (int head = 0, tail = states.size();; ++head) {
if (head >= tail) {
// We've reached the end one pass of the nodes. Reset the totals based on
// the nodes that were re-added to the deque since they still have data available.
@ -330,16 +316,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
if (head >= tail) {
break;
}
Priority<FlowState> next = states.get(head);
FlowState node = next.data();
int weight = node.priority().weight();
Http2Stream next = states.get(head);
OutboundFlowState nextState = state(next);
int weight = next.weight();
// Determine the value (in bytes) of a single unit of weight.
double dataToWeightRatio = min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
unallocatedBytes -= node.unallocatedPriorityBytes();
double dataToWeightRatio =
min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
unallocatedBytes -= nextState.unallocatedPriorityBytes();
remainingWeight -= weight;
if (dataToWeightRatio > 0.0 && node.unallocatedPriorityBytes() > 0) {
if (dataToWeightRatio > 0.0 && nextState.unallocatedPriorityBytes() > 0) {
// Determine the portion of the current writable data that is assigned to this
// node.
@ -347,95 +334,53 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Clip the chunk allocated by the total amount of unallocated data remaining in
// the node.
int allocatedChunk = min(writableChunk, node.unallocatedPriorityBytes());
int allocatedChunk = min(writableChunk, nextState.unallocatedPriorityBytes());
// Update the remaining connection window size.
remainingWindow -= allocatedChunk;
// Mark these bytes as allocated.
node.allocatePriorityBytes(allocatedChunk);
if (node.unallocatedPriorityBytes() > 0) {
nextState.allocatePriorityBytes(allocatedChunk);
if (nextState.unallocatedPriorityBytes() > 0) {
// There is still data remaining for this stream. Add it back to the queue
// for the next pass.
unallocatedBytesForNextPass += node.unallocatedPriorityBytes();
unallocatedBytesForNextPass += nextState.unallocatedPriorityBytes();
remainingWeightForNextPass += weight;
states.set(nextTail++, node.priority());
states.set(nextTail++, next);
continue;
}
}
if (node.allocatedPriorityBytes() > 0) {
if (nextState.allocatedPriorityBytes() > 0) {
// Write the allocated data for this stream.
writeAllowedBytes(node.priority(), node.allocatedPriorityBytes());
writeAllowedBytes(next, nextState.allocatedPriorityBytes());
// We're done with this node. Remark all bytes as unallocated for future
// invocations.
node.allocatePriorityBytes(0);
nextState.allocatePriorityBytes(0);
}
}
}
private FlowState getFlowState(int streamId) {
Priority<FlowState> priority = priorityTree.get(streamId);
return priority != null ? priority.data() : null;
}
/**
* The outbound flow control state for a single stream.
*/
private final class FlowState {
private final class OutboundFlowState implements FlowState {
private final Queue<Frame> pendingWriteQueue;
private final Priority<FlowState> priority;
private int streamWindow = initialWindowSize;
private long removalTime;
private final Http2Stream stream;
private int window = initialWindowSize;
private int pendingBytes;
private int priorityBytes;
private int allocatedPriorityBytes;
FlowState(Priority<FlowState> priority) {
this.priority = priority;
OutboundFlowState(Http2Stream stream) {
this.stream = stream;
pendingWriteQueue = new ArrayDeque<Frame>(2);
}
/**
* Gets the priority in the tree associated with this flow state.
*/
Priority<FlowState> priority() {
return priority;
}
/**
* Indicates that this priority has been marked for removal, thus making it a candidate for
* garbage collection.
*/
boolean isMarkedForRemoval() {
return removalTime > 0L;
}
/**
* If marked for removal, indicates the removal time of this priority.
*/
long removalTime() {
return removalTime;
}
/**
* Marks this state for removal, thus making it a candidate for garbage collection. Sets the
* removal time to the current system time.
*/
void markForRemoval() {
if (!isMarkedForRemoval()) {
garbageCollector.add(priority);
clear();
}
}
/**
* The flow control window for this stream. If this state is for stream 0, then this is
* the flow control window for the entire connection.
*/
int streamWindow() {
return streamWindow;
@Override
public int window() {
return window;
}
/**
@ -443,35 +388,35 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* value.
*/
int incrementStreamWindow(int delta) throws Http2Exception {
if (delta > 0 && (Integer.MAX_VALUE - delta) < streamWindow) {
throw new Http2StreamException(priority.streamId(), FLOW_CONTROL_ERROR,
"Window size overflow for stream: " + priority.streamId());
if (delta > 0 && (Integer.MAX_VALUE - delta) < window) {
throw new Http2StreamException(stream.id(), FLOW_CONTROL_ERROR,
"Window size overflow for stream: " + stream.id());
}
int previouslyStreamable = streamableBytes();
streamWindow += delta;
window += delta;
// Update this branch of the priority tree if the streamable bytes have changed for this
// node.
incrementPriorityBytes(streamableBytes() - previouslyStreamable);
return streamWindow;
return window;
}
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
int writableWindow() {
return min(streamWindow, connectionWindow());
return min(window, connectionWindow());
}
/**
* Returns the number of pending bytes for this node that will fit within the
* {@link #streamWindow}. This is used for the priority algorithm to determine the aggregate
* total for {@link #priorityBytes} at each node. Each node only takes into account it's
* stream window so that when a change occurs to the connection window, these values need
* not change (i.e. no tree traversal is required).
* {@link #window}. This is used for the priority algorithm to determine the aggregate total
* for {@link #priorityBytes} at each node. Each node only takes into account it's stream
* window so that when a change occurs to the connection window, these values need not
* change (i.e. no tree traversal is required).
*/
int streamableBytes() {
return max(0, min(pendingBytes, streamWindow));
return max(0, min(pendingBytes, window));
}
/**
@ -524,7 +469,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* size is zero.
*/
Frame peek() {
if (streamWindow > 0) {
if (window > 0) {
return pendingWriteQueue.peek();
}
return null;
@ -551,7 +496,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
int writeBytes(int bytes) throws Http2Exception {
int bytesWritten = 0;
if (isMarkedForRemoval()) {
if (!stream.localSideOpen()) {
return bytesWritten;
}
@ -583,7 +528,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
pendingBytes += numBytes;
incrementPriorityBytes(streamableBytes() - previouslyStreamable);
int delta = streamableBytes() - previouslyStreamable;
incrementPriorityBytes(delta);
}
/**
@ -593,8 +540,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private void incrementPriorityBytes(int numBytes) {
if (numBytes != 0) {
priorityBytes += numBytes;
if (!priority.isRoot()) {
priority.parent().data().incrementPriorityBytes(numBytes);
if (!stream.isRoot()) {
state(stream.parent()).incrementPriorityBytes(numBytes);
}
}
}
@ -642,10 +589,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
void write() throws Http2Exception {
int dataLength = data.readableBytes();
connectionFlow.incrementStreamWindow(-dataLength);
connectionState().incrementStreamWindow(-dataLength);
incrementStreamWindow(-dataLength);
writer.writeFrame(priority.streamId(), data, padding, endStream, endSegment,
compressed);
writer.writeFrame(stream.id(), data, padding, endStream, endSegment, compressed);
decrementPendingBytes(dataLength);
}
@ -671,8 +617,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
Frame split(int maxBytes) {
// TODO: Should padding be included in the chunks or only the last frame?
maxBytes = min(maxBytes, data.readableBytes());
Frame frame = new Frame(data.readSlice(maxBytes).retain(), 0, false, false, compressed,
writer);
Frame frame = new Frame(data.readSlice(maxBytes).retain(), 0, false, false, compressed, writer);
decrementPendingBytes(maxBytes);
return frame;
}
@ -688,56 +633,4 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
}
}
/**
* Controls garbage collection for priorities that have been marked for removal.
*/
private final class GarbageCollector implements Runnable {
private final Queue<Priority<FlowState>> garbage;
private long lastGarbageCollection;
GarbageCollector() {
garbage = new ArrayDeque<Priority<FlowState>>();
}
void add(Priority<FlowState> priority) {
priority.data().removalTime = System.nanoTime();
garbage.add(priority);
}
/**
* Removes any priorities from the tree that were marked for removal greater than
* {@link #GARBAGE_COLLECTION_INTERVAL} milliseconds ago. Garbage collection will run at most on
* the interval {@link #GARBAGE_COLLECTION_INTERVAL}, so calling it more frequently will have no
* effect.
*/
@Override
public void run() {
if (garbage.isEmpty()) {
return;
}
long time = System.nanoTime();
if (time - lastGarbageCollection < GARBAGE_COLLECTION_INTERVAL) {
// Only run the garbage collection on the threshold interval (at most).
return;
}
lastGarbageCollection = time;
for (;;) {
Priority<FlowState> next = garbage.peek();
if (next == null) {
break;
}
long removeTime = next.data().removalTime();
if (time - removeTime > GARBAGE_COLLECTION_INTERVAL) {
Priority<FlowState> priority = garbage.remove();
priorityTree.remove(priority.streamId());
} else {
break;
}
}
}
}
}

View File

@ -1,322 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
* Default implementation of {@link Http2PriorityTree}.
*/
public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
private final DefaultPriority<T> root = new DefaultPriority<T>(0, (short) 0);
private final Map<Integer, Priority<T>> priorityMap = new HashMap<Integer, Priority<T>>();
@Override
public Priority<T> get(int streamId) {
return priorityMap.get(streamId);
}
@Override
public Iterator<Priority<T>> iterator() {
return Collections.unmodifiableCollection(priorityMap.values()).iterator();
}
@Override
public Priority<T> prioritizeUsingDefaults(int streamId) {
return prioritize(streamId, 0, DEFAULT_PRIORITY_WEIGHT, false);
}
@Override
public Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
if (streamId == parent) {
throw new IllegalArgumentException("A stream cannot depend on itself");
}
if (parent < 0) {
throw new IllegalArgumentException("Parent stream ID must be >= 0");
}
if (weight < MIN_WEIGHT || weight > MAX_WEIGHT) {
throw new IllegalArgumentException("Invalid weight: " + weight);
}
// Get the parent.
DefaultPriority<T> newParent = root;
if (parent > 0) {
newParent = internalGet(parent);
}
if (newParent == null) {
throw new IllegalArgumentException("Parent priority does not exist: " + parent);
}
DefaultPriority<T> priority = internalGet(streamId);
if (priority == null) {
// Add a new priority.
priority = new DefaultPriority<T>(streamId, weight);
newParent.addChild(priority, exclusive);
priorityMap.put(streamId, priority);
return priority;
}
// Already have a priority. Re-prioritize the stream.
priority.weight(weight);
if (newParent == priority.parent() && !exclusive) {
// No changes were made to the tree structure.
return priority;
}
// Break off the priority branch from it's current parent.
DefaultPriority<T> oldParent = priority.parent();
oldParent.removeChildBranch(priority);
if (newParent.isDescendantOf(priority)) {
// Adding a circular dependency (priority<->newParent). Break off the new parent's
// branch and add it above this priority.
newParent.parent().removeChildBranch(newParent);
oldParent.addChild(newParent, false);
}
// Add the priority under the new parent.
newParent.addChild(priority, exclusive);
return priority;
}
@Override
public T remove(int streamId) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
// Remove the priority from the map.
DefaultPriority<T> priority = internalGet(streamId);
if (priority != null) {
// Remove it from the tree as well.
priority.parent().removeChild(priority);
return priority.data();
}
return null;
}
@Override
public int size() {
return priorityMap.size();
}
@Override
public Priority<T> root() {
return root;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(getClass().getSimpleName()).append("[");
Deque<DefaultPriority<T>> nodes = new ArrayDeque<DefaultPriority<T>>();
nodes.addLast(root);
while (!nodes.isEmpty()) {
DefaultPriority<T> p = nodes.pop();
builder.append(p.streamId()).append("->")
.append(p.parent() == null ? "null" : p.parent().streamId).append(", ");
for (DefaultPriority<T> child : p.children) {
nodes.addLast(child);
}
}
builder.append("]");
return builder.toString();
}
private DefaultPriority<T> internalGet(int streamId) {
return (DefaultPriority<T>) priorityMap.get(streamId);
}
/**
* Internal implementation of {@link Priority}.
*/
private static final class DefaultPriority<T> implements Priority<T> {
private final int streamId;
private short weight;
private T data;
private Set<DefaultPriority<T>> children = newChildSet();
private DefaultPriority<T> parent;
private int totalChildWeights;
DefaultPriority(int streamId, short weight) {
this.streamId = streamId;
this.weight = weight;
}
@Override
public boolean isRoot() {
return parent == null;
}
@Override
public int streamId() {
return streamId;
}
@Override
public short weight() {
return weight;
}
@Override
public T data() {
return data;
}
@Override
public Priority<T> data(T data) {
this.data = data;
return this;
}
@Override
public int totalChildWeights() {
return totalChildWeights;
}
@Override
public DefaultPriority<T> parent() {
return parent;
}
@Override
public boolean isDescendantOf(Priority<T> priority) {
Priority<T> next = parent;
while (next != null) {
if (next == priority) {
return true;
}
next = next.parent();
}
return false;
}
@Override
public boolean isLeaf() {
return numChildren() == 0;
}
@Override
public int numChildren() {
return children.size();
}
@Override
public Set<? extends Priority<T>> children() {
return Collections.unmodifiableSet(children);
}
@Override
public boolean hasChild(int streamId) {
return child(streamId) != null;
}
@Override
public Priority<T> child(int streamId) {
for (DefaultPriority<T> child : children) {
if (child.streamId() == streamId) {
return child;
}
}
return null;
}
void weight(short weight) {
if (parent != null && weight != this.weight) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
}
this.weight = weight;
}
Set<DefaultPriority<T>> removeAllChildren() {
if (children.isEmpty()) {
return Collections.emptySet();
}
totalChildWeights = 0;
Set<DefaultPriority<T>> prevChildren = children;
children = newChildSet();
return prevChildren;
}
/**
* Adds a child to this priority. If exclusive is set, any children of this node are moved
* to being dependent on the child.
*/
void addChild(DefaultPriority<T> child, boolean exclusive) {
if (exclusive) {
// If it was requested that this child be the exclusive dependency of this node,
// move any previous children to the child node, becoming grand children
// of this node.
for (DefaultPriority<T> grandchild : removeAllChildren()) {
child.addChild(grandchild, false);
}
}
child.parent = this;
if (children.add(child)) {
totalChildWeights += child.weight();
}
}
/**
* Removes the child priority and moves any of its dependencies to being direct dependencies
* on this node.
*/
void removeChild(DefaultPriority<T> child) {
if (children.remove(child)) {
child.parent = null;
totalChildWeights -= child.weight();
// Move up any grand children to be directly dependent on this node.
for (DefaultPriority<T> grandchild : child.children) {
addChild(grandchild, false);
}
}
}
/**
* Removes the child priority but unlike {@link #removeChild}, leaves its branch unaffected.
*/
void removeChildBranch(DefaultPriority<T> child) {
if (children.remove(child)) {
child.parent = null;
totalChildWeights -= child.weight();
}
}
private static <T> Set<DefaultPriority<T>> newChildSet() {
return new LinkedHashSet<DefaultPriority<T>>(2);
}
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
/**
* A {@link Http2StreamRemovalPolicy} that periodically runs garbage collection on streams that have
* been marked for removal.
*/
public class DefaultHttp2StreamRemovalPolicy extends ChannelHandlerAdapter implements
Http2StreamRemovalPolicy, Runnable {
/**
* The interval (in ns) at which the removed priority garbage collector runs.
*/
private static final long GARBAGE_COLLECTION_INTERVAL = SECONDS.toNanos(5);
private final Queue<Garbage> garbage = new ArrayDeque<Garbage>();
private ScheduledFuture<?> timerFuture;
private Action action;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Schedule the periodic timer for performing the policy check.
timerFuture = ctx.channel().eventLoop().scheduleWithFixedDelay(this,
GARBAGE_COLLECTION_INTERVAL,
GARBAGE_COLLECTION_INTERVAL,
NANOSECONDS);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Cancel the periodic timer.
if (timerFuture != null) {
timerFuture.cancel(false);
timerFuture = null;
}
}
@Override
public void setAction(Action action) {
this.action = action;
}
@Override
public void markForRemoval(Http2Stream stream) {
garbage.add(new Garbage(stream));
}
/**
* Runs garbage collection of any streams marked for removal >
* {@link #GARBAGE_COLLECTION_INTERVAL} nanoseconds ago.
*/
@Override
public void run() {
if (garbage.isEmpty() || action == null) {
return;
}
long time = System.nanoTime();
for (;;) {
Garbage next = garbage.peek();
if (next == null) {
break;
}
if (time - next.removalTime > GARBAGE_COLLECTION_INTERVAL) {
garbage.remove();
action.removeStream(next.stream);
} else {
break;
}
}
}
/**
* Wrapper around a stream and its removal time.
*/
private static final class Garbage {
private final long removalTime = System.nanoTime();
private final Http2Stream stream;
Garbage(Http2Stream stream) {
this.stream = stream;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* Base interface for flow-control state for a particular stream.
*/
public interface FlowState {
/**
* Returns the current remaining flow control window (in bytes) for the stream. Depending on the
* flow control implementation, this value may be negative.
*/
int window();
}

View File

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
/**
* Constants and utility method used for encoding/decoding HTTP2 frames.
@ -77,6 +78,33 @@ public final class Http2CodecUtil {
return Unpooled.wrappedBuffer(EMPTY_PING);
}
/**
* Returns a simple {@link Http2StreamRemovalPolicy} that immediately calls back the
* {@link Action} when a stream is marked for removal.
*/
public static Http2StreamRemovalPolicy immediateRemovalPolicy() {
return new Http2StreamRemovalPolicy() {
private Action action;
@Override
public void setAction(Action action) {
if (action == null) {
throw new NullPointerException("action");
}
this.action = action;
}
@Override
public void markForRemoval(Http2Stream stream) {
if (action == null) {
throw new IllegalStateException(
"Action must be called before removing streams.");
}
action.removeStream(stream);
}
};
}
/**
* Converts the given cause to a {@link Http2Exception} if it isn't already.
*/

View File

@ -17,8 +17,75 @@ package io.netty.handler.codec.http2;
import java.util.Collection;
/**
* Manager for the state of an HTTP/2 connection with the remote end-point.
*/
public interface Http2Connection {
/**
* Listener for life-cycle events for streams in this connection.
*/
interface Listener {
/**
* Notifies the listener that the given stream was added to the connection. This stream may
* not yet be active (i.e. open/half-closed).
*/
void streamAdded(Http2Stream stream);
/**
* Notifies the listener that the given stream was made active (i.e. open in at least one
* direction).
*/
void streamActive(Http2Stream stream);
/**
* Notifies the listener that the given stream is now half-closed. The stream can be
* inspected to determine which side is closed.
*/
void streamHalfClosed(Http2Stream stream);
/**
* Notifies the listener that the given stream is now closed in both directions.
*/
void streamInactive(Http2Stream stream);
/**
* Notifies the listener that the given stream has now been removed from the connection and
* will no longer be returned via {@link Http2Connection#stream(int)}. The connection may
* maintain inactive streams for some time before removing them.
*/
void streamRemoved(Http2Stream stream);
/**
* Notifies the listener that the priority for the stream has changed. The parent of the
* stream may have changed, so the previous parent is also provided.
* <p>
* Either this method or {@link #streamPrioritySubtreeChanged} will be called, but not both
* for a single change. This method is called for simple priority changes. If a priority
* change causes a circular dependency between the stream and one of its descendants, the
* subtree must be restructured causing {@link #streamPrioritySubtreeChanged} instead.
*
* @param stream the stream for which the priority has changed.
* @param previousParent the previous parent of the stream. May be the same as its current
* parent if unchanged.
*/
void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent);
/**
* Called when a priority change for a stream creates a circular dependency between the
* stream and one of its descendants. This requires a restructuring of the priority tree.
* <p>
* Either this method or {@link #streamPriorityChanged} will be called, but not both for a
* single change. For simple changes that do not cause the tree to be restructured,
* {@link #streamPriorityChanged} will be called instead.
*
* @param stream the stream for which the priority has changed, causing the tree to be
* restructured.
* @param subtreeRoot the new root of the subtree that has changed.
*/
void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot);
}
/**
* A view of the connection from one endpoint (local or remote).
*/
@ -108,9 +175,14 @@ public interface Http2Connection {
}
/**
* Indicates whether or not the local endpoint for this connection is the server.
* Adds a listener of stream life-cycle events. Adding the same listener multiple times has no effect.
*/
boolean isServer();
void addListener(Listener listener);
/**
* Removes a listener of stream life-cycle events.
*/
void removeListener(Listener listener);
/**
* Attempts to get the stream for the given ID. If it doesn't exist, throws.
@ -123,7 +195,13 @@ public interface Http2Connection {
Http2Stream stream(int streamId);
/**
* Gets the number of active streams in this connection.
* Gets the stream object representing the connection, itself (i.e. stream zero). This object
* always exists.
*/
Http2Stream connectionStream();
/**
* Gets the number of streams that are currently either open or half-closed.
*/
int numActiveStreams();
@ -133,6 +211,11 @@ public interface Http2Connection {
*/
Collection<Http2Stream> activeStreams();
/**
* Indicates whether or not the local endpoint for this connection is the server.
*/
boolean isServer();
/**
* Gets a view of this connection from the local {@link Endpoint}.
*/

View File

@ -0,0 +1,49 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* Provides empty implementations of all {@link Http2Connection.Listener} methods.
*/
public class Http2ConnectionAdapter implements Http2Connection.Listener {
@Override
public void streamAdded(Http2Stream stream) {
}
@Override
public void streamActive(Http2Stream stream) {
}
@Override
public void streamHalfClosed(Http2Stream stream) {
}
@Override
public void streamInactive(Http2Stream stream) {
}
@Override
public void streamRemoved(Http2Stream stream) {
}
@Override
public void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
}
@Override
public void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
}
}

View File

@ -33,18 +33,6 @@ public interface Http2InboundFlowController {
void writeFrame(int streamId, int windowSizeIncrement) throws Http2Exception;
}
/**
* Informs the flow controller of the existence of a new stream, allowing it to allocate
* resources as needed.
*/
void addStream(int streamId);
/**
* Removes the given stream from flow control processing logic and frees resources as
* appropriate.
*/
void removeStream(int streamId);
/**
* Sets the initial inbound flow control window size and updates all stream window sizes by the
* delta. This is called as part of the processing for an outbound SETTINGS frame.

View File

@ -40,37 +40,6 @@ public interface Http2OutboundFlowController {
void setFailure(Throwable cause);
}
/**
* Creates a new priority for the a stream with respect to out-bound flow control.
*
* @param streamId the stream to be prioritized.
* @param parent an optional stream that the given stream should depend on. Zero, if no
* dependency.
* @param weight the weight to be assigned to this stream relative to its parent. This value
* must be between 1 and 256 (inclusive)
* @param exclusive indicates that the stream should be the exclusive dependent on its parent.
* This only applies if the stream has a parent.
*/
void addStream(int streamId, int parent, short weight, boolean exclusive);
/**
* Updates the priority for a stream with respect to out-bound flow control.
*
* @param streamId the stream to be prioritized.
* @param parent an optional stream that the given stream should depend on. Zero, if no
* dependency.
* @param weight the weight to be assigned to this stream relative to its parent. This value
* must be between 1 and 256 (inclusive)
* @param exclusive indicates that the stream should be the exclusive dependent on its parent.
* This only applies if the stream has a parent.
*/
void updateStream(int streamId, int parent, short weight, boolean exclusive);
/**
* Removes the given stream from those considered for out-bound flow control.
*/
void removeStream(int streamId);
/**
* Sets the initial size of the connection's outbound flow control window. The outbound flow
* control windows for all streams are updated by the delta in the initial window size. This is

View File

@ -1,145 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import java.util.Collection;
/**
* A tree for maintaining relative priority information among streams.
*/
public interface Http2PriorityTree<T> extends Iterable<Http2PriorityTree.Priority<T>> {
/**
* Priority node for a single stream.
*/
interface Priority<T> {
/**
* Indicates whether or not this priority is the root of the tree.
*/
boolean isRoot();
/**
* Indicates whether or not this is a leaf node (i.e. {@link #numChildren} is 0).
*/
boolean isLeaf();
/**
* Returns the subject stream for this priority information.
*/
int streamId();
/**
* Returns optional data associated with this stream.
*/
T data();
/**
* Associates the given data with this priority node.
*/
Priority<T> data(T data);
/**
* Returns weight assigned to the dependency with the parent. The weight will be a value
* between 1 and 256.
*/
short weight();
/**
* The total of the weights of all children of this node.
*/
int totalChildWeights();
/**
* The parent (i.e. the node on which this node depends), or {@code null} if this is the
* root node.
*/
Priority<T> parent();
/**
* Indicates whether or not this priority is descended from the given priority.
*/
boolean isDescendantOf(Priority<T> priority);
/**
* Returns the number of children directly dependent on this node.
*/
int numChildren();
/**
* Indicates whether the priority for the given stream is a direct child of this node.
*/
boolean hasChild(int streamId);
/**
* Attempts to find a child of this node for the given stream. If not found, returns
* {@code null}.
*/
Priority<T> child(int streamId);
/**
* Gets the children nodes that are dependent on this node.
*/
Collection<? extends Priority<T>> children();
}
/**
* Adds a new priority or updates an existing priority for the given stream, using default
* priority values.
*
* @param streamId the stream to be prioritized
* @return the priority for the stream.
*/
Priority<T> prioritizeUsingDefaults(int streamId);
/**
* Adds a new priority or updates an existing priority for the given stream.
*
* @param streamId the stream to be prioritized.
* @param parent an optional stream that the given stream should depend on. Zero, if no
* dependency.
* @param weight the weight to be assigned to this stream relative to its parent. This value
* must be between 1 and 256 (inclusive)
* @param exclusive indicates that the stream should be the exclusive dependent on its parent.
* This only applies if the stream has a parent.
* @return the priority for the stream.
*/
Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive);
/**
* Removes the priority information for the given stream. Adjusts other priorities if necessary.
*
* @return the data that was associated with the stream or {@code null} if the node was not
* found or no data was found in the node.
*/
T remove(int streamId);
/**
* Gets the total number of streams that have been prioritized in the tree (not counting the
* root node).
*/
int size();
/**
* Returns the root of the tree. The root always exists and represents the connection itself.
*/
Priority<T> root();
/**
* Returns the priority for the given stream, or {@code null} if not available.
*/
Priority<T> get(int streamId);
}

View File

@ -15,6 +15,8 @@
package io.netty.handler.codec.http2;
import java.util.Collection;
/**
* A single stream within an HTTP2 connection. Streams are compared to each other by priority.
*/
@ -81,4 +83,93 @@ public interface Http2Stream {
* {@link State#OPEN} or {@link State#HALF_CLOSED_REMOTE}).
*/
boolean localSideOpen();
/**
* Gets the in-bound flow control state for this stream.
*/
FlowState inboundFlow();
/**
* Sets the in-bound flow control state for this stream.
*/
void inboundFlow(FlowState state);
/**
* Gets the out-bound flow control window for this stream.
*/
FlowState outboundFlow();
/**
* Sets the out-bound flow control window for this stream.
*/
void outboundFlow(FlowState state);
/**
* Updates an priority for this stream. Calling this method may affect the straucture of the
* priority tree.
*
* @param parentStreamId the parent stream that given stream should depend on. May be {@code 0},
* if the stream has no dependencies and should be an immediate child of the
* connection.
* @param weight the weight to be assigned to this stream relative to its parent. This value
* must be between 1 and 256 (inclusive)
* @param exclusive indicates that the stream should be the exclusive dependent on its parent.
* This only applies if the stream has a parent.
* @return this stream.
*/
Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive)
throws Http2Exception;
/**
* Indicates whether or not this stream is the root node of the priority tree.
*/
boolean isRoot();
/**
* Indicates whether or not this is a leaf node (i.e. {@link #numChildren} is 0) of the priority tree.
*/
boolean isLeaf();
/**
* Returns weight assigned to the dependency with the parent. The weight will be a value
* between 1 and 256.
*/
short weight();
/**
* The total of the weights of all children of this stream.
*/
int totalChildWeights();
/**
* The parent (i.e. the node in the priority tree on which this node depends), or {@code null}
* if this is the root node (i.e. the connection, itself).
*/
Http2Stream parent();
/**
* Indicates whether or not this stream is a descendant in the priority tree from the given stream.
*/
boolean isDescendantOf(Http2Stream stream);
/**
* Returns the number of child streams directly dependent on this stream.
*/
int numChildren();
/**
* Indicates whether the given stream is a direct child of this stream.
*/
boolean hasChild(int streamId);
/**
* Attempts to find a child of this stream with the given ID. If not found, returns
* {@code null}.
*/
Http2Stream child(int streamId);
/**
* Gets all streams that are direct dependents on this stream.
*/
Collection<? extends Http2Stream> children();
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* A policy for determining when it is appropriate to remove streams from a
* {@link Http2StreamRegistry}.
*/
public interface Http2StreamRemovalPolicy {
/**
* Performs the action of removing the stream.
*/
interface Action {
/**
* Removes the stream from the registry.
*/
void removeStream(Http2Stream stream);
}
/**
* Sets the removal action.
*/
void setAction(Action action);
/**
* Marks the given stream for removal. When this policy has determined that the given stream
* should be removed, it will call back the {@link Action}.
*/
void markForRemoval(Http2Stream stream);
}

View File

@ -15,7 +15,9 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.handler.codec.http2.Http2Stream.State;
@ -189,4 +191,229 @@ public class DefaultHttp2ConnectionTest {
assertEquals(State.CLOSED, stream.state());
assertTrue(server.activeStreams().isEmpty());
}
@Test
public void prioritizeShouldUseDefaults() throws Exception {
Http2Stream stream = client.local().createStream(1, false);
assertEquals(1, client.connectionStream().numChildren());
assertEquals(stream, client.connectionStream().child(1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id());
assertEquals(0, stream.numChildren());
}
@Test
public void reprioritizeWithNoChangeShouldDoNothing() throws Exception {
Http2Stream stream = client.local().createStream(1, false);
stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, client.connectionStream().numChildren());
assertEquals(stream, client.connectionStream().child(1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id());
assertEquals(0, stream.numChildren());
}
@Test
public void insertExclusiveShouldAddNewLevel() throws Exception {
Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false);
Http2Stream streamC = client.local().createStream(5, false);
Http2Stream streamD = client.local().createStream(7, false);
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(4, client.numActiveStreams());
// Level 0
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(streamA.id());
assertNotNull(p);
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(streamD.id());
assertNotNull(p);
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(streamB.id());
assertNotNull(p);
assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id());
assertNotNull(p);
assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void removeShouldRestructureTree() throws Exception {
Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false);
Http2Stream streamC = client.local().createStream(5, false);
Http2Stream streamD = client.local().createStream(7, false);
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
// Default removal policy will cause it to be removed immediately.
streamB.close();
// Level 0
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(streamA.id());
assertNotNull(p);
assertEquals(0, p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(streamC.id());
assertNotNull(p);
assertEquals(streamA.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamD.id());
assertNotNull(p);
assertEquals(streamA.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void circularDependencyShouldRestructureTree() throws Exception {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false);
Http2Stream streamC = client.local().createStream(5, false);
Http2Stream streamD = client.local().createStream(7, false);
Http2Stream streamE = client.local().createStream(9, false);
Http2Stream streamF = client.local().createStream(11, false);
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, client.numActiveStreams());
// Non-exclusive re-prioritization of a->d.
streamA.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
// Level 0
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(streamD.id());
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(streamF.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamA.id());
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(streamB.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.child(streamE.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void circularDependencyWithExclusiveShouldRestructureTree() throws Exception {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
// Although the expected output for the exclusive case has an error in the document. The
// final dependency of C should be E (not F). This is fixed here.
Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false);
Http2Stream streamC = client.local().createStream(5, false);
Http2Stream streamD = client.local().createStream(7, false);
Http2Stream streamE = client.local().createStream(9, false);
Http2Stream streamF = client.local().createStream(11, false);
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, client.numActiveStreams());
// Exclusive re-prioritization of a->d.
streamA.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, true);
// Level 0
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(streamD.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(streamA.id());
assertNotNull(p);
assertEquals(3, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(streamB.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamF.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.child(streamE.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
}

View File

@ -45,12 +45,16 @@ public class DefaultHttp2InboundFlowControllerTest {
@Mock
private FrameWriter frameWriter;
private DefaultHttp2Connection connection;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
controller = new DefaultHttp2InboundFlowController();
controller.addStream(STREAM_ID);
connection = new DefaultHttp2Connection(false, false);
controller = new DefaultHttp2InboundFlowController(connection);
connection.local().createStream(STREAM_ID, false);
}
@Test

View File

@ -41,9 +41,9 @@ import org.mockito.MockitoAnnotations;
*/
public class DefaultHttp2OutboundFlowControllerTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 2;
private static final int STREAM_C = 3;
private static final int STREAM_D = 4;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private DefaultHttp2OutboundFlowController controller;
@ -53,15 +53,21 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Mock
private FrameWriter frameWriter;
private DefaultHttp2Connection connection;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
controller = new DefaultHttp2OutboundFlowController();
controller.addStream(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.addStream(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.addStream(STREAM_C, STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
controller.addStream(STREAM_D, STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
connection = new DefaultHttp2Connection(false, false);
controller = new DefaultHttp2OutboundFlowController(connection);
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}
@Test
@ -236,8 +242,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Block stream A
controller.updateOutboundWindowSize(STREAM_A, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
@ -288,8 +294,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
@ -331,8 +337,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
@ -378,8 +384,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the
* written bytes between them.
* We then re-prioritize D so that it's directly off of the connection and verify that A and D
* split the written bytes between them.
*
* <pre>
* 0
* /|\
@ -392,8 +399,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
@ -409,7 +416,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_D);
// Re-prioritize D as a direct child of the connection.
controller.updateStream(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
@ -425,8 +432,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we root all streams at the connection, and then verify that data
* is split appropriately based on weight (all available data is the same).
* In this test, we root all streams at the connection, and then verify that data is split
* appropriately based on weight (all available data is the same).
*
* <pre>
* 0
@ -437,14 +444,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Root the streams at the connection and assign weights.
controller.updateStream(STREAM_A, 0, (short) 50, false);
controller.updateStream(STREAM_B, 0, (short) 200, false);
controller.updateStream(STREAM_C, 0, (short) 100, false);
controller.updateStream(STREAM_D, 0, (short) 100, false);
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(1000));
@ -489,8 +496,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we root all streams at the connection, and then verify that data
* is split equally among the stream, since they all have the same weight.
* In this test, we root all streams at the connection, and then verify that data is split
* equally among the stream, since they all have the same weight.
*
* <pre>
* 0
@ -501,14 +508,14 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void samePriorityShouldWriteEqualData() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Root the streams at the connection with the same weights.
controller.updateStream(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(400));
@ -567,6 +574,11 @@ public class DefaultHttp2OutboundFlowControllerTest {
eq(false), eq(false));
}
private void setPriority(int stream, int parent, int weight, boolean exclusive)
throws Http2Exception {
connection.stream(stream).setPriority(parent, (short) weight, exclusive);
}
private ByteBuf dummyData(int size) {
ByteBuf buffer = Unpooled.buffer(size);
buffer.writerIndex(size);

View File

@ -1,261 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import io.netty.handler.codec.http2.Http2PriorityTree.Priority;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for {@link DefaultHttp2PriorityTree}.
*/
public class DefaultHttp2PriorityTreeTest {
private DefaultHttp2PriorityTree<Object> tree;
@Before
public void setup() {
tree = new DefaultHttp2PriorityTree<Object>();
}
@Test
public void prioritizeShouldUseDefaults() {
tree.prioritizeUsingDefaults(1);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().children().iterator().next();
assertEquals(1, p.streamId());
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
}
@Test
public void prioritizeFromEmptyShouldSucceed() {
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().child(1);
assertNotNull(p);
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
}
@Test
public void reprioritizeWithNoChangeShouldDoNothing() {
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().child(1);
assertNotNull(p);
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
}
@Test
public void insertExclusiveShouldAddNewLevel() {
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(3, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(4, 1, DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(4, tree.size());
// Level 0
Priority<Object> p = tree.root();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(1);
assertNotNull(p);
assertEquals(0, p.parent().streamId());
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(4);
assertNotNull(p);
assertEquals(1, p.parent().streamId());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(2);
assertNotNull(p);
assertEquals(4, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(3);
assertNotNull(p);
assertEquals(4, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void removeShouldRestructureTree() {
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(3, 2, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(4, 2, DEFAULT_PRIORITY_WEIGHT, false);
tree.remove(2);
// Level 0
Priority<Object> p = tree.root();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(1);
assertNotNull(p);
assertEquals(0, p.parent().streamId());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(3);
assertNotNull(p);
assertEquals(1, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(4);
assertNotNull(p);
assertEquals(1, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void circularDependencyShouldRestructureTree() {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
int a = 1;
int b = 2;
int c = 3;
int d = 4;
int e = 5;
int f = 6;
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, tree.size());
// Non-exclusive re-prioritization of a->d.
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, false);
// Level 0
Priority<Object> p = tree.root();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(d);
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(f);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(a);
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(b);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(c);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.child(e);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void circularDependencyWithExclusiveShouldRestructureTree() {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
// Although the expected output for the exclusive case has an error in the document. The
// final dependency of C should be E (not F). This is fixed here.
int a = 1;
int b = 2;
int c = 3;
int d = 4;
int e = 5;
int f = 6;
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, tree.size());
// Exclusive re-prioritization of a->d.
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, true);
// Level 0
Priority<Object> p = tree.root();
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.child(d);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.child(a);
assertNotNull(p);
assertEquals(3, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.child(b);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(f);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(c);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.child(e);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
}

View File

@ -223,8 +223,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void channelInactiveShouldCloseStreams() throws Exception {
handler.channelInactive(ctx);
verify(stream).close();
verify(inboundFlow).removeStream(STREAM_ID);
verify(outboundFlow).removeStream(STREAM_ID);
}
@Test
@ -232,8 +230,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR);
handler.exceptionCaught(ctx, e);
verify(stream).close();
verify(inboundFlow).removeStream(STREAM_ID);
verify(outboundFlow).removeStream(STREAM_ID);
verify(writer).writeRstStream(eq(ctx), eq(promise), eq(STREAM_ID),
eq((long) PROTOCOL_ERROR.code()));
}
@ -295,16 +291,13 @@ public class DelegatingHttp2ConnectionHandlerTest {
// Verify that the event was absorbed and not propagated to the oberver.
verify(observer, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class),
anyInt(), anyBoolean(), anyBoolean());
verify(outboundFlow, never()).updateStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(inboundFlow, never()).addStream(anyInt());
verify(remote, never()).createStream(anyInt(), anyBoolean());
}
@Test
public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, false, false);
verify(remote).createStream(eq(5), eq(false));
verify(outboundFlow).addStream(eq(5), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false));
verify(inboundFlow).addStream(5);
verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(false));
}
@ -313,8 +306,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception {
decode().onHeadersRead(ctx, 5, EMPTY_HEADERS, 0, true, false);
verify(remote).createStream(eq(5), eq(true));
verify(outboundFlow).addStream(eq(5), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false));
verify(inboundFlow, never()).addStream(5);
verify(observer).onHeadersRead(eq(ctx), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(false));
}
@ -324,8 +315,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EMPTY_HEADERS, 0, false, false);
verify(stream).openForPush();
verify(outboundFlow, never()).addStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(inboundFlow).addStream(STREAM_ID);
verify(observer).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(false));
}
@ -336,10 +325,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
decode().onHeadersRead(ctx, STREAM_ID, EMPTY_HEADERS, 0, true, false);
verify(stream).openForPush();
verify(stream).close();
verify(outboundFlow, never()).addStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(inboundFlow, never()).addStream(STREAM_ID);
verify(outboundFlow).removeStream(STREAM_ID);
verify(inboundFlow).removeStream(STREAM_ID);
verify(observer).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(false));
}
@ -365,24 +350,14 @@ public class DelegatingHttp2ConnectionHandlerTest {
public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.isGoAwaySent()).thenReturn(true);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(outboundFlow, never()).updateStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(observer, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
}
@Test
public void priorityReadForUnknownStreamShouldUpdateFlowController() throws Exception {
// The outbound flow controller may keep a prioritized stream around for some time after
// being closed. Verify that the flow controller is updated regardless of the presence of
// the stream.
decode().onPriorityRead(ctx, 5, 0, (short) 255, true);
verify(outboundFlow).updateStream(eq(5), eq(0), eq((short) 255), eq(true));
verify(observer).onPriorityRead(eq(ctx), eq(5), eq(0), eq((short) 255), eq(true));
}
@Test
public void priorityReadShouldSucceed() throws Exception {
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(outboundFlow).updateStream(eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(observer).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
}
@ -535,10 +510,9 @@ public class DelegatingHttp2ConnectionHandlerTest {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeHeaders(
ctx, promise, 5, EMPTY_HEADERS, 0, (short) 255, false, 0, false, false);
verify(local, never()).createStream(anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), eq(promise), anyInt(),
any(Http2Headers.class), anyInt(), anyBoolean(), anyBoolean());
verify(outboundFlow, never()).addStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(inboundFlow, never()).addStream(anyInt());
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@ -548,8 +522,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(false));
verify(outboundFlow).addStream(eq(5), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false));
verify(inboundFlow).addStream(5);
}
@Test
@ -558,8 +530,6 @@ public class DelegatingHttp2ConnectionHandlerTest {
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(promise), eq(5), eq(EMPTY_HEADERS), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(false));
verify(outboundFlow, never()).addStream(anyInt(), anyInt(), anyShort(), anyBoolean());
verify(inboundFlow).addStream(5);
}
@Test
@ -607,7 +577,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
@Test
public void priorityWriteShouldSetPriorityForStream() throws Exception {
handler.writePriority(ctx, promise, STREAM_ID, 0, (short) 255, true);
verify(outboundFlow).updateStream(eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(promise), eq(STREAM_ID), eq(0), eq((short) 255),
eq(true));
}

View File

@ -25,6 +25,7 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
@ -56,8 +57,13 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
private ByteBuf collectedData;
public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) {
super(new DefaultHttp2Connection(false, false), frameReader(), frameWriter(),
new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController());
this(initPromise, responsePromise, new DefaultHttp2Connection(false, false));
}
private Http2ClientConnectionHandler(ChannelPromise initPromise,
ChannelPromise responsePromise, Http2Connection connection) {
super(connection, frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(
connection), new DefaultHttp2OutboundFlowController(connection));
this.initPromise = initPromise;
this.responsePromise = responsePromise;
}

View File

@ -26,6 +26,7 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2Headers;
@ -34,7 +35,6 @@ import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.example.http2.Http2ExampleUtil.*;
import static io.netty.util.internal.logging.InternalLogLevel.*;
@ -48,10 +48,14 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
static final byte[] RESPONSE_BYTES = "Hello World".getBytes(CharsetUtil.UTF_8);
public HelloWorldHttp2Handler() {
super(new DefaultHttp2Connection(true, false), new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger(
new DefaultHttp2FrameWriter(), logger), new DefaultHttp2InboundFlowController(),
new DefaultHttp2OutboundFlowController());
this(new DefaultHttp2Connection(true, false));
}
private HelloWorldHttp2Handler(Http2Connection connection) {
super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger),
new DefaultHttp2InboundFlowController(connection),
new DefaultHttp2OutboundFlowController(connection));
}
/**