Ensure FlowControlled data frames will be correctly removed from the … (#8726)

Motivation:

When a write error happens during writing of flowcontrolled data frames we miss to correctly detect this in the write loop which may result in an infinite loop as we will never detect that the frame should be removed from the queue.

Modifications:

- When we fail a flowcontrolled data frame we ensure that the next frame.write(...) call will signal back that the whole frame was handled and so can be removed.
- Add unit test.

Result:

Fixes https://github.com/netty/netty/issues/8707.
This commit is contained in:
Norman Maurer 2019-01-19 14:01:31 +01:00
parent cd78d24761
commit af8e17f8a2
3 changed files with 112 additions and 6 deletions

View File

@ -397,6 +397,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
queue.releaseAndFailAll(cause); queue.releaseAndFailAll(cause);
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
// error so we don't invalidate flow control when returning bytes to flow control. // error so we don't invalidate flow control when returning bytes to flow control.
//
// That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
// because of an error.
lifecycleManager.onError(ctx, true, cause); lifecycleManager.onError(ctx, true, cause);
} }
@ -405,11 +408,21 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
int queuedData = queue.readableBytes(); int queuedData = queue.readableBytes();
if (!endOfStream) { if (!endOfStream) {
if (queuedData == 0) { if (queuedData == 0) {
// There's no need to write any data frames because there are only empty data frames in the queue if (queue.isEmpty()) {
// and it is not end of stream yet. Just complete their promises by getting the buffer corresponding // When the queue is empty it means we did clear it because of an error(...) call
// to 0 bytes and writing it to the channel (to preserve notification order). // (as otherwise we will have at least 1 entry in there), which will happen either when called
ChannelPromise writePromise = ctx.newPromise().addListener(this); // explicit or when the write itself fails. In this case just set dataSize and padding to 0
ctx.write(queue.remove(0, writePromise), writePromise); // which will signal back that the whole frame was consumed.
//
// See https://github.com/netty/netty/issues/8707.
padding = dataSize = 0;
} else {
// There's no need to write any data frames because there are only empty data frames in the
// queue and it is not end of stream yet. Just complete their promises by getting the buffer
// corresponding to 0 bytes and writing it to the channel (to preserve notification order).
ChannelPromise writePromise = ctx.newPromise().addListener(this);
ctx.write(queue.remove(0, writePromise), writePromise);
}
return; return;
} }

View File

@ -55,6 +55,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -126,6 +127,13 @@ public class DefaultHttp2ConnectionEncoderTest {
when(channel.unsafe()).thenReturn(unsafe); when(channel.unsafe()).thenReturn(unsafe);
ChannelConfig config = new DefaultChannelConfig(channel); ChannelConfig config = new DefaultChannelConfig(channel);
when(channel.config()).thenReturn(config); when(channel.config()).thenReturn(config);
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) {
return newPromise().setFailure((Throwable) in.getArgument(0));
}
}).when(channel).newFailedFuture(any(Throwable.class));
when(writer.configuration()).thenReturn(writerConfig); when(writer.configuration()).thenReturn(writerConfig);
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy); when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
when(frameSizePolicy.maxFrameSize()).thenReturn(64); when(frameSizePolicy.maxFrameSize()).thenReturn(64);
@ -206,6 +214,36 @@ public class DefaultHttp2ConnectionEncoderTest {
encoder.lifecycleManager(lifecycleManager); encoder.lifecycleManager(lifecycleManager);
} }
@Test
public void dataWithEndOfStreamWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
dataWriteShouldSignalThatFrameWasConsumedOnError0(true);
}
@Test
public void dataWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
dataWriteShouldSignalThatFrameWasConsumedOnError0(false);
}
private void dataWriteShouldSignalThatFrameWasConsumedOnError0(boolean endOfStream) throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData();
ChannelPromise p = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, endOfStream, p);
FlowControlled controlled = payloadCaptor.getValue();
assertEquals(8, controlled.size());
payloadCaptor.getValue().write(ctx, 4);
assertEquals(4, controlled.size());
Throwable error = new IllegalStateException();
payloadCaptor.getValue().error(ctx, error);
payloadCaptor.getValue().write(ctx, 8);
assertEquals(0, controlled.size());
assertEquals("abcd", writtenData.get(0));
assertEquals(0, data.refCnt());
assertSame(error, p.cause());
}
@Test @Test
public void dataWriteShouldSucceed() throws Exception { public void dataWriteShouldSucceed() throws Exception {
createStream(STREAM_ID, false); createStream(STREAM_ID, false);

View File

@ -27,6 +27,7 @@ import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalEventLoopGroup;
@ -37,6 +38,7 @@ import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.IllegalReferenceCountException; import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -682,7 +684,7 @@ public class Http2ConnectionRoundtripTest {
writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(WriteEmptyBufferMode.SECOND_WITH_TRAILERS); writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(WriteEmptyBufferMode.SECOND_WITH_TRAILERS);
} }
public void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode) private void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode)
throws Exception { throws Exception {
bootstrapEnv(1, 1, 2, 1); bootstrapEnv(1, 1, 2, 1);
@ -728,6 +730,59 @@ public class Http2ConnectionRoundtripTest {
} }
} }
@Test
public void writeFailureFlowControllerRemoveFrame()
throws Exception {
bootstrapEnv(1, 1, 2, 1);
final ChannelPromise dataPromise = newPromise();
final ChannelPromise assertPromise = newPromise();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, EmptyHttp2Headers.INSTANCE, 0, (short) 16, false, 0, false,
newPromise());
clientChannel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg);
// Ensure we update the window size so we will try to write the rest of the frame while
// processing the flush.
http2Client.encoder().flowController().initialWindowSize(8);
promise.setFailure(new IllegalStateException());
}
});
http2Client.encoder().flowController().initialWindowSize(4);
http2Client.encoder().writeData(ctx(), 3, randomBytes(8), 0, false, dataPromise);
assertTrue(http2Client.encoder().flowController()
.hasFlowControlled(http2Client.connection().stream(3)));
http2Client.flush(ctx());
try {
// The Frame should have been removed after the write failed.
assertFalse(http2Client.encoder().flowController()
.hasFlowControlled(http2Client.connection().stream(3)));
assertPromise.setSuccess();
} catch (Throwable error) {
assertPromise.setFailure(error);
}
}
});
try {
dataPromise.get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), is(instanceOf(IllegalStateException.class)));
}
assertPromise.sync();
}
@Test @Test
public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception { public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception {
bootstrapEnv(1, 1, 2, 1); bootstrapEnv(1, 1, 2, 1);