Distribue weight to children when closing stream (#11490)

Motivation:

As suggested in [section 5.3.4 in http2 spec](https://datatracker.ietf.org/doc/html/rfc7540#section-5.3.4):

> When a stream is removed from the dependency tree, its dependencies can be moved to become dependent on the parent of the closed stream. The weights of new dependencies are recalculated by distributing the weight of the dependency of the closed stream proportionally based on the weights of its dependencies.

For example, we have stream A and B depend on connection stream with default weights (16), and stream C depends on A with max weight (256). When stream A was closed, we move stream C to become dependent on the connection stream, then we should distribute the weight of stream A to its children (only stream C), so the new weight of stream C will be 16. If we keep the weight of stream C unchanged, it will get more resource than stream B

Modification:
- distribute weight to its children when closing a stream
- add a unit test for the case above and fix other related unit tests

Result:

More spec-compliant and more appropriate stream reprioritization

Co-authored-by: Heng Zhang <zhangheng@imo.im>
This commit is contained in:
Heng Zhang 2021-07-20 20:17:56 +08:00 committed by Norman Maurer
parent c982d45493
commit d920cbf143
2 changed files with 71 additions and 12 deletions

View File

@ -549,16 +549,30 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib
events.add(new ParentChangedEvent(child, child.parent));
child.setParent(null);
if (!child.children.isEmpty()) {
// Move up any grand children to be directly dependent on this node.
Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
while (itr.hasNext()) {
takeChild(itr, itr.next().value(), false, events);
long totalWeight = child.getTotalWeight();
do {
// Redistribute the weight of child to its dependency proportionally.
State dependency = itr.next().value();
dependency.weight = (short) Math.max(1, dependency.weight * child.weight / totalWeight);
takeChild(itr, dependency, false, events);
} while (itr.hasNext());
}
notifyParentChanged(events);
}
}
private long getTotalWeight() {
long totalWeight = 0L;
for (State state : children.values()) {
totalWeight += state.weight;
}
return totalWeight;
}
/**
* Remove all children with the exception of {@code streamToRetain}.
* This method is intended to be used to support an exclusive priority dependency operation.

View File

@ -296,17 +296,19 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
verifyLowestPrecedenceStateShouldBeDropped2(weight9, weight5, weight7);
// Stream 3 (hasn't been opened) should result in stream 5 being dropped.
// dropping stream 5 will distribute its weight to children (only 9)
setPriority(3, 9, weight3, false);
verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, weight9);
short newWeight9 = weight5;
verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, newWeight9);
// Stream 5's state has been discarded so we should be able to re-insert this state.
setPriority(5, 0, weight5, false);
verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, weight9);
verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, newWeight9);
// All streams are at the same level, so stream ID should be used to drop the numeric lowest valued stream.
short weight11 = (short) (weight9 - 1);
setPriority(11, 0, weight11, false);
verifyLowestPrecedenceStateShouldBeDropped5(weight7, weight9, weight11);
verifyLowestPrecedenceStateShouldBeDropped5(weight7, newWeight9, weight11);
}
private void verifyLowestPrecedenceStateShouldBeDropped1(short weight3, short weight5, short weight7) {
@ -663,6 +665,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
setPriority(streamD.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
// Default removal policy will cause it to be removed immediately.
// Closing streamB will distribute its weight to the children (C & D) equally.
streamB.close();
// Level 0
@ -673,10 +676,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
assertEquals(2, distributor.numChildren(streamA.id()));
// Level 2
assertTrue(distributor.isChild(streamC.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT));
short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2;
assertTrue(distributor.isChild(streamC.id(), streamA.id(), halfWeight));
assertEquals(0, distributor.numChildren(streamC.id()));
assertTrue(distributor.isChild(streamD.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT));
assertTrue(distributor.isChild(streamD.id(), streamA.id(), halfWeight));
assertEquals(0, distributor.numChildren(streamD.id()));
}
@ -697,6 +701,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
// Close internal nodes, leave 1 leaf node open, the only remaining stream is the one that is not closed (E).
streamA.close();
// Closing streamB will distribute its weight to the children (C & D) equally.
streamB.close();
streamC.close();
streamD.close();
@ -706,10 +711,48 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
assertEquals(1, distributor.numChildren(connection.connectionStream().id()));
// Level 1
assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT));
short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2;
assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight));
assertEquals(0, distributor.numChildren(streamE.id()));
}
@Test
public void closeStreamWithChildrenShouldRedistributeWeightToChildren() throws Exception {
Http2Stream streamA = connection.local().createStream(1, false);
Http2Stream streamB = connection.local().createStream(3, false);
Http2Stream streamC = connection.local().createStream(5, false);
Http2Stream streamD = connection.local().createStream(7, false);
Http2Stream streamE = connection.local().createStream(9, false);
Http2Stream streamF = connection.local().createStream(11, false);
Http2Stream streamG = connection.local().createStream(13, false);
Http2Stream streamH = connection.local().createStream(15, false);
setPriority(streamC.id(), streamA.id(), MAX_WEIGHT, false);
setPriority(streamD.id(), streamA.id(), MAX_WEIGHT, false);
setPriority(streamE.id(), streamA.id(), MAX_WEIGHT, false);
setPriority(streamF.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
setPriority(streamG.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
setPriority(streamH.id(), streamB.id(), 2 * DEFAULT_PRIORITY_WEIGHT, false);
streamE.close();
// closing stream A will distribute its weight to the children (C & D) equally
streamA.close();
// closing stream B will distribute its weight to the children (F & G & H) proportionally
streamB.close();
// Level 0
assertEquals(5, distributor.numChildren(connection.connectionStream().id()));
// Level 1
short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2;
assertTrue(distributor.isChild(streamC.id(), connection.connectionStream().id(), halfWeight));
assertTrue(distributor.isChild(streamD.id(), connection.connectionStream().id(), halfWeight));
short quarterWeight = DEFAULT_PRIORITY_WEIGHT / 4;
assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), quarterWeight));
assertTrue(distributor.isChild(streamG.id(), connection.connectionStream().id(), quarterWeight));
assertTrue(distributor.isChild(streamH.id(), connection.connectionStream().id(), (short) (2 * quarterWeight)));
}
@Test
public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception {
Http2Stream streamA = connection.local().createStream(1, false);
@ -727,6 +770,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
// Leave leaf nodes open (E & F)
streamA.close();
// Closing streamB will distribute its weight to the children (C & D) equally.
streamB.close();
streamC.close();
streamD.close();
@ -738,10 +782,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends
assertEquals(2, distributor.numChildren(connection.connectionStream().id()));
// Level 1
assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT));
short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2;
assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight));
assertEquals(0, distributor.numChildren(streamE.id()));
assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT));
assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), halfWeight));
assertEquals(0, distributor.numChildren(streamF.id()));
}