Simplify the exception handling of ChannelOutboundHandler.write()
DefaultChannelHandlerContext does not trigger exceptionCaught() immediately when ChannelOutboundHandler.write() raises an exception. It just records the exception until flush() is triggered. On invokeFlush(), if there's any exception recorded, DefaultChannelHandlerContext will fail the promise without calling ChannelOutboundHandler.flush(). If more than one exception were raised, only the first exception is used as the cause of the failure and the others will be logged at warn level.
This commit is contained in:
parent
26e9d70457
commit
7bedd8f28e
@ -22,8 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.util.internal.EmptyArrays;
|
import io.netty.util.internal.EmptyArrays;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -55,8 +53,6 @@ public class SpdySessionHandler
|
|||||||
|
|
||||||
private final AtomicInteger pings = new AtomicInteger();
|
private final AtomicInteger pings = new AtomicInteger();
|
||||||
|
|
||||||
private final Queue<Object> outboundBuffer = new ArrayDeque<Object>();
|
|
||||||
|
|
||||||
private boolean sentGoAwayFrame;
|
private boolean sentGoAwayFrame;
|
||||||
private boolean receivedGoAwayFrame;
|
private boolean receivedGoAwayFrame;
|
||||||
|
|
||||||
@ -397,18 +393,6 @@ public class SpdySessionHandler
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
outboundBuffer.add(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
|
||||||
try {
|
|
||||||
for (;;) {
|
|
||||||
Object msg = outboundBuffer.poll();
|
|
||||||
if (msg == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg instanceof SpdyDataFrame ||
|
if (msg instanceof SpdyDataFrame ||
|
||||||
msg instanceof SpdySynStreamFrame ||
|
msg instanceof SpdySynStreamFrame ||
|
||||||
msg instanceof SpdySynReplyFrame ||
|
msg instanceof SpdySynReplyFrame ||
|
||||||
@ -418,25 +402,12 @@ public class SpdySessionHandler
|
|||||||
msg instanceof SpdyGoAwayFrame ||
|
msg instanceof SpdyGoAwayFrame ||
|
||||||
msg instanceof SpdyHeadersFrame ||
|
msg instanceof SpdyHeadersFrame ||
|
||||||
msg instanceof SpdyWindowUpdateFrame) {
|
msg instanceof SpdyWindowUpdateFrame) {
|
||||||
try {
|
|
||||||
handleOutboundMessage(ctx, msg);
|
handleOutboundMessage(ctx, msg);
|
||||||
} catch (SpdyProtocolException e) {
|
|
||||||
if (e == PROTOCOL_EXCEPTION) {
|
|
||||||
// On the case of PROTOCOL_EXCEPTION, fail the promise directly
|
|
||||||
// See #1211
|
|
||||||
promise.setFailure(PROTOCOL_EXCEPTION);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ctx.write(msg);
|
ctx.write(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ctx.flush(promise);
|
|
||||||
} finally {
|
|
||||||
outboundBuffer.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception {
|
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
if (msg instanceof SpdyDataFrame) {
|
if (msg instanceof SpdyDataFrame) {
|
||||||
|
@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
|
|||||||
import io.netty.util.DefaultAttributeMap;
|
import io.netty.util.DefaultAttributeMap;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import io.netty.util.concurrent.EventExecutorGroup;
|
import io.netty.util.concurrent.EventExecutorGroup;
|
||||||
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
@ -33,6 +34,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final ChannelHandler handler;
|
private final ChannelHandler handler;
|
||||||
|
private Throwable lastWriteException;
|
||||||
private boolean removed;
|
private boolean removed;
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
@ -696,7 +698,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
try {
|
try {
|
||||||
handler.write(this, msg);
|
handler.write(this, msg);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
if (lastWriteException == null) {
|
||||||
|
lastWriteException = t;
|
||||||
|
} else if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn(
|
||||||
|
"More than one exception was raised by " + StringUtil.simpleClassName(handler) + ".write()." +
|
||||||
|
"Will fail the subsequent flush() with the first one and log others.", t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -723,6 +731,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeFlush0(ChannelPromise promise) {
|
private void invokeFlush0(ChannelPromise promise) {
|
||||||
|
Throwable lastWriteException = this.lastWriteException;
|
||||||
|
if (lastWriteException != null) {
|
||||||
|
this.lastWriteException = null;
|
||||||
|
promise.setFailure(lastWriteException);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler()).flush(this, promise);
|
((ChannelOutboundHandler) handler()).flush(this, promise);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
Loading…
Reference in New Issue
Block a user