From f22a55d6e24c4a3c12d35aeb07f03f6cacadb3e1 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 4 May 2011 17:10:32 +0900 Subject: [PATCH] NETTY-401 - NullPointerException when ReadTimeoutHandler is concurrently initialized and destroyed * Made ReadTimeoutHandler @Sharable * Updated the UptimeClient again ** no static fields for global state - just reuse the handlers. --- .../netty/example/uptime/UptimeClient.java | 10 ++- .../example/uptime/UptimeClientHandler.java | 4 +- .../handler/timeout/ReadTimeoutHandler.java | 61 +++++++++++++------ 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/jboss/netty/example/uptime/UptimeClient.java b/src/main/java/org/jboss/netty/example/uptime/UptimeClient.java index bded1dac5d..05c0faeec0 100644 --- a/src/main/java/org/jboss/netty/example/uptime/UptimeClient.java +++ b/src/main/java/org/jboss/netty/example/uptime/UptimeClient.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; @@ -70,11 +71,16 @@ public class UptimeClient { // Configure the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + + private final ChannelHandler timeoutHandler = + new ReadTimeoutHandler(timer, READ_TIMEOUT); + private final ChannelHandler uptimeHandler = + new UptimeClientHandler(bootstrap, timer); + @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline( - new ReadTimeoutHandler(timer, READ_TIMEOUT), - new UptimeClientHandler(bootstrap, timer)); + timeoutHandler, uptimeHandler); } }); diff --git a/src/main/java/org/jboss/netty/example/uptime/UptimeClientHandler.java b/src/main/java/org/jboss/netty/example/uptime/UptimeClientHandler.java index 5bdfc40f93..6fe49f5363 100644 --- a/src/main/java/org/jboss/netty/example/uptime/UptimeClientHandler.java +++ b/src/main/java/org/jboss/netty/example/uptime/UptimeClientHandler.java @@ -40,11 +40,9 @@ import org.jboss.netty.util.TimerTask; */ public class UptimeClientHandler extends SimpleChannelUpstreamHandler { - // We assume that we are tracking only one server in this example. - private static long startTime = -1; - final ClientBootstrap bootstrap; private final Timer timer; + private long startTime = -1; public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer) { this.bootstrap = bootstrap; diff --git a/src/main/java/org/jboss/netty/handler/timeout/ReadTimeoutHandler.java b/src/main/java/org/jboss/netty/handler/timeout/ReadTimeoutHandler.java index 2d06a95716..0e1eaf2460 100644 --- a/src/main/java/org/jboss/netty/handler/timeout/ReadTimeoutHandler.java +++ b/src/main/java/org/jboss/netty/handler/timeout/ReadTimeoutHandler.java @@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*; import java.util.concurrent.TimeUnit; import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -42,15 +44,17 @@ import org.jboss.netty.util.TimerTask; * public class MyPipelineFactory implements {@link ChannelPipelineFactory} { * * private final {@link Timer} timer; + * private final {@link ChannelHandler} timeoutHandler; * * public MyPipelineFactory({@link Timer} timer) { * this.timer = timer; + * this.timeoutHandler = new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared. * } * * public {@link ChannelPipeline} getPipeline() { * // An example configuration that implements 30-second read timeout: * return {@link Channels}.pipeline( - * new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared. + * timeoutHandler, * new MyHandler()); * } * } @@ -77,6 +81,7 @@ import org.jboss.netty.util.TimerTask; * @apiviz.uses org.jboss.netty.util.HashedWheelTimer * @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises */ +@Sharable public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler, ExternalResourceReleasable { @@ -85,9 +90,6 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler final Timer timer; final long timeoutMillis; - volatile Timeout timeout; - private volatile ReadTimeoutTask task; - volatile long lastReadTime; /** * Creates a new instance. @@ -159,7 +161,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler @Override public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - destroy(); + destroy(ctx); } @Override @@ -180,35 +182,42 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - destroy(); + destroy(ctx); ctx.sendUpstream(e); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - updateLastReadTime(); + updateLastReadTime(ctx); ctx.sendUpstream(e); } private void initialize(ChannelHandlerContext ctx) { - updateLastReadTime(); - task = new ReadTimeoutTask(ctx); + updateLastReadTime(ctx); if (timeoutMillis > 0) { - timeout = timer.newTimeout(task, timeoutMillis, TimeUnit.MILLISECONDS); + State state = (State) ctx.getAttachment(); + state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS); } } - private void updateLastReadTime() { - lastReadTime = System.currentTimeMillis(); + private void updateLastReadTime(ChannelHandlerContext ctx) { + State state = (State) ctx.getAttachment(); + if (state == null) { + // lastReadTime will be set by the constructor, so we do not do it + // again here. + ctx.setAttachment(state = new State()); + } else { + state.lastReadTime = System.currentTimeMillis(); + } } - private void destroy() { - if (timeout != null) { - timeout.cancel(); + private void destroy(ChannelHandlerContext ctx) { + State state = (State) ctx.getAttachment(); + if (state.timeout != null) { + state.timeout.cancel(); + state.timeout = null; } - timeout = null; - task = null; } protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { @@ -233,22 +242,34 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler return; } + State state = (State) ctx.getAttachment(); long currentTime = System.currentTimeMillis(); - long nextDelay = timeoutMillis - (currentTime - lastReadTime); + long nextDelay = timeoutMillis - (currentTime - state.lastReadTime); if (nextDelay <= 0) { // Read timed out - set a new timeout and notify the callback. - ReadTimeoutHandler.this.timeout = + state.timeout = timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS); try { + // FIXME This should be called from an I/O thread. + // To be fixed in Netty 4. readTimedOut(ctx); } catch (Throwable t) { fireExceptionCaught(ctx, t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - ReadTimeoutHandler.this.timeout = + state.timeout = timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); } } } + + private static final class State { + volatile Timeout timeout; + volatile long lastReadTime = System.currentTimeMillis(); + + State() { + super(); + } + } }