diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 6dae971b4a..9f8029374c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -23,8 +23,8 @@ import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap.PrimitiveEntry; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.UnaryPromiseNotifier; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; @@ -122,18 +122,7 @@ public class DefaultHttp2Connection implements Http2Connection { } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) { closePromise = promise; } else { - closePromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - promise.trySuccess(null); - } else if (future.isCancelled()) { - promise.cancel(false); - } else { - promise.tryFailure(future.cause()); - } - } - }); + closePromise.addListener(new UnaryPromiseNotifier(promise)); } } else { closePromise = promise; @@ -147,13 +136,12 @@ public class DefaultHttp2Connection implements Http2Connection { // paths iterating over the active streams. if (activeStreams.allowModifications()) { while (itr.hasNext()) { - Http2Stream stream = itr.next().value(); + DefaultStream stream = (DefaultStream) itr.next().value(); if (stream.id() != CONNECTION_STREAM_ID) { // If modifications of the activeStream map is allowed, then a stream close operation will also - // modify the streamMap. We must prevent concurrent modifications to the streamMap, so use the - // iterator to remove the current stream. - itr.remove(); - stream.close(); + // modify the streamMap. Pass the iterator in so that remove will be called to prevent concurrent + // modification exceptions. + stream.close(itr); } } } else { @@ -297,10 +285,14 @@ public class DefaultHttp2Connection implements Http2Connection { * (see [3] {@link DefaultStream#takeChild(DefaultStream, boolean, List)}). * @param stream The stream to remove. */ - void removeStream(DefaultStream stream) { + void removeStream(DefaultStream stream, Iterator itr) { // [1] Check if this stream can be removed because it has no prioritizable descendants. if (stream.parent().removeChild(stream)) { - streamMap.remove(stream.id()); + if (itr == null) { + streamMap.remove(stream.id()); + } else { + itr.remove(); + } for (int i = 0; i < listeners.size(); i++) { try { @@ -521,8 +513,7 @@ public class DefaultHttp2Connection implements Http2Connection { activeStreams.activate(this); } - @Override - public Http2Stream close() { + Http2Stream close(Iterator itr) { if (state == CLOSED) { return this; } @@ -530,10 +521,15 @@ public class DefaultHttp2Connection implements Http2Connection { state = CLOSED; decrementPrioritizableForTree(1); - activeStreams.deactivate(this); + activeStreams.deactivate(this, itr); return this; } + @Override + public Http2Stream close() { + return close(null); + } + @Override public Http2Stream closeLocalSide() { switch (state) { @@ -696,7 +692,7 @@ public class DefaultHttp2Connection implements Http2Connection { // concrete cases known but is risky because it could invalidate the data structure. // 2. We are notifying listeners of the removal while the tree is in flux. Currently the // codec listeners make no assumptions about priority tree structure when being notified. - removeStream(oldParent); + removeStream(oldParent, null); } } @@ -739,7 +735,7 @@ public class DefaultHttp2Connection implements Http2Connection { // concrete cases known but is risky because it could invalidate the data structure. // 2. We are notifying listeners of the removal while the tree is in flux. Currently the // codec listeners make no assumptions about priority tree structure when being notified. - removeStream(this); + removeStream(this, null); } notifyParentChanged(events); return true; @@ -1178,14 +1174,14 @@ public class DefaultHttp2Connection implements Http2Connection { } } - public void deactivate(final DefaultStream stream) { + public void deactivate(final DefaultStream stream, final Iterator itr) { if (allowModifications()) { - removeFromActiveStreams(stream); + removeFromActiveStreams(stream, itr); } else { pendingEvents.add(new Event() { @Override public void process() { - removeFromActiveStreams(stream); + removeFromActiveStreams(stream, itr); } }); } @@ -1233,13 +1229,13 @@ public class DefaultHttp2Connection implements Http2Connection { } } - void removeFromActiveStreams(DefaultStream stream) { + void removeFromActiveStreams(DefaultStream stream, Iterator itr) { if (streams.remove(stream)) { // Update the number of active streams initiated by the endpoint. stream.createdBy().numActiveStreams--; } notifyClosed(stream); - removeStream(stream); + removeStream(stream, itr); } boolean allowModifications() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index bb4ace388d..fdc043e960 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -96,6 +96,20 @@ public class DefaultHttp2ConnectionTest { server = new DefaultHttp2Connection(true); client = new DefaultHttp2Connection(false); client.addListener(clientListener); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + assertNotNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id())); + return null; + } + }).when(clientListener).onStreamClosed(any(Http2Stream.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + assertNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id())); + return null; + } + }).when(clientListener).onStreamRemoved(any(Http2Stream.class)); } @Test