DefaultHttp2Connection.close Reentrant Modification

Motivation:
The DefaultHttp2Conneciton.close method accounts for active streams being iterated and attempts to avoid reentrant modifications of the underlying stream map by using iterators to remove from the stream map. However there are a few issues:

- While iterating over the stream map we don't prevent iterations over the active stream collection
- Removing a single stream may actually remove > 1 streams due to closed non-leaf streams being preserved in the priority tree which may result in NPE

Preserving closed non-leaf streams in the priority tree is no longer necessary with our current allocation algorithms, and so this feature (and related complexity) can be removed.

Modifications:
- DefaultHttp2Connection.close should prevent others from iterating over the active streams and reentrant modification scenarios which may result from this
- DefaultHttp2Connection should not keep closed stream in the priority tree
  - Remove all associated code in DefaultHttp2RemoteFlowController which accounts for this case including the ReducedState object
  - This includes fixing writability changes which depended on ReducedState
- Update unit tests

Result:
Fixes https://github.com/netty/netty/issues/5198
This commit is contained in:
Scott Mitchell 2016-05-03 13:52:56 -07:00
parent 9ce84dcb21
commit d580245afc
12 changed files with 348 additions and 575 deletions

View File

@ -133,18 +133,24 @@ public class DefaultHttp2Connection implements Http2Connection {
promise.trySuccess(null); promise.trySuccess(null);
return promise; return promise;
} }
Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator(); Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
// We must take care while iterating the streamMap as to not modify while iterating in case there are other code // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
// paths iterating over the active streams. // paths iterating over the active streams.
if (activeStreams.allowModifications()) { if (activeStreams.allowModifications()) {
while (itr.hasNext()) { activeStreams.incrementPendingIterations();
DefaultStream stream = (DefaultStream) itr.next().value(); try {
if (stream.id() != CONNECTION_STREAM_ID) { while (itr.hasNext()) {
// If modifications of the activeStream map is allowed, then a stream close operation will also DefaultStream stream = (DefaultStream) itr.next().value();
// modify the streamMap. Pass the iterator in so that remove will be called to prevent concurrent if (stream.id() != CONNECTION_STREAM_ID) {
// modification exceptions. // If modifications of the activeStream map is allowed, then a stream close operation will also
stream.close(itr); // modify the streamMap. Pass the iterator in so that remove will be called to prevent
// concurrent modification exceptions.
stream.close(itr);
}
} }
} finally {
activeStreams.decrementPendingIterations();
} }
} else { } else {
while (itr.hasNext()) { while (itr.hasNext()) {
@ -220,8 +226,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); ++i) { for (int i = 0; i < listeners.size(); ++i) {
try { try {
listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData); listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onGoAwayReceived.", e); logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
} }
} }
@ -251,8 +257,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); ++i) { for (int i = 0; i < listeners.size(); ++i) {
try { try {
listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData); listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onGoAwaySent.", e); logger.error("Caught Throwable from listener onGoAwaySent.", cause);
} }
} }
@ -279,16 +285,12 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
/** /**
* Closed streams may stay in the priority tree if they have dependents that are in prioritizable states. * Remove a stream from the {@link #streamMap}.
* When a stream is requested to be removed we can only actually remove that stream when there are no more * @param stream the stream to remove.
* prioritizable children. * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
* (see [1] {@link Http2Stream#prioritizableForTree()} and [2] {@link DefaultStream#removeChild(DefaultStream)}). * used if non-{@code null}.
* When a priority tree edge changes we also have to re-evaluate viable nodes
* (see [3] {@link DefaultStream#takeChild(DefaultStream, boolean, List)}).
* @param stream The stream to remove.
*/ */
void removeStream(DefaultStream stream, Iterator<?> itr) { void removeStream(DefaultStream stream, Iterator<?> itr) {
// [1] Check if this stream can be removed because it has no prioritizable descendants.
if (stream.parent().removeChild(stream)) { if (stream.parent().removeChild(stream)) {
if (itr == null) { if (itr == null) {
streamMap.remove(stream.id()); streamMap.remove(stream.id());
@ -299,8 +301,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onStreamRemoved(stream); listeners.get(i).onStreamRemoved(stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onStreamRemoved.", e); logger.error("Caught Throwable from listener onStreamRemoved.", cause);
} }
} }
@ -329,8 +331,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onStreamHalfClosed(stream); listeners.get(i).onStreamHalfClosed(stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onStreamHalfClosed.", e); logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
} }
} }
} }
@ -339,8 +341,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onStreamClosed(stream); listeners.get(i).onStreamClosed(stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onStreamClosed.", e); logger.error("Caught Throwable from listener onStreamClosed.", cause);
} }
} }
} }
@ -371,7 +373,6 @@ public class DefaultHttp2Connection implements Http2Connection {
private short weight = DEFAULT_PRIORITY_WEIGHT; private short weight = DEFAULT_PRIORITY_WEIGHT;
private DefaultStream parent; private DefaultStream parent;
private IntObjectMap<DefaultStream> children = IntCollections.emptyMap(); private IntObjectMap<DefaultStream> children = IntCollections.emptyMap();
private int prioritizableForTree = 1;
private boolean resetSent; private boolean resetSent;
DefaultStream(int id, State state) { DefaultStream(int id, State state) {
@ -430,11 +431,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return parent; return parent;
} }
@Override
public final int prioritizableForTree() {
return prioritizableForTree;
}
@Override @Override
public final boolean isDescendantOf(Http2Stream stream) { public final boolean isDescendantOf(Http2Stream stream) {
Http2Stream next = parent(); Http2Stream next = parent();
@ -521,7 +517,6 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
state = CLOSED; state = CLOSED;
decrementPrioritizableForTree(1);
activeStreams.deactivate(this, itr); activeStreams.deactivate(this, itr);
return this; return this;
@ -564,59 +559,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return this; return this;
} }
/**
* Recursively increment the {@link #prioritizableForTree} for this object up the parent links until
* either we go past the root or {@code oldParent} is encountered.
* @param amt The amount to increment by. This must be positive.
* @param oldParent The previous parent for this stream.
*/
private void incrementPrioritizableForTree(int amt, Http2Stream oldParent) {
if (amt != 0) {
incrementPrioritizableForTree0(amt, oldParent);
}
}
/**
* Direct calls to this method are discouraged.
* Instead use {@link #incrementPrioritizableForTree(int, Http2Stream)}.
*/
private void incrementPrioritizableForTree0(int amt, Http2Stream oldParent) {
assert amt > 0 && Integer.MAX_VALUE - amt >= prioritizableForTree;
prioritizableForTree += amt;
if (parent != null && parent != oldParent) {
parent.incrementPrioritizableForTree0(amt, oldParent);
}
}
/**
* Recursively increment the {@link #prioritizableForTree} for this object up the parent links until
* either we go past the root.
* @param amt The amount to decrement by. This must be positive.
*/
private void decrementPrioritizableForTree(int amt) {
if (amt != 0) {
decrementPrioritizableForTree0(amt);
}
}
/**
* Direct calls to this method are discouraged. Instead use {@link #decrementPrioritizableForTree(int)}.
*/
private void decrementPrioritizableForTree0(int amt) {
assert amt > 0 && prioritizableForTree >= amt;
prioritizableForTree -= amt;
if (parent != null) {
parent.decrementPrioritizableForTree0(amt);
}
}
/**
* Determine if this node by itself is considered to be valid in the priority tree.
*/
private boolean isPrioritizable() {
return state != CLOSED;
}
private void initChildrenIfEmpty() { private void initChildrenIfEmpty() {
if (children == IntCollections.<DefaultStream>emptyMap()) { if (children == IntCollections.<DefaultStream>emptyMap()) {
initChildren(); initChildren();
@ -642,8 +584,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onWeightChanged(this, oldWeight); listeners.get(i).onWeightChanged(this, oldWeight);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onWeightChanged.", e); logger.error("Caught Throwable from listener onWeightChanged.", cause);
} }
} }
} }
@ -660,11 +602,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// This map should be re-initialized in anticipation for the 1 exclusive child which will be added. // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
// It will either be added directly in this method, or after this method is called...but it will be added. // It will either be added directly in this method, or after this method is called...but it will be added.
initChildren(); initChildren();
if (streamToRetain == null) { if (streamToRetain != null) {
prioritizableForTree = isPrioritizable() ? 1 : 0;
} else {
// prioritizableForTree does not change because it is assumed all children node will still be
// descendants through an exclusive priority tree operation.
children.put(streamToRetain.id(), streamToRetain); children.put(streamToRetain.id(), streamToRetain);
} }
return prevChildren; return prevChildren;
@ -681,21 +619,11 @@ public class DefaultHttp2Connection implements Http2Connection {
events.add(new ParentChangedEvent(child, oldParent)); events.add(new ParentChangedEvent(child, oldParent));
notifyParentChanging(child, this); notifyParentChanging(child, this);
child.parent = this; child.parent = this;
// We need the removal operation to happen first so the prioritizableForTree for the old parent to root // Note that the removal operation may not be successful and may return null. This is because when an
// path is updated with the correct child.prioritizableForTree() value. Note that the removal operation // exclusive dependency is processed the children are removed in a previous recursive call but the
// may not be successful and may return null. This is because when an exclusive dependency is processed // child's parent link is updated here.
// the children are removed in a previous recursive call but the child's parent link is updated here. if (oldParent != null) {
if (oldParent != null && oldParent.children.remove(child.id()) != null && oldParent.children.remove(child.id());
!child.isDescendantOf(oldParent)) {
oldParent.decrementPrioritizableForTree(child.prioritizableForTree());
if (oldParent.prioritizableForTree() == 0) {
// There are a few risks with immediately removing nodes from the priority tree:
// 1. We are removing nodes while we are potentially shifting the tree. There are no
// concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified.
removeStream(oldParent, null);
}
} }
// Lazily initialize the children to save object allocations. // Lazily initialize the children to save object allocations.
@ -703,7 +631,6 @@ public class DefaultHttp2Connection implements Http2Connection {
final Http2Stream oldChild = children.put(child.id(), child); final Http2Stream oldChild = children.put(child.id(), child);
assert oldChild == null : "A stream with the same stream ID was already in the child map."; assert oldChild == null : "A stream with the same stream ID was already in the child map.";
incrementPrioritizableForTree(child.prioritizableForTree(), oldParent);
} }
if (exclusive && !children.isEmpty()) { if (exclusive && !children.isEmpty()) {
@ -719,26 +646,17 @@ public class DefaultHttp2Connection implements Http2Connection {
* Removes the child priority and moves any of its dependencies to being direct dependencies on this node. * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
*/ */
final boolean removeChild(DefaultStream child) { final boolean removeChild(DefaultStream child) {
if (child.prioritizableForTree() == 0 && children.remove(child.id()) != null) { if (children.remove(child.id()) != null) {
List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.numChildren()); List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.numChildren());
events.add(new ParentChangedEvent(child, child.parent())); events.add(new ParentChangedEvent(child, child.parent()));
notifyParentChanging(child, null); notifyParentChanging(child, null);
child.parent = null; child.parent = null;
decrementPrioritizableForTree(child.prioritizableForTree());
// Move up any grand children to be directly dependent on this node. // Move up any grand children to be directly dependent on this node.
for (DefaultStream grandchild : child.children.values()) { for (DefaultStream grandchild : child.children.values()) {
takeChild(grandchild, false, events); takeChild(grandchild, false, events);
} }
if (prioritizableForTree() == 0) {
// There are a few risks with immediately removing nodes from the priority tree:
// 1. We are removing nodes while we are potentially shifting the tree. There are no
// concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified.
removeStream(this, null);
}
notifyParentChanged(events); notifyParentChanged(events);
return true; return true;
} }
@ -809,8 +727,8 @@ public class DefaultHttp2Connection implements Http2Connection {
public void notifyListener(Listener l) { public void notifyListener(Listener l) {
try { try {
l.onPriorityTreeParentChanged(stream, oldParent); l.onPriorityTreeParentChanged(stream, oldParent);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onPriorityTreeParentChanged.", e); logger.error("Caught Throwable from listener onPriorityTreeParentChanged.", cause);
} }
} }
} }
@ -832,8 +750,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onPriorityTreeParentChanging(stream, newParent); listeners.get(i).onPriorityTreeParentChanging(stream, newParent);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onPriorityTreeParentChanging.", e); logger.error("Caught Throwable from listener onPriorityTreeParentChanging.", cause);
} }
} }
} }
@ -1030,8 +948,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onStreamAdded(stream); listeners.get(i).onStreamAdded(stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onStreamAdded.", e); logger.error("Caught Throwable from listener onStreamAdded.", cause);
} }
} }
@ -1177,7 +1095,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
public void deactivate(final DefaultStream stream, final Iterator<?> itr) { public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
if (allowModifications()) { if (allowModifications() || itr != null) {
removeFromActiveStreams(stream, itr); removeFromActiveStreams(stream, itr);
} else { } else {
pendingEvents.add(new Event() { pendingEvents.add(new Event() {
@ -1198,7 +1116,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception { public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
++pendingIterations; incrementPendingIterations();
try { try {
for (Http2Stream stream : streams) { for (Http2Stream stream : streams) {
if (!visitor.visit(stream)) { if (!visitor.visit(stream)) {
@ -1207,20 +1125,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
return null; return null;
} finally { } finally {
--pendingIterations; decrementPendingIterations();
if (allowModifications()) {
for (;;) {
Event event = pendingEvents.poll();
if (event == null) {
break;
}
try {
event.process();
} catch (RuntimeException e) {
logger.error("Caught RuntimeException while processing pending ActiveStreams$Event.", e);
}
}
}
} }
} }
@ -1232,8 +1137,8 @@ public class DefaultHttp2Connection implements Http2Connection {
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
listeners.get(i).onStreamActive(stream); listeners.get(i).onStreamActive(stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught RuntimeException from listener onStreamActive.", e); logger.error("Caught Throwable from listener onStreamActive.", cause);
} }
} }
} }
@ -1251,6 +1156,27 @@ public class DefaultHttp2Connection implements Http2Connection {
boolean allowModifications() { boolean allowModifications() {
return pendingIterations == 0; return pendingIterations == 0;
} }
void incrementPendingIterations() {
++pendingIterations;
}
void decrementPendingIterations() {
--pendingIterations;
if (allowModifications()) {
for (;;) {
Event event = pendingEvents.poll();
if (event == null) {
break;
}
try {
event.process();
} catch (Throwable cause) {
logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
}
}
}
}
} }
/** /**

View File

@ -284,7 +284,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
} }
private FlowState state(Http2Stream stream) { private FlowState state(Http2Stream stream) {
checkNotNull(stream, "stream");
return stream.getProperty(stateKey); return stream.getProperty(stateKey);
} }

View File

@ -14,18 +14,9 @@
*/ */
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.StreamByteDistributor.Writer; import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
import io.netty.util.BooleanSupplier;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -33,6 +24,15 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
/** /**
* Basic implementation of {@link Http2RemoteFlowController}. * Basic implementation of {@link Http2RemoteFlowController}.
* <p> * <p>
@ -40,7 +40,6 @@ import java.util.Deque;
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class. * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/ */
@UnstableApi @UnstableApi
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class); InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
@ -48,7 +47,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2Connection connection; private final Http2Connection connection;
private final Http2Connection.PropertyKey stateKey; private final Http2Connection.PropertyKey stateKey;
private final StreamByteDistributor streamByteDistributor; private final StreamByteDistributor streamByteDistributor;
private final AbstractState connectionState; private final FlowState connectionState;
private int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private WritabilityMonitor monitor; private WritabilityMonitor monitor;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
@ -74,12 +73,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Add a flow state for the connection. // Add a flow state for the connection.
stateKey = connection.newKey(); stateKey = connection.newKey();
connectionState = new DefaultState(connection.connectionStream(), initialWindowSize, connectionState = new FlowState(connection.connectionStream());
initialWindowSize > 0 && isChannelWritable());
connection.connectionStream().setProperty(stateKey, connectionState); connection.connectionStream().setProperty(stateKey, connectionState);
// Monitor may depend upon connectionState, and so initialize after connectionState // Monitor may depend upon connectionState, and so initialize after connectionState
listener(listener); listener(listener);
monitor.windowSize(connectionState, initialWindowSize);
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@ -87,40 +86,21 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
public void onStreamAdded(Http2Stream stream) { public void onStreamAdded(Http2Stream stream) {
// If the stream state is not open then the stream is not yet eligible for flow controlled frames and // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
// only requires the ReducedFlowState. Otherwise the full amount of memory is required. // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
stream.setProperty(stateKey, stream.state() == IDLE ? stream.setProperty(stateKey, new FlowState(stream));
new ReducedState(stream) :
new DefaultState(stream, 0,
isWritable(DefaultHttp2RemoteFlowController.this.connection.connectionStream())));
} }
@Override @Override
public void onStreamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
// If the object was previously created, but later activated then we have to ensure // If the object was previously created, but later activated then we have to ensure the proper
// the full state is allocated and the proper initialWindowSize is used. // initialWindowSize is used.
AbstractState state = state(stream); monitor.windowSize(state(stream), initialWindowSize);
if (state.getClass() == DefaultState.class) {
state.window(initialWindowSize);
} else {
stream.setProperty(stateKey, new DefaultState(state, initialWindowSize));
}
} }
@Override @Override
public void onStreamClosed(Http2Stream stream) { public void onStreamClosed(Http2Stream stream) {
// Any pending frames can never be written, cancel and // Any pending frames can never be written, cancel and
// write errors for any pending frames. // write errors for any pending frames.
AbstractState state = state(stream); state(stream).cancel();
state.cancel();
// If the stream is now eligible for removal, but will persist in the priority tree then we can
// decrease the amount of memory required for this stream because no flow controlled frames can
// be exchanged on this stream
if (stream.prioritizableForTree() != 0) {
state = new ReducedState(state);
stream.setProperty(stateKey, state);
}
// Tell the monitor after cancel has been called and after the new state is used.
monitor.stateCancelled(state);
} }
@Override @Override
@ -137,9 +117,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* *
* This is to cancel any such illegal writes. * This is to cancel any such illegal writes.
*/ */
AbstractState state = state(stream); state(stream).cancel();
state.cancel();
monitor.stateCancelled(state);
} }
} }
}); });
@ -211,11 +189,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener); monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
} }
@Override
public int initialWindowSize(Http2Stream stream) {
return state(stream).initialWindowSize();
}
@Override @Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop(); assert ctx == null || ctx.executor().inEventLoop();
@ -239,8 +212,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return state(stream).hasFrame(); return state(stream).hasFrame();
} }
private AbstractState state(Http2Stream stream) { private FlowState state(Http2Stream stream) {
return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey); return (FlowState) stream.getProperty(stateKey);
} }
/** /**
@ -285,25 +258,65 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/** /**
* The remote flow control state for a single stream. * The remote flow control state for a single stream.
*/ */
private final class DefaultState extends AbstractState { private final class FlowState implements StreamByteDistributor.StreamState {
private final Http2Stream stream;
private final Deque<FlowControlled> pendingWriteQueue; private final Deque<FlowControlled> pendingWriteQueue;
private int window; private int window;
private int pendingBytes; private int pendingBytes;
// Set to true while a frame is being written, false otherwise. private boolean markedWritable;
private boolean writing;
// Set to true if cancel() was called.
private boolean cancelled;
DefaultState(Http2Stream stream, int initialWindowSize, boolean markedWritable) { /**
super(stream, markedWritable); * Set to true while a frame is being written, false otherwise.
window(initialWindowSize); */
private boolean writing;
/**
* Set to true if cancel() was called.
*/
private boolean cancelled;
private BooleanSupplier isWritableSupplier = new BooleanSupplier() {
@Override
public boolean get() throws Exception {
return windowSize() - pendingBytes() > 0;
}
};
FlowState(Http2Stream stream) {
this.stream = stream;
pendingWriteQueue = new ArrayDeque<FlowControlled>(2); pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
} }
DefaultState(AbstractState existingState, int initialWindowSize) { /**
super(existingState); * Determine if the stream associated with this object is writable.
window(initialWindowSize); * @return {@code true} if the stream associated with this object is writable.
pendingWriteQueue = new ArrayDeque<FlowControlled>(2); */
boolean isWritable() {
try {
return isWritableSupplier.get();
} catch (Throwable cause) {
throw new Error("isWritableSupplier should never throw!", cause);
}
}
/**
* The stream this state is associated with.
*/
@Override
public Http2Stream stream() {
return stream;
}
/**
* Returns the parameter from the last call to {@link #markedWritability(boolean)}.
*/
boolean markedWritability() {
return markedWritable;
}
/**
* Save the state of writability.
*/
void markedWritability(boolean isWritable) {
this.markedWritable = isWritable;
} }
@Override @Override
@ -311,17 +324,17 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return window; return window;
} }
@Override /**
int initialWindowSize() { * Reset the window size for this stream.
return initialWindowSize; */
} void windowSize(int initialWindowSize) {
@Override
void window(int initialWindowSize) {
window = initialWindowSize; window = initialWindowSize;
} }
@Override /**
* Write the allocated bytes for this stream.
* @return the number of bytes written for a stream or {@code -1} if no write occurred.
*/
int writeAllocatedBytes(int allocated) { int writeAllocatedBytes(int allocated) {
final int initialAllocated = allocated; final int initialAllocated = allocated;
int writtenBytes; int writtenBytes;
@ -385,7 +398,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return writtenBytes; return writtenBytes;
} }
@Override /**
* Increments the flow control window for this stream by the given delta and returns the new value.
*/
int incrementStreamWindow(int delta) throws Http2Exception { int incrementStreamWindow(int delta) throws Http2Exception {
if (delta > 0 && Integer.MAX_VALUE - delta < window) { if (delta > 0 && Integer.MAX_VALUE - delta < window) {
throw streamError(stream.id(), FLOW_CONTROL_ERROR, throw streamError(stream.id(), FLOW_CONTROL_ERROR,
@ -409,7 +424,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return pendingBytes; return pendingBytes;
} }
@Override /**
* Adds the {@code frame} to the pending queue and increments the pending byte count.
*/
void enqueueFrame(FlowControlled frame) { void enqueueFrame(FlowControlled frame) {
FlowControlled last = pendingWriteQueue.peekLast(); FlowControlled last = pendingWriteQueue.peekLast();
if (last == null) { if (last == null) {
@ -444,7 +461,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return pendingWriteQueue.peek(); return pendingWriteQueue.peek();
} }
@Override /**
* Any operations that may be pending are cleared and the status of these operations is failed.
*/
void cancel() { void cancel() {
cancel(null); cancel(null);
} }
@ -459,6 +478,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
if (writing) { if (writing) {
return; return;
} }
for (;;) { for (;;) {
FlowControlled frame = pendingWriteQueue.poll(); FlowControlled frame = pendingWriteQueue.poll();
if (frame == null) { if (frame == null) {
@ -469,6 +489,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
streamByteDistributor.updateStreamableBytes(this); streamByteDistributor.updateStreamableBytes(this);
isWritableSupplier = BooleanSupplier.FALSE_SUPPLIER;
monitor.stateCancelled(this);
} }
/** /**
@ -515,135 +538,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
} }
/**
* The remote flow control state for a single stream that is not in a state where flow controlled frames cannot
* be exchanged.
*/
private final class ReducedState extends AbstractState {
ReducedState(Http2Stream stream) {
super(stream, false);
}
ReducedState(AbstractState existingState) {
super(existingState);
}
@Override
public int windowSize() {
return 0;
}
@Override
int initialWindowSize() {
return 0;
}
@Override
public int pendingBytes() {
return 0;
}
@Override
int writeAllocatedBytes(int allocated) {
throw new UnsupportedOperationException();
}
@Override
void cancel() {
}
@Override
void window(int initialWindowSize) {
throw new UnsupportedOperationException();
}
@Override
int incrementStreamWindow(int delta) throws Http2Exception {
// This operation needs to be supported during the initial settings exchange when
// the peer has not yet acknowledged this peer being activated.
return 0;
}
@Override
void enqueueFrame(FlowControlled frame) {
throw new UnsupportedOperationException();
}
@Override
public boolean hasFrame() {
return false;
}
}
/**
* An abstraction which provides specific extensions used by remote flow control.
*/
private abstract class AbstractState implements StreamByteDistributor.StreamState {
protected final Http2Stream stream;
private boolean markedWritable;
AbstractState(Http2Stream stream, boolean markedWritable) {
this.stream = stream;
this.markedWritable = markedWritable;
}
AbstractState(AbstractState existingState) {
stream = existingState.stream();
markedWritable = existingState.markWritability();
}
/**
* The stream this state is associated with.
*/
@Override
public final Http2Stream stream() {
return stream;
}
/**
* Returns the parameter from the last call to {@link #markWritability(boolean)}.
*/
final boolean markWritability() {
return markedWritable;
}
/**
* Save the state of writability.
*/
final void markWritability(boolean isWritable) {
this.markedWritable = isWritable;
}
abstract int initialWindowSize();
/**
* Write the allocated bytes for this stream.
*
* @return the number of bytes written for a stream or {@code -1} if no write occurred.
*/
abstract int writeAllocatedBytes(int allocated);
/**
* Any operations that may be pending are cleared and the status of these operations is failed.
*/
abstract void cancel();
/**
* Reset the window size for this stream.
*/
abstract void window(int initialWindowSize);
/**
* Increments the flow control window for this stream by the given delta and returns the new value.
*/
abstract int incrementStreamWindow(int delta) throws Http2Exception;
/**
* Adds the {@code frame} to the pending queue and increments the pending byte count.
*/
abstract void enqueueFrame(FlowControlled frame);
}
/** /**
* Abstract class which provides common functionality for writability monitor implementations. * Abstract class which provides common functionality for writability monitor implementations.
*/ */
@ -660,13 +554,22 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* Called when the writability of the underlying channel changes. * Called when the writability of the underlying channel changes.
* @throws Http2Exception If a write occurs and an exception happens in the write operation. * @throws Http2Exception If a write occurs and an exception happens in the write operation.
*/ */
public void channelWritabilityChange() throws Http2Exception { } void channelWritabilityChange() throws Http2Exception { }
/** /**
* Called when the state is cancelled outside of a write operation. * Called when the state is cancelled.
* @param state the state that was cancelled. * @param state the state that was cancelled.
*/ */
public void stateCancelled(AbstractState state) { } void stateCancelled(FlowState state) { }
/**
* Set the initial window size for {@code state}.
* @param state the state to change the initial window size for.
* @param initialWindowSize the size of the window in bytes.
*/
void windowSize(FlowState state, int initialWindowSize) {
state.windowSize(initialWindowSize);
}
/** /**
* Increment the window size for a particular stream. * Increment the window size for a particular stream.
@ -674,7 +577,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* @param delta The amount to increment by. * @param delta The amount to increment by.
* @throws Http2Exception If this operation overflows the window for {@code state}. * @throws Http2Exception If this operation overflows the window for {@code state}.
*/ */
public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception { void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
state.incrementStreamWindow(delta); state.incrementStreamWindow(delta);
} }
@ -684,7 +587,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* @param frame the frame to enqueue. * @param frame the frame to enqueue.
* @throws Http2Exception If a writability error occurs. * @throws Http2Exception If a writability error occurs.
*/ */
public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception { void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
state.enqueueFrame(frame); state.enqueueFrame(frame);
} }
@ -693,7 +596,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
* method should be called. * method should be called.
* @param delta The amount to increment by. * @param delta The amount to increment by.
*/ */
public final void incrementPendingBytes(int delta) { final void incrementPendingBytes(int delta) {
totalPendingBytes += delta; totalPendingBytes += delta;
// Notification of writibilty change should be delayed until the end of the top level event. // Notification of writibilty change should be delayed until the end of the top level event.
@ -703,13 +606,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/** /**
* Determine if the stream associated with {@code state} is writable. * Determine if the stream associated with {@code state} is writable.
* @param state The state which is associated with the stream to test writability for. * @param state The state which is associated with the stream to test writability for.
* @return {@code true} if {@link AbstractState#stream()} is writable. {@code false} otherwise. * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
*/ */
public final boolean isWritable(AbstractState state) { final boolean isWritable(FlowState state) {
return isWritableConnection() && state.windowSize() - state.pendingBytes() > 0; return isWritableConnection() && state.isWritable();
} }
protected final void writePendingBytes() throws Http2Exception { final void writePendingBytes() throws Http2Exception {
int bytesToWrite = writableBytes(); int bytesToWrite = writableBytes();
// Make sure we always write at least once, regardless if we have bytesToWrite or not. // Make sure we always write at least once, regardless if we have bytesToWrite or not.
@ -723,7 +626,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
} }
protected void initialWindowSize(int newWindowSize) throws Http2Exception { void initialWindowSize(int newWindowSize) throws Http2Exception {
if (newWindowSize < 0) { if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
} }
@ -744,7 +647,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
} }
protected final boolean isWritableConnection() { final boolean isWritableConnection() {
return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable(); return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
} }
} }
@ -762,8 +665,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() { private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
AbstractState state = state(stream); FlowState state = state(stream);
if (isWritable(state) != state.markWritability()) { if (isWritable(state) != state.markedWritability()) {
notifyWritabilityChanged(state); notifyWritabilityChanged(state);
} }
return true; return true;
@ -775,19 +678,23 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
public void incrementWindowSize(AbstractState state, int delta) throws Http2Exception { void windowSize(FlowState state, int initialWindowSize) {
super.incrementWindowSize(state, delta); super.windowSize(state, initialWindowSize);
if (isWritable(state) != state.markWritability()) { try {
if (state == connectionState) { checkStateWritability(state);
checkAllWritabilityChanged(); } catch (Http2Exception e) {
} else { throw new RuntimeException("Caught unexpected exception from window", e);
notifyWritabilityChanged(state);
}
} }
} }
@Override @Override
protected void initialWindowSize(int newWindowSize) throws Http2Exception { void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
super.incrementWindowSize(state, delta);
checkStateWritability(state);
}
@Override
void initialWindowSize(int newWindowSize) throws Http2Exception {
super.initialWindowSize(newWindowSize); super.initialWindowSize(newWindowSize);
if (isWritableConnection()) { if (isWritableConnection()) {
// If the write operation does not occur we still need to check all streams because they // If the write operation does not occur we still need to check all streams because they
@ -797,48 +704,58 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
public void enqueueFrame(AbstractState state, FlowControlled frame) throws Http2Exception { void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
super.enqueueFrame(state, frame); super.enqueueFrame(state, frame);
checkConnectionThenStreamWritabilityChanged(state); checkConnectionThenStreamWritabilityChanged(state);
} }
@Override @Override
public void stateCancelled(AbstractState state) { void stateCancelled(FlowState state) {
try { try {
checkConnectionThenStreamWritabilityChanged(state); checkConnectionThenStreamWritabilityChanged(state);
} catch (Http2Exception e) { } catch (Http2Exception e) {
logger.error("Caught unexpected exception from checkAllWritabilityChanged", e); throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
} }
} }
@Override @Override
public void channelWritabilityChange() throws Http2Exception { void channelWritabilityChange() throws Http2Exception {
if (connectionState.markWritability() != isChannelWritable()) { if (connectionState.markedWritability() != isChannelWritable()) {
checkAllWritabilityChanged(); checkAllWritabilityChanged();
} }
} }
private void notifyWritabilityChanged(AbstractState state) { private void checkStateWritability(FlowState state) throws Http2Exception {
state.markWritability(!state.markWritability()); if (isWritable(state) != state.markedWritability()) {
if (state == connectionState) {
checkAllWritabilityChanged();
} else {
notifyWritabilityChanged(state);
}
}
}
private void notifyWritabilityChanged(FlowState state) {
state.markedWritability(!state.markedWritability());
try { try {
listener.writabilityChanged(state.stream); listener.writabilityChanged(state.stream);
} catch (RuntimeException e) { } catch (Throwable cause) {
logger.error("Caught unexpected exception from listener.writabilityChanged", e); logger.error("Caught Throwable from listener.writabilityChanged", cause);
} }
} }
private void checkConnectionThenStreamWritabilityChanged(AbstractState state) throws Http2Exception { private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
// It is possible that the connection window and/or the individual stream writability could change. // It is possible that the connection window and/or the individual stream writability could change.
if (isWritableConnection() != connectionState.markWritability()) { if (isWritableConnection() != connectionState.markedWritability()) {
checkAllWritabilityChanged(); checkAllWritabilityChanged();
} else if (isWritable(state) != state.markWritability()) { } else if (isWritable(state) != state.markedWritability()) {
notifyWritabilityChanged(state); notifyWritabilityChanged(state);
} }
} }
private void checkAllWritabilityChanged() throws Http2Exception { private void checkAllWritabilityChanged() throws Http2Exception {
// Make sure we mark that we have notified as a result of this change. // Make sure we mark that we have notified as a result of this change.
connectionState.markWritability(isWritableConnection()); connectionState.markedWritability(isWritableConnection());
connection.forEachActiveStream(checkStreamWritabilityVisitor); connection.forEachActiveStream(checkStreamWritabilityVisitor);
} }
} }

View File

@ -321,11 +321,6 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
return flowController.windowSize(stream); return flowController.windowSize(stream);
} }
@Override
public int initialWindowSize(Http2Stream stream) {
return flowController.initialWindowSize(stream);
}
@Override @Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
flowController.incrementWindowSize(stream, delta); flowController.incrementWindowSize(stream, delta);
@ -367,6 +362,11 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
public int unconsumedBytes(Http2Stream stream) { public int unconsumedBytes(Http2Stream stream) {
return flowController.unconsumedBytes(stream); return flowController.unconsumedBytes(stream);
} }
@Override
public int initialWindowSize(Http2Stream stream) {
return flowController.initialWindowSize(stream);
}
} }
/** /**
@ -418,7 +418,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
/** /**
* Increment the number of bytes after the decompression process. Under normal circumstances this * Increment the number of bytes after the decompression process. Under normal circumstances this
* delta should not exceed {@link Http2Decompressor#processedBytes()}. * delta should not exceed {@link Http2Decompressor#processed)}.
*/ */
void incrementDecompressedByes(int delta) { void incrementDecompressedByes(int delta) {
if (decompressed + delta < 0) { if (decompressed + delta < 0) {
@ -428,10 +428,10 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
} }
/** /**
* Decrements {@link Http2Decompressor#processedBytes()} by {@code processedBytes} and determines the ratio * Decrements {@link Http2Decompressor#processed} by {@code processedBytes} and determines the ratio
* between {@code processedBytes} and {@link Http2Decompressor#decompressedBytes()}. * between {@code processedBytes} and {@link Http2Decompressor#decompressed}.
* This ratio is used to decrement {@link Http2Decompressor#decompressedBytes()} and * This ratio is used to decrement {@link Http2Decompressor#decompressed} and
* {@link Http2Decompressor#compressedBytes()}. * {@link Http2Decompressor#compressed}.
* @param processedBytes The number of post-decompressed bytes that have been processed. * @param processedBytes The number of post-decompressed bytes that have been processed.
* @return The number of pre-decompressed bytes that have been consumed. * @return The number of pre-decompressed bytes that have been consumed.
*/ */

View File

@ -56,13 +56,6 @@ public interface Http2FlowController {
*/ */
int windowSize(Http2Stream stream); int windowSize(Http2Stream stream);
/**
* Get the initial flow control window size for the given stream. This quantity is measured in number of bytes. Note
* the unavailable window portion can be calculated by {@link #initialWindowSize()} - {@link
* #windowSize(Http2Stream)}.
*/
int initialWindowSize(Http2Stream stream);
/** /**
* Increments the size of the stream's flow control window by the given delta. * Increments the size of the stream's flow control window by the given delta.
* <p> * <p>

View File

@ -76,4 +76,11 @@ public interface Http2LocalFlowController extends Http2FlowController {
* @return the number of unconsumed bytes for the stream. * @return the number of unconsumed bytes for the stream.
*/ */
int unconsumedBytes(Http2Stream stream); int unconsumedBytes(Http2Stream stream);
/**
* Get the initial flow control window size for the given stream. This quantity is measured in number of bytes. Note
* the unavailable window portion can be calculated by {@link #initialWindowSize()} - {@link
* #windowSize(Http2Stream)}.
*/
int initialWindowSize(Http2Stream stream);
} }

View File

@ -166,13 +166,6 @@ public interface Http2Stream {
*/ */
Http2Stream parent(); Http2Stream parent();
/**
* Get the number of streams in the priority tree rooted at this node that are OK to exist in the priority
* tree on their own right. Some streams may be in the priority tree because their dependents require them to
* remain.
*/
int prioritizableForTree();
/** /**
* Indicates whether or not this stream is a descendant in the priority tree from the given stream. * Indicates whether or not this stream is a descendant in the priority tree from the given stream.
*/ */

View File

@ -161,6 +161,26 @@ public class DefaultHttp2ConnectionTest {
testRemoveAllStreams(); testRemoveAllStreams();
} }
@Test
public void removeIndividualStreamsWhileCloseDoesNotNPE() throws InterruptedException, Http2Exception {
final Http2Stream streamA = client.local().createStream(3, false);
final Http2Stream streamB = client.remote().createStream(2, false);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
streamA.close();
streamB.close();
return null;
}
}).when(clientListener2).onStreamClosed(any(Http2Stream.class));
try {
client.addListener(clientListener2);
testRemoveAllStreams();
} finally {
client.removeListener(clientListener2);
}
}
@Test @Test
public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception { public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception {
final Endpoint<Http2RemoteFlowController> remote = client.remote(); final Endpoint<Http2RemoteFlowController> remote = client.remote();
@ -497,7 +517,6 @@ public class DefaultHttp2ConnectionTest {
public void prioritizeShouldUseDefaults() throws Exception { public void prioritizeShouldUseDefaults() throws Exception {
Http2Stream stream = client.local().createStream(1, false); Http2Stream stream = client.local().createStream(1, false);
assertEquals(1, client.connectionStream().numChildren()); assertEquals(1, client.connectionStream().numChildren());
assertEquals(2, client.connectionStream().prioritizableForTree());
assertEquals(stream, child(client.connectionStream(), 1)); assertEquals(stream, child(client.connectionStream(), 1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight()); assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id()); assertEquals(0, stream.parent().id());
@ -509,7 +528,6 @@ public class DefaultHttp2ConnectionTest {
Http2Stream stream = client.local().createStream(1, false); Http2Stream stream = client.local().createStream(1, false);
stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false); stream.setPriority(0, DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(1, client.connectionStream().numChildren()); assertEquals(1, client.connectionStream().numChildren());
assertEquals(2, client.connectionStream().prioritizableForTree());
assertEquals(stream, child(client.connectionStream(), 1)); assertEquals(stream, child(client.connectionStream(), 1));
assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight()); assertEquals(DEFAULT_PRIORITY_WEIGHT, stream.weight());
assertEquals(0, stream.parent().id()); assertEquals(0, stream.parent().id());
@ -532,33 +550,28 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(4, p.prioritizableForTree());
// Level 2 // Level 2
p = child(p, streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(3, p.prioritizableForTree());
// Level 3 // Level 3
p = child(p, streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamD.id(), p.parent().id()); assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamD.id(), p.parent().id()); assertEquals(streamD.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test
@ -580,33 +593,28 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(4, p.prioritizableForTree());
// Level 2 // Level 2
p = child(p, streamC.id()); p = child(p, streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(3, p.prioritizableForTree());
// Level 3 // Level 3
p = child(p, streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamD.id()); p = child(p.parent(), streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test
@ -632,45 +640,38 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(7, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamA.id()); p = child(p.parent(), streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.parent().id()); assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(5, p.prioritizableForTree());
// Level 2 // Level 2
p = child(p, streamF.id()); p = child(p, streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamA.id(), p.parent().id()); assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(4, p.prioritizableForTree());
// Level 3 // Level 3
p = child(p, streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamF.id(), p.parent().id()); assertEquals(streamF.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamF.id(), p.parent().id()); assertEquals(streamF.id(), p.parent().id());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 4 // Level 4
p = child(p, streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test
@ -696,11 +697,6 @@ public class DefaultHttp2ConnectionTest {
any(Http2Stream.class)); any(Http2Stream.class));
verify(clientListener, never()).onPriorityTreeParentChanged(any(Http2Stream.class), verify(clientListener, never()).onPriorityTreeParentChanged(any(Http2Stream.class),
any(Http2Stream.class)); any(Http2Stream.class));
assertEquals(5, client.connectionStream().prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(1, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(3, streamD.prioritizableForTree());
} }
@Test @Test
@ -720,11 +716,6 @@ public class DefaultHttp2ConnectionTest {
assertEquals(4, client.numActiveStreams()); assertEquals(4, client.numActiveStreams());
Http2Stream connectionStream = client.connectionStream(); Http2Stream connectionStream = client.connectionStream();
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(1, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(3, streamD.prioritizableForTree());
// The goal is to call setPriority with the same parent and vary the parameters // The goal is to call setPriority with the same parent and vary the parameters
// we were at one point adding a circular depends to the tree and then throwing // we were at one point adding a circular depends to the tree and then throwing
@ -732,11 +723,6 @@ public class DefaultHttp2ConnectionTest {
for (int j = 0; j < weights.length; ++j) { for (int j = 0; j < weights.length; ++j) {
for (int i = 0; i < exclusive.length; ++i) { for (int i = 0; i < exclusive.length; ++i) {
streamD.setPriority(streamA.id(), weights[j], exclusive[i]); streamD.setPriority(streamA.id(), weights[j], exclusive[i]);
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(1, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(3, streamD.prioritizableForTree());
} }
} }
} }
@ -755,45 +741,20 @@ public class DefaultHttp2ConnectionTest {
assertEquals(4, client.numActiveStreams()); assertEquals(4, client.numActiveStreams());
Http2Stream connectionStream = client.connectionStream(); Http2Stream connectionStream = client.connectionStream();
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(1, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(3, streamD.prioritizableForTree());
// Bring B to the root // Bring B to the root
streamA.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, true); streamA.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(3, streamA.prioritizableForTree());
assertEquals(4, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(2, streamD.prioritizableForTree());
// Move all streams to be children of B // Move all streams to be children of B
streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false); streamD.setPriority(streamB.id(), DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(1, streamA.prioritizableForTree());
assertEquals(4, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(1, streamD.prioritizableForTree());
// Move A back to the root // Move A back to the root
streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, true); streamB.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, true);
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(3, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(1, streamD.prioritizableForTree());
// Move all streams to be children of A // Move all streams to be children of A
streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamC.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false); streamD.setPriority(streamA.id(), DEFAULT_PRIORITY_WEIGHT, false);
assertEquals(5, connectionStream.prioritizableForTree());
assertEquals(4, streamA.prioritizableForTree());
assertEquals(1, streamB.prioritizableForTree());
assertEquals(1, streamC.prioritizableForTree());
assertEquals(1, streamD.prioritizableForTree());
} }
@Test @Test
@ -812,33 +773,22 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(4, p.prioritizableForTree());
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
// Level 1 // Level 1
p = child(p, streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(3, p.prioritizableForTree()); assertEquals(client.connectionStream().id(), p.parent().id());
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
// Level 2
p = child(p, streamB.id());
assertNotNull(p);
assertEquals(2, p.prioritizableForTree());
assertEquals(streamA.id(), p.parent().id());
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
// Level 3 // Level 2
p = child(p, streamC.id()); p = child(p, streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.prioritizableForTree()); assertEquals(streamA.id(), p.parent().id());
assertEquals(streamB.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
p = child(p.parent(), streamD.id()); p = child(p.parent(), streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.prioritizableForTree()); assertEquals(streamA.id(), p.parent().id());
assertEquals(streamB.id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
} }
@ -857,7 +807,7 @@ public class DefaultHttp2ConnectionTest {
streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false); streamE.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false); streamF.setPriority(streamD.id(), DEFAULT_PRIORITY_WEIGHT, false);
// Close internal nodes, leave 1 leaf node open, and ensure part of the tree (D & F) is cleaned up // Close internal nodes, leave 1 leaf node open, the only remaining stream is the one that is not closed (E).
streamA.close(); streamA.close();
streamB.close(); streamB.close();
streamC.close(); streamC.close();
@ -867,38 +817,15 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamA.id());
assertNotNull(p);
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
// Level 2
p = child(p, streamB.id());
assertNotNull(p);
assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
// Level 3
p = child(p, streamC.id());
assertNotNull(p);
assertEquals(streamB.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(1, p.prioritizableForTree());
// Level 4
p = child(p, streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(streamC.id(), p.parent().id()); assertEquals(client.connectionStream().id(), p.parent().id());
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test(expected = Http2Exception.class)
public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception { public void priorityChangeWithNoPrioritizableDependentsShouldRestructureTree() throws Exception {
Http2Stream streamA = client.local().createStream(1, false); Http2Stream streamA = client.local().createStream(1, false);
Http2Stream streamB = client.local().createStream(3, false); Http2Stream streamB = client.local().createStream(3, false);
@ -919,46 +846,8 @@ public class DefaultHttp2ConnectionTest {
streamC.close(); streamC.close();
streamD.close(); streamD.close();
// Move F to depend on C, this should close D // Attempt to move F to depend on C, however this should throw an exception because C is closed.
streamF.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false); streamF.setPriority(streamC.id(), DEFAULT_PRIORITY_WEIGHT, false);
// Level 0
Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren());
assertEquals(3, p.prioritizableForTree());
// Level 1
p = child(p, streamA.id());
assertNotNull(p);
assertEquals(0, p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 2
p = child(p, streamB.id());
assertNotNull(p);
assertEquals(streamA.id(), p.parent().id());
assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 3
p = child(p, streamC.id());
assertNotNull(p);
assertEquals(streamB.id(), p.parent().id());
assertEquals(2, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 4
p = child(p, streamE.id());
assertNotNull(p);
assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamF.id());
assertNotNull(p);
assertEquals(streamC.id(), p.parent().id());
assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test
@ -1016,19 +905,16 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(7, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
assertEquals(6, p.prioritizableForTree());
// Level 2 // Level 2
p = child(p, streamF.id()); p = child(p, streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamA.id()); p = child(p.parent(), streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(2, p.numChildren()); assertEquals(2, p.numChildren());
@ -1037,17 +923,14 @@ public class DefaultHttp2ConnectionTest {
p = child(p, streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 4; // Level 4;
p = child(p, streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
@Test @Test
@ -1107,39 +990,32 @@ public class DefaultHttp2ConnectionTest {
// Level 0 // Level 0
Http2Stream p = client.connectionStream(); Http2Stream p = client.connectionStream();
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(7, p.prioritizableForTree());
// Level 1 // Level 1
p = child(p, streamD.id()); p = child(p, streamD.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(6, p.prioritizableForTree());
// Level 2 // Level 2
p = child(p, streamA.id()); p = child(p, streamA.id());
assertNotNull(p); assertNotNull(p);
assertEquals(3, p.numChildren()); assertEquals(3, p.numChildren());
assertEquals(5, p.prioritizableForTree());
// Level 3 // Level 3
p = child(p, streamB.id()); p = child(p, streamB.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamF.id()); p = child(p.parent(), streamF.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
p = child(p.parent(), streamC.id()); p = child(p.parent(), streamC.id());
assertNotNull(p); assertNotNull(p);
assertEquals(1, p.numChildren()); assertEquals(1, p.numChildren());
assertEquals(2, p.prioritizableForTree());
// Level 4; // Level 4;
p = child(p, streamE.id()); p = child(p, streamE.id());
assertNotNull(p); assertNotNull(p);
assertEquals(0, p.numChildren()); assertEquals(0, p.numChildren());
assertEquals(1, p.prioritizableForTree());
} }
/** /**

View File

@ -186,13 +186,15 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
@Test @Test
public void unflushedPayloadsShouldBeDroppedOnCancel() throws Http2Exception { public void unflushedPayloadsShouldBeDroppedOnCancel() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(5); FakeFlowControlled data = new FakeFlowControlled(5);
Http2Stream streamA = stream(STREAM_A);
sendData(STREAM_A, data); sendData(STREAM_A, data);
connection.stream(STREAM_A).close(); streamA.close();
controller.writePendingBytes(); controller.writePendingBytes();
data.assertNotWritten(); data.assertNotWritten();
controller.writePendingBytes(); controller.writePendingBytes();
data.assertNotWritten(); data.assertNotWritten();
verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); verify(listener, times(1)).writabilityChanged(streamA);
assertFalse(controller.isWritable(streamA));
} }
@Test @Test
@ -716,7 +718,14 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled, never()).writeComplete(); verify(flowControlled, never()).writeComplete();
assertEquals(90, windowBefore - window(STREAM_A)); assertEquals(90, windowBefore - window(STREAM_A));
assertWritabilityChanged(0, true); verify(listener, times(1)).writabilityChanged(stream(STREAM_A));
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A)));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
} }
@Test @Test
@ -755,6 +764,7 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
public void flowControlledWriteCompleteThrowsAnException() throws Exception { public void flowControlledWriteCompleteThrowsAnException() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled = final Http2RemoteFlowController.FlowControlled flowControlled =
mock(Http2RemoteFlowController.FlowControlled.class); mock(Http2RemoteFlowController.FlowControlled.class);
Http2Stream streamA = stream(STREAM_A);
final AtomicInteger size = new AtomicInteger(150); final AtomicInteger size = new AtomicInteger(150);
doAnswer(new Answer<Integer>() { doAnswer(new Answer<Integer>() {
@Override @Override
@ -780,19 +790,22 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
int windowBefore = window(STREAM_A); int windowBefore = window(STREAM_A);
try { controller.addFlowControlled(stream, flowControlled);
controller.addFlowControlled(stream, flowControlled); controller.writePendingBytes();
controller.writePendingBytes();
} catch (Exception e) {
fail();
}
verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled, never()).error(any(ChannelHandlerContext.class), any(Throwable.class)); verify(flowControlled, never()).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled).writeComplete(); verify(flowControlled).writeComplete();
assertEquals(150, windowBefore - window(STREAM_A)); assertEquals(150, windowBefore - window(STREAM_A));
assertWritabilityChanged(0, true); verify(listener, times(1)).writabilityChanged(streamA);
verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(streamA));
assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D)));
} }
@Test @Test
@ -817,11 +830,11 @@ public abstract class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled).write(any(ChannelHandlerContext.class), anyInt());
verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class)); verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
verify(flowControlled, never()).writeComplete(); verify(flowControlled, never()).writeComplete();
verify(listener, times(1)).writabilityChanged(stream(STREAM_A)); verify(listener, times(1)).writabilityChanged(stream);
verify(listener, never()).writabilityChanged(stream(STREAM_B)); verify(listener, never()).writabilityChanged(stream(STREAM_B));
verify(listener, never()).writabilityChanged(stream(STREAM_C)); verify(listener, never()).writabilityChanged(stream(STREAM_C));
verify(listener, never()).writabilityChanged(stream(STREAM_D)); verify(listener, never()).writabilityChanged(stream(STREAM_D));
assertFalse(controller.isWritable(stream(STREAM_A))); assertFalse(controller.isWritable(stream));
assertTrue(controller.isWritable(stream(STREAM_B))); assertTrue(controller.isWritable(stream(STREAM_B)));
assertTrue(controller.isWritable(stream(STREAM_C))); assertTrue(controller.isWritable(stream(STREAM_C)));
assertTrue(controller.isWritable(stream(STREAM_D))); assertTrue(controller.isWritable(stream(STREAM_D)));

View File

@ -798,7 +798,7 @@ public class WeightedFairQueueByteDistributorTest {
} }
/** /**
* In this test, we close an internal stream in the priority tree but tree should not change. * In this test, we close an internal stream in the priority tree.
* *
* <pre> * <pre>
* 0 * 0
@ -807,6 +807,13 @@ public class WeightedFairQueueByteDistributorTest {
* / \ * / \
* C D * C D
* </pre> * </pre>
*
* After the close:
* <pre>
* 0
* / | \
* C D B
* </pre>
*/ */
@Test @Test
public void bytesDistributedShouldBeCorrectWithInternalStreamClose() throws Http2Exception { public void bytesDistributedShouldBeCorrectWithInternalStreamClose() throws Http2Exception {
@ -819,8 +826,7 @@ public class WeightedFairQueueByteDistributorTest {
assertTrue(write(500)); assertTrue(write(500));
verifyNeverWrite(STREAM_A); verifyNeverWrite(STREAM_A);
assertEquals(200, captureWrites(STREAM_B)); assertEquals(500, captureWrites(STREAM_B) + captureWrites(STREAM_C) + captureWrites(STREAM_D));
assertEquals(300, captureWrites(STREAM_C) + captureWrites(STREAM_D));
assertFalse(write(1300)); assertFalse(write(1300));
verifyNeverWrite(STREAM_A); verifyNeverWrite(STREAM_A);

View File

@ -0,0 +1,48 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
/**
* Represents a supplier of {@code boolean}-valued results.
*/
public interface BooleanSupplier {
/**
* Gets a boolean value.
* @return a boolean value.
* @throws Exception If an exception occurs.
*/
boolean get() throws Exception;
/**
* A supplier which always returns {@code false} and never throws.
*/
BooleanSupplier FALSE_SUPPLIER = new BooleanSupplier() {
@Override
public boolean get() {
return false;
}
};
/**
* A supplier which always returns {@code true} and never throws.
*/
BooleanSupplier TRUE_SUPPLIER = new BooleanSupplier() {
@Override
public boolean get() {
return true;
}
};
}

View File

@ -46,11 +46,6 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
return true; return true;
} }
@Override
public int initialWindowSize(Http2Stream stream) {
return MAX_INITIAL_WINDOW_SIZE;
}
@Override @Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
} }