From 06e29e0d1b4ebf16b47969d5fbc71bdaf19fe3e8 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 5 Feb 2016 19:02:49 -0800 Subject: [PATCH] HTTP/2 codec may not always call Http2Connection.onStreamRemoved Motivation: Http2Connection.onStreamRemoved is not always called if Http2Connection.onStreamAdded is called. This is problematic as users may rely on the onStreamRemoved method to be called to release ByteBuf objects and do other cleanup. Modifications: - Http2Connection.close will remove all streams existing streams and prevent new ones from being created - Http2ConnectionHandler will call the new close method in channelInactive Result: Http2Connection.onStreamRemoved is always called when Http2Connection.onStreamRemoved is called to preserve the Http2Connection guarantees. Fixes https://github.com/netty/netty/issues/4838 --- .../codec/http2/DefaultHttp2Connection.java | 147 +++++++++++---- .../handler/codec/http2/Http2Connection.java | 12 ++ .../codec/http2/Http2ConnectionHandler.java | 15 +- .../http2/DefaultHttp2ConnectionTest.java | 174 ++++++++++++++++-- .../http2/Http2ConnectionHandlerTest.java | 4 +- 5 files changed, 289 insertions(+), 63 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 589546dbe1..6dae971b4a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -15,28 +15,16 @@ package io.netty.handler.codec.http2; -import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM; -import static io.netty.handler.codec.http2.Http2Exception.closedStreamError; -import static io.netty.handler.codec.http2.Http2Exception.connectionError; -import static io.netty.handler.codec.http2.Http2Exception.streamError; -import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; -import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; -import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; -import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.util.collection.IntCollections; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; +import io.netty.util.collection.IntObjectMap.PrimitiveEntry; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; @@ -51,6 +39,25 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; import java.util.Set; + +import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; +import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM; +import static io.netty.handler.codec.http2.Http2Exception.closedStreamError; +import static io.netty.handler.codec.http2.Http2Exception.connectionError; +import static io.netty.handler.codec.http2.Http2Exception.streamError; +import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; +import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; +import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; +import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; +import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.max; /** @@ -80,6 +87,7 @@ public class DefaultHttp2Connection implements Http2Connection { */ final List listeners = new ArrayList(4); final ActiveStreams activeStreams; + Promise closePromise; /** * Creates a new connection with the given settings. @@ -96,6 +104,71 @@ public class DefaultHttp2Connection implements Http2Connection { streamMap.put(connectionStream.id(), connectionStream); } + /** + * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created. + */ + final boolean isClosed() { + return closePromise != null; + } + + @Override + public Future close(final Promise promise) { + checkNotNull(promise, "promise"); + // Since we allow this method to be called multiple times, we must make sure that all the promises are notified + // when all streams are removed and the close operation completes. + if (closePromise != null) { + if (closePromise == promise) { + // Do nothing + } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) { + closePromise = promise; + } else { + closePromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + promise.trySuccess(null); + } else if (future.isCancelled()) { + promise.cancel(false); + } else { + promise.tryFailure(future.cause()); + } + } + }); + } + } else { + closePromise = promise; + } + if (isStreamMapEmpty()) { + promise.trySuccess(null); + return promise; + } + Iterator> itr = streamMap.entries().iterator(); + // We must take care while iterating the streamMap as to not modify while iterating in case there are other code + // paths iterating over the active streams. + if (activeStreams.allowModifications()) { + while (itr.hasNext()) { + Http2Stream stream = itr.next().value(); + if (stream.id() != CONNECTION_STREAM_ID) { + // If modifications of the activeStream map is allowed, then a stream close operation will also + // modify the streamMap. We must prevent concurrent modifications to the streamMap, so use the + // iterator to remove the current stream. + itr.remove(); + stream.close(); + } + } + } else { + while (itr.hasNext()) { + Http2Stream stream = itr.next().value(); + if (stream.id() != CONNECTION_STREAM_ID) { + // We are not allowed to make modifications, so the close calls will be executed after this + // iteration completes. + stream.close(); + } + } + } + return closePromise; + } + @Override public void addListener(Listener listener) { listeners.add(listener); @@ -208,6 +281,13 @@ public class DefaultHttp2Connection implements Http2Connection { } } + /** + * Determine if {@link #streamMap} only contains the connection stream. + */ + private boolean isStreamMapEmpty() { + return streamMap.size() == 1; + } + /** * Closed streams may stay in the priority tree if they have dependents that are in prioritizable states. * When a stream is requested to be removed we can only actually remove that stream when there are no more @@ -220,7 +300,6 @@ public class DefaultHttp2Connection implements Http2Connection { void removeStream(DefaultStream stream) { // [1] Check if this stream can be removed because it has no prioritizable descendants. if (stream.parent().removeChild(stream)) { - // Remove it from the map and priority tree. streamMap.remove(stream.id()); for (int i = 0; i < listeners.size(); i++) { @@ -230,6 +309,10 @@ public class DefaultHttp2Connection implements Http2Connection { logger.error("Caught RuntimeException from listener onStreamRemoved.", e); } } + + if (closePromise != null && isStreamMapEmpty()) { + closePromise.trySuccess(null); + } } } @@ -604,17 +687,16 @@ public class DefaultHttp2Connection implements Http2Connection { // path is updated with the correct child.prioritizableForTree() value. Note that the removal operation // may not be successful and may return null. This is because when an exclusive dependency is processed // the children are removed in a previous recursive call but the child's parent link is updated here. - if (oldParent != null && oldParent.children.remove(child.id()) != null) { - if (!child.isDescendantOf(oldParent)) { - oldParent.decrementPrioritizableForTree(child.prioritizableForTree()); - if (oldParent.prioritizableForTree() == 0) { - // There are a few risks with immediately removing nodes from the priority tree: - // 1. We are removing nodes while we are potentially shifting the tree. There are no - // concrete cases known but is risky because it could invalidate the data structure. - // 2. We are notifying listeners of the removal while the tree is in flux. Currently the - // codec listeners make no assumptions about priority tree structure when being notified. - removeStream(oldParent); - } + if (oldParent != null && oldParent.children.remove(child.id()) != null && + !child.isDescendantOf(oldParent)) { + oldParent.decrementPrioritizableForTree(child.prioritizableForTree()); + if (oldParent.prioritizableForTree() == 0) { + // There are a few risks with immediately removing nodes from the priority tree: + // 1. We are removing nodes while we are potentially shifting the tree. There are no + // concrete cases known but is risky because it could invalidate the data structure. + // 2. We are notifying listeners of the removal while the tree is in flux. Currently the + // codec listeners make no assumptions about priority tree structure when being notified. + removeStream(oldParent); } } @@ -1040,6 +1122,10 @@ public class DefaultHttp2Connection implements Http2Connection { if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) { throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint."); } + if (isClosed()) { + throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed", + streamId); + } } private boolean isLocal() { @@ -1066,7 +1152,6 @@ public class DefaultHttp2Connection implements Http2Connection { * active streams in order to prevent modification while iterating. */ private final class ActiveStreams { - private final List listeners; private final Queue pendingEvents = new ArrayDeque(4); private final Set streams = new LinkedHashSet(); @@ -1157,7 +1242,7 @@ public class DefaultHttp2Connection implements Http2Connection { removeStream(stream); } - private boolean allowModifications() { + boolean allowModifications() { return pendingIterations == 0; } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 380ab329a7..419e216982 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -16,6 +16,8 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; /** * Manager for the state of an HTTP/2 connection with the remote end-point. @@ -292,6 +294,16 @@ public interface Http2Connection { interface PropertyKey { } + /** + * Close this connection. No more new streams can be created after this point and + * all streams that exists (active or otherwise) will be closed and removed. + *

