diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutException.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutException.java index 1d0adbe5d1..d91f053f48 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutException.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutException.java @@ -21,32 +21,9 @@ package io.netty.handler.timeout; */ public class ReadTimeoutException extends TimeoutException { - private static final long serialVersionUID = -4596059237992273913L; + private static final long serialVersionUID = 169287984113283421L; - /** - * Creates a new instance. - */ - public ReadTimeoutException() { - } + public static final ReadTimeoutException INSTANCE = new ReadTimeoutException(); - /** - * Creates a new instance. - */ - public ReadTimeoutException(String message, Throwable cause) { - super(message, cause); - } - - /** - * Creates a new instance. - */ - public ReadTimeoutException(String message) { - super(message); - } - - /** - * Creates a new instance. - */ - public ReadTimeoutException(Throwable cause) { - super(cause); - } + private ReadTimeoutException() {} } diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 60aa0f1494..1c08ac7030 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -73,18 +73,15 @@ import java.util.concurrent.TimeUnit; */ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { - private static final ReadTimeoutException EXCEPTION = new ReadTimeoutException(); - - static { - EXCEPTION.setStackTrace(new StackTraceElement[0]); - } - private final long timeoutMillis; private volatile ScheduledFuture timeout; private volatile long lastReadTime; + private volatile boolean destroyed; + private boolean closed; + /** * Creates a new instance. * @@ -187,7 +184,11 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { } protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { - ctx.fireExceptionCaught(EXCEPTION); + if (!closed) { + ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); + ctx.close(); + closed = true; + } } private final class ReadTimeoutTask implements Runnable { @@ -210,8 +211,6 @@ public class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { // Read timed out - set a new timeout and notify the callback. timeout = ctx.eventLoop().schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); try { - // FIXME This should be called from an I/O thread. - // To be fixed in Netty 4. readTimedOut(ctx); } catch (Throwable t) { ctx.fireExceptionCaught(t); diff --git a/handler/src/main/java/io/netty/handler/timeout/TimeoutException.java b/handler/src/main/java/io/netty/handler/timeout/TimeoutException.java index 17ec225427..b48e09b1f5 100644 --- a/handler/src/main/java/io/netty/handler/timeout/TimeoutException.java +++ b/handler/src/main/java/io/netty/handler/timeout/TimeoutException.java @@ -25,30 +25,10 @@ public class TimeoutException extends ChannelException { private static final long serialVersionUID = 4673641882869672533L; - /** - * Creates a new instance. - */ - public TimeoutException() { - } + TimeoutException() {} - /** - * Creates a new instance. - */ - public TimeoutException(String message, Throwable cause) { - super(message, cause); - } - - /** - * Creates a new instance. - */ - public TimeoutException(String message) { - super(message); - } - - /** - * Creates a new instance. - */ - public TimeoutException(Throwable cause) { - super(cause); + @Override + public Throwable fillInStackTrace() { + return this; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutException.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutException.java index 42c2ecedab..1f63d6ba9c 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutException.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutException.java @@ -15,39 +15,15 @@ */ package io.netty.handler.timeout; - /** * A {@link TimeoutException} raised by {@link WriteTimeoutHandler} when no data * was written within a certain period of time. */ public class WriteTimeoutException extends TimeoutException { - private static final long serialVersionUID = -7746685254523245218L; + private static final long serialVersionUID = -144786655770296065L; - /** - * Creates a new instance. - */ - public WriteTimeoutException() { - } + public static final WriteTimeoutException INSTANCE = new WriteTimeoutException(); - /** - * Creates a new instance. - */ - public WriteTimeoutException(String message, Throwable cause) { - super(message, cause); - } - - /** - * Creates a new instance. - */ - public WriteTimeoutException(String message) { - super(message); - } - - /** - * Creates a new instance. - */ - public WriteTimeoutException(Throwable cause) { - super(cause); - } + private WriteTimeoutException() {} } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 0806048b00..30121824e0 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -15,25 +15,21 @@ */ package io.netty.handler.timeout; -import static io.netty.channel.Channels.*; - -import java.util.concurrent.TimeUnit; - import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelDownstreamHandler; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.util.ExternalResourceReleasable; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; + +import java.nio.channels.Channels; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Raises a {@link WriteTimeoutException} when no data was written within a @@ -72,48 +68,35 @@ import io.netty.util.TimerTask; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.TimeoutException oneway - - raises */ -@Sharable -public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler - implements ExternalResourceReleasable { +public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { - static final WriteTimeoutException EXCEPTION = new WriteTimeoutException(); - - private final Timer timer; private final long timeoutMillis; + private boolean closed; + /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param timeoutSeconds * write timeout in seconds */ - public WriteTimeoutHandler(Timer timer, int timeoutSeconds) { - this(timer, timeoutSeconds, TimeUnit.SECONDS); + public WriteTimeoutHandler(int timeoutSeconds) { + this(timeoutSeconds, TimeUnit.SECONDS); } /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param timeout * write timeout * @param unit * the {@link TimeUnit} of {@code timeout} */ - public WriteTimeoutHandler(Timer timer, long timeout, TimeUnit unit) { - if (timer == null) { - throw new NullPointerException("timer"); - } + public WriteTimeoutHandler(long timeout, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } - this.timer = timer; if (timeout <= 0) { timeoutMillis = 0; } else { @@ -121,84 +104,47 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler } } - /** - * Stops the {@link Timer} which was specified in the constructor of this - * handler. You should not call this method if the {@link Timer} is in use - * by other objects. - */ @Override - public void releaseExternalResources() { - timer.stop(); - } - - protected long getTimeoutMillis(MessageEvent e) { - return timeoutMillis; + public ChannelBufferHolder newOutboundBuffer(ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.outboundBypassBuffer(ctx); } @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - - long timeoutMillis = getTimeoutMillis(e); + public void flush(final ChannelOutboundHandlerContext ctx, final ChannelFuture future) throws Exception { if (timeoutMillis > 0) { - // Set timeout only when getTimeoutMillis() returns a positive value. - ChannelFuture future = e.getFuture(); - final Timeout timeout = timer.newTimeout( - new WriteTimeoutTask(ctx, future), - timeoutMillis, TimeUnit.MILLISECONDS); + // Schedule a timeout. + final ScheduledFuture sf = ctx.eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (future.setFailure(WriteTimeoutException.INSTANCE)) { + // If succeeded to mark as failure, notify the pipeline, too. + try { + writeTimedOut(ctx); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } + } - future.addListener(new TimeoutCanceller(timeout)); + }, timeoutMillis, TimeUnit.MILLISECONDS); + + // Cancel the scheduled timeout if the flush future is complete. + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + sf.cancel(false); + } + }); } - super.writeRequested(ctx, e); + super.flush(ctx, future); } protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { - Channels.fireExceptionCaughtLater(ctx, EXCEPTION); - } - - private final class WriteTimeoutTask implements TimerTask { - - private final ChannelHandlerContext ctx; - private final ChannelFuture future; - - WriteTimeoutTask(ChannelHandlerContext ctx, ChannelFuture future) { - this.ctx = ctx; - this.future = future; - } - - @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - - if (!ctx.channel().isOpen()) { - return; - } - - // Mark the future as failure - if (future.setFailure(EXCEPTION)) { - // If succeeded to mark as failure, notify the pipeline, too. - try { - writeTimedOut(ctx); - } catch (Throwable t) { - fireExceptionCaught(ctx, t); - } - } - } - } - - private static final class TimeoutCanceller implements ChannelFutureListener { - private final Timeout timeout; - - TimeoutCanceller(Timeout timeout) { - this.timeout = timeout; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - timeout.cancel(); + if (!closed) { + ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE); + ctx.close(); + closed = true; } } }