HTTP/2 remove PriorityStreamByteDistributor

Motivation:
PriorityStreamByteDistributor is now obsolete and can be replaced by WeightedFairQueueByteDistributor.

Modifications:
- Remove PriorityStreamByteDistributor and use WeightedFairQueueByteDistributor by default.

Result:
PriorityStreamByteDistributor no longer has to be maintained and is replaced by a better algorithm.
This commit is contained in:
Scott Mitchell 2015-12-17 13:30:13 -08:00
parent 9ac430f16f
commit 72accceeac
7 changed files with 1 additions and 1191 deletions

View File

@ -293,7 +293,6 @@ public class DefaultHttp2Connection implements Http2Connection {
private short weight = DEFAULT_PRIORITY_WEIGHT;
private DefaultStream parent;
private IntObjectMap<DefaultStream> children = IntCollections.emptyMap();
private int totalChildWeights;
private int prioritizableForTree = 1;
private boolean resetSent;
private boolean headerSent;
@ -360,11 +359,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return weight;
}
@Override
public final int totalChildWeights() {
return totalChildWeights;
}
@Override
public final DefaultStream parent() {
return parent;
@ -573,10 +567,6 @@ public class DefaultHttp2Connection implements Http2Connection {
final void weight(short weight) {
if (weight != this.weight) {
if (parent != null) {
int delta = weight - this.weight;
parent.totalChildWeights += delta;
}
final short oldWeight = this.weight;
this.weight = weight;
for (int i = 0; i < listeners.size(); i++) {
@ -601,10 +591,8 @@ public class DefaultHttp2Connection implements Http2Connection {
// It will either be added directly in this method, or after this method is called...but it will be added.
initChildren();
if (streamToRetain == null) {
totalChildWeights = 0;
prioritizableForTree = isPrioritizable() ? 1 : 0;
} else {
totalChildWeights = streamToRetain.weight();
// prioritizableForTree does not change because it is assumed all children node will still be
// descendants through an exclusive priority tree operation.
children.put(streamToRetain.id(), streamToRetain);
@ -628,7 +616,6 @@ public class DefaultHttp2Connection implements Http2Connection {
// may not be successful and may return null. This is because when an exclusive dependency is processed
// the children are removed in a previous recursive call but the child's parent link is updated here.
if (oldParent != null && oldParent.children.remove(child.id()) != null) {
oldParent.totalChildWeights -= child.weight();
if (!child.isDescendantOf(oldParent)) {
oldParent.decrementPrioritizableForTree(child.prioritizableForTree());
if (oldParent.prioritizableForTree() == 0) {
@ -647,7 +634,6 @@ public class DefaultHttp2Connection implements Http2Connection {
final Http2Stream oldChild = children.put(child.id(), child);
assert oldChild == null : "A stream with the same stream ID was already in the child map.";
totalChildWeights += child.weight();
incrementPrioritizableForTree(child.prioritizableForTree(), oldParent);
}
@ -669,7 +655,6 @@ public class DefaultHttp2Connection implements Http2Connection {
events.add(new ParentChangedEvent(child, child.parent()));
notifyParentChanging(child, null);
child.parent = null;
totalChildWeights -= child.weight();
decrementPrioritizableForTree(child.prioritizableForTree());
// Move up any grand children to be directly dependent on this node.

View File

@ -60,7 +60,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
this(connection, new PriorityStreamByteDistributor(connection), listener);
this(connection, new WeightedFairQueueByteDistributor(connection), listener);
}
public DefaultHttp2RemoteFlowController(Http2Connection connection,

View File

@ -49,7 +49,6 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Provides the default implementation for processing inbound frame events and delegates to a

View File

@ -169,11 +169,6 @@ public interface Http2Stream {
*/
short weight();
/**
* The total of the weights of all children of this stream.
*/
int totalChildWeights();
/**
* The parent (i.e. the node in the priority tree on which this node depends), or {@code null}
* if this is the root node (i.e. the connection, itself).

View File

@ -1,424 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import java.util.Arrays;
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* A {@link StreamByteDistributor} that implements the HTTP/2 priority tree algorithm for allocating
* bytes for all streams in the connection.
*/
public final class PriorityStreamByteDistributor implements StreamByteDistributor {
private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey;
private final WriteVisitor writeVisitor = new WriteVisitor();
public PriorityStreamByteDistributor(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
// Add a state for the connection.
stateKey = connection.newKey();
connection.connectionStream().setProperty(stateKey,
new PriorityState(connection.connectionStream()));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
stream.setProperty(stateKey, new PriorityState(stream));
}
@Override
public void onStreamClosed(Http2Stream stream) {
state(stream).close();
}
@Override
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(delta);
}
}
}
@Override
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(-delta);
}
}
}
});
}
@Override
public void updateStreamableBytes(StreamState streamState) {
state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
streamState.hasFrame());
}
@Override
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer");
if (maxBytes > 0) {
allocateBytesForTree(connection.connectionStream(), maxBytes);
}
// Need to write even if maxBytes == 0 in order to handle the case of empty frames.
writeVisitor.writeAllocatedBytes(writer);
return state(connection.connectionStream()).unallocatedStreamableBytesForTree() > 0;
}
/**
* For testing only.
*/
int unallocatedStreamableBytes(Http2Stream stream) {
return state(stream).unallocatedStreamableBytes();
}
/**
* For testing only.
*/
long unallocatedStreamableBytesForTree(Http2Stream stream) {
return state(stream).unallocatedStreamableBytesForTree();
}
/**
* This will allocate bytes by stream weight and priority for the entire tree rooted at {@code
* parent}, but does not write any bytes. The connection window is generally distributed amongst
* siblings according to their weight, however we need to ensure that the entire connection
* window is used (assuming streams have >= connection window bytes to send) and we may need
* some sort of rounding to accomplish this.
*
* @param parent The parent of the tree.
* @param connectionWindowSize The connection window this is available for use at this point in
* the tree.
* @return The number of bytes actually allocated.
*/
private int allocateBytesForTree(Http2Stream parent, int connectionWindowSize) {
PriorityState state = state(parent);
if (state.unallocatedStreamableBytesForTree() <= 0) {
return 0;
}
// If the number of streamable bytes for this tree will fit in the connection window
// then there is no need to prioritize the bytes...everyone sends what they have
if (state.unallocatedStreamableBytesForTree() <= connectionWindowSize) {
SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindowSize);
forEachChild(parent, childFeeder);
return childFeeder.bytesAllocated;
}
ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindowSize);
// Iterate once over all children of this parent and try to feed all the children.
forEachChild(parent, childFeeder);
// Now feed any remaining children that are still hungry until the connection
// window collapses.
childFeeder.feedHungryChildren();
return childFeeder.bytesAllocated;
}
private void forEachChild(Http2Stream parent, Http2StreamVisitor childFeeder) {
try {
parent.forEachChild(childFeeder);
} catch (Http2Exception e) {
// Should never happen since the feeder doesn't throw.
throw new IllegalStateException(e);
}
}
private PriorityState state(Http2Stream stream) {
return checkNotNull(stream, "stream").getProperty(stateKey);
}
/**
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the
* available connection window appropriately to the children of a given stream.
*/
private final class ChildFeeder implements Http2StreamVisitor {
final int maxSize;
int totalWeight;
int connectionWindow;
int nextTotalWeight;
int nextConnectionWindow;
int bytesAllocated;
Http2Stream[] stillHungry;
int nextTail;
ChildFeeder(Http2Stream parent, int connectionWindow) {
maxSize = parent.numChildren();
totalWeight = parent.totalChildWeights();
this.connectionWindow = connectionWindow;
this.nextConnectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) {
// In order to make progress toward the connection window due to possible rounding errors, we make sure
// that each stream (with data to send) is given at least 1 byte toward the connection window.
int connectionWindowChunk =
max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight)));
int bytesForTree = min(nextConnectionWindow, connectionWindowChunk);
PriorityState state = state(child);
int bytesForChild = min(state.unallocatedStreamableBytes(), bytesForTree);
// Allocate the bytes to this child.
if (bytesForChild > 0) {
state.allocate(bytesForChild);
bytesAllocated += bytesForChild;
nextConnectionWindow -= bytesForChild;
bytesForTree -= bytesForChild;
}
// Allocate any remaining bytes to the children of this stream.
if (bytesForTree > 0) {
int childBytesAllocated = allocateBytesForTree(child, bytesForTree);
bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated;
}
if (nextConnectionWindow > 0) {
// If this subtree still wants to send then it should be re-considered to take bytes that are unused by
// sibling nodes. This is needed because we don't yet know if all the peers will be able to use all of
// their "fair share" of the connection window, and if they don't use it then we should divide their
// unused shared up for the peers who still want to send.
if (state.unallocatedStreamableBytesForTree() > 0) {
stillHungry(child);
}
return true;
}
return false;
}
void feedHungryChildren() {
if (stillHungry == null) {
// There are no hungry children to feed.
return;
}
totalWeight = nextTotalWeight;
connectionWindow = nextConnectionWindow;
// Loop until there are not bytes left to stream or the connection window has collapsed.
for (int tail = nextTail; tail > 0 && connectionWindow > 0;) {
nextTotalWeight = 0;
nextTail = 0;
// Iterate over the children that are currently still hungry.
for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) {
if (!visit(stillHungry[head])) {
// The connection window has collapsed, break out of the loop.
break;
}
}
connectionWindow = nextConnectionWindow;
totalWeight = nextTotalWeight;
tail = nextTail;
}
}
/**
* Indicates that the given child is still hungry (i.e. still has streamable bytes that can
* fit within the current connection window).
*/
void stillHungry(Http2Stream child) {
ensureSpaceIsAllocated(nextTail);
stillHungry[nextTail++] = child;
nextTotalWeight += child.weight();
}
/**
* Ensures that the {@link #stillHungry} array is properly sized to hold the given index.
*/
void ensureSpaceIsAllocated(int index) {
if (stillHungry == null) {
// Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if
// maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get
// all of the available connection window.
stillHungry = new Http2Stream[max(2, maxSize >>> 2)];
} else if (index == stillHungry.length) {
// Grow the array by a factor of 2.
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length << 1));
}
}
}
/**
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit
* within the available connection window.
*/
private final class SimpleChildFeeder implements Http2StreamVisitor {
int bytesAllocated;
int connectionWindow;
SimpleChildFeeder(int connectionWindow) {
this.connectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) {
PriorityState childState = state(child);
int bytesForChild = childState.unallocatedStreamableBytes();
if (bytesForChild > 0 || childState.hasFrame()) {
childState.allocate(bytesForChild);
bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild;
}
int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
return true;
}
}
/**
* The remote flow control state for a single stream.
*/
private final class PriorityState {
final Http2Stream stream;
boolean hasFrame;
int streamableBytes;
int allocated;
long unallocatedStreamableBytesForTree;
PriorityState(Http2Stream stream) {
this.stream = stream;
}
/**
* Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in
* the priority tree starting at the current node.
*/
void unallocatedStreamableBytesForTreeChanged(long delta) {
unallocatedStreamableBytesForTree += delta;
if (!stream.isRoot()) {
state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta);
}
}
void allocate(int bytes) {
allocated += bytes;
if (bytes != 0) {
// Also artificially reduce the streamable bytes for this tree to give the appearance
// that the data has been written. This will be restored before the allocated bytes are
// actually written.
unallocatedStreamableBytesForTreeChanged(-bytes);
}
}
/**
* Reset the number of bytes that have been allocated to this stream by the priority
* algorithm.
*/
void resetAllocated() {
allocate(-allocated);
}
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
assert hasFrame || newStreamableBytes == 0;
this.hasFrame = hasFrame;
int delta = newStreamableBytes - streamableBytes;
if (delta != 0) {
streamableBytes = newStreamableBytes;
// Update this branch of the priority tree if the streamable bytes have changed for this node.
unallocatedStreamableBytesForTreeChanged(delta);
}
}
void close() {
// Unallocate all bytes.
resetAllocated();
// Clear the streamable bytes.
updateStreamableBytes(0, false);
}
boolean hasFrame() {
return hasFrame;
}
int unallocatedStreamableBytes() {
return streamableBytes - allocated;
}
long unallocatedStreamableBytesForTree() {
return unallocatedStreamableBytesForTree;
}
}
/**
* A connection stream visitor that delegates to the user provided visitor.
*/
private final class WriteVisitor implements Http2StreamVisitor {
private boolean iterating;
private Writer writer;
void writeAllocatedBytes(Writer writer) throws Http2Exception {
if (iterating) {
throw connectionError(INTERNAL_ERROR, "byte distribution re-entry error");
}
this.writer = writer;
try {
iterating = true;
connection.forEachActiveStream(this);
} finally {
iterating = false;
}
}
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
PriorityState state = state(stream);
int allocated = state.allocated;
// Unallocate all bytes for this stream.
state.resetAllocated();
try {
// Write the allocated bytes.
writer.write(stream, allocated);
} catch (Throwable t) { // catch Throwable in case any unchecked re-throw tricks are used.
// Stop calling the visitor and close the connection as exceptions from the writer are not supported.
// If we don't close the connection there is risk that our internal state may be corrupted.
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
}
// We have to iterate across all streams to ensure that we reset the allocated bytes.
return true;
}
}
}

