Port ReadTimeoutHandler

This commit is contained in:
Trustin Lee 2012-05-31 15:04:25 -07:00
parent 7ddc93bed8
commit 5f24f176bb

View File

@ -15,26 +15,21 @@
*/ */
package io.netty.handler.timeout; package io.netty.handler.timeout;
import static io.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory; import io.netty.channel.EventLoop;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.nio.channels.Channels;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* Raises a {@link ReadTimeoutException} when no data was read within a certain * Raises a {@link ReadTimeoutException} when no data was read within a certain
@ -76,49 +71,43 @@ import io.netty.util.TimerTask;
* @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises
*/ */
@Sharable public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable {
static final ReadTimeoutException EXCEPTION = new ReadTimeoutException(); private static final ReadTimeoutException EXCEPTION = new ReadTimeoutException();
final Timer timer; static {
final long timeoutMillis; EXCEPTION.setStackTrace(new StackTraceElement[0]);
}
private final long timeoutMillis;
private volatile ScheduledFuture<?> timeout;
private volatile long lastReadTime;
private volatile boolean destroyed;
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param timeoutSeconds * @param timeoutSeconds
* read timeout in seconds * read timeout in seconds
*/ */
public ReadTimeoutHandler(Timer timer, int timeoutSeconds) { public ReadTimeoutHandler(int timeoutSeconds) {
this(timer, timeoutSeconds, TimeUnit.SECONDS); this(timeoutSeconds, TimeUnit.SECONDS);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param timeout * @param timeout
* read timeout * read timeout
* @param unit * @param unit
* the {@link TimeUnit} of {@code timeout} * the {@link TimeUnit} of {@code timeout}
*/ */
public ReadTimeoutHandler(Timer timer, long timeout, TimeUnit unit) { public ReadTimeoutHandler(long timeout, TimeUnit unit) {
if (timer == null) {
throw new NullPointerException("timer");
}
if (unit == null) { if (unit == null) {
throw new NullPointerException("unit"); throw new NullPointerException("unit");
} }
this.timer = timer;
if (timeout <= 0) { if (timeout <= 0) {
timeoutMillis = 0; timeoutMillis = 0;
} else { } else {
@ -126,115 +115,82 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
} }
} }
/**
* Stops the {@link Timer} which was specified in the constructor of this
* handler. You should not call this method if the {@link Timer} is in use
* by other objects.
*/
@Override @Override
public void releaseExternalResources() { public ChannelBufferHolder<Object> newInboundBuffer(
timer.stop(); ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.inboundBypassBuffer(ctx);
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.pipeline().isAttached()) { if (ctx.channel().isActive()) {
// channelOpen event has been fired already, which means // channelActvie() event has been fired already, which means this.channelActive() will
// this.channelOpen() will not be invoked. // not be invoked. We have to initialize here instead.
// We have to initialize here instead.
initialize(ctx); initialize(ctx);
} else { } else {
// channelOpen event has not been fired yet. // channelActive() event has not been fired yet. this.channelOpen() will be invoked
// this.channelOpen() will be invoked and initialization will occur there. // and initialization will occur there.
} }
} }
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy(ctx); destroy();
} }
@Override @Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelInboundHandlerContext<Object> ctx)
// NOOP
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
// This method will be invoked only if this handler was added // This method will be invoked only if this handler was added
// before channelOpen event is fired. If a user adds this handler // before channelActive() event is fired. If a user adds this handler
// after the channelOpen event, initialize() will be called by beforeAdd(). // after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx); initialize(ctx);
ctx.sendUpstream(e); super.channelActive(ctx);
} }
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
throws Exception { destroy();
destroy(ctx); super.channelInactive(ctx);
ctx.sendUpstream(e);
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx) throws Exception {
throws Exception { lastReadTime = System.currentTimeMillis();
State state = (State) ctx.getAttachment(); ctx.fireInboundBufferUpdated();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {
State state = state(ctx);
// Avoid the case where destroy() is called before scheduling timeouts. // Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143 // See: https://github.com/netty/netty/issues/143
if (state.destroyed) { if (destroyed) {
return; return;
} }
EventLoop loop = ctx.eventLoop();
lastReadTime = System.currentTimeMillis();
if (timeoutMillis > 0) { if (timeoutMillis > 0) {
state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS); timeout = loop.schedule(
new ReadTimeoutTask(ctx),
timeoutMillis, TimeUnit.MILLISECONDS);
} }
} }
private void destroy(ChannelHandlerContext ctx) { private void destroy() {
State state; destroyed = true;
synchronized (ctx) {
state = state(ctx);
state.destroyed = true;
}
if (state.timeout != null) { if (timeout != null) {
state.timeout.cancel(); timeout.cancel(false);
state.timeout = null; timeout = null;
} }
} }
private State state(ChannelHandlerContext ctx) {
State state;
synchronized (ctx) {
// FIXME: It could have been better if there is setAttachmentIfAbsent().
state = (State) ctx.getAttachment();
if (state != null) {
return state;
}
state = new State();
ctx.setAttachment(state);
}
return state;
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaught(ctx, EXCEPTION); ctx.fireExceptionCaught(EXCEPTION);
} }
private final class ReadTimeoutTask implements TimerTask { private final class ReadTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
@ -243,43 +199,27 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
} }
@Override @Override
public void run(Timeout timeout) throws Exception { public void run() {
if (timeout.isCancelled()) {
return;
}
if (!ctx.channel().isOpen()) { if (!ctx.channel().isOpen()) {
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - state.lastReadTime); long nextDelay = timeoutMillis - (currentTime - lastReadTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback. // Read timed out - set a new timeout and notify the callback.
state.timeout = timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try { try {
// FIXME This should be called from an I/O thread. // FIXME This should be called from an I/O thread.
// To be fixed in Netty 4. // To be fixed in Netty 4.
readTimedOut(ctx); readTimedOut(ctx);
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); ctx.fireExceptionCaught(t);
} }
} else { } else {
// Read occurred before the timeout - set a new timeout with shorter delay. // Read occurred before the timeout - set a new timeout with shorter delay.
state.timeout = timeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private static final class State {
volatile Timeout timeout;
volatile long lastReadTime = System.currentTimeMillis();
volatile boolean destroyed;
State() {
}
}
} }