NETTY-401 - NullPointerException when ReadTimeoutHandler is

concurrently initialized and destroyed

* Made ReadTimeoutHandler @Sharable
* Updated the UptimeClient again
** no static fields for global state - just reuse the handlers.
This commit is contained in:
Trustin Lee 2011-05-04 17:10:32 +09:00
parent 45a4b5b7c4
commit f22a55d6e2
3 changed files with 50 additions and 25 deletions

View File

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
@ -70,11 +71,16 @@ public class UptimeClient {
// Configure the pipeline factory. // Configure the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline( return Channels.pipeline(
new ReadTimeoutHandler(timer, READ_TIMEOUT), timeoutHandler, uptimeHandler);
new UptimeClientHandler(bootstrap, timer));
} }
}); });

View File

@ -40,11 +40,9 @@ import org.jboss.netty.util.TimerTask;
*/ */
public class UptimeClientHandler extends SimpleChannelUpstreamHandler { public class UptimeClientHandler extends SimpleChannelUpstreamHandler {
// We assume that we are tracking only one server in this example.
private static long startTime = -1;
final ClientBootstrap bootstrap; final ClientBootstrap bootstrap;
private final Timer timer; private final Timer timer;
private long startTime = -1;
public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer) { public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer) {
this.bootstrap = bootstrap; this.bootstrap = bootstrap;

View File

@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
@ -42,15 +44,17 @@ import org.jboss.netty.util.TimerTask;
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} { * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* *
* private final {@link Timer} timer; * private final {@link Timer} timer;
* private final {@link ChannelHandler} timeoutHandler;
* *
* public MyPipelineFactory({@link Timer} timer) { * public MyPipelineFactory({@link Timer} timer) {
* this.timer = timer; * this.timer = timer;
* this.timeoutHandler = <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b>
* } * }
* *
* public {@link ChannelPipeline} getPipeline() { * public {@link ChannelPipeline} getPipeline() {
* // An example configuration that implements 30-second read timeout: * // An example configuration that implements 30-second read timeout:
* return {@link Channels}.pipeline( * return {@link Channels}.pipeline(
* <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b> * timeoutHandler,
* new MyHandler()); * new MyHandler());
* } * }
* } * }
@ -77,6 +81,7 @@ import org.jboss.netty.util.TimerTask;
* @apiviz.uses org.jboss.netty.util.HashedWheelTimer * @apiviz.uses org.jboss.netty.util.HashedWheelTimer
* @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises * @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises
*/ */
@Sharable
public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler, implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable { ExternalResourceReleasable {
@ -85,9 +90,6 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
final Timer timer; final Timer timer;
final long timeoutMillis; final long timeoutMillis;
volatile Timeout timeout;
private volatile ReadTimeoutTask task;
volatile long lastReadTime;
/** /**
* Creates a new instance. * Creates a new instance.
@ -159,7 +161,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy(ctx);
} }
@Override @Override
@ -180,35 +182,42 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
destroy(); destroy(ctx);
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {
updateLastReadTime(); updateLastReadTime(ctx);
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {
updateLastReadTime(); updateLastReadTime(ctx);
task = new ReadTimeoutTask(ctx);
if (timeoutMillis > 0) { if (timeoutMillis > 0) {
timeout = timer.newTimeout(task, timeoutMillis, TimeUnit.MILLISECONDS); State state = (State) ctx.getAttachment();
state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
} }
} }
private void updateLastReadTime() { private void updateLastReadTime(ChannelHandlerContext ctx) {
lastReadTime = System.currentTimeMillis(); State state = (State) ctx.getAttachment();
if (state == null) {
// lastReadTime will be set by the constructor, so we do not do it
// again here.
ctx.setAttachment(state = new State());
} else {
state.lastReadTime = System.currentTimeMillis();
}
} }
private void destroy() { private void destroy(ChannelHandlerContext ctx) {
if (timeout != null) { State state = (State) ctx.getAttachment();
timeout.cancel(); if (state.timeout != null) {
state.timeout.cancel();
state.timeout = null;
} }
timeout = null;
task = null;
} }
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
@ -233,22 +242,34 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - lastReadTime); long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback. // Read timed out - set a new timeout and notify the callback.
ReadTimeoutHandler.this.timeout = state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS); timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try { try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx); readTimedOut(ctx);
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); fireExceptionCaught(ctx, t);
} }
} else { } else {
// Read occurred before the timeout - set a new timeout with shorter delay. // Read occurred before the timeout - set a new timeout with shorter delay.
ReadTimeoutHandler.this.timeout = state.timeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private static final class State {
volatile Timeout timeout;
volatile long lastReadTime = System.currentTimeMillis();
State() {
super();
}
}
} }