Ignore frames for streams that may have previously existed.

Motivation:

The recent PR that discarded the Http2StreamRemovalPolicy causes connection errors when receiving a frame for a stream that no longer exists. We should ignore these frames if we think there's a chance that the stream has existed previously

Modifications:

Modified the Http2Connection interface to provide a `streamMayHaveExisted` method. Also removed the requireStream() method to identify all of the places in the code that need to be updated.

Modified the encoder and decoder to properly handle cases where a stream may have existed but no longer does.

Result:

Fixes #3643
This commit is contained in:
nmittler 2015-04-21 14:18:02 -07:00
parent ed99766c26
commit ec6cd54f85
12 changed files with 260 additions and 140 deletions

View File

@ -113,17 +113,13 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public Http2Stream requireStream(int streamId) throws Http2Exception { public Http2Stream stream(int streamId) {
Http2Stream stream = stream(streamId); return streamMap.get(streamId);
if (stream == null) {
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
}
return stream;
} }
@Override @Override
public Http2Stream stream(int streamId) { public boolean streamMayHaveExisted(int streamId) {
return streamMap.get(streamId); return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
} }
@Override @Override
@ -166,7 +162,7 @@ public class DefaultHttp2Connection implements Http2Connection {
forEachActiveStream(new Http2StreamVisitor() { forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) {
if (stream.id() > lastKnownStream && localEndpoint.createdStreamId(stream.id())) { if (stream.id() > lastKnownStream && localEndpoint.isValidStreamId(stream.id())) {
stream.close(); stream.close();
} }
return true; return true;
@ -197,7 +193,7 @@ public class DefaultHttp2Connection implements Http2Connection {
forEachActiveStream(new Http2StreamVisitor() { forEachActiveStream(new Http2StreamVisitor() {
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) {
if (stream.id() > lastKnownStream && remoteEndpoint.createdStreamId(stream.id())) { if (stream.id() > lastKnownStream && remoteEndpoint.isValidStreamId(stream.id())) {
stream.close(); stream.close();
} }
return true; return true;
@ -551,11 +547,11 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
final DefaultEndpoint<? extends Http2FlowController> createdBy() { final DefaultEndpoint<? extends Http2FlowController> createdBy() {
return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint; return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
} }
final boolean isLocal() { final boolean isLocal() {
return localEndpoint.createdStreamId(id); return localEndpoint.isValidStreamId(id);
} }
final void weight(short weight) { final void weight(short weight) {
@ -880,9 +876,14 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public boolean createdStreamId(int streamId) { public boolean isValidStreamId(int streamId) {
boolean even = (streamId & 1) == 0; boolean even = (streamId & 1) == 0;
return server == even; return streamId > 0 && server == even;
}
@Override
public boolean mayHaveCreatedStream(int streamId) {
return isValidStreamId(streamId) && streamId <= lastStreamCreated;
} }
@Override @Override
@ -1021,7 +1022,7 @@ public class DefaultHttp2Connection implements Http2Connection {
if (streamId < 0) { if (streamId < 0) {
throw new Http2NoMoreStreamIdsException(); throw new Http2NoMoreStreamIdsException();
} }
if (!createdStreamId(streamId)) { if (!isValidStreamId(streamId)) {
throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId, throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
server ? "server" : "client"); server ? "server" : "client");
} }

View File

@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY; import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -184,16 +185,20 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
@Override @Override
public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception { int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.stream(streamId);
Http2LocalFlowController flowController = flowController(); Http2LocalFlowController flowController = flowController();
int bytesToReturn = data.readableBytes() + padding; int bytesToReturn = data.readableBytes() + padding;
if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
// Count the frame towards the connection flow control window and don't process it further. // Ignoring this frame. We still need to count the frame towards the connection flow control
// window, but we immediately mark all bytes as consumed.
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
flowController.consumeBytes(ctx, stream, bytesToReturn); flowController.consumeBytes(ctx, stream, bytesToReturn);
// Since no bytes are consumed, return them all. // Verify that the stream may have existed after we apply flow control.
verifyStreamMayHaveExisted(streamId);
// All bytes have been consumed.
return bytesToReturn; return bytesToReturn;
} }
@ -264,31 +269,40 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception { short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
boolean allowHalfClosedRemote = false;
if (stream == null) { if (stream == null && !connection.streamMayHaveExisted(streamId)) {
stream = connection.remote().createStream(streamId).open(endOfStream); stream = connection.remote().createStream(streamId).open(endOfStream);
} else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(stream)) { // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
}
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
// Ignore this frame. // Ignore this frame.
return; return;
} else { }
switch (stream.state()) {
case RESERVED_REMOTE: switch (stream.state()) {
case IDLE: case RESERVED_REMOTE:
stream.open(endOfStream); case IDLE:
break; stream.open(endOfStream);
case OPEN: break;
case HALF_CLOSED_LOCAL: case OPEN:
// Allowed to receive headers in these states. case HALF_CLOSED_LOCAL:
break; // Allowed to receive headers in these states.
case HALF_CLOSED_REMOTE: break;
case CLOSED: case HALF_CLOSED_REMOTE:
if (!allowHalfClosedRemote) {
throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
stream.id(), stream.state()); stream.id(), stream.state());
default: }
// Connection error. break;
throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(), case CLOSED:
stream.state()); throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
} stream.id(), stream.state());
default:
// Connection error.
throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
stream.state());
} }
try { try {
@ -315,10 +329,15 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
try { try {
if (stream == null) { if (stream == null) {
if (connection.streamMayHaveExisted(streamId)) {
// Ignore this frame.
return;
}
// PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
// first frame to be received for a stream that we must create the stream. // first frame to be received for a stream that we must create the stream.
stream = connection.remote().createStream(streamId); stream = connection.remote().createStream(streamId);
} else if (streamCreatedAfterGoAwaySent(stream)) { } else if (streamCreatedAfterGoAwaySent(streamId)) {
// Ignore this frame. // Ignore this frame.
return; return;
} }
@ -336,7 +355,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
@Override @Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception { public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.stream(streamId);
if (stream == null) {
verifyStreamMayHaveExisted(streamId);
return;
}
switch(stream.state()) { switch(stream.state()) {
case IDLE: case IDLE:
@ -434,12 +457,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
@Override @Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception { Http2Headers headers, int padding) throws Http2Exception {
Http2Stream parentStream = connection.requireStream(streamId); Http2Stream parentStream = connection.stream(streamId);
if (streamCreatedAfterGoAwaySent(parentStream)) { if (streamCreatedAfterGoAwaySent(streamId)) {
return; return;
} }
if (parentStream == null) {
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
}
switch (parentStream.state()) { switch (parentStream.state()) {
case OPEN: case OPEN:
case HALF_CLOSED_LOCAL: case HALF_CLOSED_LOCAL:
@ -483,9 +510,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
@Override @Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception { throws Http2Exception {
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.stream(streamId);
if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
if (stream.state() == CLOSED || streamCreatedAfterGoAwaySent(stream)) { // Ignore this frame.
verifyStreamMayHaveExisted(streamId);
return; return;
} }
@ -501,10 +529,16 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
onUnknownFrame0(ctx, frameType, streamId, flags, payload); onUnknownFrame0(ctx, frameType, streamId, flags, payload);
} }
private boolean streamCreatedAfterGoAwaySent(Http2Stream stream) { private boolean streamCreatedAfterGoAwaySent(int streamId) {
// Ignore inbound frames after a GOAWAY was sent and the stream id is greater than // Ignore inbound frames after a GOAWAY was sent and the stream id is greater than
// the last stream id set in the GOAWAY frame. // the last stream id set in the GOAWAY frame.
return connection().goAwaySent() && stream.id() > connection().remote().lastKnownStream(); return connection.goAwaySent() && streamId > connection.remote().lastKnownStream();
}
private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception {
if (!connection.streamMayHaveExisted(streamId)) {
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
}
} }
} }

View File

@ -112,7 +112,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final boolean endOfStream, ChannelPromise promise) { final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream; final Http2Stream stream;
try { try {
stream = connection.requireStream(streamId); stream = requireStream(streamId);
// Verify that the stream is in the appropriate state for sending DATA frames. // Verify that the stream is in the appropriate state for sending DATA frames.
switch (stream.state()) { switch (stream.state()) {
case OPEN: case OPEN:
@ -250,7 +251,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received."); throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
} }
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = requireStream(streamId);
// Reserve the promised stream. // Reserve the promised stream.
connection.local().reservePushStream(promisedStreamId, stream); connection.local().reservePushStream(promisedStreamId, stream);
} catch (Throwable e) { } catch (Throwable e) {
@ -296,6 +297,20 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return frameWriter.configuration(); return frameWriter.configuration();
} }
private Http2Stream requireStream(int streamId) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
final String message;
if (connection.streamMayHaveExisted(streamId)) {
message = "Stream no longer exists: " + streamId;
} else {
message = "Stream does not exist: " + streamId;
}
throw new IllegalArgumentException(message);
}
return stream;
}
/** /**
* Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
* only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the

View File

@ -123,16 +123,16 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception { throws Http2Exception {
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
// Streams automatically consume all remaining bytes when they are closed, so just ignore // Streams automatically consume all remaining bytes when they are closed, so just ignore
// if already closed. // if already closed.
if (!isClosed(stream)) { if (stream != null && !isClosed(stream)) {
if (stream.id() == CONNECTION_STREAM_ID) {
throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
}
if (numBytes <= 0) {
throw new IllegalArgumentException("numBytes must be positive");
}
connectionState().consumeBytes(ctx, numBytes); connectionState().consumeBytes(ctx, numBytes);
state(stream).consumeBytes(ctx, numBytes); state(stream).consumeBytes(ctx, numBytes);
} }
@ -208,11 +208,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
int dataLength = data.readableBytes() + padding; int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control // Apply the connection-level flow control
DefaultFlowState connectionState = connectionState(); DefaultFlowState connectionState = connectionState();
connectionState.receiveFlowControlledFrame(dataLength); connectionState.receiveFlowControlledFrame(dataLength);
if (!isClosed(stream)) { if (stream != null && !isClosed(stream)) {
// Apply the stream-level flow control // Apply the stream-level flow control
DefaultFlowState state = state(stream); DefaultFlowState state = state(stream);
state.endOfStream(endOfStream); state.endOfStream(endOfStream);

View File

@ -147,7 +147,13 @@ public interface Http2Connection {
* Indicates whether the given streamId is from the set of IDs used by this endpoint to * Indicates whether the given streamId is from the set of IDs used by this endpoint to
* create new streams. * create new streams.
*/ */
boolean createdStreamId(int streamId); boolean isValidStreamId(int streamId);
/**
* Indicates whether or not this endpoint may have created the given stream. This is {@code true} if
* {@link #isValidStreamId(int)} and {@code streamId} <= {@link #lastStreamCreated()}.
*/
boolean mayHaveCreatedStream(int streamId);
/** /**
* Indicates whether or not this endpoint is currently allowed to create new streams. This will be * Indicates whether or not this endpoint is currently allowed to create new streams. This will be
@ -263,16 +269,17 @@ public interface Http2Connection {
*/ */
void removeListener(Listener listener); void removeListener(Listener listener);
/**
* Attempts to get the stream for the given ID. If it doesn't exist, throws.
*/
Http2Stream requireStream(int streamId) throws Http2Exception;
/** /**
* Gets the stream if it exists. If not, returns {@code null}. * Gets the stream if it exists. If not, returns {@code null}.
*/ */
Http2Stream stream(int streamId); Http2Stream stream(int streamId);
/**
* Indicates whether or not the given stream may have existed within this connection. This is a short form
* for calling {@link Endpoint#mayHaveCreatedStream(int)} on both endpoints.
*/
boolean streamMayHaveExisted(int streamId);
/** /**
* Gets the stream object representing the connection, itself (i.e. stream zero). This object * Gets the stream object representing the connection, itself (i.e. stream zero). This object
* always exists. * always exists.

View File

@ -29,21 +29,17 @@ public interface Http2FrameListener {
* @param streamId the subject stream for the frame. * @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. This buffer will be released by the codec. * @param data payload buffer for the frame. This buffer will be released by the codec.
* @param padding the number of padding bytes found at the end of the frame. * @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
* endpoint for this stream. * @return the number of bytes that have been processed by the application. The returned bytes are used by the
* @return the number of bytes that have been processed by the application. The returned bytes * inbound flow controller to determine the appropriate time to expand the inbound flow control window (i.e. send
* are used by the inbound flow controller to determine the appropriate time to expand * {@code WINDOW_UPDATE}). Returning a value equal to the length of {@code data} + {@code padding} will effectively
* the inbound flow control window (i.e. send {@code WINDOW_UPDATE}). Returning a value * opt-out of application-level flow control for this frame. Returning a value less than the length of {@code data}
* equal to the length of {@code data} + {@code padding} will effectively opt-out of * + {@code padding} will defer the returning of the processed bytes, which the application must later return via
* application-level flow control for this frame. Returning a value less than the length * {@link Http2LocalFlowController#consumeBytes(ChannelHandlerContext, Http2Stream, int)}. The returned value must
* of {@code data} + {@code padding} will defer the returning of the processed bytes, * be >= {@code 0} and <= {@code data.readableBytes()} + {@code padding}.
* which the application must later return via
* {@link Http2InboundFlowState#returnProcessedBytes(ChannelHandlerContext, int)}. The
* returned value must be >= {@code 0} and <= {@code data.readableBytes()} +
* {@code padding}.
*/ */
int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception; boolean endOfStream) throws Http2Exception;
/** /**
* Handles an inbound {@code HEADERS} frame. * Handles an inbound {@code HEADERS} frame.

View File

@ -24,20 +24,20 @@ import io.netty.channel.ChannelHandlerContext;
public interface Http2LocalFlowController extends Http2FlowController { public interface Http2LocalFlowController extends Http2FlowController {
/** /**
* Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control * Receives an inbound {@code DATA} frame from the remote endpoint and applies flow control policies to it for both
* policies to it for both the {@code stream} as well as the connection. If any flow control * the {@code stream} as well as the connection. If any flow control policies have been violated, an exception is
* policies have been violated, an exception is raised immediately, otherwise the frame is * raised immediately, otherwise the frame is considered to have "passed" flow control.
* considered to have "passed" flow control.
* <p/> * <p/>
* If {@code stream} is closed, flow control should only be applied to the connection window. * If {@code stream} is {@code null} or closed, flow control should only be applied to the connection window and the
* bytes are immediately consumed.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param stream the subject stream for the received frame. The connection stream object must * @param stream the subject stream for the received frame. The connection stream object must not be used. If {@code
* not be used. * stream} is {@code null} or closed, flow control should only be applied to the connection window and the bytes are
* immediately consumed.
* @param data payload buffer for the frame. * @param data payload buffer for the frame.
* @param padding the number of padding bytes found at the end of the frame. * @param padding the number of padding bytes found at the end of the frame.
* @param endOfStream Indicates whether this is the last frame to be sent from the remote * @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
* endpoint for this stream.
* @throws Http2Exception if any flow control errors are encountered. * @throws Http2Exception if any flow control errors are encountered.
*/ */
void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
@ -49,13 +49,13 @@ public interface Http2LocalFlowController extends Http2FlowController {
* control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to * control window will collapse. Consuming bytes enables the flow controller to send {@code WINDOW_UPDATE} to
* restore a portion of the flow control window for the stream. * restore a portion of the flow control window for the stream.
* <p/> * <p/>
* If {@code stream} is closed (i.e. {@link Http2Stream#state()} method returns {@link Http2Stream.State#CLOSED}), * If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* the consumed bytes are only restored to the connection window. When a stream is closed, the flow controller * Http2Stream.State#CLOSED}), calling this method has no effect.
* automatically restores any unconsumed bytes for that stream to the connection window. This is done to ensure that
* the connection window does not degrade over time as streams are closed.
* *
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate * @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param stream the stream for which window space should be freed. The connection stream object must not be used. * @param stream the stream for which window space should be freed. The connection stream object must not be used.
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* Http2Stream.State#CLOSED}), calling this method has no effect.
* @param numBytes the number of bytes to be returned to the flow control window. * @param numBytes the number of bytes to be returned to the flow control window.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
* stream. * stream.

View File

@ -19,7 +19,6 @@ import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
@ -38,7 +37,9 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -47,10 +48,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -59,6 +56,9 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Tests for {@link DefaultHttp2ConnectionDecoder}. * Tests for {@link DefaultHttp2ConnectionDecoder}.
*/ */
@ -135,7 +135,7 @@ public class DefaultHttp2ConnectionDecoderTest {
} }
}).when(connection).forEachActiveStream(any(Http2StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream); when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(true);
when(connection.local()).thenReturn(local); when(connection.local()).thenReturn(local);
when(local.flowController()).thenReturn(localFlow); when(local.flowController()).thenReturn(localFlow);
when(encoder.flowController()).thenReturn(remoteFlow); when(encoder.flowController()).thenReturn(remoteFlow);
@ -182,6 +182,48 @@ public class DefaultHttp2ConnectionDecoderTest {
} }
} }
@Test(expected = Http2Exception.class)
public void dataReadForUnknownStreamShouldApplyFlowControlAndFail() throws Exception {
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
when(connection.stream(STREAM_ID)).thenReturn(null);
final ByteBuf data = dummyData();
int padding = 10;
int processedBytes = data.readableBytes() + padding;
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
} finally {
try {
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
verifyNoMoreInteractions(localFlow);
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
}
@Test
public void dataReadForUnknownStreamShouldApplyFlowControl() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
final ByteBuf data = dummyData();
int padding = 10;
int processedBytes = data.readableBytes() + padding;
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq((Http2Stream) null), eq(data), eq(padding), eq(true));
verify(localFlow).consumeBytes(eq(ctx), eq((Http2Stream) null), eq(processedBytes));
verifyNoMoreInteractions(localFlow);
// Verify that the event was absorbed and not propagated to the observer.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test @Test
public void emptyDataFrameShouldApplyFlowControl() throws Exception { public void emptyDataFrameShouldApplyFlowControl() throws Exception {
final ByteBuf data = EMPTY_BUFFER; final ByteBuf data = EMPTY_BUFFER;
@ -314,6 +356,19 @@ public class DefaultHttp2ConnectionDecoderTest {
verify(stream, never()).open(anyBoolean()); verify(stream, never()).open(anyBoolean());
} }
@Test
public void headersReadForUnknownStreamShouldBeIgnored() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createStream(eq(STREAM_ID));
verify(stream, never()).open(anyBoolean());
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
verify(remote, never()).createStream(anyInt());
verify(stream, never()).open(anyBoolean());
}
@Test @Test
public void headersReadForUnknownStreamShouldCreateStream() throws Exception { public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
final int streamId = 5; final int streamId = 5;
@ -408,6 +463,12 @@ public class DefaultHttp2ConnectionDecoderTest {
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt()); verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt());
} }
@Test(expected = Http2Exception.class)
public void pushPromiseReadForUnknownStreamShouldThrow() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
}
@Test @Test
public void pushPromiseReadShouldSucceed() throws Exception { public void pushPromiseReadShouldSucceed() throws Exception {
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0); decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
@ -425,9 +486,17 @@ public class DefaultHttp2ConnectionDecoderTest {
} }
@Test @Test
public void priorityReadShouldSucceed() throws Exception { public void priorityReadForUnknownStreamShouldBeIgnored() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
}
@Test
public void priorityReadShouldCreateNewStream() throws Exception {
when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
when(connection.stream(STREAM_ID)).thenReturn(null); when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
@ -435,25 +504,6 @@ public class DefaultHttp2ConnectionDecoderTest {
verify(stream, never()).open(anyBoolean()); verify(stream, never()).open(anyBoolean());
} }
@Test
public void priorityReadOnPreviouslyExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(remote).createStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_DEPENDENCY_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(remote).createStream(STREAM_ID);
}
@Test @Test
public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception { public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() { doAnswer(new Answer<Http2Stream>() {
@ -462,8 +512,6 @@ public class DefaultHttp2ConnectionDecoderTest {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
} }
}).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); }).when(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true); decode().onPriorityRead(ctx, STREAM_ID, STREAM_DEPENDENCY_ID, (short) 255, true);
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true)); verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(STREAM_DEPENDENCY_ID), eq((short) 255), eq(true));
@ -479,8 +527,17 @@ public class DefaultHttp2ConnectionDecoderTest {
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception { public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, "")); when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
decode().onWindowUpdateRead(ctx, 5, 10); when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
}
@Test
public void windowUpdateReadForUnknownStreamShouldBeIgnored() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(remoteFlow, never()).incrementWindowSize(eq(ctx), any(Http2Stream.class), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
} }
@Test @Test
@ -500,8 +557,17 @@ public class DefaultHttp2ConnectionDecoderTest {
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void rstStreamReadForUnknownStreamShouldThrow() throws Exception { public void rstStreamReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(connectionError(PROTOCOL_ERROR, "")); when(connection.streamMayHaveExisted(STREAM_ID)).thenReturn(false);
decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code()); when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
}
@Test
public void rstStreamReadForUnknownStreamShouldBeIgnored() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(lifecycleManager, never()).closeStream(eq(stream), eq(future));
verify(listener, never()).onRstStreamRead(eq(ctx), anyInt(), anyLong());
} }
@Test @Test

