Do not fire outbound exception throught the pipeline when using Http2FrameCodec / Http2MultiplexCodec

Motivation:

Usually when using netty exceptions which happen for outbound operations should not be fired through the pipeline but only the ChannelPromise should be failed.

Modifications:

- Change Http2LifecycleManager.onError(...) to take also an boolean that indicate if the error was caused by an outbound operation
- Channel Http2ConnectionHandler.on*Error(...) methods to also take this boolean
- Change Http2FrameCodec to only fire exceptions through the pipeline if these are not outbound operations related
- Add unit test.

Result:

More consistent error handling when using Http2FrameCodec and Http2MultiplexCodec.
This commit is contained in:
Norman Maurer 2018-01-25 22:42:28 +01:00 committed by Scott Mitchell
parent 8a095d0244
commit 1df5b02fd9
6 changed files with 74 additions and 36 deletions

View File

@ -211,7 +211,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
// other headers containing pseudo-header fields. // other headers containing pseudo-header fields.
stream.headersSent(isInformational); stream.headersSent(isInformational);
} else { } else {
lifecycleManager.onError(ctx, failureCause); lifecycleManager.onError(ctx, true, failureCause);
} }
return future; return future;
@ -223,7 +223,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
return promise; return promise;
} }
} catch (Throwable t) { } catch (Throwable t) {
lifecycleManager.onError(ctx, t); lifecycleManager.onError(ctx, true, t);
promise.tryFailure(t); promise.tryFailure(t);
return promise; return promise;
} }
@ -287,11 +287,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
if (failureCause == null) { if (failureCause == null) {
stream.pushPromiseSent(); stream.pushPromiseSent();
} else { } else {
lifecycleManager.onError(ctx, failureCause); lifecycleManager.onError(ctx, true, failureCause);
} }
return future; return future;
} catch (Throwable t) { } catch (Throwable t) {
lifecycleManager.onError(ctx, t); lifecycleManager.onError(ctx, true, t);
promise.tryFailure(t); promise.tryFailure(t);
return promise; return promise;
} }
@ -376,7 +376,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
queue.releaseAndFailAll(cause); queue.releaseAndFailAll(cause);
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
// error so we don't invalidate flow control when returning bytes to flow control. // error so we don't invalidate flow control when returning bytes to flow control.
lifecycleManager.onError(ctx, cause); lifecycleManager.onError(ctx, true, cause);
} }
@Override @Override
@ -456,7 +456,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public void error(ChannelHandlerContext ctx, Throwable cause) { public void error(ChannelHandlerContext ctx, Throwable cause) {
if (ctx != null) { if (ctx != null) {
lifecycleManager.onError(ctx, cause); lifecycleManager.onError(ctx, true, cause);
} }
promise.tryFailure(cause); promise.tryFailure(cause);
} }
@ -476,7 +476,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
if (failureCause == null) { if (failureCause == null) {
stream.headersSent(isInformational); stream.headersSent(isInformational);
} else { } else {
lifecycleManager.onError(ctx, failureCause); lifecycleManager.onError(ctx, true, failureCause);
} }
} }

View File

