[#2618] Introduce ChannelPromise.unvoid() and ChannelFuture.isVoid()

Motivation:

There is no way for a ChannelHandler to check if the passed in ChannelPromise for a write(...) call is a VoidChannelPromise. This is a problem as some handlers need to add listeners to the ChannelPromise which is not possible in the case of a VoidChannelPromise.

Modification:

- Introduce ChannelFuture.isVoid() which will return true if it is not possible to add listeners or wait on the result.
- Add ChannelPromise.unvoid() which allows to create a ChannelFuture out of a void ChannelFuture which supports all the operations.

Result:

It's now easy to write ChannelHandler implementations which also works when a void ChannelPromise is used.
This commit is contained in:
Norman Maurer 2014-07-02 10:39:34 +02:00
parent a1974ef35b
commit 217fb0de05
9 changed files with 98 additions and 26 deletions

View File

@ -256,14 +256,15 @@ public class IdleStateHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(new ChannelFutureListener() {
ChannelPromise unvoid = promise.unvoid();
unvoid.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = System.nanoTime();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
});
ctx.write(msg, promise);
ctx.write(msg, unvoid);
}
private void initialize(ChannelHandlerContext ctx) {

View File

@ -103,37 +103,38 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
scheduleTimeout(ctx, promise);
if (timeoutNanos > 0) {
promise = promise.unvoid();
scheduleTimeout(ctx, promise);
}
ctx.write(msg, promise);
}
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
if (timeoutNanos > 0) {
// Schedule a timeout.
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// Was not written yet so issue a write timeout
// The future itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!future.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
// Schedule a timeout.
final ScheduledFuture<?> sf = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// Was not written yet so issue a write timeout
// The future itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!future.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
}, timeoutNanos, TimeUnit.NANOSECONDS);
}
}, timeoutNanos, TimeUnit.NANOSECONDS);
// Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false);
}
});
}
// Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false);
}
});
}
/**

View File

@ -193,4 +193,20 @@ public interface ChannelFuture extends Future<Void> {
@Override
ChannelFuture awaitUninterruptibly();
/**
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
* following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
}

View File

@ -59,4 +59,7 @@ public interface ChannelProgressivePromise extends ProgressivePromise<Void>, Cha
@Override
ChannelProgressivePromise setProgress(long progress, long total);
@Override
ChannelProgressivePromise unvoid();
}

View File

@ -60,4 +60,9 @@ public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();
}

View File

@ -104,4 +104,9 @@ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements Cha
public Void getNow() {
return null;
}
@Override
public boolean isVoid() {
return false;
}
}

View File

@ -166,4 +166,14 @@ public class DefaultChannelProgressivePromise
super.checkDeadLock();
}
}
@Override
public ChannelProgressivePromise unvoid() {
return this;
}
@Override
public boolean isVoid() {
return false;
}
}

View File

@ -157,4 +157,14 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
super.checkDeadLock();
}
}
@Override
public ChannelPromise unvoid() {
return this;
}
@Override
public boolean isVoid() {
return false;
}
}

View File

@ -193,6 +193,27 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
return null;
}
@Override
public ChannelPromise unvoid() {
ChannelPromise promise = new DefaultChannelPromise(channel);
if (fireException) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
fireException(future.cause());
}
}
});
}
return promise;
}
@Override
public boolean isVoid() {
return true;
}
private void fireException(Throwable cause) {
// Only fire the exception if the channel is open and registered
// if not the pipeline is not setup and so it would hit the tail