diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 0e56646650..a01399e657 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -27,6 +27,7 @@ import io.netty.handler.codec.http2.Http2Stream.State; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.Deque; @@ -232,21 +233,29 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll int connectionWindow = state(connectionStream).window(); if (connectionWindow > 0) { - writeChildren(connectionStream, connectionWindow); - for (Http2Stream stream : connection.activeStreams()) { - writeChildNode(state(stream)); + // Allocate the bytes for the connection window to the streams, but do not write. + allocateBytesForTree(connectionStream, connectionWindow); + + // Now write all of the allocated bytes. Copying the activeStreams array to avoid + // side effects due to stream removal/addition which might occur as a result + // of end-of-stream or errors. + Collection streams = connection.activeStreams(); + for (Http2Stream stream : streams.toArray(new Http2Stream[streams.size()])) { + state(stream).writeAllocatedBytes(); } flush(); } } /** - * Write the children of {@code parent} in the priority tree. This will allocate bytes by stream weight. - * @param parent The parent of the nodes which will be written. + * This will allocate bytes by stream weight and priority for the entire tree rooted at {@code parent}, but does + * not write any bytes. + * + * @param parent The parent of the tree. * @param connectionWindow The connection window this is available for use at this point in the tree. * @return An object summarizing the write and allocation results. */ - private int writeChildren(Http2Stream parent, int connectionWindow) { + private int allocateBytesForTree(Http2Stream parent, int connectionWindow) { FlowState state = state(parent); if (state.streamableBytesForTree() <= 0) { return 0; @@ -261,11 +270,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll if (bytesForChild > 0 || state.hasFrame()) { state.allocate(bytesForChild); - writeChildNode(state); bytesAllocated += bytesForChild; connectionWindow -= bytesForChild; } - int childBytesAllocated = writeChildren(child, connectionWindow); + int childBytesAllocated = allocateBytesForTree(child, connectionWindow); bytesAllocated += childBytesAllocated; connectionWindow -= childBytesAllocated; } @@ -291,7 +299,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll int bytesForTree = Math.min(nextConnectionWindow, (int) Math.ceil(connectionWindow * weightRatio)); int bytesForChild = Math.min(state.streamableBytes(), bytesForTree); - if (bytesForChild > 0 || state.hasFrame()) { + if (bytesForChild > 0) { state.allocate(bytesForChild); bytesAllocated += bytesForChild; nextConnectionWindow -= bytesForChild; @@ -300,17 +308,14 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // iteration. This is needed because we don't yet know if all the peers will be able to use // all of their "fair share" of the connection window, and if they don't use it then we should // divide their unused shared up for the peers who still want to send. - if (state.streamableBytesForTree() - bytesForChild > 0) { + if (state.streamableBytesForTree() > 0) { children[nextTail++] = child; nextTotalWeight += weight; } - if (state.streamableBytes() - bytesForChild == 0) { - writeChildNode(state); - } } if (bytesForTree > 0) { - int childBytesAllocated = writeChildren(child, bytesForTree); + int childBytesAllocated = allocateBytesForTree(child, bytesForTree); bytesAllocated += childBytesAllocated; nextConnectionWindow -= childBytesAllocated; } @@ -323,14 +328,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return bytesAllocated; } - /** - * Write bytes allocated to {@code state} - */ - private static void writeChildNode(FlowState state) { - state.writeBytes(state.allocated()); - state.resetAllocated(); - } - /** * The outbound flow control state for a single stream. */ @@ -365,13 +362,24 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ void allocate(int bytes) { allocated += bytes; + // Also artificially reduce the streamable bytes for this tree to give the appearance + // that the data has been written. This will be restored before the allocated bytes are + // actually written. + incrementStreamableBytesForTree(-bytes); } /** - * Gets the number of bytes that have been allocated to this stream by the priority algorithm. + * Write bytes allocated bytes for this stream. */ - int allocated() { - return allocated; + void writeAllocatedBytes() { + int numBytes = allocated; + + // Restore the number of streamable bytes to this branch. + incrementStreamableBytesForTree(allocated); + resetAllocated(); + + // Perform the write. + writeBytes(numBytes); } /** @@ -415,7 +423,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * not change (i.e. no tree traversal is required). */ int streamableBytes() { - return max(0, min(pendingBytes, window)); + return max(0, min(pendingBytes - allocated, window)); } int streamableBytesForTree() { @@ -484,13 +492,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll while (hasFrame()) { int maxBytes = min(bytes - bytesAttempted, writableWindow()); bytesAttempted += write(peek(), maxBytes); - if (bytes - bytesAttempted <= 0) { - break; + if (bytes - bytesAttempted <= 0 && !isNextFrameEmpty()) { + // The frame had data and all of it was written. + break; } } return bytesAttempted; } + /** + * @return {@code true} if there is a next frame and its size is zero. + */ + private boolean isNextFrameEmpty() { + return hasFrame() && peek().size() == 0; + } + /** * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending * queue, the written bytes are removed from this branch of the priority tree.