Move DefaultChannelPipeline.notifyHandlerException() to DefaultChannelHandlerContext / Always trigger exceptionCaught() at the handler that raised an exception
- Related: #1118
This commit is contained in:
parent
1603d9792d
commit
0e3825899a
@ -77,7 +77,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
||||
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
|
||||
int oldOutSize = out.size();
|
||||
try {
|
||||
boolean unsupportedFound = false;
|
||||
for (;;) {
|
||||
Object msg = in.poll();
|
||||
if (msg == null) {
|
||||
@ -86,18 +85,9 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
||||
|
||||
if (!acceptInboundMessage(msg)) {
|
||||
out.add(msg);
|
||||
unsupportedFound = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (unsupportedFound) {
|
||||
// the last message were unsupported, but now we received one that is supported.
|
||||
// So reset the flag and notify the next context
|
||||
unsupportedFound = false;
|
||||
ctx.fireInboundBufferUpdated();
|
||||
oldOutSize = out.size();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
I imsg = (I) msg;
|
||||
try {
|
||||
@ -109,8 +99,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
||||
BufUtil.release(imsg);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exceptionCaught(ctx, t);
|
||||
} finally {
|
||||
if (oldOutSize != out.size()) {
|
||||
ctx.fireInboundBufferUpdated();
|
||||
|
@ -355,7 +355,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelInboundHandler) handler).freeInboundBuffer(this);
|
||||
} catch (Exception e) {
|
||||
pipeline.notifyHandlerException(e);
|
||||
notifyHandlerException(e);
|
||||
} finally {
|
||||
freeInboundBridge();
|
||||
}
|
||||
@ -364,7 +364,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
|
||||
} catch (Exception e) {
|
||||
pipeline.notifyHandlerException(e);
|
||||
notifyHandlerException(e);
|
||||
} finally {
|
||||
freeOutboundBridge();
|
||||
}
|
||||
@ -720,7 +720,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelStateHandler) handler()).channelRegistered(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -747,7 +747,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelStateHandler) handler()).channelUnregistered(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
}
|
||||
|
||||
@ -773,7 +773,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelStateHandler) handler()).channelActive(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -800,7 +800,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelStateHandler) handler()).channelInactive(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -812,16 +812,20 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
throw new NullPointerException("cause");
|
||||
}
|
||||
|
||||
final DefaultChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
findContextInbound().invokeExceptionCaught(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeExceptionCaught(final Throwable cause) {
|
||||
EventExecutor executor = executor();
|
||||
if (prev != null && executor.inEventLoop()) {
|
||||
next.invokeExceptionCaught(cause);
|
||||
invokeExceptionCaught0(cause);
|
||||
} else {
|
||||
try {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeExceptionCaught(cause);
|
||||
invokeExceptionCaught0(cause);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
@ -831,10 +835,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeExceptionCaught(Throwable cause) {
|
||||
private void invokeExceptionCaught0(Throwable cause) {
|
||||
ChannelStateHandler handler = (ChannelStateHandler) handler();
|
||||
try {
|
||||
handler.exceptionCaught(this, cause);
|
||||
@ -876,7 +879,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
handler.userEventTriggered(this, event);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -945,13 +948,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
break;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) {
|
||||
try {
|
||||
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
}
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
@ -961,7 +964,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
handler.inboundBufferUpdated(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -996,7 +999,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelStateHandler) handler()).channelReadSuspended(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1070,7 +1073,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).bind(this, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1111,7 +1114,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).connect(this, remoteAddress, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1150,7 +1153,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).disconnect(this, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1182,7 +1185,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).close(this, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1214,7 +1217,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).deregister(this, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1247,7 +1250,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
((ChannelOperationHandler) handler()).read(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1315,13 +1318,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
handler.flush(this, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
if (handler instanceof ChannelOutboundByteHandler && !pipeline.isOutboundShutdown()) {
|
||||
try {
|
||||
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
}
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
@ -1367,7 +1370,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
handler.sendFile(this, region, promise);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeHandlerBuffersAfterRemoval();
|
||||
}
|
||||
@ -1475,7 +1478,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
h.freeInboundBuffer(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeInboundBridge();
|
||||
}
|
||||
@ -1527,7 +1530,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
try {
|
||||
h.freeOutboundBuffer(this);
|
||||
} catch (Throwable t) {
|
||||
pipeline.notifyHandlerException(t);
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
freeOutboundBridge();
|
||||
}
|
||||
@ -1538,6 +1541,43 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyHandlerException(Throwable cause) {
|
||||
if (inExceptionCaught(cause)) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"An exception was thrown by a user handler " +
|
||||
"while handling an exceptionCaught event", cause);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (handler() instanceof ChannelStateHandler) {
|
||||
invokeExceptionCaught(cause);
|
||||
} else {
|
||||
findContextInbound().invokeExceptionCaught(cause);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean inExceptionCaught(Throwable cause) {
|
||||
do {
|
||||
StackTraceElement[] trace = cause.getStackTrace();
|
||||
if (trace != null) {
|
||||
for (StackTraceElement t : trace) {
|
||||
if (t == null) {
|
||||
break;
|
||||
}
|
||||
if ("exceptionCaught".equals(t.getMethodName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cause = cause.getCause();
|
||||
} while (cause != null);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise newPromise() {
|
||||
return new DefaultChannelPromise(channel());
|
||||
|
@ -1030,42 +1030,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
return tail.write(message, promise);
|
||||
}
|
||||
|
||||
void notifyHandlerException(Throwable cause) {
|
||||
if (!(cause instanceof ChannelPipelineException)) {
|
||||
cause = new ChannelPipelineException(cause);
|
||||
}
|
||||
|
||||
if (inExceptionCaught(cause)) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"An exception was thrown by a user handler " +
|
||||
"while handling an exceptionCaught event", cause);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
fireExceptionCaught(cause);
|
||||
}
|
||||
|
||||
private static boolean inExceptionCaught(Throwable cause) {
|
||||
for (;;) {
|
||||
if (cause == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
StackTraceElement[] trace = cause.getStackTrace();
|
||||
if (trace != null) {
|
||||
for (StackTraceElement t : trace) {
|
||||
if ("exceptionCaught".equals(t.getMethodName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cause = cause.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkDuplicateName(String name) {
|
||||
if (name2ctx.containsKey(name)) {
|
||||
throw new IllegalArgumentException("Duplicate handler name: " + name);
|
||||
|
Loading…
x
Reference in New Issue
Block a user