diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index b9243de2a4..2038ae69a6 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -23,11 +23,11 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; + import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Stream.State; import java.util.ArrayDeque; -import java.util.Arrays; import java.util.Deque; /** @@ -38,29 +38,37 @@ import java.util.Deque; */ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { private static final int MIN_WRITABLE_CHUNK = 32 * 1024; - private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() { + + private final StreamByteDistributor.Writer writer = new StreamByteDistributor.Writer() { @Override - public boolean visit(Http2Stream stream) { - int written = state(stream).writeAllocatedBytes(); + public void write(Http2Stream stream, int numBytes) { + int written = state(stream).writeAllocatedBytes(numBytes); if (written != -1 && listener != null) { listener.streamWritten(stream, written); } - return true; } }; private final Http2Connection connection; private final Http2Connection.PropertyKey stateKey; + private final StreamByteDistributor streamByteDistributor; + private final AbstractState connectionState; private int initialWindowSize = DEFAULT_WINDOW_SIZE; private ChannelHandlerContext ctx; private Listener listener; public DefaultHttp2RemoteFlowController(Http2Connection connection) { + this(connection, new PriorityStreamByteDistributor(connection)); + } + + public DefaultHttp2RemoteFlowController(Http2Connection connection, + StreamByteDistributor streamByteDistributor) { this.connection = checkNotNull(connection, "connection"); + this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor"); // Add a flow state for the connection. stateKey = connection.newKey(); - connection.connectionStream().setProperty(stateKey, - new DefaultState(connection.connectionStream(), initialWindowSize)); + connectionState = new DefaultState(connection.connectionStream(), initialWindowSize); + connection.connectionStream().setProperty(stateKey, connectionState); // Register for notification of new streams. connection.addListener(new Http2ConnectionAdapter() { @@ -117,28 +125,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll state(stream).cancel(); } } - - @Override - public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { - Http2Stream parent = stream.parent(); - if (parent != null) { - int delta = state(stream).streamableBytesForTree(); - if (delta != 0) { - state(parent).incrementStreamableBytesForTree(delta); - } - } - } - - @Override - public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { - Http2Stream parent = stream.parent(); - if (parent != null) { - int delta = -state(stream).streamableBytesForTree(); - if (delta != 0) { - state(parent).incrementStreamableBytesForTree(delta); - } - } - } }); } @@ -209,7 +195,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll assert ctx == null || ctx.executor().inEventLoop(); if (stream.id() == CONNECTION_STREAM_ID) { // Update the connection window - connectionState().incrementStreamWindow(delta); + connectionState.incrementStreamWindow(delta); } else { // Update the stream window AbstractState state = state(stream); @@ -238,31 +224,18 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll state.enqueueFrame(frame); } catch (Throwable t) { frame.error(ctx, t); - return; } } - /** - * For testing purposes only. Exposes the number of streamable bytes for the tree rooted at - * the given stream. - */ - int streamableBytesForTree(Http2Stream stream) { - return state(stream).streamableBytesForTree(); - } - private AbstractState state(Http2Stream stream) { return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey); } - private AbstractState connectionState() { - return (AbstractState) connection.connectionStream().getProperty(stateKey); - } - /** * Returns the flow control window for the entire connection. */ private int connectionWindowSize() { - return connectionState().windowSize(); + return connectionState.windowSize(); } private int minUsableChannelBytes() { @@ -285,16 +258,17 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0; // Clip the usable bytes by the connection window. - return min(connectionState().windowSize(), useableBytes); + return min(connectionState.windowSize(), useableBytes); } /** * Package private for testing purposes only! - * @param requestedBytes The desired amount of bytes. - * @return The amount of bytes that can be supported by underlying {@link Channel} without queuing "too-much". + * + * @return The amount of bytes that can be supported by underlying {@link + * io.netty.channel.Channel} without queuing "too-much". */ - final int writableBytes(int requestedBytes) { - return Math.min(requestedBytes, maxUsableChannelBytes()); + private int writableBytes() { + return Math.min(connectionWindowSize(), maxUsableChannelBytes()); } /** @@ -302,198 +276,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ @Override public void writePendingBytes() throws Http2Exception { - AbstractState connectionState = connectionState(); - int connectionWindowSize; + int bytesToWrite = writableBytes(); + boolean haveUnwrittenBytes; + + // Using a do-while loop so that we always write at least once, regardless if we have + // bytesToWrite or not. This ensures that zero-length frames will always be written. do { - connectionWindowSize = writableBytes(connectionState.windowSize()); - - if (connectionWindowSize > 0) { - // Allocate the bytes for the connection window to the streams, but do not write. - allocateBytesForTree(connectionState.stream(), connectionWindowSize); - } - - // Write all of allocated bytes. We must call this even if no bytes are allocated as it is possible there - // are empty frames indicating the End Of Stream. - connection.forEachActiveStream(WRITE_ALLOCATED_BYTES); - } while (connectionState.streamableBytesForTree() > 0 && - connectionWindowSize > 0 && - ctx.channel().isWritable()); - } - - /** - * This will allocate bytes by stream weight and priority for the entire tree rooted at {@code parent}, but does not - * write any bytes. The connection window is generally distributed amongst siblings according to their weight, - * however we need to ensure that the entire connection window is used (assuming streams have >= connection window - * bytes to send) and we may need some sort of rounding to accomplish this. - * - * @param parent The parent of the tree. - * @param connectionWindowSize The connection window this is available for use at this point in the tree. - * @return An object summarizing the write and allocation results. - */ - int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception { - AbstractState state = state(parent); - if (state.streamableBytesForTree() <= 0) { - return 0; - } - // If the number of streamable bytes for this tree will fit in the connection window - // then there is no need to prioritize the bytes...everyone sends what they have - if (state.streamableBytesForTree() <= connectionWindowSize) { - SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize); - parent.forEachChild(childFeeder); - return childFeeder.bytesAllocated; - } - - ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize); - // Iterate once over all children of this parent and try to feed all the children. - parent.forEachChild(childFeeder); - - // Now feed any remaining children that are still hungry until the connection - // window collapses. - childFeeder.feedHungryChildren(); - - return childFeeder.bytesAllocated; - } - - /** - * A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection - * window appropriately to the children of a given stream. - */ - private final class ChildFeeder implements Http2StreamVisitor { - final int maxSize; - int totalWeight; - int connectionWindow; - int nextTotalWeight; - int nextConnectionWindow; - int bytesAllocated; - Http2Stream[] stillHungry; - int nextTail; - - ChildFeeder(Http2Stream parent, int connectionWindow) { - maxSize = parent.numChildren(); - totalWeight = parent.totalChildWeights(); - this.connectionWindow = connectionWindow; - this.nextConnectionWindow = connectionWindow; - } - - @Override - public boolean visit(Http2Stream child) throws Http2Exception { - // In order to make progress toward the connection window due to possible rounding errors, we make sure - // that each stream (with data to send) is given at least 1 byte toward the connection window. - int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight))); - int bytesForTree = min(nextConnectionWindow, connectionWindowChunk); - - AbstractState state = state(child); - int bytesForChild = min(state.streamableBytes(), bytesForTree); - - // Allocate the bytes to this child. - if (bytesForChild > 0) { - state.allocate(bytesForChild); - bytesAllocated += bytesForChild; - nextConnectionWindow -= bytesForChild; - bytesForTree -= bytesForChild; - } - - // Allocate any remaining bytes to the children of this stream. - if (bytesForTree > 0) { - int childBytesAllocated = allocateBytesForTree(child, bytesForTree); - bytesAllocated += childBytesAllocated; - nextConnectionWindow -= childBytesAllocated; - } - - if (nextConnectionWindow > 0) { - // If this subtree still wants to send then it should be re-considered to take bytes that are unused by - // sibling nodes. This is needed because we don't yet know if all the peers will be able to use all of - // their "fair share" of the connection window, and if they don't use it then we should divide their - // unused shared up for the peers who still want to send. - if (state.streamableBytesForTree() > 0) { - stillHungry(child); - } - return true; - } - - return false; - } - - void feedHungryChildren() throws Http2Exception { - if (stillHungry == null) { - // There are no hungry children to feed. - return; - } - - totalWeight = nextTotalWeight; - connectionWindow = nextConnectionWindow; - - // Loop until there are not bytes left to stream or the connection window has collapsed. - for (int tail = nextTail; tail > 0 && connectionWindow > 0;) { - nextTotalWeight = 0; - nextTail = 0; - - // Iterate over the children that are currently still hungry. - for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) { - if (!visit(stillHungry[head])) { - // The connection window has collapsed, break out of the loop. - break; - } - } - connectionWindow = nextConnectionWindow; - totalWeight = nextTotalWeight; - tail = nextTail; - } - } - - /** - * Indicates that the given child is still hungry (i.e. still has streamable bytes that can - * fit within the current connection window). - */ - private void stillHungry(Http2Stream child) { - ensureSpaceIsAllocated(nextTail); - stillHungry[nextTail++] = child; - nextTotalWeight += child.weight(); - } - - /** - * Ensures that the {@link #stillHungry} array is properly sized to hold the given index. - */ - private void ensureSpaceIsAllocated(int index) { - if (stillHungry == null) { - // Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if - // maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get - // all of the available connection window. - stillHungry = new Http2Stream[max(2, maxSize >>> 2)]; - } else if (index == stillHungry.length) { - // Grow the array by a factor of 2. - stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1)); - } - } - } - - /** - * A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the - * available connection window. - */ - private final class SimpleChildFeeder implements Http2StreamVisitor { - int bytesAllocated; - int connectionWindow; - - SimpleChildFeeder(int connectionWindow) { - this.connectionWindow = connectionWindow; - } - - @Override - public boolean visit(Http2Stream child) throws Http2Exception { - AbstractState childState = state(child); - int bytesForChild = childState.streamableBytes(); - - if (bytesForChild > 0 || childState.hasFrame()) { - childState.allocate(bytesForChild); - bytesAllocated += bytesForChild; - connectionWindow -= bytesForChild; - } - int childBytesAllocated = allocateBytesForTree(child, connectionWindow); - bytesAllocated += childBytesAllocated; - connectionWindow -= childBytesAllocated; - return true; - } + // Distribute the connection window across the streams and write the data. + haveUnwrittenBytes = streamByteDistributor.distribute(bytesToWrite, writer); + } while (haveUnwrittenBytes && (bytesToWrite = writableBytes()) > 0 && ctx.channel().isWritable()); } /** @@ -503,7 +294,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll private final Deque pendingWriteQueue; private int window; private int pendingBytes; - private int allocated; // Set to true while a frame is being written, false otherwise. private boolean writing; // Set to true if cancel() was called. @@ -537,31 +327,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - void allocate(int bytes) { - allocated += bytes; - // Also artificially reduce the streamable bytes for this tree to give the appearance - // that the data has been written. This will be restored before the allocated bytes are - // actually written. - incrementStreamableBytesForTree(-bytes); - } - - @Override - int writeAllocatedBytes() { - int numBytes = allocated; - - // Restore the number of streamable bytes to this branch. - incrementStreamableBytesForTree(allocated); - resetAllocated(); - - // Perform the write. - return writeBytes(numBytes); - } - - /** - * Reset the number of bytes that have been allocated to this stream by the priority algorithm. - */ - private void resetAllocated() { - allocated = 0; + int writeAllocatedBytes(int allocated) { + try { + // Perform the write. + return writeBytes(allocated); + } finally { + streamByteDistributor.updateStreamableBytes(this); + } } @Override @@ -570,30 +342,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll throw streamError(stream.id(), FLOW_CONTROL_ERROR, "Window size overflow for stream: %d", stream.id()); } - int previouslyStreamable = streamableBytes(); window += delta; - // Update this branch of the priority tree if the streamable bytes have changed for this node. - int streamableDelta = streamableBytes() - previouslyStreamable; - if (streamableDelta != 0) { - incrementStreamableBytesForTree(streamableDelta); - } + streamByteDistributor.updateStreamableBytes(this); return window; } - @Override int writableWindow() { return min(window, connectionWindowSize()); } @Override - int streamableBytes() { - return max(0, min(pendingBytes - allocated, window)); - } - - @Override - int streamableBytesForTree() { - return streamableBytesForTree; + public int streamableBytes() { + return max(0, min(pendingBytes, window)); } @Override @@ -606,7 +367,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - boolean hasFrame() { + public boolean hasFrame() { return !pendingWriteQueue.isEmpty(); } @@ -640,9 +401,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause, "Stream closed before write could take place")); } + streamByteDistributor.updateStreamableBytes(this); } - @Override int writeBytes(int bytes) { if (!hasFrame()) { return -1; @@ -712,18 +473,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * Increments the number of pending bytes for this node. If there was any change to the number of bytes that - * fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update - * this branch of the priority tree. + * Increments the number of pending bytes for this node and updates the {@link StreamByteDistributor}. */ private void incrementPendingBytes(int numBytes) { - int previouslyStreamable = streamableBytes(); pendingBytes += numBytes; - - int delta = streamableBytes() - previouslyStreamable; - if (delta != 0) { - incrementStreamableBytesForTree(delta); - } + streamByteDistributor.updateStreamableBytes(this); } /** @@ -739,7 +493,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll private void decrementFlowControlWindow(int bytes) { try { int negativeBytes = -bytes; - connectionState().incrementStreamWindow(negativeBytes); + connectionState.incrementStreamWindow(negativeBytes); incrementStreamWindow(negativeBytes); } catch (Http2Exception e) { // Should never get here since we're decrementing. @@ -782,22 +536,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - int writableWindow() { + public int streamableBytes() { return 0; } @Override - int streamableBytes() { - return 0; - } - - @Override - int streamableBytesForTree() { - return streamableBytesForTree; - } - - @Override - int writeAllocatedBytes() { + int writeAllocatedBytes(int allocated) { throw new UnsupportedOperationException(); } @@ -817,23 +561,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return 0; } - @Override - int writeBytes(int bytes) { - throw new UnsupportedOperationException(); - } - @Override void enqueueFrame(FlowControlled frame) { throw new UnsupportedOperationException(); } @Override - void allocate(int bytes) { - throw new UnsupportedOperationException(); - } - - @Override - boolean hasFrame() { + public boolean hasFrame() { return false; } } @@ -841,9 +575,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * An abstraction which provides specific extensions used by remote flow control. */ - private abstract class AbstractState { + private abstract class AbstractState implements StreamByteDistributor.StreamState { protected final Http2Stream stream; - protected int streamableBytesForTree; AbstractState(Http2Stream stream) { this.stream = stream; @@ -851,27 +584,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll AbstractState(AbstractState existingState) { this.stream = existingState.stream(); - this.streamableBytesForTree = existingState.streamableBytesForTree(); } /** * The stream this state is associated with. */ - final Http2Stream stream() { + @Override + public final Http2Stream stream() { return stream; } - /** - * Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting - * at the current node. - */ - final void incrementStreamableBytesForTree(int numBytes) { - streamableBytesForTree += numBytes; - if (!stream.isRoot()) { - state(stream.parent()).incrementStreamableBytesForTree(numBytes); - } - } - abstract int windowSize(); abstract int initialWindowSize(); @@ -881,21 +603,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * * @return the number of bytes written for a stream or {@code -1} if no write occurred. */ - abstract int writeAllocatedBytes(); - - /** - * Returns the number of pending bytes for this node that will fit within the - * {@link #writableWindow()}. This is used for the priority algorithm to determine the aggregate - * number of bytes that can be written at each node. Each node only takes into account its - * stream window so that when a change occurs to the connection window, these values need - * not change (i.e. no tree traversal is required). - */ - abstract int streamableBytes(); - - /** - * Get the {@link #streamableBytes()} for the entire tree rooted at this node. - */ - abstract int streamableBytesForTree(); + abstract int writeAllocatedBytes(int allocated); /** * Any operations that may be pending are cleared and the status of these operations is failed. @@ -912,31 +620,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ abstract int incrementStreamWindow(int delta) throws Http2Exception; - /** - * Returns the maximum writable window (minimum of the stream and connection windows). - */ - abstract int writableWindow(); - - /** - * Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by - * the number of pending writes available, or because a frame does not support splitting on arbitrary - * boundaries. Will return {@code -1} if there are no frames to write. - */ - abstract int writeBytes(int bytes); - /** * Adds the {@code frame} to the pending queue and increments the pending byte count. */ abstract void enqueueFrame(FlowControlled frame); - - /** - * Increment the number of bytes allocated to this stream by the priority algorithm - */ - abstract void allocate(int bytes); - - /** - * Indicates whether or not there are frames in the pending queue. - */ - abstract boolean hasFrame(); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java index ebb82c650e..e54339f443 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -31,7 +31,7 @@ public interface Http2RemoteFlowController extends Http2FlowController { /** * Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data - * will be written or how it will be allocated to frames. + * will be written or how it will be assigned to frames. * before sending. *

* Writes do not actually occur until {@link #writePendingBytes()} is called. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java new file mode 100644 index 0000000000..c417a25714 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java @@ -0,0 +1,428 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.Math.max; +import static java.lang.Math.min; + +import java.util.Arrays; + +/** + * A {@link StreamByteDistributor} that implements the HTTP/2 priority tree algorithm for allocating + * bytes for all streams in the connection. + */ +public final class PriorityStreamByteDistributor implements StreamByteDistributor { + private final Http2Connection connection; + private final Http2Connection.PropertyKey stateKey; + private final WriteVisitor writeVisitor = new WriteVisitor(); + + public PriorityStreamByteDistributor(Http2Connection connection) { + this.connection = checkNotNull(connection, "connection"); + + // Add a state for the connection. + stateKey = connection.newKey(); + connection.connectionStream().setProperty(stateKey, + new PriorityState(connection.connectionStream())); + + // Register for notification of new streams. + connection.addListener(new Http2ConnectionAdapter() { + @Override + public void onStreamAdded(Http2Stream stream) { + stream.setProperty(stateKey, new PriorityState(stream)); + } + + @Override + public void onStreamClosed(Http2Stream stream) { + state(stream).close(); + } + + @Override + public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { + Http2Stream parent = stream.parent(); + if (parent != null) { + int delta = state(stream).unallocatedStreamableBytesForTree(); + if (delta != 0) { + state(parent).unallocatedStreamableBytesForTreeChanged(delta); + } + } + } + + @Override + public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { + Http2Stream parent = stream.parent(); + if (parent != null) { + int delta = state(stream).unallocatedStreamableBytesForTree(); + if (delta != 0) { + state(parent).unallocatedStreamableBytesForTreeChanged(-delta); + } + } + } + }); + } + + @Override + public void updateStreamableBytes(StreamState streamState) { + state(streamState.stream()).updateStreamableBytes(streamState.streamableBytes(), + streamState.hasFrame()); + } + + @Override + public boolean distribute(int maxBytes, Writer writer) { + checkNotNull(writer, "writer"); + if (maxBytes > 0) { + allocateBytesForTree(connection.connectionStream(), maxBytes); + } + + // Need to write even if maxBytes == 0 in order to handle the case of empty frames. + writeVisitor.writeAllocatedBytes(writer); + + return state(connection.connectionStream()).unallocatedStreamableBytesForTree() > 0; + } + + /** + * For testing only. + */ + int unallocatedStreamableBytes(Http2Stream stream) { + return state(stream).unallocatedStreamableBytes(); + } + + /** + * For testing only. + */ + int unallocatedStreamableBytesForTree(Http2Stream stream) { + return state(stream).unallocatedStreamableBytesForTree(); + } + + /** + * This will allocate bytes by stream weight and priority for the entire tree rooted at {@code + * parent}, but does not write any bytes. The connection window is generally distributed amongst + * siblings according to their weight, however we need to ensure that the entire connection + * window is used (assuming streams have >= connection window bytes to send) and we may need + * some sort of rounding to accomplish this. + * + * @param parent The parent of the tree. + * @param connectionWindowSize The connection window this is available for use at this point in + * the tree. + * @return The number of bytes actually allocated. + */ + private int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) { + PriorityState state = state(parent); + if (state.unallocatedStreamableBytesForTree() <= 0) { + return 0; + } + // If the number of streamable bytes for this tree will fit in the connection window + // then there is no need to prioritize the bytes...everyone sends what they have + if (state.unallocatedStreamableBytesForTree() <= connectionWindowSize) { + SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize); + forEachChild(parent, childFeeder); + return childFeeder.bytesAllocated; + } + + ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize); + // Iterate once over all children of this parent and try to feed all the children. + forEachChild(parent, childFeeder); + + // Now feed any remaining children that are still hungry until the connection + // window collapses. + childFeeder.feedHungryChildren(); + + return childFeeder.bytesAllocated; + } + + private void forEachChild(Http2Stream parent, Http2StreamVisitor childFeeder) { + try { + parent.forEachChild(childFeeder); + } catch (Http2Exception e) { + // Should never happen since the feeder doesn't throw. + throw new IllegalStateException(e); + } + } + + private PriorityState state(Http2Stream stream) { + return checkNotNull(stream, "stream").getProperty(stateKey); + } + + /** + * A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the + * available connection window appropriately to the children of a given stream. + */ + private final class ChildFeeder implements Http2StreamVisitor { + final int maxSize; + int totalWeight; + int connectionWindow; + int nextTotalWeight; + int nextConnectionWindow; + int bytesAllocated; + Http2Stream[] stillHungry; + int nextTail; + + ChildFeeder(Http2Stream parent, int connectionWindow) { + maxSize = parent.numChildren(); + totalWeight = parent.totalChildWeights(); + this.connectionWindow = connectionWindow; + this.nextConnectionWindow = connectionWindow; + } + + @Override + public boolean visit(Http2Stream child) { + // In order to make progress toward the connection window due to possible rounding errors, we make sure + // that each stream (with data to send) is given at least 1 byte toward the connection window. + int connectionWindowChunk = + max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight))); + int bytesForTree = min(nextConnectionWindow, connectionWindowChunk); + + PriorityState state = state(child); + int bytesForChild = min(state.unallocatedStreamableBytes(), bytesForTree); + + // Allocate the bytes to this child. + if (bytesForChild > 0) { + state.allocate(bytesForChild); + bytesAllocated += bytesForChild; + nextConnectionWindow -= bytesForChild; + bytesForTree -= bytesForChild; + } + + // Allocate any remaining bytes to the children of this stream. + if (bytesForTree > 0) { + int childBytesAllocated = allocateBytesForTree(child, bytesForTree); + bytesAllocated += childBytesAllocated; + nextConnectionWindow -= childBytesAllocated; + } + + if (nextConnectionWindow > 0) { + // If this subtree still wants to send then it should be re-considered to take bytes that are unused by + // sibling nodes. This is needed because we don't yet know if all the peers will be able to use all of + // their "fair share" of the connection window, and if they don't use it then we should divide their + // unused shared up for the peers who still want to send. + if (state.unallocatedStreamableBytesForTree() > 0) { + stillHungry(child); + } + return true; + } + + return false; + } + + void feedHungryChildren() { + if (stillHungry == null) { + // There are no hungry children to feed. + return; + } + + totalWeight = nextTotalWeight; + connectionWindow = nextConnectionWindow; + + // Loop until there are not bytes left to stream or the connection window has collapsed. + for (int tail = nextTail; tail > 0 && connectionWindow > 0;) { + nextTotalWeight = 0; + nextTail = 0; + + // Iterate over the children that are currently still hungry. + for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) { + if (!visit(stillHungry[head])) { + // The connection window has collapsed, break out of the loop. + break; + } + } + connectionWindow = nextConnectionWindow; + totalWeight = nextTotalWeight; + tail = nextTail; + } + } + + /** + * Indicates that the given child is still hungry (i.e. still has streamable bytes that can + * fit within the current connection window). + */ + void stillHungry(Http2Stream child) { + ensureSpaceIsAllocated(nextTail); + stillHungry[nextTail++] = child; + nextTotalWeight += child.weight(); + } + + /** + * Ensures that the {@link #stillHungry} array is properly sized to hold the given index. + */ + void ensureSpaceIsAllocated(int index) { + if (stillHungry == null) { + // Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if + // maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get + // all of the available connection window. + stillHungry = new Http2Stream[max(2, maxSize >>> 2)]; + } else if (index == stillHungry.length) { + // Grow the array by a factor of 2. + stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1)); + } + } + } + + /** + * A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit + * within the available connection window. + */ + private final class SimpleChildFeeder implements Http2StreamVisitor { + int bytesAllocated; + int connectionWindow; + + SimpleChildFeeder(int connectionWindow) { + this.connectionWindow = connectionWindow; + } + + @Override + public boolean visit(Http2Stream child) { + PriorityState childState = state(child); + int bytesForChild = childState.unallocatedStreamableBytes(); + + if (bytesForChild > 0 || childState.hasFrame()) { + childState.allocate(bytesForChild); + bytesAllocated += bytesForChild; + connectionWindow -= bytesForChild; + } + int childBytesAllocated = allocateBytesForTree(child, connectionWindow); + bytesAllocated += childBytesAllocated; + connectionWindow -= childBytesAllocated; + return true; + } + } + + /** + * The remote flow control state for a single stream. + */ + private final class PriorityState { + final Http2Stream stream; + boolean hasFrame; + int streamableBytes; + int allocated; + int unallocatedStreamableBytesForTree; + + PriorityState(Http2Stream stream) { + this.stream = stream; + } + + /** + * Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in + * the priority tree starting at the current node. + */ + void unallocatedStreamableBytesForTreeChanged(int delta) { + unallocatedStreamableBytesForTree += delta; + if (!stream.isRoot()) { + state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta); + } + } + + void allocate(int bytes) { + allocated += bytes; + + if (bytes != 0) { + // Also artificially reduce the streamable bytes for this tree to give the appearance + // that the data has been written. This will be restored before the allocated bytes are + // actually written. + unallocatedStreamableBytesForTreeChanged(-bytes); + } + } + + /** + * Reset the number of bytes that have been allocated to this stream by the priority + * algorithm. + */ + void resetAllocated() { + allocate(-allocated); + } + + void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) { + this.hasFrame = hasFrame; + + int delta = newStreamableBytes - streamableBytes; + if (delta != 0) { + streamableBytes = newStreamableBytes; + + // Update this branch of the priority tree if the streamable bytes have changed for this node. + unallocatedStreamableBytesForTreeChanged(delta); + } + } + + void close() { + // Unallocate all bytes. + resetAllocated(); + + // Clear the streamable bytes. + updateStreamableBytes(0, false); + } + + boolean hasFrame() { + return hasFrame; + } + + int unallocatedStreamableBytes() { + return streamableBytes - allocated; + } + + int unallocatedStreamableBytesForTree() { + return unallocatedStreamableBytesForTree; + } + } + + /** + * A connection stream visitor that delegates to the user provided visitor. + */ + private class WriteVisitor implements Http2StreamVisitor { + Writer writer; + RuntimeException error; + + void writeAllocatedBytes(Writer writer) { + try { + this.writer = writer; + try { + connection.forEachActiveStream(this); + } catch (Http2Exception e) { + // Should never happen since the visitor doesn't throw. + throw new IllegalStateException(e); + } + + // If an error was caught when calling back the visitor, throw it now. + if (error != null) { + throw error; + } + } finally { + error = null; + } + } + + @Override + public boolean visit(Http2Stream stream) { + PriorityState state = state(stream); + try { + int allocated = state.allocated; + + // Unallocate all bytes for this stream. + state.resetAllocated(); + + // Write the allocated bytes. + if (error == null) { + writer.write(stream, allocated); + } + } catch (RuntimeException e) { + // Stop calling the visitor, but continue in the loop to reset the allocated for + // all remaining states. + error = e; + } + + // We have to iterate across all streams to ensure that we reset the allocated bytes. + return true; + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java new file mode 100644 index 0000000000..3a52b3cbc6 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/StreamByteDistributor.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +/** + * An object (used by remote flow control) that is responsible for distributing the bytes to be + * written across the streams in the connection. + */ +public interface StreamByteDistributor { + + /** + * State information for the stream, indicating the number of bytes that are currently + * streamable. This is provided to the {@link #updateStreamableBytes(StreamState)} method. + */ + interface StreamState { + /** + * Gets the stream this state is associated with. + */ + Http2Stream stream(); + + /** + * Returns the number of pending bytes for this node that will fit within the stream flow + * control window. This is used for the priority algorithm to determine the aggregate number + * of bytes that can be written at each node. Each node only takes into account its stream + * window so that when a change occurs to the connection window, these values need not + * change (i.e. no tree traversal is required). + */ + int streamableBytes(); + + /** + * Indicates whether or not there are frames pending for this stream. + */ + boolean hasFrame(); + } + + /** + * Object that performs the writing of the bytes that have been allocated for a stream. + */ + interface Writer { + /** + * Writes the allocated bytes for this stream. + * @param stream the stream for which to perform the write. + * @param numBytes the number of bytes to write. + */ + void write(Http2Stream stream, int numBytes); + } + + /** + * Called when the streamable bytes for a stream has changed. Until this + * method is called for the first time for a give stream, the stream is assumed to have no + * streamable bytes. + */ + void updateStreamableBytes(StreamState state); + + /** + * Distributes up to {@code maxBytes} to those streams containing streamable bytes and + * iterates across those streams to write the appropriate bytes. Criteria for + * traversing streams is undefined and it is up to the implementation to determine when to stop + * at a given stream. + * + *

The streamable bytes are not automatically updated by calling this method. It is up to the + * caller to indicate the number of bytes streamable after the write by calling + * {@link #updateStreamableBytes(StreamState)}. + * + * @param maxBytes the maximum number of bytes to write. + * @return {@code true} if there are still streamable bytes that have not yet been written, + * otherwise {@code false}. + */ + boolean distribute(int maxBytes, Writer writer); +} diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 95dd831fd3..228a993d35 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -35,23 +34,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; -import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.IntObjectMap; import io.netty.util.concurrent.EventExecutor; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - import junit.framework.AssertionFailedError; - import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -60,6 +51,8 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.concurrent.atomic.AtomicInteger; + /** * Tests for {@link DefaultHttp2RemoteFlowController}. */ @@ -68,7 +61,6 @@ public class DefaultHttp2RemoteFlowControllerTest { private static final int STREAM_B = 3; private static final int STREAM_C = 5; private static final int STREAM_D = 7; - private static final int STREAM_E = 9; private DefaultHttp2RemoteFlowController controller; @@ -515,750 +507,6 @@ public class DefaultHttp2RemoteFlowControllerTest { assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D)); } - /** - * In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is - * blocked). - * - *

-     *         0
-     *        / \
-     *      [A]  B
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception { - // Block stream A - exhaustStreamWindow(STREAM_A); - - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - // Try sending 10 bytes on each stream. They will be pending until we free up the - // connection. - - FakeFlowControlled dataA = new FakeFlowControlled(10); - FakeFlowControlled dataB = new FakeFlowControlled(10); - FakeFlowControlled dataC = new FakeFlowControlled(10); - FakeFlowControlled dataD = new FakeFlowControlled(10); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - // Verify that the entire frame was sent. - incrementWindowSize(CONNECTION_STREAM_ID, 10); - controller.writePendingBytes(); - - assertEquals(0, window(CONNECTION_STREAM_ID)); - - // A is not written - assertEquals(0, window(STREAM_A)); - dataA.assertNotWritten(); - - // B is partially written - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2); - dataB.assertPartiallyWritten(5); - verify(listener, times(1)).streamWritten(stream(STREAM_B), 5); - - // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot - // be split evenly, one will get 3 and one will get 2. - assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D), 5); - dataC.assertPartiallyWritten(3); - verify(listener, times(1)).streamWritten(stream(STREAM_C), 3); - dataD.assertPartiallyWritten(2); - verify(listener, times(1)).streamWritten(stream(STREAM_D), 2); - } - - /** - * In this test, we block B which allows all bytes to be written by A. A should not share the data with its children - * since it's not blocked. - * - *
-     *         0
-     *        / \
-     *       A  [B]
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception { - // Block stream B - exhaustStreamWindow(STREAM_B); - - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - FakeFlowControlled dataA = new FakeFlowControlled(10); - FakeFlowControlled dataB = new FakeFlowControlled(10); - FakeFlowControlled dataC = new FakeFlowControlled(10); - FakeFlowControlled dataD = new FakeFlowControlled(10); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - // Verify that the entire frame was sent. - incrementWindowSize(CONNECTION_STREAM_ID, 10); - controller.writePendingBytes(); - assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A)); - assertEquals(0, window(STREAM_B)); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D)); - - dataA.assertFullyWritten(); - verify(listener, times(1)).streamWritten(stream(STREAM_A), 10); - - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - } - - /** - * In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the - * remaining of its portion to its children. - * - *
-     *         0
-     *        / \
-     *       A  [B]
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void parentShouldWaterFallDataToChildren() throws Http2Exception { - // Block stream B - exhaustStreamWindow(STREAM_B); - - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - // Only send 5 to A so that it will allow data from its children. - FakeFlowControlled dataA = new FakeFlowControlled(5); - FakeFlowControlled dataB = new FakeFlowControlled(10); - FakeFlowControlled dataC = new FakeFlowControlled(10); - FakeFlowControlled dataD = new FakeFlowControlled(10); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - // Verify that the entire frame was sent. - incrementWindowSize(CONNECTION_STREAM_ID, 10); - controller.writePendingBytes(); - assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A)); - assertEquals(0, window(STREAM_B)); - assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D)); - - // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot - // be split evenly, one will get 3 and one will get 2. - dataA.assertFullyWritten(); - verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); - dataB.assertNotWritten(); - dataC.assertPartiallyWritten(3); - verify(listener, times(1)).streamWritten(stream(STREAM_C), 3); - dataD.assertPartiallyWritten(2); - verify(listener, times(1)).streamWritten(stream(STREAM_D), 2); - } - - /** - * In this test, we verify re-prioritizing a stream. We start out with B blocked: - * - *
-     *         0
-     *        / \
-     *       A  [B]
-     *      / \
-     *     C   D
-     * 
- * - * We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written - * bytes between them. - * - *
-     *           0
-     *          /|\
-     *        /  |  \
-     *       A  [B]  D
-     *      /
-     *     C
-     * 
- */ - @Test - public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception { - // Block stream B - exhaustStreamWindow(STREAM_B); - - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - // Send 10 bytes to each. - FakeFlowControlled dataA = new FakeFlowControlled(10); - FakeFlowControlled dataB = new FakeFlowControlled(10); - FakeFlowControlled dataC = new FakeFlowControlled(10); - FakeFlowControlled dataD = new FakeFlowControlled(10); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - // Re-prioritize D as a direct child of the connection. - setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); - - // Verify that the entire frame was sent. - incrementWindowSize(CONNECTION_STREAM_ID, 10); - controller.writePendingBytes(); - assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2); - assertEquals(0, window(STREAM_B)); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D), 2); - - // Verify that A and D split the bytes. - dataA.assertPartiallyWritten(5); - verify(listener, times(1)).streamWritten(stream(STREAM_A), 5); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertPartiallyWritten(5); - verify(listener, times(1)).streamWritten(stream(STREAM_D), 5); - } - - /** - * Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if - * the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266. - *
-     *            0
-     *          / | \
-     *        /   |   \
-     *      A(0) B(0) C(0)
-     *     /
-     *    D(> allowed to send in 1 allocation attempt)
-     * 
- */ - @Test - public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception { - // Max all connection windows. We don't want this being a limiting factor in the test. - maxStreamWindow(CONNECTION_STREAM_ID); - maxStreamWindow(STREAM_A); - maxStreamWindow(STREAM_B); - maxStreamWindow(STREAM_C); - maxStreamWindow(STREAM_D); - - // Setup the priority tree. - setPriority(STREAM_A, 0, (short) 32, false); - setPriority(STREAM_B, 0, (short) 16, false); - setPriority(STREAM_C, 0, (short) 16, false); - setPriority(STREAM_D, STREAM_A, (short) 16, false); - - // The bytesBeforeUnwritable defaults to Long.MAX_VALUE, we need to leave room to send enough data to exceed - // the writableBytes, and so we must reduce this value to something no-zero. - when(channel.bytesBeforeUnwritable()).thenReturn(1L); - - // Calculate the max amount of data the flow controller will allow to be sent now. - final int writableBytes = controller.writableBytes(window(CONNECTION_STREAM_ID)); - - // This is insider knowledge into how writePendingBytes works. Because the algorithm will keep looping while - // the channel is writable, we simulate that the channel will become unwritable after the first write. - when(channel.isWritable()).thenReturn(false); - - // Send enough so it can not be completely written out - final int expectedUnsentAmount = 1; - // Make sure we don't overflow - assertTrue(Integer.MAX_VALUE - expectedUnsentAmount > writableBytes); - FakeFlowControlled dataD = new FakeFlowControlled(writableBytes + expectedUnsentAmount); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataD.assertPartiallyWritten(writableBytes); - verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), eq(writableBytes)); - } - - /** - * In this test, we root all streams at the connection, and then verify that data is split appropriately based on - * weight (all available data is the same). - * - *
-     *           0
-     *        / / \ \
-     *       A B   C D
-     * 
- */ - @Test - public void writeShouldPreferHighestWeight() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - // Root the streams at the connection and assign weights. - setPriority(STREAM_A, 0, (short) 50, false); - setPriority(STREAM_B, 0, (short) 200, false); - setPriority(STREAM_C, 0, (short) 100, false); - setPriority(STREAM_D, 0, (short) 100, false); - - FakeFlowControlled dataA = new FakeFlowControlled(1000); - FakeFlowControlled dataB = new FakeFlowControlled(1000); - FakeFlowControlled dataC = new FakeFlowControlled(1000); - FakeFlowControlled dataD = new FakeFlowControlled(1000); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - // Allow 1000 bytes to be sent. - incrementWindowSize(CONNECTION_STREAM_ID, 1000); - controller.writePendingBytes(); - - // All writes sum == 1000 - assertEquals(1000, dataA.written() + dataB.written() + dataC.written() + dataD.written()); - int allowedError = 10; - dataA.assertPartiallyWritten(109, allowedError); - dataB.assertPartiallyWritten(445, allowedError); - dataC.assertPartiallyWritten(223, allowedError); - dataD.assertPartiallyWritten(223, allowedError); - verify(listener, times(1)).streamWritten(eq(stream(STREAM_A)), anyInt()); - verify(listener, times(1)).streamWritten(eq(stream(STREAM_B)), anyInt()); - verify(listener, times(1)).streamWritten(eq(stream(STREAM_C)), anyInt()); - verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), anyInt()); - - assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - dataA.written(), window(STREAM_A)); - assertEquals(DEFAULT_WINDOW_SIZE - dataB.written(), window(STREAM_B)); - assertEquals(DEFAULT_WINDOW_SIZE - dataC.written(), window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE - dataD.written(), window(STREAM_D)); - } - - /** - * In this test, we root all streams at the connection, and then verify that data is split equally among the stream, - * since they all have the same weight. - * - *
-     *           0
-     *        / / \ \
-     *       A B   C D
-     * 
- */ - @Test - public void samePriorityShouldDistributeBasedOnData() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - // Root the streams at the connection with the same weights. - setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false); - setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false); - setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false); - setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); - - // Send a bunch of data on each stream. - FakeFlowControlled dataA = new FakeFlowControlled(400); - FakeFlowControlled dataB = new FakeFlowControlled(500); - FakeFlowControlled dataC = new FakeFlowControlled(0); - FakeFlowControlled dataD = new FakeFlowControlled(700); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - // The write will occur on C, because it's an empty frame. - dataC.assertFullyWritten(); - verify(listener, times(1)).streamWritten(stream(STREAM_C), 0); - dataD.assertNotWritten(); - - // Allow 1000 bytes to be sent. - incrementWindowSize(CONNECTION_STREAM_ID, 999); - controller.writePendingBytes(); - - assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50); - assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50); - - dataA.assertPartiallyWritten(333); - verify(listener, times(1)).streamWritten(stream(STREAM_A), 333); - dataB.assertPartiallyWritten(333); - verify(listener, times(1)).streamWritten(stream(STREAM_B), 333); - dataD.assertPartiallyWritten(333); - verify(listener, times(1)).streamWritten(stream(STREAM_D), 333); - } - - /** - * In this test, we block all streams and verify the priority bytes for each sub tree at each node are correct - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void subTreeBytesShouldBeCorrect() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - Http2Stream stream0 = connection.connectionStream(); - Http2Stream streamA = connection.stream(STREAM_A); - Http2Stream streamB = connection.stream(STREAM_B); - Http2Stream streamC = connection.stream(STREAM_C); - Http2Stream streamD = connection.stream(STREAM_D); - - // Send a bunch of data on each stream. - final IntObjectMap streamSizes = new IntObjectHashMap(4); - streamSizes.put(STREAM_A, (Integer) 400); - streamSizes.put(STREAM_B, (Integer) 500); - streamSizes.put(STREAM_C, (Integer) 600); - streamSizes.put(STREAM_D, (Integer) 700); - - FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A)); - FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B)); - FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C)); - FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D)); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - assertEquals(calculateStreamSizeSum(streamSizes, - Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - streamableBytesForTree(stream0)); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), - streamableBytesForTree(streamA)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), - streamableBytesForTree(streamB)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), - streamableBytesForTree(streamC)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), - streamableBytesForTree(streamD)); - } - - /** - * In this test, we block all streams shift the priority tree and verify priority bytes for each subtree are correct - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *      / \
-     *     C   D
-     * 
- * - * After the tree shift: - * - *
-     *        [0]
-     *         |
-     *         A
-     *         |
-     *         B
-     *        / \
-     *       C   D
-     * 
- */ - @Test - public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - Http2Stream stream0 = connection.connectionStream(); - Http2Stream streamA = connection.stream(STREAM_A); - Http2Stream streamB = connection.stream(STREAM_B); - Http2Stream streamC = connection.stream(STREAM_C); - Http2Stream streamD = connection.stream(STREAM_D); - - // Send a bunch of data on each stream. - final IntObjectMap streamSizes = new IntObjectHashMap(4); - streamSizes.put(STREAM_A, (Integer) 400); - streamSizes.put(STREAM_B, (Integer) 500); - streamSizes.put(STREAM_C, (Integer) 600); - streamSizes.put(STREAM_D, (Integer) 700); - - FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A)); - FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B)); - FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C)); - FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D)); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); - assertEquals(calculateStreamSizeSum(streamSizes, - Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - streamableBytesForTree(stream0)); - assertEquals(calculateStreamSizeSum(streamSizes, - Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - streamableBytesForTree(streamA)); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - streamableBytesForTree(streamB)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), - streamableBytesForTree(streamC)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), - streamableBytesForTree(streamD)); - } - - /** - * In this test, we block all streams and add a node to the priority tree and verify - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *      / \
-     *     C   D
-     * 
- * - * After the tree shift: - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *       |
-     *       E
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - Http2Stream stream0 = connection.connectionStream(); - Http2Stream streamA = connection.stream(STREAM_A); - Http2Stream streamB = connection.stream(STREAM_B); - Http2Stream streamC = connection.stream(STREAM_C); - Http2Stream streamD = connection.stream(STREAM_D); - - Http2Stream streamE = connection.local().createStream(STREAM_E, false); - streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); - - // Send a bunch of data on each stream. - final IntObjectMap streamSizes = new IntObjectHashMap(4); - streamSizes.put(STREAM_A, (Integer) 400); - streamSizes.put(STREAM_B, (Integer) 500); - streamSizes.put(STREAM_C, (Integer) 600); - streamSizes.put(STREAM_D, (Integer) 700); - streamSizes.put(STREAM_E, (Integer) 900); - - FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A)); - FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B)); - FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C)); - FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D)); - FakeFlowControlled dataE = new FakeFlowControlled(streamSizes.get(STREAM_E)); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - sendData(STREAM_E, dataE); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - dataE.assertNotWritten(); - - assertEquals(calculateStreamSizeSum(streamSizes, - Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), - streamableBytesForTree(stream0)); - assertEquals(calculateStreamSizeSum(streamSizes, - Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), - streamableBytesForTree(streamA)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), - streamableBytesForTree(streamB)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), - streamableBytesForTree(streamC)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), - streamableBytesForTree(streamD)); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), - streamableBytesForTree(streamE)); - } - - /** - * In this test, we block all streams and close an internal stream in the priority tree but tree should not change - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *      / \
-     *     C   D
-     * 
- */ - @Test - public void subTreeBytesShouldBeCorrectWithInternalStreamClose() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - Http2Stream stream0 = connection.connectionStream(); - Http2Stream streamA = connection.stream(STREAM_A); - Http2Stream streamB = connection.stream(STREAM_B); - Http2Stream streamC = connection.stream(STREAM_C); - Http2Stream streamD = connection.stream(STREAM_D); - - // Send a bunch of data on each stream. - final IntObjectMap streamSizes = new IntObjectHashMap(4); - streamSizes.put(STREAM_A, (Integer) 400); - streamSizes.put(STREAM_B, (Integer) 500); - streamSizes.put(STREAM_C, (Integer) 600); - streamSizes.put(STREAM_D, (Integer) 700); - - FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A)); - FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B)); - FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C)); - FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D)); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - streamA.close(); - - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - streamableBytesForTree(stream0)); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C, STREAM_D)), - streamableBytesForTree(streamA)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), - streamableBytesForTree(streamB)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), - streamableBytesForTree(streamC)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), - streamableBytesForTree(streamD)); - } - - /** - * In this test, we block all streams and close a leaf stream in the priority tree and verify - * - *
-     *        [0]
-     *        / \
-     *       A   B
-     *      / \
-     *     C   D
-     * 
- * - * After the close: - *
-     *        [0]
-     *        / \
-     *       A   B
-     *       |
-     *       D
-     * 
- */ - @Test - public void subTreeBytesShouldBeCorrectWithLeafStreamClose() throws Http2Exception { - // Block the connection - exhaustStreamWindow(CONNECTION_STREAM_ID); - - Http2Stream stream0 = connection.connectionStream(); - Http2Stream streamA = connection.stream(STREAM_A); - Http2Stream streamB = connection.stream(STREAM_B); - Http2Stream streamC = connection.stream(STREAM_C); - Http2Stream streamD = connection.stream(STREAM_D); - - // Send a bunch of data on each stream. - final IntObjectMap streamSizes = new IntObjectHashMap(4); - streamSizes.put(STREAM_A, (Integer) 400); - streamSizes.put(STREAM_B, (Integer) 500); - streamSizes.put(STREAM_C, (Integer) 600); - streamSizes.put(STREAM_D, (Integer) 700); - - FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A)); - FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B)); - FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C)); - FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D)); - - sendData(STREAM_A, dataA); - sendData(STREAM_B, dataB); - sendData(STREAM_C, dataC); - sendData(STREAM_D, dataD); - controller.writePendingBytes(); - - dataA.assertNotWritten(); - dataB.assertNotWritten(); - dataC.assertNotWritten(); - dataD.assertNotWritten(); - - streamC.close(); - - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_D)), - streamableBytesForTree(stream0)); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_D)), - streamableBytesForTree(streamA)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), - streamableBytesForTree(streamB)); - assertEquals(0, streamableBytesForTree(streamC)); - assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), - streamableBytesForTree(streamD)); - } - @Test public void flowControlledWriteThrowsAnException() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = mockedFlowControlledThatThrowsOnWrite(); @@ -1457,26 +705,11 @@ public class DefaultHttp2RemoteFlowControllerTest { return flowControlled; } - private static int calculateStreamSizeSum(IntObjectMap streamSizes, List streamIds) { - int sum = 0; - for (Integer streamId : streamIds) { - Integer streamSize = streamSizes.get(streamId); - if (streamSize != null) { - sum += streamSize; - } - } - return sum; - } - private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception { Http2Stream stream = stream(streamId); controller.addFlowControlled(stream, data); } - private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception { - connection.stream(stream).setPriority(parent, (short) weight, exclusive); - } - private void exhaustStreamWindow(int streamId) throws Http2Exception { incrementWindowSize(streamId, -window(streamId)); } @@ -1493,10 +726,6 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.incrementWindowSize(stream(streamId), delta); } - private int streamableBytesForTree(Http2Stream stream) { - return controller.streamableBytesForTree(stream); - } - private Http2Stream stream(int streamId) { return connection.stream(streamId); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java new file mode 100644 index 0000000000..12f1a759b2 --- /dev/null +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java @@ -0,0 +1,694 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.http2; + +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; +import org.junit.Before; +import org.junit.Test; +import org.mockito.AdditionalMatchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Tests for {@link PriorityStreamByteDistributor}. + */ +public class PriorityStreamByteDistributorTest { + private static final int STREAM_A = 1; + private static final int STREAM_B = 3; + private static final int STREAM_C = 5; + private static final int STREAM_D = 7; + private static final int STREAM_E = 9; + + private Http2Connection connection; + private PriorityStreamByteDistributor distributor; + + @Mock + private StreamByteDistributor.Writer writer; + + @Before + public void setup() throws Http2Exception { + MockitoAnnotations.initMocks(this); + + connection = new DefaultHttp2Connection(false); + distributor = new PriorityStreamByteDistributor(connection); + + // Assume we always write all the allocated bytes. + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) throws Throwable { + Http2Stream stream = (Http2Stream) in.getArguments()[0]; + int numBytes = (Integer) in.getArguments()[1]; + int streamableBytes = distributor.unallocatedStreamableBytes(stream) - numBytes; + updateStream(stream.id(), streamableBytes, streamableBytes > 0); + return null; + } + }).when(writer).write(any(Http2Stream.class), anyInt()); + + connection.local().createStream(STREAM_A, false); + connection.local().createStream(STREAM_B, false); + Http2Stream streamC = connection.local().createStream(STREAM_C, false); + Http2Stream streamD = connection.local().createStream(STREAM_D, false); + streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false); + streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false); + } + + @Test + public void bytesUnassignedAfterProcessing() { + updateStream(STREAM_A, 1, true); + updateStream(STREAM_B, 2, true); + updateStream(STREAM_C, 3, true); + updateStream(STREAM_D, 4, true); + + assertFalse(write(10)); + verifyWrite(STREAM_A, 1); + verifyWrite(STREAM_B, 2); + verifyWrite(STREAM_C, 3); + verifyWrite(STREAM_D, 4); + + assertFalse(write(10)); + verifyWrite(STREAM_A, 0); + verifyWrite(STREAM_B, 0); + verifyWrite(STREAM_C, 0); + verifyWrite(STREAM_D, 0); + } + + @Test + public void bytesUnassignedAfterProcessingWithException() { + updateStream(STREAM_A, 1, true); + updateStream(STREAM_B, 2, true); + updateStream(STREAM_C, 3, true); + updateStream(STREAM_D, 4, true); + + Exception fakeException = new RuntimeException("Fake exception"); + doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3)); + + try { + write(10); + fail("Expected an exception"); + } catch (RuntimeException e) { + assertSame(fakeException, e); + } + + verifyWrite(atMost(1), STREAM_A, 1); + verifyWrite(atMost(1), STREAM_B, 2); + verifyWrite(STREAM_C, 3); + verifyWrite(atMost(1), STREAM_D, 4); + + doNothing().when(writer).write(same(stream(STREAM_C)), eq(3)); + write(10); + verifyWrite(atMost(1), STREAM_A, 1); + verifyWrite(atMost(1), STREAM_B, 2); + verifyWrite(times(2), STREAM_C, 3); + verifyWrite(atMost(1), STREAM_D, 4); + } + + /** + * In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is + * blocked). + * + *
+     *         0
+     *        / \
+     *      [A]  B
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception { + // A cannot stream. + updateStream(STREAM_B, 10, true); + updateStream(STREAM_C, 10, true); + updateStream(STREAM_D, 10, true); + + // Write up to 10 bytes. + assertTrue(write(10)); + + // A is not written + verifyWrite(STREAM_A, 0); + + // B is partially written + verifyWrite(STREAM_B, 5); + + // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot + // be split evenly, one will get 3 and one will get 2. + verifyWrite(STREAM_C, 3); + verifyWrite(STREAM_D, 2); + } + + /** + * In this test, we block B which allows all bytes to be written by A. A should not share the data with its children + * since it's not blocked. + * + *
+     *         0
+     *        / \
+     *       A  [B]
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception { + // B cannot stream. + updateStream(STREAM_A, 10, true); + updateStream(STREAM_C, 10, true); + updateStream(STREAM_D, 10, true); + + // Write up to 10 bytes. + assertTrue(write(10)); + + // A is assigned all of the bytes. + verifyWrite(STREAM_A, 10); + verifyWrite(STREAM_B, 0); + verifyWrite(STREAM_C, 0); + verifyWrite(STREAM_D, 0); + } + + /** + * In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the + * remaining of its portion to its children. + * + *
+     *         0
+     *        / \
+     *       A  [B]
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void parentShouldWaterFallDataToChildren() throws Http2Exception { + // B cannot stream. + updateStream(STREAM_A, 5, true); + updateStream(STREAM_C, 10, true); + updateStream(STREAM_D, 10, true); + + // Write up to 10 bytes. + assertTrue(write(10)); + + verifyWrite(STREAM_A, 5); + verifyWrite(STREAM_B, 0); + verifyWrite(STREAM_C, 3); + verifyWrite(STREAM_D, 2); + } + + /** + * In this test, we verify re-prioritizing a stream. We start out with B blocked: + * + *
+     *         0
+     *        / \
+     *       A  [B]
+     *      / \
+     *     C   D
+     * 
+ * + * We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written + * bytes between them. + * + *
+     *           0
+     *          /|\
+     *        /  |  \
+     *       A  [B]  D
+     *      /
+     *     C
+     * 
+ */ + @Test + public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception { + // B cannot stream. + updateStream(STREAM_A, 10, true); + updateStream(STREAM_C, 10, true); + updateStream(STREAM_D, 10, true); + + // Re-prioritize D as a direct child of the connection. + setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); + + assertTrue(write(10)); + + verifyWrite(STREAM_A, 5); + verifyWrite(STREAM_B, 0); + verifyWrite(STREAM_C, 0); + verifyWrite(STREAM_D, 5); + } + + /** + * Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if + * the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266. + *
+     *            0
+     *          / | \
+     *        /   |   \
+     *      A(0) B(0) C(0)
+     *     /
+     *    D(> allowed to send in 1 allocation attempt)
+     * 
+ */ + @Test + public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception { + // Setup the priority tree. + setPriority(STREAM_A, 0, (short) 32, false); + setPriority(STREAM_B, 0, (short) 16, false); + setPriority(STREAM_C, 0, (short) 16, false); + setPriority(STREAM_D, STREAM_A, (short) 16, false); + + final int writableBytes = 100; + + // Send enough so it can not be completely written out + final int expectedUnsentAmount = 1; + updateStream(STREAM_D, writableBytes + expectedUnsentAmount, true); + + assertTrue(write(writableBytes)); + + verifyWrite(STREAM_D, writableBytes); + assertEquals(expectedUnsentAmount, streamableBytesForTree(stream(STREAM_D))); + } + + /** + * In this test, we root all streams at the connection, and then verify that data is split appropriately based on + * weight (all available data is the same). + * + *
+     *           0
+     *        / / \ \
+     *       A B   C D
+     * 
+ */ + @Test + public void writeShouldPreferHighestWeight() throws Http2Exception { + // Root the streams at the connection and assign weights. + setPriority(STREAM_A, 0, (short) 50, false); + setPriority(STREAM_B, 0, (short) 200, false); + setPriority(STREAM_C, 0, (short) 100, false); + setPriority(STREAM_D, 0, (short) 100, false); + + updateStream(STREAM_A, 1000, true); + updateStream(STREAM_B, 1000, true); + updateStream(STREAM_C, 1000, true); + updateStream(STREAM_D, 1000, true); + + assertTrue(write(1000)); + + // A is assigned all of the bytes. + int allowedError = 10; + verifyWriteWithDelta(STREAM_A, 109, allowedError); + verifyWriteWithDelta(STREAM_B, 445, allowedError); + verifyWriteWithDelta(STREAM_C, 223, allowedError); + verifyWriteWithDelta(STREAM_D, 223, allowedError); + } + + /** + * In this test, we root all streams at the connection, and then verify that data is split equally among the stream, + * since they all have the same weight. + * + *
+     *           0
+     *        / / \ \
+     *       A B   C D
+     * 
+ */ + @Test + public void samePriorityShouldDistributeBasedOnData() throws Http2Exception { + // Root the streams at the connection with the same weights. + setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false); + setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false); + setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false); + setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false); + + updateStream(STREAM_A, 400, true); + updateStream(STREAM_B, 500, true); + updateStream(STREAM_C, 0, true); + updateStream(STREAM_D, 700, true); + + assertTrue(write(999)); + + verifyWrite(STREAM_A, 333); + verifyWrite(STREAM_B, 333); + verifyWrite(STREAM_C, 0); + verifyWrite(STREAM_D, 333); + } + + /** + * In this test, we verify the priority bytes for each sub tree at each node are correct + * + *
+     *         0
+     *        / \
+     *       A   B
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void subTreeBytesShouldBeCorrect() throws Http2Exception { + Http2Stream stream0 = connection.connectionStream(); + Http2Stream streamA = connection.stream(STREAM_A); + Http2Stream streamB = connection.stream(STREAM_B); + Http2Stream streamC = connection.stream(STREAM_C); + Http2Stream streamD = connection.stream(STREAM_D); + + // Send a bunch of data on each stream. + final IntObjectMap streamSizes = new IntObjectHashMap(4); + streamSizes.put(STREAM_A, (Integer) 400); + streamSizes.put(STREAM_B, (Integer) 500); + streamSizes.put(STREAM_C, (Integer) 600); + streamSizes.put(STREAM_D, (Integer) 700); + + updateStream(STREAM_A, streamSizes.get(STREAM_A), true); + updateStream(STREAM_B, streamSizes.get(STREAM_B), true); + updateStream(STREAM_C, streamSizes.get(STREAM_C), true); + updateStream(STREAM_D, streamSizes.get(STREAM_D), true); + + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(stream0)); + assertEquals( + calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), + streamableBytesForTree(streamD)); + } + + /** + * In this test, we shift the priority tree and verify priority bytes for each subtree are correct + * + *
+     *         0
+     *        / \
+     *       A   B
+     *      / \
+     *     C   D
+     * 
+ * + * After the tree shift: + * + *
+     *         0
+     *         |
+     *         A
+     *         |
+     *         B
+     *        / \
+     *       C   D
+     * 
+ */ + @Test + public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception { + Http2Stream stream0 = connection.connectionStream(); + Http2Stream streamA = connection.stream(STREAM_A); + Http2Stream streamB = connection.stream(STREAM_B); + Http2Stream streamC = connection.stream(STREAM_C); + Http2Stream streamD = connection.stream(STREAM_D); + + // Send a bunch of data on each stream. + final IntObjectMap streamSizes = new IntObjectHashMap(4); + streamSizes.put(STREAM_A, (Integer) 400); + streamSizes.put(STREAM_B, (Integer) 500); + streamSizes.put(STREAM_C, (Integer) 600); + streamSizes.put(STREAM_D, (Integer) 700); + + updateStream(STREAM_A, streamSizes.get(STREAM_A), true); + updateStream(STREAM_B, streamSizes.get(STREAM_B), true); + updateStream(STREAM_C, streamSizes.get(STREAM_C), true); + updateStream(STREAM_D, streamSizes.get(STREAM_D), true); + + streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), + streamableBytesForTree(streamD)); + } + + /** + * In this test, we add a node to the priority tree and verify + * + *
+     *         0
+     *        / \
+     *       A   B
+     *      / \
+     *     C   D
+     * 
+ * + * After the tree shift: + * + *
+     *         0
+     *        / \
+     *       A   B
+     *       |
+     *       E
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception { + Http2Stream stream0 = connection.connectionStream(); + Http2Stream streamA = connection.stream(STREAM_A); + Http2Stream streamB = connection.stream(STREAM_B); + Http2Stream streamC = connection.stream(STREAM_C); + Http2Stream streamD = connection.stream(STREAM_D); + + Http2Stream streamE = connection.local().createStream(STREAM_E, false); + streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); + + // Send a bunch of data on each stream. + final IntObjectMap streamSizes = new IntObjectHashMap(4); + streamSizes.put(STREAM_A, (Integer) 400); + streamSizes.put(STREAM_B, (Integer) 500); + streamSizes.put(STREAM_C, (Integer) 600); + streamSizes.put(STREAM_D, (Integer) 700); + streamSizes.put(STREAM_E, (Integer) 900); + + updateStream(STREAM_A, streamSizes.get(STREAM_A), true); + updateStream(STREAM_B, streamSizes.get(STREAM_B), true); + updateStream(STREAM_C, streamSizes.get(STREAM_C), true); + updateStream(STREAM_D, streamSizes.get(STREAM_D), true); + updateStream(STREAM_E, streamSizes.get(STREAM_E), true); + + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, + Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), + streamableBytesForTree(streamD)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), + streamableBytesForTree(streamE)); + } + + /** + * In this test, we close an internal stream in the priority tree but tree should not change + * + *
+     *         0
+     *        / \
+     *       A   B
+     *      / \
+     *     C   D
+     * 
+ */ + @Test + public void subTreeBytesShouldBeCorrectWithInternalStreamClose() throws Http2Exception { + Http2Stream stream0 = connection.connectionStream(); + Http2Stream streamA = connection.stream(STREAM_A); + Http2Stream streamB = connection.stream(STREAM_B); + Http2Stream streamC = connection.stream(STREAM_C); + Http2Stream streamD = connection.stream(STREAM_D); + + // Send a bunch of data on each stream. + final IntObjectMap streamSizes = new IntObjectHashMap(4); + streamSizes.put(STREAM_A, (Integer) 400); + streamSizes.put(STREAM_B, (Integer) 500); + streamSizes.put(STREAM_C, (Integer) 600); + streamSizes.put(STREAM_D, (Integer) 700); + + updateStream(STREAM_A, streamSizes.get(STREAM_A), true); + updateStream(STREAM_B, streamSizes.get(STREAM_B), true); + updateStream(STREAM_C, streamSizes.get(STREAM_C), true); + updateStream(STREAM_D, streamSizes.get(STREAM_D), true); + + streamA.close(); + + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)), + streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), + streamableBytesForTree(streamD)); + } + + /** + * In this test, we close a leaf stream in the priority tree and verify + * + *
+     *         0
+     *        / \
+     *       A   B
+     *      / \
+     *     C   D
+     * 
+ * + * After the close: + *
+     *         0
+     *        / \
+     *       A   B
+     *       |
+     *       D
+     * 
+ */ + @Test + public void subTreeBytesShouldBeCorrectWithLeafStreamClose() throws Http2Exception { + Http2Stream stream0 = connection.connectionStream(); + Http2Stream streamA = connection.stream(STREAM_A); + Http2Stream streamB = connection.stream(STREAM_B); + Http2Stream streamC = connection.stream(STREAM_C); + Http2Stream streamD = connection.stream(STREAM_D); + + // Send a bunch of data on each stream. + final IntObjectMap streamSizes = new IntObjectHashMap(4); + streamSizes.put(STREAM_A, (Integer) 400); + streamSizes.put(STREAM_B, (Integer) 500); + streamSizes.put(STREAM_C, (Integer) 600); + streamSizes.put(STREAM_D, (Integer) 700); + + updateStream(STREAM_A, streamSizes.get(STREAM_A), true); + updateStream(STREAM_B, streamSizes.get(STREAM_B), true); + updateStream(STREAM_C, streamSizes.get(STREAM_C), true); + updateStream(STREAM_D, streamSizes.get(STREAM_D), true); + + streamC.close(); + + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_D)), + streamableBytesForTree(stream0)); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_D)), + streamableBytesForTree(streamA)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)), + streamableBytesForTree(streamB)); + assertEquals(0, streamableBytesForTree(streamC)); + assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)), + streamableBytesForTree(streamD)); + } + + private Http2Stream stream(int streamId) { + return connection.stream(streamId); + } + + private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) { + final Http2Stream stream = stream(streamId); + distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() { + @Override + public Http2Stream stream() { + return stream; + } + + @Override + public int streamableBytes() { + return streamableBytes; + } + + @Override + public boolean hasFrame() { + return hasFrame; + } + }); + } + + private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception { + stream(streamId).setPriority(parent, (short) weight, exclusive); + } + + private int streamableBytesForTree(Http2Stream stream) { + return distributor.unallocatedStreamableBytesForTree(stream); + } + + private boolean write(int numBytes) { + return distributor.distribute(numBytes, writer); + } + + private void verifyWrite(int streamId, int numBytes) { + verify(writer).write(same(stream(streamId)), eq(numBytes)); + } + + private void verifyWrite(VerificationMode mode, int streamId, int numBytes) { + verify(writer, mode).write(same(stream(streamId)), eq(numBytes)); + } + + private void verifyWriteWithDelta(int streamId, int numBytes, int delta) { + verify(writer).write(same(stream(streamId)), (int) AdditionalMatchers.eq(numBytes, delta)); + } + + private static int calculateStreamSizeSum(IntObjectMap streamSizes, List streamIds) { + int sum = 0; + for (Integer streamId : streamIds) { + Integer streamSize = streamSizes.get(streamId); + if (streamSize != null) { + sum += streamSize; + } + } + return sum; + } +}