@ -200,9 +200,9 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
encoder.flowController().writePendingBytes(); encoder.flowController().writePendingBytes();
ctx.flush(); ctx.flush();
} catch (Http2Exception e) { } catch (Http2Exception e) {
onError(ctx, e); onError(ctx, true, e);
} catch (Throwable cause) { } catch (Throwable cause) {
onError(ctx, connectionError(INTERNAL_ERROR, cause, "Error flushing")); onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
} }
} }
@ -254,7 +254,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
byteDecoder.decode(ctx, in, out); byteDecoder.decode(ctx, in, out);
} }
} catch (Throwable e) { } catch (Throwable e) {
onError(ctx, e); onError(ctx, false, e);
} }
} }
@ -389,7 +389,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
try { try {
decoder.decodeFrame(ctx, in, out); decoder.decodeFrame(ctx, in, out);
} catch (Throwable e) { } catch (Throwable e) {
onError(ctx, e); onError(ctx, false, e);
} }
} }
} }
@ -538,7 +538,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (getEmbeddedHttp2Exception(cause) != null) { if (getEmbeddedHttp2Exception(cause) != null) {
// Some exception in the causality chain is an Http2Exception - handle it. // Some exception in the causality chain is an Http2Exception - handle it.
onError(ctx, cause); onError(ctx, false, cause);
} else { } else {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
@ -604,17 +604,17 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
* Central handler for all exceptions caught during HTTP/2 processing. * Central handler for all exceptions caught during HTTP/2 processing.
*/ */
@Override @Override
public void onError(ChannelHandlerContext ctx, Throwable cause) { public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
Http2Exception embedded = getEmbeddedHttp2Exception(cause); Http2Exception embedded = getEmbeddedHttp2Exception(cause);
if (isStreamError(embedded)) { if (isStreamError(embedded)) {
onStreamError(ctx, cause, (StreamException) embedded); onStreamError(ctx, outbound, cause, (StreamException) embedded);
} else if (embedded instanceof CompositeStreamException) { } else if (embedded instanceof CompositeStreamException) {
CompositeStreamException compositException = (CompositeStreamException) embedded; CompositeStreamException compositException = (CompositeStreamException) embedded;
for (StreamException streamException : compositException) { for (StreamException streamException : compositException) {
onStreamError(ctx, cause, streamException); onStreamError(ctx, outbound, cause, streamException);
} }
} else { } else {
onConnectionError(ctx, cause, embedded); onConnectionError(ctx, outbound, cause, embedded);
} }
ctx.flush(); ctx.flush();
} }
@ -633,11 +633,13 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
* streams are closed, the connection is shut down. * streams are closed, the connection is shut down.
* *
* @param ctx the channel context * @param ctx the channel context
* @param outbound {@code true} if the error was caused by an outbound operation.
* @param cause the exception that was caught * @param cause the exception that was caught
* @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
* be {@code null} if it's an unknown exception. * be {@code null} if it's an unknown exception.
*/ */
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) { protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
Throwable cause, Http2Exception http2Ex) {
if (http2Ex == null) { if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause); http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
} }
@ -659,11 +661,12 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
* stream. * stream.
* *
* @param ctx the channel context * @param ctx the channel context
* @param outbound {@code true} if the error was caused by an outbound operation.
* @param cause the exception that was caught * @param cause the exception that was caught
* @param http2Ex the {@link StreamException} that is embedded in the causality chain. * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
*/ */
protected void onStreamError(ChannelHandlerContext ctx, @SuppressWarnings("unused") Throwable cause, protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
StreamException http2Ex) { @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
final int streamId = http2Ex.streamId(); final int streamId = http2Ex.streamId();
Http2Stream stream = connection().stream(streamId); Http2Stream stream = connection().stream(streamId);
@ -692,7 +695,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
try { try {
handleServerHeaderDecodeSizeError(ctx, stream); handleServerHeaderDecodeSizeError(ctx, stream);
} catch (Throwable cause2) { } catch (Throwable cause2) {
onError(ctx, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError")); onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
} }
} }
} }
@ -867,13 +870,13 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
closeStream(stream, future); closeStream(stream, future);
} else { } else {
// The connection will be closed and so no need to change the resetSent flag to false. // The connection will be closed and so no need to change the resetSent flag to false.
onConnectionError(ctx, future.cause(), null); onConnectionError(ctx, true, future.cause(), null);
} }
} }
private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) { private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
if (!future.isSuccess()) { if (!future.isSuccess()) {
onConnectionError(ctx, future.cause(), null); onConnectionError(ctx, true, future.cause(), null);
} }
} }

View File

