Correctly propagate failures while update the flow-controller to the … (#9664)

Motivation:

We may fail to update the flow-controller and in this case need to notify the stream channel and close it.

Modifications:

Attach a future to the write of the update frame and in case of a failure propagate it to the channel and close it

Result:

Fixes https://github.com/netty/netty/issues/9663
This commit is contained in:
Norman Maurer 2019-10-22 05:40:14 -07:00
parent 3577a52809
commit d71661ff88

View File

@ -99,6 +99,28 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER = private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable"); AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
Throwable cause = future.cause();
if (cause != null) {
Throwable unwrappedCause;
// Unwrap if needed
if (cause instanceof Http2FrameStreamException && ((unwrappedCause = cause.getCause()) != null)) {
cause = unwrappedCause;
}
// Notify the child-channel and close it.
streamChannel.pipeline().fireExceptionCaught(cause);
streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
}
}
private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
}
};
/** /**
* The current status of the read-processing for a {@link AbstractHttp2StreamChannel}. * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
*/ */
@ -529,7 +551,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
// otherwise we would have drained it from the queue and processed it during the read cycle. // otherwise we would have drained it from the queue and processed it during the read cycle.
assert inboundBuffer == null || inboundBuffer.isEmpty(); assert inboundBuffer == null || inboundBuffer.isEmpty();
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle(); final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
flowControlledBytes += unsafe.doRead0(frame, allocHandle); unsafe.doRead0(frame, allocHandle);
// We currently don't need to check for readEOS because the parent channel and child channel are limited // We currently don't need to check for readEOS because the parent channel and child channel are limited
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
@ -650,7 +672,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
final boolean wasActive = isActive(); final boolean wasActive = isActive();
updateLocalWindowIfNeeded(); // There is no need to update the local window as once the stream is closed all the pending bytes will be
// given back to the connection window by the controller itself.
// Only ever send a reset frame if the connection is still alive and if the stream was created before // Only ever send a reset frame if the connection is still alive and if the stream was created before
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error. // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
@ -784,7 +807,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
allocHandle.reset(config()); allocHandle.reset(config());
boolean continueReading = false; boolean continueReading = false;
do { do {
flowControlledBytes += doRead0((Http2Frame) message, allocHandle); doRead0((Http2Frame) message, allocHandle);
} while ((readEOS || (continueReading = allocHandle.continueReading())) } while ((readEOS || (continueReading = allocHandle.continueReading()))
&& (message = pollQueuedMessage()) != null); && (message = pollQueuedMessage()) != null);
@ -811,10 +834,19 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
if (flowControlledBytes != 0) { if (flowControlledBytes != 0) {
int bytes = flowControlledBytes; int bytes = flowControlledBytes;
flowControlledBytes = 0; flowControlledBytes = 0;
write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream)); ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
// Add a listener which will notify and teardown the stream
// when a window update fails if needed or check the result of the future directly if it was completed
// already.
// See https://github.com/netty/netty/issues/9663
if (future.isDone()) {
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
} else {
future.addListener(windowUpdateFrameWriteListener);
writeDoneAndNoFlush = true; writeDoneAndNoFlush = true;
} }
} }
}
void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) { void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) {
if (!readCompletePending && !forceReadComplete) { if (!readCompletePending && !forceReadComplete) {
@ -845,20 +877,26 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) { void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
pipeline().fireChannelRead(frame); final int bytes;
if (frame instanceof Http2DataFrame) {
bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
// It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
// as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
// in this case that we accounted for it.
//
// See https://github.com/netty/netty/issues/9663
flowControlledBytes += bytes;
} else {
bytes = MIN_HTTP2_FRAME_SIZE;
}
// Update before firing event through the pipeline to be consistent with other Channel implementation.
allocHandle.attemptedBytesRead(bytes);
allocHandle.lastBytesRead(bytes);
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
if (frame instanceof Http2DataFrame) { pipeline().fireChannelRead(frame);
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
allocHandle.lastBytesRead(numBytesToBeConsumed);
return numBytesToBeConsumed;
} else {
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
}
return 0;
} }
@Override @Override