Decoupling allocation from writing in HTTP/2 outbound flow control

Motivation:

The current DefaultHttp2RemoteFlowController's writePendingBytes currently operates in 2 passes. The first allocates bytes and optionally writes some frames. The second pass just loops across all active streams and writes all remaining bytes.

If streams can be removed/added as a side effect of writing (EOS or error) then we need to take more care when the write actually occurs. Moving all of the writes to the second loop (across active streams) is simpler since we can just make a copy of the list and not worry about any restructuring of the priority tree that may result.

Modifications:

Modified DefaultHttp2RemoteFlowController.writePendingBytes to only allocate bytes on the first pass and then write any allocated bytes on the second pass.

Result:

Side effects resulting from writing should no longer impact the flow control algorithm.
This commit is contained in:
nmittler 2015-03-28 09:23:58 -07:00
parent a6c729bdf8
commit e1c24fd4e5

View File

@ -27,6 +27,7 @@ import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque; import java.util.Deque;
@ -232,21 +233,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int connectionWindow = state(connectionStream).window(); int connectionWindow = state(connectionStream).window();
if (connectionWindow > 0) { if (connectionWindow > 0) {
writeChildren(connectionStream, connectionWindow); // Allocate the bytes for the connection window to the streams, but do not write.
for (Http2Stream stream : connection.activeStreams()) { allocateBytesForTree(connectionStream, connectionWindow);
writeChildNode(state(stream));
// Now write all of the allocated bytes. Copying the activeStreams array to avoid
// side effects due to stream removal/addition which might occur as a result
// of end-of-stream or errors.
Collection<Http2Stream> streams = connection.activeStreams();
for (Http2Stream stream : streams.toArray(new Http2Stream[streams.size()])) {
state(stream).writeAllocatedBytes();
} }
flush(); flush();
} }
} }
/** /**
* Write the children of {@code parent} in the priority tree. This will allocate bytes by stream weight. * This will allocate bytes by stream weight and priority for the entire tree rooted at {@code parent}, but does
* @param parent The parent of the nodes which will be written. * not write any bytes.
*
* @param parent The parent of the tree.
* @param connectionWindow The connection window this is available for use at this point in the tree. * @param connectionWindow The connection window this is available for use at this point in the tree.
* @return An object summarizing the write and allocation results. * @return An object summarizing the write and allocation results.
*/ */
private int writeChildren(Http2Stream parent, int connectionWindow) { private int allocateBytesForTree(Http2Stream parent, int connectionWindow) {
FlowState state = state(parent); FlowState state = state(parent);
if (state.streamableBytesForTree() <= 0) { if (state.streamableBytesForTree() <= 0) {
return 0; return 0;
@ -261,11 +270,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
if (bytesForChild > 0 || state.hasFrame()) { if (bytesForChild > 0 || state.hasFrame()) {
state.allocate(bytesForChild); state.allocate(bytesForChild);
writeChildNode(state);
bytesAllocated += bytesForChild; bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild; connectionWindow -= bytesForChild;
} }
int childBytesAllocated = writeChildren(child, connectionWindow); int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated; bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated; connectionWindow -= childBytesAllocated;
} }
@ -291,7 +299,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int bytesForTree = Math.min(nextConnectionWindow, (int) Math.ceil(connectionWindow * weightRatio)); int bytesForTree = Math.min(nextConnectionWindow, (int) Math.ceil(connectionWindow * weightRatio));
int bytesForChild = Math.min(state.streamableBytes(), bytesForTree); int bytesForChild = Math.min(state.streamableBytes(), bytesForTree);
if (bytesForChild > 0 || state.hasFrame()) { if (bytesForChild > 0) {
state.allocate(bytesForChild); state.allocate(bytesForChild);
bytesAllocated += bytesForChild; bytesAllocated += bytesForChild;
nextConnectionWindow -= bytesForChild; nextConnectionWindow -= bytesForChild;
@ -300,17 +308,14 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// iteration. This is needed because we don't yet know if all the peers will be able to use // iteration. This is needed because we don't yet know if all the peers will be able to use
// all of their "fair share" of the connection window, and if they don't use it then we should // all of their "fair share" of the connection window, and if they don't use it then we should
// divide their unused shared up for the peers who still want to send. // divide their unused shared up for the peers who still want to send.
if (state.streamableBytesForTree() - bytesForChild > 0) { if (state.streamableBytesForTree() > 0) {
children[nextTail++] = child; children[nextTail++] = child;
nextTotalWeight += weight; nextTotalWeight += weight;
} }
if (state.streamableBytes() - bytesForChild == 0) {
writeChildNode(state);
}
} }
if (bytesForTree > 0) { if (bytesForTree > 0) {
int childBytesAllocated = writeChildren(child, bytesForTree); int childBytesAllocated = allocateBytesForTree(child, bytesForTree);
bytesAllocated += childBytesAllocated; bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated; nextConnectionWindow -= childBytesAllocated;
} }
@ -323,14 +328,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return bytesAllocated; return bytesAllocated;
} }
/**
* Write bytes allocated to {@code state}
*/
private static void writeChildNode(FlowState state) {
state.writeBytes(state.allocated());
state.resetAllocated();
}
/** /**
* The outbound flow control state for a single stream. * The outbound flow control state for a single stream.
*/ */
@ -365,13 +362,24 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/ */
void allocate(int bytes) { void allocate(int bytes) {
allocated += bytes; allocated += bytes;
// Also artificially reduce the streamable bytes for this tree to give the appearance
// that the data has been written. This will be restored before the allocated bytes are
// actually written.
incrementStreamableBytesForTree(-bytes);
} }
/** /**
* Gets the number of bytes that have been allocated to this stream by the priority algorithm. * Write bytes allocated bytes for this stream.
*/ */
int allocated() { void writeAllocatedBytes() {
return allocated; int numBytes = allocated;
// Restore the number of streamable bytes to this branch.
incrementStreamableBytesForTree(allocated);
resetAllocated();
// Perform the write.
writeBytes(numBytes);
} }
/** /**
@ -415,7 +423,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* not change (i.e. no tree traversal is required). * not change (i.e. no tree traversal is required).
*/ */
int streamableBytes() { int streamableBytes() {
return max(0, min(pendingBytes, window)); return max(0, min(pendingBytes - allocated, window));
} }
int streamableBytesForTree() { int streamableBytesForTree() {
@ -484,13 +492,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
while (hasFrame()) { while (hasFrame()) {
int maxBytes = min(bytes - bytesAttempted, writableWindow()); int maxBytes = min(bytes - bytesAttempted, writableWindow());
bytesAttempted += write(peek(), maxBytes); bytesAttempted += write(peek(), maxBytes);
if (bytes - bytesAttempted <= 0) { if (bytes - bytesAttempted <= 0 && !isNextFrameEmpty()) {
// The frame had data and all of it was written.
break; break;
} }
} }
return bytesAttempted; return bytesAttempted;
} }
/**
* @return {@code true} if there is a next frame and its size is zero.
*/
private boolean isNextFrameEmpty() {
return hasFrame() && peek().size() == 0;
}
/** /**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* queue, the written bytes are removed from this branch of the priority tree. * queue, the written bytes are removed from this branch of the priority tree.