Fix bug in HTTP/2 outbound flow control

Motivation:

The outbound flow controller logic does not properly reset the allocated
bytes between successive invocations of the priority algorithm.

Modifications:

Updated the priority algorithm to reset the allocated bytes for each
stream.

Result:

Each call to the priority algorithm now starts with zero allocated bytes
for each stream.
This commit is contained in:
nmittler 2014-11-09 16:53:35 -08:00
parent d220afa885
commit f23f3b9617
11 changed files with 485 additions and 318 deletions

View File

@ -35,7 +35,6 @@ import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -388,8 +387,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public final Collection<? extends Http2Stream> children() {
DefaultStream[] childrenArray = children.values(DefaultStream.class);
return Arrays.asList(childrenArray);
return children.values();
}
@Override
@ -421,10 +419,10 @@ public class DefaultHttp2Connection implements Http2Connection {
if (newParent != parent() || exclusive) {
List<ParentChangedEvent> events = null;
if (newParent.isDescendantOf(this)) {
events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children().size() : 0));
events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.numChildren(): 0));
parent.takeChild(newParent, false, events);
} else {
events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children().size() : 0));
events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.numChildren() : 0));
}
newParent.takeChild(this, exclusive, events);
notifyParentChanged(events);
@ -563,7 +561,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// move any previous children to the child node, becoming grand children
// of this node.
if (!children.isEmpty()) {
for (DefaultStream grandchild : removeAllChildren().values(DefaultStream.class)) {
for (DefaultStream grandchild : removeAllChildren().values()) {
child.takeChild(grandchild, false, events);
}
}
@ -590,7 +588,7 @@ public class DefaultHttp2Connection implements Http2Connection {
totalChildWeights -= child.weight();
// Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values(DefaultStream.class)) {
for (DefaultStream grandchild : child.children.values()) {
takeChild(grandchild, false, events);
}

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import java.util.ArrayDeque;
@ -268,12 +267,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// There were previous DATA frames sent. We need to send the HEADERS only after the most
// recent DATA frame to keep them in sync...
// Wrap the original promise in an aggregate which will complete the original promise
// once the headers are written.
final ChannelPromiseAggregator aggregatePromise = new ChannelPromiseAggregator(promise);
final ChannelPromise innerPromise = ctx.newPromise();
aggregatePromise.add(innerPromise);
// Only write the HEADERS frame after the previous DATA frame has been written.
final Http2Stream theStream = stream;
lastDataWrite.addListener(new ChannelFutureListener() {
@ -281,13 +274,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The DATA write failed, also fail this write.
innerPromise.setFailure(future.cause());
promise.setFailure(future.cause());
return;
}
// Perform the write.
writeHeaders(ctx, theStream, headers, streamDependency, weight, exclusive, padding,
endOfStream, innerPromise);
endOfStream, promise);
}
});

View File

@ -33,6 +33,12 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/
public static final double DEFAULT_WINDOW_UPDATE_RATIO = 0.5;
/**
* The default maximum connection size used as a limit when the number of active streams is
* large. Set to 2 MiB.
*/
public static final int DEFAULT_MAX_CONNECTION_WINDOW_SIZE = 1048576 * 2;
/**
* A value for the window update ratio to be use in order to disable window updates for
* a stream (i.e. {@code 0}).
@ -41,6 +47,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int maxConnectionWindowSize = DEFAULT_MAX_CONNECTION_WINDOW_SIZE;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
@ -59,6 +66,14 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
});
}
public DefaultHttp2InboundFlowController setMaxConnectionWindowSize(int maxConnectionWindowSize) {
if (maxConnectionWindowSize <= 0) {
throw new IllegalArgumentException("maxConnectionWindowSize must be > 0");
}
this.maxConnectionWindowSize = maxConnectionWindowSize;
return this;
}
@Override
public void initialInboundWindowSize(int newWindowSize) throws Http2Exception {
int deltaWindowSize = newWindowSize - initialWindowSize;
@ -114,7 +129,6 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
int dataLength = data.readableBytes() + padding;
boolean windowUpdateSent = false;
try {
// Apply the connection-level flow control.
windowUpdateSent = applyConnectionFlowControl(ctx, dataLength);
// Apply the stream-level flow control.
@ -214,6 +228,25 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
windowUpdateRatio = ratio;
}
/**
* Returns the initial size of this window.
*/
int initialWindowSize() {
int maxWindowSize = initialWindowSize;
if (streamId == CONNECTION_STREAM_ID) {
// Determine the maximum number of streams that we can allow without integer overflow
// of maxWindowSize * numStreams. Also take care to avoid division by zero when
// maxWindowSize == 0.
int maxNumStreams = Integer.MAX_VALUE;
if (maxWindowSize > 0) {
maxNumStreams /= maxWindowSize;
}
int numStreams = Math.min(maxNumStreams, Math.max(1, connection.numActiveStreams()));
maxWindowSize = Math.min(maxConnectionWindowSize, maxWindowSize * numStreams);
}
return maxWindowSize;
}
/**
* Updates the flow control window for this stream if it is appropriate.
*
@ -224,7 +257,7 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
return false;
}
int threshold = (int) (initialWindowSize * windowUpdateRatio);
int threshold = (int) (initialWindowSize() * windowUpdateRatio);
if (window <= threshold) {
updateWindow(ctx);
return true;
@ -290,10 +323,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
*/
void updateWindow(ChannelHandlerContext ctx) throws Http2Exception {
// Expand the window for this stream back to the size of the initial window.
int deltaWindowSize = initialWindowSize - window;
int deltaWindowSize = initialWindowSize() - window;
addAndGet(deltaWindowSize);
// Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, streamId, deltaWindowSize, ctx.newPromise());
}
}

