Ported Read/WriteTimeoutHandler with simplification

- The default behavior is now to close the channel on timeout.  A user
  can override this behavior, but I would just use IdleStateHandler or
  use eventLoop's timer facility directly for finer control.
This commit is contained in:
Trustin Lee 2012-05-31 15:53:38 -07:00
parent 5f24f176bb
commit 2aa466640e
5 changed files with 62 additions and 184 deletions

View File

@ -21,32 +21,9 @@ package io.netty.handler.timeout;
*/ */
public class ReadTimeoutException extends TimeoutException { public class ReadTimeoutException extends TimeoutException {
private static final long serialVersionUID = -4596059237992273913L; private static final long serialVersionUID = 169287984113283421L;
/** public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();
* Creates a new instance.
*/
public ReadTimeoutException() {
}
/** private ReadTimeoutException() {}
* Creates a new instance.
*/
public ReadTimeoutException(String message, Throwable cause) {
super(message, cause);
}
/**
* Creates a new instance.
*/
public ReadTimeoutException(String message) {
super(message);
}
/**
* Creates a new instance.
*/
public ReadTimeoutException(Throwable cause) {
super(cause);
}
} }

View File

@ -73,18 +73,15 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> { public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
private static final ReadTimeoutException EXCEPTION = new ReadTimeoutException();
static {
EXCEPTION.setStackTrace(new StackTraceElement[0]);
}
private final long timeoutMillis; private final long timeoutMillis;
private volatile ScheduledFuture<?> timeout; private volatile ScheduledFuture<?> timeout;
private volatile long lastReadTime; private volatile long lastReadTime;
private volatile boolean destroyed; private volatile boolean destroyed;
private boolean closed;
/** /**
* Creates a new instance. * Creates a new instance.
* *
@ -187,7 +184,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
} }
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
ctx.fireExceptionCaught(EXCEPTION); if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
} }
private final class ReadTimeoutTask implements Runnable { private final class ReadTimeoutTask implements Runnable {
@ -210,8 +211,6 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter<Object> {
// Read timed out - set a new timeout and notify the callback. // Read timed out - set a new timeout and notify the callback.
timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
try { try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx); readTimedOut(ctx);
} catch (Throwable t) { } catch (Throwable t) {
ctx.fireExceptionCaught(t); ctx.fireExceptionCaught(t);

View File

@ -25,30 +25,10 @@ public class TimeoutException extends ChannelException {
private static final long serialVersionUID = 4673641882869672533L; private static final long serialVersionUID = 4673641882869672533L;
/** TimeoutException() {}
* Creates a new instance.
*/
public TimeoutException() {
}
/** @Override
* Creates a new instance. public Throwable fillInStackTrace() {
*/ return this;
public TimeoutException(String message, Throwable cause) {
super(message, cause);
}
/**
* Creates a new instance.
*/
public TimeoutException(String message) {
super(message);
}
/**
* Creates a new instance.
*/
public TimeoutException(Throwable cause) {
super(cause);
} }
} }

View File

@ -15,39 +15,15 @@
*/ */
package io.netty.handler.timeout; package io.netty.handler.timeout;
/** /**
* A {@link TimeoutException} raised by {@link WriteTimeoutHandler} when no data * A {@link TimeoutException} raised by {@link WriteTimeoutHandler} when no data
* was written within a certain period of time. * was written within a certain period of time.
*/ */
public class WriteTimeoutException extends TimeoutException { public class WriteTimeoutException extends TimeoutException {
private static final long serialVersionUID = -7746685254523245218L; private static final long serialVersionUID = -144786655770296065L;
/** public static final WriteTimeoutException INSTANCE = new WriteTimeoutException();
* Creates a new instance.
*/
public WriteTimeoutException() {
}
/** private WriteTimeoutException() {}
* Creates a new instance.
*/
public WriteTimeoutException(String message, Throwable cause) {
super(message, cause);
}
/**
* Creates a new instance.
*/
public WriteTimeoutException(String message) {
super(message);
}
/**
* Creates a new instance.
*/
public WriteTimeoutException(Throwable cause) {
super(cause);
}
} }

View File

