HTTP/2 stream removed from map before onStreamClosed called

Motivation:
The interface contract of Http2Connection.Listener.onStreamClosed says that the stream will be removed from the active stream map, and not necessarily the stream map. If the channel becomes inactive we may remove from the stream map before calling onStreamClosed.

Modifications:
- Don't remove from the stream map during iteration until after onStreamClosed is called

Result:
Expectations of onStreamClosed interface are not violated
This commit is contained in:
Scott Mitchell 2016-02-19 17:50:25 -08:00
parent 94f27be59b
commit c4fbc0642d
2 changed files with 40 additions and 30 deletions

View File

@ -23,8 +23,8 @@ import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.IntObjectMap.PrimitiveEntry; import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.UnaryPromiseNotifier;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
@ -122,18 +122,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) { } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) {
closePromise = promise; closePromise = promise;
} else { } else {
closePromise.addListener(new FutureListener<Void>() { closePromise.addListener(new UnaryPromiseNotifier<Void>(promise));
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
promise.trySuccess(null);
} else if (future.isCancelled()) {
promise.cancel(false);
} else {
promise.tryFailure(future.cause());
}
}
});
} }
} else { } else {
closePromise = promise; closePromise = promise;
@ -147,13 +136,12 @@ public class DefaultHttp2Connection implements Http2Connection {
// paths iterating over the active streams. // paths iterating over the active streams.
if (activeStreams.allowModifications()) { if (activeStreams.allowModifications()) {
while (itr.hasNext()) { while (itr.hasNext()) {
Http2Stream stream = itr.next().value(); DefaultStream stream = (DefaultStream) itr.next().value();
if (stream.id() != CONNECTION_STREAM_ID) { if (stream.id() != CONNECTION_STREAM_ID) {
// If modifications of the activeStream map is allowed, then a stream close operation will also // If modifications of the activeStream map is allowed, then a stream close operation will also
// modify the streamMap. We must prevent concurrent modifications to the streamMap, so use the // modify the streamMap. Pass the iterator in so that remove will be called to prevent concurrent
// iterator to remove the current stream. // modification exceptions.
itr.remove(); stream.close(itr);
stream.close();
} }
} }
} else { } else {
@ -297,10 +285,14 @@ public class DefaultHttp2Connection implements Http2Connection {
* (see [3] {@link DefaultStream#takeChild(DefaultStream, boolean, List)}). * (see [3] {@link DefaultStream#takeChild(DefaultStream, boolean, List)}).
* @param stream The stream to remove. * @param stream The stream to remove.
*/ */
void removeStream(DefaultStream stream) { void removeStream(DefaultStream stream, Iterator<?> itr) {
// [1] Check if this stream can be removed because it has no prioritizable descendants. // [1] Check if this stream can be removed because it has no prioritizable descendants.
if (stream.parent().removeChild(stream)) { if (stream.parent().removeChild(stream)) {
streamMap.remove(stream.id()); if (itr == null) {
streamMap.remove(stream.id());
} else {
itr.remove();
}
for (int i = 0; i < listeners.size(); i++) { for (int i = 0; i < listeners.size(); i++) {
try { try {
@ -521,8 +513,7 @@ public class DefaultHttp2Connection implements Http2Connection {
activeStreams.activate(this); activeStreams.activate(this);
} }
@Override Http2Stream close(Iterator<?> itr) {
public Http2Stream close() {
if (state == CLOSED) { if (state == CLOSED) {
return this; return this;
} }
@ -530,10 +521,15 @@ public class DefaultHttp2Connection implements Http2Connection {
state = CLOSED; state = CLOSED;
decrementPrioritizableForTree(1); decrementPrioritizableForTree(1);
activeStreams.deactivate(this); activeStreams.deactivate(this, itr);
return this; return this;
} }
@Override
public Http2Stream close() {
return close(null);
}
@Override @Override
public Http2Stream closeLocalSide() { public Http2Stream closeLocalSide() {
switch (state) { switch (state) {
@ -696,7 +692,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// concrete cases known but is risky because it could invalidate the data structure. // concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the // 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified. // codec listeners make no assumptions about priority tree structure when being notified.
removeStream(oldParent); removeStream(oldParent, null);
} }
} }
@ -739,7 +735,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// concrete cases known but is risky because it could invalidate the data structure. // concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the // 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified. // codec listeners make no assumptions about priority tree structure when being notified.
removeStream(this); removeStream(this, null);
} }
notifyParentChanged(events); notifyParentChanged(events);
return true; return true;
@ -1178,14 +1174,14 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
} }
public void deactivate(final DefaultStream stream) { public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
if (allowModifications()) { if (allowModifications()) {
removeFromActiveStreams(stream); removeFromActiveStreams(stream, itr);
} else { } else {
pendingEvents.add(new Event() { pendingEvents.add(new Event() {
@Override @Override
public void process() { public void process() {
removeFromActiveStreams(stream); removeFromActiveStreams(stream, itr);
} }
}); });
} }
@ -1233,13 +1229,13 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
} }
void removeFromActiveStreams(DefaultStream stream) { void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
if (streams.remove(stream)) { if (streams.remove(stream)) {
// Update the number of active streams initiated by the endpoint. // Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--; stream.createdBy().numActiveStreams--;
} }
notifyClosed(stream); notifyClosed(stream);
removeStream(stream); removeStream(stream, itr);
} }
boolean allowModifications() { boolean allowModifications() {

View File

@ -96,6 +96,20 @@ public class DefaultHttp2ConnectionTest {
server = new DefaultHttp2Connection(true); server = new DefaultHttp2Connection(true);
client = new DefaultHttp2Connection(false); client = new DefaultHttp2Connection(false);
client.addListener(clientListener); client.addListener(clientListener);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
assertNotNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id()));
return null;
}
}).when(clientListener).onStreamClosed(any(Http2Stream.class));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
assertNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id()));
return null;
}
}).when(clientListener).onStreamRemoved(any(Http2Stream.class));
} }
@Test @Test