HTTP/2 Priority Algorithm Restructure

Motivation:
The current priority algorithm uses 2 different mechanisms to iterate the priority tree and send the results of the allocation.  The current algorithm also uses a two step phase where the priority tree is traversed and allocation amounts are calculated and then all active streams are traversed to send for any streams that may or may not have been allocated bytes.

Modifications:
- DefaultHttp2OutboundFlowController will allocate and send (when possible) in the same looping structure.
- The recursive method will send only for the children instead of itself and its children which should simplify the recursion.

Result:
Hopefully simplified recursive algorithm where the tree iteration determines who needs to send and less iteration after the recursive calls complete.
This commit is contained in:
Scott Mitchell 2014-11-13 18:11:49 -05:00
parent 700ac93b15
commit c8a1d077b5

View File

@ -43,18 +43,18 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/** /**
* A {@link Comparator} that sorts streams in ascending order the amount of streamable data. * A {@link Comparator} that sorts streams in ascending order the amount of streamable data.
*/ */
private static final Comparator<Http2Stream> DATA_ORDER = new Comparator<Http2Stream>() { private static final Comparator<Http2Stream> WEIGHT_ORDER = new Comparator<Http2Stream>() {
@Override @Override
public int compare(Http2Stream o1, Http2Stream o2) { public int compare(Http2Stream o1, Http2Stream o2) {
return state(o1).streamableBytesForTree() - state(o2).streamableBytesForTree(); return o2.weight() - o1.weight();
} }
}; };
private final Http2Connection connection; private final Http2Connection connection;
private final Http2FrameWriter frameWriter; private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private boolean frameSent;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private boolean frameSent;
public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this.connection = checkNotNull(connection, "connection"); this.connection = checkNotNull(connection, "connection");
@ -140,15 +140,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Update the stream window and write any pending frames for the stream. // Update the stream window and write any pending frames for the stream.
OutboundFlowState state = stateOrFail(streamId); OutboundFlowState state = stateOrFail(streamId);
state.incrementStreamWindow(delta); state.incrementStreamWindow(delta);
if (state.writeBytes(state.writableWindow()) > 0) { frameSent = false;
state.writeBytes(state.writableWindow());
if (frameSent) {
flush(); flush();
} }
} }
} }
@Override @Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
int padding, boolean endStream, ChannelPromise promise) { boolean endStream, ChannelPromise promise) {
checkNotNull(ctx, "ctx"); checkNotNull(ctx, "ctx");
checkNotNull(promise, "promise"); checkNotNull(promise, "promise");
checkNotNull(data, "data"); checkNotNull(data, "data");
@ -203,7 +205,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
private static OutboundFlowState state(Http2Stream stream) { private static OutboundFlowState state(Http2Stream stream) {
return stream != null ? (OutboundFlowState) stream.outboundFlow() : null; return (OutboundFlowState) stream.outboundFlow();
} }
private OutboundFlowState connectionState() { private OutboundFlowState connectionState() {
@ -211,7 +213,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
private OutboundFlowState state(int streamId) { private OutboundFlowState state(int streamId) {
return state(connection.stream(streamId)); Http2Stream stream = connection.stream(streamId);
return stream != null ? state(stream) : null;
} }
/** /**
@ -246,108 +249,111 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* Writes as many pending bytes as possible, according to stream priority. * Writes as many pending bytes as possible, according to stream priority.
*/ */
private void writePendingBytes() throws Http2Exception { private void writePendingBytes() throws Http2Exception {
frameSent = false;
Http2Stream connectionStream = connection.connectionStream(); Http2Stream connectionStream = connection.connectionStream();
OutboundFlowState connectionState = state(connectionStream); int connectionWindow = state(connectionStream).window();
int connectionWindow = Math.max(0, connectionState.window());
// Allocate the bytes for the entire priority tree. if (connectionWindow > 0) {
allocateBytesForTree(connectionStream, connectionWindow); frameSent = false;
writeChildren(connectionStream, connectionWindow);
// Perform the write of the allocated bytes for each stream.
for (Http2Stream stream : connection.activeStreams()) { for (Http2Stream stream : connection.activeStreams()) {
OutboundFlowState state = state(stream); writeChildNode(state(stream));
// The allocated bytes are for the entire sub-tree but the write will be limited
// by the number of pending bytes for the stream.
state.writeBytes(state.allocatedBytesForTree());
state.resetAllocatedBytesForTree();
} }
connectionState.resetAllocatedBytesForTree();
// Only flush once for all written frames.
if (frameSent) { if (frameSent) {
flush(); flush();
} }
} }
}
/** /**
* Allocates as many bytes as possible for the given tree within the provided connection window. * Write the children of {@code parent} in the priority tree. This will allocate bytes by stream weight.
* * @param parent The parent of the nodes which will be written.
* @param stream the tree for which the given bytes are to be allocated. * @param connectionWindow The connection window this is available for use at this point in the tree.
* @param connectionWindow the connection window that acts as an upper bound on the total number * @return An object summarizing the write and allocation results.
* of bytes that can be allocated for the tree.
* @return the total number of bytes actually allocated for this subtree.
*/ */
private int allocateBytesForTree(Http2Stream stream, int connectionWindow) { private int writeChildren(Http2Stream parent, int connectionWindow) {
OutboundFlowState state = state(stream); OutboundFlowState state = state(parent);
connectionWindow = min(connectionWindow, state.unallocatedBytesForTree()); if (state.streamableBytesForTree() <= 0) {
return 0;
}
int bytesAllocated = 0;
// Determine the amount of bytes to allocate for 'this' stream. // If the number of streamable bytes for this tree will fit in the connection window
int streamable = Math.max(0, state.streamableBytes() - state.allocatedBytesForTree()); // then there is no need to prioritize the bytes...everyone sends what they have
int totalAllocated = min(connectionWindow, streamable); if (state.streamableBytesForTree() <= connectionWindow) {
for (Http2Stream child : parent.children()) {
state = state(child);
int bytesForChild = state.streamableBytes();
connectionWindow -= totalAllocated; if (bytesForChild > 0 || state.hasFrame()) {
int remainingInTree = state.streamableBytesForTree() - totalAllocated; state.allocate(bytesForChild);
if (stream.isLeaf() || remainingInTree <= 0 || connectionWindow <= 0) { writeChildNode(state);
// Nothing left to do in this subtree. bytesAllocated += bytesForChild;
state.allocateBytesForTree(totalAllocated); connectionWindow -= bytesForChild;
return totalAllocated; }
int childBytesAllocated = writeChildren(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
}
return bytesAllocated;
} }
// If the window is big enough to fit all the remaining data. Just write everything // This is the priority algorithm which will divide the available bytes based
// and skip the priority algorithm. // upon stream weight relative to its peers
if (remainingInTree <= connectionWindow) { Http2Stream[] children = parent.children().toArray(new Http2Stream[parent.numChildren()]);
for (Http2Stream child : stream.children()) { Arrays.sort(children, WEIGHT_ORDER);
int writtenToChild = allocateBytesForTree(child, connectionWindow); int totalWeight = parent.totalChildWeights();
totalAllocated += writtenToChild; for (int tail = children.length; tail > 0;) {
connectionWindow -= writtenToChild; int head = 0;
} int nextTail = 0;
state.allocateBytesForTree(totalAllocated); int nextTotalWeight = 0;
return totalAllocated; int nextConnectionWindow = connectionWindow;
} for (; head < tail && nextConnectionWindow > 0; ++head) {
Http2Stream child = children[head];
Http2Stream[] children = stream.children().toArray(new Http2Stream[0]); state = state(child);
Arrays.sort(children, DATA_ORDER);
// Clip the total remaining bytes by the connection window.
int totalWeight = stream.totalChildWeights();
int tail = children.length;
// Outer loop: continue until we've exhausted the connection window or allocated all bytes in the tree.
while (tail > 0 && connectionWindow > 0) {
int tailNextPass = 0;
int totalWeightNextPass = 0;
// Inner loop: allocate bytes to the children based on their weight.
for (int index = 0; index < tail && connectionWindow > 0; ++index) {
Http2Stream child = children[index];
OutboundFlowState childState = state(child);
// Determine the ratio of this stream to all children.
int weight = child.weight(); int weight = child.weight();
double weightRatio = weight / (double) totalWeight; double weightRatio = weight / (double) totalWeight;
int windowSlice = Math.max(1, (int) Math.round(connectionWindow * weightRatio)); int bytesForTree = Math.min(nextConnectionWindow, (int) Math.ceil(connectionWindow * weightRatio));
int bytesForChild = Math.min(state.streamableBytes(), bytesForTree);
// Allocate the bytes for this child. if (bytesForChild > 0 || state.hasFrame()) {
int allocated = allocateBytesForTree(child, windowSlice); state.allocate(bytesForChild);
bytesAllocated += bytesForChild;
totalAllocated += allocated; nextConnectionWindow -= bytesForChild;
connectionWindow -= allocated; bytesForTree -= bytesForChild;
totalWeight -= weight; // If this subtree still wants to send then re-insert into children list and re-consider for next
// iteration. This is needed because we don't yet know if all the peers will be able to use
if (childState.unallocatedBytesForTree() > 0) { // all of their "fair share" of the connection window, and if they don't use it then we should
// This stream still has more data, add it to the next pass. // divide their unused shared up for the peers who still want to send.
children[tailNextPass++] = child; if (state.streamableBytesForTree() - bytesForChild > 0) {
totalWeightNextPass += weight; children[nextTail++] = child;
nextTotalWeight += weight;
}
if (state.streamableBytes() - bytesForChild == 0) {
writeChildNode(state);
} }
} }
totalWeight = totalWeightNextPass; if (bytesForTree > 0) {
tail = tailNextPass; int childBytesAllocated = writeChildren(child, bytesForTree);
bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated;
}
}
connectionWindow = nextConnectionWindow;
totalWeight = nextTotalWeight;
tail = nextTail;
} }
state.allocateBytesForTree(totalAllocated); return bytesAllocated;
return totalAllocated; }
/**
* Write bytes allocated to {@code state}
*/
private static void writeChildNode(OutboundFlowState state) {
state.writeBytes(state.allocated());
state.resetAllocated();
} }
/** /**
@ -359,7 +365,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private int window = initialWindowSize; private int window = initialWindowSize;
private int pendingBytes; private int pendingBytes;
private int streamableBytesForTree; private int streamableBytesForTree;
private int allocatedBytesForTree; private int allocated;
private ChannelFuture lastNewFrame; private ChannelFuture lastNewFrame;
private OutboundFlowState(Http2Stream stream) { private OutboundFlowState(Http2Stream stream) {
@ -373,33 +379,24 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
/** /**
* Increments the number of bytes allocated to this tree by the priority algorithm. * Increment the number of bytes allocated to this stream by the priority algorithm
*/ */
private void allocateBytesForTree(int bytes) { private void allocate(int bytes) {
allocatedBytesForTree += bytes; allocated += bytes;
} }
/** /**
* Gets the number of bytes that have been allocated to this tree by the priority algorithm. * Gets the number of bytes that have been allocated to this stream by the priority algorithm.
*/ */
private int allocatedBytesForTree() { private int allocated() {
return allocatedBytesForTree; return allocated;
} }
/** /**
* Gets the number of unallocated bytes (i.e. {@link #streamableBytesForTree()} - * Reset the number of bytes that have been allocated to this stream by the priority algorithm.
* {@link #allocatedBytesForTree()}).
*/ */
private int unallocatedBytesForTree() { private void resetAllocated() {
return streamableBytesForTree - allocatedBytesForTree; allocated = 0;
}
/**
* Resets the number of bytes allocated to this stream. This is called at the end of the priority
* algorithm for each stream to reset the count for the next invocation.
*/
private void resetAllocatedBytesForTree() {
allocatedBytesForTree = 0;
} }
/** /**
@ -413,8 +410,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int previouslyStreamable = streamableBytes(); int previouslyStreamable = streamableBytes();
window += delta; window += delta;
// Update this branch of the priority tree if the streamable bytes have changed for this // Update this branch of the priority tree if the streamable bytes have changed for this node.
// node.
int streamableDelta = streamableBytes() - previouslyStreamable; int streamableDelta = streamableBytes() - previouslyStreamable;
incrementStreamableBytesForTree(streamableDelta); incrementStreamableBytesForTree(streamableDelta);
return window; return window;
@ -491,17 +487,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* boundaries. * boundaries.
*/ */
private int writeBytes(int bytes) { private int writeBytes(int bytes) {
int bytesWritten = 0;
if (!stream.localSideOpen()) { if (!stream.localSideOpen()) {
return bytesWritten; return 0;
} }
int bytesAttempted = 0;
int maxBytes = min(bytes, writableWindow()); int maxBytes = min(bytes, writableWindow());
while (hasFrame()) { while (hasFrame()) {
Frame pendingWrite = peek(); Frame pendingWrite = peek();
if (maxBytes >= pendingWrite.size()) { if (maxBytes >= pendingWrite.size()) {
// Window size is large enough to send entire data frame // Window size is large enough to send entire data frame
bytesWritten += pendingWrite.size(); bytesAttempted += pendingWrite.size();
pendingWrite.write(); pendingWrite.write();
} else if (maxBytes <= 0) { } else if (maxBytes <= 0) {
// No data from the current frame can be written - we're done. // No data from the current frame can be written - we're done.
@ -511,19 +507,19 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} else { } else {
// We can send a partial frame // We can send a partial frame
Frame partialFrame = pendingWrite.split(maxBytes); Frame partialFrame = pendingWrite.split(maxBytes);
bytesWritten += partialFrame.size(); bytesAttempted += partialFrame.size();
partialFrame.write(); partialFrame.write();
} }
// Update the threshold. // Update the threshold.
maxBytes = min(bytes - bytesWritten, writableWindow()); maxBytes = min(bytes - bytesAttempted, writableWindow());
} }
return bytesWritten; return bytesAttempted;
} }
/** /**
* Recursively increments the streamable bytes for this branch in the priority tree starting * Recursively increments the streamable bytes for this branch in the priority tree starting at the current
* at the current node. * node.
*/ */
private void incrementStreamableBytesForTree(int numBytes) { private void incrementStreamableBytesForTree(int numBytes) {
if (numBytes != 0) { if (numBytes != 0) {
@ -545,8 +541,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int padding; int padding;
boolean enqueued; boolean enqueued;
Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding, Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding, boolean endStream) {
boolean endStream) {
this.data = data; this.data = data;
this.padding = padding; this.padding = padding;
this.endStream = endStream; this.endStream = endStream;
@ -573,10 +568,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
/** /**
* Increments the number of pending bytes for this node. If there was any change to the * Increments the number of pending bytes for this node. If there was any change to the number of bytes that
* number of bytes that fit into the stream window, then * fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this
* {@link #incrementStreamableBytesForTree} to recursively update this branch of the * branch of the priority tree.
* priority tree.
*/ */
private void incrementPendingBytes(int numBytes) { private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes(); int previouslyStreamable = streamableBytes();
@ -587,9 +581,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
/** /**
* Writes the frame and decrements the stream and connection window sizes. If the frame * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* is in the pending queue, the written bytes are removed from this branch of the * queue, the written bytes are removed from this branch of the priority tree.
* priority tree.
* <p> * <p>
* Note: this does not flush the {@link ChannelHandlerContext}. * Note: this does not flush the {@link ChannelHandlerContext}.
*/ */
@ -636,12 +629,11 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
} }
/** /**
* Creates a new frame that is a view of this frame's data. The {@code maxBytes} are * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first split from the
* first split from the data buffer. If not all the requested bytes are available, the * data buffer. If not all the requested bytes are available, the remaining bytes are then split from the
* remaining bytes are then split from the padding (if available). * padding (if available).
* *
* @param maxBytes * @param maxBytes the maximum number of bytes that is allowed in the created frame.
* the maximum number of bytes that is allowed in the created frame.
* @return the partial frame. * @return the partial frame.
*/ */
Frame split(int maxBytes) { Frame split(int maxBytes) {