@ -15,25 +15,21 @@
*/ */
package io.netty.handler.timeout; package io.netty.handler.timeout;
import static io.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelDownstreamHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.nio.channels.Channels;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* Raises a {@link WriteTimeoutException} when no data was written within a * Raises a {@link WriteTimeoutException} when no data was written within a
@ -72,48 +68,35 @@ import io.netty.util.TimerTask;
* @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises
*/ */
@Sharable public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter<Object> {
public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
implements ExternalResourceReleasable {
static final WriteTimeoutException EXCEPTION = new WriteTimeoutException();
private final Timer timer;
private final long timeoutMillis; private final long timeoutMillis;
private boolean closed;
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param timeoutSeconds * @param timeoutSeconds
* write timeout in seconds * write timeout in seconds
*/ */
public WriteTimeoutHandler(Timer timer, int timeoutSeconds) { public WriteTimeoutHandler(int timeoutSeconds) {
this(timer, timeoutSeconds, TimeUnit.SECONDS); this(timeoutSeconds, TimeUnit.SECONDS);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param timeout * @param timeout
* write timeout * write timeout
* @param unit * @param unit
* the {@link TimeUnit} of {@code timeout} * the {@link TimeUnit} of {@code timeout}
*/ */
public WriteTimeoutHandler(Timer timer, long timeout, TimeUnit unit) { public WriteTimeoutHandler(long timeout, TimeUnit unit) {
if (timer == null) {
throw new NullPointerException("timer");
}
if (unit == null) { if (unit == null) {
throw new NullPointerException("unit"); throw new NullPointerException("unit");
} }
this.timer = timer;
if (timeout <= 0) { if (timeout <= 0) {
timeoutMillis = 0; timeoutMillis = 0;
} else { } else {
@ -121,84 +104,47 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
} }
} }
/**
* Stops the {@link Timer} which was specified in the constructor of this
* handler. You should not call this method if the {@link Timer} is in use
* by other objects.
*/
@Override @Override
public void releaseExternalResources() { public ChannelBufferHolder<Object> newOutboundBuffer(ChannelOutboundHandlerContext<Object> ctx) throws Exception {
timer.stop(); return ChannelBufferHolders.outboundBypassBuffer(ctx);
}
protected long getTimeoutMillis(MessageEvent e) {
return timeoutMillis;
} }
@Override @Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) public void flush(final ChannelOutboundHandlerContext<Object> ctx, final ChannelFuture future) throws Exception {
throws Exception {
long timeoutMillis = getTimeoutMillis(e);
if (timeoutMillis > 0) { if (timeoutMillis > 0) {
// Set timeout only when getTimeoutMillis() returns a positive value. // Schedule a timeout.
ChannelFuture future = e.getFuture(); final ScheduledFuture<?> sf = ctx.eventLoop().schedule(new Runnable() {
final Timeout timeout = timer.newTimeout( @Override
new WriteTimeoutTask(ctx, future), public void run() {
timeoutMillis, TimeUnit.MILLISECONDS); if (future.setFailure(WriteTimeoutException.INSTANCE)) {
// If succeeded to mark as failure, notify the pipeline, too.
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
}
future.addListener(new TimeoutCanceller(timeout)); }, timeoutMillis, TimeUnit.MILLISECONDS);
// Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false);
}
});
} }
super.writeRequested(ctx, e); super.flush(ctx, future);
} }
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaughtLater(ctx, EXCEPTION); if (!closed) {
} ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
ctx.close();
private final class WriteTimeoutTask implements TimerTask { closed = true;
private final ChannelHandlerContext ctx;
private final ChannelFuture future;
WriteTimeoutTask(ChannelHandlerContext ctx, ChannelFuture future) {
this.ctx = ctx;
this.future = future;
}
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
if (!ctx.channel().isOpen()) {
return;
}
// Mark the future as failure
if (future.setFailure(EXCEPTION)) {
// If succeeded to mark as failure, notify the pipeline, too.
try {
writeTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
}
}
}
private static final class TimeoutCanceller implements ChannelFutureListener {
private final Timeout timeout;
TimeoutCanceller(Timeout timeout) {
this.timeout = timeout;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
timeout.cancel();
} }
} }
} }