Fire the IdleStateEvent and also the ReadTimeOutException / WriteTimeOutException from the Worker-Thread. See #641

This commit is contained in:
Norman Maurer 2012-10-05 20:08:21 +02:00
parent 8669732479
commit 2056882cfc
3 changed files with 46 additions and 28 deletions

View File

@ -373,6 +373,20 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
return state;
}
private void fireChannelIdle(
final ChannelHandlerContext ctx, final IdleState state, final long lastActivityTimeMillis) {
ctx.getPipeline().execute(new Runnable() {
public void run() {
try {
channelIdle(ctx, state, lastActivityTimeMillis);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
}
});
}
protected void channelIdle(
ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
@ -399,11 +413,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
// Reader is idle - set a new timeout and notify the callback.
state.readerIdleTimeout =
timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
fireChannelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
state.readerIdleTimeout =
@ -434,11 +444,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
// Writer is idle - set a new timeout and notify the callback.
state.writerIdleTimeout =
timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
fireChannelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
state.writerIdleTimeout =
@ -469,11 +475,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
// notify the callback.
state.allIdleTimeout =
timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
try {
channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
fireChannelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.

View File

@ -232,6 +232,19 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
return state;
}
private void fireReadTimedOut(final ChannelHandlerContext ctx) throws Exception {
ctx.getPipeline().execute(new Runnable() {
public void run() {
try {
readTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
}
});
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaught(ctx, EXCEPTION);
}
@ -260,13 +273,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
// Read timed out - set a new timeout and notify the callback.
state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
fireReadTimedOut(ctx);
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
state.timeout =

View File

@ -159,8 +159,21 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
super.writeRequested(ctx, e);
}
private void fireWriteTimeOut(final ChannelHandlerContext ctx) {
ctx.getPipeline().execute(new Runnable() {
public void run() {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
}
});
}
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaughtLater(ctx, EXCEPTION);
Channels.fireExceptionCaught(ctx, EXCEPTION);
}
private final class WriteTimeoutTask implements TimerTask {
@ -185,11 +198,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
// Mark the future as failure
if (future.setFailure(EXCEPTION)) {
// If succeeded to mark as failure, notify the pipeline, too.
try {
writeTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
fireWriteTimeOut(ctx);
}
}
}