HTTP/2 multiplex: Correctly process buffered inbound data even if autoRead is false (#9389)
Motivation: When using the HTTP/2 multiplex implementation we need to ensure we correctly drain the buffered inbound data even if the RecvByteBufallocator.Handle tells us to stop reading in between. Modifications: Correctly loop through the buffered inbound data until the user does stop to request from it. Result: Fixes https://github.com/netty/netty/issues/9387. Co-authored-by: Bryce Anderson <banderson@twitter.com>
This commit is contained in:
parent
f6005c5b2d
commit
a677e62dd0
@ -764,24 +764,31 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Object pollQueuedMessage() {
|
||||||
|
return inboundBuffer == null ? null : inboundBuffer.poll();
|
||||||
|
}
|
||||||
|
|
||||||
void doBeginRead() {
|
void doBeginRead() {
|
||||||
Object message;
|
// Process messages until there are none left (or the user stopped requesting) and also handle EOS.
|
||||||
if (inboundBuffer == null || (message = inboundBuffer.poll()) == null) {
|
while (readStatus != ReadStatus.IDLE) {
|
||||||
|
Object message = pollQueuedMessage();
|
||||||
|
if (message == null) {
|
||||||
if (readEOS) {
|
if (readEOS) {
|
||||||
unsafe.closeForcibly();
|
unsafe.closeForcibly();
|
||||||
}
|
}
|
||||||
} else {
|
break;
|
||||||
|
}
|
||||||
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
|
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
|
||||||
allocHandle.reset(config());
|
allocHandle.reset(config());
|
||||||
boolean continueReading = false;
|
boolean continueReading = false;
|
||||||
do {
|
do {
|
||||||
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
|
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
|
||||||
} while ((readEOS || (continueReading = allocHandle.continueReading())) &&
|
} while ((readEOS || (continueReading = allocHandle.continueReading()))
|
||||||
inboundBuffer != null && (message = inboundBuffer.poll()) != null);
|
&& (message = pollQueuedMessage()) != null);
|
||||||
|
|
||||||
if (continueReading && isParentReadInProgress() && !readEOS) {
|
if (continueReading && isParentReadInProgress() && !readEOS) {
|
||||||
// Currently the parent and child channel are on the same EventLoop thread. If the parent is
|
// Currently the parent and child channel are on the same EventLoop thread. If the parent is
|
||||||
// currently reading it is possile that more frames will be delivered to this child channel. In
|
// currently reading it is possible that more frames will be delivered to this child channel. In
|
||||||
// the case that this child channel still wants to read we delay the channelReadComplete on this
|
// the case that this child channel still wants to read we delay the channelReadComplete on this
|
||||||
// child channel until the parent is done reading.
|
// child channel until the parent is done reading.
|
||||||
if (!readCompletePending) {
|
if (!readCompletePending) {
|
||||||
|
@ -890,6 +890,11 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
|||||||
|
|
||||||
// Detecting EOS should flush all pending data regardless of read calls.
|
// Detecting EOS should flush all pending data regardless of read calls.
|
||||||
assertEqualsAndRelease(dataFrame2, inboundHandler.<Http2DataFrame>readInbound());
|
assertEqualsAndRelease(dataFrame2, inboundHandler.<Http2DataFrame>readInbound());
|
||||||
|
assertNull(inboundHandler.readInbound());
|
||||||
|
|
||||||
|
// As we limited the number to 1 we also need to call read() again.
|
||||||
|
childChannel.read();
|
||||||
|
|
||||||
assertEqualsAndRelease(dataFrame3, inboundHandler.<Http2DataFrame>readInbound());
|
assertEqualsAndRelease(dataFrame3, inboundHandler.<Http2DataFrame>readInbound());
|
||||||
assertEqualsAndRelease(dataFrame4, inboundHandler.<Http2DataFrame>readInbound());
|
assertEqualsAndRelease(dataFrame4, inboundHandler.<Http2DataFrame>readInbound());
|
||||||
|
|
||||||
@ -1064,12 +1069,71 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
|||||||
assertEquals(4, channelReadCompleteCount.get());
|
assertEquals(4, channelReadCompleteCount.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useReadWithoutAutoReadInRead() {
|
||||||
|
useReadWithoutAutoReadBuffered(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void useReadWithoutAutoReadInReadComplete() {
|
||||||
|
useReadWithoutAutoReadBuffered(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void useReadWithoutAutoReadBuffered(final boolean triggerOnReadComplete) {
|
||||||
|
LastInboundHandler inboundHandler = new LastInboundHandler();
|
||||||
|
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
|
||||||
|
assertTrue(childChannel.config().isAutoRead());
|
||||||
|
childChannel.config().setAutoRead(false);
|
||||||
|
assertFalse(childChannel.config().isAutoRead());
|
||||||
|
|
||||||
|
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
|
||||||
|
assertNotNull(headersFrame);
|
||||||
|
|
||||||
|
// Write some bytes to get the channel into the idle state with buffered data and also verify we
|
||||||
|
// do not dispatch it until we receive a read() call.
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world"), 0, false);
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo"), 0, false);
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar"), 0, false);
|
||||||
|
|
||||||
|
// Add a handler which will request reads.
|
||||||
|
childChannel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
super.channelReadComplete(ctx);
|
||||||
|
if (triggerOnReadComplete) {
|
||||||
|
ctx.read();
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
if (!triggerOnReadComplete) {
|
||||||
|
ctx.read();
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
inboundHandler.channel().read();
|
||||||
|
|
||||||
|
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 3);
|
||||||
|
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("hello world2"), 0, false);
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("foo2"), 0, false);
|
||||||
|
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb("bar2"), 0, true);
|
||||||
|
|
||||||
|
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 3);
|
||||||
|
}
|
||||||
|
|
||||||
private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel,
|
private static void verifyFramesMultiplexedToCorrectChannel(Http2StreamChannel streamChannel,
|
||||||
LastInboundHandler inboundHandler,
|
LastInboundHandler inboundHandler,
|
||||||
int numFrames) {
|
int numFrames) {
|
||||||
for (int i = 0; i < numFrames; i++) {
|
for (int i = 0; i < numFrames; i++) {
|
||||||
Http2StreamFrame frame = inboundHandler.readInbound();
|
Http2StreamFrame frame = inboundHandler.readInbound();
|
||||||
assertNotNull(frame);
|
assertNotNull(i + " out of " + numFrames + " received", frame);
|
||||||
assertEquals(streamChannel.stream(), frame.stream());
|
assertEquals(streamChannel.stream(), frame.stream());
|
||||||
release(frame);
|
release(frame);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user