HTTP/2 Don't Flow Control Iniital Headers

Motivation:
Currently the initial headers for every stream is queued in the flow controller. Since the initial header frame may create streams the peer must receive these frames in the order in which they were created, or else this will be a protocol error and the connection will be closed. Tolerating the initial headers being queued would increase the complexity of the WeightedFairQueueByteDistributor and there is benefit of doing so is not clear.

Modifications:
- The initial headers will no longer be queued in the flow controllers

Result:
Fixes https://github.com/netty/netty/issues/4758
This commit is contained in:
Scott Mitchell 2016-01-29 00:45:12 -08:00
parent 5fb18e3415
commit f990f9983d
11 changed files with 146 additions and 76 deletions

View File

@ -296,7 +296,6 @@ public class DefaultHttp2Connection implements Http2Connection {
private IntObjectMap<DefaultStream> children = IntCollections.emptyMap();
private int prioritizableForTree = 1;
private boolean resetSent;
private boolean headerSent;
DefaultStream(int id, State state) {
this.id = id;
@ -324,17 +323,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return this;
}
@Override
public boolean isHeaderSent() {
return headerSent;
}
@Override
public Http2Stream headerSent() {
headerSent = true;
return this;
}
@Override
public final <V> V setProperty(PropertyKey key, V value) {
return properties.add(verifyKey(key), value);
@ -793,16 +781,6 @@ public class DefaultHttp2Connection implements Http2Connection {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeaderSent() {
return false;
}
@Override
public Http2Stream headerSent() {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) {
throw new UnsupportedOperationException();

View File

@ -14,6 +14,14 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.List;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
@ -23,11 +31,6 @@ import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_V
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import java.util.List;
/**
* Provides the default implementation for processing inbound frame events and delegates to a
@ -39,6 +42,7 @@ import java.util.List;
* {@link Http2LocalFlowController}
*/
public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class);
private Http2FrameListener internalFrameListener = new PrefaceFrameListener();
private final Http2Connection connection;
private Http2LifecycleManager lifecycleManager;
@ -185,23 +189,28 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
*/
private final class FrameReadListener implements Http2FrameListener {
@Override
public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream) throws Http2Exception {
public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
Http2LocalFlowController flowController = flowController();
int bytesToReturn = data.readableBytes() + padding;
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
// Ignoring this frame. We still need to count the frame towards the connection flow control
// window, but we immediately mark all bytes as consumed.
flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
flowController.consumeBytes(stream, bytesToReturn);
boolean shouldIgnore = true;
try {
shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA");
} finally {
if (shouldIgnore) {
// Ignoring this frame. We still need to count the frame towards the connection flow control
// window, but we immediately mark all bytes as consumed.
flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
flowController.consumeBytes(stream, bytesToReturn);
// Verify that the stream may have existed after we apply flow control.
verifyStreamMayHaveExisted(streamId);
// Verify that the stream may have existed after we apply flow control.
verifyStreamMayHaveExisted(streamId);
// All bytes have been consumed.
return bytesToReturn;
// All bytes have been consumed.
return bytesToReturn;
}
}
Http2Exception error = null;
@ -276,8 +285,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
}
if (stream == null || stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
// Ignore this frame.
if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) {
return;
}
@ -329,7 +337,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
try {
if (stream == null) {
if (connection.streamMayHaveExisted(streamId)) {
// Ignore this frame.
if (logger.isInfoEnabled()) {
logger.info("%s ignoring PRIORITY frame for stream id %d. Stream doesn't exist but may " +
" have existed", ctx.channel(), streamId);
}
return;
}
@ -337,7 +348,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// first frame to be received for a stream that we must create the stream.
stream = connection.remote().createIdleStream(streamId);
} else if (streamCreatedAfterGoAwaySent(streamId)) {
// Ignore this frame.
if (logger.isInfoEnabled()) {
logger.info("%s ignoring PRIORITY frame for stream id %d. Stream created after GOAWAY sent. " +
"Last known stream by peer " + connection.remote().lastStreamKnownByPeer(),
ctx.channel(), streamId);
}
return;
}
@ -457,7 +472,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2Headers headers, int padding) throws Http2Exception {
Http2Stream parentStream = connection.stream(streamId);
if (streamCreatedAfterGoAwaySent(streamId)) {
if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) {
return;
}
@ -527,6 +542,36 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
onUnknownFrame0(ctx, frameType, streamId, flags, payload);
}
/**
* Helper method to determine if a frame that has the semantics of headers or data should be ignored for the
* {@code stream} (which may be {@code null}) associated with {@code streamId}.
*/
private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream,
String frameName) throws Http2Exception {
if (stream == null) {
if (streamCreatedAfterGoAwaySent(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring %s frame for stream id %d. Stream sent after GOAWAY sent",
ctx.channel(), frameName, streamId);
}
return true;
}
// Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
// also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
// sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
throw streamError(streamId, STREAM_CLOSED, "Received HEADERS frame for an unknown stream %d", streamId);
} else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring %s frame for stream id %d. %s", ctx.channel(), frameName,
stream.isResetSent() ? "RST_STREAM sent." :
("Stream created after GOAWAY sent. Last known stream by peer " +
connection.remote().lastStreamKnownByPeer()));
}
return true;
}
return false;
}
/**
* Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created
* after a {@code GOAWAY} is sent if the following conditions hold:

View File

@ -14,12 +14,6 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -30,6 +24,12 @@ import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException
import java.util.ArrayDeque;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
/**
* Default implementation of {@link Http2ConnectionEncoder}.
*/
@ -167,11 +167,29 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
}
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController().addFlowControlled(stream,
new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise));
return promise;
// Trailing headers must go through flow control if there are other frames queued in flow control
// for this stream.
Http2RemoteFlowController flowController = flowController();
if (!endOfStream || !flowController.hasFlowControlled(stream)) {
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
if (endOfStream) {
final Http2Stream finalStream = stream;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lifecycleManager.closeStreamLocal(finalStream, promise);
}
});
}
return future;
} else {
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController.addFlowControlled(stream,
new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise));
return promise;
}
} catch (Http2NoMoreStreamIdsException e) {
lifecycleManager.onError(ctx, e);
return promise.setFailure(e);
@ -410,7 +428,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
promise.addListener(this);
stream.headerSent();
frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
}

