From b3e0e09a82f7d467766f36ac46e2bc36f5f09b2b Mon Sep 17 00:00:00 2001 From: Heng Zhang Date: Tue, 20 Jul 2021 20:17:56 +0800 Subject: [PATCH] Distribue weight to children when closing stream (#11490) Motivation: As suggested in [section 5.3.4 in http2 spec](https://datatracker.ietf.org/doc/html/rfc7540#section-5.3.4): > When a stream is removed from the dependency tree, its dependencies can be moved to become dependent on the parent of the closed stream. The weights of new dependencies are recalculated by distributing the weight of the dependency of the closed stream proportionally based on the weights of its dependencies. For example, we have stream A and B depend on connection stream with default weights (16), and stream C depends on A with max weight (256). When stream A was closed, we move stream C to become dependent on the connection stream, then we should distribute the weight of stream A to its children (only stream C), so the new weight of stream C will be 16. If we keep the weight of stream C unchanged, it will get more resource than stream B Modification: - distribute weight to its children when closing a stream - add a unit test for the case above and fix other related unit tests Result: More spec-compliant and more appropriate stream reprioritization Co-authored-by: Heng Zhang --- .../WeightedFairQueueByteDistributor.java | 22 +++++-- ...ueueByteDistributorDependencyTreeTest.java | 61 ++++++++++++++++--- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java index 958103b8b0..a38c0e49c0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java @@ -550,16 +550,30 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib events.add(new ParentChangedEvent(child, child.parent)); child.setParent(null); - // Move up any grand children to be directly dependent on this node. - Iterator> itr = child.children.entries().iterator(); - while (itr.hasNext()) { - takeChild(itr, itr.next().value(), false, events); + if (!child.children.isEmpty()) { + // Move up any grand children to be directly dependent on this node. + Iterator> itr = child.children.entries().iterator(); + long totalWeight = child.getTotalWeight(); + do { + // Redistribute the weight of child to its dependency proportionally. + State dependency = itr.next().value(); + dependency.weight = (short) Math.max(1, dependency.weight * child.weight / totalWeight); + takeChild(itr, dependency, false, events); + } while (itr.hasNext()); } notifyParentChanged(events); } } + private long getTotalWeight() { + long totalWeight = 0L; + for (State state : children.values()) { + totalWeight += state.weight; + } + return totalWeight; + } + /** * Remove all children with the exception of {@code streamToRetain}. * This method is intended to be used to support an exclusive priority dependency operation. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java index fc39ceed9c..0449348030 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java @@ -299,17 +299,19 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends verifyLowestPrecedenceStateShouldBeDropped2(weight9, weight5, weight7); // Stream 3 (hasn't been opened) should result in stream 5 being dropped. + // dropping stream 5 will distribute its weight to children (only 9) setPriority(3, 9, weight3, false); - verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, weight9); + short newWeight9 = weight5; + verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, newWeight9); // Stream 5's state has been discarded so we should be able to re-insert this state. setPriority(5, 0, weight5, false); - verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, weight9); + verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, newWeight9); // All streams are at the same level, so stream ID should be used to drop the numeric lowest valued stream. short weight11 = (short) (weight9 - 1); setPriority(11, 0, weight11, false); - verifyLowestPrecedenceStateShouldBeDropped5(weight7, weight9, weight11); + verifyLowestPrecedenceStateShouldBeDropped5(weight7, newWeight9, weight11); } private void verifyLowestPrecedenceStateShouldBeDropped1(short weight3, short weight5, short weight7) { @@ -666,6 +668,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends setPriority(streamD.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); // Default removal policy will cause it to be removed immediately. + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); // Level 0 @@ -676,10 +679,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(2, distributor.numChildren(streamA.id())); // Level 2 - assertTrue(distributor.isChild(streamC.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamC.id(), streamA.id(), halfWeight)); assertEquals(0, distributor.numChildren(streamC.id())); - assertTrue(distributor.isChild(streamD.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT)); + assertTrue(distributor.isChild(streamD.id(), streamA.id(), halfWeight)); assertEquals(0, distributor.numChildren(streamD.id())); } @@ -700,6 +704,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends // Close internal nodes, leave 1 leaf node open, the only remaining stream is the one that is not closed (E). streamA.close(); + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); streamC.close(); streamD.close(); @@ -709,10 +714,48 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(1, distributor.numChildren(connection.connectionStream().id())); // Level 1 - assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamE.id())); } + @Test + public void closeStreamWithChildrenShouldRedistributeWeightToChildren() throws Exception { + Http2Stream streamA = connection.local().createStream(1, false); + Http2Stream streamB = connection.local().createStream(3, false); + Http2Stream streamC = connection.local().createStream(5, false); + Http2Stream streamD = connection.local().createStream(7, false); + Http2Stream streamE = connection.local().createStream(9, false); + Http2Stream streamF = connection.local().createStream(11, false); + Http2Stream streamG = connection.local().createStream(13, false); + Http2Stream streamH = connection.local().createStream(15, false); + + setPriority(streamC.id(), streamA.id(), MAX_WEIGHT, false); + setPriority(streamD.id(), streamA.id(), MAX_WEIGHT, false); + setPriority(streamE.id(), streamA.id(), MAX_WEIGHT, false); + + setPriority(streamF.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); + setPriority(streamG.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); + setPriority(streamH.id(), streamB.id(), 2 * DEFAULT_PRIORITY_WEIGHT, false); + + streamE.close(); + // closing stream A will distribute its weight to the children (C & D) equally + streamA.close(); + // closing stream B will distribute its weight to the children (F & G & H) proportionally + streamB.close(); + // Level 0 + assertEquals(5, distributor.numChildren(connection.connectionStream().id())); + // Level 1 + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamC.id(), connection.connectionStream().id(), halfWeight)); + assertTrue(distributor.isChild(streamD.id(), connection.connectionStream().id(), halfWeight)); + + short quarterWeight = DEFAULT_PRIORITY_WEIGHT / 4; + assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), quarterWeight)); + assertTrue(distributor.isChild(streamG.id(), connection.connectionStream().id(), quarterWeight)); + assertTrue(distributor.isChild(streamH.id(), connection.connectionStream().id(), (short) (2 * quarterWeight))); + } + @Test public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception { Http2Stream streamA = connection.local().createStream(1, false); @@ -730,6 +773,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends // Leave leaf nodes open (E & F) streamA.close(); + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); streamC.close(); streamD.close(); @@ -741,10 +785,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(2, distributor.numChildren(connection.connectionStream().id())); // Level 1 - assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamE.id())); - assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamF.id())); }