Correctly propagate failures while update the flow-controller to the … (#9664)
Motivation: We may fail to update the flow-controller and in this case need to notify the stream channel and close it. Modifications: Attach a future to the write of the update frame and in case of a failure propagate it to the channel and close it Result: Fixes https://github.com/netty/netty/issues/9663
This commit is contained in:
parent
c7441f68f3
commit
9bf10f2dc5
@ -105,6 +105,28 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
|
private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
|
||||||
AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
|
AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
|
||||||
|
|
||||||
|
private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
|
||||||
|
Throwable cause = future.cause();
|
||||||
|
if (cause != null) {
|
||||||
|
Throwable unwrappedCause;
|
||||||
|
// Unwrap if needed
|
||||||
|
if (cause instanceof Http2FrameStreamException && ((unwrappedCause = cause.getCause()) != null)) {
|
||||||
|
cause = unwrappedCause;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify the child-channel and close it.
|
||||||
|
streamChannel.pipeline().fireExceptionCaught(cause);
|
||||||
|
streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) {
|
||||||
|
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
|
* The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
|
||||||
*/
|
*/
|
||||||
@ -530,7 +552,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
// otherwise we would have drained it from the queue and processed it during the read cycle.
|
// otherwise we would have drained it from the queue and processed it during the read cycle.
|
||||||
assert inboundBuffer == null || inboundBuffer.isEmpty();
|
assert inboundBuffer == null || inboundBuffer.isEmpty();
|
||||||
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
|
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
|
||||||
flowControlledBytes += unsafe.doRead0(frame, allocHandle);
|
unsafe.doRead0(frame, allocHandle);
|
||||||
// We currently don't need to check for readEOS because the parent channel and child channel are limited
|
// We currently don't need to check for readEOS because the parent channel and child channel are limited
|
||||||
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
|
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
|
||||||
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
|
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
|
||||||
@ -653,7 +675,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
final boolean wasActive = isActive();
|
final boolean wasActive = isActive();
|
||||||
|
|
||||||
updateLocalWindowIfNeeded();
|
// There is no need to update the local window as once the stream is closed all the pending bytes will be
|
||||||
|
// given back to the connection window by the controller itself.
|
||||||
|
|
||||||
// Only ever send a reset frame if the connection is still alive and if the stream was created before
|
// Only ever send a reset frame if the connection is still alive and if the stream was created before
|
||||||
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
|
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
|
||||||
@ -790,7 +813,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
allocHandle.reset(config());
|
allocHandle.reset(config());
|
||||||
boolean continueReading = false;
|
boolean continueReading = false;
|
||||||
do {
|
do {
|
||||||
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
|
doRead0((Http2Frame) message, allocHandle);
|
||||||
} while ((readEOS || (continueReading = allocHandle.continueReading()))
|
} while ((readEOS || (continueReading = allocHandle.continueReading()))
|
||||||
&& (message = pollQueuedMessage()) != null);
|
&& (message = pollQueuedMessage()) != null);
|
||||||
|
|
||||||
@ -817,8 +840,17 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
if (flowControlledBytes != 0) {
|
if (flowControlledBytes != 0) {
|
||||||
int bytes = flowControlledBytes;
|
int bytes = flowControlledBytes;
|
||||||
flowControlledBytes = 0;
|
flowControlledBytes = 0;
|
||||||
write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
|
ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
|
||||||
writeDoneAndNoFlush = true;
|
// Add a listener which will notify and teardown the stream
|
||||||
|
// when a window update fails if needed or check the result of the future directly if it was completed
|
||||||
|
// already.
|
||||||
|
// See https://github.com/netty/netty/issues/9663
|
||||||
|
if (future.isDone()) {
|
||||||
|
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
|
||||||
|
} else {
|
||||||
|
future.addListener(windowUpdateFrameWriteListener);
|
||||||
|
writeDoneAndNoFlush = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -847,20 +879,26 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
|
void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
|
||||||
pipeline().fireChannelRead(frame);
|
final int bytes;
|
||||||
|
if (frame instanceof Http2DataFrame) {
|
||||||
|
bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
|
||||||
|
|
||||||
|
// It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
|
||||||
|
// as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
|
||||||
|
// in this case that we accounted for it.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/9663
|
||||||
|
flowControlledBytes += bytes;
|
||||||
|
} else {
|
||||||
|
bytes = MIN_HTTP2_FRAME_SIZE;
|
||||||
|
}
|
||||||
|
// Update before firing event through the pipeline to be consistent with other Channel implementation.
|
||||||
|
allocHandle.attemptedBytesRead(bytes);
|
||||||
|
allocHandle.lastBytesRead(bytes);
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
|
|
||||||
if (frame instanceof Http2DataFrame) {
|
pipeline().fireChannelRead(frame);
|
||||||
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
|
|
||||||
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
|
|
||||||
allocHandle.lastBytesRead(numBytesToBeConsumed);
|
|
||||||
return numBytesToBeConsumed;
|
|
||||||
} else {
|
|
||||||
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
|
|
||||||
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user