From ec6cd54f850638510e1fab7e15155b59afd6adf5 Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 21 Apr 2015 14:18:02 -0700 Subject: [PATCH] Ignore frames for streams that may have previously existed. Motivation: The recent PR that discarded the Http2StreamRemovalPolicy causes connection errors when receiving a frame for a stream that no longer exists. We should ignore these frames if we think there's a chance that the stream has existed previously Modifications: Modified the Http2Connection interface to provide a `streamMayHaveExisted` method. Also removed the requireStream() method to identify all of the places in the code that need to be updated. Modified the encoder and decoder to properly handle cases where a stream may have existed but no longer does. Result: Fixes #3643 --- .../codec/http2/DefaultHttp2Connection.java | 31 ++-- .../http2/DefaultHttp2ConnectionDecoder.java | 102 +++++++++----- .../http2/DefaultHttp2ConnectionEncoder.java | 19 ++- .../DefaultHttp2LocalFlowController.java | 19 ++- .../handler/codec/http2/Http2Connection.java | 19 ++- .../codec/http2/Http2FrameListener.java | 22 ++- .../codec/http2/Http2LocalFlowController.java | 26 ++-- .../DefaultHttp2ConnectionDecoderTest.java | 132 +++++++++++++----- .../DefaultHttp2ConnectionEncoderTest.java | 5 - .../http2/DefaultHttp2ConnectionTest.java | 5 - .../DefaultHttp2LocalFlowControllerTest.java | 16 ++- .../DefaultHttp2RemoteFlowControllerTest.java | 4 +- 12 files changed, 260 insertions(+), 140 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 9e24d9ddc5..1322ace09a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -113,17 +113,13 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public Http2Stream requireStream(int streamId) throws Http2Exception { - Http2Stream stream = stream(streamId); - if (stream == null) { - throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId); - } - return stream; + public Http2Stream stream(int streamId) { + return streamMap.get(streamId); } @Override - public Http2Stream stream(int streamId) { - return streamMap.get(streamId); + public boolean streamMayHaveExisted(int streamId) { + return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId); } @Override @@ -166,7 +162,7 @@ public class DefaultHttp2Connection implements Http2Connection { forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { - if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) { + if (stream.id() > lastKnownStream && localEndpoint.isValidStreamId(stream.id())) { stream.close(); } return true; @@ -197,7 +193,7 @@ public class DefaultHttp2Connection implements Http2Connection { forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) { - if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) { + if (stream.id() > lastKnownStream && remoteEndpoint.isValidStreamId(stream.id())) { stream.close(); } return true; @@ -551,11 +547,11 @@ public class DefaultHttp2Connection implements Http2Connection { } final DefaultEndpoint createdBy() { - return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint; + return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint; } final boolean isLocal() { - return localEndpoint.createdStreamId(id); + return localEndpoint.isValidStreamId(id); } final void weight(short weight) { @@ -880,9 +876,14 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public boolean createdStreamId(int streamId) { + public boolean isValidStreamId(int streamId) { boolean even = (streamId & 1) == 0; - return server == even; + return streamId > 0 && server == even; + } + + @Override + public boolean mayHaveCreatedStream(int streamId) { + return isValidStreamId(streamId) && streamId <= lastStreamCreated; } @Override @@ -1021,7 +1022,7 @@ public class DefaultHttp2Connection implements Http2Connection { if (streamId < 0) { throw new Http2NoMoreStreamIdsException(); } - if (!createdStreamId(streamId)) { + if (!isValidStreamId(streamId)) { throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId, server ? "server" : "client"); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index 14929bf95d..27608b569f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -184,16 +185,20 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { - Http2Stream stream = connection.requireStream(streamId); + Http2Stream stream = connection.stream(streamId); Http2LocalFlowController flowController = flowController(); int bytesToReturn = data.readableBytes() + padding; - if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { - // Count the frame towards the connection flow control window and don't process it further. + if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { + // Ignoring this frame. We still need to count the frame towards the connection flow control + // window, but we immediately mark all bytes as consumed. flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); flowController.consumeBytes(ctx, stream, bytesToReturn); - // Since no bytes are consumed, return them all. + // Verify that the stream may have existed after we apply flow control. + verifyStreamMayHaveExisted(streamId); + + // All bytes have been consumed. return bytesToReturn; } @@ -264,31 +269,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { Http2Stream stream = connection.stream(streamId); - - if (stream == null) { + boolean allowHalfClosedRemote = false; + if (stream == null && !connection.streamMayHaveExisted(streamId)) { stream = connection.remote().createStream(streamId).open(endOfStream); - } else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { + // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state. + allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE; + } + + if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) { // Ignore this frame. return; - } else { - switch (stream.state()) { - case RESERVED_REMOTE: - case IDLE: - stream.open(endOfStream); - break; - case OPEN: - case HALF_CLOSED_LOCAL: - // Allowed to receive headers in these states. - break; - case HALF_CLOSED_REMOTE: - case CLOSED: + } + + switch (stream.state()) { + case RESERVED_REMOTE: + case IDLE: + stream.open(endOfStream); + break; + case OPEN: + case HALF_CLOSED_LOCAL: + // Allowed to receive headers in these states. + break; + case HALF_CLOSED_REMOTE: + if (!allowHalfClosedRemote) { throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", - stream.id(), stream.state()); - default: - // Connection error. - throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), - stream.state()); - } + stream.id(), stream.state()); + } + break; + case CLOSED: + throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + default: + // Connection error. + throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), + stream.state()); } try { @@ -315,10 +329,15 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { try { if (stream == null) { + if (connection.streamMayHaveExisted(streamId)) { + // Ignore this frame. + return; + } + // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the // first frame to be received for a stream that we must create the stream. stream = connection.remote().createStream(streamId); - } else if (streamCreatedAfterGoAwaySent(stream)) { + } else if (streamCreatedAfterGoAwaySent(streamId)) { // Ignore this frame. return; } @@ -336,7 +355,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { - Http2Stream stream = connection.requireStream(streamId); + Http2Stream stream = connection.stream(streamId); + if (stream == null) { + verifyStreamMayHaveExisted(streamId); + return; + } switch(stream.state()) { case IDLE: @@ -434,12 +457,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { - Http2Stream parentStream = connection.requireStream(streamId); + Http2Stream parentStream = connection.stream(streamId); - if (streamCreatedAfterGoAwaySent(parentStream)) { + if (streamCreatedAfterGoAwaySent(streamId)) { return; } + if (parentStream == null) { + throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId); + } + switch (parentStream.state()) { case OPEN: case HALF_CLOSED_LOCAL: @@ -483,9 +510,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) throws Http2Exception { - Http2Stream stream = connection.requireStream(streamId); - - if (stream.state() == CLOSED || streamCreatedAfterGoAwaySent(stream)) { + Http2Stream stream = connection.stream(streamId); + if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) { + // Ignore this frame. + verifyStreamMayHaveExisted(streamId); return; } @@ -501,10 +529,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { onUnknownFrame0(ctx, frameType, streamId, flags, payload); } - private boolean streamCreatedAfterGoAwaySent(Http2Stream stream) { + private boolean streamCreatedAfterGoAwaySent(int streamId) { // Ignore inbound frames after a GOAWAY was sent and the stream id is greater than // the last stream id set in the GOAWAY frame. - return connection().goAwaySent() && stream.id() > connection().remote().lastKnownStream(); + return connection.goAwaySent() && streamId > connection.remote().lastKnownStream(); + } + + private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception { + if (!connection.streamMayHaveExisted(streamId)) { + throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId); + } } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 3e53d250f3..1d71e291de 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -112,7 +112,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream; try { - stream = connection.requireStream(streamId); + stream = requireStream(streamId); + // Verify that the stream is in the appropriate state for sending DATA frames. switch (stream.state()) { case OPEN: @@ -250,7 +251,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received."); } - Http2Stream stream = connection.requireStream(streamId); + Http2Stream stream = requireStream(streamId); // Reserve the promised stream. connection.local().reservePushStream(promisedStreamId, stream); } catch (Throwable e) { @@ -296,6 +297,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { return frameWriter.configuration(); } + private Http2Stream requireStream(int streamId) { + Http2Stream stream = connection.stream(streamId); + if (stream == null) { + final String message; + if (connection.streamMayHaveExisted(streamId)) { + message = "Stream no longer exists: " + streamId; + } else { + message = "Stream does not exist: " + streamId; + } + throw new IllegalArgumentException(message); + } + return stream; + } + /** * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 43ad3a5731..c1a7d495b3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -123,16 +123,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { - if (stream.id() == CONNECTION_STREAM_ID) { - throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); - } - if (numBytes <= 0) { - throw new IllegalArgumentException("numBytes must be positive"); - } - // Streams automatically consume all remaining bytes when they are closed, so just ignore // if already closed. - if (!isClosed(stream)) { + if (stream != null && !isClosed(stream)) { + if (stream.id() == CONNECTION_STREAM_ID) { + throw new UnsupportedOperationException("Returning bytes for the connection window is not supported"); + } + if (numBytes <= 0) { + throw new IllegalArgumentException("numBytes must be positive"); + } + connectionState().consumeBytes(ctx, numBytes); state(stream).consumeBytes(ctx, numBytes); } @@ -208,11 +208,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController int dataLength = data.readableBytes() + padding; // Apply the connection-level flow control - DefaultFlowState connectionState = connectionState(); connectionState.receiveFlowControlledFrame(dataLength); - if (!isClosed(stream)) { + if (stream != null && !isClosed(stream)) { // Apply the stream-level flow control DefaultFlowState state = state(stream); state.endOfStream(endOfStream); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 07d6cf6fd6..676abc4804 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -147,7 +147,13 @@ public interface Http2Connection { * Indicates whether the given streamId is from the set of IDs used by this endpoint to * create new streams. */ - boolean createdStreamId(int streamId); + boolean isValidStreamId(int streamId); + + /** + * Indicates whether or not this endpoint may have created the given stream. This is {@code true} if + * {@link #isValidStreamId(int)} and {@code streamId} <= {@link #lastStreamCreated()}. + */ + boolean mayHaveCreatedStream(int streamId); /** * Indicates whether or not this endpoint is currently allowed to create new streams. This will be @@ -263,16 +269,17 @@ public interface Http2Connection { */ void removeListener(Listener listener); - /** - * Attempts to get the stream for the given ID. If it doesn't exist, throws. - */ - Http2Stream requireStream(int streamId) throws Http2Exception; - /** * Gets the stream if it exists. If not, returns {@code null}. */ Http2Stream stream(int streamId); + /** + * Indicates whether or not the given stream may have existed within this connection. This is a short form + * for calling {@link Endpoint#mayHaveCreatedStream(int)} on both endpoints. + */ + boolean streamMayHaveExisted(int streamId); + /** * Gets the stream object representing the connection, itself (i.e. stream zero). This object * always exists. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java index 2bb9a2125f..0a8535e123 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java @@ -29,21 +29,17 @@ public interface Http2FrameListener { * @param streamId the subject stream for the frame. * @param data payload buffer for the frame. This buffer will be released by the codec. * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. - * @return the number of bytes that have been processed by the application. The returned bytes - * are used by the inbound flow controller to determine the appropriate time to expand - * the inbound flow control window (i.e. send {@code WINDOW_UPDATE}). Returning a value - * equal to the length of {@code data} + {@code padding} will effectively opt-out of - * application-level flow control for this frame. Returning a value less than the length - * of {@code data} + {@code padding} will defer the returning of the processed bytes, - * which the application must later return via - * {@link Http2InboundFlowState#returnProcessedBytes(ChannelHandlerContext, int)}. The - * returned value must be >= {@code 0} and <= {@code data.readableBytes()} + - * {@code padding}. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream. + * @return the number of bytes that have been processed by the application. The returned bytes are used by the + * inbound flow controller to determine the appropriate time to expand the inbound flow control window (i.e. send + * {@code WINDOW_UPDATE}). Returning a value equal to the length of {@code data} + {@code padding} will effectively + * opt-out of application-level flow control for this frame. Returning a value less than the length of {@code data} + * + {@code padding} will defer the returning of the processed bytes, which the application must later return via + * {@link Http2LocalFlowController#consumeBytes(ChannelHandlerContext, Http2Stream, int)}. The returned value must + * be >= {@code 0} and <= {@code data.readableBytes()} + {@code padding}. */ int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception; + boolean endOfStream) throws Http2Exception; /** * Handles an inbound {@code HEADERS} frame. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java index 83d193350e..a06e42be0b 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LocalFlowController.java @@ -24,20 +24,20 @@ import io.netty.channel.ChannelHandlerContext; public interface Http2LocalFlowController extends Http2FlowController { /** - * Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control - * policies to it for both the {@code stream} as well as the connection. If any flow control - * policies have been violated, an exception is raised immediately, otherwise the frame is - * considered to have "passed" flow control. + * Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control policies to it for both + * the {@code stream} as well as the connection. If any flow control policies have been violated, an exception is + * raised immediately, otherwise the frame is considered to have "passed" flow control. *

- * If {@code stream} is closed, flow control should only be applied to the connection window. + * If {@code stream} is {@code null} or closed, flow control should only be applied to the connection window and the + * bytes are immediately consumed. * * @param ctx the context from the handler where the frame was read. - * @param stream the subject stream for the received frame. The connection stream object must - * not be used. + * @param stream the subject stream for the received frame. The connection stream object must not be used. If {@code + * stream} is {@code null} or closed, flow control should only be applied to the connection window and the bytes are + * immediately consumed. * @param data payload buffer for the frame. * @param padding the number of padding bytes found at the end of the frame. - * @param endOfStream Indicates whether this is the last frame to be sent from the remote - * endpoint for this stream. + * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream. * @throws Http2Exception if any flow control errors are encountered. */ void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, @@ -49,13 +49,13 @@ public interface Http2LocalFlowController extends Http2FlowController { * control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to * restore a portion of the flow control window for the stream. *

- * If {@code stream} is closed (i.e. {@link Http2Stream#state()} method returns {@link Http2Stream.State#CLOSED}), - * the consumed bytes are only restored to the connection window. When a stream is closed, the flow controller - * automatically restores any unconsumed bytes for that stream to the connection window. This is done to ensure that - * the connection window does not degrade over time as streams are closed. + * If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link + * Http2Stream.State#CLOSED}), calling this method has no effect. * * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate * @param stream the stream for which window space should be freed. The connection stream object must not be used. + * If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link + * Http2Stream.State#CLOSED}), calling this method has no effect. * @param numBytes the number of bytes to be returned to the flow control window. * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the * stream. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index 9b0cdf5b0c..d503914f7e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -19,7 +19,6 @@ import static io.netty.buffer.Unpooled.wrappedBuffer; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; @@ -38,7 +37,9 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; + import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; @@ -47,10 +48,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; - -import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -59,6 +56,9 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + /** * Tests for {@link DefaultHttp2ConnectionDecoder}. */ @@ -135,7 +135,7 @@ public class DefaultHttp2ConnectionDecoderTest { } }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class)); when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_ID)).thenReturn(stream); + when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(true); when(connection.local()).thenReturn(local); when(local.flowController()).thenReturn(localFlow); when(encoder.flowController()).thenReturn(remoteFlow); @@ -182,6 +182,48 @@ public class DefaultHttp2ConnectionDecoderTest { } } + @Test(expected = Http2Exception.class) + public void dataReadForUnknownStreamShouldApplyFlowControlAndFail() throws Exception { + when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false); + when(connection.stream(STREAM_ID)).thenReturn(null); + final ByteBuf data = dummyData(); + int padding = 10; + int processedBytes = data.readableBytes() + padding; + try { + decode().onDataRead(ctx, STREAM_ID, data, padding, true); + } finally { + try { + verify(localFlow) + .receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true)); + verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes)); + verifyNoMoreInteractions(localFlow); + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + } + + @Test + public void dataReadForUnknownStreamShouldApplyFlowControl() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + final ByteBuf data = dummyData(); + int padding = 10; + int processedBytes = data.readableBytes() + padding; + try { + decode().onDataRead(ctx, STREAM_ID, data, padding, true); + verify(localFlow) + .receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true)); + verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes)); + verifyNoMoreInteractions(localFlow); + + // Verify that the event was absorbed and not propagated to the observer. + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + @Test public void emptyDataFrameShouldApplyFlowControl() throws Exception { final ByteBuf data = EMPTY_BUFFER; @@ -314,6 +356,19 @@ public class DefaultHttp2ConnectionDecoderTest { verify(stream, never()).open(anyBoolean()); } + @Test + public void headersReadForUnknownStreamShouldBeIgnored() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false); + verify(remote, never()).createStream(eq(STREAM_ID)); + verify(stream, never()).open(anyBoolean()); + + // Verify that the event was absorbed and not propagated to the oberver. + verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean()); + verify(remote, never()).createStream(anyInt()); + verify(stream, never()).open(anyBoolean()); + } + @Test public void headersReadForUnknownStreamShouldCreateStream() throws Exception { final int streamId = 5; @@ -408,6 +463,12 @@ public class DefaultHttp2ConnectionDecoderTest { verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt()); } + @Test(expected = Http2Exception.class) + public void pushPromiseReadForUnknownStreamShouldThrow() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); + } + @Test public void pushPromiseReadShouldSucceed() throws Exception { decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); @@ -425,9 +486,17 @@ public class DefaultHttp2ConnectionDecoderTest { } @Test - public void priorityReadShouldSucceed() throws Exception { + public void priorityReadForUnknownStreamShouldBeIgnored() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true); + verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); + verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean()); + } + + @Test + public void priorityReadShouldCreateNewStream() throws Exception { + when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false); when(connection.stream(STREAM_ID)).thenReturn(null); - when(connection.requireStream(STREAM_ID)).thenReturn(null); decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); @@ -435,25 +504,6 @@ public class DefaultHttp2ConnectionDecoderTest { verify(stream, never()).open(anyBoolean()); } - @Test - public void priorityReadOnPreviouslyExistingStreamShouldSucceed() throws Exception { - doAnswer(new Answer() { - @Override - public Http2Stream answer(InvocationOnMock in) throws Throwable { - throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); - } - }).when(remote).createStream(eq(STREAM_ID)); - when(connection.stream(STREAM_ID)).thenReturn(null); - when(connection.requireStream(STREAM_ID)).thenReturn(null); - // Just return the stream object as the connection stream to ensure the dependent stream "exists" - when(connection.stream(STREAM_DEPENDENCY_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_DEPENDENCY_ID)).thenReturn(stream); - decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); - verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); - verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); - verify(remote).createStream(STREAM_ID); - } - @Test public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception { doAnswer(new Answer() { @@ -462,8 +512,6 @@ public class DefaultHttp2ConnectionDecoderTest { throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); } }).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); - when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_ID)).thenReturn(stream); decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); @@ -479,8 +527,17 @@ public class DefaultHttp2ConnectionDecoderTest { @Test(expected = Http2Exception.class) public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception { - when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, "")); - decode().onWindowUpdateRead(ctx, 5, 10); + when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false); + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onWindowUpdateRead(ctx, STREAM_ID, 10); + } + + @Test + public void windowUpdateReadForUnknownStreamShouldBeIgnored() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onWindowUpdateRead(ctx, STREAM_ID, 10); + verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt()); + verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt()); } @Test @@ -500,8 +557,17 @@ public class DefaultHttp2ConnectionDecoderTest { @Test(expected = Http2Exception.class) public void rstStreamReadForUnknownStreamShouldThrow() throws Exception { - when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, "")); - decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code()); + when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false); + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); + } + + @Test + public void rstStreamReadForUnknownStreamShouldBeIgnored() throws Exception { + when(connection.stream(STREAM_ID)).thenReturn(null); + decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code()); + verify(lifecycleManager, never()).closeStream(eq(stream), eq(future)); + verify(listener, never()).onRstStreamRead(eq(ctx), anyInt(), anyLong()); } @Test diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 5be5494812..098bb7b7d5 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -148,7 +148,6 @@ public class DefaultHttp2ConnectionEncoderTest { } }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class)); when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.local()).thenReturn(local); when(connection.remote()).thenReturn(remote); when(remote.flowController()).thenReturn(remoteFlow); @@ -357,7 +356,6 @@ public class DefaultHttp2ConnectionEncoderTest { @Test public void priorityWriteShouldSetPriorityForStream() throws Exception { when(connection.stream(STREAM_ID)).thenReturn(null); - when(connection.requireStream(STREAM_ID)).thenReturn(null); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); @@ -374,10 +372,8 @@ public class DefaultHttp2ConnectionEncoderTest { } }).when(local).createStream(eq(STREAM_ID)); when(connection.stream(STREAM_ID)).thenReturn(null); - when(connection.requireStream(STREAM_ID)).thenReturn(null); // Just return the stream object as the connection stream to ensure the dependent stream "exists" when(connection.stream(0)).thenReturn(stream); - when(connection.requireStream(0)).thenReturn(stream); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); @@ -393,7 +389,6 @@ public class DefaultHttp2ConnectionEncoderTest { } }).when(stream).setPriority(eq(0), eq((short) 255), eq(true)); when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.requireStream(STREAM_ID)).thenReturn(stream); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 2569d5d5c1..5726df145b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -74,11 +74,6 @@ public class DefaultHttp2ConnectionTest { client.addListener(clientListener); } - @Test(expected = Http2Exception.class) - public void getStreamOrFailWithoutStreamShouldFail() throws Http2Exception { - server.requireStream(100); - } - @Test public void getStreamWithoutStreamShouldReturnNull() { assertNull(server.stream(100)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 567e625fd9..3551cf9622 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -217,6 +217,18 @@ public class DefaultHttp2LocalFlowControllerTest { assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); } + @Test + public void dataReceivedForNullStreamShouldImmediatelyConsumeBytes() throws Http2Exception { + receiveFlowControlledFrame(null, 10, 0, false); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + } + + @Test + public void consumeBytesForNullStreamShouldIgnore() throws Http2Exception { + controller.consumeBytes(ctx, null, 10); + assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); + } + @Test public void globalRatioShouldImpactStreams() throws Http2Exception { float ratio = 0.6f; @@ -311,7 +323,7 @@ public class DefaultHttp2LocalFlowControllerTest { return stream(streamId).localFlowState().windowSize(); } - private Http2Stream stream(int streamId) throws Http2Exception { - return connection.requireStream(streamId); + private Http2Stream stream(int streamId) { + return connection.stream(streamId); } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index 00584a62d8..7acd2c7c32 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -1233,8 +1233,8 @@ public class DefaultHttp2RemoteFlowControllerTest { return controller.streamableBytesForTree(stream); } - private Http2Stream stream(int streamId) throws Http2Exception { - return connection.requireStream(streamId); + private Http2Stream stream(int streamId) { + return connection.stream(streamId); } private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled {