Changing stream verification to throw Http2StreamException.

Motivation:

Currently when receiving DATA/HEADERS frames, we throw Http2Exception (a
connection error) instead of Http2StreamException (stream error).  This
is incorrect according to the HTTP/2 spec.

Modifications:

Updated various places in the encoder and decoder that were out of spec
WRT connection/state checking.

Result:

Stream state verification is properly handled.
This commit is contained in:
nmittler 2014-10-16 15:34:54 -07:00 committed by Frederic Bregier
parent 1e4bbe9839
commit 5759f1d396
11 changed files with 361 additions and 174 deletions

View File

@ -20,7 +20,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
@ -215,8 +214,10 @@ public class DefaultHttp2Connection implements Http2Connection {
private DefaultStream parent;
private IntObjectMap<DefaultStream> children = newChildMap();
private int totalChildWeights;
private boolean terminateSent;
private boolean terminateReceived;
private boolean resetSent;
private boolean resetReceived;
private boolean endOfStreamSent;
private boolean endOfStreamReceived;
private FlowState inboundFlow;
private FlowState outboundFlow;
private EmbeddedChannel decompressor;
@ -237,28 +238,52 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public boolean isTerminateReceived() {
return terminateReceived;
public boolean isEndOfStreamReceived() {
return endOfStreamReceived;
}
@Override
public void terminateReceived() {
terminateReceived = true;
public Http2Stream endOfStreamReceived() {
endOfStreamReceived = true;
return this;
}
@Override
public boolean isTerminateSent() {
return terminateSent;
public boolean isEndOfStreamSent() {
return endOfStreamSent;
}
@Override
public void terminateSent() {
terminateSent = true;
public Http2Stream endOfStreamSent() {
endOfStreamSent = true;
return this;
}
@Override
public boolean isTerminated() {
return terminateSent || terminateReceived;
public boolean isResetReceived() {
return resetReceived;
}
@Override
public Http2Stream resetReceived() {
resetReceived = true;
return this;
}
@Override
public boolean isResetSent() {
return resetSent;
}
@Override
public Http2Stream resetSent() {
resetSent = true;
return this;
}
@Override
public boolean isReset() {
return resetSent || resetReceived;
}
@Override
@ -394,16 +419,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return this;
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception {
for (State allowedState : allowedStates) {
if (state == allowedState) {
return this;
}
}
throw format(error, "Stream %d in unexpected state: %s", id, state);
}
@Override
public Http2Stream openForPush() throws Http2Exception {
switch (state) {
@ -632,11 +647,6 @@ public class DefaultHttp2Connection implements Http2Connection {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates) {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream openForPush() {
throw new UnsupportedOperationException();

View File

@ -15,13 +15,10 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.handler.codec.http2.Http2StreamException.streamClosedError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -207,21 +204,61 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Check if we received a data frame for a stream which is half-closed
Http2Stream stream = connection.requireStream(streamId);
stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL);
// Apply flow control.
inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream);
verifyEndOfStreamNotReceived(stream);
verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (shouldIgnoreFrame(stream)) {
// Ignore this frame.
// We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
// lower stream ID.
boolean shouldIgnore = shouldIgnoreFrame(stream);
boolean shouldApplyFlowControl = false;
Http2Exception error = null;
switch (stream.state()) {
case OPEN:
case HALF_CLOSED_LOCAL:
shouldApplyFlowControl = true;
break;
case HALF_CLOSED_REMOTE:
case CLOSED:
if (stream.isResetSent()) {
shouldApplyFlowControl = true;
}
if (!shouldIgnore) {
// Stream error.
error = streamClosedError(stream.id(), "Stream %d in unexpected state: %s",
stream.id(), stream.state());
}
break;
default:
if (!shouldIgnore) {
// Connection error.
error = protocolError("Stream %d in unexpected state: %s", stream.id(),
stream.state());
}
break;
}
// If we should apply flow control, do so now.
if (shouldApplyFlowControl) {
inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream);
}
// If we should ignore this frame, do so now.
if (shouldIgnore) {
return;
}
// If the stream was in an invalid state to receive the frame, throw the error.
if (error != null) {
throw error;
}
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
if (endOfStream) {
stream.endOfStreamReceived();
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@ -237,13 +274,13 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream);
boolean endOfStream) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
verifyPrefaceReceived();
Http2Stream stream = connection.stream(streamId);
@ -255,25 +292,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
if (stream == null) {
stream = connection.createRemoteStream(streamId, endStream);
stream = connection.createRemoteStream(streamId, endOfStream);
} else {
if (stream.state() == RESERVED_REMOTE) {
// Received headers for a reserved push stream ... open it for push to the local endpoint.
stream.verifyState(PROTOCOL_ERROR, RESERVED_REMOTE);
stream.openForPush();
} else {
// Receiving headers on an existing stream. Make sure the stream is in an allowed state.
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_LOCAL);
verifyEndOfStreamNotReceived(stream);
switch (stream.state()) {
case RESERVED_REMOTE:
// Received headers for a reserved push stream ... open it for push to the
// local endpoint.
stream.openForPush();
break;
case OPEN:
case HALF_CLOSED_LOCAL:
// Allowed to receive headers in these states.
break;
case HALF_CLOSED_REMOTE:
case CLOSED:
// Stream error.
throw streamClosedError(stream.id(), "Stream %d in unexpected state: %s",
stream.id(), stream.state());
default:
// Connection error.
throw protocolError("Stream %d in unexpected state: %s", stream.id(),
stream.state());
}
}
listener.onHeadersRead(ctx, streamId, headers,
streamDependency, weight, exclusive, padding, endStream);
streamDependency, weight, exclusive, padding, endOfStream);
stream.setPriority(streamDependency, weight, exclusive);
// If the headers completes this stream, close it.
if (endStream) {
if (endOfStream) {
stream.endOfStreamReceived();
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@ -307,7 +359,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return;
}
stream.terminateReceived();
stream.resetReceived();
listener.onRstStreamRead(ctx, streamId, errorCode);
@ -468,26 +520,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
// Also ignore inbound frames after we sent a RST_STREAM frame.
return stream.isTerminateSent();
return stream.isResetSent();
}
/**
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws an
* exception.
* Verifies that a frame has not been received from remote endpoint with the
* {@code END_STREAM} flag set. If it was, throws a connection error.
*/
private void verifyEndOfStreamNotReceived(Http2Stream stream) throws Http2Exception {
if (stream.isEndOfStreamReceived()) {
// Connection error.
throw new Http2Exception(STREAM_CLOSED, String.format(
"Received frame for stream %d after receiving END_STREAM", stream.id()));
}
}
/**
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a
* connection error.
*/
private void verifyGoAwayNotReceived() throws Http2Exception {
if (connection.goAwayReceived()) {
// Connection error.
throw protocolError("Received frames after receiving GO_AWAY");
}
}
/**
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws an
* exception.
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws a
* stream error.
*/
private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception {
if (stream != null && stream.isTerminateReceived()) {
throw new Http2StreamException(stream.id(), STREAM_CLOSED,
if (stream != null && stream.isResetReceived()) {
// Stream error.
throw streamClosedError(stream.id(),
"Frame received after receiving RST_STREAM for stream: " + stream.id());
}
}

View File

@ -15,11 +15,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
@ -146,40 +142,58 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endStream, ChannelPromise promise) {
boolean release = true;
final boolean endOfStream, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending data after connection going away.");
throw new IllegalStateException("Sending data after connection going away.");
}
Http2Stream stream = connection.requireStream(streamId);
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
// Hand control of the frame to the flow controller.
ChannelFuture future = outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise);
release = false;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
lifecycleManager.onException(ctx, future.cause());
} else if (endStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);
lifecycleManager.closeLocalSide(stream, ctx.newPromise());
}
}
});
return future;
} catch (Http2Exception e) {
if (release) {
data.release();
if (stream.isResetSent()) {
throw new IllegalStateException("Sending data after sending RST_STREAM.");
}
if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending data after sending END_STREAM.");
}
// Verify that the stream is in the appropriate state for sending DATA frames.
switch (stream.state()) {
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending DATA frames in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
if (endOfStream) {
// Indicate that we have sent END_STREAM.
stream.endOfStreamSent();
}
} catch (Throwable e) {
data.release();
return promise.setFailure(e);
}
// Hand control of the frame to the flow controller.
ChannelFuture future =
outboundFlow.writeData(ctx, streamId, data, padding, endOfStream, promise);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
lifecycleManager.onException(ctx, future.cause());
} else if (endOfStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);
lifecycleManager.closeLocalSide(stream, ctx.newPromise());
}
}
});
return future;
}
@Override
@ -190,47 +204,56 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream,
ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
try {
if (connection.isGoAway()) {
throw protocolError("Sending headers after connection going away.");
}
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Create a new locally-initiated stream.
stream = connection.createLocalStream(streamId, endStream);
stream = connection.createLocalStream(streamId, endOfStream);
} else {
// An existing stream...
if (stream.state() == RESERVED_LOCAL) {
// Sending headers on a reserved push stream ... open it for push to the remote
// endpoint.
stream.openForPush();
} else {
// The stream already exists, make sure it's in an allowed state.
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
if (stream.isResetSent()) {
throw new IllegalStateException("Sending headers after sending RST_STREAM.");
}
if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending headers after sending END_STREAM.");
}
// Update the priority for this stream only if we'll be sending more data.
if (!endStream) {
stream.setPriority(streamDependency, weight, exclusive);
}
// An existing stream...
switch (stream.state()) {
case RESERVED_LOCAL:
// Sending headers on a reserved push stream ... open it for push to the remote endpoint.
stream.openForPush();
break;
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending headers in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
}
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream, promise);
ctx.flush();
// If the headers are the end of the stream, close it now.
if (endStream) {
lifecycleManager.closeLocalSide(stream, promise);
}
return future;
} catch (Http2Exception e) {
} catch (Throwable e) {
return promise.setFailure(e);
}
ChannelFuture future =
frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
ctx.flush();
// If the headers are the end of the stream, close it now.
if (endOfStream) {
stream.endOfStreamSent();
lifecycleManager.closeLocalSide(stream, promise);
}
return future;
}
@Override
@ -243,14 +266,15 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// Update the priority on this stream.
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive,
promise);
ctx.flush();
return future;
} catch (Http2Exception e) {
} catch (Throwable e) {
return promise.setFailure(e);
}
ChannelFuture future =
frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive,
promise);
ctx.flush();
return future;
}
@Override
@ -286,7 +310,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
ctx.flush();
if (stream != null) {
stream.terminateSent();
stream.resetSent();
lifecycleManager.closeStream(stream, promise);
}
@ -294,7 +318,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise) {
outstandingLocalSettingsQueue.add(settings);
try {
if (connection.isGoAway()) {
@ -305,13 +330,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
if (pushEnabled != null && connection.isServer()) {
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
ctx.flush();
return future;
} catch (Http2Exception e) {
} catch (Throwable e) {
return promise.setFailure(e);
}
ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
ctx.flush();
return future;
}
@Override
@ -320,23 +345,16 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
boolean release = true;
try {
if (connection.isGoAway()) {
throw protocolError("Sending ping after connection going away.");
}
frameWriter.writePing(ctx, ack, data, promise);
release = false;
ctx.flush();
return promise;
} catch (Http2Exception e) {
if (release) {
data.release();
}
return promise.setFailure(e);
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data,
ChannelPromise promise) {
if (connection.isGoAway()) {
data.release();
return promise.setFailure(protocolError("Sending ping after connection going away."));
}
ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise);
ctx.flush();
return future;
}
@Override
@ -350,14 +368,16 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// Reserve the promised stream.
Http2Stream stream = connection.requireStream(streamId);
connection.local().reservePushStream(promisedStreamId, stream);
// Write the frame.
frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
ctx.flush();
return promise;
} catch (Http2Exception e) {
} catch (Throwable e) {
return promise.setFailure(e);
}
// Write the frame.
ChannelFuture future =
frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
promise);
ctx.flush();
return future;
}
@Override
@ -369,6 +389,12 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
ChannelPromise promise) {
if (streamId > 0) {
Http2Stream stream = connection().stream(streamId);
if (stream != null && stream.isResetSent()) {
throw new IllegalStateException("Sending data after sending RST_STREAM.");
}
}
return frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}

