Http/2 Priority on CLOSED stream

Motivation:
The encoder/decoder currently do not handle streams which have previously existed but no longer exist because they were closed. The specification requires supporting this.

Modifications:
- encoder/decoder should tolerate the frame or the dependent frame not existing in the streams map due to the fact that it may have previously existed.

Result:
encoder/decoder are more compliant with the specification.
This commit is contained in:
Scott Mitchell 2015-03-25 16:29:14 -07:00
parent 0d3a6e0511
commit ab74dccd23
7 changed files with 161 additions and 46 deletions

View File

@ -22,6 +22,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy; import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM; import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError; import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
@ -689,8 +690,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override @Override
public int nextStreamId() { public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so // For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
// start at 3.
return nextStreamId > 1 ? nextStreamId : nextStreamId + 2; return nextStreamId > 1 ? nextStreamId : nextStreamId + 2;
} }
@ -836,24 +836,22 @@ public class DefaultHttp2Connection implements Http2Connection {
if (isGoAway()) { if (isGoAway()) {
throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away"); throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away");
} }
verifyStreamId(streamId);
if (!canCreateStream()) {
throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
}
}
private void verifyStreamId(int streamId) throws Http2Exception {
if (streamId < 0) { if (streamId < 0) {
throw new Http2NoMoreStreamIdsException(); throw new Http2NoMoreStreamIdsException();
} }
if (streamId < nextStreamId) {
throw connectionError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
if (!createdStreamId(streamId)) { if (!createdStreamId(streamId)) {
throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection",
streamId, server ? "server" : "client"); streamId, server ? "server" : "client");
} }
// This check must be after all id validated checks, but before the max streams check because it may be
// recoverable to some degree for handling frames which can be sent on closed streams.
if (streamId < nextStreamId) {
throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
if (!canCreateStream()) {
throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
}
} }
private boolean isLocal() { private boolean isLocal() {

View File

@ -24,6 +24,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import java.util.List; import java.util.List;
@ -374,6 +375,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return; return;
} }
try {
if (stream == null) { if (stream == null) {
// PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
// first frame to be received for a stream that we must create the stream. // first frame to be received for a stream that we must create the stream.
@ -383,6 +385,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// This call will create a stream for streamDependency if necessary. // This call will create a stream for streamDependency if necessary.
// For this reason it must be done before notifying the listener. // For this reason it must be done before notifying the listener.
stream.setPriority(streamDependency, weight, exclusive); stream.setPriority(streamDependency, weight, exclusive);
} catch (ClosedStreamCreationException ignored) {
// It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
}
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive); listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
} }

View File

@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -238,9 +239,14 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
stream = connection.local().createStream(streamId); stream = connection.local().createStream(streamId);
} }
// The set priority operation must be done before sending the frame. The parent may not yet exist
// and the priority tree may also be modified before sending.
stream.setPriority(streamDependency, weight, exclusive); stream.setPriority(streamDependency, weight, exclusive);
} catch (Throwable e) { } catch (ClosedStreamCreationException ignored) {
return promise.setFailure(e); // It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
} catch (Throwable t) {
return promise.setFailure(t);
} }
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);

View File

@ -72,6 +72,18 @@ public class Http2Exception extends Exception {
return new Http2Exception(error, String.format(fmt, args), cause); return new Http2Exception(error, String.format(fmt, args), cause);
} }
/**
* Use if an error has occurred which can not be isolated to a single stream, but instead applies
* to the entire connection.
* @param error The type of error as defined by the HTTP/2 specification.
* @param fmt String with the content and format for the additional debug data.
* @param args Objects which fit into the format defined by {@code fmt}.
* @return An exception which can be translated into a HTTP/2 error.
*/
public static Http2Exception closedStreamError(Http2Error error, String fmt, Object... args) {
return new ClosedStreamCreationException(error, String.format(fmt, args));
}
/** /**
* Use if an error which can be isolated to a single stream has occurred. If the {@code id} is not * Use if an error which can be isolated to a single stream has occurred. If the {@code id} is not
* {@link Http2CodecUtil#CONNECTION_STREAM_ID} then a {@link Http2Exception.StreamException} will be returned. * {@link Http2CodecUtil#CONNECTION_STREAM_ID} then a {@link Http2Exception.StreamException} will be returned.
@ -130,6 +142,25 @@ public class Http2Exception extends Exception {
return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID; return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID;
} }
/**
* Used when a stream creation attempt fails but may be because the stream was previously closed.
*/
public static final class ClosedStreamCreationException extends Http2Exception {
private static final long serialVersionUID = -1911637707391622439L;
public ClosedStreamCreationException(Http2Error error) {
super(error);
}
public ClosedStreamCreationException(Http2Error error, String message) {
super(error, message);
}
public ClosedStreamCreationException(Http2Error error, String message, Throwable cause) {
super(error, message, cause);
}
}
/** /**
* Represents an exception that can be isolated to a single stream (as opposed to the entire connection). * Represents an exception that can be isolated to a single stream (as opposed to the entire connection).
*/ */

View File

