From c4fbc0642de213774c479b3fb61d5f05abc8fc62 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 19 Feb 2016 17:50:25 -0800 Subject: [PATCH] HTTP/2 stream removed from map before onStreamClosed called Motivation: The interface contract of Http2Connection.Listener.onStreamClosed says that the stream will be removed from the active stream map, and not necessarily the stream map. If the channel becomes inactive we may remove from the stream map before calling onStreamClosed. Modifications: - Don't remove from the stream map during iteration until after onStreamClosed is called Result: Expectations of onStreamClosed interface are not violated --- .../codec/http2/DefaultHttp2Connection.java | 56 +++++++++---------- .../http2/DefaultHttp2ConnectionTest.java | 14 +++++ 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 6dae971b4a..9f8029374c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -23,8 +23,8 @@ import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap.PrimitiveEntry; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.UnaryPromiseNotifier; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; @@ -122,18 +122,7 @@ public class DefaultHttp2Connection implements Http2Connection { } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) { closePromise = promise; } else { - closePromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - promise.trySuccess(null); - } else if (future.isCancelled()) { - promise.cancel(false); - } else { - promise.tryFailure(future.cause()); - } - } - }); + closePromise.addListener(new UnaryPromiseNotifier(promise)); } } else { closePromise = promise; @@ -147,13 +136,12 @@ public class DefaultHttp2Connection implements Http2Connection { // paths iterating over the active streams. if (activeStreams.allowModifications()) { while (itr.hasNext()) { - Http2Stream stream = itr.next().value(); + DefaultStream stream = (DefaultStream) itr.next().value(); if (stream.id() != CONNECTION_STREAM_ID) { // If modifications of the activeStream map is allowed, then a stream close operation will also - // modify the streamMap. We must prevent concurrent modifications to the streamMap, so use the - // iterator to remove the current stream. - itr.remove(); - stream.close(); + // modify the streamMap. Pass the iterator in so that remove will be called to prevent concurrent + // modification exceptions. + stream.close(itr); } } } else { @@ -297,10 +285,14 @@ public class DefaultHttp2Connection implements Http2Connection { * (see [3] {@link DefaultStream#takeChild(DefaultStream, boolean, List)}). * @param stream The stream to remove. */ - void removeStream(DefaultStream stream) { + void removeStream(DefaultStream stream, Iterator itr) { // [1] Check if this stream can be removed because it has no prioritizable descendants. if (stream.parent().removeChild(stream)) { - streamMap.remove(stream.id()); + if (itr == null) { + streamMap.remove(stream.id()); + } else { + itr.remove(); + } for (int i = 0; i < listeners.size(); i++) { try { @@ -521,8 +513,7 @@ public class DefaultHttp2Connection implements Http2Connection { activeStreams.activate(this); } - @Override - public Http2Stream close() { + Http2Stream close(Iterator itr) { if (state == CLOSED) { return this; } @@ -530,10 +521,15 @@ public class DefaultHttp2Connection implements Http2Connection { state = CLOSED; decrementPrioritizableForTree(1); - activeStreams.deactivate(this); + activeStreams.deactivate(this, itr); return this; } + @Override + public Http2Stream close() { + return close(null); + } + @Override public Http2Stream closeLocalSide() { switch (state) { @@ -696,7 +692,7 @@ public class DefaultHttp2Connection implements Http2Connection { // concrete cases known but is risky because it could invalidate the data structure. // 2. We are notifying listeners of the removal while the tree is in flux. Currently the // codec listeners make no assumptions about priority tree structure when being notified. - removeStream(oldParent); + removeStream(oldParent, null); } } @@ -739,7 +735,7 @@ public class DefaultHttp2Connection implements Http2Connection { // concrete cases known but is risky because it could invalidate the data structure. // 2. We are notifying listeners of the removal while the tree is in flux. Currently the // codec listeners make no assumptions about priority tree structure when being notified. - removeStream(this); + removeStream(this, null); } notifyParentChanged(events); return true; @@ -1178,14 +1174,14 @@ public class DefaultHttp2Connection implements Http2Connection { } } - public void deactivate(final DefaultStream stream) { + public void deactivate(final DefaultStream stream, final Iterator itr) { if (allowModifications()) { - removeFromActiveStreams(stream); + removeFromActiveStreams(stream, itr); } else { pendingEvents.add(new Event() { @Override public void process() { - removeFromActiveStreams(stream); + removeFromActiveStreams(stream, itr); } }); } @@ -1233,13 +1229,13 @@ public class DefaultHttp2Connection implements Http2Connection { } } - void removeFromActiveStreams(DefaultStream stream) { + void removeFromActiveStreams(DefaultStream stream, Iterator itr) { if (streams.remove(stream)) { // Update the number of active streams initiated by the endpoint. stream.createdBy().numActiveStreams--; } notifyClosed(stream); - removeStream(stream); + removeStream(stream, itr); } boolean allowModifications() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index bb4ace388d..fdc043e960 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -96,6 +96,20 @@ public class DefaultHttp2ConnectionTest { server = new DefaultHttp2Connection(true); client = new DefaultHttp2Connection(false); client.addListener(clientListener); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + assertNotNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id())); + return null; + } + }).when(clientListener).onStreamClosed(any(Http2Stream.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + assertNull(client.stream(invocation.getArgumentAt(0, Http2Stream.class).id())); + return null; + } + }).when(clientListener).onStreamRemoved(any(Http2Stream.class)); } @Test