From 19c864505d017a52fdbf1ef600715d42d36d36fa Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Thu, 19 Mar 2015 18:36:24 -0700 Subject: [PATCH] Fix GOAWAY logic in Http2Encoder and Http2Decoder. Motivation: 1) The current implementation doesn't allow for HEADERS, DATA, PING, PRIORITY and SETTINGS frames to be sent after GOAWAY. 2) When receiving or sending a GOAWAY frame, all streams with ids greater than the lastStreamId of the GOAWAY frame should be closed. That's not happening. Modifications: 1) Allow sending of HEADERS and DATA frames after GOAWAY for streams with ids < lastStreamId. 2) Always allow sending PING, PRIORITY AND SETTINGS frames. 3) Allow sending multiple GOAWAY frames with decreasing lastStreamIds. 4) After receiving or sending a GOAWAY frame, close all streams with ids > lastStreamId. Result: The GOAWAY handling is more correct. --- .../codec/http2/DefaultHttp2Connection.java | 32 +++-- .../http2/DefaultHttp2ConnectionDecoder.java | 109 +++++----------- .../http2/DefaultHttp2ConnectionEncoder.java | 30 +---- .../handler/codec/http2/Http2Connection.java | 5 +- .../codec/http2/Http2ConnectionHandler.java | 57 ++++++--- .../DefaultHttp2ConnectionEncoderTest.java | 121 ++++++++++++------ .../http2/DefaultHttp2ConnectionTest.java | 39 ++++++ .../http2/Http2ConnectionHandlerTest.java | 51 +++++++- 8 files changed, 268 insertions(+), 176 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index b8f9fea7c5..8551634632 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -163,11 +163,15 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public void goAwayReceived(int lastKnownStream, long errorCode, ByteBuf debugData) { - boolean alreadyNotified = goAwayReceived(); localEndpoint.lastKnownStream(lastKnownStream); - if (!alreadyNotified) { - for (int i = 0; i < listeners.size(); i++) { - listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData); + for (Listener listener : listeners) { + listener.onGoAwayReceived(lastKnownStream, errorCode, debugData); + } + + Http2Stream[] streams = new Http2Stream[numActiveStreams()]; + for (Http2Stream stream : activeStreams().toArray(streams)) { + if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) { + stream.close(); } } } @@ -179,11 +183,15 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public void goAwaySent(int lastKnownStream, long errorCode, ByteBuf debugData) { - boolean alreadyNotified = goAwaySent(); remoteEndpoint.lastKnownStream(lastKnownStream); - if (!alreadyNotified) { - for (int i = 0; i < listeners.size(); i++) { - listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData); + for (Listener listener : listeners) { + listener.onGoAwaySent(lastKnownStream, errorCode, debugData); + } + + Http2Stream[] streams = new Http2Stream[numActiveStreams()]; + for (Http2Stream stream : activeStreams().toArray(streams)) { + if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) { + stream.close(); } } } @@ -911,7 +919,7 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public int lastKnownStream() { - return lastKnownStream >= 0 ? lastKnownStream : lastStreamCreated; + return lastKnownStream; } private void lastKnownStream(int lastKnownStream) { @@ -934,8 +942,10 @@ public class DefaultHttp2Connection implements Http2Connection { } private void checkNewStreamAllowed(int streamId) throws Http2Exception { - if (goAwaySent() || goAwayReceived()) { - throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away"); + if (goAwayReceived() && streamId > localEndpoint.lastKnownStream()) { + throw connectionError(PROTOCOL_ERROR, "Cannot create stream %d since this endpoint has received a " + + "GOAWAY frame with last stream id %d.", streamId, + localEndpoint.lastKnownStream()); } if (streamId < 0) { throw new Http2NoMoreStreamIdsException(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 134cc48e61..14929bf95d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -168,10 +168,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) throws Http2Exception { - // Don't allow any more connections to be created. - connection.goAwayReceived(lastStreamId, errorCode, debugData); - listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); + connection.goAwayReceived(lastStreamId, errorCode, debugData); } void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, @@ -186,53 +184,42 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { - // Check if we received a data frame for a stream which is half-closed Http2Stream stream = connection.requireStream(streamId); + Http2LocalFlowController flowController = flowController(); + int bytesToReturn = data.readableBytes() + padding; - verifyGoAwayNotReceived(); + if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { + // Count the frame towards the connection flow control window and don't process it further. + flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); + flowController.consumeBytes(ctx, stream, bytesToReturn); + + // Since no bytes are consumed, return them all. + return bytesToReturn; + } - // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a - // lower stream ID. - boolean shouldIgnore = shouldIgnoreFrame(stream, false); Http2Exception error = null; switch (stream.state()) { case OPEN: case HALF_CLOSED_LOCAL: break; case HALF_CLOSED_REMOTE: - // Always fail the stream if we've more data after the remote endpoint half-closed. + case CLOSED: error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", stream.id(), stream.state()); break; - case CLOSED: - if (!shouldIgnore) { - error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); - } - break; default: - if (!shouldIgnore) { - error = streamError(stream.id(), PROTOCOL_ERROR, - "Stream %d in unexpected state: %s", stream.id(), stream.state()); - } + error = streamError(stream.id(), PROTOCOL_ERROR, + "Stream %d in unexpected state: %s", stream.id(), stream.state()); break; } - int bytesToReturn = data.readableBytes() + padding; int unconsumedBytes = unconsumedBytes(stream); - Http2LocalFlowController flowController = flowController(); try { - // If we should apply flow control, do so now. flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); // Update the unconsumed bytes after flow control is applied. unconsumedBytes = unconsumedBytes(stream); - // If we should ignore this frame, do so now. - if (shouldIgnore) { - return bytesToReturn; - } - - // If the stream was in an invalid state to receive the frame, throw the error. + // If the stream is in an invalid state to receive the frame, throw the error. if (error != null) { throw error; } @@ -256,7 +243,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { bytesToReturn -= delta; throw e; } finally { - // If appropriate, returned the processed bytes to the flow controller. + // If appropriate, return the processed bytes to the flow controller. if (bytesToReturn > 0) { flowController.consumeBytes(ctx, stream, bytesToReturn); } @@ -277,14 +264,12 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { Http2Stream stream = connection.stream(streamId); - verifyGoAwayNotReceived(); - if (shouldIgnoreFrame(stream, false)) { - // Ignore this frame. - return; - } if (stream == null) { stream = connection.remote().createStream(streamId).open(endOfStream); + } else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { + // Ignore this frame. + return; } else { switch (stream.state()) { case RESERVED_REMOTE: @@ -297,9 +282,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { break; case HALF_CLOSED_REMOTE: case CLOSED: - // Stream error. throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); + stream.id(), stream.state()); default: // Connection error. throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), @@ -316,8 +300,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // In this case we should ignore the exception and allow the frame to be sent. } - listener.onHeadersRead(ctx, streamId, headers, - streamDependency, weight, exclusive, padding, endOfStream); + listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream); // If the headers completes this stream, close it. if (endOfStream) { @@ -329,17 +312,15 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception { Http2Stream stream = connection.stream(streamId); - verifyGoAwayNotReceived(); - if (shouldIgnoreFrame(stream, true)) { - // Ignore this frame. - return; - } try { if (stream == null) { // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the // first frame to be received for a stream that we must create the stream. stream = connection.remote().createStream(streamId); + } else if (streamCreatedAfterGoAwaySent(stream)) { + // Ignore this frame. + return; } // This call will create a stream for streamDependency if necessary. @@ -356,6 +337,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { Http2Stream stream = connection.requireStream(streamId); + switch(stream.state()) { case IDLE: throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId); @@ -453,9 +435,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { Http2Stream parentStream = connection.requireStream(streamId); - verifyGoAwayNotReceived(); - if (shouldIgnoreFrame(parentStream, false)) { - // Ignore frames for any stream created after we sent a go-away. + + if (streamCreatedAfterGoAwaySent(parentStream)) { return; } @@ -503,13 +484,12 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) throws Http2Exception { Http2Stream stream = connection.requireStream(streamId); - verifyGoAwayNotReceived(); - if (stream.state() == CLOSED || shouldIgnoreFrame(stream, false)) { - // Ignore frames for any stream created after we sent a go-away. + + if (stream.state() == CLOSED || streamCreatedAfterGoAwaySent(stream)) { return; } - // Update the outbound flow controller. + // Update the outbound flow control window. encoder.flowController().incrementWindowSize(ctx, stream, windowSizeIncrement); listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement); @@ -521,31 +501,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { onUnknownFrame0(ctx, frameType, streamId, flags, payload); } - /** - * Indicates whether or not frames for the given stream should be ignored based on the state of the - * stream/connection. - */ - private boolean shouldIgnoreFrame(Http2Stream stream, boolean allowResetSent) { - if (connection.goAwaySent() && - (stream == null || connection.remote().lastStreamCreated() <= stream.id())) { - // Frames from streams created after we sent a go-away should be ignored. - // Frames for the connection stream ID (i.e. 0) will always be allowed. - return true; - } - - // Also ignore inbound frames after we sent a RST_STREAM frame. - return stream != null && !allowResetSent && stream.isResetSent(); - } - - /** - * Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a - * connection error. - */ - private void verifyGoAwayNotReceived() throws Http2Exception { - if (connection.goAwayReceived()) { - // Connection error. - throw connectionError(PROTOCOL_ERROR, "Received frames after receiving GO_AWAY"); - } + private boolean streamCreatedAfterGoAwaySent(Http2Stream stream) { + // Ignore inbound frames after a GOAWAY was sent and the stream id is greater than + // the last stream id set in the GOAWAY frame. + return connection().goAwaySent() && stream.id() > connection().remote().lastKnownStream(); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 2b1a2d6252..3e53d250f3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -112,12 +112,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream; try { - if (connection.goAwayReceived() || connection.goAwaySent()) { - throw new IllegalStateException("Sending data after connection going away."); - } - stream = connection.requireStream(streamId); - // Verify that the stream is in the appropriate state for sending DATA frames. switch (stream.state()) { case OPEN: @@ -151,9 +146,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { final boolean exclusive, final int padding, final boolean endOfStream, final ChannelPromise promise) { try { - if (connection.goAwayReceived() || connection.goAwaySent()) { - throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away."); - } Http2Stream stream = connection.stream(streamId); if (stream == null) { stream = connection.local().createStream(streamId); @@ -190,10 +182,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { try { - if (connection.goAwayReceived() || connection.goAwaySent()) { - throw connectionError(PROTOCOL_ERROR, "Sending priority after connection going away."); - } - // Update the priority on this stream. Http2Stream stream = connection.stream(streamId); if (stream == null) { @@ -227,10 +215,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { ChannelPromise promise) { outstandingLocalSettingsQueue.add(settings); try { - if (connection.goAwayReceived() || connection.goAwaySent()) { - throw connectionError(PROTOCOL_ERROR, "Sending settings after connection going away."); - } - Boolean pushEnabled = settings.pushEnabled(); if (pushEnabled != null && connection.isServer()) { throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified"); @@ -252,13 +236,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, - ChannelPromise promise) { - if (connection.goAwayReceived() || connection.goAwaySent()) { - data.release(); - return promise.setFailure(connectionError(PROTOCOL_ERROR, "Sending ping after connection going away.")); - } - + public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise); ctx.flush(); return future; @@ -268,12 +246,12 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { try { - if (connection.goAwayReceived() || connection.goAwaySent()) { - throw connectionError(PROTOCOL_ERROR, "Sending push promise after connection going away."); + if (connection.goAwayReceived()) { + throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received."); } - // Reserve the promised stream. Http2Stream stream = connection.requireStream(streamId); + // Reserve the promised stream. connection.local().reservePushStream(promisedStreamId, stream); } catch (Throwable e) { return promise.setFailure(e); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 4ba4848351..3b4ab03657 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -97,7 +97,7 @@ public interface Http2Connection { * Called when a {@code GOAWAY} was received from the remote endpoint. This event handler duplicates {@link * Http2FrameListener#onGoAwayRead(io.netty.channel.ChannelHandlerContext, int, long, io.netty.buffer.ByteBuf)} * but is added here in order to simplify application logic for handling {@code GOAWAY} in a uniform way. An - * application should generally not handle both events, but if it does this method is called first, before + * application should generally not handle both events, but if it does this method is called second, after * notifying the {@link Http2FrameListener}. * * @param lastStreamId the last known stream of the remote endpoint. @@ -206,9 +206,8 @@ public interface Http2Connection { int lastStreamCreated(); /** - * Gets the last stream created by this endpoint that is "known" by the opposite endpoint. * If a GOAWAY was received for this endpoint, this will be the last stream ID from the - * GOAWAY frame. Otherwise, this will be same as {@link #lastStreamCreated()}. + * GOAWAY frame. Otherwise, this will be {@code -1}. */ int lastKnownStream(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index b73c5e52f9..1d89d0d621 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -23,6 +23,8 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.isStreamError; import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.String.format; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; @@ -32,6 +34,9 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException; import io.netty.handler.codec.http2.Http2Exception.StreamException; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.Collection; import java.util.List; @@ -50,6 +55,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http private final Http2ConnectionEncoder encoder; private ChannelFutureListener closeListener; private BaseDecoder byteDecoder; + private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class); public Http2ConnectionHandler(boolean server, Http2FrameListener listener) { this(new DefaultHttp2Connection(server), listener); @@ -510,36 +516,47 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } @Override - public ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, - ByteBuf debugData, ChannelPromise promise) { - Http2Connection connection = connection(); - if (connection.goAwayReceived() || connection.goAwaySent()) { + public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode, + final ByteBuf debugData, ChannelPromise promise) { + try { + final Http2Connection connection = connection(); + if (connection.goAwaySent() && connection.remote().lastKnownStream() < lastStreamId) { + throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " + + "sending multiple GOAWAY frames (was '%d', is '%d').", + connection.remote().lastKnownStream(), + lastStreamId); + } + connection.goAwaySent(lastStreamId, errorCode, debugData); + + ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); + ctx.flush(); + + future.addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + String msg = format("Sending GOAWAY failed: lastStreamId '%d', errorCode '%d', " + + "debugData '%s'.", lastStreamId, errorCode, debugData); + logger.error(msg, future.cause()); + ctx.channel().close(); + } + } + }); + + return future; + } catch (Http2Exception e) { debugData.release(); - return ctx.newSucceededFuture(); + return promise.setFailure(e); } - - connection.goAwaySent(lastStreamId, errorCode, debugData); - - ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); - ctx.flush(); - - return future; } /** * Close the remote endpoint with with a {@code GO_AWAY} frame. */ private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) { - Http2Connection connection = connection(); - if (connection.goAwayReceived() || connection.goAwaySent()) { - return ctx.newSucceededFuture(); - } - - // The connection isn't alredy going away, send the GO_AWAY frame now to start - // the process. long errorCode = cause != null ? cause.error().code() : NO_ERROR.code(); ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause); - int lastKnownStream = connection.remote().lastStreamCreated(); + int lastKnownStream = connection().remote().lastStreamCreated(); return goAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise()); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 9fb455823b..529216ff71 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -18,13 +18,17 @@ import static io.netty.buffer.Unpooled.wrappedBuffer; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.util.CharsetUtil.UTF_8; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -35,8 +39,10 @@ import static org.mockito.Matchers.anyShort; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -199,20 +205,6 @@ public class DefaultHttp2ConnectionEncoderTest { encoder.lifecycleManager(lifecycleManager); } - @Test - public void dataWriteAfterGoAwayShouldFail() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); - final ByteBuf data = dummyData(); - try { - ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof IllegalStateException); - } finally { - while (data.refCnt() > 0) { - data.release(); - } - } - } - @Test public void dataWriteShouldSucceed() throws Exception { final ByteBuf data = dummyData(); @@ -302,18 +294,6 @@ public class DefaultHttp2ConnectionEncoderTest { assertEquals(5, (int) writtenPadding.get(1)); } - @Test - public void headersWriteAfterGoAwayShouldFail() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); - ChannelFuture future = encoder.writeHeaders( - ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise); - verify(local, never()).createStream(anyInt()); - verify(stream, never()).open(anyBoolean()); - verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(), - eq(promise)); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); - } - @Test public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { int streamId = 5; @@ -344,11 +324,10 @@ public class DefaultHttp2ConnectionEncoderTest { } @Test - public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception { + public void pushPromiseWriteAfterGoAwayReceivedShouldFail() throws Exception { when(connection.goAwayReceived()).thenReturn(true); - ChannelFuture future = - encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, - EmptyHttp2Headers.INSTANCE, 0, promise); + ChannelFuture future = encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, + promise); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); } @@ -361,10 +340,10 @@ public class DefaultHttp2ConnectionEncoderTest { } @Test - public void priorityWriteAfterGoAwayShouldFail() throws Exception { + public void priorityWriteAfterGoAwayShouldSucceed() throws Exception { when(connection.goAwayReceived()).thenReturn(true); - ChannelFuture future = encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); } @Test @@ -425,10 +404,10 @@ public class DefaultHttp2ConnectionEncoderTest { } @Test - public void pingWriteAfterGoAwayShouldFail() throws Exception { + public void pingWriteAfterGoAwayShouldSucceed() throws Exception { when(connection.goAwayReceived()).thenReturn(true); - ChannelFuture future = encoder.writePing(ctx, false, emptyPingBuf(), promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + encoder.writePing(ctx, false, emptyPingBuf(), promise); + verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); } @Test @@ -438,10 +417,10 @@ public class DefaultHttp2ConnectionEncoderTest { } @Test - public void settingsWriteAfterGoAwayShouldFail() throws Exception { + public void settingsWriteAfterGoAwayShouldSucceed() throws Exception { when(connection.goAwayReceived()).thenReturn(true); - ChannelFuture future = encoder.writeSettings(ctx, new Http2Settings(), promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + encoder.writeSettings(ctx, new Http2Settings(), promise); + verify(writer).writeSettings(eq(ctx), any(Http2Settings.class), eq(promise)); } @Test @@ -501,6 +480,70 @@ public class DefaultHttp2ConnectionEncoderTest { verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise)); } + @Test + public void encoderDelegatesGoAwayToLifeCycleManager() { + encoder.writeGoAway(ctx, STREAM_ID, Http2Error.INTERNAL_ERROR.code(), null, promise); + verify(lifecycleManager).goAway(eq(ctx), eq(STREAM_ID), eq(Http2Error.INTERNAL_ERROR.code()), + eq((ByteBuf) null), eq(promise)); + verifyNoMoreInteractions(writer); + } + + @Test + public void dataWriteToClosedStreamShouldFail() { + when(stream.state()).thenReturn(CLOSED); + ByteBuf data = mock(ByteBuf.class); + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + assertTrue(promise.isDone()); + assertFalse(promise.isSuccess()); + assertThat(promise.cause(), instanceOf(IllegalStateException.class)); + verify(data).release(); + } + + @Test + public void dataWriteToHalfClosedLocalStreamShouldFail() { + when(stream.state()).thenReturn(HALF_CLOSED_LOCAL); + ByteBuf data = mock(ByteBuf.class); + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + assertTrue(promise.isDone()); + assertFalse(promise.isSuccess()); + assertThat(promise.cause(), instanceOf(IllegalStateException.class)); + verify(data).release(); + } + + @Test + public void canWriteDataFrameAfterGoAwaySent() { + when(connection.goAwaySent()).thenReturn(true); + when(remote.lastKnownStream()).thenReturn(0); + ByteBuf data = mock(ByteBuf.class); + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class)); + } + + @Test + public void canWriteHeaderFrameAfterGoAwaySent() { + when(connection.goAwaySent()).thenReturn(true); + when(remote.lastKnownStream()).thenReturn(0); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); + verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class)); + } + + @Test + public void canWriteDataFrameAfterGoAwayReceived() { + when(connection.goAwayReceived()).thenReturn(true); + when(local.lastKnownStream()).thenReturn(STREAM_ID); + ByteBuf data = mock(ByteBuf.class); + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class)); + } + + @Test + public void canWriteHeaderFrameAfterGoAwayReceived() { + when(connection.goAwayReceived()).thenReturn(true); + when(local.lastKnownStream()).thenReturn(STREAM_ID); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); + verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class)); + } + private void mockSendFlowControlledWriteEverything() { doAnswer(new Answer() { @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 197be34bae..cdcbecbd14 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -74,6 +74,45 @@ public class DefaultHttp2ConnectionTest { assertNull(server.stream(100)); } + @Test + public void goAwayReceivedShouldCloseStreamsGreaterThanLastStream() throws Exception { + Http2Stream stream1 = client.local().createStream(3).open(false); + Http2Stream stream2 = client.local().createStream(5).open(false); + Http2Stream remoteStream = client.remote().createStream(4).open(false); + + assertEquals(State.OPEN, stream1.state()); + assertEquals(State.OPEN, stream2.state()); + + client.goAwayReceived(3, 8, null); + + assertEquals(State.OPEN, stream1.state()); + assertEquals(State.CLOSED, stream2.state()); + assertEquals(State.OPEN, remoteStream.state()); + assertEquals(3, client.local().lastKnownStream()); + assertEquals(5, client.local().lastStreamCreated()); + // The remote endpoint must not be affected by a received GOAWAY frame. + assertEquals(-1, client.remote().lastKnownStream()); + assertEquals(State.OPEN, remoteStream.state()); + } + + @Test + public void goAwaySentShouldCloseStreamsGreaterThanLastStream() throws Exception { + Http2Stream stream1 = server.remote().createStream(3).open(false); + Http2Stream stream2 = server.remote().createStream(5).open(false); + Http2Stream localStream = server.local().createStream(4).open(false); + + server.goAwaySent(3, 8, null); + + assertEquals(State.OPEN, stream1.state()); + assertEquals(State.CLOSED, stream2.state()); + + assertEquals(3, server.remote().lastKnownStream()); + assertEquals(5, server.remote().lastStreamCreated()); + // The local endpoint must not be affected by a sent GOAWAY frame. + assertEquals(-1, server.local().lastKnownStream()); + assertEquals(State.OPEN, localStream.state()); + } + @Test public void serverCreateStreamShouldSucceed() throws Http2Exception { Http2Stream stream = server.local().createStream(2).open(false); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index fd8fbfe53a..8a3d3203e8 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -22,6 +22,7 @@ import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -29,16 +30,17 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; @@ -272,7 +274,7 @@ public class Http2ConnectionHandlerTest { @Override public ChannelFuture answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); - ChannelFutureListener listener = (ChannelFutureListener) args[0]; + GenericFutureListener listener = (GenericFutureListener) args[0]; // Simulate that all streams have become inactive by the time the future completes. when(connection.activeStreams()).thenReturn(Collections.emptyList()); when(connection.numActiveStreams()).thenReturn(0); @@ -287,4 +289,49 @@ public class Http2ConnectionHandlerTest { handler.closeStream(stream, future); verify(ctx, times(1)).close(any(ChannelPromise.class)); } + + public void canSendGoAwayFrame() throws Exception { + handler = newHandler(); + ByteBuf data = mock(ByteBuf.class); + long errorCode = Http2Error.INTERNAL_ERROR.code(); + handler.goAway(ctx, STREAM_ID, errorCode, data, promise); + + verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data)); + verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data), eq(promise)); + } + + @Test + public void canSendGoAwayFramesWithDecreasingLastStreamIds() throws Exception { + handler = newHandler(); + ByteBuf data = mock(ByteBuf.class); + long errorCode = Http2Error.INTERNAL_ERROR.code(); + + handler.goAway(ctx, STREAM_ID + 2, errorCode, data, promise); + verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID + 2), eq(errorCode), eq(data), eq(promise)); + verify(connection).goAwaySent(eq(STREAM_ID + 2), eq(errorCode), eq(data)); + handler.goAway(ctx, STREAM_ID, errorCode, data, promise); + verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data), eq(promise)); + verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data)); + } + + @Test + public void cannotSendGoAwayFrameWithIncreasingLastStreamIds() throws Exception { + handler = newHandler(); + ByteBuf data = mock(ByteBuf.class); + long errorCode = Http2Error.INTERNAL_ERROR.code(); + + handler.goAway(ctx, STREAM_ID, errorCode, data, promise); + verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data)); + verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data), eq(promise)); + // The frameWriter is only mocked, so it should not have interacted with the promise. + assertFalse(promise.isDone()); + + when(connection.goAwaySent()).thenReturn(true); + when(remote.lastKnownStream()).thenReturn(STREAM_ID); + handler.goAway(ctx, STREAM_ID + 2, errorCode, data, promise); + assertTrue(promise.isDone()); + assertFalse(promise.isSuccess()); + verify(data).release(); + verifyNoMoreInteractions(frameWriter); + } }