HTTP/2 codec may not always call Http2Connection.onStreamRemoved

Motivation:
Http2Connection.onStreamRemoved is not always called if Http2Connection.onStreamAdded is called. This is problematic as users may rely on the onStreamRemoved method to be called to release ByteBuf objects and do other cleanup.

Modifications:
- Http2Connection.close will remove all streams existing streams and prevent new ones from being created
- Http2ConnectionHandler will call the new close method in channelInactive

Result:
Http2Connection.onStreamRemoved is always called when Http2Connection.onStreamRemoved is called to preserve the Http2Connection guarantees.
Fixes https://github.com/netty/netty/issues/4838
This commit is contained in:
Scott Mitchell 2016-02-05 19:02:49 -08:00
parent c6a3729e4c
commit 06e29e0d1b
5 changed files with 289 additions and 63 deletions

View File

@ -15,28 +15,16 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.collection.IntCollections;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
@ -51,6 +39,25 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
/**
@ -80,6 +87,7 @@ public class DefaultHttp2Connection implements Http2Connection {
*/
final List<Listener> listeners = new ArrayList<Listener>(4);
final ActiveStreams activeStreams;
Promise<Void> closePromise;
/**
* Creates a new connection with the given settings.
@ -96,6 +104,71 @@ public class DefaultHttp2Connection implements Http2Connection {
streamMap.put(connectionStream.id(), connectionStream);
}
/**
* Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
*/
final boolean isClosed() {
return closePromise != null;
}
@Override
public Future<Void> close(final Promise<Void> promise) {
checkNotNull(promise, "promise");
// Since we allow this method to be called multiple times, we must make sure that all the promises are notified
// when all streams are removed and the close operation completes.
if (closePromise != null) {
if (closePromise == promise) {
// Do nothing
} else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) {
closePromise = promise;
} else {
closePromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
promise.trySuccess(null);
} else if (future.isCancelled()) {
promise.cancel(false);
} else {
promise.tryFailure(future.cause());
}
}
});
}
} else {
closePromise = promise;
}
if (isStreamMapEmpty()) {
promise.trySuccess(null);
return promise;
}
Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
// We must take care while iterating the streamMap as to not modify while iterating in case there are other code
// paths iterating over the active streams.
if (activeStreams.allowModifications()) {
while (itr.hasNext()) {
Http2Stream stream = itr.next().value();
if (stream.id() != CONNECTION_STREAM_ID) {
// If modifications of the activeStream map is allowed, then a stream close operation will also
// modify the streamMap. We must prevent concurrent modifications to the streamMap, so use the
// iterator to remove the current stream.
itr.remove();
stream.close();
}
}
} else {
while (itr.hasNext()) {
Http2Stream stream = itr.next().value();
if (stream.id() != CONNECTION_STREAM_ID) {
// We are not allowed to make modifications, so the close calls will be executed after this
// iteration completes.
stream.close();
}
}
}
return closePromise;
}
@Override
public void addListener(Listener listener) {
listeners.add(listener);
@ -208,6 +281,13 @@ public class DefaultHttp2Connection implements Http2Connection {
}
}
/**
* Determine if {@link #streamMap} only contains the connection stream.
*/
private boolean isStreamMapEmpty() {
return streamMap.size() == 1;
}
/**
* Closed streams may stay in the priority tree if they have dependents that are in prioritizable states.
* When a stream is requested to be removed we can only actually remove that stream when there are no more
@ -220,7 +300,6 @@ public class DefaultHttp2Connection implements Http2Connection {
void removeStream(DefaultStream stream) {
// [1] Check if this stream can be removed because it has no prioritizable descendants.
if (stream.parent().removeChild(stream)) {
// Remove it from the map and priority tree.
streamMap.remove(stream.id());
for (int i = 0; i < listeners.size(); i++) {
@ -230,6 +309,10 @@ public class DefaultHttp2Connection implements Http2Connection {
logger.error("Caught RuntimeException from listener onStreamRemoved.", e);
}
}
if (closePromise != null && isStreamMapEmpty()) {
closePromise.trySuccess(null);
}
}
}
@ -604,17 +687,16 @@ public class DefaultHttp2Connection implements Http2Connection {
// path is updated with the correct child.prioritizableForTree() value. Note that the removal operation
// may not be successful and may return null. This is because when an exclusive dependency is processed
// the children are removed in a previous recursive call but the child's parent link is updated here.
if (oldParent != null && oldParent.children.remove(child.id()) != null) {
if (!child.isDescendantOf(oldParent)) {
oldParent.decrementPrioritizableForTree(child.prioritizableForTree());
if (oldParent.prioritizableForTree() == 0) {
// There are a few risks with immediately removing nodes from the priority tree:
// 1. We are removing nodes while we are potentially shifting the tree. There are no
// concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified.
removeStream(oldParent);
}
if (oldParent != null && oldParent.children.remove(child.id()) != null &&
!child.isDescendantOf(oldParent)) {
oldParent.decrementPrioritizableForTree(child.prioritizableForTree());
if (oldParent.prioritizableForTree() == 0) {
// There are a few risks with immediately removing nodes from the priority tree:
// 1. We are removing nodes while we are potentially shifting the tree. There are no
// concrete cases known but is risky because it could invalidate the data structure.
// 2. We are notifying listeners of the removal while the tree is in flux. Currently the
// codec listeners make no assumptions about priority tree structure when being notified.
removeStream(oldParent);
}
}
@ -1040,6 +1122,10 @@ public class DefaultHttp2Connection implements Http2Connection {
if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) {
throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
}
if (isClosed()) {
throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
streamId);
}
}
private boolean isLocal() {
@ -1066,7 +1152,6 @@ public class DefaultHttp2Connection implements Http2Connection {
* active streams in order to prevent modification while iterating.
*/
private final class ActiveStreams {
private final List<Listener> listeners;
private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
@ -1157,7 +1242,7 @@ public class DefaultHttp2Connection implements Http2Connection {
removeStream(stream);
}
private boolean allowModifications() {
boolean allowModifications() {
return pendingIterations == 0;
}
}

View File

@ -16,6 +16,8 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
/**
* Manager for the state of an HTTP/2 connection with the remote end-point.
@ -292,6 +294,16 @@ public interface Http2Connection {
interface PropertyKey {
}
/**
* Close this connection. No more new streams can be created after this point and
* all streams that exists (active or otherwise) will be closed and removed.
* <p>Note if iterating active streams via {@link #forEachActiveStream(Http2StreamVisitor)} and an exception is
* thrown it is necessary to call this method again to ensure the close completes.
* @param promise Will be completed when all streams have been removed, and listeners have been notified.
* @return A future that will be completed when all streams have been removed, and listeners have been notified.
*/
Future<Void> close(Promise<Void> promise);
/**
* Creates a new key that is unique within this {@link Http2Connection}.
*/

View File

@ -175,18 +175,9 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
encoder().close();
decoder().close();
final Http2Connection connection = connection();
// Check if there are streams to avoid the overhead of creating the ChannelFuture.
if (connection.numActiveStreams() > 0) {
final ChannelFuture future = ctx.newSucceededFuture();
connection.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
closeStream(stream, future);
return true;
}
});
}
// We need to remove all streams (not just the active ones).
// See https://github.com/netty/netty/issues/4838.
connection().close(ctx.voidPromise());
}
/**

View File

@ -15,6 +15,33 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static org.junit.Assert.assertEquals;
@ -35,25 +62,6 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.internal.PlatformDependent;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for {@link DefaultHttp2Connection}.
*/
@ -63,6 +71,7 @@ public class DefaultHttp2ConnectionTest {
private DefaultHttp2Connection server;
private DefaultHttp2Connection client;
private static DefaultEventLoopGroup group;
@Mock
private Http2Connection.Listener clientListener;
@ -70,6 +79,16 @@ public class DefaultHttp2ConnectionTest {
@Mock
private Http2Connection.Listener clientListener2;
@BeforeClass
public static void beforeClass() {
group = new DefaultEventLoopGroup(2);
}
@AfterClass
public static void afterClass() {
group.shutdownGracefully();
}
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@ -84,6 +103,110 @@ public class DefaultHttp2ConnectionTest {
assertNull(server.stream(100));
}
@Test
public void removeAllStreamsWithEmptyStreams() throws InterruptedException {
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWithJustOneLocalStream() throws InterruptedException, Http2Exception {
client.local().createStream(3, false);
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWithJustOneRemoveStream() throws InterruptedException, Http2Exception {
client.remote().createStream(2, false);
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWithManyActiveStreams() throws InterruptedException, Http2Exception {
Endpoint<Http2RemoteFlowController> remote = client.remote();
Endpoint<Http2LocalFlowController> local = client.local();
for (int c = 3, s = 2; c < 5000; c += 2, s += 2) {
local.createStream(c, false);
remote.createStream(s, false);
}
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWithNonActiveStreams() throws InterruptedException, Http2Exception {
client.local().createIdleStream(3);
client.remote().createIdleStream(2);
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWithNonActiveAndActiveStreams() throws InterruptedException, Http2Exception {
client.local().createIdleStream(3);
client.remote().createIdleStream(2);
client.local().createStream(5, false);
client.remote().createStream(4, true);
testRemoveAllStreams();
}
@Test
public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception {
final Endpoint<Http2RemoteFlowController> remote = client.remote();
final Endpoint<Http2LocalFlowController> local = client.local();
for (int c = 3, s = 2; c < 5000; c += 2, s += 2) {
local.createStream(c, false);
remote.createStream(s, false);
}
final Promise<Void> promise = group.next().newPromise();
final CountDownLatch latch = new CountDownLatch(client.numActiveStreams());
client.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
client.close(promise).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assertTrue(promise.isDone());
latch.countDown();
}
});
return true;
}
});
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
@Test
public void removeAllStreamsWhileIteratingActiveStreamsAndExceptionOccurs()
throws InterruptedException, Http2Exception {
final Endpoint<Http2RemoteFlowController> remote = client.remote();
final Endpoint<Http2LocalFlowController> local = client.local();
for (int c = 3, s = 2; c < 5000; c += 2, s += 2) {
local.createStream(c, false);
remote.createStream(s, false);
}
final Promise<Void> promise = group.next().newPromise();
final CountDownLatch latch = new CountDownLatch(1);
try {
client.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
// This close call is basically a noop, because the following statement will throw an exception.
client.close(promise);
// Do an invalid operation while iterating.
remote.createStream(3, false);
return true;
}
});
} catch (Http2Exception ignored) {
client.close(promise).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assertTrue(promise.isDone());
latch.countDown();
}
});
}
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
@Test
public void goAwayReceivedShouldCloseStreamsGreaterThanLastStream() throws Exception {
Http2Stream stream1 = client.local().createStream(3, false);
@ -1107,6 +1230,19 @@ public class DefaultHttp2ConnectionTest {
}
}
private void testRemoveAllStreams() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final Promise<Void> promise = group.next().newPromise();
client.close(promise).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assertTrue(promise.isDone());
latch.countDown();
}
});
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
private void incrementAndGetStreamShouldRespectOverflow(Endpoint<?> endpoint, int streamId) throws Http2Exception {
assertTrue(streamId > 0);
try {

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -269,11 +270,12 @@ public class Http2ConnectionHandlerTest {
verify(decoder, atLeastOnce()).decodeFrame(eq(ctx), any(ByteBuf.class), Matchers.<List<Object>>any());
}
@SuppressWarnings("unchecked")
@Test
public void channelInactiveShouldCloseStreams() throws Exception {
handler = newHandler();
handler.channelInactive(ctx);
verify(stream).close();
verify(connection).close(any(Promise.class));
}
@Test