Making HTTP/2 stream byte assignment pluggable

Motivation:

The DefaultHttp2RemoteFlowController has become very large and is getting difficult to understand and maintain. It is also desirable for some applications to be able to disable the priority algorithm altogether for performance reasons.

Modifications:

Abstract the stream byte assignment logic (renamed allocation->assignment for clarity) behind an interface `StreamByteAssigner` with a single implementation `PriorityStreamByteAssigner`.

Result:

Goes some way towards supporting #4246
This commit is contained in:
nmittler 2015-09-21 14:09:06 -07:00
parent 2766fc49e2
commit 7ab132f28a
6 changed files with 1262 additions and 1142 deletions

View File

@ -23,11 +23,11 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque; import java.util.Deque;
/** /**
@ -38,29 +38,37 @@ import java.util.Deque;
*/ */
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final int MIN_WRITABLE_CHUNK = 32 * 1024; private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
private final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
private final StreamByteDistributor.Writer writer = new StreamByteDistributor.Writer() {
@Override @Override
public boolean visit(Http2Stream stream) { public void write(Http2Stream stream, int numBytes) {
int written = state(stream).writeAllocatedBytes(); int written = state(stream).writeAllocatedBytes(numBytes);
if (written != -1 && listener != null) { if (written != -1 && listener != null) {
listener.streamWritten(stream, written); listener.streamWritten(stream, written);
} }
return true;
} }
}; };
private final Http2Connection connection; private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey; private final Http2Connection.PropertyKey stateKey;
private final StreamByteDistributor streamByteDistributor;
private final AbstractState connectionState;
private int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private Listener listener; private Listener listener;
public DefaultHttp2RemoteFlowController(Http2Connection connection) { public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this(connection, new PriorityStreamByteDistributor(connection));
}
public DefaultHttp2RemoteFlowController(Http2Connection connection,
StreamByteDistributor streamByteDistributor) {
this.connection = checkNotNull(connection, "connection"); this.connection = checkNotNull(connection, "connection");
this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
// Add a flow state for the connection. // Add a flow state for the connection.
stateKey = connection.newKey(); stateKey = connection.newKey();
connection.connectionStream().setProperty(stateKey, connectionState = new DefaultState(connection.connectionStream(), initialWindowSize);
new DefaultState(connection.connectionStream(), initialWindowSize)); connection.connectionStream().setProperty(stateKey, connectionState);
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@ -117,28 +125,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
state(stream).cancel(); state(stream).cancel();
} }
} }
@Override
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).streamableBytesForTree();
if (delta != 0) {
state(parent).incrementStreamableBytesForTree(delta);
}
}
}
@Override
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = -state(stream).streamableBytesForTree();
if (delta != 0) {
state(parent).incrementStreamableBytesForTree(delta);
}
}
}
}); });
} }
@ -209,7 +195,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
assert ctx == null || ctx.executor().inEventLoop(); assert ctx == null || ctx.executor().inEventLoop();
if (stream.id() == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window // Update the connection window
connectionState().incrementStreamWindow(delta); connectionState.incrementStreamWindow(delta);
} else { } else {
// Update the stream window // Update the stream window
AbstractState state = state(stream); AbstractState state = state(stream);
@ -238,31 +224,18 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
state.enqueueFrame(frame); state.enqueueFrame(frame);
} catch (Throwable t) { } catch (Throwable t) {
frame.error(ctx, t); frame.error(ctx, t);
return;
} }
} }
/**
* For testing purposes only. Exposes the number of streamable bytes for the tree rooted at
* the given stream.
*/
int streamableBytesForTree(Http2Stream stream) {
return state(stream).streamableBytesForTree();
}
private AbstractState state(Http2Stream stream) { private AbstractState state(Http2Stream stream) {
return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey); return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey);
} }
private AbstractState connectionState() {
return (AbstractState) connection.connectionStream().getProperty(stateKey);
}
/** /**
* Returns the flow control window for the entire connection. * Returns the flow control window for the entire connection.
*/ */
private int connectionWindowSize() { private int connectionWindowSize() {
return connectionState().windowSize(); return connectionState.windowSize();
} }
private int minUsableChannelBytes() { private int minUsableChannelBytes() {
@ -285,16 +258,17 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0; int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
// Clip the usable bytes by the connection window. // Clip the usable bytes by the connection window.
return min(connectionState().windowSize(), useableBytes); return min(connectionState.windowSize(), useableBytes);
} }
/** /**
* Package private for testing purposes only! * Package private for testing purposes only!
* @param requestedBytes The desired amount of bytes. *
* @return The amount of bytes that can be supported by underlying {@link Channel} without queuing "too-much". * @return The amount of bytes that can be supported by underlying {@link
* io.netty.channel.Channel} without queuing "too-much".
*/ */
final int writableBytes(int requestedBytes) { private int writableBytes() {
return Math.min(requestedBytes, maxUsableChannelBytes()); return Math.min(connectionWindowSize(), maxUsableChannelBytes());
} }
/** /**
@ -302,198 +276,15 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/ */
@Override @Override
public void writePendingBytes() throws Http2Exception { public void writePendingBytes() throws Http2Exception {
AbstractState connectionState = connectionState(); int bytesToWrite = writableBytes();
int connectionWindowSize; boolean haveUnwrittenBytes;
// Using a do-while loop so that we always write at least once, regardless if we have
// bytesToWrite or not. This ensures that zero-length frames will always be written.
do { do {
connectionWindowSize = writableBytes(connectionState.windowSize()); // Distribute the connection window across the streams and write the data.
haveUnwrittenBytes = streamByteDistributor.distribute(bytesToWrite, writer);
if (connectionWindowSize > 0) { } while (haveUnwrittenBytes && (bytesToWrite = writableBytes()) > 0 && ctx.channel().isWritable());
// Allocate the bytes for the connection window to the streams, but do not write.
allocateBytesForTree(connectionState.stream(), connectionWindowSize);
}
// Write all of allocated bytes. We must call this even if no bytes are allocated as it is possible there
// are empty frames indicating the End Of Stream.
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
} while (connectionState.streamableBytesForTree() > 0 &&
connectionWindowSize > 0 &&
ctx.channel().isWritable());
}
/**
* This will allocate bytes by stream weight and priority for the entire tree rooted at {@code parent}, but does not
* write any bytes. The connection window is generally distributed amongst siblings according to their weight,
* however we need to ensure that the entire connection window is used (assuming streams have >= connection window
* bytes to send) and we may need some sort of rounding to accomplish this.
*
* @param parent The parent of the tree.
* @param connectionWindowSize The connection window this is available for use at this point in the tree.
* @return An object summarizing the write and allocation results.
*/
int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) throws Http2Exception {
AbstractState state = state(parent);
if (state.streamableBytesForTree() <= 0) {
return 0;
}
// If the number of streamable bytes for this tree will fit in the connection window
// then there is no need to prioritize the bytes...everyone sends what they have
if (state.streamableBytesForTree() <= connectionWindowSize) {
SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize);
parent.forEachChild(childFeeder);
return childFeeder.bytesAllocated;
}
ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize);
// Iterate once over all children of this parent and try to feed all the children.
parent.forEachChild(childFeeder);
// Now feed any remaining children that are still hungry until the connection
// window collapses.
childFeeder.feedHungryChildren();
return childFeeder.bytesAllocated;
}
/**
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection
* window appropriately to the children of a given stream.
*/
private final class ChildFeeder implements Http2StreamVisitor {
final int maxSize;
int totalWeight;
int connectionWindow;
int nextTotalWeight;
int nextConnectionWindow;
int bytesAllocated;
Http2Stream[] stillHungry;
int nextTail;
ChildFeeder(Http2Stream parent, int connectionWindow) {
maxSize = parent.numChildren();
totalWeight = parent.totalChildWeights();
this.connectionWindow = connectionWindow;
this.nextConnectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) throws Http2Exception {
// In order to make progress toward the connection window due to possible rounding errors, we make sure
// that each stream (with data to send) is given at least 1 byte toward the connection window.
int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight)));
int bytesForTree = min(nextConnectionWindow, connectionWindowChunk);
AbstractState state = state(child);
int bytesForChild = min(state.streamableBytes(), bytesForTree);
// Allocate the bytes to this child.
if (bytesForChild > 0) {
state.allocate(bytesForChild);
bytesAllocated += bytesForChild;
nextConnectionWindow -= bytesForChild;
bytesForTree -= bytesForChild;
}
// Allocate any remaining bytes to the children of this stream.
if (bytesForTree > 0) {
int childBytesAllocated = allocateBytesForTree(child, bytesForTree);
bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated;
}
if (nextConnectionWindow > 0) {
// If this subtree still wants to send then it should be re-considered to take bytes that are unused by
// sibling nodes. This is needed because we don't yet know if all the peers will be able to use all of
// their "fair share" of the connection window, and if they don't use it then we should divide their
// unused shared up for the peers who still want to send.
if (state.streamableBytesForTree() > 0) {
stillHungry(child);
}
return true;
}
return false;
}
void feedHungryChildren() throws Http2Exception {
if (stillHungry == null) {
// There are no hungry children to feed.
return;
}
totalWeight = nextTotalWeight;
connectionWindow = nextConnectionWindow;
// Loop until there are not bytes left to stream or the connection window has collapsed.
for (int tail = nextTail; tail > 0 && connectionWindow > 0;) {
nextTotalWeight = 0;
nextTail = 0;
// Iterate over the children that are currently still hungry.
for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) {
if (!visit(stillHungry[head])) {
// The connection window has collapsed, break out of the loop.
break;
}
}
connectionWindow = nextConnectionWindow;
totalWeight = nextTotalWeight;
tail = nextTail;
}
}
/**
* Indicates that the given child is still hungry (i.e. still has streamable bytes that can
* fit within the current connection window).
*/
private void stillHungry(Http2Stream child) {
ensureSpaceIsAllocated(nextTail);
stillHungry[nextTail++] = child;
nextTotalWeight += child.weight();
}
/**
* Ensures that the {@link #stillHungry} array is properly sized to hold the given index.
*/
private void ensureSpaceIsAllocated(int index) {
if (stillHungry == null) {
// Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if
// maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get
// all of the available connection window.
stillHungry = new Http2Stream[max(2, maxSize >>> 2)];
} else if (index == stillHungry.length) {
// Grow the array by a factor of 2.
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1));
}
}
}
/**
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the
* available connection window.
*/
private final class SimpleChildFeeder implements Http2StreamVisitor {
int bytesAllocated;
int connectionWindow;
SimpleChildFeeder(int connectionWindow) {
this.connectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) throws Http2Exception {
AbstractState childState = state(child);
int bytesForChild = childState.streamableBytes();
if (bytesForChild > 0 || childState.hasFrame()) {
childState.allocate(bytesForChild);
bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild;
}
int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
return true;
}
} }
/** /**
@ -503,7 +294,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Deque<FlowControlled> pendingWriteQueue; private final Deque<FlowControlled> pendingWriteQueue;
private int window; private int window;
private int pendingBytes; private int pendingBytes;
private int allocated;
// Set to true while a frame is being written, false otherwise. // Set to true while a frame is being written, false otherwise.
private boolean writing; private boolean writing;
// Set to true if cancel() was called. // Set to true if cancel() was called.
@ -537,31 +327,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
void allocate(int bytes) { int writeAllocatedBytes(int allocated) {
allocated += bytes; try {
// Also artificially reduce the streamable bytes for this tree to give the appearance
// that the data has been written. This will be restored before the allocated bytes are
// actually written.
incrementStreamableBytesForTree(-bytes);
}
@Override
int writeAllocatedBytes() {
int numBytes = allocated;
// Restore the number of streamable bytes to this branch.
incrementStreamableBytesForTree(allocated);
resetAllocated();
// Perform the write. // Perform the write.
return writeBytes(numBytes); return writeBytes(allocated);
} finally {
streamByteDistributor.updateStreamableBytes(this);
} }
/**
* Reset the number of bytes that have been allocated to this stream by the priority algorithm.
*/
private void resetAllocated() {
allocated = 0;
} }
@Override @Override
@ -570,30 +342,19 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
throw streamError(stream.id(), FLOW_CONTROL_ERROR, throw streamError(stream.id(), FLOW_CONTROL_ERROR,
"Window size overflow for stream: %d", stream.id()); "Window size overflow for stream: %d", stream.id());
} }
int previouslyStreamable = streamableBytes();
window += delta; window += delta;
// Update this branch of the priority tree if the streamable bytes have changed for this node. streamByteDistributor.updateStreamableBytes(this);
int streamableDelta = streamableBytes() - previouslyStreamable;
if (streamableDelta != 0) {
incrementStreamableBytesForTree(streamableDelta);
}
return window; return window;
} }
@Override
int writableWindow() { int writableWindow() {
return min(window, connectionWindowSize()); return min(window, connectionWindowSize());
} }
@Override @Override
int streamableBytes() { public int streamableBytes() {
return max(0, min(pendingBytes - allocated, window)); return max(0, min(pendingBytes, window));
}
@Override
int streamableBytesForTree() {
return streamableBytesForTree;
} }
@Override @Override
@ -606,7 +367,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
boolean hasFrame() { public boolean hasFrame() {
return !pendingWriteQueue.isEmpty(); return !pendingWriteQueue.isEmpty();
} }
@ -640,9 +401,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause, writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause,
"Stream closed before write could take place")); "Stream closed before write could take place"));
} }
streamByteDistributor.updateStreamableBytes(this);
} }
@Override
int writeBytes(int bytes) { int writeBytes(int bytes) {
if (!hasFrame()) { if (!hasFrame()) {
return -1; return -1;
@ -712,18 +473,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
/** /**
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that * Increments the number of pending bytes for this node and updates the {@link StreamByteDistributor}.
* fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update
* this branch of the priority tree.
*/ */
private void incrementPendingBytes(int numBytes) { private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
pendingBytes += numBytes; pendingBytes += numBytes;
streamByteDistributor.updateStreamableBytes(this);
int delta = streamableBytes() - previouslyStreamable;
if (delta != 0) {
incrementStreamableBytesForTree(delta);
}
} }
/** /**
@ -739,7 +493,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private void decrementFlowControlWindow(int bytes) { private void decrementFlowControlWindow(int bytes) {
try { try {
int negativeBytes = -bytes; int negativeBytes = -bytes;
connectionState().incrementStreamWindow(negativeBytes); connectionState.incrementStreamWindow(negativeBytes);
incrementStreamWindow(negativeBytes); incrementStreamWindow(negativeBytes);
} catch (Http2Exception e) { } catch (Http2Exception e) {
// Should never get here since we're decrementing. // Should never get here since we're decrementing.
@ -782,22 +536,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
int writableWindow() { public int streamableBytes() {
return 0; return 0;
} }
@Override @Override
int streamableBytes() { int writeAllocatedBytes(int allocated) {
return 0;
}
@Override
int streamableBytesForTree() {
return streamableBytesForTree;
}
@Override
int writeAllocatedBytes() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -817,23 +561,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return 0; return 0;
} }
@Override
int writeBytes(int bytes) {
throw new UnsupportedOperationException();
}
@Override @Override
void enqueueFrame(FlowControlled frame) { void enqueueFrame(FlowControlled frame) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
void allocate(int bytes) { public boolean hasFrame() {
throw new UnsupportedOperationException();
}
@Override
boolean hasFrame() {
return false; return false;
} }
} }
@ -841,9 +575,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/** /**
* An abstraction which provides specific extensions used by remote flow control. * An abstraction which provides specific extensions used by remote flow control.
*/ */
private abstract class AbstractState { private abstract class AbstractState implements StreamByteDistributor.StreamState {
protected final Http2Stream stream; protected final Http2Stream stream;
protected int streamableBytesForTree;
AbstractState(Http2Stream stream) { AbstractState(Http2Stream stream) {
this.stream = stream; this.stream = stream;
@ -851,27 +584,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
AbstractState(AbstractState existingState) { AbstractState(AbstractState existingState) {
this.stream = existingState.stream(); this.stream = existingState.stream();
this.streamableBytesForTree = existingState.streamableBytesForTree();
} }
/** /**
* The stream this state is associated with. * The stream this state is associated with.
*/ */
final Http2Stream stream() { @Override
public final Http2Stream stream() {
return stream; return stream;
} }
/**
* Recursively increments the {@link #streamableBytesForTree()} for this branch in the priority tree starting
* at the current node.
*/
final void incrementStreamableBytesForTree(int numBytes) {
streamableBytesForTree += numBytes;
if (!stream.isRoot()) {
state(stream.parent()).incrementStreamableBytesForTree(numBytes);
}
}
abstract int windowSize(); abstract int windowSize();
abstract int initialWindowSize(); abstract int initialWindowSize();
@ -881,21 +603,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* *
* @return the number of bytes written for a stream or {@code -1} if no write occurred. * @return the number of bytes written for a stream or {@code -1} if no write occurred.
*/ */
abstract int writeAllocatedBytes(); abstract int writeAllocatedBytes(int allocated);
/**
* Returns the number of pending bytes for this node that will fit within the
* {@link #writableWindow()}. This is used for the priority algorithm to determine the aggregate
* number of bytes that can be written at each node. Each node only takes into account its
* stream window so that when a change occurs to the connection window, these values need
* not change (i.e. no tree traversal is required).
*/
abstract int streamableBytes();
/**
* Get the {@link #streamableBytes()} for the entire tree rooted at this node.
*/
abstract int streamableBytesForTree();
/** /**
* Any operations that may be pending are cleared and the status of these operations is failed. * Any operations that may be pending are cleared and the status of these operations is failed.
@ -912,31 +620,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
*/ */
abstract int incrementStreamWindow(int delta) throws Http2Exception; abstract int incrementStreamWindow(int delta) throws Http2Exception;
/**
* Returns the maximum writable window (minimum of the stream and connection windows).
*/
abstract int writableWindow();
/**
* Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
* the number of pending writes available, or because a frame does not support splitting on arbitrary
* boundaries. Will return {@code -1} if there are no frames to write.
*/
abstract int writeBytes(int bytes);
/** /**
* Adds the {@code frame} to the pending queue and increments the pending byte count. * Adds the {@code frame} to the pending queue and increments the pending byte count.
*/ */
abstract void enqueueFrame(FlowControlled frame); abstract void enqueueFrame(FlowControlled frame);
/**
* Increment the number of bytes allocated to this stream by the priority algorithm
*/
abstract void allocate(int bytes);
/**
* Indicates whether or not there are frames in the pending queue.
*/
abstract boolean hasFrame();
} }
} }

View File

@ -31,7 +31,7 @@ public interface Http2RemoteFlowController extends Http2FlowController {
/** /**
* Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data * Queues a payload for transmission to the remote endpoint. There is no guarantee as to when the data
* will be written or how it will be allocated to frames. * will be written or how it will be assigned to frames.
* before sending. * before sending.
* <p> * <p>
* Writes do not actually occur until {@link #writePendingBytes()} is called. * Writes do not actually occur until {@link #writePendingBytes()} is called.

View File

@ -0,0 +1,428 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.Arrays;
/**
* A {@link StreamByteDistributor} that implements the HTTP/2 priority tree algorithm for allocating
* bytes for all streams in the connection.
*/
public final class PriorityStreamByteDistributor implements StreamByteDistributor {
private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey;
private final WriteVisitor writeVisitor = new WriteVisitor();
public PriorityStreamByteDistributor(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
// Add a state for the connection.
stateKey = connection.newKey();
connection.connectionStream().setProperty(stateKey,
new PriorityState(connection.connectionStream()));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
stream.setProperty(stateKey, new PriorityState(stream));
}
@Override
public void onStreamClosed(Http2Stream stream) {
state(stream).close();
}
@Override
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(delta);
}
}
}
@Override
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(-delta);
}
}
}
});
}
@Override
public void updateStreamableBytes(StreamState streamState) {
state(streamState.stream()).updateStreamableBytes(streamState.streamableBytes(),
streamState.hasFrame());
}
@Override
public boolean distribute(int maxBytes, Writer writer) {
checkNotNull(writer, "writer");
if (maxBytes > 0) {
allocateBytesForTree(connection.connectionStream(), maxBytes);
}
// Need to write even if maxBytes == 0 in order to handle the case of empty frames.
writeVisitor.writeAllocatedBytes(writer);
return state(connection.connectionStream()).unallocatedStreamableBytesForTree() > 0;
}
/**
* For testing only.
*/
int unallocatedStreamableBytes(Http2Stream stream) {
return state(stream).unallocatedStreamableBytes();
}
/**
* For testing only.
*/
int unallocatedStreamableBytesForTree(Http2Stream stream) {
return state(stream).unallocatedStreamableBytesForTree();
}
/**
* This will allocate bytes by stream weight and priority for the entire tree rooted at {@code
* parent}, but does not write any bytes. The connection window is generally distributed amongst
* siblings according to their weight, however we need to ensure that the entire connection
* window is used (assuming streams have >= connection window bytes to send) and we may need
* some sort of rounding to accomplish this.
*
* @param parent The parent of the tree.
* @param connectionWindowSize The connection window this is available for use at this point in
* the tree.
* @return The number of bytes actually allocated.
*/
private int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) {
PriorityState state = state(parent);
if (state.unallocatedStreamableBytesForTree() <= 0) {
return 0;
}
// If the number of streamable bytes for this tree will fit in the connection window
// then there is no need to prioritize the bytes...everyone sends what they have
if (state.unallocatedStreamableBytesForTree() <= connectionWindowSize) {
SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize);
forEachChild(parent, childFeeder);
return childFeeder.bytesAllocated;
}
ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize);
// Iterate once over all children of this parent and try to feed all the children.
forEachChild(parent, childFeeder);
// Now feed any remaining children that are still hungry until the connection
// window collapses.
childFeeder.feedHungryChildren();
return childFeeder.bytesAllocated;
}
private void forEachChild(Http2Stream parent, Http2StreamVisitor childFeeder) {
try {
parent.forEachChild(childFeeder);
} catch (Http2Exception e) {
// Should never happen since the feeder doesn't throw.
throw new IllegalStateException(e);
}
}
private PriorityState state(Http2Stream stream) {
return checkNotNull(stream, "stream").getProperty(stateKey);
}
/**
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the
* available connection window appropriately to the children of a given stream.
*/
private final class ChildFeeder implements Http2StreamVisitor {
final int maxSize;
int totalWeight;
int connectionWindow;
int nextTotalWeight;
int nextConnectionWindow;
int bytesAllocated;
Http2Stream[] stillHungry;
int nextTail;
ChildFeeder(Http2Stream parent, int connectionWindow) {
maxSize = parent.numChildren();
totalWeight = parent.totalChildWeights();
this.connectionWindow = connectionWindow;
this.nextConnectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) {
// In order to make progress toward the connection window due to possible rounding errors, we make sure
// that each stream (with data to send) is given at least 1 byte toward the connection window.
int connectionWindowChunk =
max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight)));
int bytesForTree = min(nextConnectionWindow, connectionWindowChunk);
PriorityState state = state(child);
int bytesForChild = min(state.unallocatedStreamableBytes(), bytesForTree);
// Allocate the bytes to this child.
if (bytesForChild > 0) {
state.allocate(bytesForChild);
bytesAllocated += bytesForChild;
nextConnectionWindow -= bytesForChild;
bytesForTree -= bytesForChild;
}
// Allocate any remaining bytes to the children of this stream.
if (bytesForTree > 0) {
int childBytesAllocated = allocateBytesForTree(child, bytesForTree);
bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated;
}
if (nextConnectionWindow > 0) {
// If this subtree still wants to send then it should be re-considered to take bytes that are unused by
// sibling nodes. This is needed because we don't yet know if all the peers will be able to use all of
// their "fair share" of the connection window, and if they don't use it then we should divide their
// unused shared up for the peers who still want to send.
if (state.unallocatedStreamableBytesForTree() > 0) {
stillHungry(child);
}
return true;
}
return false;
}
void feedHungryChildren() {
if (stillHungry == null) {
// There are no hungry children to feed.
return;
}
totalWeight = nextTotalWeight;
connectionWindow = nextConnectionWindow;
// Loop until there are not bytes left to stream or the connection window has collapsed.
for (int tail = nextTail; tail > 0 && connectionWindow > 0;) {
nextTotalWeight = 0;
nextTail = 0;
// Iterate over the children that are currently still hungry.
for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) {
if (!visit(stillHungry[head])) {
// The connection window has collapsed, break out of the loop.
break;
}
}
connectionWindow = nextConnectionWindow;
totalWeight = nextTotalWeight;
tail = nextTail;
}
}
/**
* Indicates that the given child is still hungry (i.e. still has streamable bytes that can
* fit within the current connection window).
*/
void stillHungry(Http2Stream child) {
ensureSpaceIsAllocated(nextTail);
stillHungry[nextTail++] = child;
nextTotalWeight += child.weight();
}
/**
* Ensures that the {@link #stillHungry} array is properly sized to hold the given index.
*/
void ensureSpaceIsAllocated(int index) {
if (stillHungry == null) {
// Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if
// maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get
// all of the available connection window.
stillHungry = new Http2Stream[max(2, maxSize >>> 2)];
} else if (index == stillHungry.length) {
// Grow the array by a factor of 2.
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1));
}
}
}
/**
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit
* within the available connection window.
*/
private final class SimpleChildFeeder implements Http2StreamVisitor {
int bytesAllocated;
int connectionWindow;
SimpleChildFeeder(int connectionWindow) {
this.connectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) {
PriorityState childState = state(child);
int bytesForChild = childState.unallocatedStreamableBytes();
if (bytesForChild > 0 || childState.hasFrame()) {
childState.allocate(bytesForChild);
bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild;
}
int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
return true;
}
}
/**
* The remote flow control state for a single stream.
*/
private final class PriorityState {
final Http2Stream stream;
boolean hasFrame;
int streamableBytes;
int allocated;
int unallocatedStreamableBytesForTree;
PriorityState(Http2Stream stream) {
this.stream = stream;
}
/**
* Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in
* the priority tree starting at the current node.
*/
void unallocatedStreamableBytesForTreeChanged(int delta) {
unallocatedStreamableBytesForTree += delta;
if (!stream.isRoot()) {
state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta);
}
}
void allocate(int bytes) {
allocated += bytes;
if (bytes != 0) {
// Also artificially reduce the streamable bytes for this tree to give the appearance
// that the data has been written. This will be restored before the allocated bytes are
// actually written.
unallocatedStreamableBytesForTreeChanged(-bytes);
}
}
/**
* Reset the number of bytes that have been allocated to this stream by the priority
* algorithm.
*/
void resetAllocated() {
allocate(-allocated);
}
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
this.hasFrame = hasFrame;
int delta = newStreamableBytes - streamableBytes;
if (delta != 0) {
streamableBytes = newStreamableBytes;
// Update this branch of the priority tree if the streamable bytes have changed for this node.
unallocatedStreamableBytesForTreeChanged(delta);
}
}
void close() {
// Unallocate all bytes.
resetAllocated();
// Clear the streamable bytes.
updateStreamableBytes(0, false);
}
boolean hasFrame() {
return hasFrame;
}
int unallocatedStreamableBytes() {
return streamableBytes - allocated;
}
int unallocatedStreamableBytesForTree() {
return unallocatedStreamableBytesForTree;
}
}
/**
* A connection stream visitor that delegates to the user provided visitor.
*/
private class WriteVisitor implements Http2StreamVisitor {
Writer writer;
RuntimeException error;
void writeAllocatedBytes(Writer writer) {
try {
this.writer = writer;
try {
connection.forEachActiveStream(this);
} catch (Http2Exception e) {
// Should never happen since the visitor doesn't throw.
throw new IllegalStateException(e);
}
// If an error was caught when calling back the visitor, throw it now.
if (error != null) {
throw error;
}
} finally {
error = null;
}
}
@Override
public boolean visit(Http2Stream stream) {
PriorityState state = state(stream);
try {
int allocated = state.allocated;
// Unallocate all bytes for this stream.
state.resetAllocated();
// Write the allocated bytes.
if (error == null) {
writer.write(stream, allocated);
}
} catch (RuntimeException e) {
// Stop calling the visitor, but continue in the loop to reset the allocated for
// all remaining states.
error = e;
}
// We have to iterate across all streams to ensure that we reset the allocated bytes.
return true;
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* An object (used by remote flow control) that is responsible for distributing the bytes to be
* written across the streams in the connection.
*/
public interface StreamByteDistributor {
/**
* State information for the stream, indicating the number of bytes that are currently
* streamable. This is provided to the {@link #updateStreamableBytes(StreamState)} method.
*/
interface StreamState {
/**
* Gets the stream this state is associated with.
*/
Http2Stream stream();
/**
* Returns the number of pending bytes for this node that will fit within the stream flow
* control window. This is used for the priority algorithm to determine the aggregate number
* of bytes that can be written at each node. Each node only takes into account its stream
* window so that when a change occurs to the connection window, these values need not
* change (i.e. no tree traversal is required).
*/
int streamableBytes();
/**
* Indicates whether or not there are frames pending for this stream.
*/
boolean hasFrame();
}
/**
* Object that performs the writing of the bytes that have been allocated for a stream.
*/
interface Writer {
/**
* Writes the allocated bytes for this stream.
* @param stream the stream for which to perform the write.
* @param numBytes the number of bytes to write.
*/
void write(Http2Stream stream, int numBytes);
}
/**
* Called when the streamable bytes for a stream has changed. Until this
* method is called for the first time for a give stream, the stream is assumed to have no
* streamable bytes.
*/
void updateStreamableBytes(StreamState state);
/**
* Distributes up to {@code maxBytes} to those streams containing streamable bytes and
* iterates across those streams to write the appropriate bytes. Criteria for
* traversing streams is undefined and it is up to the implementation to determine when to stop
* at a given stream.
*
* <p>The streamable bytes are not automatically updated by calling this method. It is up to the
* caller to indicate the number of bytes streamable after the write by calling
* {@link #updateStreamableBytes(StreamState)}.
*
* @param maxBytes the maximum number of bytes to write.
* @return {@code true} if there are still streamable bytes that have not yet been written,
* otherwise {@code false}.
*/
boolean distribute(int maxBytes, Writer writer);
}

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -35,23 +34,15 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError; import junit.framework.AssertionFailedError;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mock; import org.mockito.Mock;
@ -60,6 +51,8 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Tests for {@link DefaultHttp2RemoteFlowController}. * Tests for {@link DefaultHttp2RemoteFlowController}.
*/ */
@ -68,7 +61,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
private static final int STREAM_B = 3; private static final int STREAM_B = 3;
private static final int STREAM_C = 5; private static final int STREAM_C = 5;
private static final int STREAM_D = 7; private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private DefaultHttp2RemoteFlowController controller; private DefaultHttp2RemoteFlowController controller;
@ -515,750 +507,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
} }
/**
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is
* blocked).
*
* <pre>
* 0
* / \
* [A] B
* / \
* C D
* </pre>
*/
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// Block stream A
exhaustStreamWindow(STREAM_A);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Try sending 10 bytes on each stream. They will be pending until we free up the
// connection.
FakeFlowControlled dataA = new FakeFlowControlled(10);
FakeFlowControlled dataB = new FakeFlowControlled(10);
FakeFlowControlled dataC = new FakeFlowControlled(10);
FakeFlowControlled dataD = new FakeFlowControlled(10);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
// A is not written
assertEquals(0, window(STREAM_A));
dataA.assertNotWritten();
// B is partially written
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2);
dataB.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_B), 5);
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D), 5);
dataC.assertPartiallyWritten(3);
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
dataD.assertPartiallyWritten(2);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
}
/**
* In this test, we block B which allows all bytes to be written by A. A should not share the data with its children
* since it's not blocked.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// Block stream B
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
FakeFlowControlled dataA = new FakeFlowControlled(10);
FakeFlowControlled dataB = new FakeFlowControlled(10);
FakeFlowControlled dataC = new FakeFlowControlled(10);
FakeFlowControlled dataD = new FakeFlowControlled(10);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 10, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_D));
dataA.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 10);
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
}
/**
* In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the
* remaining of its portion to its children.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// Block stream B
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Only send 5 to A so that it will allow data from its children.
FakeFlowControlled dataA = new FakeFlowControlled(5);
FakeFlowControlled dataB = new FakeFlowControlled(10);
FakeFlowControlled dataC = new FakeFlowControlled(10);
FakeFlowControlled dataD = new FakeFlowControlled(10);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(0, window(STREAM_B));
assertEquals(2 * DEFAULT_WINDOW_SIZE - 5, window(STREAM_C) + window(STREAM_D));
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
dataA.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
dataB.assertNotWritten();
dataC.assertPartiallyWritten(3);
verify(listener, times(1)).streamWritten(stream(STREAM_C), 3);
dataD.assertPartiallyWritten(2);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 2);
}
/**
* In this test, we verify re-prioritizing a stream. We start out with B blocked:
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written
* bytes between them.
*
* <pre>
* 0
* /|\
* / | \
* A [B] D
* /
* C
* </pre>
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// Block stream B
exhaustStreamWindow(STREAM_B);
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Send 10 bytes to each.
FakeFlowControlled dataA = new FakeFlowControlled(10);
FakeFlowControlled dataB = new FakeFlowControlled(10);
FakeFlowControlled dataC = new FakeFlowControlled(10);
FakeFlowControlled dataD = new FakeFlowControlled(10);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
// Re-prioritize D as a direct child of the connection.
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Verify that the entire frame was sent.
incrementWindowSize(CONNECTION_STREAM_ID, 10);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2);
assertEquals(0, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D), 2);
// Verify that A and D split the bytes.
dataA.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 5);
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertPartiallyWritten(5);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 5);
}
/**
* Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if
* the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266.
* <pre>
* 0
* / | \
* / | \
* A(0) B(0) C(0)
* /
* D(> allowed to send in 1 allocation attempt)
* </pre>
*/
@Test
public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception {
// Max all connection windows. We don't want this being a limiting factor in the test.
maxStreamWindow(CONNECTION_STREAM_ID);
maxStreamWindow(STREAM_A);
maxStreamWindow(STREAM_B);
maxStreamWindow(STREAM_C);
maxStreamWindow(STREAM_D);
// Setup the priority tree.
setPriority(STREAM_A, 0, (short) 32, false);
setPriority(STREAM_B, 0, (short) 16, false);
setPriority(STREAM_C, 0, (short) 16, false);
setPriority(STREAM_D, STREAM_A, (short) 16, false);
// The bytesBeforeUnwritable defaults to Long.MAX_VALUE, we need to leave room to send enough data to exceed
// the writableBytes, and so we must reduce this value to something no-zero.
when(channel.bytesBeforeUnwritable()).thenReturn(1L);
// Calculate the max amount of data the flow controller will allow to be sent now.
final int writableBytes = controller.writableBytes(window(CONNECTION_STREAM_ID));
// This is insider knowledge into how writePendingBytes works. Because the algorithm will keep looping while
// the channel is writable, we simulate that the channel will become unwritable after the first write.
when(channel.isWritable()).thenReturn(false);
// Send enough so it can not be completely written out
final int expectedUnsentAmount = 1;
// Make sure we don't overflow
assertTrue(Integer.MAX_VALUE - expectedUnsentAmount > writableBytes);
FakeFlowControlled dataD = new FakeFlowControlled(writableBytes + expectedUnsentAmount);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataD.assertPartiallyWritten(writableBytes);
verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), eq(writableBytes));
}
/**
* In this test, we root all streams at the connection, and then verify that data is split appropriately based on
* weight (all available data is the same).
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
FakeFlowControlled dataA = new FakeFlowControlled(1000);
FakeFlowControlled dataB = new FakeFlowControlled(1000);
FakeFlowControlled dataC = new FakeFlowControlled(1000);
FakeFlowControlled dataD = new FakeFlowControlled(1000);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
// Allow 1000 bytes to be sent.
incrementWindowSize(CONNECTION_STREAM_ID, 1000);
controller.writePendingBytes();
// All writes sum == 1000
assertEquals(1000, dataA.written() + dataB.written() + dataC.written() + dataD.written());
int allowedError = 10;
dataA.assertPartiallyWritten(109, allowedError);
dataB.assertPartiallyWritten(445, allowedError);
dataC.assertPartiallyWritten(223, allowedError);
dataD.assertPartiallyWritten(223, allowedError);
verify(listener, times(1)).streamWritten(eq(stream(STREAM_A)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_B)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_C)), anyInt());
verify(listener, times(1)).streamWritten(eq(stream(STREAM_D)), anyInt());
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - dataA.written(), window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - dataB.written(), window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE - dataC.written(), window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - dataD.written(), window(STREAM_D));
}
/**
* In this test, we root all streams at the connection, and then verify that data is split equally among the stream,
* since they all have the same weight.
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void samePriorityShouldDistributeBasedOnData() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
// Send a bunch of data on each stream.
FakeFlowControlled dataA = new FakeFlowControlled(400);
FakeFlowControlled dataB = new FakeFlowControlled(500);
FakeFlowControlled dataC = new FakeFlowControlled(0);
FakeFlowControlled dataD = new FakeFlowControlled(700);
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
// The write will occur on C, because it's an empty frame.
dataC.assertFullyWritten();
verify(listener, times(1)).streamWritten(stream(STREAM_C), 0);
dataD.assertNotWritten();
// Allow 1000 bytes to be sent.
incrementWindowSize(CONNECTION_STREAM_ID, 999);
controller.writePendingBytes();
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50);
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50);
dataA.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_A), 333);
dataB.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_B), 333);
dataD.assertPartiallyWritten(333);
verify(listener, times(1)).streamWritten(stream(STREAM_D), 333);
}
/**
* In this test, we block all streams and verify the priority bytes for each sub tree at each node are correct
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrect() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A));
FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B));
FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C));
FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D));
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we block all streams shift the priority tree and verify priority bytes for each subtree are correct
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* [0]
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A));
FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B));
FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C));
FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D));
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we block all streams and add a node to the priority tree and verify
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* [0]
* / \
* A B
* |
* E
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
Http2Stream streamE = connection.local().createStream(STREAM_E, false);
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
streamSizes.put(STREAM_E, (Integer) 900);
FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A));
FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B));
FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C));
FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D));
FakeFlowControlled dataE = new FakeFlowControlled(streamSizes.get(STREAM_E));
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
sendData(STREAM_E, dataE);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
dataE.assertNotWritten();
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamE));
}
/**
* In this test, we block all streams and close an internal stream in the priority tree but tree should not change
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A));
FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B));
FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C));
FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D));
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
streamA.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we block all streams and close a leaf stream in the priority tree and verify
*
* <pre>
* [0]
* / \
* A B
* / \
* C D
* </pre>
*
* After the close:
* <pre>
* [0]
* / \
* A B
* |
* D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithLeafStreamClose() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
FakeFlowControlled dataA = new FakeFlowControlled(streamSizes.get(STREAM_A));
FakeFlowControlled dataB = new FakeFlowControlled(streamSizes.get(STREAM_B));
FakeFlowControlled dataC = new FakeFlowControlled(streamSizes.get(STREAM_C));
FakeFlowControlled dataD = new FakeFlowControlled(streamSizes.get(STREAM_D));
sendData(STREAM_A, dataA);
sendData(STREAM_B, dataB);
sendData(STREAM_C, dataC);
sendData(STREAM_D, dataD);
controller.writePendingBytes();
dataA.assertNotWritten();
dataB.assertNotWritten();
dataC.assertNotWritten();
dataD.assertNotWritten();
streamC.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(0, streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
@Test @Test
public void flowControlledWriteThrowsAnException() throws Exception { public void flowControlledWriteThrowsAnException() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled = mockedFlowControlledThatThrowsOnWrite(); final Http2RemoteFlowController.FlowControlled flowControlled = mockedFlowControlledThatThrowsOnWrite();
@ -1457,26 +705,11 @@ public class DefaultHttp2RemoteFlowControllerTest {
return flowControlled; return flowControlled;
} }
private static int calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
int sum = 0;
for (Integer streamId : streamIds) {
Integer streamSize = streamSizes.get(streamId);
if (streamSize != null) {
sum += streamSize;
}
}
return sum;
}
private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception { private void sendData(int streamId, FakeFlowControlled data) throws Http2Exception {
Http2Stream stream = stream(streamId); Http2Stream stream = stream(streamId);
controller.addFlowControlled(stream, data); controller.addFlowControlled(stream, data);
} }
private void setPriority(int stream, int parent, int weight, boolean exclusive) throws Http2Exception {
connection.stream(stream).setPriority(parent, (short) weight, exclusive);
}
private void exhaustStreamWindow(int streamId) throws Http2Exception { private void exhaustStreamWindow(int streamId) throws Http2Exception {
incrementWindowSize(streamId, -window(streamId)); incrementWindowSize(streamId, -window(streamId));
} }
@ -1493,10 +726,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
controller.incrementWindowSize(stream(streamId), delta); controller.incrementWindowSize(stream(streamId), delta);
} }
private int streamableBytesForTree(Http2Stream stream) {
return controller.streamableBytesForTree(stream);
}
private Http2Stream stream(int streamId) { private Http2Stream stream(int streamId) {
return connection.stream(streamId); return connection.stream(streamId);
} }

View File

@ -0,0 +1,694 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Tests for {@link PriorityStreamByteDistributor}.
*/
public class PriorityStreamByteDistributorTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private Http2Connection connection;
private PriorityStreamByteDistributor distributor;
@Mock
private StreamByteDistributor.Writer writer;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
connection = new DefaultHttp2Connection(false);
distributor = new PriorityStreamByteDistributor(connection);
// Assume we always write all the allocated bytes.
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = (Http2Stream) in.getArguments()[0];
int numBytes = (Integer) in.getArguments()[1];
int streamableBytes = distributor.unallocatedStreamableBytes(stream) - numBytes;
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
return null;
}
}).when(writer).write(any(Http2Stream.class), anyInt());
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}
@Test
public void bytesUnassignedAfterProcessing() {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 4);
assertFalse(write(10));
verifyWrite(STREAM_A, 0);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 0);
}
@Test
public void bytesUnassignedAfterProcessingWithException() {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
try {
write(10);
fail("Expected an exception");
} catch (RuntimeException e) {
assertSame(fakeException, e);
}
verifyWrite(atMost(1), STREAM_A, 1);
verifyWrite(atMost(1), STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);
doNothing().when(writer).write(same(stream(STREAM_C)), eq(3));
write(10);
verifyWrite(atMost(1), STREAM_A, 1);
verifyWrite(atMost(1), STREAM_B, 2);
verifyWrite(times(2), STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);
}
/**
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is
* blocked).
*
* <pre>
* 0
* / \
* [A] B
* / \
* C D
* </pre>
*/
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// A cannot stream.
updateStream(STREAM_B, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
// A is not written
verifyWrite(STREAM_A, 0);
// B is partially written
verifyWrite(STREAM_B, 5);
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 2);
}
/**
* In this test, we block B which allows all bytes to be written by A. A should not share the data with its children
* since it's not blocked.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
// A is assigned all of the bytes.
verifyWrite(STREAM_A, 10);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 0);
}
/**
* In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the
* remaining of its portion to its children.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 5, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
verifyWrite(STREAM_A, 5);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 2);
}
/**
* In this test, we verify re-prioritizing a stream. We start out with B blocked:
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written
* bytes between them.
*
* <pre>
* 0
* /|\
* / | \
* A [B] D
* /
* C
* </pre>
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Re-prioritize D as a direct child of the connection.
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertTrue(write(10));
verifyWrite(STREAM_A, 5);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 5);
}
/**
* Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if
* the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266.
* <pre>
* 0
* / | \
* / | \
* A(0) B(0) C(0)
* /
* D(> allowed to send in 1 allocation attempt)
* </pre>
*/
@Test
public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception {
// Setup the priority tree.
setPriority(STREAM_A, 0, (short) 32, false);
setPriority(STREAM_B, 0, (short) 16, false);
setPriority(STREAM_C, 0, (short) 16, false);
setPriority(STREAM_D, STREAM_A, (short) 16, false);
final int writableBytes = 100;
// Send enough so it can not be completely written out
final int expectedUnsentAmount = 1;
updateStream(STREAM_D, writableBytes + expectedUnsentAmount, true);
assertTrue(write(writableBytes));
verifyWrite(STREAM_D, writableBytes);
assertEquals(expectedUnsentAmount, streamableBytesForTree(stream(STREAM_D)));
}
/**
* In this test, we root all streams at the connection, and then verify that data is split appropriately based on
* weight (all available data is the same).
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, true);
updateStream(STREAM_D, 1000, true);
assertTrue(write(1000));
// A is assigned all of the bytes.
int allowedError = 10;
verifyWriteWithDelta(STREAM_A, 109, allowedError);
verifyWriteWithDelta(STREAM_B, 445, allowedError);
verifyWriteWithDelta(STREAM_C, 223, allowedError);
verifyWriteWithDelta(STREAM_D, 223, allowedError);
}
/**
* In this test, we root all streams at the connection, and then verify that data is split equally among the stream,
* since they all have the same weight.
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void samePriorityShouldDistributeBasedOnData() throws Http2Exception {
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 700, true);
assertTrue(write(999));
verifyWrite(STREAM_A, 333);
verifyWrite(STREAM_B, 333);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 333);
}
/**
* In this test, we verify the priority bytes for each sub tree at each node are correct
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrect() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(
calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we shift the priority tree and verify priority bytes for each subtree are correct
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we add a node to the priority tree and verify
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* / \
* A B
* |
* E
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
Http2Stream streamE = connection.local().createStream(STREAM_E, false);
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
streamSizes.put(STREAM_E, (Integer) 900);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
updateStream(STREAM_E, streamSizes.get(STREAM_E), true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamE));
}
/**
* In this test, we close an internal stream in the priority tree but tree should not change
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamA.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we close a leaf stream in the priority tree and verify
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the close:
* <pre>
* 0
* / \
* A B
* |
* D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithLeafStreamClose() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamC.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(0, streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int streamableBytes() {
return streamableBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
});
}
private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
stream(streamId).setPriority(parent, (short) weight, exclusive);
}
private int streamableBytesForTree(Http2Stream stream) {
return distributor.unallocatedStreamableBytesForTree(stream);
}
private boolean write(int numBytes) {
return distributor.distribute(numBytes, writer);
}
private void verifyWrite(int streamId, int numBytes) {
verify(writer).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWrite(VerificationMode mode, int streamId, int numBytes) {
verify(writer, mode).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWriteWithDelta(int streamId, int numBytes, int delta) {
verify(writer).write(same(stream(streamId)), (int) AdditionalMatchers.eq(numBytes, delta));
}
private static int calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
int sum = 0;
for (Integer streamId : streamIds) {
Integer streamSize = streamSizes.get(streamId);
if (streamSize != null) {
sum += streamSize;
}
}
return sum;
}
}