NETTY-401 - NullPointerException when ReadTimeoutHandler is

concurrently initialized and destroyed

* Made ReadTimeoutHandler @Sharable
* Updated the UptimeClient again
** no static fields for global state - just reuse the handlers.
This commit is contained in:
Trustin Lee 2011-05-04 17:10:32 +09:00
parent 8458c289e2
commit 098ca0342c
3 changed files with 50 additions and 25 deletions

View File

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
@ -70,10 +71,15 @@ public class UptimeClient {
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ReadTimeoutHandler(timer, READ_TIMEOUT),
new UptimeClientHandler(bootstrap, timer));
timeoutHandler, uptimeHandler);
}
});

View File

@ -40,11 +40,9 @@ import org.jboss.netty.util.TimerTask;
*/
public class UptimeClientHandler extends SimpleChannelUpstreamHandler {
// We assume that we are tracking only one server in this example.
private static long startTime = -1;
final ClientBootstrap bootstrap;
private final Timer timer;
private long startTime = -1;
public UptimeClientHandler(ClientBootstrap bootstrap, Timer timer) {
this.bootstrap = bootstrap;

View File

@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -42,15 +44,17 @@ import org.jboss.netty.util.TimerTask;
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
*
* private final {@link Timer} timer;
* private final {@link ChannelHandler} timeoutHandler;
*
* public MyPipelineFactory({@link Timer} timer) {
* this.timer = timer;
* this.timeoutHandler = <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b>
* }
*
* public {@link ChannelPipeline} getPipeline() {
* // An example configuration that implements 30-second read timeout:
* return {@link Channels}.pipeline(
* <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b>
* timeoutHandler,
* new MyHandler());
* }
* }
@ -77,6 +81,7 @@ import org.jboss.netty.util.TimerTask;
* @apiviz.uses org.jboss.netty.util.HashedWheelTimer
* @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises
*/
@Sharable
public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable {
@ -85,9 +90,6 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
final Timer timer;
final long timeoutMillis;
volatile Timeout timeout;
private volatile ReadTimeoutTask task;
volatile long lastReadTime;
/**
* Creates a new instance.
@ -155,7 +157,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy();
destroy(ctx);
}
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
@ -175,35 +177,42 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
destroy();
destroy(ctx);
ctx.sendUpstream(e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
updateLastReadTime();
updateLastReadTime(ctx);
ctx.sendUpstream(e);
}
private void initialize(ChannelHandlerContext ctx) {
updateLastReadTime();
task = new ReadTimeoutTask(ctx);
updateLastReadTime(ctx);
if (timeoutMillis > 0) {
timeout = timer.newTimeout(task, timeoutMillis, TimeUnit.MILLISECONDS);
State state = (State) ctx.getAttachment();
state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
}
}
private void updateLastReadTime() {
lastReadTime = System.currentTimeMillis();
private void updateLastReadTime(ChannelHandlerContext ctx) {
State state = (State) ctx.getAttachment();
if (state == null) {
// lastReadTime will be set by the constructor, so we do not do it
// again here.
ctx.setAttachment(state = new State());
} else {
state.lastReadTime = System.currentTimeMillis();
}
}
private void destroy() {
if (timeout != null) {
timeout.cancel();
private void destroy(ChannelHandlerContext ctx) {
State state = (State) ctx.getAttachment();
if (state.timeout != null) {
state.timeout.cancel();
state.timeout = null;
}
timeout = null;
task = null;
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
@ -227,22 +236,34 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
return;
}
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - lastReadTime);
long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback.
ReadTimeoutHandler.this.timeout =
state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
ReadTimeoutHandler.this.timeout =
state.timeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}
private static final class State {
volatile Timeout timeout;
volatile long lastReadTime = System.currentTimeMillis();
State() {
super();
}
}
}