diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index f44b571570..6b9b705983 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -33,6 +33,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.util.internal.ObjectUtil.checkNotNull; + import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.util.collection.IntObjectHashMap; @@ -42,7 +43,6 @@ import io.netty.util.internal.PlatformDependent; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; @@ -142,7 +142,7 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception { + public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception { return activeStreams.forEachActiveStream(visitor); } @@ -169,7 +169,7 @@ public class DefaultHttp2Connection implements Http2Connection { } try { - forEachActiveStream(new StreamVisitor() { + forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) { @@ -196,7 +196,7 @@ public class DefaultHttp2Connection implements Http2Connection { } try { - forEachActiveStream(new StreamVisitor() { + forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) { @@ -334,18 +334,14 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public final Collection children() { - return children.values(); - } - - @Override - public final boolean hasChild(int streamId) { - return child(streamId) != null; - } - - @Override - public final Http2Stream child(int streamId) { - return children.get(streamId); + public Http2Stream forEachChild(Http2StreamVisitor visitor) throws Http2Exception { + for (IntObjectHashMap.Entry entry : children.entries()) { + Http2Stream stream = entry.value(); + if (!visitor.visit(stream)) { + return stream; + } + } + return null; } @Override @@ -1020,17 +1016,15 @@ public class DefaultHttp2Connection implements Http2Connection { } } - public Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception { + public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception { ++pendingIterations; - Http2Stream resultStream = null; try { for (Http2Stream stream : streams) { if (!visitor.visit(stream)) { - resultStream = stream; - break; + return stream; } } - return resultStream; + return null; } finally { --pendingIterations; if (allowModifications()) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index a12ae287c3..2fa90dc759 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -28,7 +28,6 @@ import static java.lang.Math.max; import static java.lang.Math.min; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.Http2Connection.StreamVisitor; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; @@ -381,7 +380,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController /** * Provides a means to iterate over all active streams and increment the flow control windows. */ - private static final class WindowUpdateVisitor implements StreamVisitor { + private static final class WindowUpdateVisitor implements Http2StreamVisitor { private CompositeStreamException compositeException; private final int delta; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index a6eebdc0e6..366680505e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -22,18 +22,19 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; import static java.lang.Math.min; + import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.Http2Connection.StreamVisitor; import io.netty.handler.codec.http2.Http2Stream.State; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Deque; /** * Basic implementation of {@link Http2RemoteFlowController}. */ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { - private static final StreamVisitor WRITE_ALLOCATED_BYTES = new StreamVisitor() { + private static final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { state(stream).writeAllocatedBytes(); @@ -124,7 +125,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll final int delta = newWindowSize - initialWindowSize; initialWindowSize = newWindowSize; - connection.forEachActiveStream(new StreamVisitor() { + connection.forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { // Verify that the maximum value is not exceeded by this change. @@ -250,79 +251,167 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll * @param connectionWindow The connection window this is available for use at this point in the tree. * @return An object summarizing the write and allocation results. */ - private int allocateBytesForTree(Http2Stream parent, int connectionWindow) { + static int allocateBytesForTree(Http2Stream parent, int connectionWindow) throws Http2Exception { FlowState state = state(parent); if (state.streamableBytesForTree() <= 0) { return 0; } - int bytesAllocated = 0; // If the number of streamable bytes for this tree will fit in the connection window // then there is no need to prioritize the bytes...everyone sends what they have if (state.streamableBytesForTree() <= connectionWindow) { - for (Http2Stream child : parent.children()) { - state = state(child); - int bytesForChild = state.streamableBytes(); - - if (bytesForChild > 0 || state.hasFrame()) { - state.allocate(bytesForChild); - bytesAllocated += bytesForChild; - connectionWindow -= bytesForChild; - } - int childBytesAllocated = allocateBytesForTree(child, connectionWindow); - bytesAllocated += childBytesAllocated; - connectionWindow -= childBytesAllocated; - } - return bytesAllocated; + SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindow); + parent.forEachChild(childFeeder); + return childFeeder.bytesAllocated; } - // This is the priority algorithm which will divide the available bytes based - // upon stream weight relative to its peers - Http2Stream[] children = parent.children().toArray(new Http2Stream[parent.numChildren()]); - int totalWeight = parent.totalChildWeights(); - for (int tail = children.length; tail > 0;) { - int head = 0; - int nextTail = 0; - int nextTotalWeight = 0; - int nextConnectionWindow = connectionWindow; - for (; head < tail && nextConnectionWindow > 0; ++head) { - Http2Stream child = children[head]; - state = state(child); - int weight = child.weight(); - double weightRatio = weight / (double) totalWeight; + ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindow); + // Iterate once over all children of this parent and try to feed all the children. + parent.forEachChild(childFeeder); - // In order to make progress toward the connection window due to possible rounding errors, we make sure - // that each stream (with data to send) is given at least 1 byte toward the connection window. - int connectionWindowChunk = Math.max(1, (int) (connectionWindow * weightRatio)); - int bytesForTree = Math.min(nextConnectionWindow, connectionWindowChunk); - int bytesForChild = Math.min(state.streamableBytes(), bytesForTree); + // Now feed any remaining children that are still hungry until the connection + // window collapses. + childFeeder.feedHungryChildren(); - if (bytesForChild > 0) { - state.allocate(bytesForChild); - bytesAllocated += bytesForChild; - nextConnectionWindow -= bytesForChild; - bytesForTree -= bytesForChild; - // If this subtree still wants to send then re-insert into children list and re-consider for next - // iteration. This is needed because we don't yet know if all the peers will be able to use - // all of their "fair share" of the connection window, and if they don't use it then we should - // divide their unused shared up for the peers who still want to send. - if (state.streamableBytesForTree() > 0) { - children[nextTail++] = child; - nextTotalWeight += weight; + return childFeeder.bytesAllocated; + } + + /** + * A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection + * window appropriately to the children of a given stream. + */ + private static final class ChildFeeder implements Http2StreamVisitor { + final int maxSize; + int totalWeight; + int connectionWindow; + int nextTotalWeight; + int nextConnectionWindow; + int bytesAllocated; + Http2Stream[] stillHungry; + int nextTail; + + ChildFeeder(Http2Stream parent, int connectionWindow) { + maxSize = parent.numChildren(); + totalWeight = parent.totalChildWeights(); + this.connectionWindow = connectionWindow; + this.nextConnectionWindow = connectionWindow; + } + + @Override + public boolean visit(Http2Stream child) throws Http2Exception { + // In order to make progress toward the connection window due to possible rounding errors, we make sure + // that each stream (with data to send) is given at least 1 byte toward the connection window. + int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight))); + int bytesForTree = min(nextConnectionWindow, connectionWindowChunk); + + FlowState state = state(child); + int bytesForChild = min(state.streamableBytes(), bytesForTree); + + // Allocate the bytes to this child. + if (bytesForChild > 0) { + state.allocate(bytesForChild); + bytesAllocated += bytesForChild; + nextConnectionWindow -= bytesForChild; + bytesForTree -= bytesForChild; + + // If this subtree still wants to send then re-insert into children list and re-consider for next + // iteration. This is needed because we don't yet know if all the peers will be able to use + // all of their "fair share" of the connection window, and if they don't use it then we should + // divide their unused shared up for the peers who still want to send. + if (nextConnectionWindow > 0 && state.streamableBytesForTree() > 0) { + stillHungry(child); + nextTotalWeight += child.weight(); + } + } + + // Allocate any remaining bytes to the children of this stream. + if (bytesForTree > 0) { + int childBytesAllocated = allocateBytesForTree(child, bytesForTree); + bytesAllocated += childBytesAllocated; + nextConnectionWindow -= childBytesAllocated; + } + + return nextConnectionWindow > 0; + } + + void feedHungryChildren() throws Http2Exception { + if (stillHungry == null) { + // There are no hungry children to feed. + return; + } + + totalWeight = nextTotalWeight; + connectionWindow = nextConnectionWindow; + + // Loop until there are not bytes left to stream or the connection window has collapsed. + for (int tail = nextTail; tail > 0 && connectionWindow > 0;) { + nextTotalWeight = 0; + nextTail = 0; + + // Iterate over the children that are currently still hungry. + for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) { + if (!visit(stillHungry[head])) { + // The connection window has collapsed, break out of the loop. + break; } } - - if (bytesForTree > 0) { - int childBytesAllocated = allocateBytesForTree(child, bytesForTree); - bytesAllocated += childBytesAllocated; - nextConnectionWindow -= childBytesAllocated; - } + connectionWindow = nextConnectionWindow; + totalWeight = nextTotalWeight; + tail = nextTail; } - connectionWindow = nextConnectionWindow; - totalWeight = nextTotalWeight; - tail = nextTail; } - return bytesAllocated; + /** + * Indicates that the given child is still hungry (i.e. still has streamable bytes that can + * fit within the current connection window). + */ + void stillHungry(Http2Stream child) { + ensureSpaceIsAllocated(nextTail); + stillHungry[nextTail++] = child; + } + + /** + * Ensures that the {@link #stillHungry} array is properly sized to hold the given index. + */ + void ensureSpaceIsAllocated(int index) { + if (stillHungry == null) { + // Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if + // maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get + // all of the available connection window. + stillHungry = new Http2Stream[max(2, maxSize / 4)]; + } else if (index == stillHungry.length) { + // Grow the array by a factor of 2. + stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length * 2)); + } + } + } + + /** + * A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the + * available connection window. + */ + private static final class SimpleChildFeeder implements Http2StreamVisitor { + int bytesAllocated; + int connectionWindow; + + SimpleChildFeeder(int connectionWindow) { + this.connectionWindow = connectionWindow; + } + + @Override + public boolean visit(Http2Stream child) throws Http2Exception { + FlowState childState = state(child); + int bytesForChild = childState.streamableBytes(); + + if (bytesForChild > 0 || childState.hasFrame()) { + childState.allocate(bytesForChild); + bytesAllocated += bytesForChild; + connectionWindow -= bytesForChild; + } + int childBytesAllocated = allocateBytesForTree(child, connectionWindow); + bytesAllocated += childBytesAllocated; + connectionWindow -= childBytesAllocated; + return true; + } } /** @@ -522,7 +611,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll // Write the portion of the frame. writing = true; - needFlush |= frame.write(Math.max(0, allowedBytes)); + needFlush |= frame.write(max(0, allowedBytes)); if (!cancelled && frame.size() == 0) { // This frame has been fully written, remove this frame // and notify it. Since we remove this frame diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index cf6a2fbc9e..80bd87379e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -45,7 +45,7 @@ public interface Http2Connection { /** * Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer - * be returned by {@link #activeStreams()}. + * be accessible via {@link #forEachActiveStream(Http2StreamVisitor)}. */ void onStreamClosed(Http2Stream stream); @@ -263,20 +263,7 @@ public interface Http2Connection { * @param visitor The visitor which will visit each active stream. * @return The stream before iteration stopped or {@code null} if iteration went past the end. */ - Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception; - - /** - * A visitor that allows iteration over a collection of streams. - */ - interface StreamVisitor { - /** - * @return - */ - boolean visit(Http2Stream stream) throws Http2Exception; - } + Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception; /** * Indicates whether or not the local endpoint for this connection is the server. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 4f89a77256..6ea08cd72b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -33,7 +33,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.http2.Http2Connection.StreamVisitor; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.util.concurrent.GenericFutureListener; @@ -149,7 +148,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http // Check if there are streams to avoid the overhead of creating the ChannelFuture. if (connection.numActiveStreams() > 0) { final ChannelFuture future = ctx.newSucceededFuture(); - connection.forEachActiveStream(new StreamVisitor() { + connection.forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { closeStream(stream, future); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 0e741a5aaa..c57a51e652 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -15,8 +15,6 @@ package io.netty.handler.codec.http2; -import java.util.Collection; - /** * A single stream within an HTTP2 connection. Streams are compared to each other by priority. */ @@ -46,7 +44,8 @@ public interface Http2Stream { State state(); /** - * Add this stream to {@link Http2Connection#activeStreams()} and transition state to: + * Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and + * transition state to: *