diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java index 958103b8b0..a38c0e49c0 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java @@ -550,16 +550,30 @@ public final class WeightedFairQueueByteDistributor implements StreamByteDistrib events.add(new ParentChangedEvent(child, child.parent)); child.setParent(null); - // Move up any grand children to be directly dependent on this node. - Iterator> itr = child.children.entries().iterator(); - while (itr.hasNext()) { - takeChild(itr, itr.next().value(), false, events); + if (!child.children.isEmpty()) { + // Move up any grand children to be directly dependent on this node. + Iterator> itr = child.children.entries().iterator(); + long totalWeight = child.getTotalWeight(); + do { + // Redistribute the weight of child to its dependency proportionally. + State dependency = itr.next().value(); + dependency.weight = (short) Math.max(1, dependency.weight * child.weight / totalWeight); + takeChild(itr, dependency, false, events); + } while (itr.hasNext()); } notifyParentChanged(events); } } + private long getTotalWeight() { + long totalWeight = 0L; + for (State state : children.values()) { + totalWeight += state.weight; + } + return totalWeight; + } + /** * Remove all children with the exception of {@code streamToRetain}. * This method is intended to be used to support an exclusive priority dependency operation. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java index fc39ceed9c..0449348030 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorDependencyTreeTest.java @@ -299,17 +299,19 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends verifyLowestPrecedenceStateShouldBeDropped2(weight9, weight5, weight7); // Stream 3 (hasn't been opened) should result in stream 5 being dropped. + // dropping stream 5 will distribute its weight to children (only 9) setPriority(3, 9, weight3, false); - verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, weight9); + short newWeight9 = weight5; + verifyLowestPrecedenceStateShouldBeDropped3(weight3, weight7, newWeight9); // Stream 5's state has been discarded so we should be able to re-insert this state. setPriority(5, 0, weight5, false); - verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, weight9); + verifyLowestPrecedenceStateShouldBeDropped4(weight5, weight7, newWeight9); // All streams are at the same level, so stream ID should be used to drop the numeric lowest valued stream. short weight11 = (short) (weight9 - 1); setPriority(11, 0, weight11, false); - verifyLowestPrecedenceStateShouldBeDropped5(weight7, weight9, weight11); + verifyLowestPrecedenceStateShouldBeDropped5(weight7, newWeight9, weight11); } private void verifyLowestPrecedenceStateShouldBeDropped1(short weight3, short weight5, short weight7) { @@ -666,6 +668,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends setPriority(streamD.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); // Default removal policy will cause it to be removed immediately. + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); // Level 0 @@ -676,10 +679,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(2, distributor.numChildren(streamA.id())); // Level 2 - assertTrue(distributor.isChild(streamC.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamC.id(), streamA.id(), halfWeight)); assertEquals(0, distributor.numChildren(streamC.id())); - assertTrue(distributor.isChild(streamD.id(), streamA.id(), DEFAULT_PRIORITY_WEIGHT)); + assertTrue(distributor.isChild(streamD.id(), streamA.id(), halfWeight)); assertEquals(0, distributor.numChildren(streamD.id())); } @@ -700,6 +704,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends // Close internal nodes, leave 1 leaf node open, the only remaining stream is the one that is not closed (E). streamA.close(); + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); streamC.close(); streamD.close(); @@ -709,10 +714,48 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(1, distributor.numChildren(connection.connectionStream().id())); // Level 1 - assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamE.id())); } + @Test + public void closeStreamWithChildrenShouldRedistributeWeightToChildren() throws Exception { + Http2Stream streamA = connection.local().createStream(1, false); + Http2Stream streamB = connection.local().createStream(3, false); + Http2Stream streamC = connection.local().createStream(5, false); + Http2Stream streamD = connection.local().createStream(7, false); + Http2Stream streamE = connection.local().createStream(9, false); + Http2Stream streamF = connection.local().createStream(11, false); + Http2Stream streamG = connection.local().createStream(13, false); + Http2Stream streamH = connection.local().createStream(15, false); + + setPriority(streamC.id(), streamA.id(), MAX_WEIGHT, false); + setPriority(streamD.id(), streamA.id(), MAX_WEIGHT, false); + setPriority(streamE.id(), streamA.id(), MAX_WEIGHT, false); + + setPriority(streamF.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); + setPriority(streamG.id(), streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); + setPriority(streamH.id(), streamB.id(), 2 * DEFAULT_PRIORITY_WEIGHT, false); + + streamE.close(); + // closing stream A will distribute its weight to the children (C & D) equally + streamA.close(); + // closing stream B will distribute its weight to the children (F & G & H) proportionally + streamB.close(); + // Level 0 + assertEquals(5, distributor.numChildren(connection.connectionStream().id())); + // Level 1 + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamC.id(), connection.connectionStream().id(), halfWeight)); + assertTrue(distributor.isChild(streamD.id(), connection.connectionStream().id(), halfWeight)); + + short quarterWeight = DEFAULT_PRIORITY_WEIGHT / 4; + assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), quarterWeight)); + assertTrue(distributor.isChild(streamG.id(), connection.connectionStream().id(), quarterWeight)); + assertTrue(distributor.isChild(streamH.id(), connection.connectionStream().id(), (short) (2 * quarterWeight))); + } + @Test public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception { Http2Stream streamA = connection.local().createStream(1, false); @@ -730,6 +773,7 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends // Leave leaf nodes open (E & F) streamA.close(); + // Closing streamB will distribute its weight to the children (C & D) equally. streamB.close(); streamC.close(); streamD.close(); @@ -741,10 +785,11 @@ public class WeightedFairQueueByteDistributorDependencyTreeTest extends assertEquals(2, distributor.numChildren(connection.connectionStream().id())); // Level 1 - assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + short halfWeight = DEFAULT_PRIORITY_WEIGHT / 2; + assertTrue(distributor.isChild(streamE.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamE.id())); - assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), DEFAULT_PRIORITY_WEIGHT)); + assertTrue(distributor.isChild(streamF.id(), connection.connectionStream().id(), halfWeight)); assertEquals(0, distributor.numChildren(streamF.id())); }