View File

@ -148,7 +148,6 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
}).when(connection).forEachActiveStream(any(Http2StreamVisitor.class)); }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local); when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote); when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlow); when(remote.flowController()).thenReturn(remoteFlow);
@ -357,7 +356,6 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void priorityWriteShouldSetPriorityForStream() throws Exception { public void priorityWriteShouldSetPriorityForStream() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null); when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
@ -374,10 +372,8 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
}).when(local).createStream(eq(STREAM_ID)); }).when(local).createStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null); when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists" // Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream); when(connection.stream(0)).thenReturn(stream);
when(connection.requireStream(0)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
@ -393,7 +389,6 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true)); }).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream); when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));

View File

@ -74,11 +74,6 @@ public class DefaultHttp2ConnectionTest {
client.addListener(clientListener); client.addListener(clientListener);
} }
@Test(expected = Http2Exception.class)
public void getStreamOrFailWithoutStreamShouldFail() throws Http2Exception {
server.requireStream(100);
}
@Test @Test
public void getStreamWithoutStreamShouldReturnNull() { public void getStreamWithoutStreamShouldReturnNull() {
assertNull(server.stream(100)); assertNull(server.stream(100));

View File

@ -217,6 +217,18 @@ public class DefaultHttp2LocalFlowControllerTest {
assertEquals(0, controller.unconsumedBytes(connection.connectionStream())); assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
} }
@Test
public void dataReceivedForNullStreamShouldImmediatelyConsumeBytes() throws Http2Exception {
receiveFlowControlledFrame(null, 10, 0, false);
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
}
@Test
public void consumeBytesForNullStreamShouldIgnore() throws Http2Exception {
controller.consumeBytes(ctx, null, 10);
assertEquals(0, controller.unconsumedBytes(connection.connectionStream()));
}
@Test @Test
public void globalRatioShouldImpactStreams() throws Http2Exception { public void globalRatioShouldImpactStreams() throws Http2Exception {
float ratio = 0.6f; float ratio = 0.6f;
@ -311,7 +323,7 @@ public class DefaultHttp2LocalFlowControllerTest {
return stream(streamId).localFlowState().windowSize(); return stream(streamId).localFlowState().windowSize();
} }
private Http2Stream stream(int streamId) throws Http2Exception { private Http2Stream stream(int streamId) {
return connection.requireStream(streamId); return connection.stream(streamId);
} }
} }

View File

@ -1233,8 +1233,8 @@ public class DefaultHttp2RemoteFlowControllerTest {
return controller.streamableBytesForTree(stream); return controller.streamableBytesForTree(stream);
} }
private Http2Stream stream(int streamId) throws Http2Exception { private Http2Stream stream(int streamId) {
return connection.requireStream(streamId); return connection.stream(streamId);
} }
private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled { private static final class FakeFlowControlled implements Http2RemoteFlowController.FlowControlled {