Ensure FlowControlled data frames will be correctly removed from the … (#8726)
Motivation: When a write error happens during writing of flowcontrolled data frames we miss to correctly detect this in the write loop which may result in an infinite loop as we will never detect that the frame should be removed from the queue. Modifications: - When we fail a flowcontrolled data frame we ensure that the next frame.write(...) call will signal back that the whole frame was handled and so can be removed. - Add unit test. Result: Fixes https://github.com/netty/netty/issues/8707.
This commit is contained in:
parent
df5eb060f7
commit
dae5d9d3f9
@ -397,6 +397,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
queue.releaseAndFailAll(cause);
|
||||
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after
|
||||
// error so we don't invalidate flow control when returning bytes to flow control.
|
||||
//
|
||||
// That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
|
||||
// because of an error.
|
||||
lifecycleManager.onError(ctx, true, cause);
|
||||
}
|
||||
|
||||
@ -405,11 +408,21 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
int queuedData = queue.readableBytes();
|
||||
if (!endOfStream) {
|
||||
if (queuedData == 0) {
|
||||
// There's no need to write any data frames because there are only empty data frames in the queue
|
||||
// and it is not end of stream yet. Just complete their promises by getting the buffer corresponding
|
||||
// to 0 bytes and writing it to the channel (to preserve notification order).
|
||||
ChannelPromise writePromise = ctx.newPromise().addListener(this);
|
||||
ctx.write(queue.remove(0, writePromise), writePromise);
|
||||
if (queue.isEmpty()) {
|
||||
// When the queue is empty it means we did clear it because of an error(...) call
|
||||
// (as otherwise we will have at least 1 entry in there), which will happen either when called
|
||||
// explicit or when the write itself fails. In this case just set dataSize and padding to 0
|
||||
// which will signal back that the whole frame was consumed.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/8707.
|
||||
padding = dataSize = 0;
|
||||
} else {
|
||||
// There's no need to write any data frames because there are only empty data frames in the
|
||||
// queue and it is not end of stream yet. Just complete their promises by getting the buffer
|
||||
// corresponding to 0 bytes and writing it to the channel (to preserve notification order).
|
||||
ChannelPromise writePromise = ctx.newPromise().addListener(this);
|
||||
ctx.write(queue.remove(0, writePromise), writePromise);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -126,6 +127,13 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(channel.unsafe()).thenReturn(unsafe);
|
||||
ChannelConfig config = new DefaultChannelConfig(channel);
|
||||
when(channel.config()).thenReturn(config);
|
||||
doAnswer(new Answer<ChannelFuture>() {
|
||||
@Override
|
||||
public ChannelFuture answer(InvocationOnMock in) {
|
||||
return newPromise().setFailure((Throwable) in.getArgument(0));
|
||||
}
|
||||
}).when(channel).newFailedFuture(any(Throwable.class));
|
||||
|
||||
when(writer.configuration()).thenReturn(writerConfig);
|
||||
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(64);
|
||||
@ -206,6 +214,36 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
encoder.lifecycleManager(lifecycleManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataWithEndOfStreamWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
|
||||
dataWriteShouldSignalThatFrameWasConsumedOnError0(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
|
||||
dataWriteShouldSignalThatFrameWasConsumedOnError0(false);
|
||||
}
|
||||
|
||||
private void dataWriteShouldSignalThatFrameWasConsumedOnError0(boolean endOfStream) throws Exception {
|
||||
createStream(STREAM_ID, false);
|
||||
final ByteBuf data = dummyData();
|
||||
ChannelPromise p = newPromise();
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, endOfStream, p);
|
||||
|
||||
FlowControlled controlled = payloadCaptor.getValue();
|
||||
assertEquals(8, controlled.size());
|
||||
payloadCaptor.getValue().write(ctx, 4);
|
||||
assertEquals(4, controlled.size());
|
||||
|
||||
Throwable error = new IllegalStateException();
|
||||
payloadCaptor.getValue().error(ctx, error);
|
||||
payloadCaptor.getValue().write(ctx, 8);
|
||||
assertEquals(0, controlled.size());
|
||||
assertEquals("abcd", writtenData.get(0));
|
||||
assertEquals(0, data.refCnt());
|
||||
assertSame(error, p.cause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataWriteShouldSucceed() throws Exception {
|
||||
createStream(STREAM_ID, false);
|
||||
|
@ -27,6 +27,7 @@ import io.netty.channel.ChannelHandlerAdapter;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultEventLoopGroup;
|
||||
@ -37,6 +38,7 @@ import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
|
||||
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
|
||||
import io.netty.util.AsciiString;
|
||||
import io.netty.util.IllegalReferenceCountException;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -682,7 +684,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(WriteEmptyBufferMode.SECOND_WITH_TRAILERS);
|
||||
}
|
||||
|
||||
public void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode)
|
||||
private void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode)
|
||||
throws Exception {
|
||||
bootstrapEnv(1, 1, 2, 1);
|
||||
|
||||
@ -728,6 +730,59 @@ public class Http2ConnectionRoundtripTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void writeFailureFlowControllerRemoveFrame()
|
||||
throws Exception {
|
||||
bootstrapEnv(1, 1, 2, 1);
|
||||
|
||||
final ChannelPromise dataPromise = newPromise();
|
||||
final ChannelPromise assertPromise = newPromise();
|
||||
|
||||
runInChannel(clientChannel, new Http2Runnable() {
|
||||
@Override
|
||||
public void run() throws Http2Exception {
|
||||
http2Client.encoder().writeHeaders(ctx(), 3, EmptyHttp2Headers.INSTANCE, 0, (short) 16, false, 0, false,
|
||||
newPromise());
|
||||
clientChannel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
ReferenceCountUtil.release(msg);
|
||||
|
||||
// Ensure we update the window size so we will try to write the rest of the frame while
|
||||
// processing the flush.
|
||||
http2Client.encoder().flowController().initialWindowSize(8);
|
||||
promise.setFailure(new IllegalStateException());
|
||||
}
|
||||
});
|
||||
|
||||
http2Client.encoder().flowController().initialWindowSize(4);
|
||||
http2Client.encoder().writeData(ctx(), 3, randomBytes(8), 0, false, dataPromise);
|
||||
assertTrue(http2Client.encoder().flowController()
|
||||
.hasFlowControlled(http2Client.connection().stream(3)));
|
||||
|
||||
http2Client.flush(ctx());
|
||||
|
||||
try {
|
||||
// The Frame should have been removed after the write failed.
|
||||
assertFalse(http2Client.encoder().flowController()
|
||||
.hasFlowControlled(http2Client.connection().stream(3)));
|
||||
assertPromise.setSuccess();
|
||||
} catch (Throwable error) {
|
||||
assertPromise.setFailure(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
dataPromise.get();
|
||||
fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), is(instanceOf(IllegalStateException.class)));
|
||||
}
|
||||
|
||||
assertPromise.sync();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception {
|
||||
bootstrapEnv(1, 1, 2, 1);
|
||||
|
Loading…
Reference in New Issue
Block a user