diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 7ce5de28e5..589546dbe1 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -296,7 +296,6 @@ public class DefaultHttp2Connection implements Http2Connection { private IntObjectMap children = IntCollections.emptyMap(); private int prioritizableForTree = 1; private boolean resetSent; - private boolean headerSent; DefaultStream(int id, State state) { this.id = id; @@ -324,17 +323,6 @@ public class DefaultHttp2Connection implements Http2Connection { return this; } - @Override - public boolean isHeaderSent() { - return headerSent; - } - - @Override - public Http2Stream headerSent() { - headerSent = true; - return this; - } - @Override public final V setProperty(PropertyKey key, V value) { return properties.add(verifyKey(key), value); @@ -793,16 +781,6 @@ public class DefaultHttp2Connection implements Http2Connection { throw new UnsupportedOperationException(); } - @Override - public boolean isHeaderSent() { - return false; - } - - @Override - public Http2Stream headerSent() { - throw new UnsupportedOperationException(); - } - @Override public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) { throw new UnsupportedOperationException(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index ccb189fda8..a138854b7c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -14,6 +14,14 @@ */ package io.netty.handler.codec.http2; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.List; + import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; @@ -23,11 +31,6 @@ import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_V import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; import static io.netty.util.internal.ObjectUtil.checkNotNull; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; - -import java.util.List; /** * Provides the default implementation for processing inbound frame events and delegates to a @@ -39,6 +42,7 @@ import java.util.List; * {@link Http2LocalFlowController} */ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class); private Http2FrameListener internalFrameListener = new PrefaceFrameListener(); private final Http2Connection connection; private Http2LifecycleManager lifecycleManager; @@ -185,23 +189,28 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { */ private final class FrameReadListener implements Http2FrameListener { @Override - public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, - int padding, boolean endOfStream) throws Http2Exception { + public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { Http2Stream stream = connection.stream(streamId); Http2LocalFlowController flowController = flowController(); int bytesToReturn = data.readableBytes() + padding; - if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { - // Ignoring this frame. We still need to count the frame towards the connection flow control - // window, but we immediately mark all bytes as consumed. - flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); - flowController.consumeBytes(stream, bytesToReturn); + boolean shouldIgnore = true; + try { + shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA"); + } finally { + if (shouldIgnore) { + // Ignoring this frame. We still need to count the frame towards the connection flow control + // window, but we immediately mark all bytes as consumed. + flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream); + flowController.consumeBytes(stream, bytesToReturn); - // Verify that the stream may have existed after we apply flow control. - verifyStreamMayHaveExisted(streamId); + // Verify that the stream may have existed after we apply flow control. + verifyStreamMayHaveExisted(streamId); - // All bytes have been consumed. - return bytesToReturn; + // All bytes have been consumed. + return bytesToReturn; + } } Http2Exception error = null; @@ -276,8 +285,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE; } - if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { - // Ignore this frame. + if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) { return; } @@ -329,7 +337,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { try { if (stream == null) { if (connection.streamMayHaveExisted(streamId)) { - // Ignore this frame. + if (logger.isInfoEnabled()) { + logger.info("%s ignoring PRIORITY frame for stream id %d. Stream doesn't exist but may " + + " have existed", ctx.channel(), streamId); + } return; } @@ -337,7 +348,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // first frame to be received for a stream that we must create the stream. stream = connection.remote().createIdleStream(streamId); } else if (streamCreatedAfterGoAwaySent(streamId)) { - // Ignore this frame. + if (logger.isInfoEnabled()) { + logger.info("%s ignoring PRIORITY frame for stream id %d. Stream created after GOAWAY sent. " + + "Last known stream by peer " + connection.remote().lastStreamKnownByPeer(), + ctx.channel(), streamId); + } return; } @@ -457,7 +472,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { Http2Headers headers, int padding) throws Http2Exception { Http2Stream parentStream = connection.stream(streamId); - if (streamCreatedAfterGoAwaySent(streamId)) { + if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) { return; } @@ -527,6 +542,36 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { onUnknownFrame0(ctx, frameType, streamId, flags, payload); } + /** + * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the + * {@code stream} (which may be {@code null}) associated with {@code streamId}. + */ + private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream, + String frameName) throws Http2Exception { + if (stream == null) { + if (streamCreatedAfterGoAwaySent(streamId)) { + if (logger.isInfoEnabled()) { + logger.info("%s ignoring %s frame for stream id %d. Stream sent after GOAWAY sent", + ctx.channel(), frameName, streamId); + } + return true; + } + // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its + // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is + // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors. + throw streamError(streamId, STREAM_CLOSED, "Received HEADERS frame for an unknown stream %d", streamId); + } else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { + if (logger.isInfoEnabled()) { + logger.info("%s ignoring %s frame for stream id %d. %s", ctx.channel(), frameName, + stream.isResetSent() ? "RST_STREAM sent." : + ("Stream created after GOAWAY sent. Last known stream by peer " + + connection.remote().lastStreamKnownByPeer())); + } + return true; + } + return false; + } + /** * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created * after a {@code GOAWAY} is sent if the following conditions hold: diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 8c5718a371..6ee351c7eb 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -14,12 +14,6 @@ */ package io.netty.handler.codec.http2; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Exception.connectionError; -import static io.netty.util.internal.ObjectUtil.checkNotNull; -import static java.lang.Math.min; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -30,6 +24,12 @@ import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException import java.util.ArrayDeque; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; +import static io.netty.handler.codec.http2.Http2Exception.connectionError; +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.lang.Math.min; + /** * Default implementation of {@link Http2ConnectionEncoder}. */ @@ -167,11 +167,29 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } } - // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. - flowController().addFlowControlled(stream, - new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding, - endOfStream, promise)); - return promise; + // Trailing headers must go through flow control if there are other frames queued in flow control + // for this stream. + Http2RemoteFlowController flowController = flowController(); + if (!endOfStream || !flowController.hasFlowControlled(stream)) { + ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endOfStream, promise); + if (endOfStream) { + final Http2Stream finalStream = stream; + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + lifecycleManager.closeStreamLocal(finalStream, promise); + } + }); + } + return future; + } else { + // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. + flowController.addFlowControlled(stream, + new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding, + endOfStream, promise)); + return promise; + } } catch (Http2NoMoreStreamIdsException e) { lifecycleManager.onError(ctx, e); return promise.setFailure(e); @@ -410,7 +428,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } promise.addListener(this); - stream.headerSent(); frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 0acedebdcf..dd33ce80a4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -231,6 +231,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } } + @Override + public boolean hasFlowControlled(Http2Stream stream) { + return state(stream).hasFrame(); + } + private AbstractState state(Http2Stream stream) { return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 28c1804bfd..0dfd1ffd28 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -616,7 +616,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http } final ChannelFuture future; - if (stream.state() == IDLE || (connection().local().created(stream) && !stream.isHeaderSent())) { + if (stream.state() == IDLE || connection().local().created(stream)) { // The other endpoint doesn't know about the stream yet, so we can't actually send // the RST_STREAM frame. The HTTP/2 spec also disallows sending RST_STREAM for IDLE streams. future = promise.setSuccess(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java index 83fe96da84..b586c16710 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2RemoteFlowController.java @@ -41,6 +41,13 @@ public interface Http2RemoteFlowController extends Http2FlowController { */ void addFlowControlled(Http2Stream stream, FlowControlled payload); + /** + * Determine if {@code stream} has any {@link FlowControlled} frames currently queued. + * @param stream the stream to check if it has flow controlled frames. + * @return {@code true} if {@code stream} has any {@link FlowControlled} frames currently queued. + */ + boolean hasFlowControlled(Http2Stream stream); + /** * Write all data pending in the flow controller up to the flow-control limits. * diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index b1ab5c54fe..92f57ddf23 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -110,18 +110,6 @@ public interface Http2Stream { */ Http2Stream resetSent(); - /** - * Indicates whether or not at least one {@code HEADERS} frame has been sent from the local endpoint - * for this stream. - */ - boolean isHeaderSent(); - - /** - * Sets the flag indicating that a {@code HEADERS} frame has been sent from the local endpoint - * for this stream. This does not affect the stream state. - */ - Http2Stream headerSent(); - /** * Associates the application-defined data with this stream. * @return The value that was previously associated with {@code key}, or {@code null} if there was none. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 042a399913..cfca1f8437 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyShort; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; @@ -297,6 +298,21 @@ public class DefaultHttp2ConnectionDecoderTest { } } + @Test + public void dataReadAfterGoAwaySentOnUknownStreamShouldIgnore() throws Exception { + // Throw an exception when checking stream state. + when(connection.stream(STREAM_ID)).thenReturn(null); + mockGoAwaySent(); + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + verify(localFlow).receiveFlowControlledFrame((Http2Stream) isNull(), eq(data), eq(10), eq(true)); + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + @Test public void dataReadAfterRstStreamForStreamInInvalidStateShouldIgnore() throws Exception { // Throw an exception when checking stream state. @@ -369,21 +385,30 @@ public class DefaultHttp2ConnectionDecoderTest { } } - @Test - public void headersReadAfterGoAwayShouldBeIgnored() throws Exception { - when(connection.goAwaySent()).thenReturn(true); + @Test(expected = Http2Exception.class) + public void headersReadForUnknownStreamShouldThrow() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); - verify(remote, never()).createIdleStream(eq(STREAM_ID)); + } + + @Test + public void headersReadForStreamThatAlreadySentResetShouldBeIgnored() throws Exception { + when(stream.isResetSent()).thenReturn(true); + decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); + verify(remote, never()).createIdleStream(anyInt()); + verify(remote, never()).createStream(anyInt(), anyBoolean()); verify(stream, never()).open(anyBoolean()); // Verify that the event was absorbed and not propagated to the oberver. verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); verify(remote, never()).createIdleStream(anyInt()); + verify(remote, never()).createStream(anyInt(), anyBoolean()); verify(stream, never()).open(anyBoolean()); } @Test - public void headersReadForUnknownStreamShouldBeIgnored() throws Exception { + public void headersReadForUnknownStreamAfterGoAwayShouldBeIgnored() throws Exception { + mockGoAwaySent(); when(connection.stream(STREAM_ID)).thenReturn(null); decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); verify(remote, never()).createStream(anyInt(), anyBoolean()); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 9158d2b3b2..236fb92d37 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -265,8 +265,9 @@ public class DefaultHttp2ConnectionEncoderTest { public void dataFramesDontMergeWithHeaders() throws Exception { createStream(STREAM_ID, false); final ByteBuf data = dummyData().retain(); - encoder.writeData(ctx, STREAM_ID, data, 0, true, newPromise()); - encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, newPromise()); + encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise()); + when(remoteFlow.hasFlowControlled(any(Http2Stream.class))).thenReturn(true); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, newPromise()); List capturedWrites = payloadCaptor.getAllValues(); assertFalse(capturedWrites.get(0).merge(ctx, capturedWrites.get(1))); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index ac9e8a15a5..5dcd0ccbc1 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -315,7 +315,6 @@ public class Http2ConnectionHandlerTest { when(frameWriter.writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), any(ChannelPromise.class))).thenReturn(future); when(stream.state()).thenReturn(CLOSED); - when(stream.isHeaderSent()).thenReturn(true); // The stream is "closed" but is still known about by the connection (connection().stream(..) // will return the stream). We should still write a RST_STREAM frame in this scenario. handler.resetStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise); diff --git a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java index ede3a9b883..85ba52c8ff 100644 --- a/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java +++ b/microbench/src/main/java/io/netty/microbench/http2/NoopHttp2RemoteFlowController.java @@ -71,6 +71,11 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr } while (payload.size() > 0); } + @Override + public boolean hasFlowControlled(Http2Stream stream) { + return false; + } + @Override public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception { this.ctx = ctx;