View File

@ -25,37 +25,35 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseAggregator;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of {@link Http2OutboundFlowController}.
*/
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
/**
* A comparators that sorts priority nodes in ascending order by the amount of priority data available for its
* subtree.
* A {@link Comparator} that sorts streams in ascending order the amount of streamable data.
*/
private static final Comparator<Http2Stream> DATA_WEIGHT = new Comparator<Http2Stream>() {
private static final Comparator<Http2Stream> DATA_ORDER = new Comparator<Http2Stream>() {
@Override
public int compare(Http2Stream o1, Http2Stream o2) {
final long result = ((long) state(o1).priorityBytes()) * o1.weight() -
((long) state(o2).priorityBytes()) * o2.weight();
return result > 0 ? 1 : (result < 0 ? -1 : 0);
return state(o1).streamableBytesForTree() - state(o2).streamableBytesForTree();
}
};
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private boolean frameSent;
private ChannelHandlerContext ctx;
public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
@ -93,7 +91,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
state(parent).incrementPriorityBytes(state(stream).priorityBytes());
state(parent).incrementStreamableBytesForTree(state(stream).streamableBytesForTree());
}
}
@ -101,7 +99,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
state(parent).incrementPriorityBytes(-state(stream).priorityBytes());
state(parent).incrementStreamableBytesForTree(-state(stream).streamableBytesForTree());
}
}
});
@ -134,10 +132,6 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
@Override
public void updateOutboundWindowSize(int streamId, int delta) throws Http2Exception {
if (delta <= 0) {
throw new IllegalArgumentException("delta must be > 0");
}
if (streamId == CONNECTION_STREAM_ID) {
// Update the connection window and write any pending frames for all streams.
connectionState().incrementStreamWindow(delta);
@ -252,131 +246,108 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* Writes as many pending bytes as possible, according to stream priority.
*/
private void writePendingBytes() throws Http2Exception {
// Recursively write as many of the total writable bytes as possible.
frameSent = false;
Http2Stream connectionStream = connection.connectionStream();
int totalAllowance = state(connectionStream).priorityBytes();
writeAllowedBytes(connectionStream, totalAllowance);
OutboundFlowState connectionState = state(connectionStream);
int connectionWindow = Math.max(0, connectionState.window());
// Optimization: only flush once for all written frames. If it's null, there are no
// data frames to send anyway.
flush();
// Allocate the bytes for the entire priority tree.
allocateBytesForTree(connectionStream, connectionWindow);
// Perform the write of the allocated bytes for each stream.
for (Http2Stream stream : connection.activeStreams()) {
OutboundFlowState state = state(stream);
// The allocated bytes are for the entire sub-tree but the write will be limited
// by the number of pending bytes for the stream.
state.writeBytes(state.allocatedBytesForTree());
state.resetAllocatedBytesForTree();
}
connectionState.resetAllocatedBytesForTree();
// Only flush once for all written frames.
if (frameSent) {
flush();
}
}
/**
* Recursively traverses the priority tree rooted at the given node. Attempts to write the allowed bytes for the
* streams in this sub tree based on their weighted priorities.
* Allocates as many bytes as possible for the given tree within the provided connection window.
*
* @param allowance
* an allowed number of bytes that may be written to the streams in this subtree
* @param stream the tree for which the given bytes are to be allocated.
* @param connectionWindow the connection window that acts as an upper bound on the total number
* of bytes that can be allocated for the tree.
* @return the total number of bytes actually allocated for this subtree.
*/
private void writeAllowedBytes(Http2Stream stream, int allowance) throws Http2Exception {
// Write the allowed bytes for this node. If not all of the allowance was used,
// restore what's left so that it can be propagated to future nodes.
private int allocateBytesForTree(Http2Stream stream, int connectionWindow) {
OutboundFlowState state = state(stream);
int bytesWritten = state.writeBytes(allowance);
allowance -= bytesWritten;
connectionWindow = min(connectionWindow, state.unallocatedBytesForTree());
if (allowance <= 0 || stream.isLeaf()) {
// Nothing left to do in this sub tree.
return;
// Determine the amount of bytes to allocate for 'this' stream.
int streamable = Math.max(0, state.streamableBytes() - state.allocatedBytesForTree());
int totalAllocated = min(connectionWindow, streamable);
connectionWindow -= totalAllocated;
int remainingInTree = state.streamableBytesForTree() - totalAllocated;
if (stream.isLeaf() || remainingInTree <= 0 || connectionWindow <= 0) {
// Nothing left to do in this subtree.
state.allocateBytesForTree(totalAllocated);
return totalAllocated;
}
// Clip the remaining connection flow control window by the allowance.
int remainingWindow = min(allowance, connectionWindow());
// The total number of unallocated bytes from the children of this node.
int unallocatedBytes = state.priorityBytes() - state.streamableBytes();
// Optimization. If the window is big enough to fit all the data. Just write everything
// If the window is big enough to fit all the remaining data. Just write everything
// and skip the priority algorithm.
if (unallocatedBytes <= remainingWindow) {
if (remainingInTree <= connectionWindow) {
for (Http2Stream child : stream.children()) {
writeAllowedBytes(child, state(child).unallocatedPriorityBytes());
int writtenToChild = allocateBytesForTree(child, connectionWindow);
totalAllocated += writtenToChild;
connectionWindow -= writtenToChild;
}
return;
state.allocateBytesForTree(totalAllocated);
return totalAllocated;
}
// Copy and sort the children of this node. They are sorted in ascending order the total
// priority bytes for the subtree scaled by the weight of the node. The algorithm gives
// preference to nodes that appear later in the list, since the weight of each node
// increases in value as the list is iterated. This means that with this node ordering,
// the most bytes will be written to those nodes with the largest aggregate number of
// bytes and the highest priority.
List<Http2Stream> states = new ArrayList<Http2Stream>(stream.children());
Collections.sort(states, DATA_WEIGHT);
Http2Stream[] children = stream.children().toArray(new Http2Stream[0]);
Arrays.sort(children, DATA_ORDER);
// Iterate over the children and spread the remaining bytes across them as is appropriate
// based on the weights. This algorithm loops over all of the children more than once,
// although it should typically only take a few passes to complete. In each pass we
// give a node its share of the current remaining bytes. The node's weight and bytes
// allocated are then decremented from the totals, so that the subsequent
// nodes split the difference. If after being processed, a node still has writable data,
// it is added back to the queue for further processing in the next pass.
int remainingWeight = stream.totalChildWeights();
int nextTail = 0;
int unallocatedBytesForNextPass = 0;
int remainingWeightForNextPass = 0;
for (int head = 0, tail = states.size();; ++head) {
if (head >= tail) {
// We've reached the end one pass of the nodes. Reset the totals based on
// the nodes that were re-added to the deque since they still have data available.
unallocatedBytes = unallocatedBytesForNextPass;
remainingWeight = remainingWeightForNextPass;
unallocatedBytesForNextPass = 0;
remainingWeightForNextPass = 0;
head = 0;
tail = nextTail;
nextTail = 0;
}
// Clip the total remaining bytes by the connection window.
int totalWeight = stream.totalChildWeights();
int tail = children.length;
// Outer loop: continue until we've exhausted the connection window or allocated all bytes in the tree.
while (tail > 0 && connectionWindow > 0) {
int tailNextPass = 0;
int totalWeightNextPass = 0;
// Get the next state, or break if nothing to do.
if (head >= tail) {
break;
}
Http2Stream next = states.get(head);
OutboundFlowState nextState = state(next);
int weight = next.weight();
// Inner loop: allocate bytes to the children based on their weight.
for (int index = 0; index < tail && connectionWindow > 0; ++index) {
Http2Stream child = children[index];
OutboundFlowState childState = state(child);
// Determine the value (in bytes) of a single unit of weight.
double dataToWeightRatio = min(unallocatedBytes, remainingWindow) / (double) remainingWeight;
unallocatedBytes -= nextState.unallocatedPriorityBytes();
remainingWeight -= weight;
// Determine the ratio of this stream to all children.
int weight = child.weight();
double weightRatio = weight / (double) totalWeight;
if (dataToWeightRatio > 0.0 && nextState.unallocatedPriorityBytes() > 0) {
int windowSlice = Math.max(1, (int) Math.round(connectionWindow * weightRatio));
// Determine the portion of the current writable data that is assigned to this
// node.
int writableChunk = (int) (weight * dataToWeightRatio);
// Allocate the bytes for this child.
int allocated = allocateBytesForTree(child, windowSlice);
// Clip the chunk allocated by the total amount of unallocated data remaining in
// the node.
int allocatedChunk = min(writableChunk, nextState.unallocatedPriorityBytes());
totalAllocated += allocated;
connectionWindow -= allocated;
totalWeight -= weight;
// Update the remaining connection window size.
remainingWindow -= allocatedChunk;
// Mark these bytes as allocated.
nextState.allocatePriorityBytes(allocatedChunk);
if (nextState.unallocatedPriorityBytes() > 0) {
// There is still data remaining for this stream. Add it back to the queue
// for the next pass.
unallocatedBytesForNextPass += nextState.unallocatedPriorityBytes();
remainingWeightForNextPass += weight;
states.set(nextTail++, next);
continue;
if (childState.unallocatedBytesForTree() > 0) {
// This stream still has more data, add it to the next pass.
children[tailNextPass++] = child;
totalWeightNextPass += weight;
}
}
if (nextState.allocatedPriorityBytes() > 0) {
// Write the allocated data for this stream.
writeAllowedBytes(next, nextState.allocatedPriorityBytes());
// We're done with this node. Remark all bytes as unallocated for future
// invocations.
nextState.allocatePriorityBytes(0);
}
totalWeight = totalWeightNextPass;
tail = tailNextPass;
}
state.allocateBytesForTree(totalAllocated);
return totalAllocated;
}
/**
@ -387,8 +358,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private final Http2Stream stream;
private int window = initialWindowSize;
private int pendingBytes;
private int priorityBytes;
private int allocatedPriorityBytes;
private int streamableBytesForTree;
private int allocatedBytesForTree;
private ChannelFuture lastNewFrame;
private OutboundFlowState(Http2Stream stream) {
@ -401,6 +372,36 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
return window;
}
/**
* Increments the number of bytes allocated to this tree by the priority algorithm.
*/
private void allocateBytesForTree(int bytes) {
allocatedBytesForTree += bytes;
}
/**
* Gets the number of bytes that have been allocated to this tree by the priority algorithm.
*/
private int allocatedBytesForTree() {
return allocatedBytesForTree;
}
/**
* Gets the number of unallocated bytes (i.e. {@link #streamableBytesForTree()} -
* {@link #allocatedBytesForTree()}).
*/
private int unallocatedBytesForTree() {
return streamableBytesForTree - allocatedBytesForTree;
}
/**
* Resets the number of bytes allocated to this stream. This is called at the end of the priority
* algorithm for each stream to reset the count for the next invocation.
*/
private void resetAllocatedBytesForTree() {
allocatedBytesForTree = 0;
}
/**
* Increments the flow control window for this stream by the given delta and returns the new value.
*/
@ -414,7 +415,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
// Update this branch of the priority tree if the streamable bytes have changed for this
// node.
incrementPriorityBytes(streamableBytes() - previouslyStreamable);
int streamableDelta = streamableBytes() - previouslyStreamable;
incrementStreamableBytesForTree(streamableDelta);
return window;
}
@ -442,41 +444,17 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
return max(0, min(pendingBytes, window));
}
/**
* The aggregate total of all {@link #streamableBytes()} for subtree rooted at this node.
*/
int priorityBytes() {
return priorityBytes;
}
/**
* Used by the priority algorithm to allocate bytes to this stream.
*/
private void allocatePriorityBytes(int bytes) {
allocatedPriorityBytes += bytes;
}
/**
* Used by the priority algorithm to get the intermediate allocation of bytes to this stream.
*/
int allocatedPriorityBytes() {
return allocatedPriorityBytes;
}
/**
* Used by the priority algorithm to determine the number of writable bytes that have not yet been allocated.
*/
private int unallocatedPriorityBytes() {
return priorityBytes - allocatedPriorityBytes;
int streamableBytesForTree() {
return streamableBytesForTree;
}
/**
* Creates a new frame with the given values but does not add it to the pending queue.
*/
private Frame newFrame(ChannelPromise promise, ByteBuf data, int padding, boolean endStream) {
private Frame newFrame(final ChannelPromise promise, ByteBuf data, int padding, boolean endStream) {
// Store this as the future for the most recent write attempt.
lastNewFrame = promise;
return new Frame(new ChannelPromiseAggregator(promise), data, padding, endStream);
return new Frame(new SimplePromiseAggregator(promise), data, padding, endStream);
}
/**
@ -489,7 +467,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
/**
* Returns the the head of the pending queue, or {@code null} if empty.
*/
Frame peek() {
private Frame peek() {
return pendingWriteQueue.peek();
}
@ -512,7 +490,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* the number of pending writes available, or because a frame does not support splitting on arbitrary
* boundaries.
*/
private int writeBytes(int bytes) throws Http2Exception {
private int writeBytes(int bytes) {
int bytesWritten = 0;
if (!stream.localSideOpen()) {
return bytesWritten;
@ -544,13 +522,14 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Recursively increments the priority bytes for this branch in the priority tree starting at the current node.
* Recursively increments the streamable bytes for this branch in the priority tree starting
* at the current node.
*/
private void incrementPriorityBytes(int numBytes) {
private void incrementStreamableBytesForTree(int numBytes) {
if (numBytes != 0) {
priorityBytes += numBytes;
streamableBytesForTree += numBytes;
if (!stream.isRoot()) {
state(stream.parent()).incrementPriorityBytes(numBytes);
state(stream.parent()).incrementStreamableBytesForTree(numBytes);
}
}
}
@ -561,12 +540,12 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private final class Frame {
final ByteBuf data;
final boolean endStream;
final ChannelPromiseAggregator promiseAggregator;
final SimplePromiseAggregator promiseAggregator;
final ChannelPromise promise;
int padding;
boolean enqueued;
Frame(ChannelPromiseAggregator promiseAggregator, ByteBuf data, int padding,
Frame(SimplePromiseAggregator promiseAggregator, ByteBuf data, int padding,
boolean endStream) {
this.data = data;
this.padding = padding;
@ -594,8 +573,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
/**
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
* fit into the stream window, then {@link #incrementPriorityBytes} to recursively update this branch of the
* Increments the number of pending bytes for this node. If there was any change to the
* number of bytes that fit into the stream window, then
* {@link #incrementStreamableBytesForTree} to recursively update this branch of the
* priority tree.
*/
private void incrementPendingBytes(int numBytes) {
@ -603,7 +583,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
pendingBytes += numBytes;
int delta = streamableBytes() - previouslyStreamable;
incrementPriorityBytes(delta);
incrementStreamableBytesForTree(delta);
}
/**
@ -613,7 +593,7 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
*/
void write() throws Http2Exception {
void write() {
// Using a do/while loop because if the buffer is empty we still need to call
// the writer once to send the empty frame.
final Http2FrameSizePolicy frameSizePolicy = frameWriter.configuration().frameSizePolicy();
@ -622,9 +602,15 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
int frameBytes = Math.min(bytesToWrite, frameSizePolicy.maxFrameSize());
if (frameBytes == bytesToWrite) {
// All the bytes fit into a single HTTP/2 frame, just send it all.
connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
try {
connectionState().incrementStreamWindow(-bytesToWrite);
incrementStreamWindow(-bytesToWrite);
} catch (Http2Exception e) {
// Should never get here since we're decrementing.
throw new AssertionError("Invalid window state when writing frame: " + e.getMessage());
}
frameWriter.writeData(ctx, stream.id(), data, padding, endStream, promise);
frameSent = true;
decrementPendingBytes(bytesToWrite);
if (enqueued) {
// It's enqueued - remove it from the head of the pending write queue.
@ -659,8 +645,6 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
* @return the partial frame.
*/
Frame split(int maxBytes) {
// TODO: Should padding be spread across chunks or only at the end?
// The requested maxBytes should always be less than the size of this frame.
assert maxBytes < size() : "Attempting to split a frame for the full size.";
@ -690,4 +674,33 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
}
}
}
/**
* Lightweight promise aggregator.
*/
private class SimplePromiseAggregator {
final ChannelPromise promise;
final AtomicInteger awaiting = new AtomicInteger();
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
} else {
if (awaiting.decrementAndGet() == 0) {
promise.trySuccess();
}
}
}
};
SimplePromiseAggregator(ChannelPromise promise) {
this.promise = promise;
}
void add(ChannelPromise promise) {
awaiting.incrementAndGet();
promise.addListener(listener);
}
}
}

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.WINDOW_UPDATE_OFF;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
@ -71,14 +72,14 @@ public class DefaultHttp2InboundFlowControllerTest {
@Test
public void dataFrameShouldBeAccepted() throws Http2Exception {
applyFlowControl(10, 0, false);
applyFlowControl(STREAM_ID, 10, 0, false);
verifyWindowUpdateNotSent();
}
@Test(expected = Http2Exception.class)
public void connectionFlowControlExceededShouldThrow() throws Http2Exception {
// Window exceeded because of the padding.
applyFlowControl(DEFAULT_WINDOW_SIZE, 1, true);
applyFlowControl(STREAM_ID, DEFAULT_WINDOW_SIZE, 1, true);
}
@Test
@ -88,7 +89,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = DEFAULT_WINDOW_SIZE - newWindow;
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(dataSize, 0, true);
applyFlowControl(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
}
@ -98,7 +99,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int dataSize = DEFAULT_WINDOW_SIZE / 2 + 1;
// Set end-of-stream on the frame, so no window update will be sent for the stream.
applyFlowControl(dataSize, 0, true);
applyFlowControl(STREAM_ID, dataSize, 0, true);
verifyWindowUpdateNotSent();
}
@ -109,7 +110,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(dataSize, 0, false);
applyFlowControl(STREAM_ID, dataSize, 0, false);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta);
}
@ -122,7 +123,7 @@ public class DefaultHttp2InboundFlowControllerTest {
int windowDelta = getWindowDelta(initialWindowSize, initialWindowSize, dataSize);
// Don't set end-of-stream so we'll get a window update for the stream as well.
applyFlowControl(dataSize, 0, false);
applyFlowControl(STREAM_ID, dataSize, 0, false);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateNotSent(STREAM_ID);
}
@ -131,7 +132,7 @@ public class DefaultHttp2InboundFlowControllerTest {
public void initialWindowUpdateShouldAllowMoreFrames() throws Http2Exception {
// Send a frame that takes up the entire window.
int initialWindowSize = DEFAULT_WINDOW_SIZE;
applyFlowControl(initialWindowSize, 0, false);
applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
// Update the initial window size to allow another frame.
int newInitialWindowSize = 2 * initialWindowSize;
@ -141,21 +142,66 @@ public class DefaultHttp2InboundFlowControllerTest {
reset(frameWriter);
// Send the next frame and verify that the expected window updates were sent.
applyFlowControl(initialWindowSize, 0, false);
applyFlowControl(STREAM_ID, initialWindowSize, 0, false);
int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
verifyWindowUpdateSent(STREAM_ID, delta);
}
@Test
public void connectionWindowShouldExpandWithNumberOfStreams() throws Http2Exception {
// Create another stream
int newStreamId = 3;
connection.local().createStream(newStreamId, false);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
// Receive some data - this should cause the connection window to expand.
int data1 = 50;
int expectedMaxConnectionWindow = DEFAULT_WINDOW_SIZE * 2;
applyFlowControl(STREAM_ID, data1, 0, false);
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE + data1);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(expectedMaxConnectionWindow, window(CONNECTION_STREAM_ID));
reset(frameWriter);
// Close the new stream.
connection.stream(newStreamId).close();
// Read more data and verify that the stream window refreshes but the
// connection window continues collapsing.
int data2 = window(STREAM_ID);
applyFlowControl(STREAM_ID, data2, 0, false);
verifyWindowUpdateSent(STREAM_ID, data1 + data2);
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE * 2 - data2 , window(CONNECTION_STREAM_ID));
reset(frameWriter);
// Read enough data to cause a WINDOW_UPDATE for both the stream and connection and
// verify the new maximum of the connection window.
int data3 = window(STREAM_ID);
applyFlowControl(STREAM_ID, data3, 0, false);
verifyWindowUpdateSent(STREAM_ID, DEFAULT_WINDOW_SIZE);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE
- (DEFAULT_WINDOW_SIZE * 2 - (data2 + data3)));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE, window(CONNECTION_STREAM_ID));
}
private static int getWindowDelta(int initialSize, int windowSize, int dataSize) {
int newWindowSize = windowSize - dataSize;
return initialSize - newWindowSize;
}
private void applyFlowControl(int dataSize, int padding, boolean endOfStream) throws Http2Exception {
private void applyFlowControl(int streamId, int dataSize, int padding, boolean endOfStream) throws Http2Exception {
final ByteBuf buf = dummyData(dataSize);
try {
controller.onDataRead(ctx, STREAM_ID, buf, padding, endOfStream);
controller.onDataRead(ctx, streamId, buf, padding, endOfStream);
} finally {
buf.release();
}
@ -179,4 +225,8 @@ public class DefaultHttp2InboundFlowControllerTest {
verify(frameWriter, never()).writeWindowUpdate(any(ChannelHandlerContext.class), anyInt(), anyInt(),
any(ChannelPromise.class));
}
private int window(int streamId) {
return connection.stream(streamId).inboundFlow().window();
}
}

View File

@ -317,6 +317,52 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
}
@Test
public void successiveSendsShouldNotInteract() throws Http2Exception {
// Collapse the connection window to force queueing.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, -window(CONNECTION_STREAM_ID));
assertEquals(0, window(CONNECTION_STREAM_ID));
ByteBuf data = dummyData(5, 5);
ByteBuf dataOnly = data.slice(0, 5);
try {
// Queue data for stream A and allow most of it to be written.
send(STREAM_A, dataOnly.slice(), 5);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 8);
ArgumentCaptor<ByteBuf> argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_A, argument, 3, false);
ByteBuf writtenBuf = argument.getValue();
assertEquals(dataOnly, writtenBuf);
assertEquals(65527, window(STREAM_A));
assertEquals(0, window(CONNECTION_STREAM_ID));
resetFrameWriter();
// Queue data for stream B and allow the rest of A and all of B to be written.
send(STREAM_B, dataOnly.slice(), 5);
verifyNoWrite(STREAM_A);
verifyNoWrite(STREAM_B);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 12);
assertEquals(0, window(CONNECTION_STREAM_ID));
// Verify the rest of A is written.
captureWrite(STREAM_A, argument, 2, false);
writtenBuf = argument.getValue();
assertEquals(Unpooled.EMPTY_BUFFER, writtenBuf);
assertEquals(65525, window(STREAM_A));
argument = ArgumentCaptor.forClass(ByteBuf.class);
captureWrite(STREAM_B, argument, 5, false);
writtenBuf = argument.getValue();
assertEquals(dataOnly, writtenBuf);
assertEquals(65525, window(STREAM_B));
} finally {
manualSafeRelease(data);
}
}
@Test
public void negativeWindowShouldNotThrowException() throws Http2Exception {
final int initWindow = 20;
@ -609,8 +655,8 @@ public class DefaultHttp2OutboundFlowControllerTest {
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B));
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_B), 2);
assertEquals((2 * DEFAULT_WINDOW_SIZE) - 5, window(STREAM_C) + window(STREAM_D), 5);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
@ -618,7 +664,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
verifyNoWrite(STREAM_A);
captureWrite(STREAM_B, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes());
assertEquals(5, captor.getValue().readableBytes(), 2);
// Verify that C and D each shared half of A's allowance. Since A's allowance (5) cannot
// be split evenly, one will get 3 and one will get 2.
@ -626,7 +672,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
int c = captor.getValue().readableBytes();
captureWrite(STREAM_D, captor, 0, false);
int d = captor.getValue().readableBytes();
assertEquals(5, c + d);
assertEquals(5, c + d, 4);
assertEquals(1, Math.abs(c - d));
} finally {
manualSafeRelease(bufs);
@ -795,18 +841,18 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Verify that the entire frame was sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 10);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_A), 2);
assertEquals(0, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D));
assertEquals(DEFAULT_WINDOW_SIZE - 5, window(STREAM_D), 2);
final ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
// Verify that A received all the bytes.
captureWrite(STREAM_A, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes());
assertEquals(5, captor.getValue().readableBytes(), 2);
captureWrite(STREAM_D, captor, 0, false);
assertEquals(5, captor.getValue().readableBytes());
assertEquals(5, captor.getValue().readableBytes(), 2);
verifyNoWrite(STREAM_B);
verifyNoWrite(STREAM_C);
} finally {
@ -875,7 +921,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(aWritten, min);
assertEquals(bWritten, max);
assertTrue(aWritten < cWritten);
assertEquals(cWritten, dWritten);
assertEquals(cWritten, dWritten, 1);
assertTrue(cWritten < bWritten);
assertEquals(0, window(CONNECTION_STREAM_ID));
@ -899,7 +945,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
* </pre>
*/
@Test
public void samePriorityShouldWriteEqualData() throws Http2Exception {
public void samePriorityShouldDistributeBasedOnData() throws Http2Exception {
// Block the connection
exhaustStreamWindow(CONNECTION_STREAM_ID);
@ -928,10 +974,10 @@ public class DefaultHttp2OutboundFlowControllerTest {
// Allow 1000 bytes to be sent.
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, 999);
assertEquals(0, window(CONNECTION_STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_A), 50);
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_B), 50);
assertEquals(DEFAULT_WINDOW_SIZE, window(STREAM_C));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D));
assertEquals(DEFAULT_WINDOW_SIZE - 333, window(STREAM_D), 50);
captureWrite(STREAM_A, captor, 0, false);
int aWritten = captor.getValue().readableBytes();
@ -943,9 +989,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
int dWritten = captor.getValue().readableBytes();
assertEquals(999, aWritten + bWritten + dWritten);
assertEquals(333, aWritten);
assertEquals(333, bWritten);
assertEquals(333, dWritten);
assertEquals(333, aWritten, 50);
assertEquals(333, bWritten, 50);
assertEquals(333, dWritten, 50);
} finally {
manualSafeRelease(bufs);
}
@ -993,16 +1039,16 @@ public class DefaultHttp2OutboundFlowControllerTest {
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
} finally {
manualSafeRelease(bufs);
}
@ -1063,17 +1109,17 @@ public class DefaultHttp2OutboundFlowControllerTest {
streamB.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
} finally {
manualSafeRelease(bufs);
}
@ -1142,19 +1188,19 @@ public class DefaultHttp2OutboundFlowControllerTest {
assertEquals(
calculateStreamSizeSum(streamSizes,
Arrays.asList(STREAM_A, STREAM_B, STREAM_C, STREAM_D, STREAM_E)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamA);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_A, STREAM_E, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
state = state(streamE);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_E, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
} finally {
manualSafeRelease(bufs);
}
@ -1212,15 +1258,15 @@ public class DefaultHttp2OutboundFlowControllerTest {
OutboundFlowState state = state(stream0);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B, STREAM_C, STREAM_D)),
state.priorityBytes());
state.streamableBytesForTree());
state = state(streamA);
assertEquals(0, state.priorityBytes());
assertEquals(0, state.streamableBytesForTree());
state = state(streamB);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_B)), state.streamableBytesForTree());
state = state(streamC);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_C)), state.streamableBytesForTree());
state = state(streamD);
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.priorityBytes());
assertEquals(calculateStreamSizeSum(streamSizes, Arrays.asList(STREAM_D)), state.streamableBytesForTree());
} finally {
manualSafeRelease(bufs);
}
@ -1264,37 +1310,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
}
private void exhaustStreamWindow(int streamId) throws Http2Exception {
final int dataLength = window(streamId);
final ByteBuf data = dummyData(dataLength, 0);
try {
if (streamId == CONNECTION_STREAM_ID) {
// Find a stream that we can use to shrink the connection window.
int streamToWrite = 0;
for (Http2Stream stream : connection.activeStreams()) {
if (stream.outboundFlow().window() >= dataLength) {
streamToWrite = stream.id();
break;
}
}
// Write to STREAM_A to decrease the connection window and then restore STREAM_A's window.
int prevWindow = window(streamToWrite);
send(streamToWrite, data, 0);
int delta = prevWindow - window(streamToWrite);
controller.updateOutboundWindowSize(streamToWrite, delta);
} else {
// Write to the stream and then restore the connection window.
int prevWindow = window(CONNECTION_STREAM_ID);
send(streamId, data, 0);
int delta = prevWindow - window(CONNECTION_STREAM_ID);
controller.updateOutboundWindowSize(CONNECTION_STREAM_ID, delta);
}
// Reset the frameWriter so that this write doesn't interfere with other tests.
resetFrameWriter();
} finally {
manualSafeRelease(data);
}
controller.updateOutboundWindowSize(streamId, -window(streamId));
}
private void resetFrameWriter() {

View File

@ -25,15 +25,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -52,9 +55,6 @@ import io.netty.util.concurrent.Future;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -63,7 +63,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -92,6 +91,8 @@ public class Http2ConnectionRoundtripTest {
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
mockFlowControl(clientListener);
mockFlowControl(serverListener);
}
@After
@ -248,19 +249,20 @@ public class Http2ConnectionRoundtripTest {
final int length = 10485760; // 10MB
// Create a buffer filled with random bytes.
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
final ByteBuf data = Unpooled.wrappedBuffer(bytes);
final ByteBuf data = randomBytes(length);
final ByteArrayOutputStream out = new ByteArrayOutputStream(length);
doAnswer(new Answer<Void>() {
doAnswer(new Answer<Integer>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
public Integer answer(InvocationOnMock in) throws Throwable {
ByteBuf buf = (ByteBuf) in.getArguments()[2];
int padding = (Integer) in.getArguments()[3];
int processedBytes = buf.readableBytes() + padding;
buf.readBytes(out, buf.readableBytes());
return null;
return processedBytes;
}
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), eq(3),
any(ByteBuf.class), eq(0), Mockito.anyBoolean());
any(ByteBuf.class), eq(0), anyBoolean());
try {
// Initialize the data latch based on the number of bytes expected.
bootstrapEnv(length, 2, 1);
@ -292,7 +294,7 @@ public class Http2ConnectionRoundtripTest {
assertEquals(0, dataLatch.getCount());
out.flush();
byte[] received = out.toByteArray();
assertArrayEquals(bytes, received);
assertArrayEquals(data.array(), received);
} finally {
data.release();
out.close();
@ -302,62 +304,80 @@ public class Http2ConnectionRoundtripTest {
@Test
public void stressTest() throws Exception {
final Http2Headers headers = dummyHeaders();
final String text = "hello world";
final String pingMsg = "12345678";
final ByteBuf data = Unpooled.copiedBuffer(text, UTF_8);
int length = 10;
final ByteBuf data = randomBytes(length);
final String dataAsHex = ByteBufUtil.hexDump(data);
final ByteBuf pingData = Unpooled.copiedBuffer(pingMsg, UTF_8);
final int numStreams = 5000;
final List<String> receivedPingBuffers = Collections.synchronizedList(new ArrayList<String>(numStreams));
final int numStreams = 2000;
// Collect all the ping buffers as we receive them at the server.
final String[] receivedPings = new String[numStreams];
doAnswer(new Answer<Void>() {
int nextIndex;
@Override
public Void answer(InvocationOnMock in) throws Throwable {
receivedPingBuffers.add(((ByteBuf) in.getArguments()[1]).toString(UTF_8));
receivedPings[nextIndex++] = ((ByteBuf) in.getArguments()[1]).toString(UTF_8);
return null;
}
}).when(serverListener).onPingRead(any(ChannelHandlerContext.class), eq(pingData));
final List<String> receivedDataBuffers = Collections.synchronizedList(new ArrayList<String>(numStreams));
doAnswer(new Answer<Void>() {
}).when(serverListener).onPingRead(any(ChannelHandlerContext.class), any(ByteBuf.class));
// Collect all the data buffers as we receive them at the server.
final StringBuilder[] receivedData = new StringBuilder[numStreams];
doAnswer(new Answer<Integer>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
receivedDataBuffers.add(((ByteBuf) in.getArguments()[2]).toString(UTF_8));
return null;
public Integer answer(InvocationOnMock in) throws Throwable {
int streamId = (Integer) in.getArguments()[1];
ByteBuf buf = (ByteBuf) in.getArguments()[2];
int padding = (Integer) in.getArguments()[3];
int processedBytes = buf.readableBytes() + padding;
int streamIndex = (streamId - 3) / 2;
StringBuilder builder = receivedData[streamIndex];
if (builder == null) {
builder = new StringBuilder(dataAsHex.length());
receivedData[streamIndex] = builder;
}
builder.append(ByteBufUtil.hexDump(buf));
return processedBytes;
}
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(), eq(data),
eq(0), eq(false));
}).when(serverListener).onDataRead(any(ChannelHandlerContext.class), anyInt(),
any(ByteBuf.class), anyInt(), anyBoolean());
try {
bootstrapEnv(numStreams * text.length(), numStreams * 4, numStreams);
bootstrapEnv(numStreams * length, numStreams * 4, numStreams);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
for (int i = 0, nextStream = 3; i < numStreams; ++i, nextStream += 2) {
http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0,
(short) 16, false, 0, false, newPromise());
int upperLimit = 3 + 2 * numStreams;
for (int streamId = 3; streamId < upperLimit; streamId += 2) {
// Send a bunch of data on each stream.
http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16,
false, 0, false, newPromise());
http2Client.encoder().writePing(ctx(), false, pingData.slice().retain(),
newPromise());
http2Client.encoder().writeData(ctx(), nextStream, data.slice().retain(),
0, false, newPromise());
http2Client.encoder().writeData(ctx(), streamId, data.slice().retain(), 0,
false, newPromise());
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0,
(short) 16, false, 0, true, newPromise());
http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16,
false, 0, true, newPromise());
}
}
});
// Wait for all frames to be received.
assertTrue(trailersLatch.await(30, SECONDS));
assertTrue(trailersLatch.await(60, SECONDS));
verify(serverListener, times(numStreams)).onHeadersRead(any(ChannelHandlerContext.class), anyInt(),
eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(false));
verify(serverListener, times(numStreams)).onHeadersRead(any(ChannelHandlerContext.class), anyInt(),
eq(headers), eq(0), eq((short) 16), eq(false), eq(0), eq(true));
verify(serverListener, times(numStreams)).onPingRead(any(ChannelHandlerContext.class),
any(ByteBuf.class));
verify(serverListener, times(numStreams)).onDataRead(any(ChannelHandlerContext.class),
anyInt(), any(ByteBuf.class), eq(0), eq(false));
assertEquals(numStreams, receivedPingBuffers.size());
assertEquals(numStreams, receivedDataBuffers.size());
for (String receivedData : receivedDataBuffers) {
assertEquals(text, receivedData);
verify(serverListener, never()).onDataRead(any(ChannelHandlerContext.class),
anyInt(), any(ByteBuf.class), eq(0), eq(true));
for (StringBuilder builder : receivedData) {
assertEquals(dataAsHex, builder.toString());
}
for (String receivedPing : receivedPingBuffers) {
for (String receivedPing : receivedPings) {
assertEquals(pingMsg, receivedPing);
}
} finally {
@ -419,4 +439,27 @@ public class Http2ConnectionRoundtripTest {
return new DefaultHttp2Headers().method(as("GET")).scheme(as("https"))
.authority(as("example.org")).path(as("/some/path/resource2")).add(randomString(), randomString());
}
private void mockFlowControl(Http2FrameListener listener) throws Http2Exception {
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
ByteBuf buf = (ByteBuf) invocation.getArguments()[2];
int padding = (Integer) invocation.getArguments()[3];
int processedBytes = buf.readableBytes() + padding;
return processedBytes;
}
}).when(listener).onDataRead(any(ChannelHandlerContext.class), anyInt(),
any(ByteBuf.class), anyInt(), anyBoolean());
}
/**
* Creates a {@link ByteBuf} of the given length, filled with random bytes.
*/
private static ByteBuf randomBytes(int length) {
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
return Unpooled.wrappedBuffer(bytes);
}
}

