diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index f78b35cefa..df39cf48e4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -35,7 +35,6 @@ import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -388,8 +387,7 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public final Collection children() { - DefaultStream[] childrenArray = children.values(DefaultStream.class); - return Arrays.asList(childrenArray); + return children.values(); } @Override @@ -421,10 +419,10 @@ public class DefaultHttp2Connection implements Http2Connection { if (newParent != parent() || exclusive) { List events = null; if (newParent.isDescendantOf(this)) { - events = new ArrayList(2 + (exclusive ? newParent.children().size() : 0)); + events = new ArrayList(2 + (exclusive ? newParent.numChildren(): 0)); parent.takeChild(newParent, false, events); } else { - events = new ArrayList(1 + (exclusive ? newParent.children().size() : 0)); + events = new ArrayList(1 + (exclusive ? newParent.numChildren() : 0)); } newParent.takeChild(this, exclusive, events); notifyParentChanged(events); @@ -563,7 +561,7 @@ public class DefaultHttp2Connection implements Http2Connection { // move any previous children to the child node, becoming grand children // of this node. if (!children.isEmpty()) { - for (DefaultStream grandchild : removeAllChildren().values(DefaultStream.class)) { + for (DefaultStream grandchild : removeAllChildren().values()) { child.takeChild(grandchild, false, events); } } @@ -590,7 +588,7 @@ public class DefaultHttp2Connection implements Http2Connection { totalChildWeights -= child.weight(); // Move up any grand children to be directly dependent on this node. - for (DefaultStream grandchild : child.children.values(DefaultStream.class)) { + for (DefaultStream grandchild : child.children.values()) { takeChild(grandchild, false, events); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 5e6be2a8a1..210816866a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseAggregator; import java.util.ArrayDeque; @@ -268,12 +267,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { // There were previous DATA frames sent. We need to send the HEADERS only after the most // recent DATA frame to keep them in sync... - // Wrap the original promise in an aggregate which will complete the original promise - // once the headers are written. - final ChannelPromiseAggregator aggregatePromise = new ChannelPromiseAggregator(promise); - final ChannelPromise innerPromise = ctx.newPromise(); - aggregatePromise.add(innerPromise); - // Only write the HEADERS frame after the previous DATA frame has been written. final Http2Stream theStream = stream; lastDataWrite.addListener(new ChannelFutureListener() { @@ -281,13 +274,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { // The DATA write failed, also fail this write. - innerPromise.setFailure(future.cause()); + promise.setFailure(future.cause()); return; } // Perform the write. writeHeaders(ctx, theStream, headers, streamDependency, weight, exclusive, padding, - endOfStream, innerPromise); + endOfStream, promise); } }); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java index 5fbde3ffab..6133c33d08 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowController.java @@ -33,6 +33,12 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5; + /** + * The default maximum connection size used as a limit when the number of active streams is + * large. Set to 2 MiB. + */ + public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2; + /** * A value for the window update ratio to be use in order to disable window updates for * a stream (i.e. {@code 0}). @@ -41,6 +47,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro private final Http2Connection connection; private final Http2FrameWriter frameWriter; + private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE; public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { @@ -59,6 +66,14 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro }); } + public DefaultHttp2InboundFlowController setMaxConnectionWindowSize(int maxConnectionWindowSize) { + if (maxConnectionWindowSize <= 0) { + throw new IllegalArgumentException("maxConnectionWindowSize must be > 0"); + } + this.maxConnectionWindowSize = maxConnectionWindowSize; + return this; + } + @Override public void initialInboundWindowSize(int newWindowSize) throws Http2Exception { int deltaWindowSize = newWindowSize - initialWindowSize; @@ -114,7 +129,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro int dataLength = data.readableBytes() + padding; boolean windowUpdateSent = false; try { - // Apply the connection-level flow control. windowUpdateSent = applyConnectionFlowControl(ctx, dataLength); // Apply the stream-level flow control. @@ -214,6 +228,25 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro windowUpdateRatio = ratio; } + /** + * Returns the initial size of this window. + */ + int initialWindowSize() { + int maxWindowSize = initialWindowSize; + if (streamId == CONNECTION_STREAM_ID) { + // Determine the maximum number of streams that we can allow without integer overflow + // of maxWindowSize * numStreams. Also take care to avoid division by zero when + // maxWindowSize == 0. + int maxNumStreams = Integer.MAX_VALUE; + if (maxWindowSize > 0) { + maxNumStreams /= maxWindowSize; + } + int numStreams = Math.min(maxNumStreams, Math.max(1, connection.numActiveStreams())); + maxWindowSize = Math.min(maxConnectionWindowSize, maxWindowSize * numStreams); + } + return maxWindowSize; + } + /** * Updates the flow control window for this stream if it is appropriate. * @@ -224,7 +257,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro return false; } - int threshold = (int) (initialWindowSize * windowUpdateRatio); + int threshold = (int) (initialWindowSize() * windowUpdateRatio); if (window <= threshold) { updateWindow(ctx); return true; @@ -290,10 +323,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro */ void updateWindow(ChannelHandlerContext ctx) throws Http2Exception { // Expand the window for this stream back to the size of the initial window. - int deltaWindowSize = initialWindowSize - window; + int deltaWindowSize = initialWindowSize() - window; addAndGet(deltaWindowSize); - - // Send a window update for the stream/connection. frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise()); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java index 43ac6e76bc..990116b725 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowController.java @@ -25,37 +25,35 @@ import static java.lang.Math.max; import static java.lang.Math.min; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseAggregator; import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.Comparator; -import java.util.List; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; /** * Basic implementation of {@link Http2OutboundFlowController}. */ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController { + /** - * A comparators that sorts priority nodes in ascending order by the amount of priority data available for its - * subtree. + * A {@link Comparator} that sorts streams in ascending order the amount of streamable data. */ - private static final Comparator DATA_WEIGHT = new Comparator() { + private static final Comparator DATA_ORDER = new Comparator() { @Override public int compare(Http2Stream o1, Http2Stream o2) { - final long result = ((long) state(o1).priorityBytes()) * o1.weight() - - ((long) state(o2).priorityBytes()) * o2.weight(); - return result > 0 ? 1 : (result < 0 ? -1 : 0); + return state(o1).streamableBytesForTree() - state(o2).streamableBytesForTree(); } }; private final Http2Connection connection; private final Http2FrameWriter frameWriter; private int initialWindowSize = DEFAULT_WINDOW_SIZE; + private boolean frameSent; private ChannelHandlerContext ctx; public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { @@ -93,7 +91,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { Http2Stream parent = stream.parent(); if (parent != null) { - state(parent).incrementPriorityBytes(state(stream).priorityBytes()); + state(parent).incrementStreamableBytesForTree(state(stream).streamableBytesForTree()); } } @@ -101,7 +99,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { Http2Stream parent = stream.parent(); if (parent != null) { - state(parent).incrementPriorityBytes(-state(stream).priorityBytes()); + state(parent).incrementStreamableBytesForTree(-state(stream).streamableBytesForTree()); } } }); @@ -134,10 +132,6 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont @Override public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception { - if (delta <= 0) { - throw new IllegalArgumentException("delta must be > 0"); - } - if (streamId == CONNECTION_STREAM_ID) { // Update the connection window and write any pending frames for all streams. connectionState().incrementStreamWindow(delta); @@ -252,131 +246,108 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * Writes as many pending bytes as possible, according to stream priority. */ private void writePendingBytes() throws Http2Exception { - - // Recursively write as many of the total writable bytes as possible. + frameSent = false; Http2Stream connectionStream = connection.connectionStream(); - int totalAllowance = state(connectionStream).priorityBytes(); - writeAllowedBytes(connectionStream, totalAllowance); + OutboundFlowState connectionState = state(connectionStream); + int connectionWindow = Math.max(0, connectionState.window()); - // Optimization: only flush once for all written frames. If it's null, there are no - // data frames to send anyway. - flush(); + // Allocate the bytes for the entire priority tree. + allocateBytesForTree(connectionStream, connectionWindow); + + // Perform the write of the allocated bytes for each stream. + for (Http2Stream stream : connection.activeStreams()) { + OutboundFlowState state = state(stream); + // The allocated bytes are for the entire sub-tree but the write will be limited + // by the number of pending bytes for the stream. + state.writeBytes(state.allocatedBytesForTree()); + state.resetAllocatedBytesForTree(); + } + connectionState.resetAllocatedBytesForTree(); + + // Only flush once for all written frames. + if (frameSent) { + flush(); + } } /** - * Recursively traverses the priority tree rooted at the given node. Attempts to write the allowed bytes for the - * streams in this sub tree based on their weighted priorities. + * Allocates as many bytes as possible for the given tree within the provided connection window. * - * @param allowance - * an allowed number of bytes that may be written to the streams in this subtree + * @param stream the tree for which the given bytes are to be allocated. + * @param connectionWindow the connection window that acts as an upper bound on the total number + * of bytes that can be allocated for the tree. + * @return the total number of bytes actually allocated for this subtree. */ - private void writeAllowedBytes(Http2Stream stream, int allowance) throws Http2Exception { - // Write the allowed bytes for this node. If not all of the allowance was used, - // restore what's left so that it can be propagated to future nodes. + private int allocateBytesForTree(Http2Stream stream, int connectionWindow) { OutboundFlowState state = state(stream); - int bytesWritten = state.writeBytes(allowance); - allowance -= bytesWritten; + connectionWindow = min(connectionWindow, state.unallocatedBytesForTree()); - if (allowance <= 0 || stream.isLeaf()) { - // Nothing left to do in this sub tree. - return; + // Determine the amount of bytes to allocate for 'this' stream. + int streamable = Math.max(0, state.streamableBytes() - state.allocatedBytesForTree()); + int totalAllocated = min(connectionWindow, streamable); + + connectionWindow -= totalAllocated; + int remainingInTree = state.streamableBytesForTree() - totalAllocated; + if (stream.isLeaf() || remainingInTree <= 0 || connectionWindow <= 0) { + // Nothing left to do in this subtree. + state.allocateBytesForTree(totalAllocated); + return totalAllocated; } - // Clip the remaining connection flow control window by the allowance. - int remainingWindow = min(allowance, connectionWindow()); - - // The total number of unallocated bytes from the children of this node. - int unallocatedBytes = state.priorityBytes() - state.streamableBytes(); - - // Optimization. If the window is big enough to fit all the data. Just write everything + // If the window is big enough to fit all the remaining data. Just write everything // and skip the priority algorithm. - if (unallocatedBytes <= remainingWindow) { + if (remainingInTree <= connectionWindow) { for (Http2Stream child : stream.children()) { - writeAllowedBytes(child, state(child).unallocatedPriorityBytes()); + int writtenToChild = allocateBytesForTree(child, connectionWindow); + totalAllocated += writtenToChild; + connectionWindow -= writtenToChild; } - return; + state.allocateBytesForTree(totalAllocated); + return totalAllocated; } - // Copy and sort the children of this node. They are sorted in ascending order the total - // priority bytes for the subtree scaled by the weight of the node. The algorithm gives - // preference to nodes that appear later in the list, since the weight of each node - // increases in value as the list is iterated. This means that with this node ordering, - // the most bytes will be written to those nodes with the largest aggregate number of - // bytes and the highest priority. - List states = new ArrayList(stream.children()); - Collections.sort(states, DATA_WEIGHT); + Http2Stream[] children = stream.children().toArray(new Http2Stream[0]); + Arrays.sort(children, DATA_ORDER); - // Iterate over the children and spread the remaining bytes across them as is appropriate - // based on the weights. This algorithm loops over all of the children more than once, - // although it should typically only take a few passes to complete. In each pass we - // give a node its share of the current remaining bytes. The node's weight and bytes - // allocated are then decremented from the totals, so that the subsequent - // nodes split the difference. If after being processed, a node still has writable data, - // it is added back to the queue for further processing in the next pass. - int remainingWeight = stream.totalChildWeights(); - int nextTail = 0; - int unallocatedBytesForNextPass = 0; - int remainingWeightForNextPass = 0; - for (int head = 0, tail = states.size();; ++head) { - if (head >= tail) { - // We've reached the end one pass of the nodes. Reset the totals based on - // the nodes that were re-added to the deque since they still have data available. - unallocatedBytes = unallocatedBytesForNextPass; - remainingWeight = remainingWeightForNextPass; - unallocatedBytesForNextPass = 0; - remainingWeightForNextPass = 0; - head = 0; - tail = nextTail; - nextTail = 0; - } + // Clip the total remaining bytes by the connection window. + int totalWeight = stream.totalChildWeights(); + int tail = children.length; + // Outer loop: continue until we've exhausted the connection window or allocated all bytes in the tree. + while (tail > 0 && connectionWindow > 0) { + int tailNextPass = 0; + int totalWeightNextPass = 0; - // Get the next state, or break if nothing to do. - if (head >= tail) { - break; - } - Http2Stream next = states.get(head); - OutboundFlowState nextState = state(next); - int weight = next.weight(); + // Inner loop: allocate bytes to the children based on their weight. + for (int index = 0; index < tail && connectionWindow > 0; ++index) { + Http2Stream child = children[index]; + OutboundFlowState childState = state(child); - // Determine the value (in bytes) of a single unit of weight. - double dataToWeightRatio = min(unallocatedBytes, remainingWindow) / (double) remainingWeight; - unallocatedBytes -= nextState.unallocatedPriorityBytes(); - remainingWeight -= weight; + // Determine the ratio of this stream to all children. + int weight = child.weight(); + double weightRatio = weight / (double) totalWeight; - if (dataToWeightRatio > 0.0 && nextState.unallocatedPriorityBytes() > 0) { + int windowSlice = Math.max(1, (int) Math.round(connectionWindow * weightRatio)); - // Determine the portion of the current writable data that is assigned to this - // node. - int writableChunk = (int) (weight * dataToWeightRatio); + // Allocate the bytes for this child. + int allocated = allocateBytesForTree(child, windowSlice); - // Clip the chunk allocated by the total amount of unallocated data remaining in - // the node. - int allocatedChunk = min(writableChunk, nextState.unallocatedPriorityBytes()); + totalAllocated += allocated; + connectionWindow -= allocated; + totalWeight -= weight; - // Update the remaining connection window size. - remainingWindow -= allocatedChunk; - - // Mark these bytes as allocated. - nextState.allocatePriorityBytes(allocatedChunk); - if (nextState.unallocatedPriorityBytes() > 0) { - // There is still data remaining for this stream. Add it back to the queue - // for the next pass. - unallocatedBytesForNextPass += nextState.unallocatedPriorityBytes(); - remainingWeightForNextPass += weight; - states.set(nextTail++, next); - continue; + if (childState.unallocatedBytesForTree() > 0) { + // This stream still has more data, add it to the next pass. + children[tailNextPass++] = child; + totalWeightNextPass += weight; } } - if (nextState.allocatedPriorityBytes() > 0) { - // Write the allocated data for this stream. - writeAllowedBytes(next, nextState.allocatedPriorityBytes()); - - // We're done with this node. Remark all bytes as unallocated for future - // invocations. - nextState.allocatePriorityBytes(0); - } + totalWeight = totalWeightNextPass; + tail = tailNextPass; } + + state.allocateBytesForTree(totalAllocated); + return totalAllocated; } /** @@ -387,8 +358,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont private final Http2Stream stream; private int window = initialWindowSize; private int pendingBytes; - private int priorityBytes; - private int allocatedPriorityBytes; + private int streamableBytesForTree; + private int allocatedBytesForTree; private ChannelFuture lastNewFrame; private OutboundFlowState(Http2Stream stream) { @@ -401,6 +372,36 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont return window; } + /** + * Increments the number of bytes allocated to this tree by the priority algorithm. + */ + private void allocateBytesForTree(int bytes) { + allocatedBytesForTree += bytes; + } + + /** + * Gets the number of bytes that have been allocated to this tree by the priority algorithm. + */ + private int allocatedBytesForTree() { + return allocatedBytesForTree; + } + + /** + * Gets the number of unallocated bytes (i.e. {@link #streamableBytesForTree()} - + * {@link #allocatedBytesForTree()}). + */ + private int unallocatedBytesForTree() { + return streamableBytesForTree - allocatedBytesForTree; + } + + /** + * Resets the number of bytes allocated to this stream. This is called at the end of the priority + * algorithm for each stream to reset the count for the next invocation. + */ + private void resetAllocatedBytesForTree() { + allocatedBytesForTree = 0; + } + /** * Increments the flow control window for this stream by the given delta and returns the new value. */ @@ -414,7 +415,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont // Update this branch of the priority tree if the streamable bytes have changed for this // node. - incrementPriorityBytes(streamableBytes() - previouslyStreamable); + int streamableDelta = streamableBytes() - previouslyStreamable; + incrementStreamableBytesForTree(streamableDelta); return window; } @@ -442,41 +444,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont return max(0, min(pendingBytes, window)); } - /** - * The aggregate total of all {@link #streamableBytes()} for subtree rooted at this node. - */ - int priorityBytes() { - return priorityBytes; - } - - /** - * Used by the priority algorithm to allocate bytes to this stream. - */ - private void allocatePriorityBytes(int bytes) { - allocatedPriorityBytes += bytes; - } - - /** - * Used by the priority algorithm to get the intermediate allocation of bytes to this stream. - */ - int allocatedPriorityBytes() { - return allocatedPriorityBytes; - } - - /** - * Used by the priority algorithm to determine the number of writable bytes that have not yet been allocated. - */ - private int unallocatedPriorityBytes() { - return priorityBytes - allocatedPriorityBytes; + int streamableBytesForTree() { + return streamableBytesForTree; } /** * Creates a new frame with the given values but does not add it to the pending queue. */ - private Frame newFrame(ChannelPromise promise, ByteBuf data, int padding, boolean endStream) { + private Frame newFrame(final ChannelPromise promise, ByteBuf data, int padding, boolean endStream) { // Store this as the future for the most recent write attempt. lastNewFrame = promise; - return new Frame(new ChannelPromiseAggregator(promise), data, padding, endStream); + return new Frame(new SimplePromiseAggregator(promise), data, padding, endStream); } /** @@ -489,7 +467,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont /** * Returns the the head of the pending queue, or {@code null} if empty. */ - Frame peek() { + private Frame peek() { return pendingWriteQueue.peek(); } @@ -512,7 +490,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * the number of pending writes available, or because a frame does not support splitting on arbitrary * boundaries. */ - private int writeBytes(int bytes) throws Http2Exception { + private int writeBytes(int bytes) { int bytesWritten = 0; if (!stream.localSideOpen()) { return bytesWritten; @@ -544,13 +522,14 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } /** - * Recursively increments the priority bytes for this branch in the priority tree starting at the current node. + * Recursively increments the streamable bytes for this branch in the priority tree starting + * at the current node. */ - private void incrementPriorityBytes(int numBytes) { + private void incrementStreamableBytesForTree(int numBytes) { if (numBytes != 0) { - priorityBytes += numBytes; + streamableBytesForTree += numBytes; if (!stream.isRoot()) { - state(stream.parent()).incrementPriorityBytes(numBytes); + state(stream.parent()).incrementStreamableBytesForTree(numBytes); } } } @@ -561,12 +540,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont private final class Frame { final ByteBuf data; final boolean endStream; - final ChannelPromiseAggregator promiseAggregator; + final SimplePromiseAggregator promiseAggregator; final ChannelPromise promise; int padding; boolean enqueued; - Frame(ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding, + Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding, boolean endStream) { this.data = data; this.padding = padding; @@ -594,8 +573,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } /** - * Increments the number of pending bytes for this node. If there was any change to the number of bytes that - * fit into the stream window, then {@link #incrementPriorityBytes} to recursively update this branch of the + * Increments the number of pending bytes for this node. If there was any change to the + * number of bytes that fit into the stream window, then + * {@link #incrementStreamableBytesForTree} to recursively update this branch of the * priority tree. */ private void incrementPendingBytes(int numBytes) { @@ -603,7 +583,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont pendingBytes += numBytes; int delta = streamableBytes() - previouslyStreamable; - incrementPriorityBytes(delta); + incrementStreamableBytesForTree(delta); } /** @@ -613,7 +593,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont *

* Note: this does not flush the {@link ChannelHandlerContext}. */ - void write() throws Http2Exception { + void write() { // Using a do/while loop because if the buffer is empty we still need to call // the writer once to send the empty frame. final Http2FrameSizePolicy frameSizePolicy = frameWriter.configuration().frameSizePolicy(); @@ -622,9 +602,15 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont int frameBytes = Math.min(bytesToWrite, frameSizePolicy.maxFrameSize()); if (frameBytes == bytesToWrite) { // All the bytes fit into a single HTTP/2 frame, just send it all. - connectionState().incrementStreamWindow(-bytesToWrite); - incrementStreamWindow(-bytesToWrite); + try { + connectionState().incrementStreamWindow(-bytesToWrite); + incrementStreamWindow(-bytesToWrite); + } catch (Http2Exception e) { + // Should never get here since we're decrementing. + throw new AssertionError("Invalid window state when writing frame: " + e.getMessage()); + } frameWriter.writeData(ctx, stream.id(), data, padding, endStream, promise); + frameSent = true; decrementPendingBytes(bytesToWrite); if (enqueued) { // It's enqueued - remove it from the head of the pending write queue. @@ -659,8 +645,6 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont * @return the partial frame. */ Frame split(int maxBytes) { - // TODO: Should padding be spread across chunks or only at the end? - // The requested maxBytes should always be less than the size of this frame. assert maxBytes < size() : "Attempting to split a frame for the full size."; @@ -690,4 +674,33 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont } } } + + /** + * Lightweight promise aggregator. + */ + private class SimplePromiseAggregator { + final ChannelPromise promise; + final AtomicInteger awaiting = new AtomicInteger(); + final ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + } else { + if (awaiting.decrementAndGet() == 0) { + promise.trySuccess(); + } + } + } + }; + + SimplePromiseAggregator(ChannelPromise promise) { + this.promise = promise; + } + + void add(ChannelPromise promise) { + awaiting.incrementAndGet(); + promise.addListener(listener); + } + } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java index 4c89e79b4f..d6238be74b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2InboundFlowControllerTest.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; @@ -71,14 +72,14 @@ public class DefaultHttp2InboundFlowControllerTest { @Test public void dataFrameShouldBeAccepted() throws Http2Exception { - applyFlowControl(10, 0, false); + applyFlowControl(STREAM_ID, 10, 0, false); verifyWindowUpdateNotSent(); } @Test(expected = Http2Exception.class) public void connectionFlowControlExceededShouldThrow() throws Http2Exception { // Window exceeded because of the padding. - applyFlowControl(DEFAULT_WINDOW_SIZE, 1, true); + applyFlowControl(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true); } @Test @@ -88,7 +89,7 @@ public class DefaultHttp2InboundFlowControllerTest { int windowDelta = DEFAULT_WINDOW_SIZE - newWindow; // Set end-of-stream on the frame, so no window update will be sent for the stream. - applyFlowControl(dataSize, 0, true); + applyFlowControl(STREAM_ID, dataSize, 0, true); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); } @@ -98,7 +99,7 @@ public class DefaultHttp2InboundFlowControllerTest { int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1; // Set end-of-stream on the frame, so no window update will be sent for the stream. - applyFlowControl(dataSize, 0, true); + applyFlowControl(STREAM_ID, dataSize, 0, true); verifyWindowUpdateNotSent(); } @@ -109,7 +110,7 @@ public class DefaultHttp2InboundFlowControllerTest { int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); // Don't set end-of-stream so we'll get a window update for the stream as well. - applyFlowControl(dataSize, 0, false); + applyFlowControl(STREAM_ID, dataSize, 0, false); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta); } @@ -122,7 +123,7 @@ public class DefaultHttp2InboundFlowControllerTest { int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize); // Don't set end-of-stream so we'll get a window update for the stream as well. - applyFlowControl(dataSize, 0, false); + applyFlowControl(STREAM_ID, dataSize, 0, false); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateNotSent(STREAM_ID); } @@ -131,7 +132,7 @@ public class DefaultHttp2InboundFlowControllerTest { public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception { // Send a frame that takes up the entire window. int initialWindowSize = DEFAULT_WINDOW_SIZE; - applyFlowControl(initialWindowSize, 0, false); + applyFlowControl(STREAM_ID, initialWindowSize, 0, false); // Update the initial window size to allow another frame. int newInitialWindowSize = 2 * initialWindowSize; @@ -141,21 +142,66 @@ public class DefaultHttp2InboundFlowControllerTest { reset(frameWriter); // Send the next frame and verify that the expected window updates were sent. - applyFlowControl(initialWindowSize, 0, false); + applyFlowControl(STREAM_ID, initialWindowSize, 0, false); int delta = newInitialWindowSize - initialWindowSize; verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); verifyWindowUpdateSent(STREAM_ID, delta); } + @Test + public void connectionWindowShouldExpandWithNumberOfStreams() throws Http2Exception { + // Create another stream + int newStreamId = 3; + connection.local().createStream(newStreamId, false); + + assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + + // Receive some data - this should cause the connection window to expand. + int data1 = 50; + int expectedMaxConnectionWindow = DEFAULT_WINDOW_SIZE * 2; + applyFlowControl(STREAM_ID, data1, 0, false); + verifyWindowUpdateNotSent(STREAM_ID); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE + data1); + assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); + assertEquals(expectedMaxConnectionWindow, window(CONNECTION_STREAM_ID)); + + reset(frameWriter); + + // Close the new stream. + connection.stream(newStreamId).close(); + + // Read more data and verify that the stream window refreshes but the + // connection window continues collapsing. + int data2 = window(STREAM_ID); + applyFlowControl(STREAM_ID, data2, 0, false); + verifyWindowUpdateSent(STREAM_ID, data1 + data2); + verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); + assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE * 2 - data2 , window(CONNECTION_STREAM_ID)); + + reset(frameWriter); + + // Read enough data to cause a WINDOW_UPDATE for both the stream and connection and + // verify the new maximum of the connection window. + int data3 = window(STREAM_ID); + applyFlowControl(STREAM_ID, data3, 0, false); + verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE); + verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE + - (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3))); + assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID)); + assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID)); + } + private static int getWindowDelta(int initialSize, int windowSize, int dataSize) { int newWindowSize = windowSize - dataSize; return initialSize - newWindowSize; } - private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception { + private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception { final ByteBuf buf = dummyData(dataSize); try { - controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream); + controller.onDataRead(ctx, streamId, buf, padding, endOfStream); } finally { buf.release(); } @@ -179,4 +225,8 @@ public class DefaultHttp2InboundFlowControllerTest { verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(), any(ChannelPromise.class)); } + + private int window(int streamId) { + return connection.stream(streamId).inboundFlow().window(); + } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java index a84e4e4d67..f04b24fa49 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2OutboundFlowControllerTest.java @@ -317,6 +317,52 @@ public class DefaultHttp2OutboundFlowControllerTest { } } + @Test + public void successiveSendsShouldNotInteract() throws Http2Exception { + // Collapse the connection window to force queueing. + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID)); + assertEquals(0, window(CONNECTION_STREAM_ID)); + + ByteBuf data = dummyData(5, 5); + ByteBuf dataOnly = data.slice(0, 5); + try { + // Queue data for stream A and allow most of it to be written. + send(STREAM_A, dataOnly.slice(), 5); + verifyNoWrite(STREAM_A); + verifyNoWrite(STREAM_B); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 8); + ArgumentCaptor argument = ArgumentCaptor.forClass(ByteBuf.class); + captureWrite(STREAM_A, argument, 3, false); + ByteBuf writtenBuf = argument.getValue(); + assertEquals(dataOnly, writtenBuf); + assertEquals(65527, window(STREAM_A)); + assertEquals(0, window(CONNECTION_STREAM_ID)); + + resetFrameWriter(); + + // Queue data for stream B and allow the rest of A and all of B to be written. + send(STREAM_B, dataOnly.slice(), 5); + verifyNoWrite(STREAM_A); + verifyNoWrite(STREAM_B); + controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 12); + assertEquals(0, window(CONNECTION_STREAM_ID)); + + // Verify the rest of A is written. + captureWrite(STREAM_A, argument, 2, false); + writtenBuf = argument.getValue(); + assertEquals(Unpooled.EMPTY_BUFFER, writtenBuf); + assertEquals(65525, window(STREAM_A)); + + argument = ArgumentCaptor.forClass(ByteBuf.class); + captureWrite(STREAM_B, argument, 5, false); + writtenBuf = argument.getValue(); + assertEquals(dataOnly, writtenBuf); + assertEquals(65525, window(STREAM_B)); + } finally { + manualSafeRelease(data); + } + } + @Test public void negativeWindowShouldNotThrowException() throws Http2Exception { final int initWindow = 20; @@ -609,8 +655,8 @@ public class DefaultHttp2OutboundFlowControllerTest { controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); assertEquals(0, window(STREAM_A)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B)); - assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D)); + assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2); + assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D), 5); final ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); @@ -618,7 +664,7 @@ public class DefaultHttp2OutboundFlowControllerTest { verifyNoWrite(STREAM_A); captureWrite(STREAM_B, captor, 0, false); - assertEquals(5, captor.getValue().readableBytes()); + assertEquals(5, captor.getValue().readableBytes(), 2); // Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot // be split evenly, one will get 3 and one will get 2. @@ -626,7 +672,7 @@ public class DefaultHttp2OutboundFlowControllerTest { int c = captor.getValue().readableBytes(); captureWrite(STREAM_D, captor, 0, false); int d = captor.getValue().readableBytes(); - assertEquals(5, c + d); + assertEquals(5, c + d, 4); assertEquals(1, Math.abs(c - d)); } finally { manualSafeRelease(bufs); @@ -795,18 +841,18 @@ public class DefaultHttp2OutboundFlowControllerTest { // Verify that the entire frame was sent. controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10); assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A)); + assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2); assertEquals(0, window(STREAM_B)); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D)); + assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D), 2); final ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); // Verify that A received all the bytes. captureWrite(STREAM_A, captor, 0, false); - assertEquals(5, captor.getValue().readableBytes()); + assertEquals(5, captor.getValue().readableBytes(), 2); captureWrite(STREAM_D, captor, 0, false); - assertEquals(5, captor.getValue().readableBytes()); + assertEquals(5, captor.getValue().readableBytes(), 2); verifyNoWrite(STREAM_B); verifyNoWrite(STREAM_C); } finally { @@ -875,7 +921,7 @@ public class DefaultHttp2OutboundFlowControllerTest { assertEquals(aWritten, min); assertEquals(bWritten, max); assertTrue(aWritten < cWritten); - assertEquals(cWritten, dWritten); + assertEquals(cWritten, dWritten, 1); assertTrue(cWritten < bWritten); assertEquals(0, window(CONNECTION_STREAM_ID)); @@ -899,7 +945,7 @@ public class DefaultHttp2OutboundFlowControllerTest { * */ @Test - public void samePriorityShouldWriteEqualData() throws Http2Exception { + public void samePriorityShouldDistributeBasedOnData() throws Http2Exception { // Block the connection exhaustStreamWindow(CONNECTION_STREAM_ID); @@ -928,10 +974,10 @@ public class DefaultHttp2OutboundFlowControllerTest { // Allow 1000 bytes to be sent. controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999); assertEquals(0, window(CONNECTION_STREAM_ID)); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A)); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B)); + assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50); + assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50); assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C)); - assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D)); + assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50); captureWrite(STREAM_A, captor, 0, false); int aWritten = captor.getValue().readableBytes(); @@ -943,9 +989,9 @@ public class DefaultHttp2OutboundFlowControllerTest { int dWritten = captor.getValue().readableBytes(); assertEquals(999, aWritten + bWritten + dWritten); - assertEquals(333, aWritten); - assertEquals(333, bWritten); - assertEquals(333, dWritten); + assertEquals(333, aWritten, 50); + assertEquals(333, bWritten, 50); + assertEquals(333, dWritten, 50); } finally { manualSafeRelease(bufs); } @@ -993,16 +1039,16 @@ public class DefaultHttp2OutboundFlowControllerTest { OutboundFlowState state = state(stream0); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamA); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); } finally { manualSafeRelease(bufs); } @@ -1063,17 +1109,17 @@ public class DefaultHttp2OutboundFlowControllerTest { streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); OutboundFlowState state = state(stream0); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamA); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamB); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); } finally { manualSafeRelease(bufs); } @@ -1142,19 +1188,19 @@ public class DefaultHttp2OutboundFlowControllerTest { assertEquals( calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamA); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); state = state(streamE); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); } finally { manualSafeRelease(bufs); } @@ -1212,15 +1258,15 @@ public class DefaultHttp2OutboundFlowControllerTest { OutboundFlowState state = state(stream0); assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)), - state.priorityBytes()); + state.streamableBytesForTree()); state = state(streamA); - assertEquals(0, state.priorityBytes()); + assertEquals(0, state.streamableBytesForTree()); state = state(streamB); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree()); state = state(streamC); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree()); state = state(streamD); - assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes()); + assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree()); } finally { manualSafeRelease(bufs); } @@ -1264,37 +1310,7 @@ public class DefaultHttp2OutboundFlowControllerTest { } private void exhaustStreamWindow(int streamId) throws Http2Exception { - final int dataLength = window(streamId); - final ByteBuf data = dummyData(dataLength, 0); - try { - if (streamId == CONNECTION_STREAM_ID) { - // Find a stream that we can use to shrink the connection window. - int streamToWrite = 0; - for (Http2Stream stream : connection.activeStreams()) { - if (stream.outboundFlow().window() >= dataLength) { - streamToWrite = stream.id(); - break; - } - } - - // Write to STREAM_A to decrease the connection window and then restore STREAM_A's window. - int prevWindow = window(streamToWrite); - send(streamToWrite, data, 0); - int delta = prevWindow - window(streamToWrite); - controller.updateOutboundWindowSize(streamToWrite, delta); - } else { - // Write to the stream and then restore the connection window. - int prevWindow = window(CONNECTION_STREAM_ID); - send(streamId, data, 0); - int delta = prevWindow - window(CONNECTION_STREAM_ID); - controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, delta); - } - - // Reset the frameWriter so that this write doesn't interfere with other tests. - resetFrameWriter(); - } finally { - manualSafeRelease(data); - } + controller.updateOutboundWindowSize(streamId, -window(streamId)); } private void resetFrameWriter() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index ba9f78b79f..433d40f0da 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -25,15 +25,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -52,9 +55,6 @@ import io.netty.util.concurrent.Future; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -63,7 +63,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -92,6 +91,8 @@ public class Http2ConnectionRoundtripTest { @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); + mockFlowControl(clientListener); + mockFlowControl(serverListener); } @After @@ -248,19 +249,20 @@ public class Http2ConnectionRoundtripTest { final int length = 10485760; // 10MB // Create a buffer filled with random bytes. - final byte[] bytes = new byte[length]; - new Random().nextBytes(bytes); - final ByteBuf data = Unpooled.wrappedBuffer(bytes); + final ByteBuf data = randomBytes(length); final ByteArrayOutputStream out = new ByteArrayOutputStream(length); - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock in) throws Throwable { + public Integer answer(InvocationOnMock in) throws Throwable { ByteBuf buf = (ByteBuf) in.getArguments()[2]; + int padding = (Integer) in.getArguments()[3]; + int processedBytes = buf.readableBytes() + padding; + buf.readBytes(out, buf.readableBytes()); - return null; + return processedBytes; } }).when(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3), - any(ByteBuf.class), eq(0), Mockito.anyBoolean()); + any(ByteBuf.class), eq(0), anyBoolean()); try { // Initialize the data latch based on the number of bytes expected. bootstrapEnv(length, 2, 1); @@ -292,7 +294,7 @@ public class Http2ConnectionRoundtripTest { assertEquals(0, dataLatch.getCount()); out.flush(); byte[] received = out.toByteArray(); - assertArrayEquals(bytes, received); + assertArrayEquals(data.array(), received); } finally { data.release(); out.close(); @@ -302,62 +304,80 @@ public class Http2ConnectionRoundtripTest { @Test public void stressTest() throws Exception { final Http2Headers headers = dummyHeaders(); - final String text = "hello world"; final String pingMsg = "12345678"; - final ByteBuf data = Unpooled.copiedBuffer(text, UTF_8); + int length = 10; + final ByteBuf data = randomBytes(length); + final String dataAsHex = ByteBufUtil.hexDump(data); final ByteBuf pingData = Unpooled.copiedBuffer(pingMsg, UTF_8); - final int numStreams = 5000; - final List receivedPingBuffers = Collections.synchronizedList(new ArrayList(numStreams)); + final int numStreams = 2000; + + // Collect all the ping buffers as we receive them at the server. + final String[] receivedPings = new String[numStreams]; doAnswer(new Answer() { + int nextIndex; + @Override public Void answer(InvocationOnMock in) throws Throwable { - receivedPingBuffers.add(((ByteBuf) in.getArguments()[1]).toString(UTF_8)); + receivedPings[nextIndex++] = ((ByteBuf) in.getArguments()[1]).toString(UTF_8); return null; } - }).when(serverListener).onPingRead(any(ChannelHandlerContext.class), eq(pingData)); - final List receivedDataBuffers = Collections.synchronizedList(new ArrayList(numStreams)); - doAnswer(new Answer() { + }).when(serverListener).onPingRead(any(ChannelHandlerContext.class), any(ByteBuf.class)); + + // Collect all the data buffers as we receive them at the server. + final StringBuilder[] receivedData = new StringBuilder[numStreams]; + doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock in) throws Throwable { - receivedDataBuffers.add(((ByteBuf) in.getArguments()[2]).toString(UTF_8)); - return null; + public Integer answer(InvocationOnMock in) throws Throwable { + int streamId = (Integer) in.getArguments()[1]; + ByteBuf buf = (ByteBuf) in.getArguments()[2]; + int padding = (Integer) in.getArguments()[3]; + int processedBytes = buf.readableBytes() + padding; + + int streamIndex = (streamId - 3) / 2; + StringBuilder builder = receivedData[streamIndex]; + if (builder == null) { + builder = new StringBuilder(dataAsHex.length()); + receivedData[streamIndex] = builder; + } + builder.append(ByteBufUtil.hexDump(buf)); + return processedBytes; } - }).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(), eq(data), - eq(0), eq(false)); + }).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(), + any(ByteBuf.class), anyInt(), anyBoolean()); try { - bootstrapEnv(numStreams * text.length(), numStreams * 4, numStreams); + bootstrapEnv(numStreams * length, numStreams * 4, numStreams); runInChannel(clientChannel, new Http2Runnable() { @Override public void run() { - for (int i = 0, nextStream = 3; i < numStreams; ++i, nextStream += 2) { - http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0, - (short) 16, false, 0, false, newPromise()); + int upperLimit = 3 + 2 * numStreams; + for (int streamId = 3; streamId < upperLimit; streamId += 2) { + // Send a bunch of data on each stream. + http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16, + false, 0, false, newPromise()); http2Client.encoder().writePing(ctx(), false, pingData.slice().retain(), newPromise()); - http2Client.encoder().writeData(ctx(), nextStream, data.slice().retain(), - 0, false, newPromise()); + http2Client.encoder().writeData(ctx(), streamId, data.slice().retain(), 0, + false, newPromise()); // Write trailers. - http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0, - (short) 16, false, 0, true, newPromise()); + http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16, + false, 0, true, newPromise()); } } }); // Wait for all frames to be received. - assertTrue(trailersLatch.await(30, SECONDS)); + assertTrue(trailersLatch.await(60, SECONDS)); verify(serverListener, times(numStreams)).onHeadersRead(any(ChannelHandlerContext.class), anyInt(), eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(false)); verify(serverListener, times(numStreams)).onHeadersRead(any(ChannelHandlerContext.class), anyInt(), eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(true)); verify(serverListener, times(numStreams)).onPingRead(any(ChannelHandlerContext.class), any(ByteBuf.class)); - verify(serverListener, times(numStreams)).onDataRead(any(ChannelHandlerContext.class), - anyInt(), any(ByteBuf.class), eq(0), eq(false)); - assertEquals(numStreams, receivedPingBuffers.size()); - assertEquals(numStreams, receivedDataBuffers.size()); - for (String receivedData : receivedDataBuffers) { - assertEquals(text, receivedData); + verify(serverListener, never()).onDataRead(any(ChannelHandlerContext.class), + anyInt(), any(ByteBuf.class), eq(0), eq(true)); + for (StringBuilder builder : receivedData) { + assertEquals(dataAsHex, builder.toString()); } - for (String receivedPing : receivedPingBuffers) { + for (String receivedPing : receivedPings) { assertEquals(pingMsg, receivedPing); } } finally { @@ -419,4 +439,27 @@ public class Http2ConnectionRoundtripTest { return new DefaultHttp2Headers().method(as("GET")).scheme(as("https")) .authority(as("example.org")).path(as("/some/path/resource2")).add(randomString(), randomString()); } + + private void mockFlowControl(Http2FrameListener listener) throws Http2Exception { + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + ByteBuf buf = (ByteBuf) invocation.getArguments()[2]; + int padding = (Integer) invocation.getArguments()[3]; + int processedBytes = buf.readableBytes() + padding; + return processedBytes; + } + + }).when(listener).onDataRead(any(ChannelHandlerContext.class), anyInt(), + any(ByteBuf.class), anyInt(), anyBoolean()); + } + + /** + * Creates a {@link ByteBuf} of the given length, filled with random bytes. + */ + private static ByteBuf randomBytes(int length) { + final byte[] bytes = new byte[length]; + new Random().nextBytes(bytes); + return Unpooled.wrappedBuffer(bytes); + } } diff --git a/common/src/main/java/io/netty/util/collection/IntObjectHashMap.java b/common/src/main/java/io/netty/util/collection/IntObjectHashMap.java index bc92e06287..6b27616ee0 100644 --- a/common/src/main/java/io/netty/util/collection/IntObjectHashMap.java +++ b/common/src/main/java/io/netty/util/collection/IntObjectHashMap.java @@ -15,8 +15,9 @@ package io.netty.util.collection; -import java.lang.reflect.Array; +import java.util.AbstractCollection; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -221,16 +222,34 @@ public class IntObjectHashMap implements IntObjectMap, Iterable clazz) { - @SuppressWarnings("unchecked") - V[] outValues = (V[]) Array.newInstance(clazz, size()); - int targetIx = 0; - for (int i = 0; i < values.length; ++i) { - if (values[i] != null) { - outValues[targetIx++] = values[i]; + public Collection values() { + return new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator> iter = IntObjectHashMap.this.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public V next() { + return iter.next().value(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; } - } - return outValues; + + @Override + public int size() { + return size; + } + }; } @Override diff --git a/common/src/main/java/io/netty/util/collection/IntObjectMap.java b/common/src/main/java/io/netty/util/collection/IntObjectMap.java index ae1c9629b6..85ec62c6b6 100644 --- a/common/src/main/java/io/netty/util/collection/IntObjectMap.java +++ b/common/src/main/java/io/netty/util/collection/IntObjectMap.java @@ -14,6 +14,8 @@ */ package io.netty.util.collection; +import java.util.Collection; + /** * Interface for a primitive map that uses {@code int}s as keys. * @@ -112,5 +114,5 @@ public interface IntObjectMap { /** * Gets the values contained in this map. */ - V[] values(Class clazz); + Collection values(); } diff --git a/common/src/main/java/io/netty/util/collection/PrimitiveCollections.java b/common/src/main/java/io/netty/util/collection/PrimitiveCollections.java index c7a27ee7be..90766deec4 100644 --- a/common/src/main/java/io/netty/util/collection/PrimitiveCollections.java +++ b/common/src/main/java/io/netty/util/collection/PrimitiveCollections.java @@ -16,6 +16,7 @@ package io.netty.util.collection; import io.netty.util.internal.EmptyArrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; @@ -106,8 +107,8 @@ public final class PrimitiveCollections { } @Override - public Object[] values(Class clazz) { - return EmptyArrays.EMPTY_OBJECTS; + public Collection values() { + return Collections.emptyList(); } } @@ -185,8 +186,8 @@ public final class PrimitiveCollections { } @Override - public V[] values(Class clazz) { - return map.values(clazz); + public Collection values() { + return map.values(); } /** diff --git a/common/src/test/java/io/netty/util/collection/IntObjectHashMapTest.java b/common/src/test/java/io/netty/util/collection/IntObjectHashMapTest.java index 0f5de5811b..c9520c7c60 100644 --- a/common/src/test/java/io/netty/util/collection/IntObjectHashMapTest.java +++ b/common/src/test/java/io/netty/util/collection/IntObjectHashMapTest.java @@ -24,6 +24,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Random; @@ -280,8 +281,8 @@ public class IntObjectHashMapTest { map.put(4, new Value("v4")); map.remove(4); - Value[] values = map.values(Value.class); - assertEquals(3, values.length); + Collection values = map.values(); + assertEquals(3, values.size()); Set expected = new HashSet(); expected.add(v1);