Ensure we can correctly propagate exceptions to streams even if endStream flag is set (#11105)
Motivation: We need to ensure we are still be able to correctly map errors to streams in all cases. The problem was that we sometimes called closeStreamRemote(...) in a finally block and so closed the underyling stream before the actual exception was propagated. This was only true in some cases and not in all. Generally speaking we should only call closeStreamRemote(...) if there was no error as in a case of error we should generate a RST frame. Modifications: - Only call closeStreamRemote(...) if no exeption was thrown and so let the Http2ConnectionHandler handle the exception correctly - Add unit tests Result: Correctly handle errors even when endStream is set to true
This commit is contained in:
parent
9ba653c851
commit
9c39c0de35
@ -316,6 +316,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
// Call back the application and retrieve the number of bytes that have been
|
||||
// immediately processed.
|
||||
bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
|
||||
|
||||
if (endOfStream) {
|
||||
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
|
||||
return bytesToReturn;
|
||||
} catch (Http2Exception | RuntimeException e) {
|
||||
// If an exception happened during delivery, the listener may have returned part
|
||||
@ -327,10 +332,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
} finally {
|
||||
// If appropriate, return the processed bytes to the flow controller.
|
||||
flowController.consumeBytes(stream, bytesToReturn);
|
||||
|
||||
if (endOfStream) {
|
||||
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -404,18 +405,15 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
}
|
||||
|
||||
stream.headersReceived(isInformational);
|
||||
try {
|
||||
verifyContentLength(stream, 0, endOfStream);
|
||||
encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
|
||||
listener.onHeadersRead(ctx, streamId, headers, streamDependency,
|
||||
weight, exclusive, padding, endOfStream);
|
||||
} finally {
|
||||
// If the headers completes this stream, close it.
|
||||
if (endOfStream) {
|
||||
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
|
||||
|
@ -448,7 +448,6 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
} catch (RuntimeException cause) {
|
||||
verify(localFlow)
|
||||
.receiveFlowControlledFrame(eq(stream), eq(data), eq(padding), eq(true));
|
||||
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
|
||||
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
|
||||
assertEquals(0, localFlow.unconsumedBytes(stream));
|
||||
} finally {
|
||||
|
@ -231,12 +231,64 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagate() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(false, false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateWithEndStream() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(false, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateCloseLocal() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(true, false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateWithEndStreamCloseLocal() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(true, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateTrailers() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(false, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateWithEndStreamTrailers() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(false, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateCloseLocalTrailers() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(true, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void headerContentLengthNotMatchValidationShouldPropagateWithEndStreamCloseLocalTrailers() {
|
||||
headerContentLengthNotMatchValidationShouldPropagate(true, true, true);
|
||||
}
|
||||
|
||||
private void headerContentLengthNotMatchValidationShouldPropagate(
|
||||
boolean closeLocal, boolean endStream, boolean trailer) {
|
||||
LastInboundHandler inboundHandler = new LastInboundHandler();
|
||||
request.addLong(HttpHeaderNames.CONTENT_LENGTH, 1);
|
||||
Http2StreamChannel channel = newInboundStream(3, false, inboundHandler);
|
||||
assertTrue(channel.isActive());
|
||||
|
||||
frameInboundWriter.writeInboundData(channel.stream().id(), bb("foo"), 0, false);
|
||||
if (closeLocal) {
|
||||
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true))
|
||||
.syncUninterruptibly();
|
||||
assertEquals(Http2Stream.State.HALF_CLOSED_LOCAL, channel.stream().state());
|
||||
} else {
|
||||
assertEquals(Http2Stream.State.OPEN, channel.stream().state());
|
||||
}
|
||||
|
||||
if (trailer) {
|
||||
frameInboundWriter.writeInboundHeaders(channel.stream().id(), new DefaultHttp2Headers(), 0, endStream);
|
||||
} else {
|
||||
frameInboundWriter.writeInboundData(channel.stream().id(), bb("foo"), 0, endStream);
|
||||
}
|
||||
try {
|
||||
inboundHandler.checkException();
|
||||
fail();
|
||||
|
Loading…
x
Reference in New Issue
Block a user