Simplify the exception handling of ChannelOutboundHandler.write()

DefaultChannelHandlerContext does not trigger exceptionCaught() immediately when ChannelOutboundHandler.write() raises an exception.  It just records the exception until flush() is triggered.  On invokeFlush(), if there's any exception recorded, DefaultChannelHandlerContext will fail the promise without calling ChannelOutboundHandler.flush().  If more than one exception were raised, only the first exception is used as the cause of the failure and the others will be logged at warn level.
This commit is contained in:
Trustin Lee 2013-07-10 00:36:47 +09:00
parent 26e9d70457
commit 7bedd8f28e
2 changed files with 28 additions and 42 deletions

View File

@ -22,8 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -55,8 +53,6 @@ public class SpdySessionHandler
private final AtomicInteger pings = new AtomicInteger(); private final AtomicInteger pings = new AtomicInteger();
private final Queue<Object> outboundBuffer = new ArrayDeque<Object>();
private boolean sentGoAwayFrame; private boolean sentGoAwayFrame;
private boolean receivedGoAwayFrame; private boolean receivedGoAwayFrame;
@ -397,44 +393,19 @@ public class SpdySessionHandler
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg) throws Exception { public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
outboundBuffer.add(msg); if (msg instanceof SpdyDataFrame ||
} msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyRstStreamFrame ||
msg instanceof SpdySettingsFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyWindowUpdateFrame) {
@Override handleOutboundMessage(ctx, msg);
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { } else {
try { ctx.write(msg);
for (;;) {
Object msg = outboundBuffer.poll();
if (msg == null) {
break;
}
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
msg instanceof SpdyRstStreamFrame ||
msg instanceof SpdySettingsFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyWindowUpdateFrame) {
try {
handleOutboundMessage(ctx, msg);
} catch (SpdyProtocolException e) {
if (e == PROTOCOL_EXCEPTION) {
// On the case of PROTOCOL_EXCEPTION, fail the promise directly
// See #1211
promise.setFailure(PROTOCOL_EXCEPTION);
return;
}
}
} else {
ctx.write(msg);
}
}
ctx.flush(promise);
} finally {
outboundBuffer.clear();
} }
} }

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -33,6 +34,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
private final ChannelHandler handler; private final ChannelHandler handler;
private Throwable lastWriteException;
private boolean removed; private boolean removed;
// Will be set to null if no child executor should be used, otherwise it will be set to the // Will be set to null if no child executor should be used, otherwise it will be set to the
@ -696,7 +698,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try { try {
handler.write(this, msg); handler.write(this, msg);
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(t); if (lastWriteException == null) {
lastWriteException = t;
} else if (logger.isWarnEnabled()) {
logger.warn(
"More than one exception was raised by " + StringUtil.simpleClassName(handler) + ".write()." +
"Will fail the subsequent flush() with the first one and log others.", t);
}
} }
} }
@ -723,6 +731,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void invokeFlush0(ChannelPromise promise) { private void invokeFlush0(ChannelPromise promise) {
Throwable lastWriteException = this.lastWriteException;
if (lastWriteException != null) {
this.lastWriteException = null;
promise.setFailure(lastWriteException);
return;
}
try { try {
((ChannelOutboundHandler) handler()).flush(this, promise); ((ChannelOutboundHandler) handler()).flush(this, promise);
} catch (Throwable t) { } catch (Throwable t) {