Note if iterating active streams via {@link #forEachActiveStream(Http2StreamVisitor)} and an exception is + * thrown it is necessary to call this method again to ensure the close completes. + * @param promise Will be completed when all streams have been removed, and listeners have been notified. + * @return A future that will be completed when all streams have been removed, and listeners have been notified. + */ + Future close(Promise promise); + /** * Creates a new key that is unique within this {@link Http2Connection}. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index e8f92c14b7..5c975af55a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -175,18 +175,9 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http encoder().close(); decoder().close(); - final Http2Connection connection = connection(); - // Check if there are streams to avoid the overhead of creating the ChannelFuture. - if (connection.numActiveStreams() > 0) { - final ChannelFuture future = ctx.newSucceededFuture(); - connection.forEachActiveStream(new Http2StreamVisitor() { - @Override - public boolean visit(Http2Stream stream) throws Http2Exception { - closeStream(stream, future); - return true; - } - }); - } + // We need to remove all streams (not just the active ones). + // See https://github.com/netty/netty/issues/4838. + connection().close(ctx.voidPromise()); } /** diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 2e68c39ff1..bb4ace388d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -15,6 +15,33 @@ package io.netty.handler.codec.http2; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.handler.codec.http2.Http2Connection.Endpoint; +import io.netty.handler.codec.http2.Http2Stream.State; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static org.junit.Assert.assertEquals; @@ -35,25 +62,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http2.Http2Connection.Endpoint; -import io.netty.handler.codec.http2.Http2Stream.State; -import io.netty.util.internal.PlatformDependent; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - /** * Tests for {@link DefaultHttp2Connection}. */ @@ -63,6 +71,7 @@ public class DefaultHttp2ConnectionTest { private DefaultHttp2Connection server; private DefaultHttp2Connection client; + private static DefaultEventLoopGroup group; @Mock private Http2Connection.Listener clientListener; @@ -70,6 +79,16 @@ public class DefaultHttp2ConnectionTest { @Mock private Http2Connection.Listener clientListener2; + @BeforeClass + public static void beforeClass() { + group = new DefaultEventLoopGroup(2); + } + + @AfterClass + public static void afterClass() { + group.shutdownGracefully(); + } + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -84,6 +103,110 @@ public class DefaultHttp2ConnectionTest { assertNull(server.stream(100)); } + @Test + public void removeAllStreamsWithEmptyStreams() throws InterruptedException { + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWithJustOneLocalStream() throws InterruptedException, Http2Exception { + client.local().createStream(3, false); + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWithJustOneRemoveStream() throws InterruptedException, Http2Exception { + client.remote().createStream(2, false); + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWithManyActiveStreams() throws InterruptedException, Http2Exception { + Endpoint remote = client.remote(); + Endpoint local = client.local(); + for (int c = 3, s = 2; c < 5000; c += 2, s += 2) { + local.createStream(c, false); + remote.createStream(s, false); + } + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWithNonActiveStreams() throws InterruptedException, Http2Exception { + client.local().createIdleStream(3); + client.remote().createIdleStream(2); + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWithNonActiveAndActiveStreams() throws InterruptedException, Http2Exception { + client.local().createIdleStream(3); + client.remote().createIdleStream(2); + client.local().createStream(5, false); + client.remote().createStream(4, true); + testRemoveAllStreams(); + } + + @Test + public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception { + final Endpoint remote = client.remote(); + final Endpoint local = client.local(); + for (int c = 3, s = 2; c < 5000; c += 2, s += 2) { + local.createStream(c, false); + remote.createStream(s, false); + } + final Promise promise = group.next().newPromise(); + final CountDownLatch latch = new CountDownLatch(client.numActiveStreams()); + client.forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + client.close(promise).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(promise.isDone()); + latch.countDown(); + } + }); + return true; + } + }); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + public void removeAllStreamsWhileIteratingActiveStreamsAndExceptionOccurs() + throws InterruptedException, Http2Exception { + final Endpoint remote = client.remote(); + final Endpoint local = client.local(); + for (int c = 3, s = 2; c < 5000; c += 2, s += 2) { + local.createStream(c, false); + remote.createStream(s, false); + } + final Promise promise = group.next().newPromise(); + final CountDownLatch latch = new CountDownLatch(1); + try { + client.forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + // This close call is basically a noop, because the following statement will throw an exception. + client.close(promise); + // Do an invalid operation while iterating. + remote.createStream(3, false); + return true; + } + }); + } catch (Http2Exception ignored) { + client.close(promise).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(promise.isDone()); + latch.countDown(); + } + }); + } + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + @Test public void goAwayReceivedShouldCloseStreamsGreaterThanLastStream() throws Exception { Http2Stream stream1 = client.local().createStream(3, false); @@ -1107,6 +1230,19 @@ public class DefaultHttp2ConnectionTest { } } + private void testRemoveAllStreams() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final Promise promise = group.next().newPromise(); + client.close(promise).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(promise.isDone()); + latch.countDown(); + } + }); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + private void incrementAndGetStreamShouldRespectOverflow(Endpoint endpoint, int streamId) throws Http2Exception { assertTrue(streamId > 0); try { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index c17cc4d881..ed9d1c06ea 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -269,11 +270,12 @@ public class Http2ConnectionHandlerTest { verify(decoder, atLeastOnce()).decodeFrame(eq(ctx), any(ByteBuf.class), Matchers.>any()); } + @SuppressWarnings("unchecked") @Test public void channelInactiveShouldCloseStreams() throws Exception { handler = newHandler(); handler.channelInactive(ctx); - verify(stream).close(); + verify(connection).close(any(Promise.class)); } @Test