From d71661ff88353254cc5c990192ff7908925cf581 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 22 Oct 2019 05:40:14 -0700 Subject: [PATCH] =?UTF-8?q?Correctly=20propagate=20failures=20while=20upda?= =?UTF-8?q?te=20the=20flow-controller=20to=20the=20=E2=80=A6=20(#9664)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: We may fail to update the flow-controller and in this case need to notify the stream channel and close it. Modifications: Attach a future to the write of the update frame and in case of a failure propagate it to the channel and close it Result: Fixes https://github.com/netty/netty/issues/9663 --- .../http2/AbstractHttp2StreamChannel.java | 72 ++++++++++++++----- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index 15478d2eb7..c6c6ee04de 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -99,6 +99,28 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements private static final AtomicIntegerFieldUpdater UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable"); + private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) { + Throwable cause = future.cause(); + if (cause != null) { + Throwable unwrappedCause; + // Unwrap if needed + if (cause instanceof Http2FrameStreamException && ((unwrappedCause = cause.getCause()) != null)) { + cause = unwrappedCause; + } + + // Notify the child-channel and close it. + streamChannel.pipeline().fireExceptionCaught(cause); + streamChannel.unsafe().close(streamChannel.unsafe().voidPromise()); + } + } + + private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this); + } + }; + /** * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}. */ @@ -529,7 +551,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // otherwise we would have drained it from the queue and processed it during the read cycle. assert inboundBuffer == null || inboundBuffer.isEmpty(); final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle(); - flowControlledBytes += unsafe.doRead0(frame, allocHandle); + unsafe.doRead0(frame, allocHandle); // We currently don't need to check for readEOS because the parent channel and child channel are limited // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the @@ -650,7 +672,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements final boolean wasActive = isActive(); - updateLocalWindowIfNeeded(); + // There is no need to update the local window as once the stream is closed all the pending bytes will be + // given back to the connection window by the controller itself. // Only ever send a reset frame if the connection is still alive and if the stream was created before // as otherwise we may send a RST on a stream in an invalid state and cause a connection error. @@ -784,7 +807,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements allocHandle.reset(config()); boolean continueReading = false; do { - flowControlledBytes += doRead0((Http2Frame) message, allocHandle); + doRead0((Http2Frame) message, allocHandle); } while ((readEOS || (continueReading = allocHandle.continueReading())) && (message = pollQueuedMessage()) != null); @@ -811,8 +834,17 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements if (flowControlledBytes != 0) { int bytes = flowControlledBytes; flowControlledBytes = 0; - write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream)); - writeDoneAndNoFlush = true; + ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream)); + // Add a listener which will notify and teardown the stream + // when a window update fails if needed or check the result of the future directly if it was completed + // already. + // See https://github.com/netty/netty/issues/9663 + if (future.isDone()) { + windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this); + } else { + future.addListener(windowUpdateFrameWriteListener); + writeDoneAndNoFlush = true; + } } } @@ -845,20 +877,26 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements } @SuppressWarnings("deprecation") - int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { - pipeline().fireChannelRead(frame); + void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { + final int bytes; + if (frame instanceof Http2DataFrame) { + bytes = ((Http2DataFrame) frame).initialFlowControlledBytes(); + + // It is important that we increment the flowControlledBytes before we call fireChannelRead(...) + // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure + // in this case that we accounted for it. + // + // See https://github.com/netty/netty/issues/9663 + flowControlledBytes += bytes; + } else { + bytes = MIN_HTTP2_FRAME_SIZE; + } + // Update before firing event through the pipeline to be consistent with other Channel implementation. + allocHandle.attemptedBytesRead(bytes); + allocHandle.lastBytesRead(bytes); allocHandle.incMessagesRead(1); - if (frame instanceof Http2DataFrame) { - final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes(); - allocHandle.attemptedBytesRead(numBytesToBeConsumed); - allocHandle.lastBytesRead(numBytesToBeConsumed); - return numBytesToBeConsumed; - } else { - allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE); - allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE); - } - return 0; + pipeline().fireChannelRead(frame); } @Override