@ -46,10 +46,10 @@ public interface Http2FrameListener {
boolean endOfStream) throws Http2Exception; boolean endOfStream) throws Http2Exception;
/** /**
* Handles an inbound HEADERS frame. * Handles an inbound {@code HEADERS} frame.
* <p> * <p>
* Only one of the following methods will be called for each HEADERS frame sequence. * Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the END_HEADERS flag has been received. * One will be called when the {@code END_HEADERS} flag has been received.
* <ul> * <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
@ -70,11 +70,11 @@ public interface Http2FrameListener {
boolean endOfStream) throws Http2Exception; boolean endOfStream) throws Http2Exception;
/** /**
* Handles an inbound HEADERS frame with priority information specified. Only called if END_HEADERS encountered. * Handles an inbound {@code HEADERS} frame with priority information specified.
* * Only called if {@code END_HEADERS} encountered.
* <p> * <p>
* Only one of the following methods will be called for each HEADERS frame sequence. * Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the END_HEADERS flag has been received. * One will be called when the {@code END_HEADERS} flag has been received.
* <ul> * <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
@ -100,7 +100,11 @@ public interface Http2FrameListener {
throws Http2Exception; throws Http2Exception;
/** /**
* Handles an inbound PRIORITY frame. * Handles an inbound {@code PRIORITY} frame.
* <p>
* Note that is it possible to have this method called and no stream object exist for either
* {@code streamId}, {@code streamDependency}, or both. This is because the {@code PRIORITY} frame can be
* sent/received when streams are in the {@code CLOSED} state.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame. * @param streamId the subject stream for the frame.
@ -113,7 +117,7 @@ public interface Http2FrameListener {
short weight, boolean exclusive) throws Http2Exception; short weight, boolean exclusive) throws Http2Exception;
/** /**
* Handles an inbound RST_STREAM frame. * Handles an inbound {@code RST_STREAM} frame.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param streamId the stream that is terminating. * @param streamId the stream that is terminating.
@ -122,13 +126,13 @@ public interface Http2FrameListener {
void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception; void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception;
/** /**
* Handles an inbound SETTINGS acknowledgment frame. * Handles an inbound {@code SETTINGS} acknowledgment frame.
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
*/ */
void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception; void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception;
/** /**
* Handles an inbound SETTINGS frame. * Handles an inbound {@code SETTINGS} frame.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param settings the settings received from the remote endpoint. * @param settings the settings received from the remote endpoint.
@ -136,7 +140,7 @@ public interface Http2FrameListener {
void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception; void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception;
/** /**
* Handles an inbound PING frame. * Handles an inbound {@code PING} frame.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param data the payload of the frame. If this buffer needs to be retained by the listener * @param data the payload of the frame. If this buffer needs to be retained by the listener
@ -145,7 +149,7 @@ public interface Http2FrameListener {
void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception; void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception;
/** /**
* Handles an inbound PING acknowledgment. * Handles an inbound {@code PING} acknowledgment.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param data the payload of the frame. If this buffer needs to be retained by the listener * @param data the payload of the frame. If this buffer needs to be retained by the listener
@ -154,13 +158,13 @@ public interface Http2FrameListener {
void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception; void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception;
/** /**
* Handles an inbound PUSH_PROMISE frame. Only called if END_HEADERS encountered. * Handles an inbound {@code PUSH_PROMISE} frame. Only called if {@code END_HEADERS} encountered.
* <p> * <p>
* Promised requests MUST be authoritative, cacheable, and safe. * Promised requests MUST be authoritative, cacheable, and safe.
* See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8.2">[RFC http2], Seciton 8.2</a>. * See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8.2">[RFC http2], Seciton 8.2</a>.
* <p> * <p>
* Only one of the following methods will be called for each HEADERS frame sequence. * Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the END_HEADERS flag has been received. * One will be called when the {@code END_HEADERS} flag has been received.
* <ul> * <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li> * <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
@ -180,7 +184,7 @@ public interface Http2FrameListener {
Http2Headers headers, int padding) throws Http2Exception; Http2Headers headers, int padding) throws Http2Exception;
/** /**
* Handles an inbound GO_AWAY frame. * Handles an inbound {@code GO_AWAY} frame.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param lastStreamId the last known stream of the remote endpoint. * @param lastStreamId the last known stream of the remote endpoint.
@ -192,7 +196,7 @@ public interface Http2FrameListener {
throws Http2Exception; throws Http2Exception;
/** /**
* Handles an inbound WINDOW_UPDATE frame. * Handles an inbound {@code WINDOW_UPDATE} frame.
* *
* @param ctx the context from the handler where the frame was read. * @param ctx the context from the handler where the frame was read.
* @param streamId the stream the frame was sent on. * @param streamId the stream the frame was sent on.

View File

@ -45,6 +45,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -379,6 +380,40 @@ public class DefaultHttp2ConnectionDecoderTest {
verify(stream, never()).open(anyBoolean()); verify(stream, never()).open(anyBoolean());
} }
@Test
public void priorityReadOnPreviouslyExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(remote).createStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream);
when(connection.requireStream(0)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
verify(remote).createStream(STREAM_ID);
}
@Test
public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
}
@Test @Test
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception { public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true); when(connection.goAwaySent()).thenReturn(true);

View File

@ -38,7 +38,6 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
@ -48,8 +47,14 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled; import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -58,10 +63,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** /**
* Tests for {@link DefaultHttp2ConnectionEncoder} * Tests for {@link DefaultHttp2ConnectionEncoder}
*/ */
@ -377,6 +378,40 @@ public class DefaultHttp2ConnectionEncoderTest {
verify(stream, never()).open(anyBoolean()); verify(stream, never()).open(anyBoolean());
} }
@Test
public void priorityWriteOnPreviouslyExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(local).createStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream);
when(connection.requireStream(0)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
verify(local).createStream(STREAM_ID);
}
@Test
public void priorityWriteOnPreviouslyExistingParentStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
}
@Test @Test
public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception { public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise); encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise);