HTTP/2 Priority Tree Restructure

Motivation:

The current interface and implementation for HTTP/2 priority tree events does not notify listeners of all parent change events. As a result operations which depend upon knowing about parent change events may be missing events and result in stale data. This interface also allows new listeners to easily consume priority tree change events.

Modifications:

-Http2Connection.Listener interface will change to support notifications on every node after the priority has changed and tree events have settled
-This will affect the outbound flow controller, DefaultHttp2Connection, and other listeners using the old interface

Result:
A modified (hopefully simplified and correct) Listener interface to get priority tree change event notification
This commit is contained in:
Scott Mitchell 2014-08-22 15:53:06 -04:00 committed by nmittler
parent 21bc279700
commit d3538dee2e
6 changed files with 682 additions and 246 deletions

View File

@ -15,22 +15,34 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.PrimitiveCollections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import static io.netty.handler.codec.http2.Http2CodecUtil.*;
import static io.netty.handler.codec.http2.Http2Exception.*;
import static io.netty.handler.codec.http2.Http2Stream.State.*;
/**
* Simple implementation of {@link Http2Connection}.
*/
@ -47,7 +59,8 @@ public class DefaultHttp2Connection implements Http2Connection {
/**
* Creates a connection with an immediate stream removal policy.
*
* @param server whether or not this end-point is the server-side of the HTTP/2 connection.
* @param server
* whether or not this end-point is the server-side of the HTTP/2 connection.
*/
public DefaultHttp2Connection(boolean server) {
this(server, immediateRemovalPolicy());
@ -56,11 +69,12 @@ public class DefaultHttp2Connection implements Http2Connection {
/**
* Creates a new connection with the given settings.
*
* @param server whether or not this end-point is the server-side of the HTTP/2 connection.
* @param removalPolicy the policy to be used for removal of closed stream.
* @param server
* whether or not this end-point is the server-side of the HTTP/2 connection.
* @param removalPolicy
* the policy to be used for removal of closed stream.
*/
public DefaultHttp2Connection(boolean server,
Http2StreamRemovalPolicy removalPolicy) {
public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) {
if (removalPolicy == null) {
throw new NullPointerException("removalPolicy");
}
@ -147,7 +161,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Remove it from the map and priority tree.
streamMap.remove(stream.id());
((DefaultStream) stream.parent()).removeChild(stream);
stream.parent().removeChild(stream);
}
private void activate(DefaultStream stream) {
@ -264,7 +278,7 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public final Http2Stream parent() {
public final DefaultStream parent() {
return parent;
}
@ -307,12 +321,10 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive)
throws Http2Exception {
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) throws Http2Exception {
if (weight < MIN_WEIGHT || weight > MAX_WEIGHT) {
throw new IllegalArgumentException(String.format(
"Invalid weight: %d. Must be between %d and %d (inclusive).", weight,
MIN_WEIGHT, MAX_WEIGHT));
"Invalid weight: %d. Must be between %d and %d (inclusive).", weight, MIN_WEIGHT, MAX_WEIGHT));
}
// Get the parent stream.
@ -324,52 +336,23 @@ public class DefaultHttp2Connection implements Http2Connection {
// Already have a priority. Re-prioritize the stream.
weight(weight);
boolean needToRestructure = newParent.isDescendantOf(this);
DefaultStream oldParent = (DefaultStream) parent();
try {
if (newParent == oldParent && !exclusive) {
// No changes were made to the tree structure.
return this;
}
// Break off the priority branch from it's current parent.
oldParent.removeChildBranch(this);
if (needToRestructure) {
// Adding a circular dependency (priority<->newParent). Break off the new
// parent's branch and add it above this priority.
((DefaultStream) newParent.parent()).removeChildBranch(newParent);
oldParent.addChild(newParent, false);
}
// Add the priority under the new parent.
newParent.addChild(this, exclusive);
return this;
} finally {
// Notify observers.
if (needToRestructure) {
notifyPrioritySubtreeChanged(this, newParent);
if (newParent != parent() || exclusive) {
List<ParentChangedEvent> events = null;
if (newParent.isDescendantOf(this)) {
events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children().size() : 0));
parent.takeChild(newParent, false, events);
} else {
notifyPriorityChanged(this, oldParent);
events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children().size() : 0));
}
newParent.takeChild(this, exclusive, events);
notifyParentChanged(events);
}
}
private void notifyPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
for (Listener listener : listeners) {
listener.streamPriorityChanged(stream, previousParent);
}
}
private void notifyPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
for (Listener listener : listeners) {
listener.streamPrioritySubtreeChanged(stream, subtreeRoot);
}
return this;
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates)
throws Http2Exception {
public Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception {
for (State allowedState : allowedStates) {
if (state == allowedState) {
return this;
@ -381,14 +364,14 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public Http2Stream openForPush() throws Http2Exception {
switch (state) {
case RESERVED_LOCAL:
state = HALF_CLOSED_REMOTE;
break;
case RESERVED_REMOTE:
state = HALF_CLOSED_LOCAL;
break;
default:
throw protocolError("Attempting to open non-reserved stream for push");
case RESERVED_LOCAL:
state = HALF_CLOSED_REMOTE;
break;
case RESERVED_REMOTE:
state = HALF_CLOSED_LOCAL;
break;
default:
throw protocolError("Attempting to open non-reserved stream for push");
}
activate(this);
return this;
@ -423,15 +406,15 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public Http2Stream closeLocalSide() {
switch (state) {
case OPEN:
state = HALF_CLOSED_LOCAL;
notifyHalfClosed(this);
break;
case HALF_CLOSED_LOCAL:
break;
default:
close();
break;
case OPEN:
state = HALF_CLOSED_LOCAL;
notifyHalfClosed(this);
break;
case HALF_CLOSED_LOCAL:
break;
default:
close();
break;
}
return this;
}
@ -439,15 +422,15 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public Http2Stream closeRemoteSide() {
switch (state) {
case OPEN:
state = HALF_CLOSED_REMOTE;
notifyHalfClosed(this);
break;
case HALF_CLOSED_REMOTE:
break;
default:
close();
break;
case OPEN:
state = HALF_CLOSED_REMOTE;
notifyHalfClosed(this);
break;
case HALF_CLOSED_REMOTE:
break;
default:
close();
break;
}
return this;
}
@ -469,15 +452,21 @@ public class DefaultHttp2Connection implements Http2Connection {
}
final DefaultEndpoint createdBy() {
return localEndpoint.createdStreamId(id)? localEndpoint : remoteEndpoint;
return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint;
}
final void weight(short weight) {
if (parent != null && weight != this.weight) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
if (weight != this.weight) {
if (parent != null) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
}
final short oldWeight = this.weight;
this.weight = weight;
for (Listener l : listeners) {
l.onWeightChanged(this, oldWeight);
}
}
this.weight = weight;
}
final IntObjectMap<DefaultStream> removeAllChildren() {
@ -492,48 +481,50 @@ public class DefaultHttp2Connection implements Http2Connection {
}
/**
* Adds a child to this priority. If exclusive is set, any children of this node are moved
* to being dependent on the child.
* Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
* the child.
*/
final void addChild(DefaultStream child, boolean exclusive) {
final void takeChild(DefaultStream child, boolean exclusive, List<ParentChangedEvent> events) {
DefaultStream oldParent = child.parent();
events.add(new ParentChangedEvent(child, oldParent));
notifyParentChanging(child, this);
child.parent = this;
if (exclusive) {
// If it was requested that this child be the exclusive dependency of this node,
// move any previous children to the child node, becoming grand children
// of this node.
for (DefaultStream grandchild : removeAllChildren().values(DefaultStream.class)) {
child.addChild(grandchild, false);
child.takeChild(grandchild, false, events);
}
}
child.parent = this;
if (children.put(child.id(), child) == null) {
totalChildWeights += child.weight();
}
if (oldParent != null && oldParent.children.remove(child.id()) != null) {
oldParent.totalChildWeights -= child.weight();
}
}
/**
* Removes the child priority and moves any of its dependencies to being direct dependencies
* on this node.
* Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
*/
final void removeChild(DefaultStream child) {
if (children.remove(child.id()) != null) {
List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
events.add(new ParentChangedEvent(child, child.parent()));
notifyParentChanging(child, null);
child.parent = null;
totalChildWeights -= child.weight();
// Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values(DefaultStream.class)) {
addChild(grandchild, false);
takeChild(grandchild, false, events);
}
}
}
/**
* Removes the child priority but unlike {@link #removeChild}, leaves its branch unaffected.
*/
final void removeChildBranch(DefaultStream child) {
if (children.remove(child.id()) != null) {
child.parent = null;
totalChildWeights -= child.weight();
notifyParentChanged(events);
}
}
}
@ -542,6 +533,51 @@ public class DefaultHttp2Connection implements Http2Connection {
return new IntObjectHashMap<DefaultStream>(4);
}
/**
* Allows a correlation to be made between a stream and its old parent before a parent change occurs
*/
private final class ParentChangedEvent {
private Http2Stream stream;
private Http2Stream oldParent;
/**
* Create a new instance
* @param stream The stream who has had a parent change
* @param oldParent The previous parent
*/
public ParentChangedEvent(Http2Stream stream, Http2Stream oldParent) {
this.stream = stream;
this.oldParent = oldParent;
}
/**
* Notify all listeners of the tree change event
* @param l The listener to notify
*/
public void notifyListener(Listener l) {
l.priorityTreeParentChanged(stream, oldParent);
}
}
/**
* Notify all listeners of the priority tree change events (in ascending order)
* @param events The events (top down order) which have changed
*/
private void notifyParentChanged(List<ParentChangedEvent> events) {
for (int i = 0; i < events.size(); ++i) {
ParentChangedEvent event = events.get(i);
for (Listener l : listeners) {
event.notifyListener(l);
}
}
}
private void notifyParentChanging(Http2Stream stream, Http2Stream newParent) {
for (Listener l : listeners) {
l.priorityTreeParentChanging(stream, newParent);
}
}
/**
* Stream class representing the connection, itself.
*/
@ -619,7 +655,7 @@ public class DefaultHttp2Connection implements Http2Connection {
public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so
// start at 3.
return nextStreamId > 1? nextStreamId : nextStreamId + 2;
return nextStreamId > 1 ? nextStreamId : nextStreamId + 2;
}
@Override
@ -661,8 +697,7 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public DefaultStream reservePushStream(int streamId, Http2Stream parent)
throws Http2Exception {
public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
if (parent == null) {
throw protocolError("Parent stream missing");
}
@ -689,12 +724,15 @@ public class DefaultHttp2Connection implements Http2Connection {
private void addStream(DefaultStream stream) {
// Add the stream to the map and priority tree.
streamMap.put(stream.id(), stream);
connectionStream.addChild(stream, false);
List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
connectionStream.takeChild(stream, false, events);
// Notify the observers of the event.
for (Listener listener : listeners) {
listener.streamAdded(stream);
}
notifyParentChanged(events);
}
@Override
@ -732,7 +770,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public int lastKnownStream() {
return isGoAwayReceived()? lastKnownStream : lastStreamCreated;
return isGoAwayReceived() ? lastKnownStream : lastStreamCreated;
}
@Override
@ -775,12 +813,11 @@ public class DefaultHttp2Connection implements Http2Connection {
throw protocolError("No more streams can be created on this connection");
}
if (streamId < nextStreamId) {
throw protocolError("Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
throw protocolError("Request stream %d is behind the next expected stream %d", streamId, nextStreamId);
}
if (!createdStreamId(streamId)) {
throw protocolError("Request stream %d is not correct for %s connection", streamId,
server ? "server" : "client");
throw protocolError("Request stream %d is not correct for %s connection", streamId, server ? "server"
: "client");
}
}

View File

@ -39,8 +39,8 @@ import static java.lang.Math.*;
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
/**
* A comparators that sorts priority nodes in ascending order by the amount of priority data
* available for its subtree.
* A comparators that sorts priority nodes in ascending order by the amount of priority data available for its
* subtree.
*/
private static final Comparator<Http2Stream> DATA_WEIGHT = new Comparator<Http2Stream>() {
private static final int MAX_DATA_THRESHOLD = Integer.MAX_VALUE / 256;
@ -76,8 +76,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
this.frameWriter = frameWriter;
// Add a flow state for the connection.
connection.connectionStream().outboundFlow(
new OutboundFlowState(connection.connectionStream()));
connection.connectionStream().outboundFlow(new OutboundFlowState(connection.connectionStream()));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@ -104,19 +103,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
@Override
public void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
if (stream.parent() != previousParent) {
// The parent changed, move the priority bytes to the new parent.
int priorityBytes = state(stream).priorityBytes();
state(previousParent).incrementPriorityBytes(-priorityBytes);
state(stream.parent()).incrementPriorityBytes(priorityBytes);
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
state(parent).incrementPriorityBytes(state(stream).priorityBytes());
}
}
@Override
public void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
// Reset the priority bytes for the entire subtree.
resetSubtree(subtreeRoot);
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
state(parent).incrementPriorityBytes(-state(stream).priorityBytes());
}
}
});
}
@ -216,8 +215,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Attempts to get the {@link OutboundFlowState} for the given stream. If not available, raises
* a {@code PROTOCOL_ERROR}.
* Attempts to get the {@link OutboundFlowState} for the given stream. If not available, raises a
* {@code PROTOCOL_ERROR}.
*/
private OutboundFlowState stateOrFail(int streamId) throws Http2Exception {
OutboundFlowState state = state(streamId);
@ -243,26 +242,6 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
}
/**
* Resets the priority bytes for the given subtree following a restructuring of the priority
* tree.
*/
private void resetSubtree(Http2Stream subtree) {
// Reset the state priority bytes for this node to its pending bytes and propagate the
// delta required for this change up the tree. It's important to note that the total number
// of priority bytes for this subtree hasn't changed. As we traverse the subtree we will
// subtract off values from the parent of this tree, but we'll add them back later as we
// traverse the rest of the subtree.
OutboundFlowState state = state(subtree);
int delta = state.pendingBytes - state.priorityBytes;
state.incrementPriorityBytes(delta);
// Now recurse this operation for each child.
for (Http2Stream child : subtree.children()) {
resetSubtree(child);
}
}
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
@ -279,10 +258,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Recursively traverses the priority tree rooted at the given node. Attempts to write the
* allowed bytes for the streams in this sub tree based on their weighted priorities.
* Recursively traverses the priority tree rooted at the given node. Attempts to write the allowed bytes for the
* streams in this sub tree based on their weighted priorities.
*
* @param allowance an allowed number of bytes that may be written to the streams in this subtree
* @param allowance
* an allowed number of bytes that may be written to the streams in this subtree
*/
private void writeAllowedBytes(Http2Stream stream, int allowance) throws Http2Exception {
// Write the allowed bytes for this node. If not all of the allowance was used,
@ -353,8 +333,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int weight = next.weight();
// Determine the value (in bytes) of a single unit of weight.
double dataToWeightRatio =
min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
double dataToWeightRatio = min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
unallocatedBytes -= nextState.unallocatedPriorityBytes();
remainingWeight -= weight;
@ -397,7 +376,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* The outbound flow control state for a single stream.
*/
private final class OutboundFlowState implements FlowState {
final class OutboundFlowState implements FlowState {
private final Queue<Frame> pendingWriteQueue;
private final Http2Stream stream;
private int window = initialWindowSize;
@ -405,7 +384,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private int priorityBytes;
private int allocatedPriorityBytes;
OutboundFlowState(Http2Stream stream) {
private OutboundFlowState(Http2Stream stream) {
this.stream = stream;
pendingWriteQueue = new ArrayDeque<Frame>(2);
}
@ -416,13 +395,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Increments the flow control window for this stream by the given delta and returns the new
* value.
* Increments the flow control window for this stream by the given delta and returns the new value.
*/
int incrementStreamWindow(int delta) throws Http2Exception {
private int incrementStreamWindow(int delta) throws Http2Exception {
if (delta > 0 && Integer.MAX_VALUE - delta < window) {
throw new Http2StreamException(stream.id(), FLOW_CONTROL_ERROR,
"Window size overflow for stream: " + stream.id());
throw new Http2StreamException(stream.id(), FLOW_CONTROL_ERROR, "Window size overflow for stream: "
+ stream.id());
}
int previouslyStreamable = streamableBytes();
window += delta;
@ -441,11 +419,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Returns the number of pending bytes for this node that will fit within the
* {@link #window}. This is used for the priority algorithm to determine the aggregate total
* for {@link #priorityBytes} at each node. Each node only takes into account it's stream
* window so that when a change occurs to the connection window, these values need not
* change (i.e. no tree traversal is required).
* Returns the number of pending bytes for this node that will fit within the {@link #window}. This is used for
* the priority algorithm to determine the aggregate total for {@link #priorityBytes} at each node. Each node
* only takes into account it's stream window so that when a change occurs to the connection window, these
* values need not change (i.e. no tree traversal is required).
*/
int streamableBytes() {
return max(0, min(pendingBytes, window));
@ -461,30 +438,28 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Used by the priority algorithm to allocate bytes to this stream.
*/
void allocatePriorityBytes(int bytes) {
private void allocatePriorityBytes(int bytes) {
allocatedPriorityBytes += bytes;
}
/**
* Used by the priority algorithm to get the intermediate allocation of bytes to this
* stream.
* Used by the priority algorithm to get the intermediate allocation of bytes to this stream.
*/
int allocatedPriorityBytes() {
return allocatedPriorityBytes;
}
/**
* Used by the priority algorithm to determine the number of writable bytes that have not
* yet been allocated.
* Used by the priority algorithm to determine the number of writable bytes that have not yet been allocated.
*/
int unallocatedPriorityBytes() {
private int unallocatedPriorityBytes() {
return priorityBytes - allocatedPriorityBytes;
}
/**
* Creates a new frame with the given values but does not add it to the pending queue.
*/
Frame newFrame(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data,
private Frame newFrame(ChannelHandlerContext ctx, ChannelPromise promise, ByteBuf data,
int padding, boolean endStream) {
return new Frame(ctx, new ChannelPromiseAggregator(promise), data, padding, endStream);
}
@ -497,8 +472,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Returns the the head of the pending queue, or {@code null} if empty or the current window
* size is zero.
* Returns the the head of the pending queue, or {@code null} if empty or the current window size is zero.
*/
Frame peek() {
if (window > 0) {
@ -510,23 +484,22 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Clears the pending queue and writes errors for each remaining frame.
*/
void clear() {
private void clear() {
for (;;) {
Frame frame = pendingWriteQueue.poll();
if (frame == null) {
break;
}
frame.writeError(format(STREAM_CLOSED,
"Stream closed before write could take place"));
frame.writeError(format(STREAM_CLOSED, "Stream closed before write could take place"));
}
}
/**
* Writes up to the number of bytes from the pending queue. May write less if limited by the
* writable window, by the number of pending writes available, or because a frame does not
* support splitting on arbitrary boundaries.
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
* the number of pending writes available, or because a frame does not support splitting on arbitrary
* boundaries.
*/
int writeBytes(int bytes) throws Http2Exception {
private int writeBytes(int bytes) throws Http2Exception {
int bytesWritten = 0;
if (!stream.localSideOpen()) {
return bytesWritten;
@ -553,8 +526,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Recursively increments the priority bytes for this branch in the priority tree starting
* at the current node.
* Recursively increments the priority bytes for this branch in the priority tree starting at the current node.
*/
private void incrementPriorityBytes(int numBytes) {
if (numBytes != 0) {
@ -604,9 +576,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Increments the number of pending bytes for this node. If there was any change to the
* number of bytes that fit into the stream window, then {@link #incrementPriorityBytes} to
* recursively update this branch of the priority tree.
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
* fit into the stream window, then {@link #incrementPriorityBytes} to recursively update this branch of the
* priority tree.
*/
private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
@ -646,8 +618,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Discards this frame, writing an error. If this frame is in the pending queue, the
* unwritten bytes are removed from this branch of the priority tree.
* Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are
* removed from this branch of the priority tree.
*/
void writeError(Http2Exception cause) {
decrementPendingBytes(data.readableBytes());
@ -656,12 +628,13 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Creates a new frame that is a view of this frame's data buffer starting at the
* current read index with the given number of bytes. The reader index on the input
* frame is then advanced by the number of bytes. The returned frame will not have
* end-of-stream set and it will not be automatically placed in the pending queue.
* Creates a new frame that is a view of this frame's data buffer starting at the current read index with
* the given number of bytes. The reader index on the input frame is then advanced by the number of bytes.
* The returned frame will not have end-of-stream set and it will not be automatically placed in the pending
* queue.
*
* @param maxBytes the maximum number of bytes that is allowed in the created frame.
* @param maxBytes
* the maximum number of bytes that is allowed in the created frame.
* @return the partial frame.
*/
Frame split(int maxBytes) {
@ -673,8 +646,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the
* stream.
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
*/
void decrementPendingBytes(int bytes) {
if (enqueued) {

View File

@ -57,33 +57,30 @@ public interface Http2Connection {
void streamRemoved(Http2Stream stream);
/**
* Notifies the listener that the priority for the stream has changed. The parent of the
* stream may have changed, so the previous parent is also provided.
* <p>
* Either this method or {@link #streamPrioritySubtreeChanged} will be called, but not both
* for a single change. This method is called for simple priority changes. If a priority
* change causes a circular dependency between the stream and one of its descendants, the
* subtree must be restructured causing {@link #streamPrioritySubtreeChanged} instead.
*
* @param stream the stream for which the priority has changed.
* @param previousParent the previous parent of the stream. May be the same as its current
* parent if unchanged.
* Notifies the listener that a priority tree parent change has occurred. This method will be invoked
* in a top down order relative to the priority tree. This method will also be invoked after all tree
* structure changes have been made and the tree is in steady state relative to the priority change
* which caused the tree structure to change.
* @param stream The stream which had a parent change (new parent and children will be steady state)
* @param oldParent The old parent which {@code stream} used to be a child of (may be {@code null})
*/
void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent);
void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent);
/**
* Called when a priority change for a stream creates a circular dependency between the
* stream and one of its descendants. This requires a restructuring of the priority tree.
* <p>
* Either this method or {@link #streamPriorityChanged} will be called, but not both for a
* single change. For simple changes that do not cause the tree to be restructured,
* {@link #streamPriorityChanged} will be called instead.
*
* @param stream the stream for which the priority has changed, causing the tree to be
* restructured.
* @param subtreeRoot the new root of the subtree that has changed.
* Notifies the listener that a parent dependency is about to change
* This is called while the tree is being restructured and so the tree
* structure is not necessarily steady state.
* @param stream The stream which the parent is about to change to {@code newParent}
* @param newParent The stream which will be the parent of {@code stream}
*/
void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot);
void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent);
/**
* Notifies the listener that the weight has changed for {@code stream}
* @param stream The stream which the weight has changed
* @param oldWeight The old weight for {@code stream}
*/
void onWeightChanged(Http2Stream stream, short oldWeight);
/**
* Called when a GO_AWAY frame has either been sent or received for the connection.

View File

@ -39,15 +39,19 @@ public class Http2ConnectionAdapter implements Http2Connection.Listener {
public void streamRemoved(Http2Stream stream) {
}
@Override
public void streamPriorityChanged(Http2Stream stream, Http2Stream previousParent) {
}
@Override
public void streamPrioritySubtreeChanged(Http2Stream stream, Http2Stream subtreeRoot) {
}
@Override
public void goingAway() {
}
@Override
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
}
@Override
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
}
@Override
public void onWeightChanged(Http2Stream stream, short oldWeight) {
}
}

View File

@ -20,10 +20,24 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.Arrays;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
@ -34,12 +48,16 @@ public class DefaultHttp2ConnectionTest {
private DefaultHttp2Connection server;
private DefaultHttp2Connection client;
@Mock
private Http2Connection.Listener clientListener;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
server = new DefaultHttp2Connection(true);
client = new DefaultHttp2Connection(false);
client.addListener(clientListener);
}
@Test(expected = Http2Exception.class)
@ -258,6 +276,31 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
public void weightChangeWithNoTreeChangeShouldNotifyListeners() throws Http2Exception {
Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false);
Http2Stream streamC = client.local().createStream(5, false);
Http2Stream streamD = client.local().createStream(7, false);
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(4, client.numActiveStreams());
short oldWeight = streamD.weight();
short newWeight = (short) (oldWeight + 1);
reset(clientListener);
streamD.setPriority(streamD.parent().id(), newWeight, false);
verify(clientListener).onWeightChanged(eq(streamD), eq(oldWeight));
Assert.assertEquals(streamD.weight(), newWeight);
verify(clientListener, never()).priorityTreeParentChanging(any(Http2Stream.class),
any(Http2Stream.class));
verify(clientListener, never()).priorityTreeParentChanged(any(Http2Stream.class),
any(Http2Stream.class));
}
@Test
public void removeShouldRestructureTree() throws Exception {
Http2Stream streamA = client.local().createStream(1, false);
@ -299,24 +342,55 @@ public class DefaultHttp2ConnectionTest {
@Test
public void circularDependencyShouldRestructureTree() throws Exception {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.3.3
// Initialize all the nodes
Http2Stream streamA = client.local().createStream(1, false);
verifyParentChanged(streamA, null);
Http2Stream streamB = client.local().createStream(3, false);
verifyParentChanged(streamB, null);
Http2Stream streamC = client.local().createStream(5, false);
verifyParentChanged(streamC, null);
Http2Stream streamD = client.local().createStream(7, false);
verifyParentChanged(streamD, null);
Http2Stream streamE = client.local().createStream(9, false);
verifyParentChanged(streamE, null);
Http2Stream streamF = client.local().createStream(11, false);
verifyParentChanged(streamF, null);
// Build the tree
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamB), anyShort());
verifyParentChanged(streamB, client.connectionStream());
verifyParentChanging(streamB, client.connectionStream());
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamC), anyShort());
verifyParentChanged(streamC, client.connectionStream());
verifyParentChanging(streamC, client.connectionStream());
streamD.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamD), anyShort());
verifyParentChanged(streamD, client.connectionStream());
verifyParentChanging(streamD, client.connectionStream());
streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamE), anyShort());
verifyParentChanged(streamE, client.connectionStream());
verifyParentChanging(streamE, client.connectionStream());
streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamF), anyShort());
verifyParentChanged(streamF, client.connectionStream());
verifyParentChanging(streamF, client.connectionStream());
assertEquals(6, client.numActiveStreams());
// Non-exclusive re-prioritization of a->d.
reset(clientListener);
streamA.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamA), anyShort());
verifyParentChanging(Arrays.asList(streamD, streamA), Arrays.asList(client.connectionStream(), streamD));
verifyParentsChanged(Arrays.asList(streamD, streamA), Arrays.asList(streamC, client.connectionStream()));
// Level 0
Http2Stream p = client.connectionStream();
@ -358,26 +432,57 @@ public class DefaultHttp2ConnectionTest {
@Test
public void circularDependencyWithExclusiveShouldRestructureTree() throws Exception {
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-12#section-5.3.3
// Although the expected output for the exclusive case has an error in the document. The
// final dependency of C should be E (not F). This is fixed here.
// Using example from http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.3.3
// Initialize all the nodes
Http2Stream streamA = client.local().createStream(1, false);
verifyParentChanged(streamA, null);
Http2Stream streamB = client.local().createStream(3, false);
verifyParentChanged(streamB, null);
Http2Stream streamC = client.local().createStream(5, false);
verifyParentChanged(streamC, null);
Http2Stream streamD = client.local().createStream(7, false);
verifyParentChanged(streamD, null);
Http2Stream streamE = client.local().createStream(9, false);
verifyParentChanged(streamE, null);
Http2Stream streamF = client.local().createStream(11, false);
verifyParentChanged(streamF, null);
// Build the tree
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamB), anyShort());
verifyParentChanged(streamB, client.connectionStream());
verifyParentChanging(streamB, client.connectionStream());
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamC), anyShort());
verifyParentChanged(streamC, client.connectionStream());
verifyParentChanging(streamC, client.connectionStream());
streamD.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamD), anyShort());
verifyParentChanged(streamD, client.connectionStream());
verifyParentChanging(streamD, client.connectionStream());
streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamE), anyShort());
verifyParentChanged(streamE, client.connectionStream());
verifyParentChanging(streamE, client.connectionStream());
streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
verify(clientListener, never()).onWeightChanged(eq(streamF), anyShort());
verifyParentChanged(streamF, client.connectionStream());
verifyParentChanging(streamF, client.connectionStream());
assertEquals(6, client.numActiveStreams());
// Exclusive re-prioritization of a->d.
reset(clientListener);
streamA.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, true);
verify(clientListener, never()).onWeightChanged(eq(streamA), anyShort());
verifyParentChanging(Arrays.asList(streamD, streamA, streamF),
Arrays.asList(client.connectionStream(), streamD, streamA));
verifyParentsChanged(Arrays.asList(streamD, streamA, streamF),
Arrays.asList(streamC, client.connectionStream(), streamD));
// Level 0
Http2Stream p = client.connectionStream();
@ -416,4 +521,49 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
private void verifyParentChanging(List<Http2Stream> expectedArg1, List<Http2Stream> expectedArg2) {
Assert.assertTrue(expectedArg1.size() == expectedArg2.size());
ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class);
ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class);
verify(clientListener, times(expectedArg1.size())).priorityTreeParentChanging(arg1Captor.capture(),
arg2Captor.capture());
List<Http2Stream> capturedArg1 = arg1Captor.getAllValues();
List<Http2Stream> capturedArg2 = arg2Captor.getAllValues();
Assert.assertTrue(capturedArg1.size() == capturedArg2.size());
Assert.assertTrue(capturedArg1.size() == expectedArg1.size());
for (int i = 0; i < capturedArg1.size(); ++i) {
Assert.assertEquals(expectedArg1.get(i), capturedArg1.get(i));
Assert.assertEquals(expectedArg2.get(i), capturedArg2.get(i));
}
}
private void verifyParentsChanged(List<Http2Stream> expectedArg1, List<Http2Stream> expectedArg2) {
Assert.assertTrue(expectedArg1.size() == expectedArg2.size());
ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class);
ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class);
verify(clientListener, times(expectedArg1.size())).priorityTreeParentChanged(arg1Captor.capture(),
arg2Captor.capture());
List<Http2Stream> capturedArg1 = arg1Captor.getAllValues();
List<Http2Stream> capturedArg2 = arg2Captor.getAllValues();
Assert.assertTrue(capturedArg1.size() == capturedArg2.size());
Assert.assertTrue(capturedArg1.size() == expectedArg1.size());
for (int i = 0; i < capturedArg1.size(); ++i) {
Assert.assertEquals(expectedArg1.get(i), capturedArg1.get(i));
Assert.assertEquals(expectedArg2.get(i), capturedArg2.get(i));
}
}
@SuppressWarnings("unchecked")
private static <T> T streamEq(T stream) {
return (T) (stream == null ? isNull(Http2Stream.class) : eq(stream));
}
private void verifyParentChanging(Http2Stream stream, Http2Stream newParent) {
verify(clientListener).priorityTreeParentChanging(streamEq(stream), streamEq(newParent));
}
private void verifyParentChanged(Http2Stream stream, Http2Stream oldParent) {
verify(clientListener).priorityTreeParentChanged(streamEq(stream), streamEq(oldParent));
}
}

View File

@ -31,6 +31,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController.OutboundFlowState;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
@ -46,6 +52,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private DefaultHttp2OutboundFlowController controller;
@ -193,8 +200,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
@Test
public void connectionWindowUpdateShouldSendPartialFrame() throws Http2Exception {
// Set the connection window size to zero.
controller
.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
ByteBuf data = dummyData(10);
send(STREAM_A, data);
@ -252,8 +258,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the
* tree (stream A is blocked).
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is
* blocked).
*
* <pre>
* 0
@ -303,8 +309,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we block B which allows all bytes to be written by A. A should not share the
* data with its children since it's not blocked.
* In this test, we block B which allows all bytes to be written by A. A should not share the data with its children
* since it's not blocked.
*
* <pre>
* 0
@ -345,8 +351,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we block B which allows all bytes to be written by A. Once A is blocked, it
* will spill over the remaining of its portion to its children.
* In this test, we block B which allows all bytes to be written by A. Once A is blocked, it will spill over the
* remaining of its portion to its children.
*
* <pre>
* 0
@ -405,8 +411,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D
* split the written bytes between them.
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written
* bytes between them.
*
* <pre>
* 0
@ -452,8 +458,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we root all streams at the connection, and then verify that data is split
* appropriately based on weight (all available data is the same).
* In this test, we root all streams at the connection, and then verify that data is split appropriately based on
* weight (all available data is the same).
*
* <pre>
* 0
@ -515,8 +521,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
/**
* In this test, we root all streams at the connection, and then verify that data is split
* equally among the stream, since they all have the same weight.
* In this test, we root all streams at the connection, and then verify that data is split equally among the stream,
* since they all have the same weight.
*
* <pre>
* 0
@ -567,6 +573,277 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(333, dWritten);
}
/**
* In this test, we block all streams and verify the priority bytes for each sub tree at each node are correct
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrect() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A)));
send(STREAM_B, dummyData(streamSizes.get(STREAM_B)));
send(STREAM_C, dummyData(streamSizes.get(STREAM_C)));
send(STREAM_D, dummyData(streamSizes.get(STREAM_D)));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
}
/**
* In this test, we block all streams shift the priority tree and verify priority bytes for each subtree are correct
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
* <pre>
* [0]
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A)));
send(STREAM_B, dummyData(streamSizes.get(STREAM_B)));
send(STREAM_C, dummyData(streamSizes.get(STREAM_C)));
send(STREAM_D, dummyData(streamSizes.get(STREAM_D)));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
}
/**
* In this test, we block all streams and add a node to the priority tree and verify
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
* <pre>
* [0]
* / \
* A B
* |
* E
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
Http2Stream streamE = connection.local().createStream(STREAM_E, false);
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
streamSizes.put(STREAM_E, 900);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A)));
send(STREAM_B, dummyData(streamSizes.get(STREAM_B)));
send(STREAM_C, dummyData(streamSizes.get(STREAM_C)));
send(STREAM_D, dummyData(streamSizes.get(STREAM_D)));
send(STREAM_E, dummyData(streamSizes.get(STREAM_E)));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
verifyNoWrite(STREAM_E);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), state.priorityBytes());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
state = state(streamE);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), state.priorityBytes());
}
/**
* In this test, we block all streams and remove a node from the priority tree and verify
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
* <pre>
* [0]
* / | \
* C D B
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithRemoval() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -DEFAULT_WINDOW_SIZE);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, 400);
streamSizes.put(STREAM_B, 500);
streamSizes.put(STREAM_C, 600);
streamSizes.put(STREAM_D, 700);
send(STREAM_A, dummyData(streamSizes.get(STREAM_A)));
send(STREAM_B, dummyData(streamSizes.get(STREAM_B)));
send(STREAM_C, dummyData(streamSizes.get(STREAM_C)));
send(STREAM_D, dummyData(streamSizes.get(STREAM_D)));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
streamA.close();
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), state.priorityBytes());
state = state(streamA);
assertEquals(0, state.priorityBytes());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_B)), state.priorityBytes());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_C)), state.priorityBytes());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_D)), state.priorityBytes());
}
private static OutboundFlowState state(Http2Stream stream) {
return (OutboundFlowState) stream.outboundFlow();
}
private static int calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
int sum = 0;
for (int i = 0; i < streamIds.size(); ++i) {
Integer streamSize = streamSizes.get(streamIds.get(i));
if (streamSize != null) {
sum += streamSize;
}
}
return sum;
}
private void send(int streamId, ByteBuf data) throws Http2Exception {
controller.writeData(ctx, streamId, data, 0, false, promise);
}
@ -576,16 +853,15 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private void verifyNoWrite(int streamId) {
verify(frameWriter, never()).writeData(eq(ctx), eq(streamId), any(ByteBuf.class), anyInt(),
anyBoolean(), eq(promise));
verify(frameWriter, never()).writeData(eq(ctx), eq(streamId), any(ByteBuf.class), anyInt(), anyBoolean(),
eq(promise));
}
private void captureWrite(int streamId, ArgumentCaptor<ByteBuf> captor, boolean endStream) {
verify(frameWriter).writeData(eq(ctx), eq(streamId), captor.capture(), eq(0), eq(endStream), eq(promise));
}
private void setPriority(int stream, int parent, int weight, boolean exclusive)
throws Http2Exception {
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
connection.stream(stream).setPriority(parent, (short) weight, exclusive);
}