@ -186,7 +186,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
try { try {
return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey)); return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
} catch (Throwable cause) { } catch (Throwable cause) {
onError(ctx, cause); onError(ctx, false, cause);
return false; return false;
} }
} }
@ -431,11 +431,16 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
} }
@Override @Override
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) { protected void onConnectionError(
// allow the user to handle it first in the pipeline, and then automatically clean up. ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
// If this is not desired behavior the user can override this method. if (!outbound) {
ctx.fireExceptionCaught(cause); // allow the user to handle it first in the pipeline, and then automatically clean up.
super.onConnectionError(ctx, cause, http2Ex); // If this is not desired behavior the user can override this method.
//
// We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
ctx.fireExceptionCaught(cause);
}
super.onConnectionError(ctx, outbound, cause, http2Ex);
} }
/** /**
@ -443,14 +448,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
* are simply logged and replied to by sending a RST_STREAM frame. * are simply logged and replied to by sending a RST_STREAM frame.
*/ */
@Override @Override
protected final void onStreamError(ChannelHandlerContext ctx, Throwable cause, protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
Http2Exception.StreamException streamException) { Http2Exception.StreamException streamException) {
int streamId = streamException.streamId(); int streamId = streamException.streamId();
Http2Stream connectionStream = connection().stream(streamId); Http2Stream connectionStream = connection().stream(streamId);
if (connectionStream == null) { if (connectionStream == null) {
onHttp2UnknownStreamError(ctx, cause, streamException); onHttp2UnknownStreamError(ctx, cause, streamException);
// Write a RST_STREAM // Write a RST_STREAM
super.onStreamError(ctx, cause, streamException); super.onStreamError(ctx, outbound, cause, streamException);
return; return;
} }
@ -458,11 +463,14 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
if (stream == null) { if (stream == null) {
LOG.warn("Stream exception thrown without stream object attached.", cause); LOG.warn("Stream exception thrown without stream object attached.", cause);
// Write a RST_STREAM // Write a RST_STREAM
super.onStreamError(ctx, cause, streamException); super.onStreamError(ctx, outbound, cause, streamException);
return; return;
} }
onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause)); if (!outbound) {
// We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
}
} }
void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause, void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,

View File

@ -88,6 +88,11 @@ public interface Http2LifecycleManager {
/** /**
* Processes the given error. * Processes the given error.
*
* @param ctx The context used for communication and buffer allocation if necessary.
* @param outbound {@code true} if the error was caused by an outbound operation and so the corresponding
* {@link ChannelPromise} was failed as well.
* @param cause the error.
*/ */
void onError(ChannelHandlerContext ctx, Throwable cause); void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause);
} }

View File

@ -112,7 +112,7 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
onError(ctx, t); onError(ctx, true, t);
promiseAggregator.setFailure(t); promiseAggregator.setFailure(t);
} finally { } finally {
if (release) { if (release) {

View File

@ -389,14 +389,14 @@ public class Http2FrameCodecTest {
} }
@Test @Test
public void streamErrorShouldFireException() throws Exception { public void streamErrorShouldFireExceptionForInbound() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3); Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream); assertNotNull(stream);
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo"); StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
frameCodec.onError(http2HandlerCtx, streamEx); frameCodec.onError(http2HandlerCtx, false, streamEx);
Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent(); Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent();
assertEquals(Http2FrameStreamEvent.Type.State, event.type()); assertEquals(Http2FrameStreamEvent.Type.State, event.type());
@ -414,6 +414,28 @@ public class Http2FrameCodecTest {
assertNull(inboundHandler.readInboundMessageOrUserEvent()); assertNull(inboundHandler.readInboundMessageOrUserEvent());
} }
@Test
public void streamErrorShouldNotFireExceptionForOutbound() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);
Http2Stream stream = frameCodec.connection().stream(3);
assertNotNull(stream);
StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo");
frameCodec.onError(http2HandlerCtx, true, streamEx);
Http2FrameStreamEvent event = inboundHandler.readInboundMessageOrUserEvent();
assertEquals(Http2FrameStreamEvent.Type.State, event.type());
assertEquals(State.OPEN, event.stream().state());
Http2HeadersFrame headersFrame = inboundHandler.readInboundMessageOrUserEvent();
assertNotNull(headersFrame);
// No exception expected
inboundHandler.checkException();
assertNull(inboundHandler.readInboundMessageOrUserEvent());
}
@Test @Test
public void windowUpdateFrameDecrementsConsumedBytes() throws Exception { public void windowUpdateFrameDecrementsConsumedBytes() throws Exception {
frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false); frameListener.onHeadersRead(http2HandlerCtx, 3, request, 31, false);