From 217fb0de055258b70e540d139ae945db94ee8390 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 2 Jul 2014 10:39:34 +0200 Subject: [PATCH] [#2618] Introduce ChannelPromise.unvoid() and ChannelFuture.isVoid() Motivation: There is no way for a ChannelHandler to check if the passed in ChannelPromise for a write(...) call is a VoidChannelPromise. This is a problem as some handlers need to add listeners to the ChannelPromise which is not possible in the case of a VoidChannelPromise. Modification: - Introduce ChannelFuture.isVoid() which will return true if it is not possible to add listeners or wait on the result. - Add ChannelPromise.unvoid() which allows to create a ChannelFuture out of a void ChannelFuture which supports all the operations. Result: It's now easy to write ChannelHandler implementations which also works when a void ChannelPromise is used. --- .../handler/timeout/IdleStateHandler.java | 5 +- .../handler/timeout/WriteTimeoutHandler.java | 49 ++++++++++--------- .../java/io/netty/channel/ChannelFuture.java | 16 ++++++ .../channel/ChannelProgressivePromise.java | 3 ++ .../java/io/netty/channel/ChannelPromise.java | 5 ++ .../netty/channel/CompleteChannelFuture.java | 5 ++ .../DefaultChannelProgressivePromise.java | 10 ++++ .../netty/channel/DefaultChannelPromise.java | 10 ++++ .../io/netty/channel/VoidChannelPromise.java | 21 ++++++++ 9 files changed, 98 insertions(+), 26 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 78d4616515..80a511585b 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -256,14 +256,15 @@ public class IdleStateHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - promise.addListener(new ChannelFutureListener() { + ChannelPromise unvoid = promise.unvoid(); + unvoid.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lastWriteTime = System.nanoTime(); firstWriterIdleEvent = firstAllIdleEvent = true; } }); - ctx.write(msg, promise); + ctx.write(msg, unvoid); } private void initialize(ChannelHandlerContext ctx) { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 6252961e95..b93864eb93 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -103,37 +103,38 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - scheduleTimeout(ctx, promise); + if (timeoutNanos > 0) { + promise = promise.unvoid(); + scheduleTimeout(ctx, promise); + } ctx.write(msg, promise); } private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) { - if (timeoutNanos > 0) { - // Schedule a timeout. - final ScheduledFuture sf = ctx.executor().schedule(new Runnable() { - @Override - public void run() { - // Was not written yet so issue a write timeout - // The future itself will be failed with a ClosedChannelException once the close() was issued - // See https://github.com/netty/netty/issues/2159 - if (!future.isDone()) { - try { - writeTimedOut(ctx); - } catch (Throwable t) { - ctx.fireExceptionCaught(t); - } + // Schedule a timeout. + final ScheduledFuture sf = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + // Was not written yet so issue a write timeout + // The future itself will be failed with a ClosedChannelException once the close() was issued + // See https://github.com/netty/netty/issues/2159 + if (!future.isDone()) { + try { + writeTimedOut(ctx); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); } } - }, timeoutNanos, TimeUnit.NANOSECONDS); + } + }, timeoutNanos, TimeUnit.NANOSECONDS); - // Cancel the scheduled timeout if the flush future is complete. - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - sf.cancel(false); - } - }); - } + // Cancel the scheduled timeout if the flush future is complete. + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + sf.cancel(false); + } + }); } /** diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java index f2eab7ac43..dfb03cb4c2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/ChannelFuture.java @@ -193,4 +193,20 @@ public interface ChannelFuture extends Future { @Override ChannelFuture awaitUninterruptibly(); + + /** + * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the + * following methods: + * + */ + boolean isVoid(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java index b665b7d4e3..b2f9b7c523 100644 --- a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java @@ -59,4 +59,7 @@ public interface ChannelProgressivePromise extends ProgressivePromise, Cha @Override ChannelProgressivePromise setProgress(long progress, long total); + + @Override + ChannelProgressivePromise unvoid(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelPromise.java b/transport/src/main/java/io/netty/channel/ChannelPromise.java index b808c6395a..3c9dd2407d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromise.java @@ -60,4 +60,9 @@ public interface ChannelPromise extends ChannelFuture, Promise { @Override ChannelPromise awaitUninterruptibly(); + + /** + * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. + */ + ChannelPromise unvoid(); } diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java index 4557aa3938..67a86e5d97 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java @@ -104,4 +104,9 @@ abstract class CompleteChannelFuture extends CompleteFuture implements Cha public Void getNow() { return null; } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java index 4b101e0cc0..a71f01f5b8 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java @@ -166,4 +166,14 @@ public class DefaultChannelProgressivePromise super.checkDeadLock(); } } + + @Override + public ChannelProgressivePromise unvoid() { + return this; + } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java index 3d76347281..0fc89b9069 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java @@ -157,4 +157,14 @@ public class DefaultChannelPromise extends DefaultPromise implements Chann super.checkDeadLock(); } } + + @Override + public ChannelPromise unvoid() { + return this; + } + + @Override + public boolean isVoid() { + return false; + } } diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index 6b29ce7073..0207a82c68 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -193,6 +193,27 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelPr return null; } + @Override + public ChannelPromise unvoid() { + ChannelPromise promise = new DefaultChannelPromise(channel); + if (fireException) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + fireException(future.cause()); + } + } + }); + } + return promise; + } + + @Override + public boolean isVoid() { + return true; + } + private void fireException(Throwable cause) { // Only fire the exception if the channel is open and registered // if not the pipeline is not setup and so it would hit the tail