From df46a349e09f94ed92fa5fc9cc7b142c10ab2305 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 27 Jun 2019 21:43:31 +0200 Subject: [PATCH] Reduce coupeling between Http2FrameCodec and Http2Multiplex* (#9273) Motivation: Http2MultiplexCodec and Http2MultiplexHandler had a very strong coupling with Http2FrameCodec which we can reduce easily. The end-goal should be to have no coupling at all. Modifications: - Reduce coupling by move some common logic to Http2CodecUtil - Move logic to check if a stream may have existed before to Http2FrameCodec - Use ArrayDeque as replacement for custom double-linked-list which makes the code a lot more readable - Use WindowUpdateFrame to signal consume bytes (just as users do when they use Http2FrameCodec directly) Result: Less coupling and cleaner code. --- .../http2/AbstractHttp2StreamChannel.java | 84 +++++------ .../http2/Http2ChannelDuplexHandler.java | 12 -- .../handler/codec/http2/Http2CodecUtil.java | 4 + .../handler/codec/http2/Http2FrameCodec.java | 11 +- .../codec/http2/Http2MultiplexCodec.java | 130 +++++------------- .../codec/http2/Http2MultiplexHandler.java | 103 ++++---------- .../handler/codec/http2/MaxCapacityQueue.java | 129 +++++++++++++++++ 7 files changed, 250 insertions(+), 223 deletions(-) create mode 100644 codec-http2/src/main/java/io/netty/handler/codec/http2/MaxCapacityQueue.java diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index cb67e277f2..68ed5c990a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -141,6 +141,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements private Runnable fireChannelWritabilityChangedTask; private boolean outboundClosed; + private int flowControlledBytes; /** * This variable represents if a read is in progress for the current channel or was requested. @@ -154,13 +155,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements /** {@code true} after the first HEADERS frame has been written **/ private boolean firstFrameWritten; - - // Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to - // extend the read loop of a child channel if the child channel drains its queued data during read, and the - // parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent - // channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list. - AbstractHttp2StreamChannel next; - AbstractHttp2StreamChannel previous; + private boolean readCompletePending; AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) { this.stream = stream; @@ -535,16 +530,18 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // otherwise we would have drained it from the queue and processed it during the read cycle. assert inboundBuffer == null || inboundBuffer.isEmpty(); final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle(); - unsafe.doRead0(frame, allocHandle); + flowControlledBytes += unsafe.doRead0(frame, allocHandle); // We currently don't need to check for readEOS because the parent channel and child channel are limited // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the // cost of additional readComplete notifications on the rare path. if (allocHandle.continueReading()) { - tryAddChildChannelToReadPendingQueue(); + if (!readCompletePending) { + readCompletePending = true; + addChannelToReadCompletePendingQueue(); + } } else { - tryRemoveChildChannelFromReadPendingQueue(); - unsafe.notifyReadComplete(allocHandle); + unsafe.notifyReadComplete(allocHandle, true); } } else { if (inboundBuffer == null) { @@ -556,8 +553,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements void fireChildReadComplete() { assert eventLoop().inEventLoop(); - assert readStatus != ReadStatus.IDLE; - unsafe.notifyReadComplete(unsafe.recvBufAllocHandle()); + assert readStatus != ReadStatus.IDLE || !readCompletePending; + unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false); } private final class Http2ChannelUnsafe implements Unsafe { @@ -651,14 +648,16 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements return; } closeInitiated = true; - - tryRemoveChildChannelFromReadPendingQueue(); + // Just set to false as removing from an underlying queue would even be more expensive. + readCompletePending = false; final boolean wasActive = isActive(); - // Only ever send a reset frame if the connection is still alive and if the stream may have existed + updateLocalWindowIfNeeded(); + + // Only ever send a reset frame if the connection is still alive and if the stream was created before // as otherwise we may send a RST on a stream in an invalid state and cause a connection error. - if (parent().isActive() && !readEOS && streamMayHaveExisted(stream())) { + if (parent().isActive() && !readEOS && Http2CodecUtil.isStreamIdValid(stream.id())) { Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream()); write(resetFrame, unsafe().voidPromise()); flush(); @@ -782,7 +781,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements allocHandle.reset(config()); boolean continueReading = false; do { - doRead0((Http2Frame) message, allocHandle); + flowControlledBytes += doRead0((Http2Frame) message, allocHandle); } while ((readEOS || (continueReading = allocHandle.continueReading())) && (message = inboundBuffer.poll()) != null); @@ -791,10 +790,12 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // currently reading it is possile that more frames will be delivered to this child channel. In // the case that this child channel still wants to read we delay the channelReadComplete on this // child channel until the parent is done reading. - boolean added = tryAddChildChannelToReadPendingQueue(); - assert added; + if (!readCompletePending) { + readCompletePending = true; + addChannelToReadCompletePendingQueue(); + } } else { - notifyReadComplete(allocHandle); + notifyReadComplete(allocHandle, true); } } } @@ -803,13 +804,30 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements readEOS = true; } - void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle) { - assert next == null && previous == null; + private void updateLocalWindowIfNeeded() { + if (flowControlledBytes != 0) { + int bytes = flowControlledBytes; + flowControlledBytes = 0; + write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream)); + writeDoneAndNoFlush = true; + } + } + + void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) { + if (!readCompletePending && !forceReadComplete) { + return; + } + // Set to false just in case we added the channel multiple times before. + readCompletePending = false; + if (readStatus == ReadStatus.REQUESTED) { readStatus = ReadStatus.IN_PROGRESS; } else { readStatus = ReadStatus.IDLE; } + + updateLocalWindowIfNeeded(); + allocHandle.readComplete(); pipeline().fireChannelReadComplete(); // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent @@ -822,7 +840,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements } @SuppressWarnings("deprecation") - void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { + int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { pipeline().fireChannelRead(frame); allocHandle.incMessagesRead(1); @@ -830,21 +848,12 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes(); allocHandle.attemptedBytesRead(numBytesToBeConsumed); allocHandle.lastBytesRead(numBytesToBeConsumed); - if (numBytesToBeConsumed != 0) { - try { - if (consumeBytes(stream, numBytesToBeConsumed)) { - // We wrote some WINDOW_UPDATE frame, so we may need to do a flush. - writeDoneAndNoFlush = true; - flush(); - } - } catch (Http2Exception e) { - pipeline().fireExceptionCaught(e); - } - } + return numBytesToBeConsumed; } else { allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE); allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE); } + return 0; } @Override @@ -1041,10 +1050,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements return promise; } - protected abstract boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception; protected abstract boolean isParentReadInProgress(); - protected abstract boolean streamMayHaveExisted(Http2FrameStream stream); - protected abstract void tryRemoveChildChannelFromReadPendingQueue(); - protected abstract boolean tryAddChildChannelToReadPendingQueue(); + protected abstract void addChannelToReadCompletePendingQueue(); protected abstract ChannelHandlerContext parentContext(); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ChannelDuplexHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ChannelDuplexHandler.java index 6c09014aaa..b595696f4e 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ChannelDuplexHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ChannelDuplexHandler.java @@ -91,16 +91,4 @@ public abstract class Http2ChannelDuplexHandler extends ChannelDuplexHandler { } return (Http2FrameCodec) frameCodecCtx.handler(); } - - boolean isValidLocalStreamId(Http2FrameStream stream) { - return frameCodec.connection().local().isValidStreamId(stream.id()); - } - - boolean streamMayHaveExisted(Http2FrameStream stream) { - return frameCodec.connection().streamMayHaveExisted(stream.id()); - } - - boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception { - return frameCodec.consumeBytes(stream.id(), bytes); - } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index 7ebc8fd488..7eed52d850 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -151,6 +151,10 @@ public final class Http2CodecUtil { return streamId >= 0; } + static boolean isStreamIdValid(int streamId, boolean server) { + return isStreamIdValid(streamId) && server == ((streamId & 1) == 0); + } + /** * Indicates whether or not the given value for max frame size falls within the valid range. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java index c11c5e31ed..c263061420 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java @@ -296,7 +296,16 @@ public class Http2FrameCodec extends Http2ConnectionHandler { } } else if (msg instanceof Http2ResetFrame) { Http2ResetFrame rstFrame = (Http2ResetFrame) msg; - encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise); + int id = rstFrame.stream().id(); + // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a + // stream in an invalid state and cause a connection error. + if (connection().streamMayHaveExisted(id)) { + encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise); + } else { + ReferenceCountUtil.release(rstFrame); + promise.setFailure(Http2Exception.streamError( + rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed")); + } } else if (msg instanceof Http2PingFrame) { Http2PingFrame frame = (Http2PingFrame) msg; encoder().writePing(ctx, frame.ack(), frame.content(), promise); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index a9d49d6e44..daa6f60d8d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -27,6 +27,9 @@ import io.netty.util.ReferenceCounted; import io.netty.util.internal.UnstableApi; +import java.util.ArrayDeque; +import java.util.Queue; + import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; @@ -84,14 +87,14 @@ public class Http2MultiplexCodec extends Http2FrameCodec { private final ChannelHandler inboundStreamHandler; private final ChannelHandler upgradeStreamHandler; + private final Queue readCompletePendingQueue = + new MaxCapacityQueue(new ArrayDeque(8), + // Choose 100 which is what is used most of the times as default. + Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS); private boolean parentReadInProgress; private int idCount; - // Linked-List for Http2MultiplexCodecStreamChannel instances that need to be processed by channelReadComplete(...) - private AbstractHttp2StreamChannel head; - private AbstractHttp2StreamChannel tail; - // Need to be volatile as accessed from within the Http2MultiplexCodecStreamChannel in a multi-threaded fashion. volatile ChannelHandlerContext ctx; @@ -131,14 +134,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved0(ctx); - // Unlink the linked list to guard against GC nepotism. - AbstractHttp2StreamChannel ch = head; - while (ch != null) { - AbstractHttp2StreamChannel curr = ch; - ch = curr.next; - curr.next = curr.previous = null; - } - head = tail = null; + readCompletePendingQueue.clear(); } @Override @@ -222,50 +218,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } } - private boolean isChildChannelInReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - return childChannel.previous != null || childChannel.next != null || head == childChannel; - } - - private boolean tryAddChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - if (!isChildChannelInReadPendingQueue(childChannel)) { - addChildChannelToReadPendingQueue(childChannel); - return true; - } - return false; - } - - private void addChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - if (tail == null) { - assert head == null; - tail = head = childChannel; - } else { - childChannel.previous = tail; - tail.next = childChannel; - tail = childChannel; - } - } - - private void tryRemoveChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - if (isChildChannelInReadPendingQueue(childChannel)) { - removeChildChannelFromReadPendingQueue(childChannel); - } - } - - private void removeChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - AbstractHttp2StreamChannel previous = childChannel.previous; - if (childChannel.next != null) { - childChannel.next.previous = previous; - } else { - tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back. - } - if (previous != null) { - previous.next = childChannel.next; - } else { - head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward. - } - childChannel.next = childChannel.previous = null; - } - private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) { try { forEachActiveStream(new Http2FrameStreamVisitor() { @@ -291,17 +243,30 @@ public class Http2MultiplexCodec extends Http2FrameCodec { */ @Override public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - try { - onChannelReadComplete(ctx); - } finally { - parentReadInProgress = false; - tail = head = null; - // We always flush as this is what Http2ConnectionHandler does for now. - flush0(ctx); - } + processPendingReadCompleteQueue(); channelReadComplete0(ctx); } + private void processPendingReadCompleteQueue() { + parentReadInProgress = true; + try { + // If we have many child channel we can optimize for the case when multiple call flush() in + // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple + // write calls on the socket which is expensive. + for (;;) { + AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll(); + if (childChannel == null) { + break; + } + childChannel.fireChildReadComplete(); + } + } finally { + parentReadInProgress = false; + readCompletePendingQueue.clear(); + // We always flush as this is what Http2ConnectionHandler does for now. + flush0(ctx); + } + } @Override public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { parentReadInProgress = true; @@ -319,20 +284,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec { ctx.fireChannelWritabilityChanged(); } - final void onChannelReadComplete(ChannelHandlerContext ctx) { - // If we have many child channel we can optimize for the case when multiple call flush() in - // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple - // write calls on the socket which is expensive. - AbstractHttp2StreamChannel current = head; - while (current != null) { - AbstractHttp2StreamChannel childChannel = current; - // Clear early in case fireChildReadComplete() causes it to need to be re-processed - current = current.next; - childChannel.next = childChannel.previous = null; - childChannel.fireChildReadComplete(); - } - } - final void flush0(ChannelHandlerContext ctx) { flush(ctx); } @@ -343,29 +294,18 @@ public class Http2MultiplexCodec extends Http2FrameCodec { super(stream, ++idCount, inboundHandler); } - @Override - protected boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception { - return Http2MultiplexCodec.this.consumeBytes(stream.id(), bytes); - } - @Override protected boolean isParentReadInProgress() { return parentReadInProgress; } @Override - protected boolean streamMayHaveExisted(Http2FrameStream stream) { - return Http2MultiplexCodec.this.connection().streamMayHaveExisted(stream.id()); - } - - @Override - protected void tryRemoveChildChannelFromReadPendingQueue() { - Http2MultiplexCodec.this.tryRemoveChildChannelFromReadPendingQueue(this); - } - - @Override - protected boolean tryAddChildChannelToReadPendingQueue() { - return Http2MultiplexCodec.this.tryAddChildChannelToReadPendingQueue(this); + protected void addChannelToReadCompletePendingQueue() { + // If there is no space left in the queue, just keep on processing everything that is already + // stored there and try again. + while (!readCompletePendingQueue.offer(this)) { + processPendingReadCompleteQueue(); + } } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java index 4a1a3db4a3..300e37df3d 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java @@ -24,11 +24,15 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; +import io.netty.channel.ServerChannel; import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream; import io.netty.util.ReferenceCounted; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.UnstableApi; +import java.util.ArrayDeque; +import java.util.Queue; + import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; @@ -90,15 +94,14 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { private final ChannelHandler inboundStreamHandler; private final ChannelHandler upgradeStreamHandler; + private final Queue readCompletePendingQueue = + new MaxCapacityQueue(new ArrayDeque(8), + // Choose 100 which is what is used most of the times as default. + Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS); private boolean parentReadInProgress; private int idCount; - // Linked-List for Http2MultiplexHandlerStreamChannel instances that need to be processed by - // channelReadComplete(...) - private AbstractHttp2StreamChannel head; - private AbstractHttp2StreamChannel tail; - // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion. private volatile ChannelHandlerContext ctx; @@ -149,14 +152,7 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { @Override protected void handlerRemoved0(ChannelHandlerContext ctx) { - // Unlink the linked list to guard against GC nepotism. - AbstractHttp2StreamChannel ch = head; - while (ch != null) { - AbstractHttp2StreamChannel curr = ch; - ch = curr.next; - curr.next = curr.previous = null; - } - head = tail = null; + readCompletePendingQueue.clear(); } @Override @@ -269,49 +265,14 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { ctx.fireExceptionCaught(cause); } - private boolean isChildChannelInReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - return childChannel.previous != null || childChannel.next != null || head == childChannel; - } - - private boolean tryAddChildChannelToReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - if (!isChildChannelInReadPendingQueue(childChannel)) { - if (tail == null) { - assert head == null; - tail = head = childChannel; - } else { - childChannel.previous = tail; - tail.next = childChannel; - tail = childChannel; - } - return true; - } - return false; - } - - private void tryRemoveChildChannelFromReadPendingQueue(AbstractHttp2StreamChannel childChannel) { - if (isChildChannelInReadPendingQueue(childChannel)) { - AbstractHttp2StreamChannel previous = childChannel.previous; - if (childChannel.next != null) { - childChannel.next.previous = previous; - } else { - tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back. - } - if (previous != null) { - previous.next = childChannel.next; - } else { - head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward. - } - childChannel.next = childChannel.previous = null; - } - } - private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) { try { + final boolean server = ctx.channel().parent() instanceof ServerChannel; forEachActiveStream(new Http2FrameStreamVisitor() { @Override public boolean visit(Http2FrameStream stream) { final int streamId = stream.id(); - if (streamId > goAwayFrame.lastStreamId() && isValidLocalStreamId(stream)) { + if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) { final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment; childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate()); @@ -330,29 +291,30 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + processPendingReadCompleteQueue(); + ctx.fireChannelReadComplete(); + } + + private void processPendingReadCompleteQueue() { parentReadInProgress = true; // If we have many child channel we can optimize for the case when multiple call flush() in // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple // write calls on the socket which is expensive. - AbstractHttp2StreamChannel current = head; - if (current != null) { + AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll(); + if (childChannel != null) { try { do { - AbstractHttp2StreamChannel childChannel = current; - // Clear early in case fireChildReadComplete() causes it to need to be re-processed - current = current.next; - childChannel.next = childChannel.previous = null; childChannel.fireChildReadComplete(); - } while (current != null); + childChannel = readCompletePendingQueue.poll(); + } while (childChannel != null); } finally { parentReadInProgress = false; - tail = head = null; + readCompletePendingQueue.clear(); ctx.flush(); } } else { parentReadInProgress = false; } - ctx.fireChannelReadComplete(); } private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel { @@ -361,29 +323,18 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { super(stream, ++idCount, inboundHandler); } - @Override - protected boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception { - return Http2MultiplexHandler.this.consumeBytes(stream, bytes); - } - @Override protected boolean isParentReadInProgress() { return parentReadInProgress; } @Override - protected boolean streamMayHaveExisted(Http2FrameStream stream) { - return Http2MultiplexHandler.this.streamMayHaveExisted(stream); - } - - @Override - protected void tryRemoveChildChannelFromReadPendingQueue() { - Http2MultiplexHandler.this.tryRemoveChildChannelFromReadPendingQueue(this); - } - - @Override - protected boolean tryAddChildChannelToReadPendingQueue() { - return Http2MultiplexHandler.this.tryAddChildChannelToReadPendingQueue(this); + protected void addChannelToReadCompletePendingQueue() { + // If there is no space left in the queue, just keep on processing everything that is already + // stored there and try again. + while (!readCompletePendingQueue.offer(this)) { + processPendingReadCompleteQueue(); + } } @Override diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/MaxCapacityQueue.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/MaxCapacityQueue.java new file mode 100644 index 0000000000..7f72511fb0 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/MaxCapacityQueue.java @@ -0,0 +1,129 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http2; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +final class MaxCapacityQueue implements Queue { + private final Queue queue; + private final int maxCapacity; + + MaxCapacityQueue(Queue queue, int maxCapacity) { + this.queue = queue; + this.maxCapacity = maxCapacity; + } + + @Override + public boolean add(E element) { + if (offer(element)) { + return true; + } + throw new IllegalStateException(); + } + + @Override + public boolean offer(E element) { + if (maxCapacity <= queue.size()) { + return false; + } + return queue.offer(element); + } + + @Override + public E remove() { + return queue.remove(); + } + + @Override + public E poll() { + return queue.poll(); + } + + @Override + public E element() { + return queue.element(); + } + + @Override + public E peek() { + return queue.peek(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return queue.contains(o); + } + + @Override + public Iterator iterator() { + return queue.iterator(); + } + + @Override + public Object[] toArray() { + return queue.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return queue.toArray(a); + } + + @Override + public boolean remove(Object o) { + return queue.remove(o); + } + + @Override + public boolean containsAll(Collection c) { + return queue.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + if (maxCapacity >= size() + c.size()) { + return queue.addAll(c); + } + throw new IllegalStateException(); + } + + @Override + public boolean removeAll(Collection c) { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return queue.retainAll(c); + } + + @Override + public void clear() { + queue.clear(); + } +}