Cleaning up HTTP/2 outbound flow control

Motivation:

The outbound flow controller currently has to walk the entire tree each
time to calculate the total available data for each subtree. For better
performance we should maintain a running total for each subtree as we
queue/write frames.

Modifications:

I've modified the DefaultHttp2OutboundFlowController to manage the state
of "priorityData" at each node in the priority tree, which is
essentially the total writable data for all streams in that subtree.
These totals do not take into account the connection window, as that is
applied when splitting the data across the streams at each level in the
tree.

The flow controller now sorts the children of a node by the product of
their data (for the subtree) and its weight. This is used since in
certain cases the algorithm might prefer nodes that appear later in the
list. Sorting helps keep nodes with similar characteristics (e.g. a lot
of data and a high priority) with similar output.

To help clean things up, I'm storing a FlowState for the root node as
well, which maintains the runnning total of the currently writable data
for all stream.

Another item of cleanup is that I created a GarbageCollector innerclass
within the outbound flow controller. This keeps all of the garbage
collection code in one place, away from the flow control code.

Also added some more unit tests for the flow controller.

Result:

The outbound flow controller is a bit cleaner and perhaps a bit more
fair when distributing outbound data across streams.
This commit is contained in:
nmittler 2014-05-07 19:37:07 -07:00 committed by Norman Maurer
parent 22a21cf54e
commit de0c416de0
5 changed files with 555 additions and 284 deletions

View File

