diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index 9c3874c4d3..a7ca0bb831 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -31,10 +31,10 @@ import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelPipeline; -import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator; import io.netty.channel.EventLoop; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.RecvByteBufAllocator.Handle; import io.netty.channel.VoidChannelPromise; import io.netty.channel.WriteBufferWaterMark; import io.netty.util.DefaultAttributeMap; @@ -56,7 +56,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; - import static java.lang.Math.min; /** @@ -111,7 +110,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(ChannelFuture future) { registerDone(future); } }; @@ -148,19 +147,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } } - private static final class Http2StreamChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { - - @Override - public MaxMessageHandle newHandle() { - return new MaxMessageHandle() { - @Override - public int guess() { - return 1024; - } - }; - } - } - private final ChannelHandler inboundStreamHandler; private final ChannelHandler upgradeStreamHandler; @@ -230,7 +216,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { while (ch != null) { DefaultHttp2StreamChannel curr = ch; ch = curr.next; - curr.next = null; + curr.next = curr.previous = null; } head = tail = null; } @@ -244,7 +230,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) { if (frame instanceof Http2StreamFrame) { Http2StreamFrame streamFrame = (Http2StreamFrame) frame; - onHttp2StreamFrame(((Http2MultiplexCodecStream) streamFrame.stream()).channel, streamFrame); + ((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame); } else if (frame instanceof Http2GoAwayFrame) { onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame); // Allow other handlers to act on GOAWAY frame @@ -331,38 +317,48 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } } - private void onHttp2StreamFrame(DefaultHttp2StreamChannel childChannel, Http2StreamFrame frame) { - switch (childChannel.fireChildRead(frame)) { - case READ_PROCESSED_BUT_STOP_READING: - childChannel.fireChildReadComplete(); - break; - case READ_PROCESSED_OK_TO_PROCESS_MORE: - addChildChannelToReadPendingQueue(childChannel); - break; - case READ_IGNORED_CHANNEL_INACTIVE: - case READ_QUEUED: - // nothing to do: - break; - default: - throw new Error(); + private boolean isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel childChannel) { + return childChannel.previous != null || childChannel.next != null || head == childChannel; + } + + final void tryAddChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) { + if (!isChildChannelInReadPendingQueue(childChannel)) { + addChildChannelToReadPendingQueue(childChannel); } } final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) { - if (!childChannel.fireChannelReadPending) { - assert childChannel.next == null; - - if (tail == null) { - assert head == null; - tail = head = childChannel; - } else { - tail.next = childChannel; - tail = childChannel; - } - childChannel.fireChannelReadPending = true; + if (tail == null) { + assert head == null; + tail = head = childChannel; + } else { + childChannel.previous = tail; + tail.next = childChannel; + tail = childChannel; } } + private void tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) { + if (isChildChannelInReadPendingQueue(childChannel)) { + removeChildChannelFromReadPendingQueue(childChannel); + } + } + + private void removeChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) { + DefaultHttp2StreamChannel previous = childChannel.previous; + if (childChannel.next != null) { + childChannel.next.previous = previous; + } else { + tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back. + } + if (previous != null) { + previous.next = childChannel.next; + } else { + head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward. + } + childChannel.next = childChannel.previous = null; + } + private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) { try { forEachActiveStream(new Http2FrameStreamVisitor() { @@ -387,8 +383,14 @@ public class Http2MultiplexCodec extends Http2FrameCodec { */ @Override public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - parentReadInProgress = false; - onChannelReadComplete(ctx); + try { + onChannelReadComplete(ctx); + } finally { + parentReadInProgress = false; + tail = head = null; + // We always flush as this is what Http2ConnectionHandler does for now. + flush0(ctx); + } channelReadComplete0(ctx); } @@ -402,23 +404,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec { // If we have many child channel we can optimize for the case when multiple call flush() in // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple // write calls on the socket which is expensive. - try { - DefaultHttp2StreamChannel current = head; - while (current != null) { - DefaultHttp2StreamChannel childChannel = current; - if (childChannel.fireChannelReadPending) { - // Clear early in case fireChildReadComplete() causes it to need to be re-processed - childChannel.fireChannelReadPending = false; - childChannel.fireChildReadComplete(); - } - childChannel.next = null; - current = current.next; - } - } finally { - tail = head = null; - - // We always flush as this is what Http2ConnectionHandler does for now. - flush0(ctx); + DefaultHttp2StreamChannel current = head; + while (current != null) { + DefaultHttp2StreamChannel childChannel = current; + // Clear early in case fireChildReadComplete() causes it to need to be re-processed + current = current.next; + childChannel.next = childChannel.previous = null; + childChannel.fireChildReadComplete(); } } @@ -447,13 +439,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec { DefaultHttp2StreamChannel channel; } - private enum ReadState { - READ_QUEUED, - READ_IGNORED_CHANNEL_INACTIVE, - READ_PROCESSED_BUT_STOP_READING, - READ_PROCESSED_OK_TO_PROCESS_MORE - } - private boolean initialWritability(DefaultHttp2FrameStream stream) { // If the stream id is not valid yet we will just mark the channel as writable as we will be notified // about non-writability state as soon as the first Http2HeaderFrame is written (if needed). @@ -476,24 +461,24 @@ public class Http2MultiplexCodec extends Http2FrameCodec { private volatile boolean writable; private boolean outboundClosed; - private boolean closePending; + /** + * This variable represents if a read is in progress for the current channel. Note that depending upon the + * {@link RecvByteBufAllocator} behavior a read may extend beyond the {@link Http2ChannelUnsafe#beginRead()} + * method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may drain all pending data, and then if the + * parent channel is reading this channel may still accept frames. + */ private boolean readInProgress; private Queue inboundBuffer; /** {@code true} after the first HEADERS frame has been written **/ private boolean firstFrameWritten; - /** {@code true} if a close without an error was initiated **/ - private boolean streamClosedWithoutError; - - // Keeps track of flush calls in channelReadComplete(...) and aggregate these. - private boolean inFireChannelReadComplete; - - boolean fireChannelReadPending; - - // Holds the reference to the next DefaultHttp2StreamChannel that should be processed in - // channelReadComplete(...) + // Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to + // extend the read loop of a child channel if the child channel drains its queued data during read, and the + // parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent + // channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list. DefaultHttp2StreamChannel next; + DefaultHttp2StreamChannel previous; DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) { this.stream = stream; @@ -521,13 +506,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } void streamClosed() { - streamClosedWithoutError = true; - if (readInProgress) { - // Just call closeForcibly() as this will take care of fireChannelInactive(). - unsafe().closeForcibly(); - } else { - closePending = true; - } + unsafe.readEOS(); + // Attempt to drain any queued data from the queue and deliver it to the application before closing this + // channel. + unsafe.doBeginRead(); } @Override @@ -771,49 +753,48 @@ public class Http2MultiplexCodec extends Http2FrameCodec { * Receive a read message. This does not notify handlers unless a read is in progress on the * channel. */ - ReadState fireChildRead(Http2Frame frame) { + void fireChildRead(Http2Frame frame) { assert eventLoop().inEventLoop(); if (!isActive()) { ReferenceCountUtil.release(frame); - return ReadState.READ_IGNORED_CHANNEL_INACTIVE; - } - if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) { - // Check for null because inboundBuffer doesn't support null; we want to be consistent - // for what values are supported. - RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle(); + } else if (readInProgress) { + // If readInProgress there cannot be anything in the queue, otherwise we would have drained it from the + // queue and processed it during the read cycle. + assert inboundBuffer == null || inboundBuffer.isEmpty(); + final Handle allocHandle = unsafe.recvBufAllocHandle(); unsafe.doRead0(frame, allocHandle); - return allocHandle.continueReading() ? - ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING; + // We currently don't need to check for readEOS because the parent channel and child channel are limited + // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is + // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the + // cost of additional readComplete notifications on the rare path. + if (allocHandle.continueReading()) { + tryAddChildChannelToReadPendingQueue(this); + } else { + tryRemoveChildChannelFromReadPendingQueue(this); + unsafe.notifyReadComplete(allocHandle); + } } else { if (inboundBuffer == null) { inboundBuffer = new ArrayDeque(4); } inboundBuffer.add(frame); - return ReadState.READ_QUEUED; } } void fireChildReadComplete() { assert eventLoop().inEventLoop(); - try { - if (readInProgress) { - inFireChannelReadComplete = true; - readInProgress = false; - unsafe().recvBufAllocHandle().readComplete(); - pipeline().fireChannelReadComplete(); - } - } finally { - inFireChannelReadComplete = false; - } + assert readInProgress; + unsafe.notifyReadComplete(unsafe.recvBufAllocHandle()); } private final class Http2ChannelUnsafe implements Unsafe { private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(DefaultHttp2StreamChannel.this, false); @SuppressWarnings("deprecation") - private RecvByteBufAllocator.ExtendedHandle recvHandle; + private Handle recvHandle; private boolean writeDoneAndNoFlush; private boolean closeInitiated; + private boolean readEOS; @Override public void connect(final SocketAddress remoteAddress, @@ -825,9 +806,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } @Override - public RecvByteBufAllocator.ExtendedHandle recvBufAllocHandle() { + public Handle recvBufAllocHandle() { if (recvHandle == null) { - recvHandle = (RecvByteBufAllocator.ExtendedHandle) config().getRecvByteBufAllocator().newHandle(); + recvHandle = config().getRecvByteBufAllocator().newHandle(); + recvHandle.reset(config()); } return recvHandle; } @@ -892,7 +874,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { // This means close() was called before so we just register a listener and return closePromise.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(ChannelFuture future) { promise.setSuccess(); } }); @@ -901,15 +883,13 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } closeInitiated = true; - closePending = false; - fireChannelReadPending = false; + tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel.this); final boolean wasActive = isActive(); // Only ever send a reset frame if the connection is still alive and if the stream may have existed // as otherwise we may send a RST on a stream in an invalid state and cause a connection error. - if (parent().isActive() && !streamClosedWithoutError && - connection().streamMayHaveExisted(stream().id())) { + if (parent().isActive() && !readEOS && connection().streamMayHaveExisted(stream().id())) { Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream()); write(resetFrame, unsafe().voidPromise()); flush(); @@ -1009,64 +989,74 @@ public class Http2MultiplexCodec extends Http2FrameCodec { return; } readInProgress = true; + doBeginRead(); + } - final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - allocHandle.reset(config()); - if (inboundBuffer == null || inboundBuffer.isEmpty()) { - if (closePending) { + void doBeginRead() { + Object message; + if (inboundBuffer == null || (message = inboundBuffer.poll()) == null) { + if (readEOS) { unsafe.closeForcibly(); } - return; - } - - // We have already checked that the queue is not empty, so before this value is used it will always be - // set by allocHandle.continueReading(). - boolean continueReading; - do { - Object m = inboundBuffer.poll(); - if (m == null) { - continueReading = false; - break; - } - doRead0((Http2Frame) m, allocHandle); - } while (continueReading = allocHandle.continueReading()); - - if (continueReading && parentReadInProgress) { - // We don't know if more frames will be delivered in the parent channel's read loop, so add this - // channel to the channelReadComplete queue to be notified later. - addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this); } else { - // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent - // channel is not currently reading we need to force a flush at the child channel, because we cannot - // rely upon flush occurring in channelReadComplete on the parent channel. - readInProgress = false; - allocHandle.readComplete(); - pipeline().fireChannelReadComplete(); - flush(); - if (closePending) { - unsafe.closeForcibly(); + final Handle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config()); + boolean continueReading = false; + do { + doRead0((Http2Frame) message, allocHandle); + } while ((readEOS || (continueReading = allocHandle.continueReading())) && + (message = inboundBuffer.poll()) != null); + + if (continueReading && parentReadInProgress && !readEOS) { + // Currently the parent and child channel are on the same EventLoop thread. If the parent is + // currently reading it is possile that more frames will be delivered to this child channel. In + // the case that this child channel still wants to read we delay the channelReadComplete on this + // child channel until the parent is done reading. + assert !isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel.this); + addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this); + } else { + notifyReadComplete(allocHandle); } } } - @SuppressWarnings("deprecation") - void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { - int numBytesToBeConsumed = 0; - if (frame instanceof Http2DataFrame) { - numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes(); - allocHandle.lastBytesRead(numBytesToBeConsumed); - } else { - allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE); - } - allocHandle.incMessagesRead(1); - pipeline().fireChannelRead(frame); + void readEOS() { + readEOS = true; + } - if (numBytesToBeConsumed != 0) { - try { - writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed); - } catch (Http2Exception e) { - pipeline().fireExceptionCaught(e); + void notifyReadComplete(Handle allocHandle) { + assert next == null && previous == null; + readInProgress = false; + allocHandle.readComplete(); + pipeline().fireChannelReadComplete(); + // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent + // channel is not currently reading we need to force a flush at the child channel, because we cannot + // rely upon flush occurring in channelReadComplete on the parent channel. + flush(); + if (readEOS) { + unsafe.closeForcibly(); + } + } + + @SuppressWarnings("deprecation") + void doRead0(Http2Frame frame, Handle allocHandle) { + pipeline().fireChannelRead(frame); + allocHandle.incMessagesRead(1); + + if (frame instanceof Http2DataFrame) { + final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes(); + allocHandle.attemptedBytesRead(numBytesToBeConsumed); + allocHandle.lastBytesRead(numBytesToBeConsumed); + if (numBytesToBeConsumed != 0) { + try { + writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed); + } catch (Http2Exception e) { + pipeline().fireExceptionCaught(e); + } } + } else { + allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE); + allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE); } } @@ -1104,7 +1094,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } else { future.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(ChannelFuture future) { firstWriteComplete(future, promise); } }); @@ -1126,7 +1116,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } else { future.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(ChannelFuture future) { writeComplete(future, promise); } }); @@ -1197,18 +1187,16 @@ public class Http2MultiplexCodec extends Http2FrameCodec { @Override public void flush() { - if (!writeDoneAndNoFlush) { + // If we are currently in the parent channel's read loop we should just ignore the flush. + // We will ensure we trigger ctx.flush() after we processed all Channels later on and + // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an + // write(...) or writev(...) operation on the socket. + if (!writeDoneAndNoFlush || parentReadInProgress) { // There is nothing to flush so this is a NOOP. return; } try { - // If we are currently in the channelReadComplete(...) call we should just ignore the flush. - // We will ensure we trigger ctx.flush() after we processed all Channels later on and - // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an - // write(...) or writev(...) operation on the socket. - if (!inFireChannelReadComplete) { - flush0(ctx); - } + flush0(ctx); } finally { writeDoneAndNoFlush = false; } @@ -1232,10 +1220,8 @@ public class Http2MultiplexCodec extends Http2FrameCodec { * changes. */ private final class Http2StreamChannelConfig extends DefaultChannelConfig { - Http2StreamChannelConfig(Channel channel) { super(channel); - setRecvByteBufAllocator(new Http2StreamChannelRecvByteBufAllocator()); } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java index f96b12bbe6..f09d55e916 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecTest.java @@ -28,8 +28,14 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpScheme; import io.netty.handler.codec.http2.Http2Exception.StreamException; +import io.netty.handler.codec.http2.LastInboundHandler.Consumer; import io.netty.util.AsciiString; import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; @@ -38,12 +44,6 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.util.ReferenceCountUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - import static io.netty.util.ReferenceCountUtil.release; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; @@ -743,9 +743,197 @@ public class Http2MultiplexCodecTest { childChannel.closeFuture().syncUninterruptibly(); } + @Test + public void endOfStreamDoesNotDiscardData() { + AtomicInteger numReads = new AtomicInteger(1); + final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean(); + Consumer ctxConsumer = new Consumer() { + @Override + public void accept(ChannelHandlerContext obj) { + if (shouldDisableAutoRead.get()) { + obj.channel().config().setAutoRead(false); + } + } + }; + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); + Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + childChannel.config().setAutoRead(false); + + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + + assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + parentChannel.writeOneInbound(new Object()); + codec.onHttp2Frame(dataFrame1); + assertEquals(dataFrame1, inboundHandler.readInbound()); + + // Deliver frames, and then a stream closed while read is inactive. + codec.onHttp2Frame(dataFrame2); + codec.onHttp2Frame(dataFrame3); + codec.onHttp2Frame(dataFrame4); + + shouldDisableAutoRead.set(true); + childChannel.config().setAutoRead(true); + numReads.set(1); + + inboundStream.state = Http2Stream.State.CLOSED; + codec.onHttp2StreamStateChanged(inboundStream); + + // Detecting EOS should flush all pending data regardless of read calls. + assertEquals(dataFrame2, inboundHandler.readInbound()); + assertEquals(dataFrame3, inboundHandler.readInbound()); + assertEquals(dataFrame4, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); + + // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.flushInbound(); + + childChannel.closeFuture().syncUninterruptibly(); + + dataFrame1.release(); + dataFrame2.release(); + dataFrame3.release(); + dataFrame4.release(); + } + + @Test + public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopAutoRead() { + AtomicInteger numReads = new AtomicInteger(1); + final AtomicInteger channelReadCompleteCount = new AtomicInteger(0); + final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean(); + Consumer ctxConsumer = new Consumer() { + @Override + public void accept(ChannelHandlerContext obj) { + channelReadCompleteCount.incrementAndGet(); + if (shouldDisableAutoRead.get()) { + obj.channel().config().setAutoRead(false); + } + } + }; + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); + Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + childChannel.config().setAutoRead(false); + + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + + assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + parentChannel.writeOneInbound(new Object()); + codec.onHttp2Frame(dataFrame1); + assertEquals(dataFrame1, inboundHandler.readInbound()); + + // We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that + // when beginRead() is called the child channel is added to the readPending queue of the parent channel. + codec.onHttp2Frame(dataFrame2); + + numReads.set(10); + shouldDisableAutoRead.set(true); + childChannel.config().setAutoRead(true); + + codec.onHttp2Frame(dataFrame3); + codec.onHttp2Frame(dataFrame4); + + // Detecting EOS should flush all pending data regardless of read calls. + assertEquals(dataFrame2, inboundHandler.readInbound()); + assertEquals(dataFrame3, inboundHandler.readInbound()); + assertEquals(dataFrame4, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); + + // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.flushInbound(); + + // 3 = 1 for initialization + 1 for read when auto read was off + 1 for when auto read was back on + assertEquals(3, channelReadCompleteCount.get()); + + dataFrame1.release(); + dataFrame2.release(); + dataFrame3.release(); + dataFrame4.release(); + } + + @Test + public void childQueueIsDrainedAndNewDataIsDispatchedInParentReadLoopNoAutoRead() { + AtomicInteger numReads = new AtomicInteger(1); + final AtomicInteger channelReadCompleteCount = new AtomicInteger(0); + final AtomicBoolean shouldDisableAutoRead = new AtomicBoolean(); + Consumer ctxConsumer = new Consumer() { + @Override + public void accept(ChannelHandlerContext obj) { + channelReadCompleteCount.incrementAndGet(); + if (shouldDisableAutoRead.get()) { + obj.channel().config().setAutoRead(false); + } + } + }; + LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream, numReads, ctxConsumer); + Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel(); + childChannel.config().setAutoRead(false); + + Http2DataFrame dataFrame1 = new DefaultHttp2DataFrame(bb("1")).stream(inboundStream); + Http2DataFrame dataFrame2 = new DefaultHttp2DataFrame(bb("2")).stream(inboundStream); + Http2DataFrame dataFrame3 = new DefaultHttp2DataFrame(bb("3")).stream(inboundStream); + Http2DataFrame dataFrame4 = new DefaultHttp2DataFrame(bb("4")).stream(inboundStream); + + assertEquals(new DefaultHttp2HeadersFrame(request).stream(inboundStream), inboundHandler.readInbound()); + + // We want to simulate the parent channel calling channelRead and delay calling channelReadComplete. + parentChannel.writeOneInbound(new Object()); + codec.onHttp2Frame(dataFrame1); + assertEquals(dataFrame1, inboundHandler.readInbound()); + + // We want one item to be in the queue, and allow the numReads to be larger than 1. This will ensure that + // when beginRead() is called the child channel is added to the readPending queue of the parent channel. + codec.onHttp2Frame(dataFrame2); + + numReads.set(2); + childChannel.read(); + assertEquals(dataFrame2, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); + + // This is the second item that was read, this should be the last until we call read() again. This should also + // notify of readComplete(). + codec.onHttp2Frame(dataFrame3); + assertEquals(dataFrame3, inboundHandler.readInbound()); + + codec.onHttp2Frame(dataFrame4); + assertNull(inboundHandler.readInbound()); + + childChannel.read(); + assertEquals(dataFrame4, inboundHandler.readInbound()); + assertNull(inboundHandler.readInbound()); + + // Now we want to call channelReadComplete and simulate the end of the read loop. + parentChannel.flushInbound(); + + // 3 = 1 for initialization + 1 for first read of 2 items + 1 for second read of 2 items + + // 1 for parent channel readComplete + assertEquals(4, channelReadCompleteCount.get()); + + dataFrame1.release(); + dataFrame2.release(); + dataFrame3.release(); + dataFrame4.release(); + } + private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) { - LastInboundHandler inboundHandler = new LastInboundHandler(); + return streamActiveAndWriteHeaders(stream, null, LastInboundHandler.noopConsumer()); + } + + private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream, + AtomicInteger maxReads, + Consumer contextConsumer) { + + LastInboundHandler inboundHandler = new LastInboundHandler(contextConsumer); childChannelInitializer.handler = inboundHandler; + childChannelInitializer.maxReads = maxReads; assertFalse(inboundHandler.isChannelActive()); ((TestableHttp2MultiplexCodec.Stream) stream).state = Http2Stream.State.OPEN; codec.onHttp2StreamStateChanged(stream); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java index 55c4df0a98..38f400af14 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.LockSupport; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** @@ -34,11 +35,36 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; */ public class LastInboundHandler extends ChannelDuplexHandler { private final List queue = new ArrayList(); + private final Consumer channelReadCompleteConsumer; private Throwable lastException; private ChannelHandlerContext ctx; private boolean channelActive; private String writabilityStates = ""; + // TODO(scott): use JDK 8's Consumer + public interface Consumer { + void accept(T obj); + } + + private static final Consumer NOOP_CONSUMER = new Consumer() { + @Override + public void accept(Object obj) { + } + }; + + @SuppressWarnings("unchecked") + public static Consumer noopConsumer() { + return (Consumer) NOOP_CONSUMER; + } + + public LastInboundHandler() { + this(LastInboundHandler.noopConsumer()); + } + + public LastInboundHandler(Consumer channelReadCompleteConsumer) { + this.channelReadCompleteConsumer = checkNotNull(channelReadCompleteConsumer, "channelReadCompleteConsumer"); + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { super.handlerAdded(ctx); @@ -86,6 +112,11 @@ public class LastInboundHandler extends ChannelDuplexHandler { queue.add(msg); } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + channelReadCompleteConsumer.accept(ctx); + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { queue.add(new UserEvent(evt)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java index 5e13d056bb..015550f078 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/TestChannelInitializer.java @@ -16,10 +16,17 @@ package io.netty.handler.codec.http2; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelInitializer; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.UncheckedBooleanSupplier; + +import java.util.concurrent.atomic.AtomicInteger; /** * Channel initializer useful in tests. @@ -27,6 +34,7 @@ import io.netty.channel.ChannelInitializer; @Sharable public class TestChannelInitializer extends ChannelInitializer { ChannelHandler handler; + AtomicInteger maxReads; @Override public void initChannel(Channel channel) { @@ -34,5 +42,81 @@ public class TestChannelInitializer extends ChannelInitializer { channel.pipeline().addLast(handler); handler = null; } + if (maxReads != null) { + channel.config().setRecvByteBufAllocator(new TestNumReadsRecvByteBufAllocator(maxReads)); + } + } + + /** + * Designed to read a single byte at a time to control the number of reads done at a fine granularity. + */ + private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { + private final AtomicInteger numReads; + TestNumReadsRecvByteBufAllocator(AtomicInteger numReads) { + this.numReads = numReads; + } + + @Override + public ExtendedHandle newHandle() { + return new ExtendedHandle() { + private int attemptedBytesRead; + private int lastBytesRead; + private int numMessagesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + numMessagesRead = 0; + } + + @Override + public void incMessagesRead(int numMessages) { + numMessagesRead += numMessages; + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return numMessagesRead < numReads.get(); + } + + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + return continueReading(); + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; + } } }