From 5759f1d3961f073f506efa80b2d3ba14ab923642 Mon Sep 17 00:00:00 2001 From: nmittler Date: Thu, 16 Oct 2014 15:34:54 -0700 Subject: [PATCH] Changing stream verification to throw Http2StreamException. Motivation: Currently when receiving DATA/HEADERS frames, we throw Http2Exception (a connection error) instead of Http2StreamException (stream error). This is incorrect according to the HTTP/2 spec. Modifications: Updated various places in the encoder and decoder that were out of spec WRT connection/state checking. Result: Stream state verification is properly handled. --- .../codec/http2/DefaultHttp2Connection.java | 66 +++--- .../http2/DefaultHttp2ConnectionDecoder.java | 128 ++++++++--- .../http2/DefaultHttp2ConnectionEncoder.java | 210 ++++++++++-------- .../codec/http2/Http2ConnectionDecoder.java | 5 +- .../codec/http2/Http2ConnectionEncoder.java | 13 ++ .../codec/http2/Http2ConnectionHandler.java | 2 +- .../codec/http2/Http2FrameListener.java | 8 +- .../handler/codec/http2/Http2Stream.java | 51 +++-- .../codec/http2/Http2StreamException.java | 8 + .../DefaultHttp2ConnectionDecoderTest.java | 42 ++++ .../DefaultHttp2ConnectionEncoderTest.java | 2 +- 11 files changed, 361 insertions(+), 174 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index 043c2b8d9d..2775ed745f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -20,7 +20,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy; -import static io.netty.handler.codec.http2.Http2Exception.format; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; @@ -215,8 +214,10 @@ public class DefaultHttp2Connection implements Http2Connection { private DefaultStream parent; private IntObjectMap children = newChildMap(); private int totalChildWeights; - private boolean terminateSent; - private boolean terminateReceived; + private boolean resetSent; + private boolean resetReceived; + private boolean endOfStreamSent; + private boolean endOfStreamReceived; private FlowState inboundFlow; private FlowState outboundFlow; private EmbeddedChannel decompressor; @@ -237,28 +238,52 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public boolean isTerminateReceived() { - return terminateReceived; + public boolean isEndOfStreamReceived() { + return endOfStreamReceived; } @Override - public void terminateReceived() { - terminateReceived = true; + public Http2Stream endOfStreamReceived() { + endOfStreamReceived = true; + return this; } @Override - public boolean isTerminateSent() { - return terminateSent; + public boolean isEndOfStreamSent() { + return endOfStreamSent; } @Override - public void terminateSent() { - terminateSent = true; + public Http2Stream endOfStreamSent() { + endOfStreamSent = true; + return this; } @Override - public boolean isTerminated() { - return terminateSent || terminateReceived; + public boolean isResetReceived() { + return resetReceived; + } + + @Override + public Http2Stream resetReceived() { + resetReceived = true; + return this; + } + + @Override + public boolean isResetSent() { + return resetSent; + } + + @Override + public Http2Stream resetSent() { + resetSent = true; + return this; + } + + @Override + public boolean isReset() { + return resetSent || resetReceived; } @Override @@ -394,16 +419,6 @@ public class DefaultHttp2Connection implements Http2Connection { return this; } - @Override - public Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception { - for (State allowedState : allowedStates) { - if (state == allowedState) { - return this; - } - } - throw format(error, "Stream %d in unexpected state: %s", id, state); - } - @Override public Http2Stream openForPush() throws Http2Exception { switch (state) { @@ -632,11 +647,6 @@ public class DefaultHttp2Connection implements Http2Connection { throw new UnsupportedOperationException(); } - @Override - public Http2Stream verifyState(Http2Error error, State... allowedStates) { - throw new UnsupportedOperationException(); - } - @Override public Http2Stream openForPush() { throw new UnsupportedOperationException(); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java index d27fc6dc5f..810951b448 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoder.java @@ -15,13 +15,10 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED; import static io.netty.handler.codec.http2.Http2Exception.protocolError; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; -import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; +import static io.netty.handler.codec.http2.Http2StreamException.streamClosedError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -207,21 +204,61 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { // Check if we received a data frame for a stream which is half-closed Http2Stream stream = connection.requireStream(streamId); - stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL); - - // Apply flow control. - inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); + verifyEndOfStreamNotReceived(stream); verifyGoAwayNotReceived(); verifyRstStreamNotReceived(stream); - if (shouldIgnoreFrame(stream)) { - // Ignore this frame. + + // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a + // lower stream ID. + boolean shouldIgnore = shouldIgnoreFrame(stream); + + boolean shouldApplyFlowControl = false; + Http2Exception error = null; + switch (stream.state()) { + case OPEN: + case HALF_CLOSED_LOCAL: + shouldApplyFlowControl = true; + break; + case HALF_CLOSED_REMOTE: + case CLOSED: + if (stream.isResetSent()) { + shouldApplyFlowControl = true; + } + if (!shouldIgnore) { + // Stream error. + error = streamClosedError(stream.id(), "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + } + break; + default: + if (!shouldIgnore) { + // Connection error. + error = protocolError("Stream %d in unexpected state: %s", stream.id(), + stream.state()); + } + break; + } + + // If we should apply flow control, do so now. + if (shouldApplyFlowControl) { + inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream); + } + + // If we should ignore this frame, do so now. + if (shouldIgnore) { return; } + // If the stream was in an invalid state to receive the frame, throw the error. + if (error != null) { + throw error; + } + listener.onDataRead(ctx, streamId, data, padding, endOfStream); if (endOfStream) { + stream.endOfStreamReceived(); lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); } } @@ -237,13 +274,13 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, - boolean endStream) throws Http2Exception { - onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream); + boolean endOfStream) throws Http2Exception { + onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream); } @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, - short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception { + short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { verifyPrefaceReceived(); Http2Stream stream = connection.stream(streamId); @@ -255,25 +292,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } if (stream == null) { - stream = connection.createRemoteStream(streamId, endStream); + stream = connection.createRemoteStream(streamId, endOfStream); } else { - if (stream.state() == RESERVED_REMOTE) { - // Received headers for a reserved push stream ... open it for push to the local endpoint. - stream.verifyState(PROTOCOL_ERROR, RESERVED_REMOTE); - stream.openForPush(); - } else { - // Receiving headers on an existing stream. Make sure the stream is in an allowed state. - stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_LOCAL); + verifyEndOfStreamNotReceived(stream); + + switch (stream.state()) { + case RESERVED_REMOTE: + // Received headers for a reserved push stream ... open it for push to the + // local endpoint. + stream.openForPush(); + break; + case OPEN: + case HALF_CLOSED_LOCAL: + // Allowed to receive headers in these states. + break; + case HALF_CLOSED_REMOTE: + case CLOSED: + // Stream error. + throw streamClosedError(stream.id(), "Stream %d in unexpected state: %s", + stream.id(), stream.state()); + default: + // Connection error. + throw protocolError("Stream %d in unexpected state: %s", stream.id(), + stream.state()); } } listener.onHeadersRead(ctx, streamId, headers, - streamDependency, weight, exclusive, padding, endStream); + streamDependency, weight, exclusive, padding, endOfStream); stream.setPriority(streamDependency, weight, exclusive); // If the headers completes this stream, close it. - if (endStream) { + if (endOfStream) { + stream.endOfStreamReceived(); lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); } } @@ -307,7 +359,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { return; } - stream.terminateReceived(); + stream.resetReceived(); listener.onRstStreamRead(ctx, streamId, errorCode); @@ -468,26 +520,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { } // Also ignore inbound frames after we sent a RST_STREAM frame. - return stream.isTerminateSent(); + return stream.isResetSent(); } /** - * Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws an - * exception. + * Verifies that a frame has not been received from remote endpoint with the + * {@code END_STREAM} flag set. If it was, throws a connection error. + */ + private void verifyEndOfStreamNotReceived(Http2Stream stream) throws Http2Exception { + if (stream.isEndOfStreamReceived()) { + // Connection error. + throw new Http2Exception(STREAM_CLOSED, String.format( + "Received frame for stream %d after receiving END_STREAM", stream.id())); + } + } + + /** + * Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a + * connection error. */ private void verifyGoAwayNotReceived() throws Http2Exception { if (connection.goAwayReceived()) { + // Connection error. throw protocolError("Received frames after receiving GO_AWAY"); } } /** - * Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws an - * exception. + * Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws a + * stream error. */ private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception { - if (stream != null && stream.isTerminateReceived()) { - throw new Http2StreamException(stream.id(), STREAM_CLOSED, + if (stream != null && stream.isResetReceived()) { + // Stream error. + throw streamClosedError(stream.id(), "Frame received after receiving RST_STREAM for stream: " + stream.id()); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 6815ebe36a..05e710b7cb 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -15,11 +15,7 @@ package io.netty.handler.codec.http2; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.protocolError; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; -import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; -import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -146,40 +142,58 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, - final boolean endStream, ChannelPromise promise) { - boolean release = true; + final boolean endOfStream, ChannelPromise promise) { try { if (connection.isGoAway()) { - throw protocolError("Sending data after connection going away."); + throw new IllegalStateException("Sending data after connection going away."); } Http2Stream stream = connection.requireStream(streamId); - stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE); - - // Hand control of the frame to the flow controller. - ChannelFuture future = outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise); - release = false; - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - // The write failed, handle the error. - lifecycleManager.onException(ctx, future.cause()); - } else if (endStream) { - // Close the local side of the stream if this is the last frame - Http2Stream stream = connection.stream(streamId); - lifecycleManager.closeLocalSide(stream, ctx.newPromise()); - } - } - }); - - return future; - } catch (Http2Exception e) { - if (release) { - data.release(); + if (stream.isResetSent()) { + throw new IllegalStateException("Sending data after sending RST_STREAM."); } + if (stream.isEndOfStreamSent()) { + throw new IllegalStateException("Sending data after sending END_STREAM."); + } + + // Verify that the stream is in the appropriate state for sending DATA frames. + switch (stream.state()) { + case OPEN: + case HALF_CLOSED_REMOTE: + // Allowed sending DATA frames in these states. + break; + default: + throw new IllegalStateException(String.format( + "Stream %d in unexpected state: %s", stream.id(), stream.state())); + } + + if (endOfStream) { + // Indicate that we have sent END_STREAM. + stream.endOfStreamSent(); + } + } catch (Throwable e) { + data.release(); return promise.setFailure(e); } + + // Hand control of the frame to the flow controller. + ChannelFuture future = + outboundFlow.writeData(ctx, streamId, data, padding, endOfStream, promise); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // The write failed, handle the error. + lifecycleManager.onException(ctx, future.cause()); + } else if (endOfStream) { + // Close the local side of the stream if this is the last frame + Http2Stream stream = connection.stream(streamId); + lifecycleManager.closeLocalSide(stream, ctx.newPromise()); + } + } + }); + + return future; } @Override @@ -190,47 +204,56 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, - int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, + int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) { + Http2Stream stream = connection.stream(streamId); try { if (connection.isGoAway()) { throw protocolError("Sending headers after connection going away."); } - Http2Stream stream = connection.stream(streamId); if (stream == null) { // Create a new locally-initiated stream. - stream = connection.createLocalStream(streamId, endStream); + stream = connection.createLocalStream(streamId, endOfStream); } else { - // An existing stream... - if (stream.state() == RESERVED_LOCAL) { - // Sending headers on a reserved push stream ... open it for push to the remote - // endpoint. - stream.openForPush(); - } else { - // The stream already exists, make sure it's in an allowed state. - stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE); + if (stream.isResetSent()) { + throw new IllegalStateException("Sending headers after sending RST_STREAM."); + } + if (stream.isEndOfStreamSent()) { + throw new IllegalStateException("Sending headers after sending END_STREAM."); + } - // Update the priority for this stream only if we'll be sending more data. - if (!endStream) { - stream.setPriority(streamDependency, weight, exclusive); - } + // An existing stream... + switch (stream.state()) { + case RESERVED_LOCAL: + // Sending headers on a reserved push stream ... open it for push to the remote endpoint. + stream.openForPush(); + break; + case OPEN: + case HALF_CLOSED_REMOTE: + // Allowed sending headers in these states. + break; + default: + throw new IllegalStateException(String.format( + "Stream %d in unexpected state: %s", stream.id(), stream.state())); } } - - ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight, - exclusive, padding, endStream, promise); - ctx.flush(); - - // If the headers are the end of the stream, close it now. - if (endStream) { - lifecycleManager.closeLocalSide(stream, promise); - } - - return future; - } catch (Http2Exception e) { + } catch (Throwable e) { return promise.setFailure(e); } + + ChannelFuture future = + frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight, + exclusive, padding, endOfStream, promise); + ctx.flush(); + + // If the headers are the end of the stream, close it now. + if (endOfStream) { + stream.endOfStreamSent(); + lifecycleManager.closeLocalSide(stream, promise); + } + + return future; } @Override @@ -243,14 +266,15 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { // Update the priority on this stream. connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive); - - ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, - promise); - ctx.flush(); - return future; - } catch (Http2Exception e) { + } catch (Throwable e) { return promise.setFailure(e); } + + ChannelFuture future = + frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, + promise); + ctx.flush(); + return future; } @Override @@ -286,7 +310,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { ctx.flush(); if (stream != null) { - stream.terminateSent(); + stream.resetSent(); lifecycleManager.closeStream(stream, promise); } @@ -294,7 +318,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } @Override - public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { + public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, + ChannelPromise promise) { outstandingLocalSettingsQueue.add(settings); try { if (connection.isGoAway()) { @@ -305,13 +330,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { if (pushEnabled != null && connection.isServer()) { throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); } - - ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise); - ctx.flush(); - return future; - } catch (Http2Exception e) { + } catch (Throwable e) { return promise.setFailure(e); } + + ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise); + ctx.flush(); + return future; } @Override @@ -320,23 +345,16 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } @Override - public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { - boolean release = true; - try { - if (connection.isGoAway()) { - throw protocolError("Sending ping after connection going away."); - } - - frameWriter.writePing(ctx, ack, data, promise); - release = false; - ctx.flush(); - return promise; - } catch (Http2Exception e) { - if (release) { - data.release(); - } - return promise.setFailure(e); + public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, + ChannelPromise promise) { + if (connection.isGoAway()) { + data.release(); + return promise.setFailure(protocolError("Sending ping after connection going away.")); } + + ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise); + ctx.flush(); + return future; } @Override @@ -350,14 +368,16 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { // Reserve the promised stream. Http2Stream stream = connection.requireStream(streamId); connection.local().reservePushStream(promisedStreamId, stream); - - // Write the frame. - frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); - ctx.flush(); - return promise; - } catch (Http2Exception e) { + } catch (Throwable e) { return promise.setFailure(e); } + + // Write the frame. + ChannelFuture future = + frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, + promise); + ctx.flush(); + return future; } @Override @@ -369,6 +389,12 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { @Override public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + if (streamId > 0) { + Http2Stream stream = connection().stream(streamId); + if (stream != null && stream.isResetSent()) { + throw new IllegalStateException("Sending data after sending RST_STREAM."); + } + } return frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java index 547a142b74..08ae07b31f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionDecoder.java @@ -21,7 +21,10 @@ import java.io.Closeable; import java.util.List; /** - * Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}. + * Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}. Performs basic protocol + * conformance on inbound frames before calling the delegate {@link Http2FrameListener} for + * application-specific processing. Note that frames of an unknown type (i.e. HTTP/2 extensions) + * will skip all protocol checks and be given directly to the listener for processing. */ public interface Http2ConnectionDecoder extends Closeable { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java index 85b48fc107..862915d220 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionEncoder.java @@ -14,6 +14,11 @@ */ package io.netty.handler.codec.http2; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + /** * Handler for outbound traffic on behalf of {@link Http2ConectionHandler}. @@ -71,4 +76,12 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF * Sets the settings for the remote endpoint of the HTTP/2 connection. */ void remoteSettings(Http2Settings settings) throws Http2Exception; + + /** + * Writes the given data to the internal {@link Http2FrameWriter} without performing any + * state checks on the connection/stream. + */ + @Override + ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, + Http2Flags flags, ByteBuf payload, ChannelPromise promise); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 59a89f7938..19dbd0ef70 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -317,7 +317,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http ctx.flush(); if (stream != null) { - stream.terminateSent(); + stream.resetSent(); closeStream(stream, promise); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java index 4ade646914..f5f91199a8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameListener.java @@ -41,11 +41,11 @@ public interface Http2FrameListener extends Http2DataListener { * @param streamId the subject stream for the frame. * @param headers the received headers. * @param padding the number of padding bytes found at the end of the frame. - * @param endStream Indicates whether this is the last frame to be sent from the remote endpoint + * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint * for this stream. */ void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, - boolean endStream) throws Http2Exception; + boolean endOfStream) throws Http2Exception; /** * Handles an inbound HEADERS frame with priority information specified. Only called if END_HEADERS encountered. @@ -70,11 +70,11 @@ public interface Http2FrameListener extends Http2DataListener { * @param weight the new weight for the stream. * @param exclusive whether or not the stream should be the exclusive dependent of its parent. * @param padding the number of padding bytes found at the end of the frame. - * @param endStream Indicates whether this is the last frame to be sent from the remote endpoint + * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint * for this stream. */ void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, - int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) + int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception; /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index bf684e435c..cabbdb8f6f 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -47,11 +47,6 @@ public interface Http2Stream { */ State state(); - /** - * Verifies that the stream is in one of the given allowed states. - */ - Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception; - /** * If this is a reserved push stream, opens the stream for push in one direction. */ @@ -75,32 +70,56 @@ public interface Http2Stream { Http2Stream closeRemoteSide(); /** - * Indicates whether a RST_STREAM frame has been received from the remote endpoint for this stream. + * Indicates whether a frame with {@code END_STREAM} set was received from the remote endpoint + * for this stream. */ - boolean isTerminateReceived(); + boolean isEndOfStreamReceived(); /** - * Sets the flag indicating that a RST_STREAM frame has been received from the remote endpoint + * Sets the flag indicating that a frame with {@code END_STREAM} set was received from the + * remote endpoint for this stream. + */ + Http2Stream endOfStreamReceived(); + + /** + * Indicates whether a frame with {@code END_STREAM} set was sent to the remote endpoint for + * this stream. + */ + boolean isEndOfStreamSent(); + + /** + * Sets the flag indicating that a frame with {@code END_STREAM} set was sent to the remote + * endpoint for this stream. + */ + Http2Stream endOfStreamSent(); + + /** + * Indicates whether a {@code RST_STREAM} frame has been received from the remote endpoint for this stream. + */ + boolean isResetReceived(); + + /** + * Sets the flag indicating that a {@code RST_STREAM} frame has been received from the remote endpoint * for this stream. This does not affect the stream state. */ - void terminateReceived(); + Http2Stream resetReceived(); /** - * Indicates whether a RST_STREAM frame has been sent from the local endpoint for this stream. + * Indicates whether a {@code RST_STREAM} frame has been sent from the local endpoint for this stream. */ - boolean isTerminateSent(); + boolean isResetSent(); /** - * Sets the flag indicating that a RST_STREAM frame has been sent from the local endpoint + * Sets the flag indicating that a {@code RST_STREAM} frame has been sent from the local endpoint * for this stream. This does not affect the stream state. */ - void terminateSent(); + Http2Stream resetSent(); /** - * Indicates whether or not this stream has been terminated. This is a short form for - * {@link #isTerminateSent()} || {@link #isTerminateReceived()}. + * Indicates whether or not this stream has been reset. This is a short form for + * {@link #isResetSent()} || {@link #isResetReceived()}. */ - boolean isTerminated(); + boolean isReset(); /** * Indicates whether the remote side of this stream is open (i.e. the state is either diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamException.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamException.java index 03efb25f76..6cccaa975e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamException.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamException.java @@ -38,4 +38,12 @@ public class Http2StreamException extends Http2Exception { public int streamId() { return streamId; } + + public static Http2StreamException format(int id, Http2Error error, String fmt, Object... args) { + return new Http2StreamException(id, error, String.format(fmt, args)); + } + + public static Http2StreamException streamClosedError(int id, String fmt, Object... args) { + return format(id, Http2Error.STREAM_CLOSED, fmt, args); + } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java index b660542aad..1702ec41eb 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionDecoderTest.java @@ -172,6 +172,48 @@ public class DefaultHttp2ConnectionDecoderTest { } } + @Test(expected = Http2StreamException.class) + public void dataReadForStreamInInvalidStateShouldThrow() throws Exception { + // Throw an exception when checking stream state. + when(stream.state()).thenReturn(Http2Stream.State.CLOSED); + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + } finally { + data.release(); + } + } + + @Test + public void dataReadAfterGoAwayForStreamInInvalidStateShouldIgnore() throws Exception { + // Throw an exception when checking stream state. + when(stream.state()).thenReturn(Http2Stream.State.CLOSED); + when(connection.goAwaySent()).thenReturn(true); + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + verify(inboundFlow, never()).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + + @Test + public void dataReadAfterRstStreamForStreamInInvalidStateShouldIgnore() throws Exception { + // Throw an exception when checking stream state. + when(stream.state()).thenReturn(Http2Stream.State.CLOSED); + when(stream.isResetSent()).thenReturn(true); + final ByteBuf data = dummyData(); + try { + decode().onDataRead(ctx, STREAM_ID, data, 10, true); + verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true)); + verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); + } finally { + data.release(); + } + } + @Test public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception { final ByteBuf data = dummyData(); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index a4b3097ff7..1ca69bcb84 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -154,7 +154,7 @@ public class DefaultHttp2ConnectionEncoderTest { final ByteBuf data = dummyData(); try { ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + assertTrue(future.awaitUninterruptibly().cause() instanceof IllegalStateException); } finally { while (data.refCnt() > 0) { data.release();