View File

@ -339,7 +339,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamA.id());
@ -347,7 +346,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(4, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamD.id());
@ -355,7 +353,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(3, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamB.id());
@ -363,13 +360,11 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamC.id());
assertNotNull(p);
assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
@ -392,7 +387,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamA.id());
@ -400,7 +394,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(4, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamC.id());
@ -408,7 +401,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(3, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamB.id());
@ -416,13 +408,11 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamD.id());
assertNotNull(p);
assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
@ -449,7 +439,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(2, p.numChildren());
assertEquals(7, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamE.id());
@ -457,13 +446,11 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamA.id());
assertNotNull(p);
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamF.id());
@ -471,7 +458,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(4, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamB.id());
@ -479,13 +465,11 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamF.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamC.id());
assertNotNull(p);
assertEquals(streamF.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4
p = child(p, streamD.id());
@ -493,7 +477,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
@ -637,7 +620,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(4, p.prioritizableForTree());
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamA.id());
@ -645,7 +627,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(3, p.prioritizableForTree());
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamB.id());
@ -653,7 +634,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(2, p.prioritizableForTree());
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamC.id());
@ -661,13 +641,11 @@ public class DefaultHttp2ConnectionTest {
assertEquals(1, p.prioritizableForTree());
assertEquals(streamB.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamD.id());
assertNotNull(p);
assertEquals(1, p.prioritizableForTree());
assertEquals(streamB.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
@ -696,7 +674,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamA.id());
@ -704,7 +681,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamB.id());
@ -712,7 +688,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamC.id());
@ -720,7 +695,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamB.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4
p = child(p, streamE.id());
@ -758,7 +732,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(3, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamA.id());
@ -766,7 +739,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamB.id());
@ -774,7 +746,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamC.id());
@ -782,7 +753,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(streamB.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4
p = child(p, streamE.id());
@ -853,44 +823,37 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(7, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamD.id());
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(6, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamF.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamA.id());
assertNotNull(p);
assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamB.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamC.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = child(p, streamE.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
@Test
@ -951,45 +914,38 @@ public class DefaultHttp2ConnectionTest {
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(7, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1
p = child(p, streamD.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(6, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2
p = child(p, streamA.id());
assertNotNull(p);
assertEquals(3, p.numChildren());
assertEquals(5, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3
p = child(p, streamB.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamF.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = child(p.parent(), streamC.id());
assertNotNull(p);
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4;
p = child(p, streamE.id());
assertNotNull(p);
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
/**

View File

@ -1,701 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link PriorityStreamByteDistributor}.
*/
public class PriorityStreamByteDistributorTest {
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private static final int STREAM_E = 9;
private Http2Connection connection;
private PriorityStreamByteDistributor distributor;
@Mock
private StreamByteDistributor.Writer writer;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
connection = new DefaultHttp2Connection(false);
distributor = new PriorityStreamByteDistributor(connection);
// Assume we always write all the allocated bytes.
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = (Http2Stream) in.getArguments()[0];
int numBytes = (Integer) in.getArguments()[1];
int streamableBytes = distributor.unallocatedStreamableBytes(stream) - numBytes;
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
return null;
}
}).when(writer).write(any(Http2Stream.class), anyInt());
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}
@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 4);
assertFalse(write(10));
verifyWrite(STREAM_A, 0);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 0);
}
@Test
public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
try {
write(10);
fail("Expected an exception");
} catch (Http2Exception e) {
assertFalse(Http2Exception.isStreamError(e));
assertEquals(Http2Error.INTERNAL_ERROR, e.error());
assertSame(fakeException, e.getCause());
}
verifyWrite(atMost(1), STREAM_A, 1);
verifyWrite(atMost(1), STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);
doNothing().when(writer).write(same(stream(STREAM_C)), eq(3));
write(10);
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(times(2), STREAM_C, 3);
verifyWrite(STREAM_D, 4);
}
/**
* In this test, we block A which allows bytes to be written by C and D. Here's a view of the tree (stream A is
* blocked).
*
* <pre>
* 0
* / \
* [A] B
* / \
* C D
* </pre>
*/
@Test
public void blockedStreamShouldSpreadDataToChildren() throws Http2Exception {
// A cannot stream.
updateStream(STREAM_B, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
// A is not written
verifyWrite(STREAM_A, 0);
// B is partially written
verifyWrite(STREAM_B, 5);
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 2);
}
/**
* In this test, we block B which allows all bytes to be written by A. A should not share the data with its children
* since it's not blocked.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void childrenShouldNotSendDataUntilParentBlocked() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
// A is assigned all of the bytes.
verifyWrite(STREAM_A, 10);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 0);
}
/**
* In this test, we block B which allows all bytes to be written by A. Once A is complete, it will spill over the
* remaining of its portion to its children.
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*/
@Test
public void parentShouldWaterFallDataToChildren() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 5, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Write up to 10 bytes.
assertTrue(write(10));
verifyWrite(STREAM_A, 5);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 2);
}
/**
* In this test, we verify re-prioritizing a stream. We start out with B blocked:
*
* <pre>
* 0
* / \
* A [B]
* / \
* C D
* </pre>
*
* We then re-prioritize D so that it's directly off of the connection and verify that A and D split the written
* bytes between them.
*
* <pre>
* 0
* /|\
* / | \
* A [B] D
* /
* C
* </pre>
*/
@Test
public void reprioritizeShouldAdjustOutboundFlow() throws Http2Exception {
// B cannot stream.
updateStream(STREAM_A, 10, true);
updateStream(STREAM_C, 10, true);
updateStream(STREAM_D, 10, true);
// Re-prioritize D as a direct child of the connection.
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
assertTrue(write(10));
verifyWrite(STREAM_A, 5);
verifyWrite(STREAM_B, 0);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 5);
}
/**
* Test that the maximum allowed amount the flow controller allows to be sent is always fully allocated if
* the streams have at least this much data to send. See https://github.com/netty/netty/issues/4266.
* <pre>
* 0
* / | \
* / | \
* A(0) B(0) C(0)
* /
* D(> allowed to send in 1 allocation attempt)
* </pre>
*/
@Test
public void unstreamableParentsShouldFeedHungryChildren() throws Http2Exception {
// Setup the priority tree.
setPriority(STREAM_A, 0, (short) 32, false);
setPriority(STREAM_B, 0, (short) 16, false);
setPriority(STREAM_C, 0, (short) 16, false);
setPriority(STREAM_D, STREAM_A, (short) 16, false);
final int writableBytes = 100;
// Send enough so it can not be completely written out
final int expectedUnsentAmount = 1;
updateStream(STREAM_D, writableBytes + expectedUnsentAmount, true);
assertTrue(write(writableBytes));
verifyWrite(STREAM_D, writableBytes);
assertEquals(expectedUnsentAmount, streamableBytesForTree(stream(STREAM_D)));
}
/**
* In this test, we root all streams at the connection, and then verify that data is split appropriately based on
* weight (all available data is the same).
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void writeShouldPreferHighestWeight() throws Http2Exception {
// Root the streams at the connection and assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, 0, (short) 100, false);
setPriority(STREAM_D, 0, (short) 100, false);
updateStream(STREAM_A, 1000, true);
updateStream(STREAM_B, 1000, true);
updateStream(STREAM_C, 1000, true);
updateStream(STREAM_D, 1000, true);
assertTrue(write(1000));
// A is assigned all of the bytes.
int allowedError = 10;
verifyWriteWithDelta(STREAM_A, 109, allowedError);
verifyWriteWithDelta(STREAM_B, 445, allowedError);
verifyWriteWithDelta(STREAM_C, 223, allowedError);
verifyWriteWithDelta(STREAM_D, 223, allowedError);
}
/**
* In this test, we root all streams at the connection, and then verify that data is split equally among the stream,
* since they all have the same weight.
*
* <pre>
* 0
* / / \ \
* A B C D
* </pre>
*/
@Test
public void samePriorityShouldDistributeBasedOnData() throws Http2Exception {
// Root the streams at the connection with the same weights.
setPriority(STREAM_A, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_B, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_C, 0, DEFAULT_PRIORITY_WEIGHT, false);
setPriority(STREAM_D, 0, DEFAULT_PRIORITY_WEIGHT, false);
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 0, true);
updateStream(STREAM_D, 700, true);
assertTrue(write(999));
verifyWrite(STREAM_A, 333);
verifyWrite(STREAM_B, 333);
verifyWrite(STREAM_C, 0);
verifyWrite(STREAM_D, 333);
}
/**
* In this test, we verify the priority bytes for each sub tree at each node are correct
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrect() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(
calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we shift the priority tree and verify priority bytes for each subtree are correct
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* |
* A
* |
* B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithRestructure() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we add a node to the priority tree and verify
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the tree shift:
*
* <pre>
* 0
* / \
* A B
* |
* E
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithAddition() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
Http2Stream streamE = connection.local().createStream(STREAM_E, false);
streamE.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
streamSizes.put(STREAM_E, (Integer) 900);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
updateStream(STREAM_E, streamSizes.get(STREAM_E), true);
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
streamableBytesForTree(streamE));
}
/**
* In this test, we close an internal stream in the priority tree but tree should not change
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamA.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_C)),
streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
/**
* In this test, we close a leaf stream in the priority tree and verify
*
* <pre>
* 0
* / \
* A B
* / \
* C D
* </pre>
*
* After the close:
* <pre>
* 0
* / \
* A B
* |
* D
* </pre>
*/
@Test
public void subTreeBytesShouldBeCorrectWithLeafStreamClose() throws Http2Exception {
Http2Stream stream0 = connection.connectionStream();
Http2Stream streamA = connection.stream(STREAM_A);
Http2Stream streamB = connection.stream(STREAM_B);
Http2Stream streamC = connection.stream(STREAM_C);
Http2Stream streamD = connection.stream(STREAM_D);
// Send a bunch of data on each stream.
final IntObjectMap<Integer> streamSizes = new IntObjectHashMap<Integer>(4);
streamSizes.put(STREAM_A, (Integer) 400);
streamSizes.put(STREAM_B, (Integer) 500);
streamSizes.put(STREAM_C, (Integer) 600);
streamSizes.put(STREAM_D, (Integer) 700);
updateStream(STREAM_A, streamSizes.get(STREAM_A), true);
updateStream(STREAM_B, streamSizes.get(STREAM_B), true);
updateStream(STREAM_C, streamSizes.get(STREAM_C), true);
updateStream(STREAM_D, streamSizes.get(STREAM_D), true);
streamC.close();
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_D)),
streamableBytesForTree(stream0));
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_D)),
streamableBytesForTree(streamA));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_B)),
streamableBytesForTree(streamB));
assertEquals(0, streamableBytesForTree(streamC));
assertEquals(calculateStreamSizeSum(streamSizes, Collections.singletonList(STREAM_D)),
streamableBytesForTree(streamD));
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int pendingBytes() {
return pendingBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
@Override
public int windowSize() {
return pendingBytes;
}
});
}
private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
stream(streamId).setPriority(parent, (short) weight, exclusive);
}
private long streamableBytesForTree(Http2Stream stream) {
return distributor.unallocatedStreamableBytesForTree(stream);
}
private boolean write(int numBytes) throws Http2Exception {
return distributor.distribute(numBytes, writer);
}
private void verifyWrite(int streamId, int numBytes) {
verify(writer).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWrite(VerificationMode mode, int streamId, int numBytes) {
verify(writer, mode).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWriteWithDelta(int streamId, int numBytes, int delta) {
verify(writer).write(same(stream(streamId)), (int) AdditionalMatchers.eq(numBytes, delta));
}
private static long calculateStreamSizeSum(IntObjectMap<Integer> streamSizes, List<Integer> streamIds) {
long sum = 0;
for (Integer streamId : streamIds) {
Integer streamSize = streamSizes.get(streamId);
if (streamSize != null) {
sum += streamSize;
}
}
return sum;
}
}