From 5f24f176bb4a6c2ef2ecb1ac1b7f0584b9d406fc Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 31 May 2012 15:04:25 -0700 Subject: [PATCH] Port ReadTimeoutHandler --- .../handler/timeout/ReadTimeoutHandler.java | 182 ++++++------------ 1 file changed, 61 insertions(+), 121 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 0bc92089b9..60aa0f1494 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -15,26 +15,21 @@ */ package io.netty.handler.timeout; -import static io.netty.channel.Channels.*; - -import java.util.concurrent.TimeUnit; - import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.Channels; -import io.netty.channel.LifeCycleAwareChannelHandler; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.util.ExternalResourceReleasable; +import io.netty.channel.EventLoop; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; + +import java.nio.channels.Channels; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Raises a {@link ReadTimeoutException} when no data was read within a certain @@ -76,49 +71,43 @@ import io.netty.util.TimerTask; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises */ -@Sharable -public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler - implements LifeCycleAwareChannelHandler, - ExternalResourceReleasable { +public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { - static final ReadTimeoutException EXCEPTION = new ReadTimeoutException(); + private static final ReadTimeoutException EXCEPTION = new ReadTimeoutException(); - final Timer timer; - final long timeoutMillis; + static { + EXCEPTION.setStackTrace(new StackTraceElement[0]); + } + + private final long timeoutMillis; + + private volatile ScheduledFuture timeout; + private volatile long lastReadTime; + private volatile boolean destroyed; /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param timeoutSeconds * read timeout in seconds */ - public ReadTimeoutHandler(Timer timer, int timeoutSeconds) { - this(timer, timeoutSeconds, TimeUnit.SECONDS); + public ReadTimeoutHandler(int timeoutSeconds) { + this(timeoutSeconds, TimeUnit.SECONDS); } /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param timeout * read timeout * @param unit * the {@link TimeUnit} of {@code timeout} */ - public ReadTimeoutHandler(Timer timer, long timeout, TimeUnit unit) { - if (timer == null) { - throw new NullPointerException("timer"); - } + public ReadTimeoutHandler(long timeout, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } - this.timer = timer; if (timeout <= 0) { timeoutMillis = 0; } else { @@ -126,115 +115,82 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler } } - /** - * Stops the {@link Timer} which was specified in the constructor of this - * handler. You should not call this method if the {@link Timer} is in use - * by other objects. - */ @Override - public void releaseExternalResources() { - timer.stop(); + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.inboundBypassBuffer(ctx); } @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - if (ctx.pipeline().isAttached()) { - // channelOpen event has been fired already, which means - // this.channelOpen() will not be invoked. - // We have to initialize here instead. + if (ctx.channel().isActive()) { + // channelActvie() event has been fired already, which means this.channelActive() will + // not be invoked. We have to initialize here instead. initialize(ctx); } else { - // channelOpen event has not been fired yet. - // this.channelOpen() will be invoked and initialization will occur there. + // channelActive() event has not been fired yet. this.channelOpen() will be invoked + // and initialization will occur there. } } - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - @Override public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - destroy(ctx); + destroy(); } @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added - // before channelOpen event is fired. If a user adds this handler - // after the channelOpen event, initialize() will be called by beforeAdd(). + // before channelActive() event is fired. If a user adds this handler + // after the channelActive() event, initialize() will be called by beforeAdd(). initialize(ctx); - ctx.sendUpstream(e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - destroy(ctx); - ctx.sendUpstream(e); + public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { + destroy(); + super.channelInactive(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - State state = (State) ctx.getAttachment(); - state.lastReadTime = System.currentTimeMillis(); - ctx.sendUpstream(e); + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + lastReadTime = System.currentTimeMillis(); + ctx.fireInboundBufferUpdated(); } private void initialize(ChannelHandlerContext ctx) { - State state = state(ctx); - // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 - if (state.destroyed) { + if (destroyed) { return; } + EventLoop loop = ctx.eventLoop(); + + lastReadTime = System.currentTimeMillis(); if (timeoutMillis > 0) { - state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS); + timeout = loop.schedule( + new ReadTimeoutTask(ctx), + timeoutMillis, TimeUnit.MILLISECONDS); } } - private void destroy(ChannelHandlerContext ctx) { - State state; - synchronized (ctx) { - state = state(ctx); - state.destroyed = true; - } + private void destroy() { + destroyed = true; - if (state.timeout != null) { - state.timeout.cancel(); - state.timeout = null; + if (timeout != null) { + timeout.cancel(false); + timeout = null; } } - private State state(ChannelHandlerContext ctx) { - State state; - synchronized (ctx) { - // FIXME: It could have been better if there is setAttachmentIfAbsent(). - state = (State) ctx.getAttachment(); - if (state != null) { - return state; - } - state = new State(); - ctx.setAttachment(state); - } - return state; - } - protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { - Channels.fireExceptionCaught(ctx, EXCEPTION); + ctx.fireExceptionCaught(EXCEPTION); } - private final class ReadTimeoutTask implements TimerTask { + private final class ReadTimeoutTask implements Runnable { private final ChannelHandlerContext ctx; @@ -243,43 +199,27 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler } @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - + public void run() { if (!ctx.channel().isOpen()) { return; } - State state = (State) ctx.getAttachment(); long currentTime = System.currentTimeMillis(); - long nextDelay = timeoutMillis - (currentTime - state.lastReadTime); + long nextDelay = timeoutMillis - (currentTime - lastReadTime); if (nextDelay <= 0) { // Read timed out - set a new timeout and notify the callback. - state.timeout = - timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS); + timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); try { // FIXME This should be called from an I/O thread. // To be fixed in Netty 4. readTimedOut(ctx); } catch (Throwable t) { - fireExceptionCaught(ctx, t); + ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - state.timeout = - timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + timeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } - - private static final class State { - volatile Timeout timeout; - volatile long lastReadTime = System.currentTimeMillis(); - volatile boolean destroyed; - - State() { - } - } }