View File

@ -15,8 +15,9 @@
package io.netty.util.collection;
import java.lang.reflect.Array;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
@ -221,16 +222,34 @@ public class IntObjectHashMap<V> implements IntObjectMap<V>, Iterable<IntObjectM
}
@Override
public V[] values(Class<V> clazz) {
@SuppressWarnings("unchecked")
V[] outValues = (V[]) Array.newInstance(clazz, size());
int targetIx = 0;
for (int i = 0; i < values.length; ++i) {
if (values[i] != null) {
outValues[targetIx++] = values[i];
public Collection<V> values() {
return new AbstractCollection<V>() {
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
final Iterator<Entry<V>> iter = IntObjectHashMap.this.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public V next() {
return iter.next().value();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
return outValues;
@Override
public int size() {
return size;
}
};
}
@Override

View File

@ -14,6 +14,8 @@
*/
package io.netty.util.collection;
import java.util.Collection;
/**
* Interface for a primitive map that uses {@code int}s as keys.
*
@ -112,5 +114,5 @@ public interface IntObjectMap<V> {
/**
* Gets the values contained in this map.
*/
V[] values(Class<V> clazz);
Collection<V> values();
}

View File

@ -16,6 +16,7 @@ package io.netty.util.collection;
import io.netty.util.internal.EmptyArrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
@ -106,8 +107,8 @@ public final class PrimitiveCollections {
}
@Override
public Object[] values(Class<Object> clazz) {
return EmptyArrays.EMPTY_OBJECTS;
public Collection<Object> values() {
return Collections.emptyList();
}
}
@ -185,8 +186,8 @@ public final class PrimitiveCollections {
}
@Override
public V[] values(Class<V> clazz) {
return map.values(clazz);
public Collection<V> values() {
return map.values();
}
/**

View File

@ -24,6 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
@ -280,8 +281,8 @@ public class IntObjectHashMapTest {
map.put(4, new Value("v4"));
map.remove(4);
Value[] values = map.values(Value.class);
assertEquals(3, values.length);
Collection<Value> values = map.values();
assertEquals(3, values.size());
Set<Value> expected = new HashSet<Value>();
expected.add(v1);