Removing direct access to HTTP/2 child streams.

Motivation:

We've removed access to the activeStreams collection, we should do the same for the children of a stream to provide a consistent interface.

Modifications:

Moved Http2StreamVisitor to a top-level interface. Removed unnecessary child operations from the Http2Stream interface so that we no longer require a map structure.

Result:

Cleaner and more consistent interface for iterating over child streams.
This commit is contained in:
nmittler 2015-04-09 09:48:58 -07:00
parent 386fd89597
commit da01902ea2
11 changed files with 278 additions and 164 deletions

View File

@ -33,6 +33,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
@ -42,7 +43,6 @@ import io.netty.util.internal.PlatformDependent;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -142,7 +142,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception { public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
return activeStreams.forEachActiveStream(visitor); return activeStreams.forEachActiveStream(visitor);
} }
@ -169,7 +169,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
try { try {
forEachActiveStream(new StreamVisitor() { forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) {
if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) { if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) {
@ -196,7 +196,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
try { try {
forEachActiveStream(new StreamVisitor() { forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) {
if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) { if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) {
@ -334,18 +334,14 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public final Collection<? extends Http2Stream> children() { public Http2Stream forEachChild(Http2StreamVisitor visitor) throws Http2Exception {
return children.values(); for (IntObjectHashMap.Entry<DefaultStream> entry : children.entries()) {
Http2Stream stream = entry.value();
if (!visitor.visit(stream)) {
return stream;
} }
@Override
public final boolean hasChild(int streamId) {
return child(streamId) != null;
} }
return null;
@Override
public final Http2Stream child(int streamId) {
return children.get(streamId);
} }
@Override @Override
@ -1020,17 +1016,15 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
} }
public Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception { public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
++pendingIterations; ++pendingIterations;
Http2Stream resultStream = null;
try { try {
for (Http2Stream stream : streams) { for (Http2Stream stream : streams) {
if (!visitor.visit(stream)) { if (!visitor.visit(stream)) {
resultStream = stream; return stream;
break;
} }
} }
return resultStream; return null;
} finally { } finally {
--pendingIterations; --pendingIterations;
if (allowModifications()) { if (allowModifications()) {

View File

@ -28,7 +28,6 @@ import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException;
@ -381,7 +380,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
/** /**
* Provides a means to iterate over all active streams and increment the flow control windows. * Provides a means to iterate over all active streams and increment the flow control windows.
*/ */
private static final class WindowUpdateVisitor implements StreamVisitor { private static final class WindowUpdateVisitor implements Http2StreamVisitor {
private CompositeStreamException compositeException; private CompositeStreamException compositeException;
private final int delta; private final int delta;

View File

@ -22,18 +22,19 @@ import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.handler.codec.http2.Http2Stream.State;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque; import java.util.Deque;
/** /**
* Basic implementation of {@link Http2RemoteFlowController}. * Basic implementation of {@link Http2RemoteFlowController}.
*/ */
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final StreamVisitor WRITE_ALLOCATED_BYTES = new StreamVisitor() { private static final Http2StreamVisitor WRITE_ALLOCATED_BYTES = new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) {
state(stream).writeAllocatedBytes(); state(stream).writeAllocatedBytes();
@ -124,7 +125,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
final int delta = newWindowSize - initialWindowSize; final int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize; initialWindowSize = newWindowSize;
connection.forEachActiveStream(new StreamVisitor() { connection.forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
// Verify that the maximum value is not exceeded by this change. // Verify that the maximum value is not exceeded by this change.
@ -250,79 +251,167 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* @param connectionWindow The connection window this is available for use at this point in the tree. * @param connectionWindow The connection window this is available for use at this point in the tree.
* @return An object summarizing the write and allocation results. * @return An object summarizing the write and allocation results.
*/ */
private int allocateBytesForTree(Http2Stream parent, int connectionWindow) { static int allocateBytesForTree(Http2Stream parent, int connectionWindow) throws Http2Exception {
FlowState state = state(parent); FlowState state = state(parent);
if (state.streamableBytesForTree() <= 0) { if (state.streamableBytesForTree() <= 0) {
return 0; return 0;
} }
int bytesAllocated = 0;
// If the number of streamable bytes for this tree will fit in the connection window // If the number of streamable bytes for this tree will fit in the connection window
// then there is no need to prioritize the bytes...everyone sends what they have // then there is no need to prioritize the bytes...everyone sends what they have
if (state.streamableBytesForTree() <= connectionWindow) { if (state.streamableBytesForTree() <= connectionWindow) {
for (Http2Stream child : parent.children()) { SimpleChildFeeder childFeeder = new SimpleChildFeeder(connectionWindow);
state = state(child); parent.forEachChild(childFeeder);
int bytesForChild = state.streamableBytes(); return childFeeder.bytesAllocated;
if (bytesForChild > 0 || state.hasFrame()) {
state.allocate(bytesForChild);
bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild;
}
int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
}
return bytesAllocated;
} }
// This is the priority algorithm which will divide the available bytes based ChildFeeder childFeeder = new ChildFeeder(parent, connectionWindow);
// upon stream weight relative to its peers // Iterate once over all children of this parent and try to feed all the children.
Http2Stream[] children = parent.children().toArray(new Http2Stream[parent.numChildren()]); parent.forEachChild(childFeeder);
int totalWeight = parent.totalChildWeights();
for (int tail = children.length; tail > 0;) {
int head = 0;
int nextTail = 0;
int nextTotalWeight = 0;
int nextConnectionWindow = connectionWindow;
for (; head < tail && nextConnectionWindow > 0; ++head) {
Http2Stream child = children[head];
state = state(child);
int weight = child.weight();
double weightRatio = weight / (double) totalWeight;
// Now feed any remaining children that are still hungry until the connection
// window collapses.
childFeeder.feedHungryChildren();
return childFeeder.bytesAllocated;
}
/**
* A {@link Http2StreamVisitor} that performs the HTTP/2 priority algorithm to distribute the available connection
* window appropriately to the children of a given stream.
*/
private static final class ChildFeeder implements Http2StreamVisitor {
final int maxSize;
int totalWeight;
int connectionWindow;
int nextTotalWeight;
int nextConnectionWindow;
int bytesAllocated;
Http2Stream[] stillHungry;
int nextTail;
ChildFeeder(Http2Stream parent, int connectionWindow) {
maxSize = parent.numChildren();
totalWeight = parent.totalChildWeights();
this.connectionWindow = connectionWindow;
this.nextConnectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) throws Http2Exception {
// In order to make progress toward the connection window due to possible rounding errors, we make sure // In order to make progress toward the connection window due to possible rounding errors, we make sure
// that each stream (with data to send) is given at least 1 byte toward the connection window. // that each stream (with data to send) is given at least 1 byte toward the connection window.
int connectionWindowChunk = Math.max(1, (int) (connectionWindow * weightRatio)); int connectionWindowChunk = max(1, (int) (connectionWindow * (child.weight() / (double) totalWeight)));
int bytesForTree = Math.min(nextConnectionWindow, connectionWindowChunk); int bytesForTree = min(nextConnectionWindow, connectionWindowChunk);
int bytesForChild = Math.min(state.streamableBytes(), bytesForTree);
FlowState state = state(child);
int bytesForChild = min(state.streamableBytes(), bytesForTree);
// Allocate the bytes to this child.
if (bytesForChild > 0) { if (bytesForChild > 0) {
state.allocate(bytesForChild); state.allocate(bytesForChild);
bytesAllocated += bytesForChild; bytesAllocated += bytesForChild;
nextConnectionWindow -= bytesForChild; nextConnectionWindow -= bytesForChild;
bytesForTree -= bytesForChild; bytesForTree -= bytesForChild;
// If this subtree still wants to send then re-insert into children list and re-consider for next // If this subtree still wants to send then re-insert into children list and re-consider for next
// iteration. This is needed because we don't yet know if all the peers will be able to use // iteration. This is needed because we don't yet know if all the peers will be able to use
// all of their "fair share" of the connection window, and if they don't use it then we should // all of their "fair share" of the connection window, and if they don't use it then we should
// divide their unused shared up for the peers who still want to send. // divide their unused shared up for the peers who still want to send.
if (state.streamableBytesForTree() > 0) { if (nextConnectionWindow > 0 && state.streamableBytesForTree() > 0) {
children[nextTail++] = child; stillHungry(child);
nextTotalWeight += weight; nextTotalWeight += child.weight();
} }
} }
// Allocate any remaining bytes to the children of this stream.
if (bytesForTree > 0) { if (bytesForTree > 0) {
int childBytesAllocated = allocateBytesForTree(child, bytesForTree); int childBytesAllocated = allocateBytesForTree(child, bytesForTree);
bytesAllocated += childBytesAllocated; bytesAllocated += childBytesAllocated;
nextConnectionWindow -= childBytesAllocated; nextConnectionWindow -= childBytesAllocated;
} }
return nextConnectionWindow > 0;
}
void feedHungryChildren() throws Http2Exception {
if (stillHungry == null) {
// There are no hungry children to feed.
return;
}
totalWeight = nextTotalWeight;
connectionWindow = nextConnectionWindow;
// Loop until there are not bytes left to stream or the connection window has collapsed.
for (int tail = nextTail; tail > 0 && connectionWindow > 0;) {
nextTotalWeight = 0;
nextTail = 0;
// Iterate over the children that are currently still hungry.
for (int head = 0; head < tail && nextConnectionWindow > 0; ++head) {
if (!visit(stillHungry[head])) {
// The connection window has collapsed, break out of the loop.
break;
}
} }
connectionWindow = nextConnectionWindow; connectionWindow = nextConnectionWindow;
totalWeight = nextTotalWeight; totalWeight = nextTotalWeight;
tail = nextTail; tail = nextTail;
} }
}
return bytesAllocated; /**
* Indicates that the given child is still hungry (i.e. still has streamable bytes that can
* fit within the current connection window).
*/
void stillHungry(Http2Stream child) {
ensureSpaceIsAllocated(nextTail);
stillHungry[nextTail++] = child;
}
/**
* Ensures that the {@link #stillHungry} array is properly sized to hold the given index.
*/
void ensureSpaceIsAllocated(int index) {
if (stillHungry == null) {
// Initial size is 1/4 the number of children. Clipping the minimum at 2, which will over allocate if
// maxSize == 1 but if this was true we shouldn't need to re-allocate because the 1 child should get
// all of the available connection window.
stillHungry = new Http2Stream[max(2, maxSize / 4)];
} else if (index == stillHungry.length) {
// Grow the array by a factor of 2.
stillHungry = Arrays.copyOf(stillHungry, min(maxSize, stillHungry.length * 2));
}
}
}
/**
* A simplified version of {@link ChildFeeder} that is only used when all streamable bytes fit within the
* available connection window.
*/
private static final class SimpleChildFeeder implements Http2StreamVisitor {
int bytesAllocated;
int connectionWindow;
SimpleChildFeeder(int connectionWindow) {
this.connectionWindow = connectionWindow;
}
@Override
public boolean visit(Http2Stream child) throws Http2Exception {
FlowState childState = state(child);
int bytesForChild = childState.streamableBytes();
if (bytesForChild > 0 || childState.hasFrame()) {
childState.allocate(bytesForChild);
bytesAllocated += bytesForChild;
connectionWindow -= bytesForChild;
}
int childBytesAllocated = allocateBytesForTree(child, connectionWindow);
bytesAllocated += childBytesAllocated;
connectionWindow -= childBytesAllocated;
return true;
}
} }
/** /**
@ -522,7 +611,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Write the portion of the frame. // Write the portion of the frame.
writing = true; writing = true;
needFlush |= frame.write(Math.max(0, allowedBytes)); needFlush |= frame.write(max(0, allowedBytes));
if (!cancelled && frame.size() == 0) { if (!cancelled && frame.size() == 0) {
// This frame has been fully written, remove this frame // This frame has been fully written, remove this frame
// and notify it. Since we remove this frame // and notify it. Since we remove this frame

View File

@ -45,7 +45,7 @@ public interface Http2Connection {
/** /**
* Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer * Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer
* be returned by {@link #activeStreams()}. * be accessible via {@link #forEachActiveStream(Http2StreamVisitor)}.
*/ */
void onStreamClosed(Http2Stream stream); void onStreamClosed(Http2Stream stream);
@ -263,20 +263,7 @@ public interface Http2Connection {
* @param visitor The visitor which will visit each active stream. * @param visitor The visitor which will visit each active stream.
* @return The stream before iteration stopped or {@code null} if iteration went past the end. * @return The stream before iteration stopped or {@code null} if iteration went past the end.
*/ */
Http2Stream forEachActiveStream(StreamVisitor visitor) throws Http2Exception; Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception;
/**
* A visitor that allows iteration over a collection of streams.
*/
interface StreamVisitor {
/**
* @return <ul>
* <li>{@code true} if the processor wants to continue the loop and handle the entry.</li>
* <li>{@code false} if the processor wants to stop handling headers and abort the loop.</li>
* </ul>
*/
boolean visit(Http2Stream stream) throws Http2Exception;
}
/** /**
* Indicates whether or not the local endpoint for this connection is the server. * Indicates whether or not the local endpoint for this connection is the server.

View File

@ -32,7 +32,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
@ -146,7 +145,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
// Check if there are streams to avoid the overhead of creating the ChannelFuture. // Check if there are streams to avoid the overhead of creating the ChannelFuture.
if (connection.numActiveStreams() > 0) { if (connection.numActiveStreams() > 0) {
final ChannelFuture future = ctx.newSucceededFuture(); final ChannelFuture future = ctx.newSucceededFuture();
connection.forEachActiveStream(new StreamVisitor() { connection.forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
closeStream(stream, future); closeStream(stream, future);

View File

@ -15,8 +15,6 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import java.util.Collection;
/** /**
* A single stream within an HTTP2 connection. Streams are compared to each other by priority. * A single stream within an HTTP2 connection. Streams are compared to each other by priority.
*/ */
@ -46,7 +44,8 @@ public interface Http2Stream {
State state(); State state();
/** /**
* Add this stream to {@link Http2Connection#activeStreams()} and transition state to: * Opens this stream, making it available via {@link Http2Connection#forEachActiveStream(Http2StreamVisitor)} and
* transition state to:
* <ul> * <ul>
* <li>{@link State#OPEN} if {@link #state()} is {@link State#IDLE} and {@code halfClosed} is {@code false}.</li> * <li>{@link State#OPEN} if {@link #state()} is {@link State#IDLE} and {@code halfClosed} is {@code false}.</li>
* <li>{@link State#HALF_CLOSED_LOCAL} if {@link #state()} is {@link State#IDLE} and {@code halfClosed} * <li>{@link State#HALF_CLOSED_LOCAL} if {@link #state()} is {@link State#IDLE} and {@code halfClosed}
@ -175,18 +174,10 @@ public interface Http2Stream {
int numChildren(); int numChildren();
/** /**
* Indicates whether the given stream is a direct child of this stream. * Provide a means of iterating over the children of this stream.
*
* @param visitor The visitor which will visit each child stream.
* @return The stream before iteration stopped or {@code null} if iteration went past the end.
*/ */
boolean hasChild(int streamId); Http2Stream forEachChild(Http2StreamVisitor visitor) throws Http2Exception;
/**
* Attempts to find a child of this stream with the given ID. If not found, returns
* {@code null}.
*/
Http2Stream child(int streamId);
/**
* Gets all streams that are direct dependents on this stream.
*/
Collection<? extends Http2Stream> children();
} }

View File

@ -0,0 +1,28 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* A visitor that allows iteration over a collection of streams.
*/
public interface Http2StreamVisitor {
/**
* @return <ul>
* <li>{@code true} if the visitor wants to continue the loop and handle the entry.</li>
* <li>{@code false} if the visitor wants to stop handling headers and abort the loop.</li>
* </ul>
*/
boolean visit(Http2Stream stream) throws Http2Exception;
}

View File

@ -46,7 +46,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import java.util.Collections; import java.util.Collections;
@ -128,13 +127,13 @@ public class DefaultHttp2ConnectionDecoderTest {
doAnswer(new Answer<Http2Stream>() { doAnswer(new Answer<Http2Stream>() {
@Override @Override
public Http2Stream answer(InvocationOnMock in) throws Throwable { public Http2Stream answer(InvocationOnMock in) throws Throwable {
StreamVisitor visitor = in.getArgumentAt(0, StreamVisitor.class); Http2StreamVisitor visitor = in.getArgumentAt(0, Http2StreamVisitor.class);
if (!visitor.visit(stream)) { if (!visitor.visit(stream)) {
return stream; return stream;
} }
return null; return null;
} }
}).when(connection).forEachActiveStream(any(StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local); when(connection.local()).thenReturn(local);

View File

@ -53,7 +53,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled; import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
@ -141,13 +140,13 @@ public class DefaultHttp2ConnectionEncoderTest {
doAnswer(new Answer<Http2Stream>() { doAnswer(new Answer<Http2Stream>() {
@Override @Override
public Http2Stream answer(InvocationOnMock in) throws Throwable { public Http2Stream answer(InvocationOnMock in) throws Throwable {
StreamVisitor visitor = in.getArgumentAt(0, StreamVisitor.class); Http2StreamVisitor visitor = in.getArgumentAt(0, Http2StreamVisitor.class);
if (!visitor.visit(stream)) { if (!visitor.visit(stream)) {
return stream; return stream;
} }
return null; return null;
} }
}).when(connection).forEachActiveStream(any(StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local); when(connection.local()).thenReturn(local);

View File

@ -29,19 +29,21 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Connection.Endpoint; import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.internal.PlatformDependent;
import java.util.Arrays;
import java.util.List;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import javax.xml.ws.Holder;
import java.util.Arrays;
import java.util.List;
/** /**
* Tests for {@link DefaultHttp2Connection}. * Tests for {@link DefaultHttp2Connection}.
*/ */
@ -251,11 +253,13 @@ public class DefaultHttp2ConnectionTest {
assertEquals(0, server.numActiveStreams()); assertEquals(0, server.numActiveStreams());
} }
@SuppressWarnings("NumericOverflow")
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void localStreamInvalidStreamIdShouldThrow() throws Http2Exception { public void localStreamInvalidStreamIdShouldThrow() throws Http2Exception {
client.local().createStream(Integer.MAX_VALUE + 2).open(false); client.local().createStream(Integer.MAX_VALUE + 2).open(false);
} }
@SuppressWarnings("NumericOverflow")
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void remoteStreamInvalidStreamIdShouldThrow() throws Http2Exception { public void remoteStreamInvalidStreamIdShouldThrow() throws Http2Exception {
client.remote().createStream(Integer.MAX_VALUE + 1).open(false); client.remote().createStream(Integer.MAX_VALUE + 1).open(false);
@ -280,7 +284,7 @@ public class DefaultHttp2ConnectionTest {
Http2Stream stream = client.local().createStream(1).open(false); Http2Stream stream = client.local().createStream(1).open(false);
assertEquals(1, client.connectionStream().numChildren()); assertEquals(1, client.connectionStream().numChildren());
assertEquals(2, client.connectionStream().prioritizableForTree()); assertEquals(2, client.connectionStream().prioritizableForTree());
assertEquals(stream, client.connectionStream().child(1)); assertEquals(stream, child(client.connectionStream(), 1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight()); assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id()); assertEquals(0, stream.parent().id());
assertEquals(0, stream.numChildren()); assertEquals(0, stream.numChildren());
@ -292,7 +296,7 @@ public class DefaultHttp2ConnectionTest {
stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false); stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, client.connectionStream().numChildren()); assertEquals(1, client.connectionStream().numChildren());
assertEquals(2, client.connectionStream().prioritizableForTree()); assertEquals(2, client.connectionStream().prioritizableForTree());
assertEquals(stream, client.connectionStream().child(1)); assertEquals(stream, child(client.connectionStream(), 1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight()); assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id()); assertEquals(0, stream.parent().id());
assertEquals(0, stream.numChildren()); assertEquals(0, stream.numChildren());
@ -318,7 +322,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -326,7 +330,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
@ -334,13 +338,13 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamD.id(), p.parent().id()); assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamD.id(), p.parent().id()); assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
@ -492,7 +496,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(3, p.prioritizableForTree()); assertEquals(3, p.prioritizableForTree());
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
@ -500,7 +504,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(2, p.prioritizableForTree()); assertEquals(2, p.prioritizableForTree());
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
@ -508,13 +512,13 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamC.id()); p = child(p, streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(streamB.id(), p.parent().id()); assertEquals(streamB.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamD.id()); p = child(p.parent(), streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(streamB.id(), p.parent().id()); assertEquals(streamB.id(), p.parent().id());
@ -551,7 +555,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -559,7 +563,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -567,7 +571,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamC.id()); p = child(p, streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamB.id(), p.parent().id()); assertEquals(streamB.id(), p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -575,7 +579,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4 // Level 4
p = p.child(streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
@ -613,7 +617,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -621,7 +625,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
@ -629,7 +633,7 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamC.id()); p = child(p, streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamB.id(), p.parent().id()); assertEquals(streamB.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
@ -637,12 +641,12 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4 // Level 4
p = p.child(streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
p = p.parent().child(streamF.id()); p = child(p.parent(), streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
@ -708,37 +712,37 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(6, p.prioritizableForTree()); assertEquals(6, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamF.id()); p = child(p, streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamA.id()); p = child(p.parent(), streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree()); assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4; // Level 4;
p = p.child(streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
@ -806,38 +810,38 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 1 // Level 1
p = p.child(streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(6, p.prioritizableForTree()); assertEquals(6, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 2 // Level 2
p = p.child(streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(3, p.numChildren()); assertEquals(3, p.numChildren());
assertEquals(5, p.prioritizableForTree()); assertEquals(5, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 3 // Level 3
p = p.child(streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamF.id()); p = child(p.parent(), streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
p = p.parent().child(streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree()); assertEquals(2, p.prioritizableForTree());
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights()); assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
// Level 4; // Level 4;
p = p.child(streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree()); assertEquals(1, p.prioritizableForTree());
@ -876,7 +880,8 @@ public class DefaultHttp2ConnectionTest {
} }
} }
private static void verifyDependUponIdleStream(Http2Stream streamA, Http2Stream streamB, Endpoint<?> endpoint) { private static void verifyDependUponIdleStream(final Http2Stream streamA, Http2Stream streamB, Endpoint<?> endpoint)
throws Http2Exception {
assertNotNull(streamB); assertNotNull(streamB);
assertEquals(streamB.id(), endpoint.lastStreamCreated()); assertEquals(streamB.id(), endpoint.lastStreamCreated());
assertEquals(State.IDLE, streamB.state()); assertEquals(State.IDLE, streamB.state());
@ -884,7 +889,13 @@ public class DefaultHttp2ConnectionTest {
assertEquals(DEFAULT_PRIORITY_WEIGHT, streamB.weight()); assertEquals(DEFAULT_PRIORITY_WEIGHT, streamB.weight());
assertEquals(streamB, streamA.parent()); assertEquals(streamB, streamA.parent());
assertEquals(1, streamB.numChildren()); assertEquals(1, streamB.numChildren());
assertEquals(streamA, streamB.children().iterator().next()); streamB.forEachChild(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
assertEquals(streamA, stream);
return false;
}
});
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -899,4 +910,23 @@ public class DefaultHttp2ConnectionTest {
private void verifyParentChanged(Http2Stream stream, Http2Stream oldParent) { private void verifyParentChanged(Http2Stream stream, Http2Stream oldParent) {
verify(clientListener).onPriorityTreeParentChanged(streamEq(stream), streamEq(oldParent)); verify(clientListener).onPriorityTreeParentChanged(streamEq(stream), streamEq(oldParent));
} }
private Http2Stream child(Http2Stream parent, final int id) {
try {
final Holder<Http2Stream> streamHolder = new Holder<Http2Stream>();
parent.forEachChild(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
if (stream.id() == id) {
streamHolder.value = stream;
return false;
}
return true;
}
});
return streamHolder.value;
} catch (Http2Exception e) {
PlatformDependent.throwException(e);
return null;
}
}
} }

View File

@ -44,7 +44,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Connection.StreamVisitor;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import java.util.List; import java.util.List;
@ -115,13 +114,13 @@ public class Http2ConnectionHandlerTest {
doAnswer(new Answer<Http2Stream>() { doAnswer(new Answer<Http2Stream>() {
@Override @Override
public Http2Stream answer(InvocationOnMock in) throws Throwable { public Http2Stream answer(InvocationOnMock in) throws Throwable {
StreamVisitor visitor = in.getArgumentAt(0, StreamVisitor.class); Http2StreamVisitor visitor = in.getArgumentAt(0, Http2StreamVisitor.class);
if (!visitor.visit(stream)) { if (!visitor.visit(stream)) {
return stream; return stream;
} }
return null; return null;
} }
}).when(connection).forEachActiveStream(any(StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(NON_EXISTANT_STREAM_ID)).thenReturn(null); when(connection.stream(NON_EXISTANT_STREAM_ID)).thenReturn(null);
when(connection.numActiveStreams()).thenReturn(1); when(connection.numActiveStreams()).thenReturn(1);
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
@ -288,7 +287,7 @@ public class Http2ConnectionHandlerTest {
public Http2Stream answer(InvocationOnMock in) throws Throwable { public Http2Stream answer(InvocationOnMock in) throws Throwable {
return null; return null;
} }
}).when(connection).forEachActiveStream(any(StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.numActiveStreams()).thenReturn(0); when(connection.numActiveStreams()).thenReturn(0);
// Simulate the future being completed. // Simulate the future being completed.
listener.operationComplete(future); listener.operationComplete(future);