Make sure that ChannelDownstreamHandler impl fire exception caughts
later via the io-worker. See #140 and #187
This commit is contained in:
parent
f2d1f1e8ad
commit
68066c5e4b
@ -90,7 +90,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
}
|
||||
|
||||
try {
|
||||
flush(ctx);
|
||||
flush(ctx, false);
|
||||
} catch (Exception e) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("Unexpected exception while sending chunks.", e);
|
||||
@ -112,10 +112,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
final Channel channel = ctx.getChannel();
|
||||
if (channel.isWritable()) {
|
||||
this.ctx = ctx;
|
||||
flush(ctx);
|
||||
flush(ctx, false);
|
||||
} else if (!channel.isConnected()) {
|
||||
this.ctx = ctx;
|
||||
discard(ctx);
|
||||
discard(ctx, false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,12 +127,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
switch (cse.getState()) {
|
||||
case INTEREST_OPS:
|
||||
// Continue writing when the channel becomes writable.
|
||||
flush(ctx);
|
||||
flush(ctx, true);
|
||||
break;
|
||||
case OPEN:
|
||||
if (!Boolean.TRUE.equals(cse.getValue())) {
|
||||
// Fail all pending writes
|
||||
discard(ctx);
|
||||
discard(ctx, true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -140,7 +140,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
ctx.sendUpstream(e);
|
||||
}
|
||||
|
||||
private void discard(ChannelHandlerContext ctx) {
|
||||
private void discard(ChannelHandlerContext ctx, boolean fireNow) {
|
||||
ClosedChannelException cause = null;
|
||||
boolean fireExceptionCaught = false;
|
||||
|
||||
@ -175,14 +175,18 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
|
||||
|
||||
if (fireExceptionCaught) {
|
||||
Channels.fireExceptionCaught(ctx.getChannel(), cause);
|
||||
if (fireNow) {
|
||||
fireExceptionCaught(ctx.getChannel(), cause);
|
||||
} else {
|
||||
fireExceptionCaughtLater(ctx.getChannel(), cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
|
||||
private synchronized void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
|
||||
final Channel channel = ctx.getChannel();
|
||||
if (!channel.isConnected()) {
|
||||
discard(ctx);
|
||||
discard(ctx, fireNow);
|
||||
}
|
||||
|
||||
while (channel.isWritable()) {
|
||||
@ -220,7 +224,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
this.currentEvent = null;
|
||||
|
||||
currentEvent.getFuture().setFailure(t);
|
||||
fireExceptionCaught(ctx, t);
|
||||
if (fireNow) {
|
||||
fireExceptionCaught(ctx, t);
|
||||
} else {
|
||||
fireExceptionCaughtLater(ctx.getChannel(), t);
|
||||
}
|
||||
|
||||
closeInput(chunks);
|
||||
break;
|
||||
@ -262,7 +270,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||
}
|
||||
|
||||
if (!channel.isConnected()) {
|
||||
discard(ctx);
|
||||
discard(ctx, fireNow);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -154,7 +154,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
|
||||
}
|
||||
|
||||
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
|
||||
Channels.fireExceptionCaught(ctx, EXCEPTION);
|
||||
Channels.fireExceptionCaughtLater(ctx.getChannel(), EXCEPTION);
|
||||
}
|
||||
|
||||
private final class WriteTimeoutTask implements TimerTask {
|
||||
|
Loading…
Reference in New Issue
Block a user