[#2618] Introduce ChannelPromise.unvoid() and ChannelFuture.isVoid()

Motivation:

There is no way for a ChannelHandler to check if the passed in ChannelPromise for a write(...) call is a VoidChannelPromise. This is a problem as some handlers need to add listeners to the ChannelPromise which is not possible in the case of a VoidChannelPromise.

Modification:

- Introduce ChannelFuture.isVoid() which will return true if it is not possible to add listeners or wait on the result.
- Add ChannelPromise.unvoid() which allows to create a ChannelFuture out of a void ChannelFuture which supports all the operations.

Result:

It's now easy to write ChannelHandler implementations which also works when a void ChannelPromise is used.
This commit is contained in:
Norman Maurer 2014-07-02 10:39:34 +02:00
parent 9c4970363e
commit 2438ec5e24
9 changed files with 98 additions and 26 deletions

View File

@ -256,14 +256,15 @@ public class IdleStateHandler extends ChannelHandlerAdapter {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(new ChannelFutureListener() { ChannelPromise unvoid = promise.unvoid();
unvoid.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = System.nanoTime(); lastWriteTime = System.nanoTime();
firstWriterIdleEvent = firstAllIdleEvent = true; firstWriterIdleEvent = firstAllIdleEvent = true;
} }
}); });
ctx.write(msg, promise); ctx.write(msg, unvoid);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {

View File

@ -102,37 +102,38 @@ public class WriteTimeoutHandler extends ChannelHandlerAdapter {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
scheduleTimeout(ctx, promise); if (timeoutNanos > 0) {
promise = promise.unvoid();
scheduleTimeout(ctx, promise);
}
ctx.write(msg, promise); ctx.write(msg, promise);
} }
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) { private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
if (timeoutNanos > 0) { // Schedule a timeout.
// Schedule a timeout. final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() { @Override
@Override public void run() {
public void run() { // Was not written yet so issue a write timeout
// Was not written yet so issue a write timeout // The future itself will be failed with a ClosedChannelException once the close() was issued
// The future itself will be failed with a ClosedChannelException once the close() was issued // See https://github.com/netty/netty/issues/2159
// See https://github.com/netty/netty/issues/2159 if (!future.isDone()) {
if (!future.isDone()) { try {
try { writeTimedOut(ctx);
writeTimedOut(ctx); } catch (Throwable t) {
} catch (Throwable t) { ctx.fireExceptionCaught(t);
ctx.fireExceptionCaught(t);
}
} }
} }
}, timeoutNanos, TimeUnit.NANOSECONDS); }
}, timeoutNanos, TimeUnit.NANOSECONDS);
// Cancel the scheduled timeout if the flush future is complete. // Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false); sf.cancel(false);
} }
}); });
}
} }
/** /**

View File

@ -193,4 +193,20 @@ public interface ChannelFuture extends Future<Void> {
@Override @Override
ChannelFuture awaitUninterruptibly(); ChannelFuture awaitUninterruptibly();
/**
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
* following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
} }

View File

@ -59,4 +59,7 @@ public interface ChannelProgressivePromise extends ProgressivePromise<Void>, Cha
@Override @Override
ChannelProgressivePromise setProgress(long progress, long total); ChannelProgressivePromise setProgress(long progress, long total);
@Override
ChannelProgressivePromise unvoid();
} }

View File

@ -60,4 +60,9 @@ public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override @Override
ChannelPromise awaitUninterruptibly(); ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();
} }

View File

@ -104,4 +104,9 @@ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements Cha
public Void getNow() { public Void getNow() {
return null; return null;
} }
@Override
public boolean isVoid() {
return false;
}
} }

View File

@ -166,4 +166,14 @@ public class DefaultChannelProgressivePromise
super.checkDeadLock(); super.checkDeadLock();
} }
} }
@Override
public ChannelProgressivePromise unvoid() {
return this;
}
@Override
public boolean isVoid() {
return false;
}
} }

View File

@ -157,4 +157,14 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
super.checkDeadLock(); super.checkDeadLock();
} }
} }
@Override
public ChannelPromise unvoid() {
return this;
}
@Override
public boolean isVoid() {
return false;
}
} }

View File

@ -193,6 +193,27 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
return null; return null;
} }
@Override
public ChannelPromise unvoid() {
ChannelPromise promise = new DefaultChannelPromise(channel);
if (fireException) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
fireException(future.cause());
}
}
});
}
return promise;
}
@Override
public boolean isVoid() {
return true;
}
private void fireException(Throwable cause) { private void fireException(Throwable cause) {
// Only fire the exception if the channel is open and registered // Only fire the exception if the channel is open and registered
// if not the pipeline is not setup and so it would hit the tail // if not the pipeline is not setup and so it would hit the tail