HTTP/2 HEADERS stream dependency fix

Motivation:
The DefaultHttp2ConnectionDecoder has the setPriority call after the Http2FrameListener is notified of the change. The setPriority call has additional verification logic and may even create the dependency stream and so it must be before the Http2FrameListener is notified.

Modifications:
The DefaultHttp2ConnectionDecoder should treat the setPriority call in the same for the HEADERS and PRIORITY frame (call it before notifying the listener).

Result:
Http2FrameListener should see correct state when a HEADERS frame has a stream dependency that has not yet been created yet.  Fixes https://github.com/netty/netty/issues/3572.
This commit is contained in:
Scott Mitchell 2015-04-03 13:10:11 -07:00
parent e40c27d9ed
commit e5d01c4caf
2 changed files with 66 additions and 13 deletions

View File

@ -307,11 +307,18 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
}
try {
// This call will create a stream for streamDependency if necessary.
// For this reason it must be done before notifying the listener.
stream.setPriority(streamDependency, weight, exclusive);
} catch (ClosedStreamCreationException ignored) {
// It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
}
listener.onHeadersRead(ctx, streamId, headers,
streamDependency, weight, exclusive, padding, endOfStream);
stream.setPriority(streamDependency, weight, exclusive);
// If the headers completes this stream, close it.
if (endOfStream) {
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());

View File

@ -64,6 +64,7 @@ import org.mockito.stubbing.Answer;
public class DefaultHttp2ConnectionDecoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private static final int STREAM_DEPENDENCY_ID = 3;
private Http2ConnectionDecoder decoder;
@ -344,6 +345,51 @@ public class DefaultHttp2ConnectionDecoderTest {
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
public void headersDependencyNotCreatedShouldCreateAndSucceed() throws Exception {
final short weight = 1;
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, STREAM_DEPENDENCY_ID,
weight, true, 0, true);
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(STREAM_DEPENDENCY_ID),
eq(weight), eq(true), eq(0), eq(true));
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), any(ChannelFuture.class));
}
@Test
public void headersDependencyPreviouslyCreatedStreamShouldSucceed() throws Exception {
final short weight = 1;
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, STREAM_DEPENDENCY_ID,
weight, true, 0, true);
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(STREAM_DEPENDENCY_ID),
eq(weight), eq(true), eq(0), eq(true));
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), any(ChannelFuture.class));
}
@Test(expected = RuntimeException.class)
public void headersDependencyInvalidStreamShouldFail() throws Exception {
final short weight = 1;
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new RuntimeException("Fake Exception");
}
}).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, STREAM_DEPENDENCY_ID,
weight, true, 0, true);
verify(listener, never()).onHeadersRead(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class),
anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean());
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager, never()).closeRemoteSide(eq(stream), any(ChannelFuture.class));
}
@Test
public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true);
@ -372,9 +418,9 @@ public class DefaultHttp2ConnectionDecoderTest {
public void priorityReadShouldSucceed() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(remote).createStream(STREAM_ID);
verify(stream, never()).open(anyBoolean());
}
@ -390,11 +436,11 @@ public class DefaultHttp2ConnectionDecoderTest {
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream);
when(connection.requireStream(0)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
when(connection.stream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(remote).createStream(STREAM_ID);
}
@ -405,12 +451,12 @@ public class DefaultHttp2ConnectionDecoderTest {
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
}).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
}
@Test