diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 4437e33a28..f60345f225 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -256,14 +256,15 @@ public class IdleStateHandler extends ChannelHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - promise.addListener(new ChannelFutureListener() { + ChannelPromise unvoid = promise.unvoid(); + unvoid.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lastWriteTime = System.nanoTime(); firstWriterIdleEvent = firstAllIdleEvent = true; } }); - ctx.write(msg, promise); + ctx.write(msg, unvoid); } private void initialize(ChannelHandlerContext ctx) { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 2a6f860721..f637b2d196 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -102,37 +102,38 @@ public class WriteTimeoutHandler extends ChannelHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - scheduleTimeout(ctx, promise); + if (timeoutNanos > 0) { + promise = promise.unvoid(); + scheduleTimeout(ctx, promise); + } ctx.write(msg, promise); } private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) { - if (timeoutNanos > 0) { - // Schedule a timeout. - final ScheduledFuture sf = ctx.executor().schedule(new Runnable() { - @Override - public void run() { - // Was not written yet so issue a write timeout - // The future itself will be failed with a ClosedChannelException once the close() was issued - // See https://github.com/netty/netty/issues/2159 - if (!future.isDone()) { - try { - writeTimedOut(ctx); - } catch (Throwable t) { - ctx.fireExceptionCaught(t); - } + // Schedule a timeout. + final ScheduledFuture sf = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + // Was not written yet so issue a write timeout + // The future itself will be failed with a ClosedChannelException once the close() was issued + // See https://github.com/netty/netty/issues/2159 + if (!future.isDone()) { + try { + writeTimedOut(ctx); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); } } - }, timeoutNanos, TimeUnit.NANOSECONDS); + } + }, timeoutNanos, TimeUnit.NANOSECONDS); - // Cancel the scheduled timeout if the flush future is complete. - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - sf.cancel(false); - } - }); - } + // Cancel the scheduled timeout if the flush future is complete. + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + sf.cancel(false); + } + }); } /** diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java index f2eab7ac43..dfb03cb4c2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/ChannelFuture.java @@ -193,4 +193,20 @@ public interface ChannelFuture extends Future { @Override ChannelFuture awaitUninterruptibly(); + + /** + * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the + * following methods: + * + */ + boolean isVoid(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java index b665b7d4e3..b2f9b7c523 100644 --- a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java @@ -59,4 +59,7 @@ public interface ChannelProgressivePromise extends ProgressivePromise, Cha @Override ChannelProgressivePromise setProgress(long progress, long total); + + @Override + ChannelProgressivePromise unvoid(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelPromise.java b/transport/src/main/java/io/netty/channel/ChannelPromise.java index b808c6395a..3c9dd2407d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromise.java @@ -60,4 +60,9 @@ public interface ChannelPromise extends ChannelFuture, Promise { @Override ChannelPromise awaitUninterruptibly(); + + /** + * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. + */ + ChannelPromise unvoid(); } diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java index 4557aa3938..67a86e5d97 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java @@ -104,4 +104,9 @@ abstract class CompleteChannelFuture extends CompleteFuture implements Cha public Void getNow() { return null; } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java index 4b101e0cc0..a71f01f5b8 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java @@ -166,4 +166,14 @@ public class DefaultChannelProgressivePromise super.checkDeadLock(); } } + + @Override + public ChannelProgressivePromise unvoid() { + return this; + } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java index 3d76347281..0fc89b9069 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java @@ -157,4 +157,14 @@ public class DefaultChannelPromise extends DefaultPromise implements Chann super.checkDeadLock(); } } + + @Override + public ChannelPromise unvoid() { + return this; + } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index 6b29ce7073..0207a82c68 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -193,6 +193,27 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelPr return null; } + @Override + public ChannelPromise unvoid() { + ChannelPromise promise = new DefaultChannelPromise(channel); + if (fireException) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + fireException(future.cause()); + } + } + }); + } + return promise; + } + + @Override + public boolean isVoid() { + return true; + } + private void fireException(Throwable cause) { // Only fire the exception if the channel is open and registered // if not the pipeline is not setup and so it would hit the tail