View File

@ -21,7 +21,10 @@ import java.io.Closeable;
import java.util.List;
/**
* Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}.
* Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}. Performs basic protocol
* conformance on inbound frames before calling the delegate {@link Http2FrameListener} for
* application-specific processing. Note that frames of an unknown type (i.e. HTTP/2 extensions)
* will skip all protocol checks and be given directly to the listener for processing.
*/
public interface Http2ConnectionDecoder extends Closeable {

View File

@ -14,6 +14,11 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* Handler for outbound traffic on behalf of {@link Http2ConectionHandler}.
@ -71,4 +76,12 @@ public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundF
* Sets the settings for the remote endpoint of the HTTP/2 connection.
*/
void remoteSettings(Http2Settings settings) throws Http2Exception;
/**
* Writes the given data to the internal {@link Http2FrameWriter} without performing any
* state checks on the connection/stream.
*/
@Override
ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload, ChannelPromise promise);
}

View File

@ -317,7 +317,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ctx.flush();
if (stream != null) {
stream.terminateSent();
stream.resetSent();
closeStream(stream, promise);
}

View File

@ -41,11 +41,11 @@ public interface Http2FrameListener extends Http2DataListener {
* @param streamId the subject stream for the frame.
* @param headers the received headers.
* @param padding the number of padding bytes found at the end of the frame.
* @param endStream Indicates whether this is the last frame to be sent from the remote endpoint
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint
* for this stream.
*/
void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception;
boolean endOfStream) throws Http2Exception;
/**
* Handles an inbound HEADERS frame with priority information specified. Only called if END_HEADERS encountered.
@ -70,11 +70,11 @@ public interface Http2FrameListener extends Http2DataListener {
* @param weight the new weight for the stream.
* @param exclusive whether or not the stream should be the exclusive dependent of its parent.
* @param padding the number of padding bytes found at the end of the frame.
* @param endStream Indicates whether this is the last frame to be sent from the remote endpoint
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint
* for this stream.
*/
void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)
int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream)
throws Http2Exception;
/**

View File

@ -47,11 +47,6 @@ public interface Http2Stream {
*/
State state();
/**
* Verifies that the stream is in one of the given allowed states.
*/
Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception;
/**
* If this is a reserved push stream, opens the stream for push in one direction.
*/
@ -75,32 +70,56 @@ public interface Http2Stream {
Http2Stream closeRemoteSide();
/**
* Indicates whether a RST_STREAM frame has been received from the remote endpoint for this stream.
* Indicates whether a frame with {@code END_STREAM} set was received from the remote endpoint
* for this stream.
*/
boolean isTerminateReceived();
boolean isEndOfStreamReceived();
/**
* Sets the flag indicating that a RST_STREAM frame has been received from the remote endpoint
* Sets the flag indicating that a frame with {@code END_STREAM} set was received from the
* remote endpoint for this stream.
*/
Http2Stream endOfStreamReceived();
/**
* Indicates whether a frame with {@code END_STREAM} set was sent to the remote endpoint for
* this stream.
*/
boolean isEndOfStreamSent();
/**
* Sets the flag indicating that a frame with {@code END_STREAM} set was sent to the remote
* endpoint for this stream.
*/
Http2Stream endOfStreamSent();
/**
* Indicates whether a {@code RST_STREAM} frame has been received from the remote endpoint for this stream.
*/
boolean isResetReceived();
/**
* Sets the flag indicating that a {@code RST_STREAM} frame has been received from the remote endpoint
* for this stream. This does not affect the stream state.
*/
void terminateReceived();
Http2Stream resetReceived();
/**
* Indicates whether a RST_STREAM frame has been sent from the local endpoint for this stream.
* Indicates whether a {@code RST_STREAM} frame has been sent from the local endpoint for this stream.
*/
boolean isTerminateSent();
boolean isResetSent();
/**
* Sets the flag indicating that a RST_STREAM frame has been sent from the local endpoint
* Sets the flag indicating that a {@code RST_STREAM} frame has been sent from the local endpoint
* for this stream. This does not affect the stream state.
*/
void terminateSent();
Http2Stream resetSent();
/**
* Indicates whether or not this stream has been terminated. This is a short form for
* {@link #isTerminateSent()} || {@link #isTerminateReceived()}.
* Indicates whether or not this stream has been reset. This is a short form for
* {@link #isResetSent()} || {@link #isResetReceived()}.
*/
boolean isTerminated();
boolean isReset();
/**
* Indicates whether the remote side of this stream is open (i.e. the state is either

View File

@ -38,4 +38,12 @@ public class Http2StreamException extends Http2Exception {
public int streamId() {
return streamId;
}
public static Http2StreamException format(int id, Http2Error error, String fmt, Object... args) {
return new Http2StreamException(id, error, String.format(fmt, args));
}
public static Http2StreamException streamClosedError(int id, String fmt, Object... args) {
return format(id, Http2Error.STREAM_CLOSED, fmt, args);
}
}

View File

@ -172,6 +172,48 @@ public class DefaultHttp2ConnectionDecoderTest {
}
}
@Test(expected = Http2StreamException.class)
public void dataReadForStreamInInvalidStateShouldThrow() throws Exception {
// Throw an exception when checking stream state.
when(stream.state()).thenReturn(Http2Stream.State.CLOSED);
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
} finally {
data.release();
}
}
@Test
public void dataReadAfterGoAwayForStreamInInvalidStateShouldIgnore() throws Exception {
// Throw an exception when checking stream state.
when(stream.state()).thenReturn(Http2Stream.State.CLOSED);
when(connection.goAwaySent()).thenReturn(true);
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow, never()).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadAfterRstStreamForStreamInInvalidStateShouldIgnore() throws Exception {
// Throw an exception when checking stream state.
when(stream.state()).thenReturn(Http2Stream.State.CLOSED);
when(stream.isResetSent()).thenReturn(true);
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
final ByteBuf data = dummyData();

View File

@ -154,7 +154,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
try {
ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
assertTrue(future.awaitUninterruptibly().cause() instanceof IllegalStateException);
} finally {
while (data.refCnt() > 0) {
data.release();