@ -21,10 +21,15 @@ import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2PriorityTree.Priority;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
@ -33,16 +38,38 @@ import java.util.concurrent.TimeUnit;
*/
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
/**
* The interval (in ns) at which the removed priority garbage collector runs.
* A comparators that sorts priority nodes in ascending order by the amount
* of priority data available for its subtree.
*/
private final long GARBAGE_COLLECTION_INTERVAL = TimeUnit.SECONDS.toNanos(2);
private static final Comparator<Priority<FlowState>> DATA_WEIGHT =
new Comparator<Priority<FlowState>>() {
private static final int MAX_DATA_THRESHOLD = Integer.MAX_VALUE / 256;
@Override
public int compare(Priority<FlowState> o1, Priority<FlowState> o2) {
int o1Data = o1.data().priorityBytes();
int o2Data = o2.data().priorityBytes();
if (o1Data > MAX_DATA_THRESHOLD || o2Data > MAX_DATA_THRESHOLD) {
// Corner case to make sure we don't overflow an integer with
// the multiply.
return o1.data().priorityBytes() - o2.data().priorityBytes();
}
private final Http2PriorityTree<FlowState> priorityTree =
new DefaultHttp2PriorityTree<FlowState>();
private final Queue<Priority<FlowState>> garbage = new ArrayDeque<Priority<FlowState>>();
// Scale the data by the weight.
return (o1Data * o1.weight()) - (o2Data * o2.weight());
}
};
private final Http2PriorityTree<FlowState> priorityTree;
private final FlowState connectionFlow;
private final GarbageCollector garbageCollector;
private int initialWindowSize = DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
private int connectionWindowSize = DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
private long lastGarbageCollection;
public DefaultHttp2OutboundFlowController() {
priorityTree = new DefaultHttp2PriorityTree<FlowState>();
connectionFlow = new FlowState(priorityTree.root());
priorityTree.root().data(connectionFlow);
garbageCollector = new GarbageCollector();
}
@Override
public void addStream(int streamId, int parent, short weight, boolean exclusive) {
@ -54,13 +81,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
throw new IllegalArgumentException("stream " + streamId + " already exists");
}
FlowState state = new FlowState();
priority = priorityTree.prioritize(streamId, parent, weight, exclusive, state);
state.init(priority);
priority = priorityTree.prioritize(streamId, parent, weight, exclusive);
priority.data(new FlowState(priority));
}
@Override
public void updateStream(int streamId, int parent, short weight, boolean exclusive) {
public void updateStream(int streamId, int parentId, short weight, boolean exclusive) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
@ -69,7 +95,48 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
throw new IllegalArgumentException("stream " + streamId + " does not exist");
}
priorityTree.prioritize(streamId, parent, weight, exclusive, priority.data());
// Get the parent stream.
Priority<FlowState> parent = priorityTree.root();
if (parentId != 0) {
parent = priorityTree.get(parentId);
if (parent == null) {
throw new IllegalArgumentException("Parent stream " + parentId + " does not exist");
}
}
// Determine whether we're adding a circular dependency on the parent. If so, this will
// restructure the tree to move this priority above the parent.
boolean circularDependency = parent.isDescendantOf(priority);
Priority<FlowState> previousParent = priority.parent();
boolean parentChange = previousParent != parent;
// If the parent is changing, remove the priority bytes from all relevant branches of the
// priority tree. We will restore them where appropriate after the move.
if (parentChange) {
// The parent is changing, remove the priority bytes for this subtree from its previous
// parent.
previousParent.data().incrementPriorityBytes(-priority.data().priorityBytes());
if (circularDependency) {
// The parent is currently a descendant of priority. Remove the priority bytes
// for its subtree starting at its parent node.
parent.parent().data().incrementPriorityBytes(-parent.data().priorityBytes());
}
}
// Adjust the priority tree.
priorityTree.prioritize(streamId, parentId, weight, exclusive);
// If the parent was changed, restore the priority bytes to the appropriate branches
// of the priority tree.
if (parentChange) {
if (circularDependency) {
// The parent was re-rooted. Update the priority bytes for its parent branch.
parent.parent().data().incrementPriorityBytes(parent.data().priorityBytes());
}
// Add the priority bytes for this subtree to the new parent branch.
parent.data().incrementPriorityBytes(priority.data().priorityBytes());
}
}
@Override
@ -88,12 +155,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
addAndGetConnectionWindowSize(delta);
connectionFlow.incrementStreamWindow(delta);
for (Priority<FlowState> priority : priorityTree) {
FlowState state = priority.data();
if (!state.isMarkedForRemoval()) {
// Verify that the maximum value is not exceeded by this change.
state.addAndGetWindow(delta);
state.incrementStreamWindow(delta);
}
}
@ -112,12 +179,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
if (streamId == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
addAndGetConnectionWindowSize(delta);
connectionFlow.incrementStreamWindow(delta);
writePendingBytes();
} else {
// Update the stream window and write any pending frames for the stream.
FlowState state = getStateOrFail(streamId);
state.addAndGetWindow(delta);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
}
}
@ -166,20 +233,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Indicates whether applying the delta to the given value will cause an integer overflow.
* Returns the flow control window for the entire connection.
*/
private static boolean isIntegerOverflow(int previousValue, int delta) {
return delta > 0 && (Integer.MAX_VALUE - delta) < previousValue;
}
/**
* Increments the connectionWindowSize and returns the new value.
*/
private int addAndGetConnectionWindowSize(int delta) throws Http2Exception {
if (isIntegerOverflow(connectionWindowSize, delta)) {
throw format(FLOW_CONTROL_ERROR, "Window update exceeds maximum for connection");
}
return connectionWindowSize += delta;
private int connectionWindow() {
return priorityTree.root().data().streamWindow();
}
/**
@ -187,41 +244,29 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
private void writePendingBytes() throws Http2Exception {
// Perform garbage collection to remove any priorities marked for deletion from the tree.
garbageCollect();
// Calculate the total writable bytes for each stream in the tree.
// TODO: consider maintaining a running total of the writable bytes at each node so we
// don't have to walk the entire tree here.
int totalWritableBytes = 0;
for (Priority<FlowState> priority : priorityTree.root().children()) {
totalWritableBytes += priority.data().calcWritableBytes();
}
garbageCollector.run();
// Recursively write as many of the total writable bytes as possible.
writeAllowedBytes(totalWritableBytes, priorityTree.root());
Priority<FlowState> root = priorityTree.root();
writeAllowedBytes(root, root.data().priorityBytes());
}
/**
* Recursively traverses the priority tree rooted at the given node. Attempts to write the
* allowed bytes for the streams in this sub tree based on their weighted priorities.
*
* @param allowance an allowed number of bytes that may be written to the streams in this sub
* tree.
* @param priority the sub tree to write to. On the first invocation, this will be the root of
* the priority tree (i.e. the connection node).
* @param allowance an allowed number of bytes that may be written to the streams in this sub
* tree.
*/
private void writeAllowedBytes(int allowance, Priority<FlowState> priority)
private void writeAllowedBytes(Priority<FlowState> priority, int allowance)
throws Http2Exception {
// Get the flow control state for this priority node. It may be null if processing the root
// node (i.e. the connection) or if the priority is being retained for a short time
// after the stream was closed.
// Write the allowed bytes for this node. If not all of the allowance was used,
// restore what's left so that it can be propagated to future nodes.
FlowState state = priority.data();
if (state != null) {
// Write the allowed bytes for this node. If not all of the allowance was used,
// restore what's left so that it can be propagated to future nodes.
int bytesWritten = state.writeBytes(allowance);
allowance -= bytesWritten;
}
int bytesWritten = state.writeBytes(allowance);
allowance -= bytesWritten;
if (allowance <= 0 || priority.isLeaf()) {
// Nothing left to do in this sub tree.
@ -229,23 +274,28 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
// Clip the remaining connection flow control window by the allowance.
int remainingWindow = Math.min(allowance, connectionWindowSize);
int remainingWindow = min(allowance, connectionWindow());
// Decreasing totals for all nodes yet to be processed in the current pass.
// For the root node, the allowance is the total number of writable bytes.
int unallocatedBytes = priority.isRoot() ? allowance : priority.data().unallocatedBytes();
// The total number of unallocated bytes from the children of this node.
int unallocatedBytes = state.priorityBytes() - state.streamableBytes();
// Optimization. If the window is big enough to fit all the data. Just write everything
// and skip the priority algorithm.
if (unallocatedBytes <= remainingWindow) {
for (Priority<FlowState> child : priority.children()) {
child.data().writeBytes(child.data().unallocatedBytes());
writeAllowedBytes(child, child.data().unallocatedPriorityBytes());
}
return;
}
// Get the total weight of all children directly under the current node.
int remainingWeight = priority.totalChildWeights();
// Copy and sort the children of this node. They are sorted in ascending order the total
// priority bytes for the subtree scaled by the weight of the node. The algorithm gives
// preference to nodes that appear later in the list, since the weight of each node
// increases in value as the list is iterated. This means that with this node ordering,
// the most bytes will be written to those nodes with the largest aggregate number of
// bytes and the highest priority.
ArrayList<Priority<FlowState>> states = new ArrayList<Priority<FlowState>>(priority.children());
Collections.sort(states, DATA_WEIGHT);
// Iterate over the children and spread the remaining bytes across them as is appropriate
// based on the weights. This algorithm loops over all of the children more than once,
@ -253,40 +303,38 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// give a node its share of the current remaining bytes. The node's weight and bytes
// allocated are then decremented from the totals, so that the subsequent
// nodes split the difference. If after being processed, a node still has writable data,
// it is added to the end of the queue and will be processed again in the next pass.
// Increasing totals for nodes that have been re-added to the queue for the next pass.
// it is added back to the queue for further processing in the next pass.
int remainingWeight = priority.totalChildWeights();
int nextTail = 0;
int unallocatedBytesForNextPass = 0;
int remainingWeightForNextPass = 0;
// Copy the children to a deque
ArrayDeque<Priority<FlowState>> deque =
new ArrayDeque<Priority<FlowState>>(priority.children());
for (;;) {
Priority<FlowState> next = deque.poll();
if (next == null) {
break;
}
FlowState node = next.data();
if (remainingWeight == 0) {
for (int head = 0, tail = states.size(); ; ++head) {
if (head >= tail) {
// We've reached the end one pass of the nodes. Reset the totals based on
// the nodes that were re-added to the deque since they still have data available.
unallocatedBytes = unallocatedBytesForNextPass;
remainingWeight = remainingWeightForNextPass;
unallocatedBytesForNextPass = 0;
remainingWeightForNextPass = 0;
head = 0;
tail = nextTail;
nextTail = 0;
}
// Get the next state, or break if nothing to do.
if (head >= tail) {
break;
}
Priority<FlowState> next = states.get(head);
FlowState node = next.data();
int weight = node.priority().weight();
// Determine the amount of data that's still unallocated and will fit into
// the current connection window.
int writableData = Math.min(unallocatedBytes, remainingWindow);
if (writableData > 0 && node.unallocatedBytes() > 0) {
// Determine the value (in bytes) of a single unit of weight.
double dataToWeightRatio = min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
unallocatedBytes -= node.unallocatedPriorityBytes();
remainingWeight -= weight;
// Determine the value (in bytes) of a single unit of weight.
double dataToWeightRatio = writableData / (double) remainingWeight;
if (dataToWeightRatio > 0.0 && node.unallocatedPriorityBytes() > 0) {
// Determine the portion of the current writable data that is assigned to this
// node.
@ -294,37 +342,30 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Clip the chunk allocated by the total amount of unallocated data remaining in
// the node.
int allocatedChunk = Math.min(writableChunk, node.unallocatedBytes());
int allocatedChunk = min(writableChunk, node.unallocatedPriorityBytes());
// Update the remaining connection window size.
remainingWindow -= allocatedChunk;
// This node has been processed for this loop. Remove it from the loop totals.
unallocatedBytes -= node.unallocatedBytes();
remainingWeight -= weight;
// Update the node state.
node.allocateBytes(allocatedChunk);
if (node.unallocatedBytes() > 0) {
// There is still data remaining for this stream. Add it to the end of the
// deque to be processed in the next loop.
unallocatedBytesForNextPass += node.unallocatedBytes();
// Mark these bytes as allocated.
node.allocatePriorityBytes(allocatedChunk);
if (node.unallocatedPriorityBytes() > 0) {
// There is still data remaining for this stream. Add it back to the queue
// for the next pass.
unallocatedBytesForNextPass += node.unallocatedPriorityBytes();
remainingWeightForNextPass += weight;
deque.add(node.priority());
// Don't write the data for this node yet - there may be more that will
// be allocated in the next loop.
states.set(nextTail++, node.priority());
continue;
}
} else {
// This node has been processed for this loop. Remove it from the loop totals.
unallocatedBytes -= node.unallocatedBytes();
remainingWeight -= weight;
}
if (node.allocatedBytes() > 0) {
if (node.allocatedPriorityBytes() > 0) {
// Write the allocated data for this stream.
writeAllowedBytes(node.allocatedBytes(), node.priority());
writeAllowedBytes(node.priority(), node.allocatedPriorityBytes());
// We're done with this node. Remark all bytes as unallocated for future
// invocations.
node.allocatePriorityBytes(0);
}
}
}
@ -334,55 +375,21 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
return priority != null ? priority.data() : null;
}
/**
* Removes any priorities from the tree that were marked for removal greater than
* {@link #GARBAGE_COLLECTION_INTERVAL} milliseconds ago. Garbage collection will run at most on
* the interval {@link #GARBAGE_COLLECTION_INTERVAL}, so calling it more frequently will have no
* effect.
*/
private void garbageCollect() {
if (garbage.isEmpty()) {
return;
}
long time = System.nanoTime();
if (time - lastGarbageCollection < GARBAGE_COLLECTION_INTERVAL) {
// Only run the garbage collection on the threshold interval (at most).
return;
}
lastGarbageCollection = time;
for (;;) {
Priority<FlowState> next = garbage.peek();
if (next == null) {
break;
}
long removeTime = next.data().removalTime();
if (time - removeTime > GARBAGE_COLLECTION_INTERVAL) {
Priority<FlowState> priority = garbage.remove();
priorityTree.remove(priority.streamId());
} else {
break;
}
}
}
/**
* The outbound flow control state for a single stream.
*/
private final class FlowState {
private final Queue<Frame> pendingWriteQueue = new ArrayDeque<Frame>(2);
private Priority<FlowState> priority;
private int windowSize = initialWindowSize;
private final Queue<Frame> pendingWriteQueue;
private final Priority<FlowState> priority;
private int streamWindow = initialWindowSize;
private long removalTime;
private int writableBytes;
private int allocatedBytes;
private int pendingBytes;
private int priorityBytes;
private int allocatedPriorityBytes;
/**
* Initializes this flow state with the stream priority.
*/
void init(Priority<FlowState> priority) {
FlowState(Priority<FlowState> priority) {
this.priority = priority;
pendingWriteQueue = new ArrayDeque<Frame>(2);
}
/**
@ -413,80 +420,83 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
void markForRemoval() {
if (!isMarkedForRemoval()) {
removalTime = System.nanoTime();
garbage.add(priority);
garbageCollector.add(priority);
clear();
}
}
/**
* The flow control window for this stream. If this state is for stream 0, then this is
* the flow control window for the entire connection.
*/
int streamWindow() {
return streamWindow;
}
/**
* Increments the flow control window for this stream by the given delta and returns the new
* value.
*/
int addAndGetWindow(int delta) throws Http2Exception {
if (isIntegerOverflow(windowSize, delta)) {
int incrementStreamWindow(int delta) throws Http2Exception {
if (delta > 0 && (Integer.MAX_VALUE - delta) < streamWindow) {
throw new Http2StreamException(priority.streamId(), FLOW_CONTROL_ERROR,
"Window size overflow for stream");
"Window size overflow for stream: " + priority.streamId());
}
windowSize += delta;
return windowSize;
int previouslyStreamable = streamableBytes();
streamWindow += delta;
// Update this branch of the priority tree if the streamable bytes have changed for this
// node.
incrementPriorityBytes(streamableBytes() - previouslyStreamable);
return streamWindow;
}
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
int writableWindow() {
return Math.min(windowSize, connectionWindowSize);
return min(streamWindow, connectionWindow());
}
/**
* Returns the number of pending bytes for this node that will fit within the
* {@link #streamWindow}. This is used for the priority algorithm to determine the aggregate
* total for {@link #priorityBytes} at each node. Each node only takes into account it's
* stream window so that when a change occurs to the connection window, these values need
* not change (i.e. no tree traversal is required).
*/
int streamableBytes() {
return max(0, min(pendingBytes, streamWindow));
}
/**
* The aggregate total of all {@link #streamableBytes()} for subtree rooted at this node.
*/
int priorityBytes() {
return priorityBytes;
}
/**
* Used by the priority algorithm to allocate bytes to this stream.
*/
void allocateBytes(int bytes) {
allocatedBytes += bytes;
void allocatePriorityBytes(int bytes) {
allocatedPriorityBytes += bytes;
}
/**
* Used by the priority algorithm to get the intermediate allocation of bytes to this
* stream.
*/
int allocatedBytes() {
return allocatedBytes;
int allocatedPriorityBytes() {
return allocatedPriorityBytes;
}
/**
* Used by the priority algorithm to determine the number of writable bytes that have not
* yet been allocated.
*/
int unallocatedBytes() {
return writableBytes - allocatedBytes;
}
/**
* Used by the priority algorithm to calculate the number of writable bytes for this
* sub-tree. Writable bytes takes into account the connection window and the stream windows
* for each node.
*/
int calcWritableBytes() {
writableBytes = 0;
// Calculate the writable bytes for this node.
if (!isMarkedForRemoval()) {
int window = writableWindow();
for (Frame frame : pendingWriteQueue) {
writableBytes += Math.min(window, frame.data.readableBytes());
if (writableBytes == window) {
break;
}
}
}
// Calculate the writable bytes for all children.
for (Priority<FlowState> child : priority.children()) {
writableBytes += child.data().calcWritableBytes();
}
return writableBytes;
int unallocatedPriorityBytes() {
return priorityBytes - allocatedPriorityBytes;
}
/**
@ -509,7 +519,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* size is zero.
*/
Frame peek() {
if (windowSize > 0) {
if (streamWindow > 0) {
return pendingWriteQueue.peek();
}
return null;
@ -519,7 +529,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* Clears the pending queue and writes errors for each remaining frame.
*/
void clear() {
while (true) {
for (;;) {
Frame frame = pendingWriteQueue.poll();
if (frame == null) {
break;
@ -540,7 +550,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
return bytesWritten;
}
int maxBytes = Math.min(bytes, writableWindow());
int maxBytes = min(bytes, writableWindow());
while (bytesWritten < maxBytes && hasFrame()) {
Frame pendingWrite = peek();
if (maxBytes >= pendingWrite.size()) {
@ -555,11 +565,35 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
// Update the threshold.
maxBytes = Math.min(bytes - bytesWritten, writableWindow());
maxBytes = min(bytes - bytesWritten, writableWindow());
}
return bytesWritten;
}
/**
* Increments the number of pending bytes for this node. If there was any change to the
* number of bytes that fit into the stream window, then {@link #incrementPriorityBytes} to
* recursively update this branch of the priority tree.
*/
private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
this.pendingBytes += numBytes;
incrementPriorityBytes(streamableBytes() - previouslyStreamable);
}
/**
* Recursively increments the priority bytes for this branch in the priority tree starting
* at the current node.
*/
private void incrementPriorityBytes(int numBytes) {
if (numBytes != 0) {
priorityBytes += numBytes;
if (!priority.isRoot()) {
priority.parent().data().incrementPriorityBytes(numBytes);
}
}
}
/**
* A wrapper class around the content of a data frame.
*/
@ -590,6 +624,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
if (!enqueued) {
enqueued = true;
pendingWriteQueue.offer(this);
// Increment the number of pending bytes for this stream.
incrementPendingBytes(data.readableBytes());
}
}
@ -600,10 +637,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
void write() throws Http2Exception {
int dataLength = data.readableBytes();
connectionWindowSize -= dataLength;
addAndGetWindow(-dataLength);
connectionFlow.incrementStreamWindow(-dataLength);
incrementStreamWindow(-dataLength);
writer.writeFrame(priority.streamId(), data, padding, endStream, endSegment,
compressed);
decrementPendingBytes(dataLength);
}
/**
@ -611,6 +649,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* unwritten bytes are removed from this branch of the priority tree.
*/
void writeError(Http2Exception cause) {
decrementPendingBytes(data.readableBytes());
data.release();
writer.setFailure(cause);
}
@ -626,9 +665,77 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
*/
Frame split(int maxBytes) {
// TODO: Should padding be included in the chunks or only the last frame?
maxBytes = Math.min(maxBytes, data.readableBytes());
return new Frame(data.readSlice(maxBytes).retain(), 0, false, false, compressed,
maxBytes = min(maxBytes, data.readableBytes());
Frame frame = new Frame(data.readSlice(maxBytes).retain(), 0, false, false, compressed,
writer);
decrementPendingBytes(maxBytes);
return frame;
}
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the
* stream.
*/
void decrementPendingBytes(int bytes) {
if (enqueued) {
incrementPendingBytes(-bytes);
}
}
}
}
/**
* Controls garbage collection for priorities that have been marked for removal.
*/
private final class GarbageCollector implements Runnable {
/**
* The interval (in ns) at which the removed priority garbage collector runs.
*/
private final long GARBAGE_COLLECTION_INTERVAL = TimeUnit.SECONDS.toNanos(2);
private final Queue<Priority<FlowState>> garbage;
private long lastGarbageCollection;
GarbageCollector() {
garbage = new ArrayDeque<Priority<FlowState>>();
}
void add(Priority<FlowState> priority) {
priority.data().removalTime = System.nanoTime();
garbage.add(priority);
}
/**
* Removes any priorities from the tree that were marked for removal greater than
* {@link #GARBAGE_COLLECTION_INTERVAL} milliseconds ago. Garbage collection will run at most on
* the interval {@link #GARBAGE_COLLECTION_INTERVAL}, so calling it more frequently will have no
* effect.
*/
@Override
public void run() {
if (garbage.isEmpty()) {
return;
}
long time = System.nanoTime();
if (time - lastGarbageCollection < GARBAGE_COLLECTION_INTERVAL) {
// Only run the garbage collection on the threshold interval (at most).
return;
}
lastGarbageCollection = time;
for (;;) {
Priority<FlowState> next = garbage.peek();
if (next == null) {
break;
}
long removeTime = next.data().removalTime();
if (time - removeTime > GARBAGE_COLLECTION_INTERVAL) {
Priority<FlowState> priority = garbage.remove();
priorityTree.remove(priority.streamId());
} else {
break;
}
}
}
}

View File

@ -16,6 +16,8 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_BYTE;
import io.netty.handler.codec.http2.Http2PriorityTree.Priority;
import java.util.ArrayDeque;
import java.util.Collections;
@ -31,8 +33,8 @@ import java.util.Set;
*/
public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
private final DefaultPriority<T> root = new DefaultPriority<T>(0, (short) 0, null);
private Map<Integer, Priority<T>> priorityMap = new HashMap<Integer, Priority<T>>();
private final DefaultPriority<T> root = new DefaultPriority<T>(0, (short) 0);
private final Map<Integer, Priority<T>> priorityMap = new HashMap<Integer, Priority<T>>();
@Override
public Priority<T> get(int streamId) {
@ -45,13 +47,12 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
}
@Override
public Priority<T> prioritizeUsingDefaults(int streamId, T data) {
return prioritize(streamId, 0, DEFAULT_PRIORITY_WEIGHT, false, data);
public Priority<T> prioritizeUsingDefaults(int streamId) {
return prioritize(streamId, 0, DEFAULT_PRIORITY_WEIGHT, false);
}
@Override
public Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive,
T data) {
public Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive) {
if (streamId <= 0) {
throw new IllegalArgumentException("Stream ID must be > 0");
}
@ -61,7 +62,7 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
if (parent < 0) {
throw new IllegalArgumentException("Parent stream ID must be >= 0");
}
if (weight < 1 || weight > 256) {
if (weight < 1 || weight > MAX_UNSIGNED_BYTE) {
throw new IllegalArgumentException("Invalid weight: " + weight);
}
@ -77,15 +78,14 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
DefaultPriority<T> priority = internalGet(streamId);
if (priority == null) {
// Add a new priority.
priority = new DefaultPriority<T>(streamId, weight, data);
priority = new DefaultPriority<T>(streamId, weight);
newParent.addChild(priority, exclusive);
priorityMap.put(streamId, priority);
return priority;
}
// Already have a priority. Re-prioritize the stream.
priority.setWeight(weight);
priority.setData(data);
priority.weight(weight);
if (newParent == priority.parent() && !exclusive) {
// No changes were made to the tree structure.
@ -167,10 +167,9 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
private DefaultPriority<T> parent;
private int totalChildWeights;
DefaultPriority(int streamId, short weight, T data) {
DefaultPriority(int streamId, short weight) {
this.streamId = streamId;
this.weight = weight;
this.data = data;
}
@Override
@ -193,6 +192,12 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
return data;
}
@Override
public Priority<T> data(T data) {
this.data = data;
return this;
}
@Override
public int totalChildWeights() {
return totalChildWeights;
@ -232,11 +237,11 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
@Override
public boolean hasChild(int streamId) {
return getChild(streamId) != null;
return child(streamId) != null;
}
@Override
public Priority<T> getChild(int streamId) {
public Priority<T> child(int streamId) {
for (DefaultPriority<T> child : children) {
if (child.streamId() == streamId) {
return child;
@ -245,7 +250,7 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
return null;
}
void setWeight(short weight) {
void weight(short weight) {
if (parent != null && weight != this.weight) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
@ -253,10 +258,6 @@ public class DefaultHttp2PriorityTree<T> implements Http2PriorityTree<T> {
this.weight = weight;
}
void setData(T data) {
this.data = data;
}
Set<DefaultPriority<T>> removeAllChildren() {
if (children.isEmpty()) {
return Collections.emptySet();

View File

@ -47,6 +47,11 @@ public interface Http2PriorityTree<T> extends Iterable<Http2PriorityTree.Priorit
*/
T data();
/**
* Associates the given data with this priority node.
*/
Priority<T> data(T data);
/**
* Returns weight assigned to the dependency with the parent. The weight will be a value
* between 1 and 256.
@ -83,7 +88,7 @@ public interface Http2PriorityTree<T> extends Iterable<Http2PriorityTree.Priorit
* Attempts to find a child of this node for the given stream. If not found, returns
* {@code null}.
*/
Priority<T> getChild(int streamId);
Priority<T> child(int streamId);
/**
* Gets the children nodes that are dependent on this node.
@ -99,7 +104,7 @@ public interface Http2PriorityTree<T> extends Iterable<Http2PriorityTree.Priorit
* @param data optional user-defined data to associate to the stream
* @return the priority for the stream.
*/
Priority<T> prioritizeUsingDefaults(int streamId, T data);
Priority<T> prioritizeUsingDefaults(int streamId);
/**
* Adds a new priority or updates an existing priority for the given stream.
@ -111,10 +116,9 @@ public interface Http2PriorityTree<T> extends Iterable<Http2PriorityTree.Priorit
* must be between 1 and 256 (inclusive)
* @param exclusive indicates that the stream should be the exclusive dependent on its parent.
* This only applies if the stream has a parent.
* @param data optional user-defined data to associate to the stream.
* @return the priority for the stream.
*/
Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive, T data);
Priority<T> prioritize(int streamId, int parent, short weight, boolean exclusive);
/**
* Removes the priority information for the given stream. Adjusts other priorities if necessary.

View File

@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@ -366,6 +367,187 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(1, Math.abs(c - d));
}
/**
* In this test, we verify re-prioritizing a stream. We start out with B blocked:
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the
* written bytes between them.
* <pre>
* 0
* /|\
* / | \
* A [B] D
* /
* C
* </pre>
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Block stream B
controller.updateOutboundWindowSize(STREAM_B, -DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Send 10 bytes to each.
send(STREAM_A, dummyData(10));
send(STREAM_B, dummyData(10));
send(STREAM_C, dummyData(10));
send(STREAM_D, dummyData(10));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
// Re-prioritize D as a direct child of the connection.
controller.updateStream(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
captureWrite(STREAM_A, captor, false);
assertEquals(5, captor.getValue().readableBytes());
captureWrite(STREAM_D, captor, false);
assertEquals(5, captor.getValue().readableBytes());
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
}
/**
* In this test, we root all streams at the connection, and then verify that data
* is split appropriately based on weight (all available data is the same).
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Root the streams at the connection and assign weights.
controller.updateStream(STREAM_A, 0, (short) 50, false);
controller.updateStream(STREAM_B, 0, (short) 200, false);
controller.updateStream(STREAM_C, 0, (short) 100, false);
controller.updateStream(STREAM_D, 0, (short) 100, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(1000));
send(STREAM_B, dummyData(1000));
send(STREAM_C, dummyData(1000));
send(STREAM_D, dummyData(1000));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
verifyNoWrite(STREAM_D);
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 1000);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, captor, false);
int aWritten = captor.getValue().readableBytes();
int min = aWritten;
int max = aWritten;
captureWrite(STREAM_B, captor, false);
int bWritten = captor.getValue().readableBytes();
min = Math.min(min, bWritten);
max = Math.max(max, bWritten);
captureWrite(STREAM_C, captor, false);
int cWritten = captor.getValue().readableBytes();
min = Math.min(min, cWritten);
max = Math.max(max, cWritten);
captureWrite(STREAM_D, captor, false);
int dWritten = captor.getValue().readableBytes();
min = Math.min(min, dWritten);
max = Math.max(max, dWritten);
assertEquals(1000, aWritten + bWritten + cWritten + dWritten);
assertEquals(aWritten, min);
assertEquals(bWritten, max);
assertTrue(aWritten < cWritten);
assertEquals(cWritten, dWritten);
assertTrue(cWritten < bWritten);
}
/**
* In this test, we root all streams at the connection, and then verify that data
* is split equally among the stream, since they all have the same weight.
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void samePriorityShouldWriteEqualData() throws Http2Exception {
// Block the connection
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID,
-DEFAULT_FLOW_CONTROL_WINDOW_SIZE);
// Root the streams at the connection with the same weights.
controller.updateStream(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
controller.updateStream(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Send a bunch of data on each stream.
send(STREAM_A, dummyData(400));
send(STREAM_B, dummyData(500));
send(STREAM_C, dummyData(0));
send(STREAM_D, dummyData(700));
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_D);
// The write will occur on C, because it's an empty frame.
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_C, captor, false);
assertEquals(0, captor.getValue().readableBytes());
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999);
captureWrite(STREAM_A, captor, false);
int aWritten = captor.getValue().readableBytes();
int min = aWritten;
int max = aWritten;
captureWrite(STREAM_B, captor, false);
int bWritten = captor.getValue().readableBytes();
min = Math.min(min, bWritten);
max = Math.max(max, bWritten);
captureWrite(STREAM_D, captor, false);
int dWritten = captor.getValue().readableBytes();
min = Math.min(min, dWritten);
max = Math.max(max, dWritten);
assertEquals(999, aWritten + bWritten + dWritten);
assertEquals(333, aWritten);
assertEquals(333, bWritten);
assertEquals(333, dWritten);
}
private void send(int streamId, ByteBuf data) throws Http2Exception {
controller.sendFlowControlled(streamId, data, 0, false, false, false, frameWriter);
}

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import io.netty.handler.codec.http2.Http2PriorityTree.Priority;
import org.junit.Before;
@ -37,12 +36,10 @@ public class DefaultHttp2PriorityTreeTest {
@Test
public void prioritizeShouldUseDefaults() {
Object data = new Object();
tree.prioritizeUsingDefaults(1, data);
tree.prioritizeUsingDefaults(1);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().children().iterator().next();
assertEquals(1, p.streamId());
assertSame(data, p.data());
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
@ -50,12 +47,10 @@ public class DefaultHttp2PriorityTreeTest {
@Test
public void prioritizeFromEmptyShouldSucceed() {
Object data = new Object();
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false, data);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().getChild(1);
Priority<Object> p = tree.root().child(1);
assertNotNull(p);
assertSame(data, p.data());
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
@ -63,14 +58,11 @@ public class DefaultHttp2PriorityTreeTest {
@Test
public void reprioritizeWithNoChangeShouldDoNothing() {
Object d1 = new Object();
Object d2 = new Object();
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false, d1);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false, d2);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, tree.root().numChildren());
Priority<Object> p = tree.root().getChild(1);
Priority<Object> p = tree.root().child(1);
assertNotNull(p);
assertSame(d2, p.data());
assertEquals(DEFAULT_PRIORITY_WEIGHT, p.weight());
assertEquals(0, p.parent().streamId());
assertEquals(0, p.numChildren());
@ -78,14 +70,10 @@ public class DefaultHttp2PriorityTreeTest {
@Test
public void insertExclusiveShouldAddNewLevel() {
Object d1 = new Object();
Object d2 = new Object();
Object d3 = new Object();
Object d4 = new Object();
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false, d1);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false, d2);
tree.prioritize(3, 1, DEFAULT_PRIORITY_WEIGHT, false, d3);
tree.prioritize(4, 1, DEFAULT_PRIORITY_WEIGHT, true, d4);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(3, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(4, 1, DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(4, tree.size());
// Level 0
@ -94,31 +82,27 @@ public class DefaultHttp2PriorityTreeTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.getChild(1);
p = p.child(1);
assertNotNull(p);
assertSame(d1, p.data());
assertEquals(0, p.parent().streamId());
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.getChild(4);
p = p.child(4);
assertNotNull(p);
assertSame(d4, p.data());
assertEquals(1, p.parent().streamId());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.getChild(2);
p = p.child(2);
assertNotNull(p);
assertSame(d2, p.data());
assertEquals(4, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(3);
p = p.parent().child(3);
assertNotNull(p);
assertSame(d3, p.data());
assertEquals(4, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
@ -126,14 +110,10 @@ public class DefaultHttp2PriorityTreeTest {
@Test
public void removeShouldRestructureTree() {
Object d1 = new Object();
Object d2 = new Object();
Object d3 = new Object();
Object d4 = new Object();
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false, d1);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false, d2);
tree.prioritize(3, 2, DEFAULT_PRIORITY_WEIGHT, false, d3);
tree.prioritize(4, 2, DEFAULT_PRIORITY_WEIGHT, false, d4);
tree.prioritize(1, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(2, 1, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(3, 2, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(4, 2, DEFAULT_PRIORITY_WEIGHT, false);
tree.remove(2);
// Level 0
@ -142,23 +122,20 @@ public class DefaultHttp2PriorityTreeTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.getChild(1);
p = p.child(1);
assertNotNull(p);
assertSame(d1, p.data());
assertEquals(0, p.parent().streamId());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.getChild(3);
p = p.child(3);
assertNotNull(p);
assertSame(d3, p.data());
assertEquals(1, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(4);
p = p.parent().child(4);
assertNotNull(p);
assertSame(d4, p.data());
assertEquals(1, p.parent().streamId());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
@ -173,16 +150,16 @@ public class DefaultHttp2PriorityTreeTest {
int d = 4;
int e = 5;
int f = 6;
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, tree.size());
// Non-exclusive re-prioritization of a->d.
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, false);
// Level 0
Priority<Object> p = tree.root();
@ -190,33 +167,33 @@ public class DefaultHttp2PriorityTreeTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.getChild(d);
p = p.child(d);
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.getChild(f);
p = p.child(f);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(a);
p = p.parent().child(a);
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.getChild(b);
p = p.child(b);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(c);
p = p.parent().child(c);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.getChild(e);
p = p.child(e);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
@ -233,16 +210,16 @@ public class DefaultHttp2PriorityTreeTest {
int d = 4;
int e = 5;
int f = 6;
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false, null);
tree.prioritize(a, 0, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(b, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(c, a, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(d, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(e, c, DEFAULT_PRIORITY_WEIGHT, false);
tree.prioritize(f, d, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(6, tree.size());
// Exclusive re-prioritization of a->d.
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, true, null);
tree.prioritize(a, d, DEFAULT_PRIORITY_WEIGHT, true);
// Level 0
Priority<Object> p = tree.root();
@ -250,33 +227,33 @@ public class DefaultHttp2PriorityTreeTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = p.getChild(d);
p = p.child(d);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = p.getChild(a);
p = p.child(a);
assertNotNull(p);
assertEquals(3, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = p.getChild(b);
p = p.child(b);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(f);
p = p.parent().child(f);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().getChild(c);
p = p.parent().child(c);
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = p.getChild(e);
p = p.child(e);
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());