View File

@ -231,6 +231,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
}
@Override
public boolean hasFlowControlled(Http2Stream stream) {
return state(stream).hasFrame();
}
private AbstractState state(Http2Stream stream) {
return (AbstractState) checkNotNull(stream, "stream").getProperty(stateKey);
}

View File

@ -616,7 +616,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
final ChannelFuture future;
if (stream.state() == IDLE || (connection().local().created(stream) && !stream.isHeaderSent())) {
if (stream.state() == IDLE || connection().local().created(stream)) {
// The other endpoint doesn't know about the stream yet, so we can't actually send
// the RST_STREAM frame. The HTTP/2 spec also disallows sending RST_STREAM for IDLE streams.
future = promise.setSuccess();

View File

@ -41,6 +41,13 @@ public interface Http2RemoteFlowController extends Http2FlowController {
*/
void addFlowControlled(Http2Stream stream, FlowControlled payload);
/**
* Determine if {@code stream} has any {@link FlowControlled} frames currently queued.
* @param stream the stream to check if it has flow controlled frames.
* @return {@code true} if {@code stream} has any {@link FlowControlled} frames currently queued.
*/
boolean hasFlowControlled(Http2Stream stream);
/**
* Write all data pending in the flow controller up to the flow-control limits.
*

View File

@ -110,18 +110,6 @@ public interface Http2Stream {
*/
Http2Stream resetSent();
/**
* Indicates whether or not at least one {@code HEADERS} frame has been sent from the local endpoint
* for this stream.
*/
boolean isHeaderSent();
/**
* Sets the flag indicating that a {@code HEADERS} frame has been sent from the local endpoint
* for this stream. This does not affect the stream state.
*/
Http2Stream headerSent();
/**
* Associates the application-defined data with this stream.
* @return The value that was previously associated with {@code key}, or {@code null} if there was none.

View File

@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
@ -297,6 +298,21 @@ public class DefaultHttp2ConnectionDecoderTest {
}
}
@Test
public void dataReadAfterGoAwaySentOnUknownStreamShouldIgnore() throws Exception {
// Throw an exception when checking stream state.
when(connection.stream(STREAM_ID)).thenReturn(null);
mockGoAwaySent();
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow).receiveFlowControlledFrame((Http2Stream) isNull(), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadAfterRstStreamForStreamInInvalidStateShouldIgnore() throws Exception {
// Throw an exception when checking stream state.
@ -369,21 +385,30 @@ public class DefaultHttp2ConnectionDecoderTest {
}
}
@Test
public void headersReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true);
@Test(expected = Http2Exception.class)
public void headersReadForUnknownStreamShouldThrow() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createIdleStream(eq(STREAM_ID));
}
@Test
public void headersReadForStreamThatAlreadySentResetShouldBeIgnored() throws Exception {
when(stream.isResetSent()).thenReturn(true);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createIdleStream(anyInt());
verify(remote, never()).createStream(anyInt(), anyBoolean());
verify(stream, never()).open(anyBoolean());
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
verify(remote, never()).createIdleStream(anyInt());
verify(remote, never()).createStream(anyInt(), anyBoolean());
verify(stream, never()).open(anyBoolean());
}
@Test
public void headersReadForUnknownStreamShouldBeIgnored() throws Exception {
public void headersReadForUnknownStreamAfterGoAwayShouldBeIgnored() throws Exception {
mockGoAwaySent();
when(connection.stream(STREAM_ID)).thenReturn(null);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createStream(anyInt(), anyBoolean());

View File

@ -265,8 +265,9 @@ public class DefaultHttp2ConnectionEncoderTest {
public void dataFramesDontMergeWithHeaders() throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData().retain();
encoder.writeData(ctx, STREAM_ID, data, 0, true, newPromise());
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, newPromise());
encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise());
when(remoteFlow.hasFlowControlled(any(Http2Stream.class))).thenReturn(true);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, newPromise());
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
assertFalse(capturedWrites.get(0).merge(ctx, capturedWrites.get(1)));
}

View File

@ -315,7 +315,6 @@ public class Http2ConnectionHandlerTest {
when(frameWriter.writeRstStream(eq(ctx), eq(STREAM_ID),
anyLong(), any(ChannelPromise.class))).thenReturn(future);
when(stream.state()).thenReturn(CLOSED);
when(stream.isHeaderSent()).thenReturn(true);
// The stream is "closed" but is still known about by the connection (connection().stream(..)
// will return the stream). We should still write a RST_STREAM frame in this scenario.
handler.resetStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise);

View File

@ -71,6 +71,11 @@ public final class NoopHttp2RemoteFlowController implements Http2RemoteFlowContr
} while (payload.size() > 0);
}
@Override
public boolean hasFlowControlled(Http2Stream stream) {
return false;
}
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = ctx;