DefaultHttp2FrameReader stops reading if stream error

Motivation:
DefaultHttp2FrameReader will stop reading data if any exception is thrown. However some exceptions are recoverable and we will lose data if we don't continue reading. For example some stream errors are recoverable.

Modifications:
- DefaultHttp2FrameReader should attempt to continue reading if a stream error is encountered.

Result:
Fixes https://github.com/netty/netty/issues/5186
This commit is contained in:
Scott Mitchell 2016-04-28 22:52:04 -07:00
parent 5b59250657
commit 467e5a1019
3 changed files with 116 additions and 52 deletions

View File

@ -343,7 +343,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
if (stream == null) {
if (connection.streamMayHaveExisted(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring PRIORITY frame for stream id %d. Stream doesn't exist but may " +
logger.info("%s ignoring PRIORITY frame for stream %d. Stream doesn't exist but may " +
" have existed", ctx.channel(), streamId);
}
return;
@ -354,7 +354,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
stream = connection.remote().createIdleStream(streamId);
} else if (streamCreatedAfterGoAwaySent(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring PRIORITY frame for stream id %d. Stream created after GOAWAY sent. " +
logger.info("%s ignoring PRIORITY frame for stream %d. Stream created after GOAWAY sent. " +
"Last known stream by peer " + connection.remote().lastStreamKnownByPeer(),
ctx.channel(), streamId);
}
@ -482,7 +482,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
if (parentStream == null) {
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId);
}
switch (parentStream.state()) {
@ -556,7 +556,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
if (stream == null) {
if (streamCreatedAfterGoAwaySent(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring %s frame for stream id %d. Stream sent after GOAWAY sent",
logger.info("%s ignoring %s frame for stream %d. Stream sent after GOAWAY sent",
ctx.channel(), frameName, streamId);
}
return true;
@ -564,10 +564,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
// also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
// sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
throw streamError(streamId, STREAM_CLOSED, "Received HEADERS frame for an unknown stream %d", streamId);
throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d",
frameName, streamId);
} else if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
if (logger.isInfoEnabled()) {
logger.info("%s ignoring %s frame for stream id %d. %s", ctx.channel(), frameName,
logger.info("%s ignoring %s frame for stream %d. %s", ctx.channel(), frameName,
stream.isResetSent() ? "RST_STREAM sent." :
("Stream created after GOAWAY sent. Last known stream by peer " +
connection.remote().lastStreamKnownByPeer()));
@ -597,7 +598,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception {
if (!connection.streamMayHaveExisted(streamId)) {
throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId);
}
}
}

View File

@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2FrameReader.Configuration;
import io.netty.util.internal.PlatformDependent;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.FRAME_HEADER_LENGTH;
@ -50,15 +51,17 @@ import static io.netty.handler.codec.http2.Http2FrameTypes.WINDOW_UPDATE;
* A {@link Http2FrameReader} that supports all frame types defined by the HTTP/2 specification.
*/
public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
private enum State {
FRAME_HEADER,
FRAME_PAYLOAD,
ERROR
}
private final Http2HeadersDecoder headersDecoder;
private State state = State.FRAME_HEADER;
/**
* {@code true} = reading headers, {@code false} = reading payload.
*/
private boolean readingHeaders = true;
/**
* Once set to {@code true} the value will never change. This is set to {@code true} if an unrecoverable error which
* renders the connection unusable.
*/
private boolean readError;
private byte frameType;
private int streamId;
private Http2Flags flags;
@ -128,44 +131,40 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
@Override
public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)
throws Http2Exception {
if (readError) {
input.skipBytes(input.readableBytes());
return;
}
try {
while (input.isReadable()) {
switch (state) {
case FRAME_HEADER:
processHeaderState(input);
if (state == State.FRAME_HEADER) {
// Wait until the entire header has arrived.
return;
}
// The header is complete, fall into the next case to process the payload.
// This is to ensure the proper handling of zero-length payloads. In this
// case, we don't want to loop around because there may be no more data
// available, causing us to exit the loop. Instead, we just want to perform
// the first pass at payload processing now.
case FRAME_PAYLOAD:
processPayloadState(ctx, input, listener);
if (state == State.FRAME_PAYLOAD) {
// Wait until the entire payload has arrived.
return;
}
break;
case ERROR:
input.skipBytes(input.readableBytes());
do {
if (readingHeaders) {
processHeaderState(input);
if (readingHeaders) {
// Wait until the entire header has arrived.
return;
default:
throw new IllegalStateException("Should never get here");
}
}
}
// The header is complete, fall into the next case to process the payload.
// This is to ensure the proper handling of zero-length payloads. In this
// case, we don't want to loop around because there may be no more data
// available, causing us to exit the loop. Instead, we just want to perform
// the first pass at payload processing now.
processPayloadState(ctx, input, listener);
if (!readingHeaders) {
// Wait until the entire payload has arrived.
return;
}
} while (input.isReadable());
} catch (Http2Exception e) {
state = State.ERROR;
readError = !Http2Exception.isStreamError(e);
throw e;
} catch (RuntimeException e) {
state = State.ERROR;
throw e;
} catch (Error e) {
state = State.ERROR;
readError = true;
throw e;
} catch (Throwable cause) {
readError = true;
PlatformDependent.throwException(cause);
}
}
@ -184,6 +183,9 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
flags = new Http2Flags(in.readUnsignedByte());
streamId = readUnsignedInt(in);
// We have consumed the data, next time we read we will be expecting to read the frame payload.
readingHeaders = false;
switch (frameType) {
case DATA:
verifyDataFrame();
@ -219,9 +221,6 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
// Unknown frame type, could be an extension.
break;
}
// Start reading the payload for the frame.
state = State.FRAME_PAYLOAD;
}
private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
@ -234,6 +233,9 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
// Get a view of the buffer for the size of the payload.
ByteBuf payload = in.readSlice(payloadLength);
// We have consumed the data, next time we read we will be expecting to read a frame header.
readingHeaders = true;
// Read the payload and fire the frame event to the listener.
switch (frameType) {
case DATA:
@ -270,9 +272,6 @@ public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSize
readUnknownFrame(ctx, payload, listener);
break;
}
// Go back to reading the next frame header.
state = State.FRAME_HEADER;
}
private void verifyDataFrame() throws Http2Exception {

View File

@ -47,6 +47,8 @@ import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2TestUtil.randomString;
@ -62,6 +64,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@ -81,6 +84,7 @@ public class Http2ConnectionRoundtripTest {
private Http2FrameListener serverListener;
private Http2ConnectionHandler http2Client;
private Http2ConnectionHandler http2Server;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
@ -117,6 +121,59 @@ public class Http2ConnectionRoundtripTest {
clientGroup.sync();
}
@Test
public void inflightFrameAfterStreamResetShouldNotMakeConnectionUnsuable() throws Exception {
bootstrapEnv(1, 1, 2, 1);
final CountDownLatch latch = new CountDownLatch(1);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
ChannelHandlerContext ctx = invocationOnMock.getArgumentAt(0, ChannelHandlerContext.class);
http2Server.encoder().writeHeaders(ctx,
invocationOnMock.getArgumentAt(1, Integer.class),
invocationOnMock.getArgumentAt(2, Http2Headers.class),
0,
false,
ctx.newPromise());
http2Server.flush(ctx);
return null;
}
}).when(serverListener).onHeadersRead(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class),
anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
latch.countDown();
return null;
}
}).when(clientListener).onHeadersRead(any(ChannelHandlerContext.class), eq(5), any(Http2Headers.class),
anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean());
// Create a single stream by sending a HEADERS frame to the server.
final short weight = 16;
final Http2Headers headers = dummyHeaders();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, weight, false, 0, false, newPromise());
http2Client.flush(ctx());
http2Client.encoder().writeRstStream(ctx(), 3, Http2Error.INTERNAL_ERROR.code(), newPromise());
http2Client.flush(ctx());
}
});
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 5, headers, 0, weight, false, 0, false, newPromise());
http2Client.flush(ctx());
}
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void headersWithEndStreamShouldNotSendError() throws Exception {
bootstrapEnv(1, 1, 2, 1);
@ -473,6 +530,8 @@ public class Http2ConnectionRoundtripTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
final AtomicReference<Http2ConnectionHandler> serverHandlerRef = new AtomicReference<Http2ConnectionHandler>();
final CountDownLatch serverInitLatch = new CountDownLatch(1);
sb.group(new DefaultEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@ -482,11 +541,13 @@ public class Http2ConnectionRoundtripTest {
serverFrameCountDown =
new FrameCountDown(serverListener, serverSettingsAckLatch,
requestLatch, dataLatch, trailersLatch, goAwayLatch);
p.addLast(new Http2ConnectionHandlerBuilder()
serverHandlerRef.set(new Http2ConnectionHandlerBuilder()
.server(true)
.frameListener(serverFrameCountDown)
.validateHeaders(false)
.build());
p.addLast(serverHandlerRef.get());
serverInitLatch.countDown();
}
});
@ -500,6 +561,7 @@ public class Http2ConnectionRoundtripTest {
.server(false)
.frameListener(clientListener)
.validateHeaders(false)
.gracefulShutdownTimeoutMillis(0)
.build());
}
});
@ -510,6 +572,8 @@ public class Http2ConnectionRoundtripTest {
assertTrue(ccf.awaitUninterruptibly().isSuccess());
clientChannel = ccf.channel();
http2Client = clientChannel.pipeline().get(Http2ConnectionHandler.class);
assertTrue(serverInitLatch.await(2, TimeUnit.SECONDS));
http2Server = serverHandlerRef.get();
}
private ChannelHandlerContext ctx() {